Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(engine): added main engine mvp #7

Merged
merged 7 commits into from
Jul 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/ydata_quality/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
"""
YData open-source lib for Data Quality.
"""
from ydata_quality.core.data_quality import DataQuality

from .version import __version__


__all__ = [
"DataQuality"
]
6 changes: 3 additions & 3 deletions src/ydata_quality/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
Core functionality for Data Quality analysis.
"""

from ydata_quality.core.warnings import QualityWarning
from ydata_quality.core.engine import QualityEngine
from ydata_quality.core.warnings import QualityWarning

__all__ = [
"QualityWarning",
"QualityEngine"
"QualityEngine",
"QualityWarning"
]
90 changes: 84 additions & 6 deletions src/ydata_quality/core/data_quality.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,98 @@
"""
Implementation of main class for Data Quality checks.
"""
from typing import List, Union, Optional, Callable

import pandas as pd

from ydata_quality.core.warnings import QualityWarning, Priority
from ydata_quality.duplicates import DuplicateChecker
from ydata_quality.labelling import LabelInspector
from ydata_quality.missings import MissingsProfiler
from ydata_quality.valued_missing_values import VMVIdentifier
from ydata_quality.drift import DriftAnalyser

class DataQuality:
"DataQuality gathers the multiple data quality engines."
"DataQuality contains the multiple data quality engines."

def __init__(self,
df: pd.DataFrame,
label: str = None,
entities: List[Union[str, List[str]]] = [],
vmv_extensions: Optional[list]=[],
sample: Optional[pd.DataFrame] = None,
model: Callable = None
):
"""
Engines:
UrbanoFonseca marked this conversation as resolved.
Show resolved Hide resolved
- Duplicates
- Missing Values
- Labelling
- Valued Missing Values
- Drift Analysis

