feat(engine): added main engine mvp (#7)
Features:
- Automatically runs the whole bundle of available engines (see the usage sketch below)
- Graceful handling of errors in individual tests (core)
UrbanoFonseca authored Jul 25, 2021
1 parent 7e54a2d commit 41993ca
Showing 7 changed files with 340 additions and 20 deletions.
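The commit wires the individual engines into a single entry point. A minimal usage sketch, assuming a toy DataFrame (the column and label names below are illustrative and not part of the commit):

import pandas as pd
from ydata_quality import DataQuality

# Toy dataset; column names are purely illustrative.
df = pd.DataFrame({
    "age": [25, 32, 47, 25, None],
    "income": [40_000, 52_000, 61_000, 40_000, 38_000],
    "churn": [0, 1, 0, 0, 1],
})

dq = DataQuality(df=df, label="churn")  # passing a label also enables the LABELLING engine
results = dq.evaluate()                 # runs every registered engine; failing tests are skipped gracefully
dq.report()                             # prints all warnings collected across the engines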
6 changes: 6 additions & 0 deletions src/ydata_quality/__init__.py
@@ -1,5 +1,11 @@
"""
YData open-source lib for Data Quality.
"""
from ydata_quality.core.data_quality import DataQuality

from .version import __version__


__all__ = [
"DataQuality"
]
6 changes: 3 additions & 3 deletions src/ydata_quality/core/__init__.py
@@ -2,10 +2,10 @@
Core functionality for Data Quality analysis.
"""

from ydata_quality.core.warnings import QualityWarning
from ydata_quality.core.engine import QualityEngine
from ydata_quality.core.warnings import QualityWarning

__all__ = [
"QualityWarning",
"QualityEngine"
"QualityEngine",
"QualityWarning"
]
90 changes: 84 additions & 6 deletions src/ydata_quality/core/data_quality.py
@@ -1,20 +1,98 @@
"""
Implementation of main class for Data Quality checks.
"""
from typing import List, Union, Optional, Callable

import pandas as pd

from ydata_quality.core.warnings import QualityWarning, Priority
from ydata_quality.duplicates import DuplicateChecker
from ydata_quality.labelling import LabelInspector
from ydata_quality.missings import MissingsProfiler
from ydata_quality.valued_missing_values import VMVIdentifier
from ydata_quality.drift import DriftAnalyser

class DataQuality:
"DataQuality gathers the multiple data quality engines."
"DataQuality contains the multiple data quality engines."

def __init__(self,
df: pd.DataFrame,
label: str = None,
entities: List[Union[str, List[str]]] = [],
vmv_extensions: Optional[list]=[],
sample: Optional[pd.DataFrame] = None,
model: Callable = None
):
"""
Engines:
- Duplicates
- Missing Values
- Labelling
- Valued Missing Values
- Drift Analysis
def __init__(self, df: pd.DataFrame):
Args:
df (pd.DataFrame): reference DataFrame used to run the DataQuality analysis.
label (str, optional): [MISSINGS, LABELLING, DRIFT ANALYSIS] target feature to be predicted.
If not specified, LABELLING is skipped.
entities: [DUPLICATES] entities relevant for duplicate analysis.
vmv_extensions: [VALUED MISSING VALUES] A list of user provided valued missing values to append to defaults.
sample: [DRIFT ANALYSIS] data against which drift is tested.
model: [DRIFT ANALYSIS] model wrapped by ModelWrapper used to test concept drift.
"""
self.df = df
self._warnings = set()
self._engines = { # Default list of engines
'duplicates': DuplicateChecker(df=df, entities=entities),
'missings': MissingsProfiler(df=df, target=label),
'valued-missing-values': VMVIdentifier(df=df, vmv_extensions=vmv_extensions),
'drift-analysis': DriftAnalyser(ref=df, sample=sample, label=label, model=model)
}

# Engines based on mandatory arguments
if label is not None:
self._engines['labelling'] = LabelInspector(df=df, label=label)
else:
print('Label is not defined. Skipping LABELLING engine.')


@property
def warnings(self):
"Set of warnings generated by individual QualityEngines."
return self._warnings

def get_warnings(self,
category: Optional[str] = None,
test: Optional[str] = None,
priority: Optional[Priority] = None) -> List[QualityWarning]:
"Retrieves warnings filtered by their properties."
filtered = list(self.warnings) # convert original set
filtered = [w for w in filtered if w.category == category] if category else filtered
filtered = [w for w in filtered if w.test == test] if test else filtered
filtered = [w for w in filtered if w.priority == Priority(priority)] if priority else filtered
filtered.sort() # sort by priority
return filtered

@property
def engines(self):
"Dictionary of instantiated engines to run data quality analysis."
return self._engines

def __store_warnings(self):
"Appends all warnings from individiual engines into warnings of DataQuality main class."
for engine in self.engines.values():
self._warnings = self._warnings.union(set(engine.get_warnings()))

def evaluate(self):
"Runs all the individual data quality checks and aggregates the results."
raise NotImplementedError

results = {name: engine.evaluate() for name, engine in self.engines.items()}
self.__store_warnings()
return results

def report(self):
"Returns a full list of warnings retrieved during the Data Quality checks."
raise NotImplementedError
"Prints a report containing all the warnings detected during the data quality analysis."
# TODO: Provide a count of warnings by priority
self.__store_warnings() # fetch all warnings from the engines
for warn in self.get_warnings():
print(warn)
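
The filtering interface above mirrors QualityEngine.get_warnings. A hedged sketch of querying the aggregated warnings after a run (the category string below appears in the VMV engine further down; the integer priority is coerced through the Priority enum):

# Assumes dq = DataQuality(df=...) has already been built as in the earlier sketch.
dq.evaluate()                                                      # aggregates warnings from every engine
vmv_warnings = dq.get_warnings(category='Valued Missing Values')   # filter by engine category
p2_warnings = dq.get_warnings(priority=2)                          # filter by priority, returned sorted
for warning in p2_warnings:
    print(warning)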

17 changes: 12 additions & 5 deletions src/ydata_quality/core/engine.py
@@ -5,8 +5,7 @@
from typing import Optional

import pandas as pd
from ydata_quality.core import QualityWarning
from ydata_quality.core.warnings import Priority
from ydata_quality.core.warnings import QualityWarning, Priority
from ydata_quality.utils.modelling import infer_dtypes

class QualityEngine(ABC):
@@ -76,11 +75,12 @@ def get_warnings(self,
test: Optional[str] = None,
priority: Optional[Priority] = None):
"Retrieves warnings filtered by their properties."
filtered = self.warnings # original set
filtered = list(self.warnings) # convert original set
filtered = [w for w in filtered if w.category == category] if category else filtered
filtered = [w for w in filtered if w.test == test] if test else filtered
filtered = [w for w in filtered if w.priority == Priority(priority)] if priority else filtered
return set(filtered)
filtered.sort() # sort by priority
return filtered

@property
def tests(self):
@@ -97,4 +97,11 @@ def report(self):
def evaluate(self):
"Runs all the indidividual tests available within the same suite. Returns a dict of (name: results)."
self._warnings = set() # reset the warnings to avoid duplicates
return {test: getattr(self, test)() for test in self.tests}
results = {}
for test in self.tests:
try: # if anything fails
results[test] = getattr(self, test)()
except Exception as exc: # print a Warning and log the message
print(f'WARNING: Skipping test {test} due to failure during computation.')
results[test] = "[ERROR] Test failed to compute. Original exception: "+f"{exc}"
return results
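The graceful-error behaviour amounts to isolating each test call in its own try/except. A standalone sketch of the same pattern (the class and test names here are hypothetical, not part of the library):

class ToyEngine:
    """Minimal stand-in mimicking the error-isolating loop above; not the real QualityEngine."""

    tests = ['passing_test', 'failing_test']

    def passing_test(self):
        return {'status': 'ok'}

    def failing_test(self):
        raise ValueError('boom')

    def evaluate(self):
        results = {}
        for test in self.tests:
            try:
                results[test] = getattr(self, test)()
            except Exception as exc:  # keep running the remaining tests
                print(f'WARNING: Skipping test {test} due to failure during computation.')
                results[test] = f'[ERROR] Test failed to compute. Original exception: {exc}'
        return results

print(ToyEngine().evaluate())
# {'passing_test': {'status': 'ok'}, 'failing_test': '[ERROR] Test failed to compute. Original exception: boom'}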
1 change: 0 additions & 1 deletion src/ydata_quality/drift/engine.py
@@ -9,7 +9,6 @@
from scipy.stats import ks_2samp
from scipy.stats._continuous_distns import chi2_gen
from ydata_quality.core import QualityEngine, QualityWarning
from ydata_quality.utils.modelling import infer_dtypes


class ModelWrapper:
9 changes: 4 additions & 5 deletions src/ydata_quality/valued_missing_values/engine.py
@@ -24,7 +24,7 @@ def __init__(self, df: pd.DataFrame, vmv_extensions: Optional[list]=[]):
self._flatline_index = {}
self.__default_index_name = '__index'
self.vmvs = vmv_extensions

@property
def default_vmvs(self):
"""Returns the default list of Valued Missing Values.
@@ -61,7 +61,7 @@ def __get_flatline_index(self, column_name: str, th: Optional[int] = 1):
if column_name == self.__default_index_name:
df[self.__default_index_name] = df.index # Index now in columns to be processed next
column = df[column_name]
column.fillna('__filled') # So NaN values are considered
column.fillna('__filled') # So NaN values are considered
sequence_indexes = column.ne(column.shift()).cumsum() # Every time the shifted value differs from the previous one, a new sequence starts
sequence_groups = column.index.to_series().groupby(sequence_indexes) # Group series indexes by sequence indexes
data = {'length': sequence_groups.count().values,
@@ -102,7 +102,6 @@ def predefined_valued_missing_values(self, skip: list=[], short: bool = True):
Raises warning based on the existence of these values.
VMVs of string type are case insensitive during search.
Returns a DataFrame with count distribution for each predefined type over each column.
The result DataFrame will ommit any
Arguments:
skip: List of columns that will not be target of search for vmvs.
Pass '__index' in skip to skip looking for flatlines at the index.
@@ -122,7 +121,7 @@ def predefined_valued_missing_values(self, skip: list=[], short: bool = True):
vmvs.drop(no_vmv_rows, inplace=True)
if vmvs.empty:
print("[PREDEFINED VALUED MISSING VALUES] No predefined vmvs from the set {} were found in the dataset.".format(
self.predefined_valued_missing_values
self.vmvs
))
else:
total_vmvs = vmvs.sum().sum()
@@ -131,4 +130,4 @@
test='Predefined Valued Missing Values', category='Valued Missing Values', priority=2, data=vmvs,
description=f"Found {total_vmvs} vmvs in the dataset."
))
return vmvs
return vmvs
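The flatline detection in __get_flatline_index hinges on the ne(shift()).cumsum() trick: every change of value starts a new run id, and grouping by that id exposes each run's length and start. A self-contained sketch of the idea (the Series and the extra output columns are illustrative; only the 'length' key is taken from the diff above):

import pandas as pd

column = pd.Series(['a', 'a', 'a', 'b', 'b', 'c', 'a', 'a'])

# A new run id starts every time the value differs from the previous one.
sequence_indexes = column.ne(column.shift()).cumsum()

# Group the positional index by run id to recover each run's length and starting position.
sequence_groups = column.index.to_series().groupby(sequence_indexes)
flatlines = pd.DataFrame({
    'length': sequence_groups.count().values,
    'starts': sequence_groups.min().values,
    'value': column.groupby(sequence_indexes).first().values,
})
print(flatlines[flatlines['length'] > 1])  # runs longer than the default threshold (th=1)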
231 changes: 231 additions & 0 deletions tutorials/data_quality.ipynb

Large diffs are not rendered by default.
