# Source code for saqc.parsing.reader

#! /usr/bin/env python
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
# SPDX-License-Identifier: GPL-3.0-or-later
# -*- coding: utf-8 -*-

from __future__ import annotations

import ast
import io
import json
import logging
import textwrap
from pathlib import Path
from typing import Any, Dict, Iterable, List, Sequence, TextIO, Tuple
from urllib.request import urlopen

import pandas as pd

from saqc import SaQC
from saqc.exceptions import ParsingError
from saqc.lib.tools import isQuoted
from saqc.parsing.visitor import ConfigFunctionParser


def _readLines(
    it: Iterable[str], column_sep=";", comment_prefix="#", skip=0
) -> pd.DataFrame:
    """Parse the lines of a configuration into a DataFrame.

    Everything following ``comment_prefix`` is dropped from a line and
    lines that are empty afterwards are ignored. Every remaining line
    has to consist of exactly two ``column_sep`` separated columns.

    Parameters
    ----------
    it :
        The lines to parse.
    column_sep :
        String separating the two columns of a line.
    comment_prefix :
        String starting a comment.
    skip :
        A counter that is decremented once per line; a line is ignored
        (not parsed at all) while the decremented counter is still
        positive, i.e. the first ``skip - 1`` lines are ignored.

    Returns
    -------
    A DataFrame with the columns ``varname`` and ``test``, indexed by the
    1-based line number ``lineno``. The first parsed row is regarded as
    the header and is not part of the result.

    Raises
    ------
    ParsingError
        If a line does not hold exactly two columns or if no line could
        be parsed at all.
    """
    out = []
    for lineno, line in enumerate(it, start=1):
        skip -= 1
        if skip > 0:
            continue
        row = line.strip().split(comment_prefix, 1)[0]
        if not row:
            continue
        parts = [p.strip() for p in row.split(column_sep)]
        if len(parts) != 2:
            # report the same 1-based line number that is stored in the
            # resulting index
            raise ParsingError(
                f"The configuration format expects exactly two "
                f"columns, one for the variable name and one for "
                f"the tests, but {len(parts)} columns were found "
                f"in line {lineno}.\n\t{line!r}"
            )
        out.append([lineno, *parts])
    if not out:
        raise ParsingError("Config file is empty")
    # the first parsed row is the header line
    return pd.DataFrame(out[1:], columns=["lineno", "varname", "test"]).set_index(
        "lineno"
    )


def readFile(fname, skip=1) -> pd.DataFrame:
    """Read and parse a config file to a DataFrame.

    Parameters
    ----------
    fname :
        Either a ``str`` or ``Path``, or an already opened text buffer.
        Strings and paths are opened as UTF-8 encoded local files first;
        if that fails with an ``OSError`` or ``ValueError`` the value is
        passed to ``urlopen`` and the response is decoded as UTF-8.
        Anything else is used as is.
    skip :
        Passed on to ``_readLines``.

    Returns
    -------
    The DataFrame returned by ``_readLines``.
    """

    def _open(file_or_buf) -> TextIO:
        if not isinstance(file_or_buf, (str, Path)):
            return file_or_buf
        try:
            return io.open(file_or_buf, "r", encoding="utf-8")
        except (OSError, ValueError):
            # Not readable as a local file, try it as an URL. The response
            # is closed as soon as its content is buffered.
            with urlopen(str(file_or_buf)) as response:
                return io.StringIO(response.read().decode("utf-8"))

    def _close(fh):
        # objects without a `close` method are silently accepted
        try:
            fh.close()
        except AttributeError:
            pass

    # mimic `with open(): ...`
    # NOTE: a buffer passed in by the caller is closed as well
    file = _open(fname)
    try:
        return _readLines(file, skip=skip)
    finally:
        _close(file)


def fromConfig(fname, *args, **func_kwargs):
    """Read the config file `fname` and execute it.

    All additional positional and keyword arguments are passed to the
    constructor of `_ConfigReader`. The result of `_ConfigReader.run`
    is returned.
    """
    reader = _ConfigReader(*args, **func_kwargs)
    return reader.readCsv(fname).run()
