Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(datarelations): Data Relations MVP #13

Merged
merged 5 commits into from
Sep 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/ydata_quality/bias_fairness/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def performance_discrimination(self):
"""
# TODO: support error rate parity metrics (e.g. false positive rate, positive rate)
if self.label is None:
print('Argument "label" must be defined to calculate performance discrimination metric. Skipping test.')
print('[BIAS&FAIRNESS] Argument "label" must be defined to calculate performance discrimination metric. Skipping test.')
pass

res = {}
Expand Down
23 changes: 19 additions & 4 deletions src/ydata_quality/core/data_quality.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from ydata_quality.erroneous_data import ErroneousDataIdentifier
from ydata_quality.data_expectations import DataExpectationsReporter
from ydata_quality.bias_fairness import BiasFairness
from ydata_quality.data_relations import DataRelationsDetector

class DataQuality:
"DataQuality contains the multiple data quality engines."
Expand All @@ -23,14 +24,20 @@ def __init__(self,
label: str = None,
random_state: Optional[int] = None,
entities: List[Union[str, List[str]]] = [],
is_close: bool= False,
ed_extensions: Optional[list]=[],
sample: Optional[pd.DataFrame] = None,
model: Callable = None,
results_json_path: str = None,
error_tol: int = 0,
rel_error_tol: Optional[float] = None,
minimum_coverage: Optional[float] = 0.75,
sensitive_features: List[str] = []
sensitive_features: List[str] = [],
dtypes: Optional[dict] = {},
corr_th: float = 0.8,
vif_th: float = 5,
p_th: float = 0.05,
plot: bool = True
):
"""
Engines:
Expand All @@ -41,6 +48,7 @@ def __init__(self,
- Drift Analysis
- Data Expectations
- Bias & Fairness
- Data Relations

Args:
df (pd.DataFrame): reference DataFrame used to run the DataQuality analysis.
Expand All @@ -49,6 +57,7 @@ def __init__(self,
random_state (int, optional): Integer seed for random reproducibility. Default is None.
Set to None for fully random behaviour, no reproducibility.
entities: [DUPLICATES] entities relevant for duplicate analysis.
is_close: [DUPLICATES] Pass True to use numpy.isclose instead of pandas.equals in column comparison.
ed_extensions: [ERRONEOUS DATA] A list of user provided erroneous data values to append to defaults.
sample: [DRIFT ANALYSIS] data against which drift is tested.
model: [DRIFT ANALYSIS] model wrapped by ModelWrapper used to test concept drift.
Expand All @@ -57,24 +66,30 @@ def __init__(self,
rel_error_tol (float): [EXPECTATIONS] Defines the maximum fraction of failed expectations, overrides error_tol.
minimum_coverage (float): [EXPECTATIONS] Minimum expected fraction of DataFrame columns covered by the expectation suite.
sensitive_features (List[str]): [BIAS & FAIRNESS] features deemed as sensitive attributes
dtypes (Optional[dict]): Maps names of the columns of the dataframe to supported dtypes. Columns not specified are automatically inferred.
corr_th (float): [DATA RELATIONS] Absolute threshold for high correlation detection. Defaults to 0.8.
vif_th (float): [DATA RELATIONS] Variance Inflation Factor threshold for numerical independence test, typically 5-10 is recommended. Defaults to 5.
p_th (float): [DATA RELATIONS] Fraction of the right tail of the chi squared CDF defining threshold for categorical independence test. Defaults to 0.05.
plot (bool): Pass True to produce all available graphical outputs, False to suppress all graphical output.
"""
#TODO: Refactor legacy engines (property based) and logic in this class to new base (lean objects)
self.df = df
self._warnings = list()
self._random_state = random_state
self._engines_legacy = { # Default list of engines
'duplicates': DuplicateChecker(df=df, entities=entities),
'duplicates': DuplicateChecker(df=df, entities=entities, is_close=is_close),
'missings': MissingsProfiler(df=df, target=label, random_state=self.random_state),
'erroneous-data': ErroneousDataIdentifier(df=df, ed_extensions=ed_extensions),
'drift': DriftAnalyser(ref=df, sample=sample, label=label, model=model, random_state=self.random_state)
}

self._engines_new = {}
self._engines_new = {'data-relations': DataRelationsDetector()}
self._eval_args = { # Argument lists for different engines
# TODO: centralize shared args in a dictionary to pass just like a regular kwargs to engines, pass specific args in arg list (define here)
# In new standard all engines can be run at the evaluate method only, the evaluate run expression can then be:
# results = {name: engine.evaluate(*self._eval_args.get(name,[]), **shared_args) for name, engine in self.engines.items()}
'expectations': [results_json_path, df, error_tol, rel_error_tol, minimum_coverage]
'expectations': [results_json_path, df, error_tol, rel_error_tol, minimum_coverage],
'data-relations': [df, dtypes, label, corr_th, vif_th, p_th, plot]
}

# Engines based on mandatory arguments
Expand Down
8 changes: 8 additions & 0 deletions src/ydata_quality/data_relations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
"""
Tools to check dataset for data relations.
"""
from ydata_quality.data_relations.engine import DataRelationsDetector

__all__ = [
"DataRelationsDetector"
]
172 changes: 172 additions & 0 deletions src/ydata_quality/data_relations/engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
"""
Implementation of DataRelationsDetector engine to run data relations analysis.
"""
from typing import List, Optional, Tuple, Union

import numpy as np
import pandas as pd

from ydata_quality.core import QualityEngine, QualityWarning
from ydata_quality.utils.auxiliary import standard_normalize
from ydata_quality.utils.correlations import correlation_matrix, partial_correlation_matrix, correlation_plotter, vif_collinearity, chi2_collinearity
from ydata_quality.utils.modelling import infer_dtypes

class DataRelationsDetector(QualityEngine):
    """Main class to run data relations analysis.

    The engine is stateless at construction time: the dataset and every option
    are passed to `evaluate`, which computes correlation and partial
    correlation matrices and runs the confounder, collider, feature importance
    and high collinearity tests on top of them.
    """

    def __init__(self):
        # The base class initialiser is deliberately not called (the dataset is
        # only supplied to evaluate). The attributes this class itself reads
        # are still initialised so they exist before the first evaluate call.
        self._warnings = []
        self._dtypes = {}

    @property
    def tests(self):
        """Names of the test methods implemented by this engine."""
        return ["_confounder_detection", "_collider_detection", "_feature_importance",
                "_high_collinearity_detection"]

    @property
    def dtypes(self):
        """Mapping of column name to broad dtype ('numerical' or 'categorical')."""
        return self._dtypes

    @dtypes.setter
    def dtypes(self, df_dtypes: Tuple[pd.DataFrame, dict]):
        """Validates a (df, dtypes) pair and stores the completed dtype mapping.

        Args:
            df_dtypes (Tuple[pd.DataFrame, dict]): The dataset and a (possibly partial) mapping of its
                column names to 'numerical'/'categorical'. Columns missing from the mapping are
                completed through infer_dtypes.

        Raises:
            ValueError: If the provided dtypes object is not a dictionary.
            AssertionError: If a key is not a column of df or a value is not a supported dtype.
        """
        df, dtypes = df_dtypes
        if not isinstance(dtypes, dict):
            raise ValueError("Property 'dtypes' should be a dictionary.")
        # NOTE: assert-based validation is kept for backward compatibility of the raised exception type.
        assert all(col in df.columns for col in dtypes), \
            "All dtypes keys must be columns in the dataset."
        supported_dtypes = ['numerical', 'categorical']
        assert all(dtype in supported_dtypes for dtype in dtypes.values()), \
            "Assigned dtypes must be in the supported broad dtype list: {}.".format(supported_dtypes)
        dtypes = dict(dtypes)  # Work on a copy so the caller's dictionary is never mutated
        df_col_set = set(df.columns)
        missing_cols = df_col_set.difference(dtypes)
        if missing_cols:
            # Only the columns without a user provided dtype are inferred
            dtypes.update(infer_dtypes(df, skip=df_col_set.difference(missing_cols)))
        self._dtypes = dtypes

    def evaluate(self, df: pd.DataFrame, dtypes: Optional[dict] = None, label: str = None, corr_th: float = 0.8,
                 vif_th: float = 5, p_th: float = 0.05, plot: bool = True) -> dict:
        """Runs the data relations tests on the provided dataset and returns their results.
        Note, we perform standard normalization of numerical features in order to unbias VIF and partial correlation methods.
        This bias correction produces results equivalent to adding a constant feature to the dataset.

        Args:
            df (pd.DataFrame): The Pandas DataFrame on which you want to perform data relations analysis.
            dtypes (Optional[dict]): A dictionary mapping df column names to numerical/categorical dtypes.
                If a full map is not provided it will be determined/completed via inference method.
            label (Optional[str]): A string identifying the label feature column
            corr_th (float): Absolute threshold for high correlation detection. Defaults to 0.8.
            vif_th (float): Variance Inflation Factor threshold for numerical independence test, typically 5-10 is recommended. Defaults to 5.
            p_th (float): Fraction of the right tail of the chi squared CDF defining threshold for categorical independence test. Defaults to 0.05.
            plot (bool): Pass True to produce all available graphical outputs, False to suppress all graphical output.

        Returns:
            dict: Results keyed by 'Correlations' and 'High Collinearity', plus 'Confounders' and 'Colliders'
                when the partial correlation matrix is computable and 'Feature Importance' when a label is given.
        """
        assert label in df.columns or not label, "The provided label name does not exist as a column in the dataset"
        self._warnings = []  # reset the warnings to avoid duplicates
        self.dtypes = (df, dtypes or {})  # Consider refactoring QualityEngine dtypes (df as argument of setter)
        df = standard_normalize(df, self.dtypes)  # Use the completed mapping stored by the setter
        results = {}
        corr_mat, _ = correlation_matrix(df, self.dtypes, True)
        p_corr_mat = partial_correlation_matrix(corr_mat)
        results['Correlations'] = {'Correlation matrix': corr_mat, 'Partial correlation matrix': p_corr_mat}
        if plot:
            correlation_plotter(corr_mat, title='Correlations', symmetric=True)
        if p_corr_mat is not None:
            if plot:
                correlation_plotter(p_corr_mat, title='Partial Correlations', symmetric=True)
            results['Confounders'] = self._confounder_detection(corr_mat, p_corr_mat, corr_th)
            results['Colliders'] = self._collider_detection(corr_mat, p_corr_mat, corr_th)
        else:
            print('[DATA RELATIONS] The partial correlation matrix is not computable for this dataset. Skipping potential confounder and collider detection tests.')
        if label:
            results['Feature Importance'] = self._feature_importance(corr_mat, p_corr_mat, label, corr_th)
        results['High Collinearity'] = self._high_collinearity_detection(df, self.dtypes, label, vif_th, p_th=p_th)
        return results

    @staticmethod
    def _upper_triangle_pairs(corr_mat: pd.DataFrame, selected: np.ndarray) -> List[Tuple[str, str]]:
        """Returns the (row, column) label pairs strictly above the diagonal where `selected` is True."""
        upper = np.triu(np.ones(corr_mat.shape, dtype='bool'), k=1)  # Drop the diagonal and the pairs below it
        return [(corr_mat.index[i], corr_mat.columns[j]) for i, j in np.argwhere(upper & selected)]

    def _confounder_detection(self, corr_mat: pd.DataFrame, par_corr_mat: pd.DataFrame, corr_th: float) -> List[Tuple[str, str]]:
        """Detects pairwise variable relationships potentially affected by confounder effects of other covariates.

        Taking the zero order correlations (i.e. without controlling for the influence of any other feature), all
        candidate pairs are compared against the full order partial correlations.
        Zero order coefficient above threshold and partial coefficient below threshold indicate existence of confounding effects.

        Pairs with an undefined (NaN) coefficient satisfy neither comparison and are therefore never reported.

        Returns:
            List[Tuple[str, str]]: The detected pairs; a warning is stored when the list is not empty."""
        strong_zero_order = np.asarray(corr_mat.abs() > corr_th)  # Correlated before controlling for covariates
        weak_partial = np.asarray(par_corr_mat.abs() <= corr_th)  # Correlation gone after controlling for covariates
        confounded_pairs = self._upper_triangle_pairs(corr_mat, strong_zero_order & weak_partial)
        if len(confounded_pairs) > 0:
            self.store_warning(QualityWarning(
                test='Confounded correlations', category='Data Relations', priority=2, data=confounded_pairs,
                description=("Found {} independently correlated variable pairs that disappeared after controling "
                             "for the remaining variables. This is an indicator of potential confounder effects "
                             "in the dataset.").format(len(confounded_pairs))))
        return confounded_pairs

    def _collider_detection(self, corr_mat: pd.DataFrame, par_corr_mat: pd.DataFrame, corr_th: float) -> List[Tuple[str, str]]:
        """Detects pairwise variable relationships potentially creating colliding effects with other covariates.

        Taking the zero order correlations (i.e. without controlling for the influence of any other feature), all
        candidate pairs are compared against the full order partial correlations.
        Zero order coefficient below threshold and partial coefficient above threshold indicate existence of collider effects.

        Pairs with an undefined (NaN) coefficient satisfy neither comparison and are therefore never reported.

        Returns:
            List[Tuple[str, str]]: The detected pairs; a warning is stored when the list is not empty."""
        weak_zero_order = np.asarray(corr_mat.abs() <= corr_th)  # Uncorrelated before controlling for covariates
        strong_partial = np.asarray(par_corr_mat.abs() > corr_th)  # Correlation shows up after controlling
        colliding_pairs = self._upper_triangle_pairs(corr_mat, weak_zero_order & strong_partial)
        if len(colliding_pairs) > 0:
            self.store_warning(QualityWarning(
                test='Collider correlations', category='Data Relations', priority=2, data=colliding_pairs,
                description=("Found {} independently uncorrelated variable pairs that showed correlation after "
                             "controling for the remaining variables. This is an indicator of potential colliding "
                             "bias with other covariates.").format(len(colliding_pairs))))
        return colliding_pairs

    def _feature_importance(self, corr_mat: pd.DataFrame, par_corr_mat: pd.DataFrame, label: str, corr_th: float) -> Union[pd.DataFrame, str]:
        """Identifies features with high importance.
        Returns all features with absolute correlation to the label higher than corr_th.

        This method returns a summary of all detected important features.
        The summary contains zero, full order partial correlation and a note regarding potential confounding.
        When par_corr_mat is None the summary only contains the zero order correlations.
        When no feature passes the threshold a plain text message is returned instead of a DataFrame."""
        assert label in corr_mat.columns, "The provided label {} does not exist as a column in the DataFrame.".format(label)
        label_corrs = corr_mat.loc[label].drop(label)
        # Positive selection: features with an undefined (NaN) correlation are not considered important
        important_feats = label_corrs.index[np.asarray(label_corrs.abs() > corr_th)].tolist()
        if not important_feats:
            return "[FEATURE IMPORTANCE] No important features were found in explaining {}. You might want to try lowering corr_th.".format(label)
        if par_corr_mat is not None:
            label_pcorrs = par_corr_mat.loc[label].drop(label)
            summary = pd.DataFrame(data={'Correlations': label_corrs.loc[important_feats],
                                         'Partial Correlations': label_pcorrs.loc[important_feats]})
            summary['Note'] = 'OK'
            # Same boundary as the confounder test: a partial correlation not above the threshold is suspicious
            summary.loc[summary['Partial Correlations'].abs() <= corr_th, 'Note'] = 'Potential confounding detected'
        else:
            summary = pd.DataFrame(data={'Correlations': label_corrs.loc[important_feats]})
        return summary.sort_values(by='Correlations', ascending=False, key=abs)  # Strongest correlations first

    def _high_collinearity_detection(self, df: pd.DataFrame, dtypes: dict, label: str = None, vif_th: float = 10., p_th: float = 0.05) -> dict:
        """Detects independent variables with high collinearity. Categorical vars and continuous vars are studied as independent sets of variables.
        Variance Inflation Factors are used to study continuous vars collinearity.
        Chi-squared tests are used to test categorical vars collinearity.
        Results are ranked from highest collinearity to lowest and segregated on type of variable.

        Returns:
            dict: 'Numerical' holds the VIF scores above vif_th, 'Categorical' the categorical variable names
                sorted by descending mean 'Adjusted Chi2' over the tests they take part in.
        """
        # NOTE(review): p_th is forwarded to vif_collinearity as in the original call - confirm against its signature.
        vif_scores = vif_collinearity(df, dtypes, p_th, label)
        inflated = vif_scores.loc[vif_scores > vif_th]
        chi2_tests = chi2_collinearity(df, dtypes, p_th, label)
        # NOTE(review): every row of chi2_tests is used here; presumably the helper already keeps only the
        # significant pairs, as the warning text below implies - confirm.
        pair_cols = chi2_tests[['Var1', 'Var2']]
        # dict.fromkeys de-duplicates while keeping first appearance order, making ties deterministic
        involved = dict.fromkeys(chi2_tests['Var1'].tolist() + chi2_tests['Var2'].tolist())
        mean_chi2 = {cat: chi2_tests.loc[(pair_cols == cat).any(axis=1), 'Adjusted Chi2'].mean() for cat in involved}
        cat_coll_scores = sorted(mean_chi2, key=mean_chi2.get, reverse=True)
        if len(inflated) > 0:
            self.store_warning(QualityWarning(
                test='High Collinearity - Numerical', category='Data Relations', priority=2, data=inflated,
                description=("Found {} numerical variables with high Variance Inflation Factor (VIF>{:.1f}). "
                             "The variables listed in results are highly collinear with other variables in the "
                             "dataset. These will make model explainability harder and potentially give way to "
                             "issues like overfitting. Depending on your end goal you might want to remove the "
                             "highest VIF variables.").format(len(inflated), vif_th)))
        if len(cat_coll_scores) > 0:
            # TODO: Merge warning messages (make one warning for the whole test, summarizing findings from the numerical and categorical vars)
            self.store_warning(QualityWarning(
                test='High Collinearity - Categorical', category='Data Relations', priority=2, data=chi2_tests,
                description=("Found {} categorical variables with significant collinearity (p-value < {}). "
                             "The variables listed in results are highly collinear with other variables in the "
                             "dataset and sorted descending according to propensity. These will make model "
                             "explainability harder and potentially give way to issues like overfitting. "
                             "Depending on your end goal you might want to remove variables following the "
                             "provided order.").format(len(cat_coll_scores), p_th)))
        return {'Numerical': inflated, 'Categorical': cat_coll_scores}
18 changes: 7 additions & 11 deletions src/ydata_quality/duplicates/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@
import pandas as pd

from ydata_quality.core import QualityEngine, QualityWarning
from ydata_quality.utils.auxiliary import find_duplicate_columns


class DuplicateChecker(QualityEngine):
"Engine for running analyis on duplicate records."

def __init__(self, df: pd.DataFrame, entities: List[Union[str, List[str]]] = []):
def __init__(self, df: pd.DataFrame, entities: List[Union[str, List[str]]] = [], is_close: bool=False):
super().__init__(df=df)
self._entities = entities
self._tests = ["exact_duplicates", "entity_duplicates", "duplicate_columns"]
self._is_close = is_close

@property
def entities(self):
Expand Down Expand Up @@ -97,21 +99,15 @@ def entity_duplicates(self, entity: Optional[Union[str, List[str]]] = None):
ent_dups.update(self.entity_duplicates(col))
return ent_dups


def duplicate_columns(self):
"Returns a mapping dictionary of columns with fully duplicated feature values."
dups = {}
for idx, col in enumerate(self.df.columns): # Iterate through all the columns of dataframe
ref = self.df[col] # Take the column values as reference.
for tgt_col in self.df.columns[idx+1:]: # Iterate through all other columns
if ref.equals(self.df[tgt_col]): # Take target values
dups[col] = tgt_col # Store if they match

if len(dups) > 0:
dups = find_duplicate_columns(self.df, self._is_close)
cols_with_dups = len(dups.keys())
if cols_with_dups > 0:
self.store_warning(
QualityWarning(
test='Duplicate Columns', category='Duplicates', priority=1, data=dups,
description=f"Found {len(dups)} columns with exactly the same feature values as other columns."
description=f"Found {cols_with_dups} columns with exactly the same feature values as other columns."
)
)
else:
Expand Down
Loading