Source code for meteo_qc._main

from __future__ import annotations

from collections import defaultdict
from datetime import tzinfo
from typing import TypedDict

import pandas as pd

from meteo_qc._colum_mapping import ColumnMapping
from meteo_qc._data import FUNCS
from meteo_qc._data import Result


class ColumnResult(TypedDict):
    results: dict[str, Result]
    passed: bool


[docs] class FinalResult(TypedDict): """ Final Result dictionary of the quality control. :param columns: column that were quality controlled, mapping to a dictionary of ``results`` being another dictionary mapping the check function to to its Result. :param passed: did the the entire quality control pass (all checks) :param data_start_date: timestamp in milliseconds of the **start** date of the provided input data :param data_end_date: timestamp in milliseconds of the **end** date of the provided input data. """ columns: dict[str, ColumnResult] passed: bool data_start_date: int data_end_date: int
[docs] def apply_qc(df: pd.DataFrame, column_mapping: ColumnMapping) -> FinalResult: """ Apply the quality control to a a ``pandas.DataFrame``. :param df: The DataFrame the quality control should be applied to :param column_mapping: A column mapping (:func:`meteo_qc.ColumnMapping`), that assigns groups to columns. See :func:`meteo_qc.ColumnMapping` for more information on how to create and customize one. :returns: A result as json serializable dictionary to be rendered in a an HTML template. .. code-block:: python { "columns": { { "temp": { "passed": False, "results": { "missing_timestamps": Result( function="missing_timestamps", passed=False, msg="missing 1 timestamps (assumed frequency: 10min)", data=None, ), "null_values": Result( function="null_values", passed=False, msg="found 7 values that are null", data=[ [1641034800000, None, True], [1641038400000, None, True], [1641042000000, None, True], [1641045600000, None, True], [1641049200000, None, True], [1641052800000, None, True], [1641056400000, None, True], ], ), "persistence_check": Result( function="persistence_check", passed=True, msg=None, data=None ), "range_check": Result( function="range_check", passed=True, msg=None, data=None ), "spike_dip_check": Result( function="spike_dip_check", passed=True, msg=None, data=None ), }, }, }, ... }, "data_end_date": 1641056400000, "data_start_date": 1641031200000, "passed": False, } """ # noqa: E501 if not isinstance(df.index, pd.DatetimeIndex): raise TypeError( f'the pandas.DataFrame index must be of type pandas.DatetimeIndex,' f' not {type(df.index)}', ) elif not isinstance(df.index.tzinfo, tzinfo): raise TypeError('the pandas.DataFrame index must be timezone aware') final_res: FinalResult = { 'columns': defaultdict( lambda: {'results': {}, 'passed': False}, ), 'passed': False, 'data_start_date': int(df.index.min().timestamp() * 1000), 'data_end_date': int(df.index.max().timestamp() * 1000), } # sort the data by the DateTimeIndex df_sorted = df.sort_index() for column in df_sorted.columns: # all groups associated with this column qc_types = column_mapping[column] final_res_col = final_res['columns'][column] for qc_type in qc_types: # all functions registered for this group registerd_funcs = FUNCS[qc_type] for func in registerd_funcs: call_result = func['func'](df_sorted[column], **func['kwargs']) final_res_col['results'][func['func'].__name__] = call_result # check if entire column passed final_res_col['passed'] = all( (i.passed for i in final_res_col['results'].values()), ) # check if the entire QC failed final_res['passed'] = all( (i['passed'] for i in final_res['columns'].values()), ) return final_res