class _ConfigReader:
    """Read a configuration and apply it line by line to a `SaQC` object.

    A configuration can be loaded from a csv file (`readCsv`), a string
    (`readString`), a sequence of records (`readRecords`) or from json
    (`readJson`, `readJsonString`). All reading methods return the reader
    itself, so calls can be chained. `run` parses and executes the loaded
    configuration and returns the resulting `SaQC` object.
    """

    logger: logging.Logger
    saqc: SaQC
    # origin of the config, only set by `readCsv` and `readJson`
    file: str | None
    # the loaded config with the columns `varname` and `test`
    config: pd.DataFrame | None
    # NOTE(review): initialized, but not used within this class
    parsed: List[Tuple[Any, ...]] | None
    # state of the config line currently processed by `run`
    regex: bool | None
    varname: str | None
    lineno: int | None
    field: str | None
    test: str | None
    func: str | None
    func_kws: Dict[str, Any] | None

    def __init__(self, *args, **kwargs):
        """Create the reader, all arguments are passed on to `SaQC`."""
        self.logger = logging.getLogger(self.__class__.__name__)
        self.saqc = SaQC(*args, **kwargs)
        self.file = None
        self.config = None
        self.parsed = None
        self.regex = None
        self.varname = None
        self.lineno = None
        self.field = None
        self.test = None
        self.func = None
        self.func_kws = None

    def readCsv(self, file: str, skip=1):
        """Load the config from `file` using `readFile`."""
        self.logger.debug(f"opening csv file: {file}")
        self.config = readFile(file, skip=skip)
        self.file = file
        return self

    def readRecords(self, seq: Sequence[Dict[str, Any]]):
        """Load the config from a sequence of records.

        The three fields of a record are taken, in this order, as the
        variable name, the function name and a dict of keyword arguments.
        """
        self.logger.debug(f"read records: {seq}")
        df = pd.DataFrame.from_records(seq)
        # columns are renamed by position
        df.columns = ["varname", "func", "kwargs"]
        # build the call expression `func(key=value, ...)`
        kws = df["kwargs"].apply(
            lambda e: ", ".join([f"{k}={v}" for k, v in e.items()])
        )
        df["test"] = df["func"] + "(" + kws + ")"
        self.config = df.loc[:, ["varname", "test"]].copy()
        return self

    def _readJson(self, d, unpack):
        """Load the config from parsed json.

        If `unpack` is given, it is called on `d` and its result is used
        as the records. Without `unpack` a dict is rejected with a
        `TypeError`.
        """
        if unpack is not None:
            d = unpack(d)
        elif isinstance(d, dict):
            raise TypeError("parsed json resulted in a dict, but a array/list is need")
        return self.readRecords(d)

    def readJson(self, file: str, unpack: callable | None = None):
        """Load the config from the json file `file`."""
        self.logger.debug(f"opening json file: {file}")
        # Explicit encoding, so the result does not depend on the
        # platform default; `readFile` reads UTF-8 as well.
        with open(file, "r", encoding="utf-8") as fh:
            d = json.load(fh)
        self.file = file
        return self._readJson(d, unpack)

    def readJsonString(self, jn: str, unpack: callable | None = None):
        """Load the config from the json string `jn`."""
        self.logger.debug(f"read json string: {jn}")
        d = json.loads(jn)
        return self._readJson(d, unpack)

    def readString(self, s: str, line_sep="\n", column_sep=";"):
        """Load the config from the string `s` using `_readLines`."""
        self.logger.debug(f"read config string: {s}")
        lines = s.split(line_sep)
        self.config = _readLines(lines, column_sep=column_sep)
        return self

    def _parseLine(self):
        """Parse the current line and store the function and its keywords.

        A quoted `varname` marks a regular expression, the quotes are
        removed from the field. The `NameError` type is preserved for
        failures during parsing, everything else is raised as a
        `ParsingError`.
        """
        self.logger.debug(f"parse line {self.lineno}: {self.varname!r}; {self.test!r}")
        self.regex = isQuoted(self.varname)
        self.field = self.varname[1:-1] if self.regex else self.varname

        try:
            tree = ast.parse(self.test, mode="eval").body
            func, kws = ConfigFunctionParser().parse(tree)
        except Exception as e:
            # We raise a NEW exception here, because the
            # traceback holds no relevant info for a CLI user.
            err = type(e) if isinstance(e, NameError) else ParsingError
            meta = self._getFormattedInfo(
                "The exception occurred during parsing of a config"
            )
            if hasattr(e, "add_note"):  # python 3.11+
                e = err(*e.args)
                e.add_note(meta)
                raise e from None
            raise err(str(e) + meta) from None

        # An explicitly given `field` wins, the variable of the config
        # line is passed as `target` then.
        if "field" in kws:
            kws["target"] = self.field
        else:
            kws["field"] = self.field

        self.func = func
        self.func_kws = kws

    def _execLine(self):
        """Execute the parsed function of the current line on `self.saqc`."""
        self.logger.debug(
            f"execute line {self.lineno}: {self.varname!r}; {self.test!r}"
        )
        # We explicitly route all function calls through SaQC.__getattr__
        # in order to do a FUNC_MAP lookup. Otherwise, we wouldn't be able
        # to overwrite existing test functions with custom register calls.
        try:
            self.saqc = self.saqc.__getattr__(self.func)(
                regex=self.regex, **self.func_kws
            )
        except Exception as e:
            # We re-raise the exception with its original traceback
            # instead of chaining a new one, so the actual location of
            # the failure stays the last (most relevant) entry. Also, we
            # want to include some meta information about the config.
            meta = self._getFormattedInfo(
                "The exception occurred during execution of a config"
            )
            if hasattr(e, "add_note"):  # python 3.11+
                e.add_note(meta)
                raise e
            raise type(e)(str(e) + meta).with_traceback(e.__traceback__) from None

    def _getFormattedInfo(self, msg=None, indent=2):
        """Return an indented description of the current config line.

        The description holds the file, line number, variable name and
        test. If `msg` is given, it is put in front and the description
        is indented one level further. The result starts with a newline.
        """
        prefix = " " * indent
        info = textwrap.indent(
            f"file:    {self.file!r}\n"
            f"line:    {self.lineno}\n"
            f"varname: {self.varname!r}\n"
            f"test:    {self.test!r}\n",
            prefix,
        )
        if msg:
            info = textwrap.indent(f"{msg}\n{info}", prefix)
        return f"\n{info}"

    def run(self):
        """Parse and execute a config line by line."""
        assert self.config is not None
        for lineno, varname, test in self.config.itertuples():
            self.lineno = lineno
            self.varname = varname
            self.test = test
            self._parseLine()
            self._execLine()
        return self.saqc