diff --git a/apps/anomalist/anomalist_api.py b/apps/anomalist/anomalist_api.py index e3b56909315..bbe65aeb733 100644 --- a/apps/anomalist/anomalist_api.py +++ b/apps/anomalist/anomalist_api.py @@ -10,6 +10,7 @@ from sqlalchemy.orm import Session from apps.anomalist.detectors import ( + AnomalyDetector, AnomalyTimeChange, AnomalyUpgradeChange, AnomalyUpgradeMissing, @@ -45,6 +46,12 @@ ######################################################################################################################## + +def load_detector(anomaly_type: ANOMALY_TYPE) -> AnomalyDetector: + """Load detector.""" + return ANOMALY_DETECTORS[anomaly_type] + + # AGGREGATE ANOMALIES: @@ -374,7 +381,15 @@ def _load_variables_meta(engine: Engine, variable_ids: list[int]) -> list[gm.Var def combine_and_reduce_scores_df(anomalies: List[gm.Anomaly]) -> pd.DataFrame: """Get the combined dataframe with scores for all anomalies, and reduce it to include only the largest anomaly for each contry-indicator.""" # Combine the reduced dataframes for all anomalies into a single dataframe. - dfs = [cast(pd.DataFrame, anomaly.dfReduced).assign(**{"type": anomaly.anomalyType}) for anomaly in anomalies] + dfs = [] + for anomaly in anomalies: + df = anomaly.dfReduced + if df is None: + log.warning(f"Anomaly {anomaly} has no reduced dataframe.") + continue + df["type"] = anomaly.anomalyType + dfs.append(df) + df_reduced = cast(pd.DataFrame, pd.concat(dfs, ignore_index=True)) # Dtypes # df = df.astype({"year": int}) diff --git a/apps/anomalist/detectors.py b/apps/anomalist/detectors.py index b8349f03ae5..f431fb554a2 100644 --- a/apps/anomalist/detectors.py +++ b/apps/anomalist/detectors.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Literal, Optional +from typing import Dict, List, Optional import numpy as np import pandas as pd @@ -14,9 +14,6 @@ # Name of index columns for dataframe. INDEX_COLUMNS = ["entity_name", "year"] -# Define anomaly types. 
-ANOMALY_TYPE = Literal["upgrade_change", "time_change", "upgrade_missing"] - def estimate_bard_epsilon(series: pd.Series) -> float: # Make all values positive, and ignore zeros. @@ -56,7 +53,8 @@ def get_long_format_score_df(df_score: pd.DataFrame) -> pd.DataFrame: class AnomalyDetector: anomaly_type: str - def get_text(self, entity: str, year: int) -> str: + @staticmethod + def get_text(entity: str, year: int) -> str: return f"Anomaly happened in {entity} in {year}!" def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame: @@ -133,6 +131,10 @@ class AnomalyUpgradeMissing(AnomalyDetector): anomaly_type = "upgrade_missing" + @staticmethod + def get_text(entity: str, year: int) -> str: + return f"There are missing values for {entity}! There might be other data points affected." + def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame: # Create a dataframe of zeros. df_lost = self.get_zeros_df(df, variable_ids) @@ -150,6 +152,10 @@ class AnomalyUpgradeChange(AnomalyDetector): anomaly_type = "upgrade_change" + @staticmethod + def get_text(entity: str, year: int) -> str: + return f"There are abrupt changes for {entity} in {year}! There might be other data points affected." + def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame: # Create a dataframe of zeros. df_version_change = self.get_zeros_df(df, variable_ids) @@ -170,6 +176,10 @@ class AnomalyTimeChange(AnomalyDetector): anomaly_type = "time_change" + @staticmethod + def get_text(entity: str, year: int) -> str: + return f"There are significant changes for {entity} in {year} compared to the old version of the indicator. There might be other data points affected." + def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame: # Create a dataframe of zeros. 
df_time_change = self.get_zeros_df(df, variable_ids) diff --git a/apps/anomalist/gp_detector.py b/apps/anomalist/gp_detector.py index be01139d2e4..6bd2950b1b6 100644 --- a/apps/anomalist/gp_detector.py +++ b/apps/anomalist/gp_detector.py @@ -65,10 +65,15 @@ def _processing_queue(items: list[tuple[str, int]]) -> List[tuple]: class AnomalyGaussianProcessOutlier(AnomalyDetector): anomaly_type = "gp_outlier" - def __init__(self, max_time: Optional[float] = None, n_jobs: int = 1): + # TODO: max_time is hard-coded to 100, but it should be configurable in production + def __init__(self, max_time: Optional[float] = 100, n_jobs: int = 1): self.max_time = max_time self.n_jobs = n_jobs + @staticmethod + def get_text(entity: str, year: int) -> str: + return f"There are some outliers for {entity}! These were detected using Gaussian processes. There might be other data points affected." + def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame: # Convert to long format df_wide = ( @@ -126,6 +131,10 @@ def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mappi log.info("Finished processing", elapsed=round(time.time() - start_time, 2)) df_score_long = pd.concat(results).reset_index() + + # Normalize the anomaly scores by mapping interval (0, 3+) to (0, 1) + df_score_long["anomaly_score"] = np.minimum(df_score_long["anomaly_score"] / 3, 1) + return df_score_long @staticmethod diff --git a/apps/wizard/app_pages/anomalist/app.py b/apps/wizard/app_pages/anomalist/app.py index ee6451d4427..49c596eae5f 100644 --- a/apps/wizard/app_pages/anomalist/app.py +++ b/apps/wizard/app_pages/anomalist/app.py @@ -25,7 +25,7 @@ import pandas as pd import streamlit as st -from apps.anomalist.anomalist_api import anomaly_detection +from apps.anomalist.anomalist_api import anomaly_detection, load_detector from apps.utils.gpt import OpenAIWrapper, get_cost_and_tokens, get_number_tokens from 
apps.wizard.app_pages.anomalist.utils import ( AnomalyTypeEnum, @@ -72,8 +72,7 @@ }, } ANOMALY_TYPE_NAMES = {k: v["tag_name"] for k, v in ANOMALY_TYPES.items()} -# TODO: Remove the `t != AnomalyTypeEnum.GP_OUTLIER.value` bit to also query for GP outliers. -ANOMALY_TYPES_TO_DETECT = tuple(t for t in ANOMALY_TYPES.keys() if t != AnomalyTypeEnum.GP_OUTLIER.value) +ANOMALY_TYPES_TO_DETECT = tuple(ANOMALY_TYPES.keys()) # GPT MODEL_NAME = "gpt-4o" @@ -346,16 +345,7 @@ def show_anomaly_compact(index, df): indicator_uri = st.session_state.anomalist_indicators.get(indicator_id) # Generate descriptive text. Only contains information about top-scoring entity. - if an_type == AnomalyTypeEnum.TIME_CHANGE.value: - text = f"There are significant changes for {entity_default} in {year_default} compared to the old version of the indicator. There might be other data points affected." - elif an_type == AnomalyTypeEnum.UPGRADE_CHANGE.value: - text = f"There are abrupt changes for {entity_default} in {year_default}! There might be other data points affected." - elif an_type == AnomalyTypeEnum.UPGRADE_MISSING.value: - text = f"There are missing values for {entity_default}! There might be other data points affected." - elif an_type == AnomalyTypeEnum.GP_OUTLIER.value: - text = f"There are some outliers for {entity_default}! These were detected using Gaussian processes. There might be other data points affected." 
- else: - raise ValueError(f"Unknown anomaly type: {an_type}") + text = load_detector(an_type).get_text(entity_default, year_default) # Render with st.container(border=True): @@ -674,13 +664,13 @@ def _change_chart_selection(df, key_table, key_selection): pagination_key="pagination-demo", ) - # Show controls only if needed - if len(items) > items_per_page: - pagination.show_controls(mode="bar") - # Show items (only current page) for item in pagination.get_page_items(): show_anomaly_compact(item[0], item[1]) + # Show controls only if needed + if len(items) > items_per_page: + pagination.show_controls(mode="bar") + # Reset state set_states({"anomalist_datasets_submitted": False}) diff --git a/etl/grapher_model.py b/etl/grapher_model.py index 62a1affa42f..316c7aeb460 100644 --- a/etl/grapher_model.py +++ b/etl/grapher_model.py @@ -23,6 +23,7 @@ from typing import Any, Dict, List, Literal, Optional, Union, get_args, overload import humps +import numpy as np import pandas as pd import requests import structlog @@ -1274,7 +1275,7 @@ def from_id_or_path( # Multiple path or id elif isinstance(id_or_path, list): # Filter the list to ensure only integers are passed - int_ids = [i for i in id_or_path if isinstance(i, int)] + int_ids = [i for i in id_or_path if isinstance(i, (int, np.integer))] str_ids = [i for i in id_or_path if isinstance(i, str)] # Multiple IDs if len(int_ids) == len(id_or_path): @@ -1767,6 +1768,12 @@ class Anomaly(Base): # year: Mapped[int] = mapped_column(Integer) # rawScore: Mapped[float] = mapped_column(Float) + def __repr__(self) -> str: + return ( + f"{type(self).__name__}(anomalyType={self.anomalyType!r})" + ) + @hybrid_property def dfScore(self) -> Optional[pd.DataFrame]: # type: ignore if self._dfScore is None: