Skip to content

Commit

Permalink
🐛 Fix anomalist bugs (#3427)
Browse files Browse the repository at this point in the history
* 🐛 Fix anomalist bugs
  • Loading branch information
Marigold authored Oct 18, 2024
1 parent 885a6b5 commit 9e1caac
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 25 deletions.
17 changes: 16 additions & 1 deletion apps/anomalist/anomalist_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from sqlalchemy.orm import Session

from apps.anomalist.detectors import (
AnomalyDetector,
AnomalyTimeChange,
AnomalyUpgradeChange,
AnomalyUpgradeMissing,
Expand Down Expand Up @@ -45,6 +46,12 @@

########################################################################################################################


def load_detector(anomaly_type: ANOMALY_TYPE) -> AnomalyDetector:
"""Load detector."""
return ANOMALY_DETECTORS[anomaly_type]


# AGGREGATE ANOMALIES:


Expand Down Expand Up @@ -374,7 +381,15 @@ def _load_variables_meta(engine: Engine, variable_ids: list[int]) -> list[gm.Var
def combine_and_reduce_scores_df(anomalies: List[gm.Anomaly]) -> pd.DataFrame:
"""Get the combined dataframe with scores for all anomalies, and reduce it to include only the largest anomaly for each contry-indicator."""
# Combine the reduced dataframes for all anomalies into a single dataframe.
dfs = [cast(pd.DataFrame, anomaly.dfReduced).assign(**{"type": anomaly.anomalyType}) for anomaly in anomalies]
dfs = []
for anomaly in anomalies:
df = anomaly.dfReduced
if df is None:
log.warning(f"Anomaly {anomaly} has no reduced dataframe.")
continue
df["type"] = anomaly.anomalyType
dfs.append(df)

df_reduced = cast(pd.DataFrame, pd.concat(dfs, ignore_index=True))
# Dtypes
# df = df.astype({"year": int})
Expand Down
20 changes: 15 additions & 5 deletions apps/anomalist/detectors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, List, Literal, Optional
from typing import Dict, List, Optional

import numpy as np
import pandas as pd
Expand All @@ -14,9 +14,6 @@
# Name of index columns for dataframe.
INDEX_COLUMNS = ["entity_name", "year"]

# Define anomaly types.
ANOMALY_TYPE = Literal["upgrade_change", "time_change", "upgrade_missing"]


def estimate_bard_epsilon(series: pd.Series) -> float:
# Make all values positive, and ignore zeros.
Expand Down Expand Up @@ -56,7 +53,8 @@ def get_long_format_score_df(df_score: pd.DataFrame) -> pd.DataFrame:
class AnomalyDetector:
anomaly_type: str

def get_text(self, entity: str, year: int) -> str:
@staticmethod
def get_text(entity: str, year: int) -> str:
return f"Anomaly happened in {entity} in {year}!"

def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
Expand Down Expand Up @@ -133,6 +131,10 @@ class AnomalyUpgradeMissing(AnomalyDetector):

anomaly_type = "upgrade_missing"

@staticmethod
def get_text(entity: str, year: int) -> str:
return f"There are missing values for {entity}! There might be other data points affected."

def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
# Create a dataframe of zeros.
df_lost = self.get_zeros_df(df, variable_ids)
Expand All @@ -150,6 +152,10 @@ class AnomalyUpgradeChange(AnomalyDetector):

anomaly_type = "upgrade_change"

@staticmethod
def get_text(entity: str, year: int) -> str:
return f"There are abrupt changes for {entity} in {year}! There might be other data points affected."

def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
# Create a dataframe of zeros.
df_version_change = self.get_zeros_df(df, variable_ids)
Expand All @@ -170,6 +176,10 @@ class AnomalyTimeChange(AnomalyDetector):

anomaly_type = "time_change"

@staticmethod
def get_text(entity: str, year: int) -> str:
return f"There are significant changes for {entity} in {year} compared to the old version of the indicator. There might be other data points affected."

def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
# Create a dataframe of zeros.
df_time_change = self.get_zeros_df(df, variable_ids)
Expand Down
11 changes: 10 additions & 1 deletion apps/anomalist/gp_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,15 @@ def _processing_queue(items: list[tuple[str, int]]) -> List[tuple]:
class AnomalyGaussianProcessOutlier(AnomalyDetector):
anomaly_type = "gp_outlier"

def __init__(self, max_time: Optional[float] = None, n_jobs: int = 1):
# TODO: max_time is hard-coded to 100, but it should be configurable in production
def __init__(self, max_time: Optional[float] = 100, n_jobs: int = 1):
self.max_time = max_time
self.n_jobs = n_jobs

@staticmethod
def get_text(entity: str, year: int) -> str:
return f"There are some outliers for {entity}! These were detected using Gaussian processes. There might be other data points affected."

def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
# Convert to long format
df_wide = (
Expand Down Expand Up @@ -126,6 +131,10 @@ def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mappi
log.info("Finished processing", elapsed=round(time.time() - start_time, 2))

df_score_long = pd.concat(results).reset_index()

# Normalize the anomaly scores by mapping interval (0, 3+) to (0, 1)
df_score_long["anomaly_score"] = np.minimum(df_score_long["anomaly_score"] / 3, 1)

return df_score_long

@staticmethod
Expand Down
24 changes: 7 additions & 17 deletions apps/wizard/app_pages/anomalist/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import pandas as pd
import streamlit as st

from apps.anomalist.anomalist_api import anomaly_detection
from apps.anomalist.anomalist_api import anomaly_detection, load_detector
from apps.utils.gpt import OpenAIWrapper, get_cost_and_tokens, get_number_tokens
from apps.wizard.app_pages.anomalist.utils import (
AnomalyTypeEnum,
Expand Down Expand Up @@ -72,8 +72,7 @@
},
}
ANOMALY_TYPE_NAMES = {k: v["tag_name"] for k, v in ANOMALY_TYPES.items()}
# TODO: Remove the `t != AnomalyTypeEnum.GP_OUTLIER.value` bit to also query for GP outliers.
ANOMALY_TYPES_TO_DETECT = tuple(t for t in ANOMALY_TYPES.keys() if t != AnomalyTypeEnum.GP_OUTLIER.value)
ANOMALY_TYPES_TO_DETECT = tuple(ANOMALY_TYPES.keys())

# GPT
MODEL_NAME = "gpt-4o"
Expand Down Expand Up @@ -346,16 +345,7 @@ def show_anomaly_compact(index, df):
indicator_uri = st.session_state.anomalist_indicators.get(indicator_id)

# Generate descriptive text. Only contains information about top-scoring entity.
if an_type == AnomalyTypeEnum.TIME_CHANGE.value:
text = f"There are significant changes for {entity_default} in {year_default} compared to the old version of the indicator. There might be other data points affected."
elif an_type == AnomalyTypeEnum.UPGRADE_CHANGE.value:
text = f"There are abrupt changes for {entity_default} in {year_default}! There might be other data points affected."
elif an_type == AnomalyTypeEnum.UPGRADE_MISSING.value:
text = f"There are missing values for {entity_default}! There might be other data points affected."
elif an_type == AnomalyTypeEnum.GP_OUTLIER.value:
text = f"There are some outliers for {entity_default}! These were detected using Gaussian processes. There might be other data points affected."
else:
raise ValueError(f"Unknown anomaly type: {an_type}")
text = load_detector(an_type).get_text(entity_default, year_default)

# Render
with st.container(border=True):
Expand Down Expand Up @@ -674,13 +664,13 @@ def _change_chart_selection(df, key_table, key_selection):
pagination_key="pagination-demo",
)

# Show controls only if needed
if len(items) > items_per_page:
pagination.show_controls(mode="bar")

# Show items (only current page)
for item in pagination.get_page_items():
show_anomaly_compact(item[0], item[1])

# Show controls only if needed
if len(items) > items_per_page:
pagination.show_controls(mode="bar")

# Reset state
set_states({"anomalist_datasets_submitted": False})
9 changes: 8 additions & 1 deletion etl/grapher_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from typing import Any, Dict, List, Literal, Optional, Union, get_args, overload

import humps
import numpy as np
import pandas as pd
import requests
import structlog
Expand Down Expand Up @@ -1274,7 +1275,7 @@ def from_id_or_path(
# Multiple path or id
elif isinstance(id_or_path, list):
# Filter the list to ensure only integers are passed
int_ids = [i for i in id_or_path if isinstance(i, int)]
int_ids = [i for i in id_or_path if isinstance(i, (int, np.integer))]
str_ids = [i for i in id_or_path if isinstance(i, str)]
# Multiple IDs
if len(int_ids) == len(id_or_path):
Expand Down Expand Up @@ -1767,6 +1768,12 @@ class Anomaly(Base):
# year: Mapped[int] = mapped_column(Integer)
# rawScore: Mapped[float] = mapped_column(Float)

def __repr__(self) -> str:
return (
f"<Anomaly(id={self.id}, createdAt={self.createdAt}, updatedAt={self.updatedAt}, "
f"datasetId={self.datasetId}, anomalyType={self.anomalyType})>"
)

@hybrid_property
def dfScore(self) -> Optional[pd.DataFrame]: # type: ignore
if self._dfScore is None:
Expand Down

0 comments on commit 9e1caac

Please sign in to comment.