def __init__(self, df: pd.DataFrame):
Args:
df (pd.DataFrame): reference DataFrame used to run the DataQuality analysis.
label (str, optional): [MISSINGS, LABELLING, DRIFT ANALYSIS] target feature to be predicted.
If not specified, LABELLING is skipped.
entities: [DUPLICATES] entities relevant for duplicate analysis.
vmv_extensions: [VALUED MISSING VALUES] A list of user provided valued missing values to append to defaults.
sample: [DRIFT ANALYSIS] data against which drift is tested.
model: [DRIFT ANALYSIS] model wrapped by ModelWrapper used to test concept drift.
"""
self.df = df
self._warnings = set()
self._engines = { # Default list of engines
'duplicates': DuplicateChecker(df=df, entities=entities),
'missings': MissingsProfiler(df=df, target=label),
'valued-missing-values': VMVIdentifier(df=df, vmv_extensions=vmv_extensions),
'drift-analysis': DriftAnalyser(ref=df, sample=sample, label=label, model=model)
}

# Engines based on mandatory arguments
if label is not None:
self._engines['labelling'] = LabelInspector(df=df, label=label)
else:
print('Label is not defined. Skipping LABELLING engine.')


@property
def warnings(self):
"Set of warnings generated by individual QualityEngines."
return self._warnings

def get_warnings(self,
category: Optional[str] = None,
test: Optional[str] = None,
priority: Optional[Priority] = None) -> List[QualityWarning]:
"Retrieves warnings filtered by their properties."
filtered = list(self.warnings) # convert original set
filtered = [w for w in filtered if w.category == category] if category else filtered
filtered = [w for w in filtered if w.test == test] if test else filtered
filtered = [w for w in filtered if w.priority == Priority(priority)] if priority else filtered
filtered.sort() # sort by priority
return filtered

@property
def engines(self):
"Dictionary of instantiated engines to run data quality analysis."
return self._engines

def __store_warnings(self):
"Appends all warnings from individual engines into the warnings of the DataQuality main class."
for engine in self.engines.values():
self._warnings = self._warnings.union(set(engine.get_warnings()))

def evaluate(self):
"Runs all the individual data quality checks and aggregates the results."
raise NotImplementedError

results = {name: engine.evaluate() for name, engine in self.engines.items()}
self.__store_warnings()
return results

def report(self):
"Returns a full list of warnings retrieved during the Data Quality checks."
raise NotImplementedError
"Prints a report containing all the warnings detected during the data quality analysis."
# TODO: Provide a count of warnings by priority
self.__store_warnings() # fetch all warnings from the engines
for warn in self.get_warnings():
print(warn)

17 changes: 12 additions & 5 deletions src/ydata_quality/core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from typing import Optional

import pandas as pd
from ydata_quality.core import QualityWarning
from ydata_quality.core.warnings import Priority
from ydata_quality.core.warnings import QualityWarning, Priority
from ydata_quality.utils.modelling import infer_dtypes

class QualityEngine(ABC):
Expand Down Expand Up @@ -76,11 +75,12 @@ def get_warnings(self,
test: Optional[str] = None,
priority: Optional[Priority] = None):
"Retrieves warnings filtered by their properties."
filtered = self.warnings # original set
filtered = list(self.warnings) # convert original set
filtered = [w for w in filtered if w.category == category] if category else filtered
filtered = [w for w in filtered if w.test == test] if test else filtered
filtered = [w for w in filtered if w.priority == Priority(priority)] if priority else filtered
return set(filtered)
filtered.sort() # sort by priority
return filtered

@property
def tests(self):
Expand All @@ -97,4 +97,11 @@ def report(self):
def evaluate(self):
"Runs all the individual tests available within the same suite. Returns a dict of (name: results)."
self._warnings = set() # reset the warnings to avoid duplicates
return {test: getattr(self, test)() for test in self.tests}
results = {}
for test in self.tests:
try: # if anything fails
results[test] = getattr(self, test)()
except Exception as exc: # print a Warning and log the message
print(f'WARNING: Skipping test {test} due to failure during computation.')
results[test] = "[ERROR] Test failed to compute. Original exception: "+f"{exc}"
return results
1 change: 0 additions & 1 deletion src/ydata_quality/drift/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from scipy.stats import ks_2samp
from scipy.stats._continuous_distns import chi2_gen
from ydata_quality.core import QualityEngine, QualityWarning
from ydata_quality.utils.modelling import infer_dtypes


class ModelWrapper:
Expand Down
9 changes: 4 additions & 5 deletions src/ydata_quality/valued_missing_values/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(self, df: pd.DataFrame, vmv_extensions: Optional[list]=[]):
self._flatline_index = {}
self.__default_index_name = '__index'
self.vmvs = vmv_extensions

@property
def default_vmvs(self):
"""Returns the default list of Valued Missing Values.
Expand Down Expand Up @@ -61,7 +61,7 @@ def __get_flatline_index(self, column_name: str, th: Optional[int] = 1):
if column_name == self.__default_index_name:
df[self.__default_index_name] = df.index # Index now in columns to be processed next
column = df[column_name]
column.fillna('__filled') # So NaN values are considered
column.fillna('__filled') # So NaN values are considered
sequence_indexes = column.ne(column.shift()).cumsum() # A new sequence starts every time a value differs from the previous one
sequence_groups = column.index.to_series().groupby(sequence_indexes) # Group series indexes by sequence indexes
data = {'length': sequence_groups.count().values,
Expand Down Expand Up @@ -102,7 +102,6 @@ def predefined_valued_missing_values(self, skip: list=[], short: bool = True):
Raises warning based on the existence of these values.
VMVs of string type are case insensitive during search.
Returns a DataFrame with count distribution for each predefined type over each column.
The result DataFrame will ommit any
Arguments:
skip: List of columns that will not be target of search for vmvs.
Pass '__index' in skip to skip looking for flatlines at the index.
Expand All @@ -122,7 +121,7 @@ def predefined_valued_missing_values(self, skip: list=[], short: bool = True):
vmvs.drop(no_vmv_rows, inplace=True)
if vmvs.empty:
print("[PREDEFINED VALUED MISSING VALUES] No predefined vmvs from the set {} were found in the dataset.".format(
self.predefined_valued_missing_values
self.vmvs
))
else:
total_vmvs = vmvs.sum().sum()
Expand All @@ -131,4 +130,4 @@ def predefined_valued_missing_values(self, skip: list=[], short: bool = True):
test='Predefined Valued Missing Values', category='Valued Missing Values', priority=2, data=vmvs,
description=f"Found {total_vmvs} vmvs in the dataset."
))
return vmvs
return vmvs
231 changes: 231 additions & 0 deletions tutorials/data_quality.ipynb

Large diffs are not rendered by default.