Skip to content

Commit

Permalink
Merge pull request #392 from NannyML/upd_dq
Browse files Browse the repository at this point in the history
Data Quality minor Refactor
  • Loading branch information
nnansters authored May 29, 2024
2 parents 022b8fa + 236421e commit b8b237f
Show file tree
Hide file tree
Showing 16 changed files with 83 additions and 84 deletions.
17 changes: 0 additions & 17 deletions nannyml/data_quality/base.py

This file was deleted.

59 changes: 33 additions & 26 deletions nannyml/data_quality/missing/calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

from nannyml.base import AbstractCalculator, _list_missing
from nannyml.chunk import Chunker
from nannyml.data_quality.base import _add_alert_flag
from nannyml.exceptions import InvalidArgumentsException
from nannyml.sampling_error import SAMPLING_ERROR_RANGE
from nannyml.thresholds import StandardDeviationThreshold, Threshold, calculate_threshold_values
Expand Down Expand Up @@ -132,23 +131,6 @@ def _fit(self, reference_data: pd.DataFrame, *args, **kwargs):
count_nan, count_tot = self._calculate_missing_value_stats(reference_data[col])
self._sampling_error_components[col] = count_nan if self.normalize else count_nan / count_tot

for column in self.column_names:
reference_chunk_results = np.asarray(
[
self._calculate_missing_value_stats(chunk.data[column])[0]
for chunk in self.chunker.split(reference_data)
]
)
self._lower_alert_thresholds[column], self._upper_alert_thresholds[column] = calculate_threshold_values(
threshold=self.threshold,
data=reference_chunk_results,
lower_threshold_value_limit=self.lower_threshold_value_limit,
upper_threshold_value_limit=self.upper_threshold_value_limit,
logger=self._logger,
metric_name=self.data_quality_metric,
override_using_none=True,
)

self.result = self._calculate(data=reference_data)
self.result.data[('chunk', 'period')] = 'reference'

Expand Down Expand Up @@ -190,6 +172,8 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
res = res.reset_index(drop=True)

if self.result is None:
self._set_metric_thresholds(res)
res = self._populate_alert_thresholds(res)
self.result = Result(
results_data=res,
column_names=self.column_names,
Expand All @@ -202,8 +186,9 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
# Dropping the intermediate '_filter' and directly returning the correct 'Result' class works OK
# but this causes us to lose the "common behavior" in the top level 'filter' method when overriding.
# Applicable here but to many of the base classes as well (e.g. fitting and calculating)
res = self._populate_alert_thresholds(res)
self.result = self.result.filter(period='reference')
self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True)
self.result.data = pd.concat([self.result.data, res], ignore_index=True)

return self.result

Expand All @@ -227,12 +212,37 @@ def _calculate_for_column(self, data: pd.DataFrame, column_name: str) -> Dict[st
result['value'] - SAMPLING_ERROR_RANGE * result['sampling_error'],
-np.inf if self.lower_threshold_value_limit is None else self.lower_threshold_value_limit,
)

result['upper_threshold'] = self._upper_alert_thresholds[column_name]
result['lower_threshold'] = self._lower_alert_thresholds[column_name]
result['alert'] = _add_alert_flag(result)
return result

def _set_metric_thresholds(self, result_data: pd.DataFrame):
    """Compute and store the lower/upper alert thresholds of every monitored column.

    Parameters
    ----------
    result_data : pd.DataFrame
        Chunked results carrying a ``(column_name, 'value')`` column for each
        entry of ``self.column_names``. Those per-chunk values are the data the
        thresholds are derived from.

    Notes
    -----
    Nothing is returned: the thresholds are written into
    ``self._lower_alert_thresholds`` and ``self._upper_alert_thresholds``,
    keyed by column name.
    """
    for name in self.column_names:
        # Per-chunk metric values of this column.
        metric_values = result_data.loc[:, (name, 'value')]

        # The helper's result is unpacked as a (lower, upper) pair.
        lower, upper = calculate_threshold_values(
            threshold=self.threshold,
            data=metric_values,
            lower_threshold_value_limit=self.lower_threshold_value_limit,
            upper_threshold_value_limit=self.upper_threshold_value_limit,
            logger=self._logger,
        )
        self._lower_alert_thresholds[name] = lower
        self._upper_alert_thresholds[name] = upper

def _populate_alert_thresholds(self, result_data: pd.DataFrame) -> pd.DataFrame:
    """Add threshold and alert columns for every monitored column.

    For each name in ``self.column_names`` three columns are written into
    ``result_data``: ``(name, 'upper_threshold')``, ``(name, 'lower_threshold')``
    and ``(name, 'alert')``. The alert is ``True`` when the chunk value lies
    strictly above the upper threshold or strictly below the lower threshold.
    A threshold of ``None`` means "no bound on that side".

    Parameters
    ----------
    result_data : pd.DataFrame
        Chunked results holding a ``(name, 'value')`` column per monitored
        column. The frame is modified in place.

    Returns
    -------
    pd.DataFrame
        The same ``result_data`` object, with the extra columns added.
    """
    for column_name in self.column_names:
        upper = self._upper_alert_thresholds[column_name]
        lower = self._lower_alert_thresholds[column_name]

        result_data[(column_name, 'upper_threshold')] = upper
        result_data[(column_name, 'lower_threshold')] = lower

        # Resolve the "no threshold" case once per column instead of once per row.
        upper_bound = np.inf if upper is None else upper
        lower_bound = -np.inf if lower is None else lower

        # Vectorized comparison: NaN values compare False on both sides, so they
        # never raise an alert, and an empty frame simply yields an empty column
        # (a row-wise ``apply`` returns a DataFrame in that case).
        values = result_data[(column_name, 'value')]
        result_data[(column_name, 'alert')] = (values > upper_bound) | (values < lower_bound)
    return result_data


def _create_multilevel_index(
column_names,
Expand All @@ -247,9 +257,6 @@ def _create_multilevel_index(
'sampling_error',
'upper_confidence_boundary',
'lower_confidence_boundary',
'upper_threshold',
'lower_threshold',
'alert',
]
]
tuples = chunk_tuples + column_tuples
Expand Down
63 changes: 36 additions & 27 deletions nannyml/data_quality/unseen/calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from nannyml.base import AbstractCalculator, _list_missing, _split_features_by_type
from nannyml.chunk import Chunker
from nannyml.data_quality.base import _add_alert_flag
# from nannyml.data_quality.base import _add_alert_flag
from nannyml.exceptions import InvalidArgumentsException
from nannyml.thresholds import ConstantThreshold, Threshold, calculate_threshold_values
from nannyml.usage_logging import UsageEvent, log_usage
Expand Down Expand Up @@ -145,25 +145,6 @@ def _fit(self, reference_data: pd.DataFrame, *args, **kwargs):
for col in self.column_names:
self._categorical_seen_values[col] = set(reference_data[col].unique())

# Calculate Alert Thresholds
for column in self.column_names:
_seen_values = self._categorical_seen_values[column]
reference_chunk_results = np.asarray(
[
self._calculate_unseen_value_stats(chunk.data[column], _seen_values)
for chunk in self.chunker.split(reference_data)
]
)
self._lower_alert_thresholds[column], self._upper_alert_thresholds[column] = calculate_threshold_values(
threshold=self.threshold,
data=reference_chunk_results,
lower_threshold_value_limit=self.lower_threshold_value_limit,
upper_threshold_value_limit=self.upper_threshold_value_limit,
logger=self._logger,
metric_name=self.data_quality_metric,
override_using_none=True,
)

# By definition, everything here (sampling error and confidence boundaries) is 0.
# We keep to the common pattern instead of artificially creating the result object,
# although creating it directly might be more efficient.
Expand Down Expand Up @@ -208,6 +189,8 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
res = res.reset_index(drop=True)

if self.result is None:
self._set_metric_thresholds(res)
res = self._populate_alert_thresholds(res)
self.result = Result(
results_data=res,
column_names=self.column_names,
Expand All @@ -220,9 +203,9 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
# Dropping the intermediate '_filter' and directly returning the correct 'Result' class works OK
# but this causes us to lose the "common behavior" in the top level 'filter' method when overriding.
# Applicable here but to many of the base classes as well (e.g. fitting and calculating)
res = self._populate_alert_thresholds(res)
self.result = self.result.filter(period='reference')
self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True)
self.result.data.sort_index(inplace=True)
self.result.data = pd.concat([self.result.data, res], ignore_index=True)

return self.result

