from __future__ import annotations
import math
from datetime import timedelta
import pandas as pd
from meteo_qc._data import register
from meteo_qc._data import Result
pd.options.mode.chained_assignment = None
[docs]
def infer_freq(s: pd.Series[float]) -> str | None:
"""Infer the frequency of a :func:`pd.DateTimeIndex` by copying the dates
and shifting them by one timestamp then subtracting the dates from each
other taking the minimum.
:param s: a :func:`pd.Series` with a :func:`pd.DateTimeIndex`.
:returns: if the series is too short (< 3) ``None`` since the frequency
cannot be inferred. Else a ``freqstr`` e.g. ``10min``.
"""
# pd.infer_freq is not working with values missing. Instead compute the
# minimum frequency
# shift the (sorted) index by one
if len(s) < 3:
return None
idx_diff = s.index[1:] - s.index[:-1]
offset = pd.tseries.frequencies.to_offset(idx_diff.min())
freq = None
if offset is not None: # pragma no branch
freq = offset.freqstr
# pd.to_timedelta does not work with min, but needs 1min instead
if freq is not None and not freq[0].isdigit():
return f'1{freq}'
return freq
def _has_spikes_or_dip(
s: pd.Series[float],
delta: float,
) -> tuple[bool, pd.DataFrame]:
df = s.to_frame()
def _compare(s: pd.Series[float]) -> bool:
if len(s) == 1:
return False
diff = abs(s.iloc[0] - s.iloc[1])
if math.isnan(diff):
return False
return bool(diff > delta)
df['flag'] = df.rolling(
window=2,
min_periods=1,
closed='right',
).apply(_compare)
# if there are not enough valid obervations, rolling returns NaN.
# Converting this to a bool results in True sind float('nan') is truthy
# set all NaN to False, since we don't want to flag them here
df['flag'] = df['flag'].replace([float('nan')], [0.0]).astype(bool)
# TODO: also return where, and make sure the spike or dip is labelled
# correctly with surroundings, maybe an additional rolling?
data: pd.DataFrame = df[df['flag'] == True] # noqa: E712
return bool(df['flag'].any()), data
def _is_persistent(
s: pd.Series[float],
window: int,
excludes: list[float],
) -> tuple[bool, pd.DataFrame]:
df = s.to_frame()
df['flag'] = False
if len(df) <= window:
return False, df[df['flag'] == True] # noqa: E712
def _equals(x: pd.Series[float]) -> bool:
if len(x) >= window:
first_val = x.iloc[0]
return bool(((x == first_val) & (~x.isin(excludes))).all())
else:
return False
df['flag'] = df[s.name].rolling(
window=window,
min_periods=1,
closed='right',
).apply(_equals).astype(bool)
data: pd.DataFrame = df[df['flag'] == True] # noqa: E712
return bool(df['flag'].any()), data
[docs]
@register('temperature', lower_bound=-40, upper_bound=50)
@register('dew_point', lower_bound=-60, upper_bound=50)
@register('relhum', lower_bound=10, upper_bound=100)
@register('windspeed', lower_bound=0, upper_bound=30)
@register('winddirection', lower_bound=0, upper_bound=360)
@register('pressure', lower_bound=860, upper_bound=1055)
def range_check(
s: pd.Series[float],
lower_bound: float,
upper_bound: float,
) -> Result:
"""
A check function checking if values in the :func:`pd.Series` `s` are within
a range.
This function can be used to write your own custom range checks.
:param s: the :func:`pd.Series` to be checked
:param lower_bound: the lower bound of the allowed values (inclusive)
:param upper_bound: the lower bound of the allowed values (inclusive)
:returns: a :func:`meteo_qc.Result` object containing the outcome of the
applied check.
"""
df = s.to_frame()
df['flag'] = False
df['flag'] = (
df.iloc[:, 0].lt(lower_bound) |
df.iloc[:, 0].gt(upper_bound)
)
if df.index.name is None:
date_name = 'index'
else:
date_name = df.index.name
df = df.reset_index()
# we need something json serializable
# timestamp to milliseconds
df[date_name] = df[date_name].astype(int) // 1000000
# replace NaNs with NULLs, since json tokenizing can't handle them
df = df.replace([float('nan')], [None])
result = bool(df['flag'].any())
if result is True:
return Result(
function=range_check.__name__,
passed=False,
msg=f'out of allowed range of [{lower_bound} - {upper_bound}]',
data=df[df['flag'] == True].values.tolist(), # noqa: E712
)
else:
return Result(function=range_check.__name__, passed=True)
[docs]
@register('temperature', delta=0.3)
@register('dew_point', delta=0.3)
@register('relhum', delta=4)
@register('pressure', delta=0.3)
def spike_dip_check(s: pd.Series[float], delta: float) -> Result:
"""
A check function checking if values in the :func:`pd.Series` `s` have
sudden spikes or dips.
This function can be used to write your own custom spike dip checks.
:param s: the :func:`pd.Series` to be checked
:param delta: maximum allowed change per minute
:returns: a :func:`meteo_qc.Result` object containing the outcome of the
applied check.
"""
assert isinstance(s.index, pd.DatetimeIndex)
freqstr = s.index.freqstr
if freqstr is None:
freqstr = infer_freq(s)
if freqstr is None:
return Result(
function=spike_dip_check.__name__,
passed=False,
msg='cannot determine temporal resolution frequency',
)
freq_delta = pd.to_timedelta(freqstr)
_delta = (freq_delta.total_seconds() / 60) * delta
# reindex if values are missing
full_idx = pd.date_range(s.index.min(), s.index.max(), freq=freqstr)
s = s.reindex(full_idx)
result, df = _has_spikes_or_dip(s, delta=_delta)
if df.index.name is None:
date_name = 'index'
else: # pragma: no cover
date_name = df.index.name
df = df.reset_index()
# we need something json serializable
# timestamp to milliseconds
df[date_name] = df[date_name].astype(int) // 1000000
# replace NaNs with NULLs, since json tokenizing can't handle them
df = df.replace([float('nan')], [None])
if result is True:
return Result(
function=spike_dip_check.__name__,
passed=False,
msg=(
f'spikes or dips detected. Exceeded allowed delta of '
f'{delta} / min'
),
data=df.values.tolist(),
)
else:
return Result(function=spike_dip_check.__name__, passed=True)
[docs]
@register('temperature', window=timedelta(hours=2))
@register('dew_point', window=timedelta(hours=2))
@register('windspeed', window=timedelta(hours=5))
@register('relhum', window=timedelta(hours=5))
@register('pressure', window=timedelta(hours=6))
def persistence_check(
s: pd.Series[float],
window: timedelta,
excludes: list[float] = [],
) -> Result:
"""
A check function checking if values in the :func:`pd.Series` ``s`` are
persistent for a certain amount of time. "stuck values".
This function can be used to write your own custom persistence checks.
:param s: the :func:`pd.Series` to be checked
:param window: a timedelta after which the values must have changed
:param excludes: values to exclude from the check e.g. useful for radiation
or precipitation parameters that are ``0`` during the night or ``0``
without precipitation
:returns: a :func:`meteo_qc.Result` object containing the outcome of the
applied check.
"""
assert isinstance(s.index, pd.DatetimeIndex)
freqstr = s.index.freqstr
if freqstr is None:
freqstr = infer_freq(s)
if freqstr is None:
return Result(
function=persistence_check.__name__,
passed=False,
msg='cannot determine temporal resolution frequency',
)
freq_delta = pd.to_timedelta(freqstr)
timestamps_per_interval = window // freq_delta
# reindex if values are missing
full_idx = pd.date_range(s.index.min(), s.index.max(), freq=freqstr)
s = s.reindex(full_idx)
result, df = _is_persistent(
s,
window=timestamps_per_interval,
excludes=excludes,
)
if df.index.name is None:
date_name = 'index'
else: # pragma: no cover
date_name = df.index.name
df = df.reset_index()
# we need something json serializable
# timestamp to milliseconds
df[date_name] = df[date_name].astype(int) // 1000000
# replace NaNs with NULLs, since json tokenizing can't handle them
df = df.replace([float('nan')], [None])
if result is True:
return Result(
function=persistence_check.__name__,
passed=False,
msg=f'some values are the same for longer than {window}',
data=df.values.tolist(),
)
else:
return Result(function=persistence_check.__name__, passed=True)