From 3544688def0957b7ef234f82a9451c51b88e412b Mon Sep 17 00:00:00 2001 From: dev-rinchin Date: Tue, 25 Jul 2023 11:29:35 +0000 Subject: [PATCH 1/2] init addon-matching --- examples/demo14.py | 20 + examples/tutorials/Tutorial_12_Matching.ipynb | 619 +++++++++++++ lightautoml/addons/hypex/__init__.py | 3 + .../addons/hypex/algorithms/faiss_matcher.py | 828 ++++++++++++++++++ lightautoml/addons/hypex/matcher.py | 432 +++++++++ lightautoml/addons/hypex/readme.md | 8 + .../hypex/selectors/lama_feature_selector.py | 120 +++ .../addons/hypex/selectors/outliers_filter.py | 82 ++ .../addons/hypex/selectors/spearman_filter.py | 70 ++ lightautoml/addons/hypex/utils/metrics.py | 159 ++++ lightautoml/addons/hypex/utils/psi_pandas.py | 494 +++++++++++ lightautoml/addons/hypex/utils/validators.py | 91 ++ 12 files changed, 2926 insertions(+) create mode 100644 examples/demo14.py create mode 100644 examples/tutorials/Tutorial_12_Matching.ipynb create mode 100644 lightautoml/addons/hypex/__init__.py create mode 100644 lightautoml/addons/hypex/algorithms/faiss_matcher.py create mode 100644 lightautoml/addons/hypex/matcher.py create mode 100644 lightautoml/addons/hypex/readme.md create mode 100644 lightautoml/addons/hypex/selectors/lama_feature_selector.py create mode 100644 lightautoml/addons/hypex/selectors/outliers_filter.py create mode 100644 lightautoml/addons/hypex/selectors/spearman_filter.py create mode 100644 lightautoml/addons/hypex/utils/metrics.py create mode 100644 lightautoml/addons/hypex/utils/psi_pandas.py create mode 100644 lightautoml/addons/hypex/utils/validators.py diff --git a/examples/demo14.py b/examples/demo14.py new file mode 100644 index 00000000..e103d369 --- /dev/null +++ b/examples/demo14.py @@ -0,0 +1,20 @@ +import pandas as pd + +from lightautoml.addons.hypex import Matcher + + +df = pd.read_csv("data/sampled_matching.csv").drop(["Unnamed: 0"], axis=1) + +print(df.shape) +print(df.columns) + +target = "created_variable" +treatment = "is_tb_pilot" + + +matcher = Matcher(df, target, treatment, is_feature_select=False, quality_check=True) + +matcher.estimate() + +print(matcher.matcher.ATE) +print(matcher.matcher.quality_dict) diff --git a/examples/tutorials/Tutorial_12_Matching.ipynb b/examples/tutorials/Tutorial_12_Matching.ipynb new file mode 100644 index 00000000..a801279a --- /dev/null +++ b/examples/tutorials/Tutorial_12_Matching.ipynb @@ -0,0 +1,619 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# How to match? " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 0. Import libraries " + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "'nlp' extra dependecy package 'gensim' isn't installed. Look at README.md in repo 'LightAutoML' for installation instructions.\n", + "'nlp' extra dependecy package 'nltk' isn't installed. Look at README.md in repo 'LightAutoML' for installation instructions.\n", + "'nlp' extra dependecy package 'transformers' isn't installed. Look at README.md in repo 'LightAutoML' for installation instructions.\n", + "'nlp' extra dependecy package 'gensim' isn't installed. Look at README.md in repo 'LightAutoML' for installation instructions.\n", + "'nlp' extra dependecy package 'nltk' isn't installed. Look at README.md in repo 'LightAutoML' for installation instructions.\n", + "'nlp' extra dependecy package 'transformers' isn't installed. 
Look at README.md in repo 'LightAutoML' for installation instructions.\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/20224761/PycharmProjects/matcher/lightautoml/ml_algo/dl_model.py:41: UserWarning: 'transformers' - package isn't installed\n", + " warnings.warn(\"'transformers' - package isn't installed\")\n", + "/Users/20224761/PycharmProjects/matcher/lightautoml/text/nn_model.py:22: UserWarning: 'transformers' - package isn't installed\n", + " warnings.warn(\"'transformers' - package isn't installed\")\n", + "/Users/20224761/PycharmProjects/matcher/lightautoml/text/dl_transformers.py:25: UserWarning: 'transformers' - package isn't installed\n", + " warnings.warn(\"'transformers' - package isn't installed\")\n" + ] + } + ], + "source": [ + "import pandas as pd\n", + "import warnings\n", + "import numpy as np\n", + "from lightautoml.addons.hypex import Matcher\n", + "\n", + "warnings.filterwarnings('ignore')\n", + "%config Completer.use_jedi = False" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1. Create or upload your dataset \n", + "In this case we will create random dataset with known effect size \n", + "If you have your own dataset, go to the part 2 \n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": " user_id signup_month month spend treat\n0 0 0 1 496 False\n1 0 0 2 488 False\n2 0 0 3 487 False\n3 0 0 4 457 False\n4 0 0 5 431 False", + "text/html": "
" + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Simulating dataset with known effect size\n", + "num_users = 10000\n", + "num_months = 12\n", + "\n", + "signup_months = np.random.choice(np.arange(1, num_months), num_users) * np.random.randint(0,2, size=num_users) # signup_months == 0 means customer did not sign up\n", + "df = pd.DataFrame({\n", + " 'user_id': np.repeat(np.arange(num_users), num_months),\n", + " 'signup_month': np.repeat(signup_months, num_months), # signup month == 0 means customer did not sign up\n", + " 'month': np.tile(np.arange(1, num_months+1), num_users), # months are from 1 to 12\n", + " 'spend': np.random.poisson(500, num_users*num_months) #np.random.beta(a=2, b=5, size=num_users * num_months)*1000 # centered at 500\n", + "})\n", + "# A customer is in the treatment group if and only if they signed up\n", + "df[\"treat\"] = df[\"signup_month\"]>0\n", + "# Simulating an effect of month (monotonically decreasing--customers buy less later in the year)\n", + "df[\"spend\"] = df[\"spend\"] - df[\"month\"]*10\n", + "# Simulating a simple treatment effect of 100\n", + "after_signup = (df[\"signup_month\"] < df[\"month\"]) & (df[\"treat\"])\n", + "df.loc[after_signup,\"spend\"] = df[after_signup][\"spend\"] + 100\n", + "df.head()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": " user_id signup_month treat pre_spends post_spends\n0 0 0 False 492.0 416.444444\n1 2 0 False 500.5 412.111111\n2 4 0 False 480.0 419.888889\n3 6 0 False 488.5 433.111111\n4 7 0 False 467.0 425.777778\n... ... ... ... ... ...\n5409 9995 0 False 457.0 407.222222\n5410 9996 0 False 477.5 443.333333\n5411 9997 0 False 483.5 416.000000\n5412 9998 0 False 505.5 424.000000\n5413 9999 0 False 504.5 402.777778\n\n[5414 rows x 5 columns]", + "text/html": "
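The simulation injects a known treatment effect of 100 on post-signup spend, so the matched estimates later in the tutorial should land near 100. As a quick sanity check (a sketch, not part of the original notebook), the naive difference in group means can be compared against the matched result:

# Naive (unadjusted) mean difference in post-period spend between groups.
# In this randomized simulation it is already close to the true effect of 100;
# on real observational data, matching is what removes the confounding gap.
naive_diff = (
    df_i_signupmonth.loc[df_i_signupmonth["treat"] == 1, "post_spends"].mean()
    - df_i_signupmonth.loc[df_i_signupmonth["treat"] == 0, "post_spends"].mean()
)
print(naive_diff)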
" + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Setting the signup month (for ease of analysis)\n", + "i = 3\n", + "df_i_signupmonth = (\n", + " df[df.signup_month.isin([0, i])]\n", + " .groupby([\"user_id\", \"signup_month\", \"treat\"])\n", + " .apply(\n", + " lambda x: pd.Series(\n", + " {\n", + " \"pre_spends\": x.loc[x.month < i, \"spend\"].mean(),\n", + " \"post_spends\": x.loc[x.month > i, \"spend\"].mean(),\n", + " }\n", + " )\n", + " )\n", + " .reset_index()\n", + ")\n", + "df_i_signupmonth" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": " user_id signup_month treat pre_spends post_spends age is_male \\\n0 0 0 0 492.0 416.444444 21 1 \n1 2 0 0 500.5 412.111111 65 0 \n2 4 0 0 480.0 419.888889 63 1 \n3 6 0 0 488.5 433.111111 51 1 \n4 7 0 0 467.0 425.777778 64 1 \n\n industry \n0 2 \n1 1 \n2 1 \n3 2 \n4 2 ", + "text/html": "
" + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Additional category features\n", + "gender = np.random.choice(a=[0,1], size=df_i_signupmonth.user_id.nunique())\n", + "age = np.random.choice(a=range(18, 70), size=df_i_signupmonth.user_id.nunique())\n", + "industry = np.random.choice(a=range(1, 3), size=df_i_signupmonth.user_id.nunique())\n", + "df_i_signupmonth['age'] = age\n", + "df_i_signupmonth['is_male'] = gender\n", + "df_i_signupmonth['industry'] = industry\n", + "df_i_signupmonth['industry'] = df_i_signupmonth['industry'].astype('str')\n", + "df_i_signupmonth['treat'] = df_i_signupmonth['treat'].astype(int)\n", + "df_i_signupmonth.head()" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": "Index(['user_id', 'signup_month', 'treat', 'pre_spends', 'post_spends', 'age',\n 'is_male', 'industry'],\n dtype='object')" + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df_i_signupmonth.columns" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2. Matching \n", + "### 2.0 Init params\n", + "info_col used to define informative attributes that should not be part of matching, such as user_id \n", + "But to explicitly store this column in the table, so that you can compare directly after computation" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "info_col = ['user_id', 'signup_month']\n", + "\n", + "outcome = 'post_spends'\n", + "treatment = 'treat'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 2.1 Simple matching\n", + "This is the easiest way to initialize and calculate metrics on a Matching task \n", + "Use it when you are clear about each attribute or if you don't have any additional task conditions (Strict equality for certain features) " + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": " 0%| | 0/5414 [00:00\n\n\n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n
FeatureImportance
0pre_spends38600.570068
3industry38600.570068
1age29652.719604
2is_male2970.189941
\n" + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "features_importance = model.lama_feature_select()\n", + "features_importance" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [], + "source": [ + "features = features_importance['Feature'].to_list()" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": " 0%| | 0/4 [00:00\n\n\n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n
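In the matched frame, each observation is paired with the averaged features and outcome of its nearest neighbours from the opposite group: counterpart columns carry the `_matched` suffix and the bias-corrected outcome the `_matched_bias` suffix. Assuming the `calc_att` logic from `faiss_matcher.py` below, the reported ATT is simply the mean of that bias-corrected column over treated rows (a sketch for orientation, not notebook code):

# ATT recomputed by hand, mirroring FaissMatcher.calc_att, which averages
# the outcome + POSTFIX_BIAS column over rows with treatment == 1.
att_manual = df_matched.loc[df_matched["treat"] == 1, "post_spends_matched_bias"].mean()
print(att_manual)  # should agree with the ATT row in the results table below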
\n" + }, + "execution_count": 14, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df_matched" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3. Results \n", + "### 3.1 ATE, ATT, ATC" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": " effect_size std_err p-val ci_lower ci_upper\nATE 100.972968 0.580032 0.0 99.836106 102.109830\nATC 100.963662 0.589903 0.0 99.807452 102.119872\nATT 101.081102 0.693178 0.0 99.722472 102.439732", + "text/html": "
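The confidence bounds are plain normal intervals around the effect size, lower/upper = effect ± sigma * std_err, and the reported numbers imply sigma = 1.96 (a 95% interval). Checking the ATE row by hand:

# 100.972968 - 1.96 * 0.580032 ≈ 99.836106  (ci_lower)
# 100.972968 + 1.96 * 0.580032 ≈ 102.109830 (ci_upper)
ate, se, sigma = 100.972968, 0.580032, 1.96
print(ate - sigma * se, ate + sigma * se)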
" + }, + "execution_count": 15, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# model.matcher.results\n", + "results" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 3.2 SMD, PSI, KS-test, repeats" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": "{'psi': column_treated anomaly_score_treated check_result_treated \\\n 0 age_treated 0.00 OK \n 1 industry_treated 0.00 OK \n 2 post_spends_treated 16.11 NOK \n 3 pre_spends_treated 0.01 OK \n \n column_untreated anomaly_score_untreated check_result_untreated \n 0 age_untreated 0.01 OK \n 1 industry_untreated 0.00 OK \n 2 post_spends_untreated 8.27 NOK \n 3 pre_spends_untreated 0.01 OK ,\n 'ks_test': match_control_to_treat match_treat_to_control\n age 1.000000 0.419063\n is_male 0.320575 0.070995\n pre_spends 1.000000 0.510057,\n 'smd': match_control_to_treat match_treat_to_control\n age 0.004089 0.001075\n is_male 0.010479 0.040518\n pre_spends 0.002626 0.001597,\n 'repeats': {'match_control_to_treat': 0.93, 'match_treat_to_control': 0.08}}" + }, + "execution_count": 16, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# matching quality result - SMD\n", + "model.quality_result" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": " column_treated anomaly_score_treated check_result_treated \\\n0 age_treated 0.00 OK \n1 industry_treated 0.00 OK \n2 post_spends_treated 16.11 NOK \n3 pre_spends_treated 0.01 OK \n\n column_untreated anomaly_score_untreated check_result_untreated \n0 age_untreated 0.01 OK \n1 industry_untreated 0.00 OK \n2 post_spends_untreated 8.27 NOK \n3 pre_spends_untreated 0.01 OK ", + "text/html": "
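The anomaly scores follow the population stability index, which compares a feature's distribution across the two samples: scores near zero ("OK") mean the distributions agree, while `post_spends` is flagged ("NOK"), which is expected since the treatment effect itself shifts the outcome. A sketch of the standard PSI definition (the exact binning in `utils/psi_pandas.py` may differ):

import numpy as np

def psi(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
    """Standard PSI over shared histogram bins: sum((a - e) * ln(a / e))."""
    edges = np.histogram_bin_edges(expected, bins=bins)
    e = np.histogram(expected, bins=edges)[0] / len(expected)
    a = np.histogram(actual, bins=edges)[0] / len(actual)
    e, a = np.clip(e, 1e-6, None), np.clip(a, 1e-6, None)  # guard against log(0)
    return float(np.sum((a - e) * np.log(a / e)))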
" + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# matching quality result - PSI\n", + "model.quality_result['psi']" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": " match_control_to_treat match_treat_to_control\nage 1.000000 0.419063\nis_male 0.320575 0.070995\npre_spends 1.000000 0.510057", + "text/html": "
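These are two-sample Kolmogorov-Smirnov p-values per covariate: values close to 1 mean the matched groups are statistically indistinguishable on that feature. A sketch of the underlying check (assuming, as the column names suggest, each group is compared with its matched counterparts):

from scipy.stats import ks_2samp

treated_age = df_matched.loc[df_matched["treat"] == 1, "age"]
matched_age = df_matched.loc[df_matched["treat"] == 1, "age_matched"]
print(ks_2samp(treated_age, matched_age).pvalue)  # ~1.0 -> good balance on age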
" + }, + "execution_count": 18, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# matching quality result - KS-test\n", + "\n", + "model.quality_result['ks_test']" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": "{'match_control_to_treat': 0.93, 'match_treat_to_control': 0.08}" + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# matching quality result - repeats\n", + "model.quality_result['repeats']" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 3.3 Validation\n", + "validate result with placebo treatment or random feature or random subset\n", + "our new effect size is close to zero (it should be)" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": " 0%| | 0/10 [00:00 dict: + """Prepare the object for serialization. + + This method is called when the object is about to be serialized. + It removes the `tqdm` attribute from the object's dictionary + because `tqdm` objects cannot be serialized. + + Returns: + dict: A copy of the object's dictionary with the `tqdm` attribute removed. + """ + state = self.__dict__.copy() + if "tqdm" in state: + del state["tqdm"] + return state + + def __setstate__(self, state: dict): + """Restore the object after deserialization. + + This method is called when the object is deserialized. + It adds the `tqdm` attribute back to the object's dictionary + if the `pbar` attribute is True. + + Args: + state: dict + The deserialized state of the object + """ + if "pbar" in state and state["pbar"]: + state["tqdm"] = None + self.__dict__.update(state) + + def _get_split(self, df: pd.DataFrame) -> (pd.DataFrame, pd.DataFrame): + """Creates split data by treatment column. + + Separate treatment column with 1 (treated) an 0 (untreated), + scales and transforms treatment column + + Args: + df: pd.DataFrame + The input dataframe + + Returns: + tuple: Tuple of dataframes - one for treated (df[self.treatment] == 1]) and + one for untreated (df[self.treatment] == 0]). Drops self.outcomes and + `self.treatment` columns + + """ + logger.debug("Creating split data by treatment column") + + treated = df[df[self.treatment] == 1].drop([self.outcomes, self.treatment], axis=1) + untreated = df[df[self.treatment] == 0].drop([self.outcomes, self.treatment], axis=1) + + return treated, untreated + + def _predict_outcome(self, std_treated: pd.DataFrame, std_untreated: pd.DataFrame): + """Applies LinearRegression to input arrays. + + Calculate biases of treated and untreated values, + creates dict of y - regular, matched and without bias. 
+ + Args: + std_treated: pd.DataFrame + The dataframe of treated data + std_untreated: pd.DataFrame + The dataframe of untreated data + + """ + logger.debug("Predicting target by Linear Regression") + + start_time = dt.datetime.now() + logger.debug("start --") + + self.dict_outcome_untreated = {} + self.dict_outcome_treated = {} + df = self.df.drop(columns=self.info_col) + + for outcome in [self.outcomes]: + y_untreated = df[df[self.treatment] == 0][outcome].to_numpy() + y_treated = df[df[self.treatment] == 1][outcome].to_numpy() + + x_treated = std_treated.to_numpy() + x_untreated = std_untreated.to_numpy() + y_match_treated = np.array([y_untreated[idx].mean() for idx in self.treated_index]) + y_match_untreated = np.array([y_treated[idx].mean() for idx in self.untreated_index]) + x_match_treated = np.array([x_untreated[idx].mean(0) for idx in self.treated_index]) + x_match_untreated = np.array([x_treated[idx].mean(0) for idx in self.untreated_index]) + bias_coefs_c = bias_coefs(self.untreated_index, y_treated, x_treated) + bias_coefs_t = bias_coefs(self.treated_index, y_untreated, x_untreated) + bias_c = bias(x_untreated, x_match_untreated, bias_coefs_c) + bias_t = bias(x_treated, x_match_treated, bias_coefs_t) + + y_match_treated_bias = y_treated - y_match_treated + bias_t + y_match_untreated_bias = y_match_untreated - y_untreated - bias_c + + self.dict_outcome_untreated[outcome] = y_untreated + self.dict_outcome_untreated[outcome + POSTFIX] = y_match_untreated + self.dict_outcome_untreated[outcome + POSTFIX_BIAS] = y_match_untreated_bias + + self.dict_outcome_treated[outcome] = y_treated + self.dict_outcome_treated[outcome + POSTFIX] = y_match_treated + self.dict_outcome_treated[outcome + POSTFIX_BIAS] = y_match_treated_bias + + end_time = dt.datetime.now() + total = dt.datetime.strptime(str(end_time - start_time), "%H:%M:%S.%f").strftime("%H:%M:%S") + logger.debug(f"end -- [work time{total}]") + + def _create_outcome_matched_df(self, dict_outcome: dict, is_treated: bool) -> pd.DataFrame: + """Creates dataframe with outcomes values and treatment. + + Args: + dict_outcome: dict + A dictionary containing outcomes + is_treated: bool + A boolean value indicating whether the outcome is treated or not + + Returns: + pd.DataFrame: A dataframe with matched outcome and treatment columns + + """ + df_pred = pd.DataFrame(dict_outcome) + df_pred[self.treatment] = int(is_treated) + df_pred[self.treatment + POSTFIX] = int(not is_treated) + + return df_pred + + def _create_features_matched_df(self, index: np.ndarray, is_treated: bool) -> pd.DataFrame: + """Creates matched dataframe with features. 
+ + Args: + index: np.ndarray + An array of indices + is_treated: bool + A boolean value indicating whether the outcome is treated or not + + + Returns: + pd.DataFrame: A dataframe of matched features + + """ + df = self.df.drop(columns=[self.outcomes] + self.info_col) + + if self.group_col is None: + filtered = df.loc[df[self.treatment] == int(not is_treated)].values + untreated_df = pd.DataFrame( + data=np.array([filtered[idx].mean(axis=0) for idx in index]), columns=df.columns + ) + untreated_df["index"] = pd.Series(list(index)) + treated_df = df[df[self.treatment] == int(is_treated)].reset_index() + else: + filtered = df.loc[df[self.treatment] == int(not is_treated)] + cols_untreated = [col for col in filtered.columns if col != self.group_col] + filtered = filtered.drop(columns=self.group_col).to_numpy() + untreated_df = pd.DataFrame( + data=np.array([filtered[idx].mean(axis=0) for idx in index]), columns=cols_untreated + ) + untreated_df["index"] = pd.Series(list(index)) + treated_df = df[df[self.treatment] == int(is_treated)].reset_index() + grp = treated_df[self.group_col] + untreated_df[self.group_col] = grp + untreated_df.columns = [col + POSTFIX for col in untreated_df.columns] + + x = pd.concat([treated_df, untreated_df], axis=1).drop( + columns=[self.treatment, self.treatment + POSTFIX], axis=1 + ) + return x + + def _create_matched_df(self) -> pd.DataFrame: + """Creates matched df of features and outcome. + + Return: + pd.DataFrame: Matched dataframe + """ + df_pred_treated = self._create_outcome_matched_df(self.dict_outcome_treated, True) + df_pred_untreated = self._create_outcome_matched_df(self.dict_outcome_untreated, False) + + df_matched = pd.concat([df_pred_treated, df_pred_untreated]) + + treated_x = self._create_features_matched_df(self.treated_index, True) + untreated_x = self._create_features_matched_df(self.untreated_index, False) + + untreated_x = pd.concat([treated_x, untreated_x]) + + columns = list(untreated_x.columns) + list(df_matched.columns) + + df_matched = pd.concat([untreated_x, df_matched], axis=1, ignore_index=True) + df_matched.columns = columns + + return df_matched + + def calc_atc(self, df: pd.DataFrame, outcome: str) -> tuple: + """Calculates Average Treatment Effect for the control group (ATC). + + Effect on control group if it was affected + + Args: + df: pd.DataFrame + Input dataframe + outcome: str + The outcome to be considered for treatment effect + + Returns: + tuple: Contains ATC, scaled counts, and variances as numpy arrays + + """ + logger.debug("Calculating ATC") + + df = df[df[self.treatment] == 0] + N_c = len(df) + ITT_c = df[outcome + POSTFIX_BIAS] + scaled_counts_c = scaled_counts(N_c, self.treated_index, self.silent) + + vars_c = np.repeat(ITT_c.var(), N_c) # conservative + atc = ITT_c.mean() + + return atc, scaled_counts_c, vars_c + + def calc_att(self, df: pd.DataFrame, outcome: str) -> tuple: + """Calculates Average Treatment Effect for the treated (ATT). 
+ + Args: + df: pd.DataFrame + Input dataframe + outcome: str + The outcome to be considered for treatment effect + + Returns: + tuple: Contains ATT, scaled counts, and variances as numpy arrays + + """ + logger.debug("Calculating ATT") + + df = df[df[self.treatment] == 1] + N_t = len(df) + ITT_t = df[outcome + POSTFIX_BIAS] + scaled_counts_t = scaled_counts(N_t, self.untreated_index, self.silent) + + vars_t = np.repeat(ITT_t.var(), N_t) # conservative + att = ITT_t.mean() + + return att, scaled_counts_t, vars_t + + def _calculate_ate_all_target(self, df: pd.DataFrame): + """Creates dictionaries of all effect: ATE, ATC, ATT. + + Args: + df: pd.DataFrame + Input dataframe + + Returns: + None + + """ + logger.debug("Creating dicts of all effects: ATE, ATC, ATT") + + att_dict = {} + atc_dict = {} + ate_dict = {} + N = len(df) + N_t = df[self.treatment].sum() + N_c = N - N_t + + for outcome in [self.outcomes]: + att, scaled_counts_t, vars_t = self.calc_att(df, outcome) + atc, scaled_counts_c, vars_c = self.calc_atc(df, outcome) + ate = (N_c / N) * atc + (N_t / N) * att + + att_se = calc_att_se(vars_c, vars_t, scaled_counts_c) + atc_se = calc_atc_se(vars_c, vars_t, scaled_counts_t) + ate_se = calc_ate_se(vars_c, vars_t, scaled_counts_c, scaled_counts_t) + + ate_dict[outcome] = [ + ate, + ate_se, + pval_calc(ate / ate_se), + ate - self.sigma * ate_se, + ate + self.sigma * ate_se, + ] + atc_dict[outcome] = [ + atc, + atc_se, + pval_calc(atc / atc_se), + atc - self.sigma * atc_se, + atc + self.sigma * atc_se, + ] + att_dict[outcome] = [ + att, + att_se, + pval_calc(att / att_se), + att - self.sigma * att_se, + att + self.sigma * att_se, + ] + + self.ATE, self.ATC, self.ATT = ate_dict, atc_dict, att_dict + self.val_dict = ate_dict + + def matching_quality(self, df_matched) -> Dict[str, Union[Dict[str, float], float]]: + """Estimated the quality of covariates balance and repeat fraction. + + Calculates population stability index,Standardized mean difference + and Kolmogorov-Smirnov test for numeric values. Returns a dictionary of reports. 
+ + Args: + df_matched: pd.DataFrame + Matched DataFrame to calculate quality + + Returns: + dict: dictionary containing PSI, KS-test, SMD data and repeat fractions + + """ + if self.silent: + logger.debug("Estimating quality of matching") + else: + logger.info("Estimating quality of matching") + + psi_columns = self.columns_match + psi_columns.remove(self.treatment) + psi_data, ks_data, smd_data = matching_quality( + df_matched, self.treatment, sorted(self.features_quality), sorted(psi_columns), self.silent + ) + + rep_dict = { + "match_control_to_treat": check_repeats(np.concatenate(self.treated_index), silent=self.silent), + "match_treat_to_control": check_repeats(np.concatenate(self.untreated_index), silent=self.silent), + } + + self.quality_dict = {"psi": psi_data, "ks_test": ks_data, "smd": smd_data, "repeats": rep_dict} + + rep_df = pd.DataFrame.from_dict(rep_dict, orient="index").rename(columns={0: "value"}) + self.rep_dict = rep_df + + if self.silent: + logger.debug(f"PSI info: \n {psi_data.head(10)} \nshape:{psi_data.shape}") + logger.debug(f"Kolmogorov-Smirnov test info: \n {ks_data.head(10)} \nshape:{ks_data.shape}") + logger.debug(f"Standardised mean difference info: \n {smd_data.head(10)} \nshape:{smd_data.shape}") + logger.debug(f"Repeats info: \n {rep_df.head(10)}") + else: + logger.info(f"PSI info: \n {psi_data.head(10)} \nshape:{psi_data.shape}") + logger.info(f"Kolmogorov-Smirnov test info: \n {ks_data.head(10)} \nshape:{ks_data.shape}") + logger.info(f"Standardised mean difference info: \n {smd_data.head(10)} \nshape:{smd_data.shape}") + logger.info(f"Repeats info: \n {rep_df.head(10)}") + + return self.quality_dict + + def group_match(self): + """Matches the dataframe if it divided by groups. + + Returns: + tuple: A tuple containing the matched dataframe and metrics such as ATE, ATT and ATC + + """ + df = self.df.drop(columns=self.info_col) + groups = sorted(df[self.group_col].unique()) + matches_c = [] + matches_t = [] + group_arr_c = df[df[self.treatment] == 0][self.group_col].to_numpy() + group_arr_t = df[df[self.treatment] == 1][self.group_col].to_numpy() + treat_arr_c = df[df[self.treatment] == 0][self.treatment].to_numpy() + treat_arr_t = df[df[self.treatment] == 1][self.treatment].to_numpy() + + if self.pbar: + self.tqdm = tqdm(total=len(groups) * 2) + + for group in groups: + df_group = df[df[self.group_col] == group] + temp = df_group[self.columns_match + [self.group_col]] + temp = temp.loc[:, (temp != 0).any(axis=0)].drop(columns=self.group_col) + treated, untreated = self._get_split(temp) + + std_treated_np, std_untreated_np = _transform_to_np(treated, untreated) + + if self.pbar: + self.tqdm.set_description(desc=f"Get untreated index by group {group}") + matches_u_i = _get_index(std_treated_np, std_untreated_np, self.n_neighbors) + + if self.pbar: + self.tqdm.update(1) + self.tqdm.set_description(desc=f"Get treated index by group {group}") + matches_t_i = _get_index(std_untreated_np, std_treated_np, self.n_neighbors) + if self.pbar: + self.tqdm.update(1) + self.tqdm.refresh() + + group_mask_c = group_arr_c == group + group_mask_t = group_arr_t == group + matches_c_mask = np.arange(treat_arr_t.shape[0])[group_mask_t] + matches_u_i = [matches_c_mask[i] for i in matches_u_i] + matches_t_mask = np.arange(treat_arr_c.shape[0])[group_mask_c] + matches_t_i = [matches_t_mask[i] for i in matches_t_i] + matches_c.extend(matches_u_i) + matches_t.extend(matches_t_i) + + if self.pbar: + self.tqdm.close() + + self.untreated_index = np.array(matches_c) + 
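+        # Note: the per-group neighbor lists collected above were remapped to
+        # global row positions, so these two arrays index the full untreated
+        # and treated frames rather than the group-local ones.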
self.treated_index = np.array(matches_t) + df_group = df[self.columns_match].drop(columns=self.group_col) + treated, untreated = self._get_split(df_group) + self._predict_outcome(treated, untreated) + df_matched = self._create_matched_df() + self._calculate_ate_all_target(df_matched) + + if self.validation: + return self.val_dict + + return self.report_view(), df_matched + + def match(self): + """Matches the dataframe. + + Returns: + tuple: A tuple containing the matched dataframe and metrics such as ATE, ATT and ATC + + """ + if self.group_col is not None: + return self.group_match() + + df = self.df[self.columns_match] + treated, untreated = self._get_split(df) + + std_treated_np, std_untreated_np = _transform_to_np(treated, untreated) + + if self.pbar: + self.tqdm = tqdm(total=len(std_treated_np) + len(std_untreated_np)) + self.tqdm.set_description(desc="Get untreated index") + + untreated_index = _get_index(std_treated_np, std_untreated_np, self.n_neighbors) + + if self.pbar: + self.tqdm.update(len(std_treated_np)) + self.tqdm.set_description(desc="Get treated index") + treated_index = _get_index(std_untreated_np, std_treated_np, self.n_neighbors) + + if self.pbar: + self.tqdm.update(len(std_untreated_np)) + self.tqdm.refresh() + self.tqdm.close() + + self.untreated_index = untreated_index + self.treated_index = treated_index + + self._predict_outcome(treated, untreated) + + df_matched = self._create_matched_df() + self._calculate_ate_all_target(df_matched) + + if self.validation: + return self.val_dict + + return self.report_view(), df_matched + + def report_view(self) -> pd.DataFrame: + """Formats the ATE, ATC, and ATT results into a Pandas DataFrame for easy viewing. + + Returns: + pd.DataFrame: DataFrame containing ATE, ATC, and ATT results + """ + result = (self.ATE, self.ATC, self.ATT) + self.results = pd.DataFrame( + [list(x.values())[0] for x in result], + columns=["effect_size", "std_err", "p-val", "ci_lower", "ci_upper"], + index=["ATE", "ATC", "ATT"], + ) + return self.results + + +def _get_index(base: np.ndarray, new: np.ndarray, n_neighbors: int) -> np.ndarray: + """Gets array of indexes that match a new array. + + Args: + base: np.ndarray + A numpy array serving as the reference for matching + new: np.ndarray + A numpy array that needs to be matched with the base + n_neighbors: int + The number of neighbors to use for the matching + + Returns: + np.ndarray: An array of indexes containing all neighbours with minimum distance + """ + index = faiss.IndexFlatL2(base.shape[1]) + index.add(base) + dist, indexes = index.search(new, n_neighbors) + map_func = lambda x: np.where(x == x[0])[0] + equal_dist = list(map(map_func, dist)) + f2 = lambda x, y: x[y] + indexes = np.array([f2(i, j) for i, j in zip(indexes, equal_dist)]) + return indexes + + +def _transform_to_np(treated: pd.DataFrame, untreated: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray]: + """Transforms df to numpy and transform via Cholesky decomposition. 
+ + Args: + treated: pd.DataFrame + Test subset DataFrame to be transformed + untreated: pd.DataFrame + Control subset DataFrame to be transformed + + Returns: + tuple: A tuple of transformed numpy arrays for treated and untreated data respectively + """ + xc = untreated.to_numpy() + xt = treated.to_numpy() + + cov_c = np.cov(xc, rowvar=False, ddof=0) + cov_t = np.cov(xt, rowvar=False, ddof=0) + cov = (cov_c + cov_t) / 2 + + epsilon = 1e-6 + cov = cov + epsilon * np.eye(cov.shape[0]) + + L = np.linalg.cholesky(cov) + mahalanobis_transform = np.linalg.inv(L) + yc = np.dot(xc, mahalanobis_transform.T) + yt = np.dot(xt, mahalanobis_transform.T) + + return yt.copy(order="C").astype("float32"), yc.copy(order="C").astype("float32") + + +def calc_atx_var(vars_c: np.ndarray, vars_t: np.ndarray, weights_c: np.ndarray, weights_t: np.ndarray) -> float: + """Calculates Average Treatment Effect for the treated (ATT) variance. + + Args: + vars_c: np.ndarray + Control group variance + vars_t: np.ndarray + Treatment group variance + weights_c: np.ndarray + Control group weights + weights_t: np.ndarray + Treatment group weights + + Returns: + float: The calculated ATT variance + + """ + N_c, N_t = len(vars_c), len(vars_t) + summands_c = weights_c ** 2 * vars_c + summands_t = weights_t ** 2 * vars_t + + return summands_t.sum() / N_t ** 2 + summands_c.sum() / N_c ** 2 + + +def calc_atc_se(vars_c: np.ndarray, vars_t: np.ndarray, scaled_counts_t: np.ndarray) -> float: + """Calculates Average Treatment Effect for the control group (ATC) standard error. + + Args: + vars_c: np.ndarray + Control group variance + vars_t: np.ndarray + Treatment group variance + scaled_counts_t: np.ndarray + Scaled counts for treatment group + + Returns: + float: The calculated ATC standard error + """ + N_c, N_t = len(vars_c), len(vars_t) + weights_c = np.ones(N_c) + weights_t = (N_t / N_c) * scaled_counts_t + + var = calc_atx_var(vars_c, vars_t, weights_c, weights_t) + + return np.sqrt(var) + + +def calc_att_se(vars_c: np.ndarray, vars_t: np.ndarray, scaled_counts_c: np.ndarray) -> float: + """Calculates Average Treatment Effect for the treated (ATT) standard error. + + Args: + vars_c: np.ndarray + Control group variance + vars_t: np.ndarray + Treatment group variance + scaled_counts_c: np.ndarray + Scaled counts for control group + + Returns: + float: The calculated ATT standard error + """ + N_c, N_t = len(vars_c), len(vars_t) + weights_c = (N_c / N_t) * scaled_counts_c + weights_t = np.ones(N_t) + + var = calc_atx_var(vars_c, vars_t, weights_c, weights_t) + + return np.sqrt(var) + + +def calc_ate_se(vars_c: np.ndarray, vars_t: np.ndarray, + scaled_counts_c: np.ndarray, scaled_counts_t: np.ndarray) -> float: + """Calculates Average Treatment Effect for the control group (ATC) standard error. + + Args: + vars_c: np.ndarray + Control group variance + vars_t: np.ndarray + Treatment group variance + scaled_counts_c: np.ndarray + Scaled counts for control group + scaled_counts_t: np.ndarray + Scaled counts for treatment group + + Returns: + float: The calculated ATE standard error + """ + N_c, N_t = len(vars_c), len(vars_t) + N = N_c + N_t + weights_c = (N_c / N) * (1 + scaled_counts_c) + weights_t = (N_t / N) * (1 + scaled_counts_t) + + var = calc_atx_var(vars_c, vars_t, weights_c, weights_t) + + return np.sqrt(var) + + +def pval_calc(z): + """Calculates p-value of the normal cumulative distribution function based on z. 
+ + Args: + z: float + The z-score for which the p-value is calculated + + Returns: + float: The calculated p-value rounded to 2 decimal places + + """ + return round(2 * (1 - norm.cdf(abs(z))), 2) + + +def scaled_counts(N: int, matches: np.ndarray, silent: bool = True) -> np.ndarray: + """Counts the number of times each subject has appeared as a match. + + In the case of multiple matches, each subject only gets partial credit. + + Args: + N: int + The length of original treated or control group + matches: np.ndarray + A numpy array of matched indexes from control or treated group + silent: bool, optional + If true logger in info mode + Returns: + numpy.ndarray: An array representing the number of times each subject has appeared as a match + """ + s_counts = np.zeros(N) + + for matches_i in matches: + scale = 1 / len(matches_i) + for match in matches_i: + s_counts[match] += scale + + if silent: + logger.debug(f"Calculated the number of times each subject has appeared as a match: {len(s_counts)}") + else: + logger.info(f"Calculated the number of times each subject has appeared as a match: {len(s_counts)}") + + return s_counts + + +def bias_coefs(matches, Y_m, X_m): + """Computes Ordinary Least Squares (OLS) coefficient in bias correction regression. + + Constructs data for regression by including (possibly multiple times) every + observation that has appeared in the matched sample. + + Args: + matches: np.ndarray + A numpy array of matched indexes + Y_m: np.ndarray + The dependent variable values + X_m: np.ndarray: + The independent variable values + + Returns: + np.ndarray: The calculated OLS coefficients excluding the intercept + """ + flat_idx = np.concatenate(matches) + N, K = len(flat_idx), X_m.shape[1] + + Y = Y_m[flat_idx] + X = np.empty((N, K + 1)) + X[:, 0] = 1 # intercept term + X[:, 1:] = X_m[flat_idx] + + return np.linalg.lstsq(X, Y)[0][1:] # don't need intercept coef + + +def bias(X, X_m, coefs): + """Computes bias correction term. + + It is approximated by the dot product of the + matching discrepancy (i.e., X-X_matched) and the + coefficients from the bias correction regression. 
+ + Args: + X: np.ndarray + The original independent variable values + X_m: np.ndarray + The matched independent variable values + coefs: np.ndarray + The coefficients from the bias correction regression + + Returns: + np.ndarray: The calculated bias correction terms for each observation + """ + bias_list = [(X_j - X_i).dot(coefs) for X_i, X_j in zip(X, X_m)] + + return np.array(bias_list) diff --git a/lightautoml/addons/hypex/matcher.py b/lightautoml/addons/hypex/matcher.py new file mode 100644 index 00000000..bf8ea298 --- /dev/null +++ b/lightautoml/addons/hypex/matcher.py @@ -0,0 +1,432 @@ +"""Base Matcher class.""" +import logging +import pickle + +import numpy as np +import pandas as pd + +from tqdm.auto import tqdm + +from .algorithms.faiss_matcher import FaissMatcher +from .selectors.lama_feature_selector import LamaFeatureSelector +from .selectors.outliers_filter import OutliersFilter +from .selectors.spearman_filter import SpearmanFilter +from .utils.validators import random_feature +from .utils.validators import random_treatment +from .utils.validators import subset_refuter +from .utils.validators import test_significance + + +REPORT_FEAT_SELECT_DIR = "report_feature_selector" +REPORT_PROP_MATCHER_DIR = "report_matcher" +NAME_REPORT = "lama_interactive_report.html" +N_THREADS = 1 +N_FOLDS = 4 +RANDOM_STATE = 123 +TEST_SIZE = 0.2 +TIMEOUT = 600 +VERBOSE = 2 +USE_ALGOS = ["lgb"] +PROP_SCORES_COLUMN = "prop_scores" +GENERATE_REPORT = True +SAME_TARGET_THRESHOLD = 0.7 +OUT_INTER_COEFF = 1.5 +OUT_MODE_PERCENT = True +OUT_MIN_PERCENT = 0.02 +OUT_MAX_PERCENT = 0.98 + +logger = logging.getLogger("hypex") +console_out = logging.StreamHandler() +logging.basicConfig( + handlers=(console_out,), + format="[%(asctime)s | %(name)s | %(levelname)s]: %(message)s", + datefmt="%d.%m.%Y %H:%M:%S", + level=logging.INFO, +) + + +class Matcher: + """Main class.""" + + def __init__( + self, + input_data: pd.DataFrame, + outcome: str, + treatment: str, + outcome_type: str = "numeric", + group_col: str = None, + info_col: list = None, + generate_report: bool = GENERATE_REPORT, + report_feat_select_dir: str = REPORT_FEAT_SELECT_DIR, + timeout: int = TIMEOUT, + n_threads: int = N_THREADS, + n_folds: int = N_FOLDS, + verbose: bool = VERBOSE, + use_algos: list = None, + same_target_threshold: float = SAME_TARGET_THRESHOLD, + interquartile_coeff: float = OUT_INTER_COEFF, + drop_outliers_by_percentile: bool = OUT_MODE_PERCENT, + min_percentile: float = OUT_MIN_PERCENT, + max_percentile: float = OUT_MAX_PERCENT, + n_neighbors: int = 10, + silent: bool = True, + pbar: bool = True, + ): + """Initialize the Matcher object. + + Args: + input_data: pd.DataFrame + Input dataframe + outcome: str + Target column + treatment: str + Column determine control and test groups + outcome_type: str, optional + Values type of target column. Defaults to "numeric" + group_col: str, optional + Column for grouping. Defaults to None. + info_col: list, optional + Columns with id, date or metadata, not taking part in calculations. Defaults to None + generate_report: bool, optional + Flag to create report. Defaults to True + report_feat_select_dir: str, optional + Folder for report files. Defaults to "report_feature_selector" + timeout: int, optional + Limit work time of code LAMA. Defaults to 600 + n_threads: int, optional + Maximum number of threads. Defau;ts to 1 + n_folds: int, optional + Number of folds for cross-validation. Defaults to 4 + verbose: int, optional + Flag to show process stages. 
Defaults to 2 + use_algos: list, optional + List of names of LAMA algorithms for feature selection. Defaults to ["lgb"] + same_target_threshold: float, optional + Threshold for correlation coefficient filter (Spearman). Default to 0.7 + interquartile_coeff: float, optional + Percent for drop outliers. Default to 1.5 + drop_outliers_by_percentile: bool, optional + Flag to drop outliers by custom percentiles. Defaults to True + min_percentile: float, optional + Minimum percentile to drop outliers. Defaults to 0.02 + max_percentile: float, optional + Maximum percentile to drop outliers. Defaults to 0.98 + n_neighbors: int, optional + Number of neighbors to match. Defaults to 10 + silent: bool, optional + Write logs in debug mode + pbar: bool, optional + Display progress bar while get index + """ + if use_algos is None: + use_algos = USE_ALGOS + self.input_data = input_data + self.outcome = outcome + self.treatment = treatment + self.group_col = group_col + self.outcome_type = outcome_type + self.generate_report = generate_report + self.report_feat_select_dir = report_feat_select_dir + self.timeout = timeout + self.n_threads = n_threads + self.n_folds = n_folds + self.verbose = verbose + self.use_algos = use_algos + self.same_target_threshold = same_target_threshold + self.interquartile_coeff = interquartile_coeff + self.mode_percentile = drop_outliers_by_percentile + self.min_percentile = min_percentile + self.max_percentile = max_percentile + self.info_col = info_col + self.features_importance = None + self.matcher = None + self.val_dict = None + self.pval_dict = None + self.new_treatment = None + self.validate = None + self.n_neighbors = n_neighbors + self.silent = silent + self.pbar = pbar + self._preprocessing_data() + + def _convert_categorical_to_dummy(self, columns_to_drop): + """Converts categorical variables to dummy variables. + + Args: + columns_to_drop: list: + List of column names to drop before the conversion. + + Returns: + pandas.DataFrame: Data with categorical variables converted to dummy variables. + """ + data = self.input_data.drop(columns=columns_to_drop) + dummy_data = pd.get_dummies(data, drop_first=True) + return dummy_data + + def _preprocessing_data(self): + """Converts categorical features into dummy variables.""" + nan_counts = self.input_data.isna().sum().sum() + if nan_counts != 0: + self._log(f"Number of NaN values filled with zeros: {nan_counts}", silent=False) + self.input_data = self.input_data.fillna(0) + + if self.group_col is not None: + group_col = self.input_data[[self.group_col]] + if self.info_col is not None: + info_col = self.input_data[self.info_col] + + if self.group_col is None: + self.input_data = self._convert_categorical_to_dummy(self.info_col) + else: + self.input_data = self._convert_categorical_to_dummy([self.group_col] + self.info_col) + self.input_data = pd.concat([self.input_data, group_col], axis=1) + + if self.info_col is not None: + self.input_data = pd.concat([self.input_data, info_col], axis=1) + + self._log("Categorical features turned into dummy") + + def _apply_filter(self, filter_class, *filter_args): + """Applies a filter to the input data. + + Args: + filter_class: class + The class of the filter to apply. + *filter_args: (args) + Arguments to pass to the filter class. + """ + filter_instance = filter_class(*filter_args) + self.input_data = filter_instance.perform_filter(self.input_data) + + def _spearman_filter(self): + """Applies a filter by dropping columns correlated with the outcome column. 
+ + This method uses the Spearman filter to eliminate features from the dataset + that are highly correlated with the outcome columns, based on a pre-set threshold + """ + self._log("Applying filter by spearman test - drop columns correlated with outcome") + self._apply_filter(SpearmanFilter, self.outcome, self.treatment, self.same_target_threshold) + + def _outliers_filter(self): + """Removes outlier values from the dataset. + + This method employs an OutliersFilter. If `drop_outliers_by_percentile` is True, + it retains only the values between the min and max percentiles + If `drop_outliers_by_percentile` is False, it retains only the values between 2nd and 98th percentiles + """ + self._log( + f"Applying filter of outliers\n" + f"interquartile_coeff={self.interquartile_coeff}\n" + f"mode_percentile={self.mode_percentile}\n" + f"min_percentile={self.min_percentile}\n" + f"max_percentile={self.max_percentile}" + ) + + self._apply_filter( + OutliersFilter, self.interquartile_coeff, self.mode_percentile, self.min_percentile, self.max_percentile + ) + + def lama_feature_select(self) -> pd.DataFrame: + """Calculates the importance of each feature. + + This method use LamaFeatureSelector to rank the importance of each feature in the dataset + The features are then sorted by their importance with the most important feature first + + Returns: + pd.DataFrame + The feature importances, sorted in descending order + """ + self._log("Counting feature importance") + + feat_select = LamaFeatureSelector( + outcome=self.outcome, + outcome_type=self.outcome_type, + treatment=self.treatment, + timeout=self.timeout, + n_threads=self.n_threads, + n_folds=self.n_folds, + verbose=self.verbose, + generate_report=self.generate_report, + report_dir=self.report_feat_select_dir, + use_algos=self.use_algos, + ) + df = self.input_data if self.group_col is None else self.input_data.drop(columns=self.group_col) + + if self.info_col is not None: + df = df.drop(columns=self.info_col) + + features = feat_select.perform_selection(df=df) + if self.group_col is None: + self.features_importance = features + else: + self.features_importance = features.append( + {"Feature": self.group_col, "Importance": features.Importance.max()}, ignore_index=True + ) + return self.features_importance.sort_values("Importance", ascending=False) + + def _create_faiss_matcher(self, df=None, validation=None): + """Creates a FaissMatcher object. + + Args: + df: pd.DataFrame, optional + The dataframe to use. If None, uses self.input_data. + validation: bool, optional + Whether to use the matcher for validation. If None, determines based on whether + """ + if df is None: + df = self.input_data + self.matcher = FaissMatcher( + df, + self.outcome, + self.treatment, + info_col=self.info_col, + features=self.features_importance, + group_col=self.group_col, + validation=validation, + n_neighbors=self.n_neighbors, + pbar=False if validation else self.pbar, + ) + + def _perform_validation(self): + """Performs validation using the FaissMatcher.""" + if self.group_col is None: + sim = self.matcher.match() + else: + sim = self.matcher.group_match() + for key in self.val_dict.keys(): + self.val_dict[key].append(sim[key][0]) + + def _log(self, message, silent=None): + """Logs a message at the appropriate level. + + Args: + message: str + The message to log. 
+ silent: bool, optional + If silent, logs will be only info + """ + if silent is None: + silent = self.silent + if silent: + logger.debug(message) + else: + logger.info(message) + + def _matching(self) -> tuple: + """Performs matching considering the presence of groups. + + Returns: + tuple: Results of matching and matching quality metrics + + """ + self._create_faiss_matcher() + self._log("Applying matching") + + self.results, df_matched = self.matcher.match() + + self.quality_result = self.matcher.matching_quality(df_matched) + + return self.results, self.quality_result, df_matched + + def validate_result(self, refuter: str = "random_feature", n_sim: int = 10, fraction: float = 0.8) -> dict: + """Validates estimated ATE (Average Treatment Effect). + + Validates estimated effect: + 1) by replacing real treatment with random placebo treatment. + Estimated effect must be droped to zero, p-val < 0.05; + 2) by adding random feature (`random_feature`). Estimated effect shouldn't change + significantly, p-val > 0.05; + 3) estimates effect on subset of data (default fraction is 0.8). Estimated effect + shouldn't change significantly, p-val > 0.05. + + Args: + refuter: str + Refuter type (`random_treatment`, `random_feature`, `subset_refuter`) + n_sim: int + Number of simulations + fraction: float + Subset fraction for subset refuter only + + Returns: + dict: Dictionary of outcome_name: (mean_effect on validation, p-value) + """ + self._log(f"Perform validation with {refuter} refuter") + + self.val_dict = {k: [] for k in [self.outcome]} + self.pval_dict = dict() + + for i in tqdm(range(n_sim)): + if refuter in ["random_treatment", "random_feature"]: + if refuter == "random_treatment": + self.input_data, orig_treatment, self.validate = random_treatment(self.input_data, self.treatment) + elif refuter == "random_feature": + self.input_data, self.validate = random_feature(self.input_data) + if self.features_importance is not None and i == 0: + self.features_importance.append("random_feature") + elif refuter == "subset_refuter": + self.input_data, self.validate = subset_refuter(self.input_data, self.treatment, fraction) + else: + logger.error("Incorrect refuter name") + raise NameError( + "Incorrect refuter name! Available refuters: `random_feature`, `random_treatment`, `subset_refuter`" + ) + + self._create_faiss_matcher(self.input_data, self.validate) + self._perform_validation() + + for outcome in [self.outcome]: + self.pval_dict.update({outcome: [np.mean(self.val_dict[outcome])]}) + self.pval_dict[outcome].append( + test_significance(self.results.loc["ATE"]["effect_size"], self.val_dict[outcome]) + ) + if refuter == "random_treatment": + self.input_data[self.treatment] = orig_treatment + elif refuter == "random_feature": + self.input_data = self.input_data.drop(columns="random_feature") + if self.features_importance is not None: + self.features_importance.remove("random_feature") + + return self.pval_dict + + def estimate(self, features: list = None) -> tuple: + """Applies filters and outliers, then performs matching. + + Args: + features: list + Type List or feature_importances from LAMA + + Returns: + tuple: Results of matching and matching quality metrics + """ + if features is not None: + self.features_importance = features + return self._matching() + + def save(self, filename): + """Save the object to a file using pickle. + + This method serializes the object and writes it to a file + + Args: + filename: str + The name of the file to write to. 
+ """ + with open(filename, "wb") as f: + pickle.dump(self, f) + + @classmethod + def load(cls, filename): + """Load an object from a file. + + This method reads a file and deserializes the object from it + + Args: + filename: str + The name of the file to read from. + + Returns: + object: + The deserialized object + """ + with open(filename, "rb") as f: + return pickle.load(f) diff --git a/lightautoml/addons/hypex/readme.md b/lightautoml/addons/hypex/readme.md new file mode 100644 index 00000000..d1a53633 --- /dev/null +++ b/lightautoml/addons/hypex/readme.md @@ -0,0 +1,8 @@ +# Digital Twins + +[![Telegram](https://img.shields.io/badge/chat-on%20Telegram-2ba2d9.svg)](https://t.me/lamamatcher) + + +# Usage +You can see the examples of usages this addons [here](https://github.com/sb-ai-lab/LightAutoML/blob/master/examples/tutorials/Tutorial_12_Matching.ipynb) + diff --git a/lightautoml/addons/hypex/selectors/lama_feature_selector.py b/lightautoml/addons/hypex/selectors/lama_feature_selector.py new file mode 100644 index 00000000..df21039e --- /dev/null +++ b/lightautoml/addons/hypex/selectors/lama_feature_selector.py @@ -0,0 +1,120 @@ +"""Feature selection class using LAMA.""" +import logging + +import pandas as pd +from typing import List + +from ....automl.presets.tabular_presets import TabularAutoML +from ....report import ReportDeco +from ....tasks import Task + +logger = logging.getLogger("lama_feature_selector") +console_out = logging.StreamHandler() +logging.basicConfig( + handlers=(console_out,), + format="[%(asctime)s | %(name)s | %(levelname)s]: %(message)s", + datefmt="%d.%m.%Y %H:%M:%S", + level=logging.INFO, +) + + +class LamaFeatureSelector: + """The main class of LAMA Feature selector. + + Select top features. By default, use LGM""" + def __init__( + self, + outcome: str, + outcome_type: str, + treatment: str, + timeout: int, + n_threads: int, + n_folds: int, + verbose: bool, # не используется + generate_report: bool, + report_dir: str, + use_algos: List[str], + ): + """Initialize the LamaFeatureSelector. + + Args: + outcome: str + The target column + outcome_type: str + The type of target column + treatment: str + The column that determines control and test groups + timeout: int + Time limit for the execution of the code + n_threads: int + Maximum number of threads to be used + n_folds: int + Number of folds for cross-validation + verbose: bool + Flag to control the verbosity of the process stages + generate_report: bool + Flag to control whether to create a report or not + report_dir: str + Directory for storing report files + use_algos: List[str] + List of names of LAMA algorithms for feature selection + """ + self.outcome = outcome + self.outcome_type = outcome_type + self.treatment = treatment + self.use_algos = use_algos + self.timeout = timeout + self.n_threads = n_threads + self.n_folds = n_folds + self.verbose = verbose + self.generate_report = generate_report + self.report_dir = report_dir + + def perform_selection(self, df: pd.DataFrame) -> pd.DataFrame: + """Trains a model and returns feature scores. 
+
+        This method defines metrics, applies the model, creates a report, and returns feature scores.
+
+        Args:
+            df: pd.DataFrame
+                Input data
+
+        Returns:
+            pd.DataFrame: A DataFrame containing the feature scores from the model
+
+        """
+        roles = {
+            "target": self.outcome,
+            "drop": [self.treatment],
+        }
+
+        if self.outcome_type == "numeric":
+            task_name = "reg"
+            loss = "mse"
+            metric = "mse"
+        elif self.outcome_type == "binary":
+            task_name = "binary"
+            loss = "logloss"
+            metric = "logloss"
+        else:
+            task_name = "multiclass"
+            loss = "crossentropy"
+            metric = "crossentropy"
+
+        task = Task(name=task_name, loss=loss, metric=metric)
+
+        automl = TabularAutoML(
+            task=task,
+            timeout=self.timeout,
+            cpu_limit=self.n_threads,
+            general_params={"use_algos": [self.use_algos]},
+            reader_params={"n_jobs": self.n_threads, "cv": self.n_folds, },
+        )
+
+        if self.generate_report:
+            report = ReportDeco(output_path=self.report_dir)
+            automl = report(automl)
+
+        _ = automl.fit_predict(df, roles=roles)
+
+        return automl.model.get_feature_scores()
diff --git a/lightautoml/addons/hypex/selectors/outliers_filter.py b/lightautoml/addons/hypex/selectors/outliers_filter.py
new file mode 100644
index 00000000..6bbc1643
--- /dev/null
+++ b/lightautoml/addons/hypex/selectors/outliers_filter.py
@@ -0,0 +1,82 @@
+"""Outliers filter."""
+import logging
+
+import pandas as pd
+
+logger = logging.getLogger("outliers_filter")
+console_out = logging.StreamHandler()
+logging.basicConfig(
+    handlers=(console_out,),
+    format="[%(asctime)s | %(name)s | %(levelname)s]: %(message)s",
+    datefmt="%d.%m.%Y %H:%M:%S",
+    level=logging.INFO,
+)
+
+
+class OutliersFilter:
+    """The main class of Outliers Filter.
+
+    It creates a row indices that should be deleted by percentile."""
+    def __init__(self, interquartile_coeff, mode_percentile, min_percentile, max_percentile):
+        """
+        Initializes the OutliersFilter.
+
+        Args:
+            interquartile_coeff: float
+                Coefficient for the interquartile range to determine outliers
+            mode_percentile: bool
+                If True, outliers are determined by custom percentiles
+            min_percentile: float
+                The lower percentile. Values below this percentile are considered outliers
+            max_percentile: float
+                The upper percentile. Values above this percentile are considered outliers
+        """
+        self.interquartile_coeff = interquartile_coeff
+        self.mode_percentile = mode_percentile
+        self.min_percentile = min_percentile
+        self.max_percentile = max_percentile
+
+    def perform_filter(self, df: pd.DataFrame, interquartile: bool = True) -> set:
+        """Identifies rows with outliers.
+
+        This method builds the set of row indices to be removed: values below `min_percentile` or
+        above `max_percentile` (if `mode_percentile` is True), values outside the bounds derived
+        from the 0.2 and 0.8 quantiles extended by `interquartile_coeff` (if `mode_percentile` is
+        False and `interquartile` is True), or values beyond three standard deviations from the mean otherwise
+
+        Args:
+            df: pd.DataFrame
+                The input DataFrame
+            interquartile: bool, optional
+                If True, uses the interquartile range to determine outliers.
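A hypothetical instantiation of the selector whose `perform_selection` is defined above; every argument value here is illustrative only.

selector = LamaFeatureSelector(
    outcome="outcome",
    outcome_type="numeric",
    treatment="treatment",
    timeout=600,
    n_threads=4,
    n_folds=5,
    verbose=False,  # accepted but currently unused by the class
    generate_report=False,
    report_dir="report",
    use_algos=["lgb"],  # wrapped into [["lgb"]] for TabularAutoML's general_params
)
scores = selector.perform_selection(df)  # DataFrame of feature scores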
Defaults to True + + Returns: + set: The set of row indices with outliers + """ + columns_names = df.select_dtypes(include="number").columns + rows_for_del = [] + for column in columns_names: + if self.mode_percentile: + min_value = df[column].quantile(self.min_percentile) + max_value = df[column].quantile(self.max_percentile) + elif interquartile: + upper_quantile = df[column].quantile(0.8) + lower_quantile = df[column].quantile(0.2) + + interquartile_range = upper_quantile - lower_quantile + min_value = lower_quantile - self.interquartile_coeff * interquartile_range + max_value = upper_quantile + self.interquartile_coeff * interquartile_range + else: + mean_value = df[column].mean() + standard_deviation = df[column].std() + nstd_lower, nstd_upper = 3, 3 + + min_value = mean_value - nstd_lower * standard_deviation + max_value = mean_value + nstd_upper * standard_deviation + + rows_for_del_column = (df[column] < min_value) | (df[column] > max_value) + rows_for_del_column = df.index[rows_for_del_column].tolist() + rows_for_del.extend(rows_for_del_column) + rows_for_del = set(rows_for_del) + logger.info(f"Drop {len(rows_for_del)} rows") + + return rows_for_del diff --git a/lightautoml/addons/hypex/selectors/spearman_filter.py b/lightautoml/addons/hypex/selectors/spearman_filter.py new file mode 100644 index 00000000..ec8f51f4 --- /dev/null +++ b/lightautoml/addons/hypex/selectors/spearman_filter.py @@ -0,0 +1,70 @@ +"""Spearman filter.""" +import logging + +import pandas as pd +from scipy.stats import spearmanr + +PVALUE = 0.05 + +logger = logging.getLogger("spearman_filter") +console_out = logging.StreamHandler() +logging.basicConfig( + handlers=(console_out,), + format="[%(asctime)s | %(name)s | %(levelname)s]: %(message)s", + datefmt="%d.%m.%Y %H:%M:%S", + level=logging.INFO, +) + + +class SpearmanFilter: + """A class to filter columns based on the Spearman correlation coefficient. + + The class is utilized to filter dataframe columns that do not exhibit a significant + correlation (based on a provided threshold) with a specified outcome column. + The significance of the correlation is determined using the Spearman correlation coefficient + and a p-value threshold of 0.05 + """ + + def __init__(self, outcome: str, treatment: str, threshold: float): + """Initialize spearman filter. + + Args: + outcome: str + The name of target column + treatment: str + The name of the column that determines control and test groups + threshold: float + The threshold for the Spearman correlation coefficient filter + """ + self.outcome: str = outcome + self.treatment: str = treatment + self.threshold: float = threshold + + def perform_filter(self, df: pd.DataFrame) -> pd.DataFrame: + """Filters columns based on their correlation with the outcome column. + + The method tests the correlation using the Spearman correlation coefficient. 
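An illustrative call of the outliers filter completed above; the parameter values are examples, not defaults.

outliers_filter = OutliersFilter(
    interquartile_coeff=1.5,
    mode_percentile=False,
    min_percentile=0.02,
    max_percentile=0.98,
)
rows_to_drop = outliers_filter.perform_filter(df)  # a set of row indices
df = df.drop(rows_to_drop, axis=0)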
+
+        A column is kept only if the absolute value of its Spearman correlation coefficient with
+        the outcome is below the provided threshold and the correlation is statistically
+        significant (p-value < 0.05); all other columns are dropped from the dataframe
+
+        Args:
+            df: pd.DataFrame
+                The input DataFrame
+
+        Returns:
+            pd.DataFrame: The filtered DataFrame, containing the surviving columns
+            together with the treatment and outcome columns
+        """
+        selected = []
+        columns = df.drop([self.treatment, self.outcome], axis=1).columns
+        for column in columns:
+            result = spearmanr(df[self.outcome].values, df[column].values)
+            if (abs(result[0]) < self.threshold) and (result[1] < PVALUE):
+                selected.append(column)
+
+        logger.info(f"Drop columns {list(set(columns) - set(selected))}")
+
+        columns = selected + [self.treatment, self.outcome]
+        df = df[columns]
+
+        return df
diff --git a/lightautoml/addons/hypex/utils/metrics.py b/lightautoml/addons/hypex/utils/metrics.py
new file mode 100644
index 00000000..38faf520
--- /dev/null
+++ b/lightautoml/addons/hypex/utils/metrics.py
@@ -0,0 +1,159 @@
+"""Calculate metrics."""
+import logging
+
+import numpy as np
+import pandas as pd
+from scipy.stats import ks_2samp
+
+from ..utils.psi_pandas import report
+
+logger = logging.getLogger("metrics")
+console_out = logging.StreamHandler()
+logging.basicConfig(
+    handlers=(console_out,),
+    format="[%(asctime)s | %(name)s | %(levelname)s]: %(message)s",
+    datefmt="%d.%m.%Y %H:%M:%S",
+    level=logging.INFO,
+)
+
+
+def smd(orig: pd.DataFrame, matched: pd.DataFrame, silent=False) -> pd.DataFrame:
+    """Calculates the standardised mean difference to evaluate matching quality.
+
+    Args:
+        orig: pd.DataFrame
+            Initial dataframe
+        matched: pd.DataFrame
+            Matched dataframe
+        silent: bool, optional
+            If True, logs at debug level instead of info
+
+    Returns:
+        pd.DataFrame: The standardised mean difference between the initial and matched dataframes
+    """
+    smd_data = abs(orig.mean(0) - matched.mean(0)) / orig.std(0)
+
+    if silent:
+        logger.debug(f"Standardised mean difference:\n{smd_data}")
+    else:
+        logger.info(f"Standardised mean difference:\n{smd_data}")
+
+    return smd_data
+
+
+def ks(orig: pd.DataFrame, matched: pd.DataFrame, silent=False) -> dict:
+    """Performs a Kolmogorov-Smirnov test to evaluate matching quality per column.
+
+    Args:
+        orig: pd.DataFrame
+            Initial dataframe
+        matched: pd.DataFrame
+            Matched dataframe
+        silent: bool, optional
+            If True, logs at debug level instead of info
+
+
+    Returns:
+        dict: dict of p-values
+
+    """
+    ks_dict = dict()
+    matched.columns = orig.columns
+    for col in orig.columns:
+        ks_pval_1 = ks_2samp(orig[col].values, matched[col].values)[1]
+        ks_dict.update({col: ks_pval_1})
+
+    filter_list = list(ks_dict.keys())[:3] + list(ks_dict.keys())[-3:]
+    dict_to_show = {key: val for key, val in ks_dict.items() if key in filter_list}
+
+    if silent:
+        logger.debug(f"Kolmogorov-Smirnov test to check matching quality: \n{dict_to_show}")
+    else:
+        logger.info(f"Kolmogorov-Smirnov test to check matching quality: \n{dict_to_show}")
+
+    return ks_dict
+
+
+def matching_quality(data: pd.DataFrame, treatment: str, features: list, features_psi: list,
+                     silent: bool = False) -> tuple:
+    """Wraps the functionality for estimating matching quality.
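A hedged example of the Spearman filter defined above; the threshold value is arbitrary.

spearman_filter = SpearmanFilter(outcome="outcome", treatment="treatment", threshold=0.65)
df = spearman_filter.perform_filter(df)  # surviving columns plus treatment and outcome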
+ + Args: + data: pd.DataFrame + The dataframe of matched data + treatment: str + The column determining control and test groups + features: list + The list of features, ks-test and smd accept only numeric values + features_psi: list + The list of features for calculating Population Stability Index (PSI) + silent: bool, optional + If silent, logger in info mode + + + Returns: + tuple: A tuple of dataframes with estimated metrics for matched treated to control and control to treated + + """ + orig_treated = data[data[treatment] == 1][features] + orig_untreated = data[data[treatment] == 0][features] + matched_treated = data[data[treatment] == 1][sorted([f + "_matched" for f in features])] + matched_treated.columns = list(map(lambda x: x.replace("_matched", ""), matched_treated.columns)) + matched_untreated = data[data[treatment] == 0][sorted([f + "_matched" for f in features])] + matched_untreated.columns = list(map(lambda x: x.replace("_matched", ""), matched_untreated.columns)) + + psi_treated = data[data[treatment] == 1][features_psi] + psi_treated_matched = data[data[treatment] == 1][[f + "_matched" for f in features_psi]] + psi_treated_matched.columns = [f + "_treated" for f in features_psi] + psi_treated.columns = [f + "_treated" for f in features_psi] + + psi_untreated = data[data[treatment] == 0][features_psi] + psi_untreated_matched = data[data[treatment] == 0][[f + "_matched" for f in features_psi]] + psi_untreated.columns = [f + "_untreated" for f in features_psi] + psi_untreated_matched.columns = [f + "_untreated" for f in features_psi] + + treated_smd_data = smd(orig_treated, matched_treated, silent) + untreated_smd_data = smd(orig_untreated, matched_untreated, silent) + smd_data = pd.concat([treated_smd_data, untreated_smd_data], axis=1) + smd_data.columns = ["match_control_to_treat", "match_treat_to_control"] + + treated_ks = ks(orig_treated, matched_treated, silent) + untreated_ks = ks(orig_untreated, matched_untreated, silent) + ks_dict = {k: [treated_ks[k], untreated_ks[k]] for k in treated_ks.keys()} + ks_df = pd.DataFrame(data=ks_dict, index=range(2)).T + ks_df.columns = ["match_control_to_treat", "match_treat_to_control"] + + report_cols = ["column", "anomaly_score", "check_result"] + report_psi_treated = report(psi_treated, psi_treated_matched, silent=silent)[report_cols] + report_psi_treated.columns = [col + "_treated" for col in report_cols] + report_psi_untreated = report(psi_untreated, psi_untreated_matched, silent=silent)[report_cols] + report_psi_untreated.columns = [col + "_untreated" for col in report_cols] + report_psi = pd.concat( + [report_psi_treated.reset_index(drop=True), report_psi_untreated.reset_index(drop=True)], axis=1 + ) + + return report_psi, ks_df, smd_data + + +def check_repeats(index: np.array, silent: bool = False) -> float: + """Checks the fraction of duplicated indexes in the given array. 
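A sketch of calling the quality wrapper above on a matched dataframe. The '<feature>_matched' column convention follows the function body; the dataframe and feature names are hypothetical.

report_psi, ks_df, smd_data = matching_quality(
    df_matched,                      # must contain '<feature>_matched' columns
    treatment="treatment",
    features=["age", "income"],      # numeric features for SMD and KS
    features_psi=["age", "income"],  # features for the PSI report
)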
+ + Args: + index: np.ndarray + The array of indexes to check for duplicates + silent: bool, optional + If silent, logger in info mode + + Returns: + float: + The fraction of duplicated index + """ + unique, counts = np.unique(index, return_counts=True) + rep_frac = len(unique) / len(index) if len(unique) > 0 else 0 + + if silent: + logger.debug(f"Fraction of duplicated indexes: {rep_frac: .2f}") + else: + logger.info(f"Fraction of duplicated indexes: {rep_frac: .2f}") + + return round(rep_frac, 2) diff --git a/lightautoml/addons/hypex/utils/psi_pandas.py b/lightautoml/addons/hypex/utils/psi_pandas.py new file mode 100644 index 00000000..91347336 --- /dev/null +++ b/lightautoml/addons/hypex/utils/psi_pandas.py @@ -0,0 +1,494 @@ +"""Calculate PSI.""" +import logging + +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd + +logger = logging.getLogger("psi_pandas") +console_out = logging.StreamHandler() +logging.basicConfig( + handlers=(console_out,), + format="[%(asctime)s | %(name)s | %(levelname)s]: %(message)s", + datefmt="%d.%m.%Y %H:%M:%S", + level=logging.INFO, +) + + +class PSI: + """Calculates population stability index for different categories of data. + + For numeric data the class generates numeric buckets, except when numeric column + includes only NULL. For categorical data: + 1. For n < 20, a bucket equals the proportion of each category, + 2. For n > 20, a bucket equals to a group of categories, + 3. For n > 100, it calculates unique_index based on Jaccard similarity, + but in case of imbalance null-good data returns PSI + + Args: + expected: pd.DataFrame + The expected values + actual: pd.DataFrame + The actual values + column_name: str + The column name for which to calculate the PSI + plot: bool, optional + If true, generates a distribution plot. Defaults to False + + Returns: + float: PSI for column + float: The PSI for each bucket + list: New categories (empty list for non-categorical data) + list: Categories that are absent in actual column (empty list for non-categorical data) + """ + + def __init__( + self, expected: pd.DataFrame, actual: pd.DataFrame, column_name: str, plot: bool = False, silent=False + ): + """Initializes the PSI class with given parameters. + + Args: + expected: pd.DataFrame + The expected values + actual: pd.DataFrame + The actual values + column_name: str + The column name for which to calculate the PSI + plot: bool, optional + If true, generates a distribution plot. Defaults to False + silent: bool, optional + If True show logs. Default by False + """ + self.expected = expected[column_name].values + self.actual = actual[column_name].values + self.actual_len = len(self.actual) + self.expected_len = len(self.expected) + self.column_name = column_name + self.column_type = self.expected.dtype + self.expected_shape = self.expected.shape + self.expected_nulls = np.sum(pd.isna(self.expected)) + self.actual_nulls = np.sum(pd.isna(self.actual)) + self.axis = 1 + self.plot = plot + self.silent = silent + if self.column_type == np.dtype("O"): + self.expected_uniqs = expected[column_name].unique() + self.actual_uniqs = actual[column_name].unique() + + def jac(self) -> float: + """Calculates the Jacquard similarity index. 
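A worked example of `check_repeats` above. Note that the returned value is actually the fraction of unique indexes, len(unique) / len(index), despite the 'duplicated' wording in the log message.

import numpy as np

idx = np.array([0, 1, 1, 2, 2, 2])
print(check_repeats(idx))  # 3 unique values out of 6 -> 0.5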
+
+        The Jacquard similarity index measures the intersection between two sequences
+        versus the union of the two sequences
+
+        Returns:
+            float: The Jacquard similarity index
+
+        """
+        x = set(self.expected_uniqs)
+        y = set(self.actual_uniqs)
+
+        logger.info(f"Jacquard similarity is {len(x.intersection(y)) / len(x.union(y)): .6f}")
+
+        return len(x.intersection(y)) / len(x.union(y))
+
+    # NOTE: this function takes no `nulls` argument, although one existed and was used further down in the code
+    def plots(self, expected_percents, actual_percents, breakpoints, intervals):
+        """Generates plots of the expected and actual percents.
+
+        Args:
+            expected_percents: list
+                The percentage of each expected value among all expected values
+            actual_percents: list
+                The percentage of each actual value among all actual values
+            breakpoints: list
+                The list of breakpoints
+            intervals: list
+                The list of intervals
+
+        """
+        points = [i for i in breakpoints]
+        plt.figure(figsize=(15, 7))
+        plt.bar(
+            np.arange(len(intervals)) - 0.15,  # NOTE: 0.15 is the bar offset from the tick; consider extracting it into a constant
+            expected_percents,
+            label="expected",
+            alpha=0.7,
+            width=0.3,
+        )
+        plt.bar(np.arange(len(intervals)) + 0.15, actual_percents, label="actual", alpha=0.7, width=0.3)
+        plt.legend(loc="best")
+
+        if self.column_type != np.dtype("O"):
+            plt.xticks(range(len(intervals)), intervals, rotation=90)
+        else:
+            plt.xticks(range(len(points)), points, rotation=90)
+        plt.title(self.column_name)
+
+        # plt.savefig(f"C:\\Users\\Glazova2-YA\\Documents\\data\\bip\\summary_psi_plots\\{self.column_name}.png")
+        plt.show()
+
+    def sub_psi(self, e_perc: float, a_perc: float) -> float:
+        """Calculates the sub PSI value.
+
+        Args:
+            e_perc: float
+                The expected percentage
+            a_perc: float
+                The actual percentage
+
+        Returns:
+            float: The calculated sub PSI value.
+        """
+        if a_perc == 0:
+            a_perc = 0.0001
+        if e_perc == 0:
+            e_perc = 0.0001
+
+        value = (e_perc - a_perc) * np.log(e_perc / a_perc)
+
+        logger.debug(f"sub_psi value is {value: .6f}")
+
+        return value
+
+    def psi_num(self):
+        """Calculate the PSI for a single variable.
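A quick numeric check of the per-bucket term implemented in `sub_psi` above, (e - a) * ln(e / a), with zero shares clamped to 0.0001:

import numpy as np

e_perc, a_perc = 0.25, 0.10
print((e_perc - a_perc) * np.log(e_perc / a_perc))  # ~0.1374, one bucket's PSI contribution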
+
+        Returns:
+            float: PSI for column
+            dict: The PSI for each bucket
+            list: New categories (empty list for non-categorical data)
+            list: Categories that are absent in actual column (empty list for non-categorical data)
+
+        """
+        buckets = 10
+        breakpoints = np.arange(0, buckets / 10, 0.1)
+
+        # Workaround for the case when the actual table contains values other than null
+        if self.expected_nulls == self.expected_len and self.actual_nulls != self.actual_len:
+            breakpoints = np.array(list(sorted(set(np.nanquantile(self.actual, breakpoints)))))
+        else:
+            breakpoints = np.array(list(sorted(set(np.nanquantile(self.expected, breakpoints)))))
+
+        actual_nulls = self.actual_nulls / self.actual_len
+        expected_nulls = self.expected_nulls / self.expected_len
+
+        breakpoints = np.concatenate(([-np.inf], breakpoints, [np.inf]))
+
+        expected_percents = np.histogram(self.expected, breakpoints)
+        actual_percents = np.histogram(self.actual, breakpoints)
+        # breakpoints[0] = -np.inf
+        # breakpoints[-1] = np.inf
+        expected_percents = [p / self.expected_len for p in expected_percents[0]]
+        actual_percents = [p / self.actual_len for p in actual_percents[0]]
+
+        if self.expected_nulls == 0 and actual_nulls == expected_nulls:
+            expected_percents = expected_percents
+            actual_percents = actual_percents
+            nulls = False
+        else:
+            expected_percents.append(expected_nulls)
+            actual_percents.append(actual_nulls)
+            nulls = True
+
+        points = [i for i in breakpoints]
+        intervals = [f"({np.round(points[i], 5)};{np.round(points[i + 1], 5)})" for i in range(len(points) - 1)]
+        if nulls:
+            intervals = np.append(intervals, "empty_values")
+
+        if self.plot:
+            self.plots(expected_percents, actual_percents, breakpoints, intervals)  # NOTE: plots() takes no `nulls` argument
+
+        psi_dict = {}
+        for i in range(0, len(expected_percents)):
+            psi_val = self.sub_psi(expected_percents[i], actual_percents[i])
+            psi_dict.update({intervals[i]: psi_val})
+
+        psi_value = np.sum(list(psi_dict.values()))
+        psi_dict = {k: v for k, v in sorted(psi_dict.items(), key=lambda x: x[1], reverse=True)}
+        new_cats = []
+        abs_cats = []
+
+        return psi_value, psi_dict, new_cats, abs_cats
+
+    def uniq_psi(self):
+        """Calculates PSI for categorical unique counts greater than 100.
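A minimal end-to-end run of the numeric branch above on synthetic data; the column name and distributions are illustrative.

import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
expected = pd.DataFrame({"x": rng.normal(0.0, 1.0, 10_000)})
actual = pd.DataFrame({"x": rng.normal(0.5, 1.0, 10_000)})
psi_value, psi_dict, new_cats, abs_cats = PSI(expected, actual, "x").psi_result()
print(psi_value)  # values above 0.2 are flagged as unstable in report()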
+
+            Returns:
+            float: PSI for column
+            dict: The PSI for each bucket
+            list: New categories (empty list for non-categorical data)
+            list: Categories that are absent in actual column (empty list for non-categorical data)
+
+        """
+        actual_nulls = self.actual_nulls / self.actual_len
+        expected_nulls = self.expected_nulls / self.expected_len
+
+        actual_not_nulls_arr = self.actual[~np.isnan(self.actual)]
+        expected_not_nulls_arr = self.expected[~np.isnan(self.expected)]
+
+        actual_not_nulls = len(actual_not_nulls_arr) / self.actual_len
+        expected_not_nulls = len(expected_not_nulls_arr) / self.expected_len
+
+        expected_percents = [expected_not_nulls, expected_nulls]
+        actual_percents = [actual_not_nulls, actual_nulls]
+
+        breakpoints = ["good_data", "nulls"]
+        if self.plot:
+            self.plots(expected_percents, actual_percents, breakpoints, breakpoints)  # NOTE: plots() takes no `nulls` argument
+
+        psi_dict = {}
+        for i in range(0, len(expected_percents)):
+            psi_val = self.sub_psi(expected_percents[i], actual_percents[i])
+            if breakpoints[i] == "None":
+                psi_dict.update({"empty_value": psi_val})
+            else:
+                psi_dict.update({breakpoints[i]: psi_val})
+
+        psi_value = np.sum(list(psi_dict.values()))
+        jac_metric = self.jac()
+        new_cats, abs_cats = [], []
+        psi_dict = {k: v for k, v in sorted(psi_dict.items(), key=lambda x: x[1], reverse=True)}
+
+        if psi_value >= 0.2:  # NOTE: 0.2 is the PSI stability threshold; consider extracting it into a constant
+            psi_value = psi_value
+            psi_dict.update({"metric": "stability_index"})
+        else:
+            psi_value = 1 - jac_metric
+            psi_dict.update({"metric": "unique_index"})
+
+        logger.info(f"PSI for categorical unique >100 is {psi_value: .6f}")
+
+        return psi_value, psi_dict, new_cats, abs_cats
+
+    def psi_categ(self):
+        """Calculates PSI for categorical data excluding unique counts greater than 100.
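The high-cardinality rule above falls back to 1 - Jaccard when the null/non-null PSI stays below 0.2; a small worked example of that arithmetic:

x = {"a", "b", "c", "d"}  # unique categories in the expected column
y = {"a", "b", "e"}       # unique categories in the actual column
jaccard = len(x & y) / len(x | y)  # 2 / 5 = 0.4
print(1 - jaccard)                 # unique_index = 0.6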
+ + Returns: + float: PSI for column + dict: The PSI for each bucket + list: New categories (empty list for non-categorical data) + list: Categories that are absent in actual column (empty list for non-categorical data) + + """ + expected_uniq_count = len(self.expected_uniqs) + actual_uniq_count = len(self.actual_uniqs) + # правило для категориальных > 100 + if expected_uniq_count > 100 or actual_uniq_count > 100: + psi_value, psi_dict, new_cats, abs_cats = self.uniq_psi() + + logger.info(f"PSI is {psi_value: .6f}") + + return psi_value, psi_dict, new_cats, abs_cats + + expected_dict = ( + pd.DataFrame(self.expected, columns=[self.column_name]) + .groupby(self.column_name)[self.column_name] + .count() + .sort_values(ascending=False) + .to_dict() + ) + actual_dict = ( + pd.DataFrame(self.actual, columns=[self.column_name]) + .groupby(self.column_name)[self.column_name] + .count() + .sort_values(ascending=False) + .to_dict() + ) + + breakpoints = list(expected_dict.keys() | actual_dict.keys()) + + new_cats = [k for k in actual_dict.keys() if k not in expected_dict.keys()] + abs_cats = [k for k in expected_dict.keys() if k not in actual_dict.keys()] + + expected_dict_re = {} + actual_dict_re = {} + + for b in breakpoints: + if b in expected_dict and b not in actual_dict: + expected_dict_re.update({b: expected_dict[b]}) + actual_dict_re.update({b: 0}) + elif b not in expected_dict and b in actual_dict: + expected_dict_re.update({b: 0}) + actual_dict_re.update({b: actual_dict[b]}) + elif b in expected_dict and b in actual_dict: + actual_dict_re.update({b: actual_dict[b]}) + expected_dict_re.update({b: expected_dict[b]}) + + category_names = [c for c in expected_dict_re.keys()] + groups = {} + g_counts = len(category_names) + group_num = 20 + if g_counts <= group_num: + for g_n, val in enumerate(category_names): + groups[val] = g_n + else: + group_size = np.floor(g_counts / group_num) + current_pos = 0 + reminder = g_counts % group_num + for g_n in range(group_num): + if g_n < group_num - reminder: + group_values = category_names[int(current_pos): int(current_pos + group_size)] + current_pos += group_size + else: + group_values = category_names[int(current_pos): int(current_pos + group_size + 1)] + current_pos += group_size + 1 + for val in group_values: + groups[val] = g_n + group_sum_exp = 0 + group_sum_act = 0 + exp_dict = {} + act_dict = {} + group_re = -1 + cat_group_name = "" + group_name_re = "" + for k, v in groups.items(): + current_group = v + if current_group == group_re: + group_re = v + exp_dict.pop(group_name_re, None) + act_dict.pop(group_name_re, None) + cat_group_name = cat_group_name + ", " + str(k) + group_sum_exp += expected_dict_re[k] + group_sum_act += actual_dict_re[k] + exp_dict.update({cat_group_name: group_sum_exp}) + act_dict.update({cat_group_name: group_sum_act}) + group_name_re = cat_group_name + else: + group_name_re = str(k) + group_re = v + cat_group_name = str(k) + group_sum_exp = expected_dict_re[k] + group_sum_act = actual_dict_re[k] + exp_dict.update({cat_group_name: group_sum_exp}) + act_dict.update({cat_group_name: group_sum_act}) + + expected_percents = [e / self.expected_len for e in exp_dict.values()] + actual_percents = [a / self.actual_len for a in act_dict.values()] + + breakpoints = [e for e in exp_dict.keys()] + + if self.plot: + self.plots( + expected_percents, actual_percents, breakpoints, breakpoints + ) # в функции plots нет аргумента nulls + + psi_dict = {} + for i in range(0, len(expected_percents)): + psi_val = 
self.sub_psi(expected_percents[i], actual_percents[i]) + if breakpoints[i] == "None": + psi_dict.update({"empty_value": psi_val}) + else: + psi_dict.update({breakpoints[i]: psi_val}) + psi_value = np.sum(list(psi_dict.values())) + psi_dict = {k: v for k, v in sorted(psi_dict.items(), key=lambda x: x[1], reverse=True)} + + return psi_value, psi_dict, new_cats, abs_cats + + def psi_result(self): + """Calculates PSI. + + Returns: + float: PSI for column + dict: The PSI for each bucket + list: New categories (empty list for non-categorical data) + list: Categories that are absent in actual column (empty list for non-categorical data) + + """ + if len(self.expected_shape) == 1: + psi_values = np.empty(len(self.expected_shape)) + else: + psi_values = np.empty(self.expected_shape[self.axis]) + + for i in range(0, len(psi_values)): + if (self.column_type == np.dtype("O")) or ( + self.expected_nulls == self.expected_len and self.actual_nulls == self.actual_len + ): + psi_values, psi_dict, new_cats, abs_cats = self.psi_categ() + else: + psi_values, psi_dict, new_cats, abs_cats = self.psi_num() + + if self.silent: + logger.debug(f"PSI value: {psi_values: .3f}") + else: + logger.info(f"PSI value: {psi_values: .3f}") + + # если expected_shape пустой - будет ошибка + return round(psi_values, 2), psi_dict, new_cats, abs_cats + + +def report(expected: pd.DataFrame, actual: pd.DataFrame, plot: bool = False, silent: bool = False) -> pd.DataFrame: + """Generates a report using PSI (Population Stability Index) between the expected and actual data. + + Args: + expected: pd.DataFrame + The expected dataset + actual: pd.DataFrame + The new dataset you want to compare to the expected one + plot: bool + If True, plots the PSI are created. Defaults to False + silent: bool, optional + If silent, logger in info mode + + + Returns: + pd.DataFrame: A dataframe with the PSI report. The report includes the columns names, + metric names, check results, failed buckets, new categories and disappeared categories. + Anomaly score represent the PSI, metrics names indicate with metric was used for PSI calculation, + check results indicate whether the PSI is under the threshold (0.2), + and failed buckets include up to 5 buckets with the highest PSI. + + """ + if silent: + logger.debug("Creating report") + else: + logger.info("Creating report") + + assert len(expected.columns) == len(actual.columns) + + data_cols = expected.columns + score_dict = {} + new_cat_dict = {} + dfs = [] + + for col in data_cols: + psi_res = PSI(expected, actual, col, plot=plot, silent=silent) + # отладка, в случае ошибки выдаст прооблемный столбец + try: + score, psi_dict, new_cats, abs_cats = psi_res.psi_result() + except: + logger.warning(f"Can not count PSIs, see column {col}") + continue + + if len(new_cats) > 0: + new_cat_dict.update({col: new_cats}) + + score_dict.update({col: score}) + check_result = "OK" if score < 0.2 else "NOK" # может 0.2 вынести в константу? 
+ # psi_dict = {k:v for k,v in sorted(psi_dict.items(), key=lambda x: x[1], reverse=True)} + failed_buckets = list(psi_dict.keys())[:5] if score > 0.2 else [] + if "metric" in psi_dict: + new_cats = None + abs_cats = None + metric_name = psi_dict["metric"] + if metric_name == "unique_index": + failed_buckets = None + else: + metric_name = "stability_index" + df_tmp = pd.DataFrame( + { + "column": col, + "anomaly_score": score, + "metric_name": metric_name, + "check_result": check_result, + "failed_bucket": f"{failed_buckets}", + "new_category": f"{new_cats}", + "disappeared_category": f"{abs_cats}", + }, + index=[1], + ) + dfs.append(df_tmp) + + df = pd.concat(dfs, ignore_index=True) + + return df diff --git a/lightautoml/addons/hypex/utils/validators.py b/lightautoml/addons/hypex/utils/validators.py new file mode 100644 index 00000000..f792fc9a --- /dev/null +++ b/lightautoml/addons/hypex/utils/validators.py @@ -0,0 +1,91 @@ +"""Validators.""" +from typing import List + +import numpy as np +import pandas as pd +import scipy.stats as st + + +def random_treatment(df: pd.DataFrame, treatment: str): + """Replaces real treatment with a random placebo treatment. + + Args: + df: pd.DataFrame + The initial dataframe + treatment: str + The columns name representing the treatment + + Return: + pd.DataFrame: The modified dataframe with the original treatment replaced + pd.Series: The original treatment series + int: A validation flag + """ + prop1 = df[treatment].sum() / df.shape[0] + prop0 = 1 - prop1 + new_treatment = np.random.choice([0, 1], size=df.shape[0], p=[prop0, prop1]) + validate = 1 + orig_treatment = df[treatment] + df = df.drop(columns=treatment) + df[treatment] = new_treatment + return df, orig_treatment, validate + + +def random_feature(df: pd.DataFrame): + """Adds a random feature to the initial dataset. + + Args: + df: pd.DataFrame + The initial dataframe + + Return: + pd.DataFrame: The modified dataframe with an additional random feature + int: A validation flag + """ + feature = np.random.normal(0, 1, size=len(df)) + validate = 1 + df["random_feature"] = feature + return df, validate + + +def subset_refuter(df: pd.DataFrame, treatment: str, fraction: float = 0.8): + """Returns a subset of data with given fraction (default 0.8). + + Args: + df: pd.DataFrame + The initial dataframe + treatment: str + The column name representing the treatment + fraction: float + The fraction of the dataset to divide random matching + + Return: + pd.DataFrame: The subset of the dataframe + int: A validation flag + """ + df = df.groupby(treatment, group_keys=False).apply(lambda x: x.sample(frac=fraction)) + validate = 1 + return df, validate + + +def test_significance(estimate: float, simulations: List) -> float: + """Performs a significance test for a normal distribution. 
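The three refuter helpers above can also be exercised directly; `df` and the treatment column name are hypothetical.

df_placebo, orig_treatment, _ = random_treatment(df, "treatment")  # placebo labels, original kept aside
df_noise, _ = random_feature(df)                                   # adds a N(0, 1) 'random_feature' column
df_subset, _ = subset_refuter(df, "treatment", fraction=0.8)       # 80% sample stratified by treatment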
+ + Args: + estimate: float + The estimated effect + simulations: list + A list of estimated effects from each simulation + + Return: + float: The p-value of the test + """ + mean_refute_value = np.mean(simulations) + std_dev_refute_values = np.std(simulations) + z_score = (estimate - mean_refute_value) / std_dev_refute_values + + if z_score > 0: # Right Tail + p_value = 1 - st.norm.cdf(z_score) + else: # Left Tail + p_value = st.norm.cdf(z_score) + + return p_value From bec1c7f33c8711dceac9f7f9ce69a635a67c5280 Mon Sep 17 00:00:00 2001 From: dev-rinchin Date: Tue, 25 Jul 2023 12:26:52 +0000 Subject: [PATCH 2/2] fixes to pass linters --- .../addons/hypex/{readme.md => README.md} | 3 +- lightautoml/addons/hypex/__init__.py | 1 + .../addons/hypex/algorithms/faiss_matcher.py | 56 ++++++++++--------- lightautoml/addons/hypex/matcher.py | 4 +- .../hypex/selectors/lama_feature_selector.py | 34 ++++++----- .../addons/hypex/selectors/outliers_filter.py | 7 +-- .../addons/hypex/selectors/spearman_filter.py | 2 + lightautoml/addons/hypex/utils/metrics.py | 7 ++- lightautoml/addons/hypex/utils/psi_pandas.py | 11 ++-- lightautoml/addons/hypex/utils/validators.py | 32 +++++------ 10 files changed, 84 insertions(+), 73 deletions(-) rename lightautoml/addons/hypex/{readme.md => README.md} (89%) diff --git a/lightautoml/addons/hypex/readme.md b/lightautoml/addons/hypex/README.md similarity index 89% rename from lightautoml/addons/hypex/readme.md rename to lightautoml/addons/hypex/README.md index d1a53633..5140546d 100644 --- a/lightautoml/addons/hypex/readme.md +++ b/lightautoml/addons/hypex/README.md @@ -1,8 +1,7 @@ -# Digital Twins +# HypEx: Hypotheses and Experiments [![Telegram](https://img.shields.io/badge/chat-on%20Telegram-2ba2d9.svg)](https://t.me/lamamatcher) # Usage You can see the examples of usages this addons [here](https://github.com/sb-ai-lab/LightAutoML/blob/master/examples/tutorials/Tutorial_12_Matching.ipynb) - diff --git a/lightautoml/addons/hypex/__init__.py b/lightautoml/addons/hypex/__init__.py index 18156cf6..72e89553 100644 --- a/lightautoml/addons/hypex/__init__.py +++ b/lightautoml/addons/hypex/__init__.py @@ -1,3 +1,4 @@ from .matcher import Matcher + __all__ = ["Matcher"] diff --git a/lightautoml/addons/hypex/algorithms/faiss_matcher.py b/lightautoml/addons/hypex/algorithms/faiss_matcher.py index 70f6458e..3a2da4cf 100644 --- a/lightautoml/addons/hypex/algorithms/faiss_matcher.py +++ b/lightautoml/addons/hypex/algorithms/faiss_matcher.py @@ -1,15 +1,21 @@ """Class that searches indexes.""" import datetime as dt import logging -from typing import Dict, Union, Tuple + +from typing import Dict +from typing import Tuple +from typing import Union import faiss import numpy as np import pandas as pd + from scipy.stats import norm from tqdm.auto import tqdm -from ..utils.metrics import check_repeats, matching_quality +from ..utils.metrics import check_repeats +from ..utils.metrics import matching_quality + faiss.cvar.distance_compute_blas_threshold = 100000 POSTFIX = "_matched" @@ -26,22 +32,21 @@ class FaissMatcher: - """A class used to match instances using Faiss library. 
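A worked example of the one-sided tail logic in `test_significance` above, using synthetic placebo effects:

import numpy as np

placebo_effects = list(np.random.default_rng(0).normal(0.0, 1.0, 100))
print(test_significance(5.0, placebo_effects))  # z-score ~ 5 -> p-value ~ 3e-7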
- """ + """A class used to match instances using Faiss library.""" def __init__( - self, - df: pd.DataFrame, - outcomes: str, - treatment: str, - info_col: list, - features: [list, pd.DataFrame] = None, - group_col: str = None, - sigma: float = 1.96, - validation: bool = None, - n_neighbors: int = 10, - silent: bool = True, - pbar: bool = True + self, + df: pd.DataFrame, + outcomes: str, + treatment: str, + info_col: list, + features: [list, pd.DataFrame] = None, + group_col: str = None, + sigma: float = 1.96, + validation: bool = None, + n_neighbors: int = 10, + silent: bool = True, + pbar: bool = True, ): """Construct all the necessary attributes. @@ -285,7 +290,7 @@ def _create_features_matched_df(self, index: np.ndarray, is_treated: bool) -> pd def _create_matched_df(self) -> pd.DataFrame: """Creates matched df of features and outcome. - Return: + Returns: pd.DataFrame: Matched dataframe """ df_pred_treated = self._create_outcome_matched_df(self.dict_outcome_treated, True) @@ -364,9 +369,6 @@ def _calculate_ate_all_target(self, df: pd.DataFrame): df: pd.DataFrame Input dataframe - Returns: - None - """ logger.debug("Creating dicts of all effects: ATE, ATC, ATT") @@ -599,10 +601,8 @@ def _get_index(base: np.ndarray, new: np.ndarray, n_neighbors: int) -> np.ndarra index = faiss.IndexFlatL2(base.shape[1]) index.add(base) dist, indexes = index.search(new, n_neighbors) - map_func = lambda x: np.where(x == x[0])[0] - equal_dist = list(map(map_func, dist)) - f2 = lambda x, y: x[y] - indexes = np.array([f2(i, j) for i, j in zip(indexes, equal_dist)]) + equal_dist = list(map(lambda x: np.where(x == x[0])[0], dist)) + indexes = np.array([i[j] for i, j in zip(indexes, equal_dist)]) return indexes @@ -706,8 +706,9 @@ def calc_att_se(vars_c: np.ndarray, vars_t: np.ndarray, scaled_counts_c: np.ndar return np.sqrt(var) -def calc_ate_se(vars_c: np.ndarray, vars_t: np.ndarray, - scaled_counts_c: np.ndarray, scaled_counts_t: np.ndarray) -> float: +def calc_ate_se( + vars_c: np.ndarray, vars_t: np.ndarray, scaled_counts_c: np.ndarray, scaled_counts_t: np.ndarray +) -> float: """Calculates Average Treatment Effect for the control group (ATC) standard error. Args: @@ -759,8 +760,9 @@ def scaled_counts(N: int, matches: np.ndarray, silent: bool = True) -> np.ndarra A numpy array of matched indexes from control or treated group silent: bool, optional If true logger in info mode + Returns: - numpy.ndarray: An array representing the number of times each subject has appeared as a match + np.ndarray: An array representing the number of times each subject has appeared as a match """ s_counts = np.zeros(N) diff --git a/lightautoml/addons/hypex/matcher.py b/lightautoml/addons/hypex/matcher.py index bf8ea298..9535ad6c 100644 --- a/lightautoml/addons/hypex/matcher.py +++ b/lightautoml/addons/hypex/matcher.py @@ -192,8 +192,8 @@ def _apply_filter(self, filter_class, *filter_args): Args: filter_class: class The class of the filter to apply. - *filter_args: (args) - Arguments to pass to the filter class. + *filter_args: (args) #noqa: DAR101 + Arguments to pass to the filter class. 
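The `_get_index` rewrite above keeps every neighbour tied with the closest match; a standalone sketch of that FAISS lookup on toy data:

import faiss
import numpy as np

base = np.array([[0.0], [1.0], [1.0], [5.0]], dtype="float32")
new = np.array([[1.1]], dtype="float32")
index = faiss.IndexFlatL2(base.shape[1])  # squared-L2 flat index, as in the matcher
index.add(base)
dist, idx = index.search(new, 3)
ties = np.where(dist[0] == dist[0][0])[0]  # positions tied at the minimal distance
print(idx[0][ties])  # both rows at squared distance ~0.01 are kept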
""" filter_instance = filter_class(*filter_args) self.input_data = filter_instance.perform_filter(self.input_data) diff --git a/lightautoml/addons/hypex/selectors/lama_feature_selector.py b/lightautoml/addons/hypex/selectors/lama_feature_selector.py index df21039e..99a01cdb 100644 --- a/lightautoml/addons/hypex/selectors/lama_feature_selector.py +++ b/lightautoml/addons/hypex/selectors/lama_feature_selector.py @@ -1,13 +1,15 @@ """Feature selection class using LAMA.""" import logging -import pandas as pd from typing import List +import pandas as pd + from ....automl.presets.tabular_presets import TabularAutoML from ....report import ReportDeco from ....tasks import Task + logger = logging.getLogger("lama_feature_selector") console_out = logging.StreamHandler() logging.basicConfig( @@ -19,21 +21,20 @@ class LamaFeatureSelector: - """The main class of LAMA Feature selector. + """The main class of LAMA Feature selector. Select top features. By default, use LGM.""" - Select top features. By default, use LGM""" def __init__( - self, - outcome: str, - outcome_type: str, - treatment: str, - timeout: int, - n_threads: int, - n_folds: int, - verbose: bool, # не используется - generate_report: bool, - report_dir: str, - use_algos: List[str], + self, + outcome: str, + outcome_type: str, + treatment: str, + timeout: int, + n_threads: int, + n_folds: int, + verbose: bool, # не используется + generate_report: bool, + report_dir: str, + use_algos: List[str], ): """Initialize the LamaFeatureSelector. @@ -108,7 +109,10 @@ def perform_selection(self, df: pd.DataFrame) -> pd.DataFrame: timeout=self.timeout, cpu_limit=self.n_threads, general_params={"use_algos": [self.use_algos]}, - reader_params={"n_jobs": self.n_threads, "cv": self.n_folds, }, + reader_params={ + "n_jobs": self.n_threads, + "cv": self.n_folds, + }, ) if self.generate_report: diff --git a/lightautoml/addons/hypex/selectors/outliers_filter.py b/lightautoml/addons/hypex/selectors/outliers_filter.py index 6bbc1643..fb23831e 100644 --- a/lightautoml/addons/hypex/selectors/outliers_filter.py +++ b/lightautoml/addons/hypex/selectors/outliers_filter.py @@ -3,6 +3,7 @@ import pandas as pd + logger = logging.getLogger("outliers_filter") console_out = logging.StreamHandler() logging.basicConfig( @@ -14,12 +15,10 @@ class OutliersFilter: - """The main class of Outliers Filter. + """The main class of Outliers Filter. It creates a row indices that should be deleted by percentile.""" - It creates a row indices that should be deleted by percentile.""" def __init__(self, interquartile_coeff, mode_percentile, min_percentile, max_percentile): - """ - Initializes the OutliersFilter. + """Initializes the OutliersFilter. 
Args: interquartile_coeff: float diff --git a/lightautoml/addons/hypex/selectors/spearman_filter.py b/lightautoml/addons/hypex/selectors/spearman_filter.py index ec8f51f4..2e90dbe7 100644 --- a/lightautoml/addons/hypex/selectors/spearman_filter.py +++ b/lightautoml/addons/hypex/selectors/spearman_filter.py @@ -2,8 +2,10 @@ import logging import pandas as pd + from scipy.stats import spearmanr + PVALUE = 0.05 logger = logging.getLogger("spearman_filter") diff --git a/lightautoml/addons/hypex/utils/metrics.py b/lightautoml/addons/hypex/utils/metrics.py index 38faf520..915e4326 100644 --- a/lightautoml/addons/hypex/utils/metrics.py +++ b/lightautoml/addons/hypex/utils/metrics.py @@ -3,10 +3,12 @@ import numpy as np import pandas as pd + from scipy.stats import ks_2samp from ..utils.psi_pandas import report + logger = logging.getLogger("metrics") console_out = logging.StreamHandler() logging.basicConfig( @@ -74,8 +76,9 @@ def ks(orig: pd.DataFrame, matched: pd.DataFrame, silent=False) -> dict: return ks_dict -def matching_quality(data: pd.DataFrame, treatment: str, features: list, features_psi: list, - silent: bool = False) -> tuple: +def matching_quality( + data: pd.DataFrame, treatment: str, features: list, features_psi: list, silent: bool = False +) -> tuple: """Wraps the functionality for estimating matching quality. Args: diff --git a/lightautoml/addons/hypex/utils/psi_pandas.py b/lightautoml/addons/hypex/utils/psi_pandas.py index 91347336..1e172674 100644 --- a/lightautoml/addons/hypex/utils/psi_pandas.py +++ b/lightautoml/addons/hypex/utils/psi_pandas.py @@ -5,6 +5,7 @@ import numpy as np import pandas as pd + logger = logging.getLogger("psi_pandas") console_out = logging.StreamHandler() logging.basicConfig( @@ -43,7 +44,7 @@ class PSI: """ def __init__( - self, expected: pd.DataFrame, actual: pd.DataFrame, column_name: str, plot: bool = False, silent=False + self, expected: pd.DataFrame, actual: pd.DataFrame, column_name: str, plot: bool = False, silent=False ): """Initializes the PSI class with given parameters. @@ -214,7 +215,7 @@ def psi_num(self): def uniq_psi(self): """Calculates PSI for categorical unique counts grater than 100. 
- Returns: + Returns: float: PSI for column dict: The PSI for each bucket list: New categories (empty list for non-categorical data) @@ -328,10 +329,10 @@ def psi_categ(self): reminder = g_counts % group_num for g_n in range(group_num): if g_n < group_num - reminder: - group_values = category_names[int(current_pos): int(current_pos + group_size)] + group_values = category_names[int(current_pos) : int(current_pos + group_size)] current_pos += group_size else: - group_values = category_names[int(current_pos): int(current_pos + group_size + 1)] + group_values = category_names[int(current_pos) : int(current_pos + group_size + 1)] current_pos += group_size + 1 for val in group_values: groups[val] = g_n @@ -402,7 +403,7 @@ def psi_result(self): for i in range(0, len(psi_values)): if (self.column_type == np.dtype("O")) or ( - self.expected_nulls == self.expected_len and self.actual_nulls == self.actual_len + self.expected_nulls == self.expected_len and self.actual_nulls == self.actual_len ): psi_values, psi_dict, new_cats, abs_cats = self.psi_categ() else: diff --git a/lightautoml/addons/hypex/utils/validators.py b/lightautoml/addons/hypex/utils/validators.py index f792fc9a..b4de9911 100644 --- a/lightautoml/addons/hypex/utils/validators.py +++ b/lightautoml/addons/hypex/utils/validators.py @@ -15,7 +15,7 @@ def random_treatment(df: pd.DataFrame, treatment: str): treatment: str The columns name representing the treatment - Return: + Returns: pd.DataFrame: The modified dataframe with the original treatment replaced pd.Series: The original treatment series int: A validation flag @@ -33,13 +33,13 @@ def random_treatment(df: pd.DataFrame, treatment: str): def random_feature(df: pd.DataFrame): """Adds a random feature to the initial dataset. - Args: - df: pd.DataFrame + Args: + df: pd.DataFrame The initial dataframe - Return: - pd.DataFrame: The modified dataframe with an additional random feature - int: A validation flag + Returns: + pd.DataFrame: The modified dataframe with an additional random feature + int: A validation flag """ feature = np.random.normal(0, 1, size=len(df)) validate = 1 @@ -50,17 +50,17 @@ def random_feature(df: pd.DataFrame): def subset_refuter(df: pd.DataFrame, treatment: str, fraction: float = 0.8): """Returns a subset of data with given fraction (default 0.8). - Args: - df: pd.DataFrame + Args: + df: pd.DataFrame The initial dataframe - treatment: str + treatment: str The column name representing the treatment fraction: float The fraction of the dataset to divide random matching - Return: - pd.DataFrame: The subset of the dataframe - int: A validation flag + Returns: + pd.DataFrame: The subset of the dataframe + int: A validation flag """ df = df.groupby(treatment, group_keys=False).apply(lambda x: x.sample(frac=fraction)) validate = 1 @@ -71,13 +71,13 @@ def test_significance(estimate: float, simulations: List) -> float: """Performs a significance test for a normal distribution. Args: - estimate: float + estimate: float The estimated effect - simulations: list + simulations: list A list of estimated effects from each simulation - Return: - float: The p-value of the test + Returns: + float: The p-value of the test """ mean_refute_value = np.mean(simulations) std_dev_refute_values = np.std(simulations)