diff --git a/cuallee/__init__.py b/cuallee/__init__.py
index ad01ef77..f29ac838 100644
--- a/cuallee/__init__.py
+++ b/cuallee/__init__.py
@@ -55,9 +55,11 @@
 except (ModuleNotFoundError, ImportError):
     logger.debug("KO: BigQuery")
 
+
 class CustomComputeException(Exception):
     pass
 
+
 class CheckLevel(enum.Enum):
     """Level of verifications in cuallee"""
 
@@ -1179,7 +1181,7 @@ def is_custom(
             fn (Callable): A function that receives a dataframe as input and returns a dataframe with at least 1 column as result
             pct (float): The threshold percentage required to pass
         """
-
+
         (Rule("is_custom", column, fn, CheckDataType.AGNOSTIC, pct) >> self._rule)
         return self
 
diff --git a/cuallee/dagster/__init__.py b/cuallee/dagster/__init__.py
index f1aa29db..07bbc585 100644
--- a/cuallee/dagster/__init__.py
+++ b/cuallee/dagster/__init__.py
@@ -1,7 +1,7 @@
-from dagster import asset_check, AssetCheckResult
-from cuallee import Check
+from dagster import asset_check, AssetCheckResult, AssetCheckSpec, AssetCheckSeverity
+from cuallee import Check, CheckLevel
 import pandas as pd
-from typing import List
+from typing import List, Iterator
 
 
 def make_dagster_checks(
@@ -27,3 +27,40 @@ def _check():
         checks.append(_check)
 
     return checks
+
+
+def get_severity(check: Check):
+    if check.level == CheckLevel.ERROR:
+        return AssetCheckSeverity.ERROR
+    else:
+        return AssetCheckSeverity.WARN
+
+
+def make_check_specs(check: Check, asset_name: str) -> List[AssetCheckSpec]:
+    """To be used in the @asset decorator as input for check_specs"""
+    return [
+        AssetCheckSpec(name=f"{rule.method}.{rule.column}", asset=asset_name)
+        for rule in check.rules
+    ]
+
+
+def yield_check_results(
+    check: Check, dataframe: pd.DataFrame
+) -> Iterator[AssetCheckResult]:
+    """Used in checks inside an asset, to yield all cuallee validations"""
+    results = check.validate(dataframe)
+
+    for item in results.itertuples():
+        yield AssetCheckResult(
+            check_name=f"{item.method}.{item.column}",
+            passed=(item.status == "PASS"),
+            metadata={
+                "level": item.level,
+                "rows": int(item.rows),
+                "column": item.column,
+                "value": str(item.value),
+                "violations": int(item.violations),
+                "pass_rate": item.pass_rate,
+            },
+            severity=get_severity(check),
+        )
diff --git a/cuallee/pyspark_validation.py b/cuallee/pyspark_validation.py
index ab022914..7a529218 100644
--- a/cuallee/pyspark_validation.py
+++ b/cuallee/pyspark_validation.py
@@ -594,19 +594,23 @@
     def is_custom(self, rule: Rule):
 
         def _execute(dataframe: DataFrame, key: str):
             try:
-                assert isinstance(rule.value, Callable), "Please provide a Callable/Function for validation"
+                assert isinstance(
+                    rule.value, Callable
+                ), "Please provide a Callable/Function for validation"
                 computed_frame = rule.value(dataframe)
-                assert isinstance(computed_frame, DataFrame), "Custom function does not return a PySpark DataFrame"
-                assert len(computed_frame.columns) >= 1, "Custom function should retun at least one column"
+                assert isinstance(
+                    computed_frame, DataFrame
+                ), "Custom function does not return a PySpark DataFrame"
+                assert (
+                    len(computed_frame.columns) >= 1
+                ), "Custom function should return at least one column"
                 computed_column = last(computed_frame.columns)
                 return computed_frame.select(
                     F.sum(F.col(f"`{computed_column}`").cast("integer")).alias(key)
                 )
-
-            except Exception as err:
-                raise CustomComputeException(str(err))
-
+            except Exception as err:
+                raise CustomComputeException(str(err))
 
         self.compute_instruction = ComputeInstruction(
             predicate, _execute, ComputeMethod.TRANSFORM
@@ -614,6 +618,7 @@ def _execute(dataframe: DataFrame, key: str):
 
         return self.compute_instruction
 
+
 def _field_type_filter(
     dataframe: DataFrame,
     field_type: Union[
@@ -796,12 +801,12 @@ def summary(check: Check, dataframe: DataFrame) -> DataFrame:
     spark = SparkSession.builder.getOrCreate()
 
     def _value(x):
-        """ Removes verbosity for Callable values"""
+        """Removes verbosity for Callable values"""
         if isinstance(x, Callable):
             return "f(x)"
         else:
             return str(x)
-
+
     # Compute the expression
     computed_expressions = compute(check._rule)
     if (int(spark.version.replace(".", "")[:3]) < 330) or (
diff --git a/pyproject.toml b/pyproject.toml
index 83816670..20c357d2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "cuallee"
-version = "0.11.1"
+version = "0.12.0"
 authors = [
   { name="Herminio Vazquez", email="canimus@gmail.com"},
   { name="Virginie Grosboillot", email="vestalisvirginis@gmail.com" }
diff --git a/setup.cfg b/setup.cfg
index ca26515c..438e04a1 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,5 +1,5 @@
 [metadata]
 name = cuallee
-version = 0.11.1
+version = 0.12.0
 [options]
 packages = find:
\ No newline at end of file
diff --git a/test/unit/dagster_checks/test_methods.py b/test/unit/dagster_checks/test_methods.py
index d250778e..93d9508e 100644
--- a/test/unit/dagster_checks/test_methods.py
+++ b/test/unit/dagster_checks/test_methods.py
@@ -1,7 +1,7 @@
-from cuallee.dagster import make_dagster_checks
+from cuallee.dagster import make_dagster_checks, make_check_specs, yield_check_results
 from cuallee import Check, CheckLevel
 import pandas as pd
-
+from typing import Iterator
 
 def test_make_checks():
     df = pd.DataFrame({"id": [1, 2, 3, 4, 5]})
@@ -9,3 +9,19 @@ def test_make_checks():
     check.is_complete("id")
     result = make_dagster_checks(check, "AssetName", df)
     assert isinstance(result, list)
+
+
+def test_make_check_specs():
+    df = pd.DataFrame({"id": [1, 2, 3, 4, 5]})
+    check = Check(CheckLevel.WARNING, "Dagster")
+    check.is_complete("id")
+    specs = make_check_specs(check, "test_asset")
+    assert isinstance(specs, list)
+
+
+def test_yield_check_specs():
+    df = pd.DataFrame({"id": [1, 2, 3, 4, 5]})
+    check = Check(CheckLevel.WARNING, "Dagster")
+    check.is_complete("id")
+    results = yield_check_results(check, df)
+    assert isinstance(results, Iterator)
diff --git a/test/unit/pyspark_dataframe/test_is_custom.py b/test/unit/pyspark_dataframe/test_is_custom.py
index db9a6b7d..15abd6bb 100644
--- a/test/unit/pyspark_dataframe/test_is_custom.py
+++ b/test/unit/pyspark_dataframe/test_is_custom.py
@@ -27,13 +27,12 @@ def test_negative(spark):
 def test_parameters(spark):
     df = spark.range(10)
     with pytest.raises(
-        CustomComputeException, match="Please provide a Callable/Function for validation"
+        CustomComputeException,
+        match="Please provide a Callable/Function for validation",
     ):
         check = Check(CheckLevel.WARNING, "pytest")
         check.is_custom("id", "wrong value")
         check.validate(df)
-
-
 
 
 def test_coverage(spark):
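
Usage sketch for the new Dagster helpers (not part of the patch): `make_check_specs` feeds the `check_specs` argument of `@asset`, and `yield_check_results` emits one `AssetCheckResult` per rule from inside the asset body. The asset name `orders` and its column are hypothetical.

```python
import pandas as pd
from dagster import Output, asset

from cuallee import Check, CheckLevel
from cuallee.dagster import make_check_specs, yield_check_results

# One rule; each rule maps to a spec/result named "<method>.<column>",
# here "is_complete.order_id".
check = Check(CheckLevel.WARNING, "OrdersCheck")
check.is_complete("order_id")


@asset(check_specs=make_check_specs(check, "orders"))
def orders():
    df = pd.DataFrame({"order_id": [1, 2, 3]})
    yield Output(df)  # materialize the asset itself
    yield from yield_check_results(check, df)  # then one result per rule
```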
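Similarly, a sketch of the `is_custom` contract that the new assertions in `pyspark_validation.py` enforce, assuming a local SparkSession: the callable receives the DataFrame and must return a DataFrame with at least one column, whose last column is cast to integer and summed, so it should hold a per-row boolean predicate.

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

from cuallee import Check, CheckLevel

spark = SparkSession.builder.getOrCreate()
df = spark.range(10)  # single column "id" with values 0..9

check = Check(CheckLevel.WARNING, "CustomCheck")
# The last column of the returned DataFrame is the pass predicate.
check.is_custom("id", lambda d: d.select((F.col("id") >= 0).alias("predicate")))
check.validate(df).show()  # a non-callable here raises CustomComputeException
```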