# Source code for saqc.core.translation.dmpscheme

#! /usr/bin/env python

# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later

# -*- coding: utf-8 -*-

from __future__ import annotations

import json

import numpy as np
import pandas as pd

from saqc import BAD, DOUBTFUL, GOOD, UNFLAGGED
from saqc.core import Flags, History
from saqc.core.frame import DictOfSeries
from saqc.core.translation.basescheme import BackwardMap, ForwardMap, MappingScheme

# Values accepted in the ``quality_cause`` column of external (DMP) flags.
# ``DmpScheme.validityCheck`` raises a ``ValueError`` for any cause not listed
# here. The empty string is the value ``DmpScheme.toExternal`` initialises the
# column with; ``validityCheck`` only tolerates it for "OK" and "NIL" flags.
_QUALITY_CAUSES = [
    "",
    "BATTERY_LOW",
    "BELOW_MINIMUM",
    "ABOVE_MAXIMUM",
    "BELOW_OR_ABOVE_MIN_MAX",
    "ISOLATED_SPIKE",
    "DEFECTIVE_SENSOR",
    "LEFT_CENSORED_DATA",
    "RIGHT_CENSORED_DATA",
    "OTHER",
    "AUTOFLAGGED",
]

# Column labels of the external (DMP) flags of a single field. Used as the
# grouping keys in ``DmpScheme.toHistory`` and as the set of permitted column
# names in ``DmpScheme.validityCheck``.
_QUALITY_LABELS = [
    "quality_flag",
    "quality_cause",
    "quality_comment",
]


class DmpScheme(MappingScheme):
    """
    Implements the translation from and to the flagging scheme implemented in
    the UFZ - Datamanagementportal
    """

    # Fallback values used by ``toExternal`` when the meta information of a
    # history column carries no ``comment`` / ``cause`` keyword.
    ARGUMENTS = {"comment": "", "cause": "OTHER"}

    DFILTER_DEFAULT = GOOD + 1

    # external flag -> internal flag
    _FORWARD: ForwardMap = {
        "NIL": UNFLAGGED,
        "OK": GOOD,
        "DOUBTFUL": DOUBTFUL,
        "BAD": BAD,
    }

    # internal flag -> external flag
    _BACKWARD: BackwardMap = {
        UNFLAGGED: "NIL",
        np.nan: "NIL",
        GOOD: "OK",
        DOUBTFUL: "DOUBTFUL",
        BAD: "BAD",
    }

    def __init__(self):
        super().__init__(forward=self._FORWARD, backward=self._BACKWARD)

    def toHistory(self, flags: pd.DataFrame):
        """
        Translate a single field of external ``Flags`` to a ``History``

        Parameters
        ----------
        flags : pd.DataFrame
            The external flags of one field, holding the columns
            ``quality_flag``, ``quality_cause`` and ``quality_comment``.

        Returns
        -------
        History
            One history column per distinct (flag, cause, comment)
            combination that has a non-empty cause or comment.
        """
        history = History(flags.index)

        for (flag, cause, comment), values in flags.groupby(_QUALITY_LABELS):
            # rows with neither a cause nor a comment carry no flagging
            # information worth a history column
            if cause == "" and comment == "":
                continue

            try:
                parsed = json.loads(comment)
            except (json.JSONDecodeError, TypeError):
                # free-text or non-string comments cannot be decoded
                parsed = None

            # A comment may be valid JSON without being an object (e.g. "5")
            # or lack the expected keys; treat it like an undecodable comment
            # instead of failing with a TypeError/KeyError further down.
            if not isinstance(parsed, dict):
                parsed = {}

            column = pd.Series(np.nan, index=flags.index)
            column.loc[values.index] = self(flag)

            meta = {
                "func": parsed.get("test", "unknown"),
                "kwargs": {"comment": parsed.get("comment", ""), "cause": cause},
            }
            history.append(column, meta=meta)

        return history

    def toInternal(self, flags: pd.DataFrame | DictOfSeries) -> Flags:
        """
        Translate from 'external flags' to 'internal flags'

        Parameters
        ----------
        flags : pd.DataFrame or DictOfSeries
            The external flags to translate

        Returns
        -------
        Flags object

        Raises
        ------
        TypeError, ValueError
            If ``validityCheck`` rejects the given flags.
        """
        if isinstance(flags, pd.DataFrame):
            flags = DictOfSeries(flags)

        self.validityCheck(flags)

        # ``flags`` is always a ``DictOfSeries`` at this point, so the fields
        # are simply its keys.
        data = {}
        for field in flags.keys():
            data[str(field)] = self.toHistory(flags[field])

        return Flags(data)

    def toExternal(
        self, flags: Flags, attrs: dict | None = None, **kwargs
    ) -> DictOfSeries:
        """
        Translate from 'internal flags' to 'external flags'

        Parameters
        ----------
        flags : Flags
            The internal flags to translate

        attrs : dict or None, default None
            global meta information of saqc-object

        **kwargs
            Accepted, but not used by this implementation.

        Returns
        -------
        translated flags

        Raises
        ------
        TypeError, ValueError
            If the translated flags do not pass ``validityCheck``.
        """
        tflags = super().toExternal(flags, attrs=attrs)

        out = DictOfSeries()

        for field in tflags.columns:
            df = pd.DataFrame(
                {
                    "quality_flag": tflags[field],
                    "quality_cause": "",
                    "quality_comment": "",
                }
            )

            history = flags.history[field]
            # Later history columns overwrite the cause/comment written by
            # earlier ones wherever both set a flag.
            for col in history.columns:
                hist_col = history.hist[col]
                valid = (hist_col != UNFLAGGED) & hist_col.notna()

                # extract from meta
                meta = history.meta[col]
                keywords = meta.get("kwargs", {})
                comment = json.dumps(
                    {
                        "test": meta.get("func", "unknown"),
                        "comment": keywords.get("comment", self.ARGUMENTS["comment"]),
                    }
                )
                cause = keywords.get("cause", self.ARGUMENTS["cause"])
                df.loc[valid, "quality_comment"] = comment
                df.loc[valid, "quality_cause"] = cause

            out[field] = df

        self.validityCheck(out)
        return out

    @classmethod
    def validityCheck(cls, flags: DictOfSeries) -> None:
        """
        Check whether the given flags, causes and comments are valid.

        Parameters
        ----------
        flags : DictOfSeries
            The external flags to check, one frame per field.

        Raises
        ------
        TypeError
            If a frame has columns other than the expected quality labels or
            lacks one of them.
        ValueError
            If a frame holds an unknown quality flag or quality cause, a flag
            other than 'OK'/'NIL' without a cause, or the cause 'OTHER'
            without a comment.
        """
        for df in flags.values():
            has_only_labels = df.columns.isin(_QUALITY_LABELS).all(axis=None)
            # Without this, a missing label would surface as a bare KeyError
            # on the column access below.
            has_all_labels = set(_QUALITY_LABELS).issubset(df.columns)
            if not (has_only_labels and has_all_labels):
                raise TypeError(
                    f"DMP-Flags expect the labels {list(_QUALITY_LABELS)} in the secondary level"
                )

            # named distinctly so the ``flags`` parameter is not shadowed
            # while it is being iterated
            quality_flags = df["quality_flag"]
            causes = df["quality_cause"]
            comments = df["quality_comment"]

            if not quality_flags.isin(cls._FORWARD.keys()).all(axis=None):
                raise ValueError(
                    f"invalid quality flag(s) found, only the following values are supported: {set(cls._FORWARD.keys())}"
                )

            if not causes.isin(_QUALITY_CAUSES).all(axis=None):
                raise ValueError(
                    f"invalid quality cause(s) found, only the following values are supported: {_QUALITY_CAUSES}"
                )

            if (~quality_flags.isin(("OK", "NIL")) & (causes == "")).any(axis=None):
                raise ValueError(
                    "quality flags other than 'OK' and 'NIL' need a non-empty quality cause"
                )

            if ((causes == "OTHER") & (comments == "")).any(axis=None):
                raise ValueError(
                    "quality cause 'OTHER' needs a non-empty quality comment"
                )