From 1a364ce0766aa3e63fe221aea38eafd50653b718 Mon Sep 17 00:00:00 2001 From: Niels Nuyttens Date: Wed, 3 Jul 2024 18:55:22 +0100 Subject: [PATCH 1/5] Add "treat_as_continuous" parameter to univariate drift calculator --- nannyml/drift/univariate/calculator.py | 59 +++++++++++++++++++------- tests/drift/test_drift.py | 7 +-- 2 files changed, 47 insertions(+), 19 deletions(-) diff --git a/nannyml/drift/univariate/calculator.py b/nannyml/drift/univariate/calculator.py index f072083e..ce764962 100644 --- a/nannyml/drift/univariate/calculator.py +++ b/nannyml/drift/univariate/calculator.py @@ -31,7 +31,7 @@ import warnings from logging import Logger -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Tuple, Union import numpy as np import pandas as pd @@ -61,6 +61,7 @@ class UnivariateDriftCalculator(AbstractCalculator): def __init__( self, column_names: Union[str, List[str]], + treat_as_numerical: Optional[Union[str, List[str]]] = None, treat_as_categorical: Optional[Union[str, List[str]]] = None, timestamp_column_name: Optional[str] = None, categorical_methods: Optional[Union[str, List[str]]] = None, @@ -79,6 +80,8 @@ def __init__( column_names: Union[str, List[str]] A string or list containing the names of features in the provided data set. A drift score will be calculated for each entry in this list. + treat_as_numerical: Union[str, List[str]] + A single column name or list of column names to be treated as numerical by the calculator. treat_as_categorical: Union[str, List[str]] A single column name or list of column names to be treated as categorical by the calculator. timestamp_column_name: str @@ -204,6 +207,12 @@ def __init__( column_names = [column_names] self.column_names = column_names + if not treat_as_numerical: + treat_as_numerical = [] + if isinstance(treat_as_numerical, str): + treat_as_numerical = [treat_as_numerical] + self.treat_as_numerical = treat_as_numerical + if not treat_as_categorical: treat_as_categorical = [] if isinstance(treat_as_categorical, str): @@ -255,21 +264,8 @@ def _fit(self, reference_data: pd.DataFrame, *args, **kwargs) -> UnivariateDrift _list_missing(self.column_names, reference_data) - self.continuous_column_names, self.categorical_column_names = _split_features_by_type( - reference_data, self.column_names - ) - - for column_name in self.treat_as_categorical: - if column_name not in self.column_names: - self._logger.info( - f"ignoring 'treat_as_categorical' value '{column_name}' because it was not in " - f"listed column names" - ) - break - if column_name in self.continuous_column_names: - self.continuous_column_names.remove(column_name) - if column_name not in self.categorical_column_names: - self.categorical_column_names.append(column_name) + + self.continuous_column_names, self.categorical_column_names = self._split_continuous_and_categorical(reference_data) timestamps = reference_data[self.timestamp_column_name] if self.timestamp_column_name else None for column_name in self.continuous_column_names: @@ -398,6 +394,37 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result: self.result.analysis_data = data.copy() return self.result + + def _split_continuous_and_categorical(self, data: pd.DataFrame) -> Tuple[List[str], List[str]]: + """Splits the features in the data set into continuous and categorical features.""" + treat_as_numerical_set, treat_as_categorical_set = set(self.treat_as_numerical), set(self.treat_as_categorical) + column_names_set = set(self.column_names) + + invalid_continuous_column_names = treat_as_numerical_set - column_names_set + treat_as_numerical_set = treat_as_numerical_set - invalid_continuous_column_names + if invalid_continuous_column_names: + self._logger.info( + f"ignoring 'treat_as_numerical' values {list(invalid_continuous_column_names)} because they were not in " + f"listed column names" + ) + + invalid_categorical_column_names = treat_as_categorical_set - column_names_set + treat_as_categorical_set = treat_as_categorical_set - invalid_categorical_column_names + if invalid_categorical_column_names: + self._logger.info( + f"ignoring 'treat_as_categorical' values {list(invalid_categorical_column_names)} because they were not in " + f"listed column names" + ) + + unspecified_columns = column_names_set - treat_as_numerical_set - treat_as_categorical_set + continuous_column_names, categorical_column_names = _split_features_by_type( + data, unspecified_columns + ) + + continuous_column_names = continuous_column_names + list(treat_as_numerical_set) + categorical_column_names = categorical_column_names + list(treat_as_categorical_set) + + return continuous_column_names, categorical_column_names def _calculate_for_column( diff --git a/tests/drift/test_drift.py b/tests/drift/test_drift.py index 907bd736..735e3b28 100644 --- a/tests/drift/test_drift.py +++ b/tests/drift/test_drift.py @@ -281,7 +281,7 @@ def test_univariate_drift_calculator_treat_as_categorical_for_categorical_column assert sorted(calc.categorical_column_names) == expected_categorical -def test_univariate_drift_calculator_treat_as_categorical_for_non_existing_column( # noqa: D103 +def test_univariate_drift_calculator_treat_as_for_non_existing_column( # noqa: D103 sample_drift_data, caplog ): caplog.set_level(logging.INFO) @@ -289,6 +289,7 @@ def test_univariate_drift_calculator_treat_as_categorical_for_non_existing_colum calc = UnivariateDriftCalculator( column_names=['f1', 'f2', 'f3', 'f4'], treat_as_categorical='foo', + treat_as_numerical='bar', timestamp_column_name='timestamp', continuous_methods=['jensen_shannon'], categorical_methods=['jensen_shannon'], @@ -299,8 +300,8 @@ def test_univariate_drift_calculator_treat_as_categorical_for_non_existing_colum assert sorted(calc.continuous_column_names) == expected_continuous assert sorted(calc.categorical_column_names) == expected_categorical - assert "ignoring 'treat_as_categorical' value 'foo' because it was not in listed column names" in caplog.messages - + assert "ignoring 'treat_as_categorical' values ['foo'] because they were not in listed column names" in caplog.messages + assert "ignoring 'treat_as_numerical' values ['bar'] because they were not in listed column names" in caplog.messages def test_univariate_drift_calculator_without_custom_thresholds(): # noqa: D103 sut = UnivariateDriftCalculator( From 5493e7c964cfc22ac192d239300d4ddbbe16fa9e Mon Sep 17 00:00:00 2001 From: Niels Nuyttens Date: Wed, 3 Jul 2024 18:56:22 +0100 Subject: [PATCH 2/5] Made _split_by_features more generic and predictable --- nannyml/base.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nannyml/base.py b/nannyml/base.py index 001141b3..7deb3cef 100644 --- a/nannyml/base.py +++ b/nannyml/base.py @@ -520,10 +520,10 @@ def _estimate(self, data: pd.DataFrame, *args, **kwargs) -> Result: raise NotImplementedError(f"'{self.__class__.__name__}' must implement the '_calculate' method") -def _split_features_by_type(data: pd.DataFrame, feature_column_names: List[str]) -> Tuple[List[str], List[str]]: - continuous_column_names = [col for col in feature_column_names if _column_is_continuous(data[col])] +def _split_features_by_type(data: pd.DataFrame, feature_column_names: Iterable[str]) -> Tuple[List[str], List[str]]: + continuous_column_names = [col for col in sorted(feature_column_names) if _column_is_continuous(data[col])] - categorical_column_names = [col for col in feature_column_names if _column_is_categorical(data[col])] + categorical_column_names = [col for col in sorted(feature_column_names) if _column_is_categorical(data[col])] return continuous_column_names, categorical_column_names From 3963a6259f8196a260243bdb46c61cd6f8a2672f Mon Sep 17 00:00:00 2001 From: Niels Nuyttens Date: Wed, 3 Jul 2024 18:56:50 +0100 Subject: [PATCH 3/5] Split up JS implementation into proper continuous and categorical implementations --- nannyml/drift/univariate/methods.py | 101 ++++++++++++------- tests/drift/test_univariate_drift_methods.py | 13 +-- 2 files changed, 73 insertions(+), 41 deletions(-) diff --git a/nannyml/drift/univariate/methods.py b/nannyml/drift/univariate/methods.py index a635c831..34dcb301 100644 --- a/nannyml/drift/univariate/methods.py +++ b/nannyml/drift/univariate/methods.py @@ -247,8 +247,7 @@ def inner_wrapper(wrapped_class: Type[Method]) -> Type[Method]: @MethodFactory.register(key='jensen_shannon', feature_type=FeatureType.CONTINUOUS) -@MethodFactory.register(key='jensen_shannon', feature_type=FeatureType.CATEGORICAL) -class JensenShannonDistance(Method): +class ContinuousJensenShannonDistance(Method): """Calculates Jensen-Shannon distance. By default an alert will be raised if `distance > 0.1`. @@ -272,34 +271,17 @@ def __init__(self, **kwargs) -> None: lower_threshold_limit : float, default=0 An optional lower threshold for the performance metric. """ - self._treat_as_type: str self._bins: np.ndarray self._reference_proba_in_bins: np.ndarray def _fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None): reference_data = _remove_nans(reference_data) - if _column_is_categorical(reference_data): - treat_as_type = 'cat' - else: - n_unique_values = len(np.unique(reference_data)) - len_reference = len(reference_data) - if n_unique_values > 50 or n_unique_values / len_reference > 0.1: - treat_as_type = 'cont' - else: - treat_as_type = 'cat' + len_reference = len(reference_data) - if treat_as_type == 'cont': - bins = np.histogram_bin_edges(reference_data, bins='doane') - reference_proba_in_bins = np.histogram(reference_data, bins=bins)[0] / len_reference - self._bins = bins - self._reference_proba_in_bins = reference_proba_in_bins - else: - reference_unique, reference_counts = np.unique(reference_data, return_counts=True) - reference_proba_per_unique = reference_counts / len(reference_data) - self._bins = reference_unique - self._reference_proba_in_bins = reference_proba_per_unique - - self._treat_as_type = treat_as_type + bins = np.histogram_bin_edges(reference_data, bins='doane') + reference_proba_in_bins = np.histogram(reference_data, bins=bins)[0] / len_reference + self._bins = bins + self._reference_proba_in_bins = reference_proba_in_bins return self @@ -308,15 +290,9 @@ def _calculate(self, data: pd.Series): data = _remove_nans(data) if data.empty: return np.nan - if self._treat_as_type == 'cont': - len_data = len(data) - data_proba_in_bins = np.histogram(data, bins=self._bins)[0] / len_data - - else: - data_unique, data_counts = np.unique(data, return_counts=True) - data_counts_dic = dict(zip(data_unique, data_counts)) - data_count_on_ref_bins = [data_counts_dic[key] if key in data_counts_dic else 0 for key in self._bins] - data_proba_in_bins = np.array(data_count_on_ref_bins) / len(data) + + len_data = len(data) + data_proba_in_bins = np.histogram(data, bins=self._bins)[0] / len_data leftover = 1 - np.sum(data_proba_in_bins) if leftover > 0: @@ -325,11 +301,66 @@ def _calculate(self, data: pd.Series): distance = jensenshannon(reference_proba_in_bins, data_proba_in_bins, base=2) - del reference_proba_in_bins - return distance +@MethodFactory.register(key='jensen_shannon', feature_type=FeatureType.CATEGORICAL) +class CategoricalJensenShannonDistance(Method): + """Calculates Jensen-Shannon distance. + + By default an alert will be raised if `distance > 0.1`. + """ + + def __init__(self, **kwargs) -> None: + """Initialize Jensen-Shannon method.""" + super().__init__( + display_name='Jensen-Shannon distance', + column_name='jensen_shannon', + lower_threshold_limit=0, + **kwargs, + ) + """ + Parameters + ---------- + display_name : str, default='Jensen-Shannon distance' + The name of the metric. Used to display in plots. + column_name: str, default='jensen-shannon' + The name used to indicate the metric in columns of a DataFrame. + lower_threshold_limit : float, default=0 + An optional lower threshold for the performance metric. + """ + self._bins: np.ndarray + self._reference_proba_in_bins: np.ndarray + + def _fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None): + reference_data = _remove_nans(reference_data) + reference_unique, reference_counts = np.unique(reference_data, return_counts=True) + reference_proba_per_unique = reference_counts / len(reference_data) + self._bins = reference_unique + self._reference_proba_in_bins = reference_proba_per_unique + + return self + + def _calculate(self, data: pd.Series): + reference_proba_in_bins = copy(self._reference_proba_in_bins) + data = _remove_nans(data) + if data.empty: + return np.nan + + data_unique, data_counts = np.unique(data, return_counts=True) + data_counts_dic = dict(zip(data_unique, data_counts)) + data_count_on_ref_bins = [data_counts_dic[key] if key in data_counts_dic else 0 for key in self._bins] + data_proba_in_bins = np.array(data_count_on_ref_bins) / len(data) + + leftover = 1 - np.sum(data_proba_in_bins) + if leftover > 0: + data_proba_in_bins = np.append(data_proba_in_bins, leftover) + reference_proba_in_bins = np.append(reference_proba_in_bins, 0) + + distance = jensenshannon(reference_proba_in_bins, data_proba_in_bins, base=2) + + return distance + @MethodFactory.register(key='kolmogorov_smirnov', feature_type=FeatureType.CONTINUOUS) class KolmogorovSmirnovStatistic(Method): """Calculates the Kolmogorov-Smirnov d-stat. diff --git a/tests/drift/test_univariate_drift_methods.py b/tests/drift/test_univariate_drift_methods.py index 50d742c8..1e40a29e 100644 --- a/tests/drift/test_univariate_drift_methods.py +++ b/tests/drift/test_univariate_drift_methods.py @@ -5,8 +5,9 @@ from nannyml.chunk import CountBasedChunker, DefaultChunker from nannyml.drift.univariate.methods import ( + CategoricalJensenShannonDistance, HellingerDistance, - JensenShannonDistance, + ContinuousJensenShannonDistance, KolmogorovSmirnovStatistic, LInfinityDistance, WassersteinDistance, @@ -22,7 +23,7 @@ def test_js_for_0_distance(): # noqa: D103 np.random.seed(1) reference = pd.Series(np.random.choice(np.linspace(0, 2, 6), 10_000), name='A') - js = JensenShannonDistance(chunker=chunker, threshold=threshold) + js = ContinuousJensenShannonDistance(chunker=chunker, threshold=threshold) js.fit(reference) distance = js.calculate(reference) assert distance == 0 @@ -32,7 +33,7 @@ def test_js_for_both_continuous(): # noqa: D103 np.random.seed(1) reference = pd.Series(np.random.normal(0, 1, 10_000), name='A') analysis = pd.Series(np.random.normal(0, 1, 1000), name='A') - js = JensenShannonDistance(chunker=chunker, threshold=threshold) + js = ContinuousJensenShannonDistance(chunker=chunker, threshold=threshold) js.fit(reference) distance = js.calculate(analysis) assert np.round(distance, 2) == 0.05 @@ -42,7 +43,7 @@ def test_js_for_quasi_continuous(): # noqa: D103 np.random.seed(1) reference = pd.Series(np.random.choice(np.linspace(0, 2, 6), 10_000), name='A') analysis = pd.Series(np.random.choice(np.linspace(0, 2, 3), 1000), name='A') - js = JensenShannonDistance(chunker=chunker, threshold=threshold) + js = ContinuousJensenShannonDistance(chunker=chunker, threshold=threshold) js.fit(reference) distance = js.calculate(analysis) assert np.round(distance, 2) == 0.73 @@ -52,7 +53,7 @@ def test_js_for_categorical(): # noqa: D103 np.random.seed(1) reference = pd.Series(np.random.choice(['a', 'b', 'c', 'd'], 10_000), name='A') analysis = pd.Series(np.random.choice(['a', 'b', 'c', 'e'], 1000), name='A') - js = JensenShannonDistance(chunker=chunker, threshold=threshold) + js = CategoricalJensenShannonDistance(chunker=chunker, threshold=threshold) js.fit(reference) distance = js.calculate(analysis) assert np.round(distance, 2) == 0.5 @@ -183,7 +184,7 @@ def test_hellinger_for_categorical(): # noqa: D103 [ KolmogorovSmirnovStatistic(chunker=DefaultChunker(), threshold=ConstantThreshold(lower=-1, upper=None)), LInfinityDistance(chunker=DefaultChunker(), threshold=ConstantThreshold(lower=-1, upper=None)), - JensenShannonDistance(chunker=DefaultChunker(), threshold=ConstantThreshold(lower=-1, upper=None)), + ContinuousJensenShannonDistance(chunker=DefaultChunker(), threshold=ConstantThreshold(lower=-1, upper=None)), WassersteinDistance(chunker=DefaultChunker(), threshold=ConstantThreshold(lower=-1, upper=None)), HellingerDistance(chunker=DefaultChunker(), threshold=ConstantThreshold(lower=-1, upper=None)), ], From 3ee722fe1d1e42d0f31de19e3c5b9f59645d4ad0 Mon Sep 17 00:00:00 2001 From: Niels Nuyttens Date: Wed, 3 Jul 2024 20:04:53 +0100 Subject: [PATCH 4/5] Split up Hellinger implementation into continuous and categorical implementation --- nannyml/drift/univariate/methods.py | 97 +++++++++++++------- tests/drift/test_univariate_drift_methods.py | 15 +-- 2 files changed, 71 insertions(+), 41 deletions(-) diff --git a/nannyml/drift/univariate/methods.py b/nannyml/drift/univariate/methods.py index 34dcb301..059da490 100644 --- a/nannyml/drift/univariate/methods.py +++ b/nannyml/drift/univariate/methods.py @@ -701,8 +701,7 @@ def _ecdf(self, vec: np.ndarray): @MethodFactory.register(key='hellinger', feature_type=FeatureType.CONTINUOUS) -@MethodFactory.register(key='hellinger', feature_type=FeatureType.CATEGORICAL) -class HellingerDistance(Method): +class ContinuousHellingerDistance(Method): """Calculates the Hellinger Distance between two distributions.""" def __init__(self, **kwargs) -> None: @@ -724,34 +723,69 @@ def __init__(self, **kwargs) -> None: An optional lower threshold for the performance metric. """ - self._treat_as_type: str self._bins: np.ndarray self._reference_proba_in_bins: np.ndarray def _fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None) -> Self: reference_data = _remove_nans(reference_data) - if _column_is_categorical(reference_data): - treat_as_type = 'cat' - else: - n_unique_values = len(np.unique(reference_data)) - len_reference = len(reference_data) - if n_unique_values > 50 or n_unique_values / len_reference > 0.1: - treat_as_type = 'cont' - else: - treat_as_type = 'cat' + len_reference = len(reference_data) - if treat_as_type == 'cont': - bins = np.histogram_bin_edges(reference_data, bins='doane') - reference_proba_in_bins = np.histogram(reference_data, bins=bins)[0] / len_reference - self._bins = bins - self._reference_proba_in_bins = reference_proba_in_bins - else: - reference_unique, reference_counts = np.unique(reference_data, return_counts=True) - reference_proba_per_unique = reference_counts / len(reference_data) - self._bins = reference_unique - self._reference_proba_in_bins = reference_proba_per_unique + bins = np.histogram_bin_edges(reference_data, bins='doane') + reference_proba_in_bins = np.histogram(reference_data, bins=bins)[0] / len_reference + self._bins = bins + self._reference_proba_in_bins = reference_proba_in_bins + + return self + + def _calculate(self, data: pd.Series): + data = _remove_nans(data) + if data.empty: + return np.nan + reference_proba_in_bins = copy(self._reference_proba_in_bins) + data_proba_in_bins = np.histogram(data, bins=self._bins)[0] / len(data) + + leftover = 1 - np.sum(data_proba_in_bins) + if leftover > 0: + data_proba_in_bins = np.append(data_proba_in_bins, leftover) + reference_proba_in_bins = np.append(reference_proba_in_bins, 0) + + distance = np.sqrt(np.sum((np.sqrt(reference_proba_in_bins) - np.sqrt(data_proba_in_bins)) ** 2)) / np.sqrt(2) + + return distance + +@MethodFactory.register(key='hellinger', feature_type=FeatureType.CATEGORICAL) +class CategoricalHellingerDistance(Method): + """Calculates the Hellinger Distance between two distributions.""" + + def __init__(self, **kwargs) -> None: + """Initialize Hellinger Distance method.""" + super().__init__( + display_name='Hellinger distance', + column_name='hellinger', + lower_threshold_limit=0, + **kwargs, + ) + """ + Parameters + ---------- + display_name : str, default='Hellinger distance' + The name of the metric. Used to display in plots. + column_name: str, default='hellinger' + The name used to indicate the metric in columns of a DataFrame. + lower_threshold_limit : float, default=0 + An optional lower threshold for the performance metric. + """ - self._treat_as_type = treat_as_type + self._bins: np.ndarray + self._reference_proba_in_bins: np.ndarray + + def _fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None) -> Self: + reference_data = _remove_nans(reference_data) + + reference_unique, reference_counts = np.unique(reference_data, return_counts=True) + reference_proba_per_unique = reference_counts / len(reference_data) + self._bins = reference_unique + self._reference_proba_in_bins = reference_proba_per_unique return self @@ -760,15 +794,11 @@ def _calculate(self, data: pd.Series): if data.empty: return np.nan reference_proba_in_bins = copy(self._reference_proba_in_bins) - if self._treat_as_type == 'cont': - len_data = len(data) - data_proba_in_bins = np.histogram(data, bins=self._bins)[0] / len_data - - else: - data_unique, data_counts = np.unique(data, return_counts=True) - data_counts_dic = dict(zip(data_unique, data_counts)) - data_count_on_ref_bins = [data_counts_dic[key] if key in data_counts_dic else 0 for key in self._bins] - data_proba_in_bins = np.array(data_count_on_ref_bins) / len(data) + + data_unique, data_counts = np.unique(data, return_counts=True) + data_counts_dic = dict(zip(data_unique, data_counts)) + data_count_on_ref_bins = [data_counts_dic[key] if key in data_counts_dic else 0 for key in self._bins] + data_proba_in_bins = np.array(data_count_on_ref_bins) / len(data) leftover = 1 - np.sum(data_proba_in_bins) if leftover > 0: @@ -777,6 +807,5 @@ def _calculate(self, data: pd.Series): distance = np.sqrt(np.sum((np.sqrt(reference_proba_in_bins) - np.sqrt(data_proba_in_bins)) ** 2)) / np.sqrt(2) - del reference_proba_in_bins - return distance + \ No newline at end of file diff --git a/tests/drift/test_univariate_drift_methods.py b/tests/drift/test_univariate_drift_methods.py index 1e40a29e..9639ed2b 100644 --- a/tests/drift/test_univariate_drift_methods.py +++ b/tests/drift/test_univariate_drift_methods.py @@ -5,8 +5,9 @@ from nannyml.chunk import CountBasedChunker, DefaultChunker from nannyml.drift.univariate.methods import ( + CategoricalHellingerDistance, CategoricalJensenShannonDistance, - HellingerDistance, + ContinuousHellingerDistance, ContinuousJensenShannonDistance, KolmogorovSmirnovStatistic, LInfinityDistance, @@ -136,7 +137,7 @@ def test_hellinger_complete_overlap(): # noqa: D103 np.random.seed(1) reference = pd.Series(np.random.normal(0, 1, 10_000), name='A') analysis = reference - hell_dist = HellingerDistance(chunker=chunker, threshold=threshold).fit(reference).calculate(analysis) + hell_dist = ContinuousHellingerDistance(chunker=chunker, threshold=threshold).fit(reference).calculate(analysis) hell_dist = np.round(hell_dist, 2) assert hell_dist == 0 @@ -145,7 +146,7 @@ def test_hellinger_no_overlap(): # noqa: D103 np.random.seed(1) reference = pd.Series(np.random.normal(0, 1, 10_000), name='A') analysis = pd.Series(np.random.normal(7, 1, 10_000), name='A') - hell_dist = HellingerDistance(chunker=chunker, threshold=threshold).fit(reference).calculate(analysis) + hell_dist = ContinuousHellingerDistance(chunker=chunker, threshold=threshold).fit(reference).calculate(analysis) hell_dist = np.round(hell_dist, 2) assert hell_dist == 1 @@ -154,7 +155,7 @@ def test_hellinger_both_continuous_analysis_with_small_drift(): # noqa: D103 np.random.seed(1) reference = pd.Series(np.random.normal(0, 1, 10_000), name='A') analysis = pd.Series(np.random.normal(-2, 1, 10_000), name='A') - hell_dist = HellingerDistance(chunker=chunker, threshold=threshold).fit(reference).calculate(analysis) + hell_dist = ContinuousHellingerDistance(chunker=chunker, threshold=threshold).fit(reference).calculate(analysis) hell_dist = np.round(hell_dist, 2) assert hell_dist == 0.63 @@ -163,7 +164,7 @@ def test_hellinger_for_quasi_continuous(): # noqa: D103 np.random.seed(1) reference = pd.Series(np.random.choice(np.linspace(0, 2, 6), 10_000), name='A') analysis = pd.Series(np.random.choice(np.linspace(0, 2, 3), 1000), name='A') - hell_dist = HellingerDistance(chunker=chunker, threshold=threshold) + hell_dist = ContinuousHellingerDistance(chunker=chunker, threshold=threshold) hell_dist.fit(reference) distance = hell_dist.calculate(analysis) assert np.round(distance, 2) == 0.72 @@ -173,7 +174,7 @@ def test_hellinger_for_categorical(): # noqa: D103 np.random.seed(1) reference = pd.Series(np.random.choice(['a', 'b', 'c', 'd'], 10_000), name='A') analysis = pd.Series(np.random.choice(['a', 'b', 'c', 'e'], 1000), name='A') - hell_dist = HellingerDistance(chunker=chunker, threshold=threshold) + hell_dist = CategoricalHellingerDistance(chunker=chunker, threshold=threshold) hell_dist.fit(reference) distance = hell_dist.calculate(analysis) assert np.round(distance, 2) == 0.5 @@ -186,7 +187,7 @@ def test_hellinger_for_categorical(): # noqa: D103 LInfinityDistance(chunker=DefaultChunker(), threshold=ConstantThreshold(lower=-1, upper=None)), ContinuousJensenShannonDistance(chunker=DefaultChunker(), threshold=ConstantThreshold(lower=-1, upper=None)), WassersteinDistance(chunker=DefaultChunker(), threshold=ConstantThreshold(lower=-1, upper=None)), - HellingerDistance(chunker=DefaultChunker(), threshold=ConstantThreshold(lower=-1, upper=None)), + ContinuousHellingerDistance(chunker=DefaultChunker(), threshold=ConstantThreshold(lower=-1, upper=None)), ], ) def test_method_logs_warning_when_lower_threshold_is_overridden_by_metric_limits(caplog, method): # noqa: D103 From 336a056563db79bf1b77093ada4037312c5d1924 Mon Sep 17 00:00:00 2001 From: Niels Nuyttens Date: Wed, 3 Jul 2024 20:07:48 +0100 Subject: [PATCH 5/5] Linting --- nannyml/drift/univariate/calculator.py | 21 ++++++++-------- nannyml/drift/univariate/methods.py | 13 +++++----- tests/drift/test_drift.py | 35 ++++++++++++-------------- 3 files changed, 33 insertions(+), 36 deletions(-) diff --git a/nannyml/drift/univariate/calculator.py b/nannyml/drift/univariate/calculator.py index ce764962..40629597 100644 --- a/nannyml/drift/univariate/calculator.py +++ b/nannyml/drift/univariate/calculator.py @@ -264,8 +264,9 @@ def _fit(self, reference_data: pd.DataFrame, *args, **kwargs) -> UnivariateDrift _list_missing(self.column_names, reference_data) - - self.continuous_column_names, self.categorical_column_names = self._split_continuous_and_categorical(reference_data) + self.continuous_column_names, self.categorical_column_names = self._split_continuous_and_categorical( + reference_data + ) timestamps = reference_data[self.timestamp_column_name] if self.timestamp_column_name else None for column_name in self.continuous_column_names: @@ -394,7 +395,7 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result: self.result.analysis_data = data.copy() return self.result - + def _split_continuous_and_categorical(self, data: pd.DataFrame) -> Tuple[List[str], List[str]]: """Splits the features in the data set into continuous and categorical features.""" treat_as_numerical_set, treat_as_categorical_set = set(self.treat_as_numerical), set(self.treat_as_categorical) @@ -404,22 +405,20 @@ def _split_continuous_and_categorical(self, data: pd.DataFrame) -> Tuple[List[st treat_as_numerical_set = treat_as_numerical_set - invalid_continuous_column_names if invalid_continuous_column_names: self._logger.info( - f"ignoring 'treat_as_numerical' values {list(invalid_continuous_column_names)} because they were not in " - f"listed column names" + f"ignoring 'treat_as_numerical' values {list(invalid_continuous_column_names)} because " + f"they were not in listed column names" ) invalid_categorical_column_names = treat_as_categorical_set - column_names_set treat_as_categorical_set = treat_as_categorical_set - invalid_categorical_column_names if invalid_categorical_column_names: self._logger.info( - f"ignoring 'treat_as_categorical' values {list(invalid_categorical_column_names)} because they were not in " - f"listed column names" + f"ignoring 'treat_as_categorical' values {list(invalid_categorical_column_names)} because " + f"they were not in listed column names" ) - + unspecified_columns = column_names_set - treat_as_numerical_set - treat_as_categorical_set - continuous_column_names, categorical_column_names = _split_features_by_type( - data, unspecified_columns - ) + continuous_column_names, categorical_column_names = _split_features_by_type(data, unspecified_columns) continuous_column_names = continuous_column_names + list(treat_as_numerical_set) categorical_column_names = categorical_column_names + list(treat_as_categorical_set) diff --git a/nannyml/drift/univariate/methods.py b/nannyml/drift/univariate/methods.py index 059da490..fe847fa0 100644 --- a/nannyml/drift/univariate/methods.py +++ b/nannyml/drift/univariate/methods.py @@ -29,7 +29,7 @@ from scipy.stats import chi2_contingency, ks_2samp, wasserstein_distance from nannyml._typing import Self -from nannyml.base import _column_is_categorical, _remove_nans +from nannyml.base import _remove_nans from nannyml.chunk import Chunker from nannyml.exceptions import InvalidArgumentsException, NotFittedException from nannyml.thresholds import Threshold, calculate_threshold_values @@ -290,7 +290,7 @@ def _calculate(self, data: pd.Series): data = _remove_nans(data) if data.empty: return np.nan - + len_data = len(data) data_proba_in_bins = np.histogram(data, bins=self._bins)[0] / len_data @@ -346,7 +346,7 @@ def _calculate(self, data: pd.Series): data = _remove_nans(data) if data.empty: return np.nan - + data_unique, data_counts = np.unique(data, return_counts=True) data_counts_dic = dict(zip(data_unique, data_counts)) data_count_on_ref_bins = [data_counts_dic[key] if key in data_counts_dic else 0 for key in self._bins] @@ -361,6 +361,7 @@ def _calculate(self, data: pd.Series): return distance + @MethodFactory.register(key='kolmogorov_smirnov', feature_type=FeatureType.CONTINUOUS) class KolmogorovSmirnovStatistic(Method): """Calculates the Kolmogorov-Smirnov d-stat. @@ -734,7 +735,7 @@ def _fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None reference_proba_in_bins = np.histogram(reference_data, bins=bins)[0] / len_reference self._bins = bins self._reference_proba_in_bins = reference_proba_in_bins - + return self def _calculate(self, data: pd.Series): @@ -753,6 +754,7 @@ def _calculate(self, data: pd.Series): return distance + @MethodFactory.register(key='hellinger', feature_type=FeatureType.CATEGORICAL) class CategoricalHellingerDistance(Method): """Calculates the Hellinger Distance between two distributions.""" @@ -794,7 +796,7 @@ def _calculate(self, data: pd.Series): if data.empty: return np.nan reference_proba_in_bins = copy(self._reference_proba_in_bins) - + data_unique, data_counts = np.unique(data, return_counts=True) data_counts_dic = dict(zip(data_unique, data_counts)) data_count_on_ref_bins = [data_counts_dic[key] if key in data_counts_dic else 0 for key in self._bins] @@ -808,4 +810,3 @@ def _calculate(self, data: pd.Series): distance = np.sqrt(np.sum((np.sqrt(reference_proba_in_bins) - np.sqrt(data_proba_in_bins)) ** 2)) / np.sqrt(2) return distance - \ No newline at end of file diff --git a/tests/drift/test_drift.py b/tests/drift/test_drift.py index 735e3b28..b066a84c 100644 --- a/tests/drift/test_drift.py +++ b/tests/drift/test_drift.py @@ -14,13 +14,13 @@ from nannyml._typing import Key, Result, Self from nannyml.base import Abstract1DResult, AbstractCalculator from nannyml.chunk import CountBasedChunker, DefaultChunker, PeriodBasedChunker, SizeBasedChunker +from nannyml.datasets import load_synthetic_car_loan_dataset from nannyml.drift.multivariate.data_reconstruction import DataReconstructionDriftCalculator from nannyml.drift.multivariate.domain_classifier import DomainClassifierCalculator from nannyml.drift.univariate.calculator import DEFAULT_THRESHOLDS, UnivariateDriftCalculator from nannyml.exceptions import InvalidArgumentsException from nannyml.performance_estimation.confidence_based import CBPE from nannyml.thresholds import ConstantThreshold, StandardDeviationThreshold -from nannyml.datasets import load_synthetic_car_loan_dataset @pytest.fixture(scope="module") @@ -183,9 +183,7 @@ def test_base_drift_calculator_uses_default_chunker_when_no_chunker_specified(sa @pytest.mark.parametrize('column_names, expected', [('f1', ['f1']), (['f1', 'f2'], ['f1', 'f2'])]) -def test_univariate_drift_calculator_create_with_single_or_list_of_column_names( # noqa: D103 - column_names, expected -): +def test_univariate_drift_calculator_create_with_single_or_list_of_column_names(column_names, expected): # noqa: D103 calc = UnivariateDriftCalculator( column_names=column_names, timestamp_column_name='timestamp', @@ -264,9 +262,7 @@ def test_univariate_drift_calculator_treat_as_categorical_for_continuous_column( assert sorted(calc.categorical_column_names) == expected_categorical -def test_univariate_drift_calculator_treat_as_categorical_for_categorical_column( # noqa: D103 - sample_drift_data -): +def test_univariate_drift_calculator_treat_as_categorical_for_categorical_column(sample_drift_data): # noqa: D103 calc = UnivariateDriftCalculator( column_names=['f1', 'f2', 'f3', 'f4'], treat_as_categorical='f3', @@ -281,9 +277,7 @@ def test_univariate_drift_calculator_treat_as_categorical_for_categorical_column assert sorted(calc.categorical_column_names) == expected_categorical -def test_univariate_drift_calculator_treat_as_for_non_existing_column( # noqa: D103 - sample_drift_data, caplog -): +def test_univariate_drift_calculator_treat_as_for_non_existing_column(sample_drift_data, caplog): # noqa: D103 caplog.set_level(logging.INFO) calc = UnivariateDriftCalculator( @@ -300,8 +294,13 @@ def test_univariate_drift_calculator_treat_as_for_non_existing_column( # noqa: assert sorted(calc.continuous_column_names) == expected_continuous assert sorted(calc.categorical_column_names) == expected_categorical - assert "ignoring 'treat_as_categorical' values ['foo'] because they were not in listed column names" in caplog.messages - assert "ignoring 'treat_as_numerical' values ['bar'] because they were not in listed column names" in caplog.messages + assert ( + "ignoring 'treat_as_categorical' values ['foo'] because they were not in listed column names" in caplog.messages + ) + assert ( + "ignoring 'treat_as_numerical' values ['bar'] because they were not in listed column names" in caplog.messages + ) + def test_univariate_drift_calculator_without_custom_thresholds(): # noqa: D103 sut = UnivariateDriftCalculator( @@ -602,7 +601,7 @@ def test_base_drift_calculator_given_non_empty_features_list_should_only_calcula # See https://github.com/NannyML/nannyml/issues/192 def test_univariate_drift_calculator_returns_distinct_but_consistent_results_when_reused( # noqa: D103 - sample_drift_data + sample_drift_data, ): ref_data = sample_drift_data.loc[sample_drift_data['period'] == 'reference'] sut = UnivariateDriftCalculator( @@ -871,7 +870,7 @@ def test_input_dataframes_are_not_altered_by_univ_calculator(): # noqa: D103 'loan_length', 'repaid_loan_on_prev_car', 'size_of_downpayment', - 'driver_tenure' + 'driver_tenure', ] calc = UnivariateDriftCalculator( column_names=feature_column_names, @@ -897,7 +896,7 @@ def test_input_dataframes_are_not_altered_by_dre_calculator(): # noqa: D103 'loan_length', 'repaid_loan_on_prev_car', 'size_of_downpayment', - 'driver_tenure' + 'driver_tenure', ] calc = DataReconstructionDriftCalculator( column_names=feature_column_names, @@ -920,12 +919,10 @@ def test_input_dataframes_are_not_altered_by_dc_calculator(): # noqa: D103 'loan_length', 'repaid_loan_on_prev_car', 'size_of_downpayment', - 'driver_tenure' + 'driver_tenure', ] calc = DomainClassifierCalculator( - feature_column_names=feature_column_names, - timestamp_column_name='timestamp', - chunk_number=1 + feature_column_names=feature_column_names, timestamp_column_name='timestamp', chunk_number=1 ) calc.fit(reference2) results = calc.calculate(monitored2) # noqa: F841