Expand All @@ -231,11 +214,37 @@ def _calculate_for_column(self, data: pd.DataFrame, column_name: str) -> Dict[st
seen_values = self._categorical_seen_values[column_name]
value = self._calculate_unseen_value_stats(data[column_name], seen_values)
result['value'] = value
result['upper_threshold'] = self._upper_alert_thresholds[column_name]
result['lower_threshold'] = self._lower_alert_thresholds[column_name]
result['alert'] = _add_alert_flag(result)
return result

def _set_metric_thresholds(self, result_data: pd.DataFrame):
    """Compute and store the lower/upper alert thresholds of every monitored column.

    Parameters
    ----------
    result_data : pd.DataFrame
        Chunked results carrying a ``(column_name, 'value')`` column for each
        entry of ``self.column_names``. Those per-chunk values are the data the
        thresholds are derived from.

    Notes
    -----
    Nothing is returned: the thresholds are written into
    ``self._lower_alert_thresholds`` and ``self._upper_alert_thresholds``,
    keyed by column name.
    """
    for name in self.column_names:
        # Per-chunk metric values of this column.
        metric_values = result_data.loc[:, (name, 'value')]

        # The helper's result is unpacked as a (lower, upper) pair.
        lower, upper = calculate_threshold_values(
            threshold=self.threshold,
            data=metric_values,
            lower_threshold_value_limit=self.lower_threshold_value_limit,
            upper_threshold_value_limit=self.upper_threshold_value_limit,
            logger=self._logger,
        )
        self._lower_alert_thresholds[name] = lower
        self._upper_alert_thresholds[name] = upper

def _populate_alert_thresholds(self, result_data: pd.DataFrame) -> pd.DataFrame:
    """Add threshold and alert columns for every monitored column.

    For each name in ``self.column_names`` three columns are written into
    ``result_data``: ``(name, 'upper_threshold')``, ``(name, 'lower_threshold')``
    and ``(name, 'alert')``. The alert is ``True`` when the chunk value lies
    strictly above the upper threshold or strictly below the lower threshold.
    A threshold of ``None`` means "no bound on that side".

    Parameters
    ----------
    result_data : pd.DataFrame
        Chunked results holding a ``(name, 'value')`` column per monitored
        column. The frame is modified in place.

    Returns
    -------
    pd.DataFrame
        The same ``result_data`` object, with the extra columns added.
    """
    for column_name in self.column_names:
        upper = self._upper_alert_thresholds[column_name]
        lower = self._lower_alert_thresholds[column_name]

        result_data[(column_name, 'upper_threshold')] = upper
        result_data[(column_name, 'lower_threshold')] = lower

        # Resolve the "no threshold" case once per column instead of once per row.
        upper_bound = np.inf if upper is None else upper
        lower_bound = -np.inf if lower is None else lower

        # Vectorized comparison: NaN values compare False on both sides, so they
        # never raise an alert, and an empty frame simply yields an empty column
        # (a row-wise ``apply`` returns a DataFrame in that case).
        values = result_data[(column_name, 'value')]
        result_data[(column_name, 'alert')] = (values > upper_bound) | (values < lower_bound)
    return result_data


def _convert_int_columns_to_categorical(
data: pd.DataFrame, column_names: List[str], logger: Optional[logging.Logger]
Expand All @@ -262,9 +271,9 @@ def _create_multilevel_index(
chunk_column_names = ['key', 'chunk_index', 'start_index', 'end_index', 'start_date', 'end_date', 'period']
chunk_tuples = [('chunk', chunk_column_name) for chunk_column_name in chunk_column_names]
column_tuples = [
(column_name, el)
(column_name, 'value')
for column_name in column_names
for el in ['value', 'upper_threshold', 'lower_threshold', 'alert']
# for el in ['value', 'upper_threshold', 'lower_threshold', 'alert']
]
tuples = chunk_tuples + column_tuples
return MultiIndex.from_tuples(tuples)
2 changes: 1 addition & 1 deletion nannyml/distribution/categorical/calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
self.result = Result(result_data, self.column_names, self.timestamp_column_name, self.chunker)
else:
# self.result = self.result.data.loc[self.result.data['period'] == 'reference', :]
self.result.data = pd.concat([self.result.data, result_data]).reset_index(drop=True)
self.result.data = pd.concat([self.result.data, result_data], ignore_index=True)

return self.result

Expand Down
2 changes: 1 addition & 1 deletion nannyml/distribution/continuous/calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
self.result = Result(result_data, self.column_names, self.timestamp_column_name, self.chunker)
else:
self.result = self.result.filter(period='reference')
self.result.data = pd.concat([self.result.data, result_data]).reset_index(drop=True)
self.result.data = pd.concat([self.result.data, result_data], ignore_index=True)

return self.result

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
else:
self.result = self.result.filter(period='reference')
res = self._populate_thresholds(results=res)
self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True)
self.result.data = pd.concat([self.result.data, res], ignore_index=True)
return self.result

def _calculate_chunk_record(self, data: pd.DataFrame) -> Dict[str, float]:
Expand Down
4 changes: 2 additions & 2 deletions nannyml/drift/multivariate/domain_classifier/calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
else:
res = self._populate_alert_thresholds(res)
self.result = self.result.filter(period='reference')
self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True)
self.result.data = pd.concat([self.result.data, res], ignore_index=True)
return self.result

def _calculate_chunk(self, data: pd.DataFrame):
Expand All @@ -284,7 +284,7 @@ def _calculate_chunk(self, data: pd.DataFrame):
reference_X = self._reference_X
chunk_y = np.ones(len(chunk_X))
reference_y = np.zeros(len(reference_X))
X = pd.concat([reference_X, chunk_X]).reset_index(drop=True)
X = pd.concat([reference_X, chunk_X], ignore_index=True)
y = np.concatenate([reference_y, chunk_y])

X, y = drop_matching_duplicate_rows(X, y, self.feature_column_names)
Expand Down
2 changes: 1 addition & 1 deletion nannyml/drift/univariate/calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
# but this causes us to lose the "common behavior" in the top level 'filter' method when overriding.
# Applicable here but to many of the base classes as well (e.g. fitting and calculating)
self.result = self.result.filter(period='reference')
self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True)
self.result.data = pd.concat([self.result.data, res], ignore_index=True)
self.result.analysis_data = data.copy()

