qc: save results in SQLite DB
abhidg committed Oct 10, 2023
1 parent 3f02fe2 commit 5e0ed09
Showing 3 changed files with 69 additions and 15 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
@@ -5,5 +5,6 @@
     "python.testing.unittestEnabled": false,
     "python.testing.pytestEnabled": true,
     "python.linting.flake8Enabled": true,
-    "python.linting.enabled": true
+    "python.linting.enabled": true,
+    "python.analysis.typeCheckingMode": "basic"
}
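For context on the settings change: with typeCheckingMode set to "basic", Pyright reports type errors like the one below. A hypothetical snippet, not from this repo:

def ratio_success(rows_success: int, rows_fail: int) -> float:
    return rows_success / (rows_success + rows_fail)

ratio_success("98", 2)  # flagged in basic mode: "str" is not assignable to "int"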
5 changes: 3 additions & 2 deletions adtl/qc/__init__.py
@@ -69,8 +69,9 @@ def wrapper(df, **kwargs):
     for c in set(columns) - set(df.columns):
         df[c] = None
     series = func(df, **kwargs)
-    assert len(series) == len(df), \
-        "Returned series must have same cardinality as source dataframe"
+    assert len(series) == len(
+        df
+    ), "Returned series must have same cardinality as source dataframe"
     rows_fail_idx = [i for i, val in enumerate(series) if val is False]
     if isinstance(series, (pd.Series, np.ndarray)):
         rows_success: int = series.sum()
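The wrapper above fills any missing columns with None, then checks that the rule returned one verdict per row. A minimal sketch of a rule in that style (names and data invented; the real decorator and its signature live outside this hunk):

import pandas as pd

def age_is_nonnegative(df: pd.DataFrame) -> pd.Series:
    """Age, if present, is non-negative."""
    return df["age"].isna() | (df["age"] >= 0)

df = pd.DataFrame({"age": [30, -1, None]})
series = age_is_nonnegative(df)
assert len(series) == len(df), "Returned series must have same cardinality as source dataframe"
print(int(series.sum()))  # 2 rows pass; row 1 (age == -1) fails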
76 changes: 64 additions & 12 deletions adtl/qc/runner.py
@@ -2,10 +2,14 @@
 Quality Control module for ADTL, runner submodule
 """
 import sys
+import copy
+import json
 import argparse
+import functools
 import importlib
+import sqlite3
 import multiprocessing
-from typing import List
+from typing import List, Optional, Dict, Any
 from pathlib import Path
 from collections import defaultdict
 from fnmatch import fnmatch
@@ -17,6 +21,25 @@
 
 DEFAULT_PATTERN = "*.csv"
 
+DDL_RESULTS = """CREATE TABLE IF NOT EXISTS results (
+    rule TEXT,
+    dataset TEXT,
+    file TEXT,
+    rows_success INTEGER,
+    rows_fail INTEGER,
+    ratio_success REAL,
+    rows_fail_idx TEXT,
+    success INTEGER,
+    mostly REAL,
+    fail_data TEXT
+)"""
+
+INSERT_RESULTS = """INSERT INTO results VALUES (
+    :rule, :dataset, :file, :rows_success,
+    :rows_fail, :ratio_success, :rows_fail_idx,
+    :success, :mostly, :fail_data
+)"""


 def collect_datasets(
     root: Path = Path("."), file_formats: List[str] = ["csv"]
@@ -27,7 +50,7 @@ def collect_datasets(
     folders = defaultdict(list)
     for f in files:
         folders[f.parent.stem].append(f)
-    return [dict(dataset=folder, files=folders[folder]) for folder in folders]
+    return [Dataset(dataset=folder, files=folders[folder]) for folder in folders]
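dict(...) becomes Dataset(...) here (and Rule(...) below); runtime behaviour is unchanged, but naming the shape lets the newly enabled basic type checking verify the keys. The real definitions are outside this diff; a plausible sketch:

from pathlib import Path
from typing import List, TypedDict

class Dataset(TypedDict):
    dataset: str
    files: List[Path]

# Calling Dataset(dataset="site-a", files=[]) still constructs a plain dict.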


def collect_rules(root: Path = Path("qc")) -> List[Rule]:
@@ -46,7 +69,7 @@ def collect_rules(root: Path = Path("qc")) -> List[Rule]:

 def make_rule(module, rule_name: str) -> Rule:
     r = getattr(module, rule_name)
-    return dict(
+    return Rule(
         module=module.__name__,
         name=r.__name__,
         description=r.__doc__,
@@ -74,7 +97,25 @@ def collect_work_units(datasets: List[Dataset], rules: List[Rule]) -> List[WorkUnit]
     return out
 
 
-def process_work_unit(unit: WorkUnit) -> WorkUnitResult:
+def prepare_result_for_insertion(work_unit_result: WorkUnitResult) -> Dict[str, Any]:
+    result: Dict[str, Any] = copy.deepcopy(work_unit_result)  # type: ignore
+    result["fail_data"] = (
+        None
+        if result["fail_data"].empty
+        else json.dumps(result["fail_data"].to_dict(orient="records"))
+    )
+    result["rows_fail_idx"] = (
+        None
+        if not result["rows_fail_idx"]
+        else ",".join(map(str, result["rows_fail_idx"]))
+    )
+    result["dataset"] = work_unit_result["dataset"]["dataset"]
+    result["file"] = str(result["file"])
+    result["success"] = bool(result["success"])
+    return result
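prepare_result_for_insertion flattens the two non-scalar fields so the row fits the TEXT columns of the results table: the fail_data DataFrame becomes a JSON string and rows_fail_idx a comma-separated string. Illustrative input and output (values invented):

import pandas as pd
from pathlib import Path

result = {
    "rule": "age_is_nonnegative",
    "dataset": {"dataset": "site-a", "files": []},
    "file": Path("site-a/patients.csv"),
    "rows_success": 98, "rows_fail": 2, "ratio_success": 0.98,
    "rows_fail_idx": [4, 17], "success": False, "mostly": 0.95,
    "fail_data": pd.DataFrame([{"age": -1}]),
}
prepare_result_for_insertion(result)
# {..., "dataset": "site-a", "file": "site-a/patients.csv",
#  "rows_fail_idx": "4,17", "fail_data": '[{"age": -1}]', "success": False}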


+def process_work_unit(unit: WorkUnit, save_db: Optional[str] = None) -> WorkUnitResult:
     rule = unit["rule"]
     module = importlib.import_module(rule["module"])
     rule_function = getattr(module, rule["name"])
@@ -84,25 +125,37 @@ def process_work_unit(unit: WorkUnit) -> WorkUnitResult:
     result.update(
         dict(rule=unit["rule"]["name"], dataset=unit["dataset"], file=unit["file"])
     )
+    if save_db:
+        con = sqlite3.connect(save_db)
+        cur = con.cursor()
+        cur.execute(DDL_RESULTS)
+        pprint(prepare_result_for_insertion(result))
+        cur.execute(INSERT_RESULTS, prepare_result_for_insertion(result))
+        con.commit()
     return result
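Each work unit opens its own connection because units run in separate pool processes (see start below), and sqlite3 connections cannot safely be shared across processes. Reading the stored results back is then straightforward; a sketch, with the table and default filename as defined in this diff:

import sqlite3

con = sqlite3.connect("adtl-qc.db")
for rule, file in con.execute("SELECT rule, file FROM results WHERE success = 0"):
    print(f"FAIL {rule}: {file}")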


 def start(
     data_path: Path,
     rules_path: Path = Path("qc"),
     data_file_formats: List[str] = ["csv"],
+    store_database: Optional[str] = None,
 ) -> List[WorkUnitResult]:
     rules = collect_rules(rules_path)
     datasets = collect_datasets(data_path, data_file_formats)
     work_units = collect_work_units(datasets, rules)
     pool = multiprocessing.Pool()
-    return pool.map(process_work_unit, work_units)
+    process_work_unit_db = functools.partial(process_work_unit, save_db=store_database)
+    return pool.map(process_work_unit_db, work_units)
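Pool.map passes exactly one argument to its callable, so the save_db keyword is bound in advance with functools.partial; the partial object pickles cleanly because process_work_unit is a module-level function. The same pattern in isolation (generic illustration, not project code):

import functools
import multiprocessing

def work(unit, save_db=None):
    return unit, save_db

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        bound = functools.partial(work, save_db="out.db")
        print(pool.map(bound, [1, 2, 3]))
        # [(1, 'out.db'), (2, 'out.db'), (3, 'out.db')]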


 def _main(args=None):
     parser = argparse.ArgumentParser(prog="adtl-qc", description="ADTL Quality Control")
     parser.add_argument("data", help="path to datasets")
     parser.add_argument("-r", "--rule-root", help="path to rules", default="qc")
+    parser.add_argument(
+        "-d", "--database", help="Database to store QC results", default="adtl-qc.db"
+    )
     parser.add_argument(
         "--format",
         help="file formats (comma separated) to include in datasets",
@@ -112,10 +165,9 @@ def _main(args=None):
"-n", "--no-report", help="do not generate HTML report", action="store_true"
)
args = parser.parse_args(args)
pprint(
start(
Path(args.data),
Path(args.rule_root),
data_file_formats=args.format.split(","),
)
)
start(
Path(args.data),
Path(args.rule_root),
data_file_formats=args.format.split(","),
store_database=args.database,
)
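With the new flag, a run that stores results might look like this (the data path is illustrative, and adtl-qc as an installed entry point is an assumption based on the prog= name above):

adtl-qc data/ --database adtl-qc.db --format csv

or, equivalently, from Python:

from adtl.qc.runner import _main

_main(["data/", "--database", "adtl-qc.db", "--format", "csv"])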
