From fe3027b2e66046a4502a9ae4329d08875381c585 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lucas=20Rod=C3=A9s-Guirao?=
Date: Wed, 16 Oct 2024 15:46:13 +0200
Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20anomalist:=20GP=20support,=20refact?=
 =?UTF-8?q?or=20functions,=20add=20dfReduced=20(#3416)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* ✨ anomalist: nits

* abstract df parsing logic

* add GP outlier

* add dfReduced to table

* reset index

* incorporate GP

* re-arrange functions, add link to indicator

* stop reducing dfScore
---
 apps/anomalist/anomalist_api.py          |  10 +
 apps/anomalist/detectors.py              |   7 -
 apps/wizard/app_pages/anomalist/app.py   | 311 ++++++++++++-----------
 apps/wizard/app_pages/anomalist/utils.py |   1 +
 etl/grapher_model.py                     |  18 ++
 5 files changed, 186 insertions(+), 161 deletions(-)

diff --git a/apps/anomalist/anomalist_api.py b/apps/anomalist/anomalist_api.py
index 7624a33f646..4f92d4cf2b6 100644
--- a/apps/anomalist/anomalist_api.py
+++ b/apps/anomalist/anomalist_api.py
@@ -284,6 +284,15 @@ def anomaly_detection(
         )
         anomaly.dfScore = df_score_long

+        # Reduce the dataframe to one row per entity-indicator pair, keeping the row with the highest anomaly score.
+        df_score_long_reduced = (
+            df_score_long.sort_values("anomaly_score", ascending=False)
+            .drop_duplicates(subset=["entity_name", "variable_id"], keep="first")
+            .reset_index(drop=True)
+        )
+        anomaly.dfReduced = df_score_long_reduced
+
+        ##################################################################
         # TODO: Use this as an alternative to storing binary files in the DB
         # anomaly = gm.Anomaly(
         #     datasetId=dataset_id,
@@ -293,6 +302,7 @@ def anomaly_detection(

         # # Export anomaly file
         # anomaly.path_file = export_anomalies_file(df_score, dataset_id, detector.anomaly_type)
+        ##################################################################

     if not dry_run:
         with Session(engine) as session:
diff --git a/apps/anomalist/detectors.py b/apps/anomalist/detectors.py
index cbb872a07e3..fdfdcbfd7ae 100644
--- a/apps/anomalist/detectors.py
+++ b/apps/anomalist/detectors.py
@@ -41,13 +41,6 @@ def get_long_format_score_df(df_score: pd.DataFrame) -> pd.DataFrame:
     # Drop zero anomalies.
     df_score_long = df_score_long[df_score_long["anomaly_score"] != 0]

-    # For now, keep only the highest anomaly score for each country-indicator.
-    df_score_long = (
-        df_score_long.sort_values("anomaly_score", ascending=False)
-        .drop_duplicates(subset=["variable_id", "entity_name"], keep="first")
-        .reset_index(drop=True)
-    )
-
     # Save memory by converting to categoricals.
     df_score_long = df_score_long.astype({"entity_name": "category", "year": "category", "variable_id": "category"})

diff --git a/apps/wizard/app_pages/anomalist/app.py b/apps/wizard/app_pages/anomalist/app.py
index bdae049a63e..dfa3b109606 100644
--- a/apps/wizard/app_pages/anomalist/app.py
+++ b/apps/wizard/app_pages/anomalist/app.py
@@ -30,6 +30,7 @@
 from apps.wizard.utils.chart_config import bake_chart_config
 from apps.wizard.utils.components import Pagination, grapher_chart, st_horizontal, tag_in_md
 from apps.wizard.utils.db import WizardDB
+from etl.config import OWID_ENV

 # PAGE CONFIG
 st.set_page_config(
@@ -56,8 +57,16 @@
         "color": "red",
         "icon": ":material/hide_source",
     },
+    AnomalyTypeEnum.GP_OUTLIER.value: {
+        "tag_name": "Gaussian Process",
+        "color": "blue",
+        "icon": ":material/notifications",
+    },
 }
 ANOMALY_TYPE_NAMES = {k: v["tag_name"] for k, v in ANOMALY_TYPES.items()}
+# TODO: Remove the `t != AnomalyTypeEnum.GP_OUTLIER.value` bit so that GP outliers are also detected.
+ANOMALY_TYPES_TO_DETECT = tuple(t for t in ANOMALY_TYPES.keys() if t != AnomalyTypeEnum.GP_OUTLIER.value)
+

 SORTING_STRATEGIES = {
     "relevance": "Relevance",
@@ -66,6 +75,7 @@
     "views": "Chart views",
     "population+views": "Population+views",
 }
+
 # SESSION STATE
 # Datasets selected by the user in first multiselect
 st.session_state.anomalist_datasets_selected = st.session_state.get("anomalist_datasets_selected", [])
@@ -93,10 +103,7 @@
 # FLAG: True to trigger anomaly detection manually
 st.session_state.anomalist_trigger_detection = st.session_state.get("anomalist_trigger_detection", False)

-######################################################################
-# MOCK VARIABLES AND FUNCTIONS
-######################################################################
-# DEBUGGING
+
 ######################################################################
 # FUNCTIONS
 ######################################################################
@@ -111,16 +118,140 @@ def get_variable_mapping(variable_ids):
     return mapping


-def _change_chart_selection(df, key_table, key_selection):
-    """Change selection in grapher chart."""
-    # st.toast(f"Changing entity in indicator {indicator_id}")
-    # Get selected row number
-    rows = st.session_state[key_table]["selection"]["rows"]
+def parse_anomalies_to_df() -> pd.DataFrame | None:
+    """Given a list of anomalies, parse them into a dataframe.

-    # Update entities in chart
-    st.session_state[key_selection] = df.iloc[rows]["entity_name"].tolist()
+    This function takes the anomalies stored in st.session_state.anomalist_anomalies and parses them into a single dataframe.
+
+    - It loads dfReduced from each anomaly.
+    - It keeps only one row per entity and indicator for each anomaly type, based on the highest anomaly score.
+    - It concatenates all dataframes.
+    - It renames columns to appropriate names.
+    - It adjusts dtypes.
+    - It adds population and analytics scores.
+    - It calculates the weighted combined score.
+
+    """
+    # Only return dataframe if there are anomalies!
+    if len(st.session_state.anomalist_anomalies) > 0:
+        dfs = []
+        for anomaly in st.session_state.anomalist_anomalies:
+            # Load reduced score dataframe
+            df = anomaly.dfReduced
+            if isinstance(df, pd.DataFrame):
+                # Assign anomaly type in df
+                df["type"] = anomaly.anomalyType
+                # Add to list
+                dfs.append(df)
+            else:
+                raise ValueError(f"Anomaly {anomaly} has no dfReduced attribute.")
+
+        # Concatenate all dfs
+        df = cast(pd.DataFrame, pd.concat(dfs, ignore_index=True))
+
+        # Rename columns
+        df = df.rename(
+            columns={
+                "variable_id": "indicator_id",
+                "anomaly_score": "score",
+            }
+        )
+        # Dtypes
+        df = df.astype(
+            {
+                "year": int,
+            }
+        )
+        # Add population and analytics score:
+        df["score_population"] = 1
+        df["score_analytics"] = 1
+
+        # Weighted combined score (all weights equal for now)
+        w_score = 1
+        w_pop = 1
+        w_views = 1
+        df["score_weighed"] = (
+            w_score * df["score"] + w_pop * df["score_population"] + w_views * df["score_analytics"]
+        ) / (w_score + w_pop + w_views)
+
+        return df
+
+
+def ask_llm_for_summary(df: pd.DataFrame):
+    pass
+
+
+# Functions to filter the results
+def filter_df(df: pd.DataFrame):
+    """Apply filters from user to the dataframe.
+
+    Filter parameters are stored in the session state:
+
+    - `anomalist_filter_entities`: list of entities to keep.
+    - `anomalist_filter_indicators`: list of indicators to keep.
+    - `anomalist_filter_anomaly_types`: list of anomaly types to exclude.
+    - `anomalist_min_year`: minimum year to keep.
+    - `anomalist_max_year`: maximum year to keep.
+    - `anomalist_sorting_strategy`: sorting strategy.
+ """ + # Filter dataframe + df = _filter_df( + df=df, + year_min=st.session_state.anomalist_min_year, + year_max=st.session_state.anomalist_max_year, + anomaly_types=st.session_state.anomalist_filter_anomaly_types, + entities=st.session_state.anomalist_filter_entities, + indicators=st.session_state.anomalist_filter_indicators, + ) + ## Sort dataframe + df, st.session_state.anomalist_sorting_columns = _sort_df(df, st.session_state.anomalist_sorting_strategy) + return df + + +@st.cache_data +def _filter_df(df: pd.DataFrame, year_min, year_max, anomaly_types, entities, indicators) -> pd.DataFrame: + """Used in filter_df.""" + ## Year + df = df[(df["year"] >= year_min) & (df["year"] <= year_max)] + ## Anomaly type + df = df[~df["type"].isin(anomaly_types)] + ## Entities + if len(entities) > 0: + df = df[df["entity_name"].isin(entities)] + # Indicators + if len(indicators) > 0: + df = df[df["indicator_id"].isin(indicators)] + + return df + + +@st.cache_data +def _sort_df(df: pd.DataFrame, sort_strategy: str) -> Tuple[pd.DataFrame, List[str]]: + """Used in filter_df.""" + ## Sort + columns_sort = [] + match sort_strategy: + case "relevance": + columns_sort = ["score_weighed"] + case "score": + columns_sort = ["score"] + case "population": + columns_sort = ["score_population"] + case "views": + columns_sort = ["score_analytics"] + case "population+views": + columns_sort = ["score_population", "score_analytics"] + case _: + pass + if columns_sort != []: + df = df.sort_values(columns_sort, ascending=False) + + return df, columns_sort + + +# Functions to show the anomalies @st.fragment def show_anomaly_compact(index, df): """Show anomaly compactly. @@ -151,20 +282,22 @@ def show_anomaly_compact(index, df): text = f"There are abrupt changes for {entity_default} in {year_default}! There might be other data points affected." elif an_type == AnomalyTypeEnum.UPGRADE_MISSING.value: text = f"There are missing values for {entity_default}! There might be other data points affected." + elif an_type == AnomalyTypeEnum.GP_OUTLIER.value: + text = f"There are some outliers for {entity_default}! These were detected using Gaussian processes. There might be other data points affected." else: raise ValueError(f"Unknown anomaly type: {an_type}") # Render with st.container(border=True): # Title - st.markdown(f"{tag_in_md(**ANOMALY_TYPES[an_type])} **{indicator_uri}**") + link = OWID_ENV.indicator_admin_site(indicator_id) + st.markdown(f"{tag_in_md(**ANOMALY_TYPES[an_type])} **[{indicator_uri}]({link})**") col1, col2 = st.columns(2) # Chart with col1: # Bake chart config # If the anomaly is compared to previous indicator, then we need to show two indicators (old and new)! if an_type in {AnomalyTypeEnum.UPGRADE_CHANGE.value, AnomalyTypeEnum.UPGRADE_MISSING.value}: - # TODO: Uncomment the following code to show comparison between old and new indicator versions. display = [ { "name": "New", @@ -191,7 +324,7 @@ def show_anomaly_compact(index, df): config = bake_chart_config(variable_id=indicator_id, selected_entities=entities) config["hideAnnotationFieldsInTitle"]["time"] = True # Actually plot - grapher_chart(chart_config=config) + grapher_chart(chart_config=config, owid_env=OWID_ENV) # Description and other entities with col2: @@ -215,86 +348,17 @@ def show_anomaly_compact(index, df): # st.button("Hide anomaly", key=key_btn, icon=":material/hide:") -def show_anomaly(anomaly, indicator_id): - """Show anomaly details. - - Renders an anomaly. Title, description and possibly a chart. 
-
-    TODO: use if we want to expand anomalies to have one box per entity too.
-    """
-    with st.container(border=True):
-        col1, col2 = st.columns(2)
-        with col1:
-            st.markdown(f"##### {anomaly['title']}")
-            st.markdown(f"{anomaly['description']}")
-        with col2:
-            # st.write(indicator.id)
-            grapher_chart(variable_id=indicator_id, selected_entities=[anomaly["country"]])
-
-
-def filter_df(df: pd.DataFrame):
-    """Apply filters from user to the dataframe.
-
-    Filter parameters are stored in the session state:
-
-    - `anomalist_filter_entities`: list of entities to filter.
-    - `anomalist_filter_indicators`: list of indicators to filter.
-    - `anomalist_filter_anomaly_types`: list of anomaly types to filter.
-    - `anomalist_min_year`: minimum year to filter.
-    - `anomalist_max_year`: maximum year to filter.
-    - `anomalist_sorting_strategy`: sorting strategy.
-    """
-    # Filter dataframe
-    df = _filter_df(
-        df=df,
-        year_min=st.session_state.anomalist_min_year,
-        year_max=st.session_state.anomalist_max_year,
-        anomaly_types=st.session_state.anomalist_filter_anomaly_types,
-        entities=st.session_state.anomalist_filter_entities,
-        indicators=st.session_state.anomalist_filter_indicators,
-    )
-    ## Sort dataframe
-    df, st.session_state.anomalist_sorting_columns = _sort_df(df, st.session_state.anomalist_sorting_strategy)
-    return df
-
-
-@st.cache_data
-def _filter_df(df: pd.DataFrame, year_min, year_max, anomaly_types, entities, indicators) -> pd.DataFrame:
-    ## Year
-    df = df[(df["year"] >= year_min) & (df["year"] <= year_max)]
-    ## Anomaly type
-    df = df[~df["type"].isin(anomaly_types)]
-    ## Entities
-    if len(entities) > 0:
-        df = df[df["entity_name"].isin(entities)]
-    # Indicators
-    if len(indicators) > 0:
-        df = df[df["indicator_id"].isin(indicators)]
-
-    return df
+def _change_chart_selection(df, key_table, key_selection):
+    """Change selection in grapher chart."""
+    # st.toast(f"Changing entity in indicator {indicator_id}")
+    # Get selected row number
+    rows = st.session_state[key_table]["selection"]["rows"]

+    # Update entities in chart
+    st.session_state[key_selection] = df.iloc[rows]["entity_name"].tolist()

-@st.cache_data
-def _sort_df(df: pd.DataFrame, sort_strategy: str) -> Tuple[pd.DataFrame, List[str]]:
-    ## Sort
-    columns_sort = []
-    match sort_strategy:
-        case "relevance":
-            columns_sort = ["score_weighed"]
-        case "score":
-            columns_sort = ["score"]
-        case "population":
-            columns_sort = ["score_population"]
-        case "views":
-            columns_sort = ["score_analytics"]
-        case "population+views":
-            columns_sort = ["score_population", "score_analytics"]
-        case _:
-            pass
-    if columns_sort != []:
-        df = df.sort_values(columns_sort, ascending=False)

-    return df, columns_sort
+######################################################################


 # Load the main inputs:
@@ -370,7 +434,7 @@ def _sort_df(df: pd.DataFrame, sort_strategy: str) -> Tuple[pd.DataFrame, List[s

         with st.spinner("Scanning for anomalies... This can take some time."):
             anomaly_detection(
-                anomaly_types=tuple(ANOMALY_TYPE_NAMES.keys()),
+                anomaly_types=ANOMALY_TYPES_TO_DETECT,
                 variable_ids=variable_ids,
                 variable_mapping=st.session_state.anomalist_mapping,
                 dry_run=False,
@@ -395,68 +459,7 @@ def _sort_df(df: pd.DataFrame, sort_strategy: str) -> Tuple[pd.DataFrame, List[s
         st.session_state.anomalist_anomalies_out_of_date = False

     # 3.4/ Parse obtained anomalies into dataframe
-    if len(st.session_state.anomalist_anomalies) > 0:
-        ###############################################################################################################
-        # TODO: Encapsulate this code in a function, add real population and analytics scores
-        dfs = []
-        for anomaly in st.session_state.anomalist_anomalies:
-            # Load
-            df = anomaly.dfScore
-            if isinstance(df, pd.DataFrame):
-                # TODO: We should not store all-zero dataframes in table if there is no variable mapping!
-                # if (df["anomaly_score"] == 0).all():
-                #     continue
-                # Reduce df
-                # st.write(df)
-                df = df.sort_values("anomaly_score", ascending=False)
-                df = df.drop_duplicates(subset=["entity_name", "variable_id"], keep="first")
-                # Assign anomaly type in df
-                df["type"] = anomaly.anomalyType
-                # Add to list
-                dfs.append(df)
-            else:
-                raise ValueError(f"Anomaly {anomaly} has no dfScore attribute.")
-
-        # Concatenate all dfs
-        df = cast(pd.DataFrame, pd.concat(dfs, ignore_index=True))
-
-        # Rename columns
-        df = df.rename(
-            columns={
-                "variable_id": "indicator_id",
-                "anomaly_score": "score",
-            }
-        )
-
-        # Add population and analytics score:
-        df["score_population"] = 1
-        df["score_analytics"] = 1
-
-        # Dtypes
-        df = df.astype(
-            {
-                "year": int,
-            }
-        )
-
-        # Weighed combined score
-        w_score = 1
-        w_pop = 1
-        w_views = 1
-        df["score_weighed"] = (
-            w_score * df["score"] + w_pop * df["score_population"] + w_views * df["score_analytics"]
-        ) / (w_score + w_pop + w_views)
-
-        st.session_state.anomalist_df = df
-        # Former mock data
-        # st.session_state.anomalist_df = mock_anomalies_df(
-        #     indicators_id,
-        #     indicators_id_upgrade,
-        #     n=1000,
-        # )
-        ###############################################################################################################
-    else:
-        st.session_state.anomalist_df = None
+    st.session_state.anomalist_df = parse_anomalies_to_df()

 # 4/ SHOW ANOMALIES (only if any are found)
 if st.session_state.anomalist_df is not None:
diff --git a/apps/wizard/app_pages/anomalist/utils.py b/apps/wizard/app_pages/anomalist/utils.py
index a9d92c7956a..c5f4915dc3b 100644
--- a/apps/wizard/app_pages/anomalist/utils.py
+++ b/apps/wizard/app_pages/anomalist/utils.py
@@ -18,6 +18,7 @@ class AnomalyTypeEnum(Enum):
     TIME_CHANGE = "time_change"
     UPGRADE_CHANGE = "upgrade_change"
     UPGRADE_MISSING = "upgrade_missing"
+    GP_OUTLIER = "gp_outlier"
     # AI = "ai"  # Uncomment if needed


diff --git a/etl/grapher_model.py b/etl/grapher_model.py
index 135d4a47e4b..62a1affa42f 100644
--- a/etl/grapher_model.py
+++ b/etl/grapher_model.py
@@ -1756,6 +1756,7 @@ class Anomaly(Base):
     anomalyType: Mapped[str] = mapped_column(VARCHAR(255), default=str)
     path_file: Mapped[Optional[str]] = mapped_column(VARCHAR(255), default=None)
     _dfScore: Mapped[Optional[bytes]] = mapped_column("dfScore", LONGBLOB, default=None)
+    _dfReduced: Mapped[Optional[bytes]] = mapped_column("dfReduced", LONGBLOB, default=None)
     # catalogPath: Mapped[str] = mapped_column(VARCHAR(255), default=None)
     # NOTE: why do we need indicatorChecksum?
     # Answer: This can be useful to assign an anomaly to a specific snapshot of the indicator. Unclear if we need it atm, but maybe in the future...
@@ -1783,6 +1784,23 @@ def dfScore(self, value: Optional[pd.DataFrame]) -> None:
             buffer.seek(0)
             self._dfScore = buffer.read()

+    @hybrid_property
+    def dfReduced(self) -> Optional[pd.DataFrame]:  # type: ignore
+        if self._dfReduced is None:
+            return None
+        buffer = io.BytesIO(self._dfReduced)
+        return feather.read_feather(buffer)
+
+    @dfReduced.setter
+    def dfReduced(self, value: Optional[pd.DataFrame]) -> None:
+        if value is None:
+            self._dfReduced = None
+        else:
+            buffer = io.BytesIO()
+            feather.write_feather(value, buffer, compression="zstd")
+            buffer.seek(0)
+            self._dfReduced = buffer.read()
+
     @classmethod
     def load_anomalies(cls, session: Session, dataset_id: List[int]) -> List["Anomaly"]:
         return session.scalars(select(cls).where(cls.datasetId.in_(dataset_id))).all()  # type: ignore
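
Note: for reference, a minimal standalone sketch of the two mechanisms this patch introduces: the per entity-indicator reduction computed in `anomaly_detection`, and the zstd-compressed Feather round-trip behind the new `dfReduced` hybrid property. The toy data and variable names below are hypothetical; only pandas and pyarrow (the same libraries imported in etl/grapher_model.py) are assumed.

    import io

    import pandas as pd
    from pyarrow import feather

    # Toy long-format score table (hypothetical values).
    df_score_long = pd.DataFrame(
        {
            "entity_name": ["France", "France", "Spain"],
            "variable_id": [101, 101, 101],
            "year": [2000, 2001, 2000],
            "anomaly_score": [0.2, 0.9, 0.5],
        }
    )

    # Reduction as in anomaly_detection: keep only the highest-scoring
    # row per (entity_name, variable_id) pair.
    df_reduced = (
        df_score_long.sort_values("anomaly_score", ascending=False)
        .drop_duplicates(subset=["entity_name", "variable_id"], keep="first")
        .reset_index(drop=True)
    )

    # Serialization as in the dfReduced setter: write zstd-compressed
    # Feather bytes into an in-memory buffer (stored in the LONGBLOB column).
    buffer = io.BytesIO()
    feather.write_feather(df_reduced, buffer, compression="zstd")
    buffer.seek(0)
    blob = buffer.read()

    # Deserialization as in the dfReduced getter.
    df_roundtrip = feather.read_feather(io.BytesIO(blob))
    assert df_roundtrip.equals(df_reduced)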