return self.result
Expand Down
2 changes: 1 addition & 1 deletion nannyml/performance_calculation/calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
)
else:
self.result = self.result.filter(period='reference')
self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True)
self.result.data = pd.concat([self.result.data, res], ignore_index=True)

return self.result

Expand Down
2 changes: 1 addition & 1 deletion nannyml/performance_estimation/confidence_based/cbpe.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ def _estimate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
)
else:
self.result = self.result.filter(period='reference')
self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True)
self.result.data = pd.concat([self.result.data, res], ignore_index=True)

return self.result

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ def _estimate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
)
else:
self.result = self.result.filter(period='reference')
self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True)
self.result.data = pd.concat([self.result.data, res], ignore_index=True)

return self.result

Expand Down
2 changes: 1 addition & 1 deletion nannyml/stats/avg/calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
# Applicable here but to many of the base classes as well (e.g. fitting and calculating)
self.result = self.result.filter(period='reference')
res = self._populate_thresholds(results=res)
self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True)
self.result.data = pd.concat([self.result.data, res], ignore_index=True)

return self.result

Expand Down
2 changes: 1 addition & 1 deletion nannyml/stats/count/calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
# Applicable here but to many of the base classes as well (e.g. fitting and calculating)
self.result = self.result.filter(period='reference')
res = self._populate_thresholds(results=res)
self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True)
self.result.data = pd.concat([self.result.data, res], ignore_index=True)

return self.result

Expand Down
2 changes: 1 addition & 1 deletion nannyml/stats/median/calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
# Applicable here but to many of the base classes as well (e.g. fitting and calculating)
self.result = self.result.filter(period='reference')
res = self._populate_thresholds(results=res)
self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True)
self.result.data = pd.concat([self.result.data, res], ignore_index=True)

return self.result

Expand Down
2 changes: 1 addition & 1 deletion nannyml/stats/std/calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
# Applicable here but to many of the base classes as well (e.g. fitting and calculating)
self.result = self.result.filter(period='reference')
res = self._populate_thresholds(results=res)
self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True)
self.result.data = pd.concat([self.result.data, res], ignore_index=True)

return self.result

Expand Down
2 changes: 1 addition & 1 deletion nannyml/stats/sum/calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
# Applicable here but to many of the base classes as well (e.g. fitting and calculating)
self.result = self.result.filter(period='reference')
res = self._populate_thresholds(results=res)
self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True)
self.result.data = pd.concat([self.result.data, res], ignore_index=True)

return self.result

Expand Down

0 comments on commit b8b237f

Please sign in to comment.