Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Fix anomalist bugs #3427

Merged
merged 3 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion apps/anomalist/anomalist_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from sqlalchemy.orm import Session

from apps.anomalist.detectors import (
AnomalyDetector,
AnomalyTimeChange,
AnomalyUpgradeChange,
AnomalyUpgradeMissing,
Expand Down Expand Up @@ -45,6 +46,12 @@

########################################################################################################################


def load_detector(anomaly_type: ANOMALY_TYPE) -> AnomalyDetector:
"""Load detector."""
return ANOMALY_DETECTORS[anomaly_type]


# AGGREGATE ANOMALIES:


Expand Down Expand Up @@ -374,7 +381,15 @@ def _load_variables_meta(engine: Engine, variable_ids: list[int]) -> list[gm.Var
def combine_and_reduce_scores_df(anomalies: List[gm.Anomaly]) -> pd.DataFrame:
"""Get the combined dataframe with scores for all anomalies, and reduce it to include only the largest anomaly for each contry-indicator."""
# Combine the reduced dataframes for all anomalies into a single dataframe.
dfs = [cast(pd.DataFrame, anomaly.dfReduced).assign(**{"type": anomaly.anomalyType}) for anomaly in anomalies]
dfs = []
for anomaly in anomalies:
df = anomaly.dfReduced
if df is None:
log.warning(f"Anomaly {anomaly} has no reduced dataframe.")
continue
df["type"] = anomaly.anomalyType
dfs.append(df)

df_reduced = cast(pd.DataFrame, pd.concat(dfs, ignore_index=True))
# Dtypes
# df = df.astype({"year": int})
Expand Down
20 changes: 15 additions & 5 deletions apps/anomalist/detectors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, List, Literal, Optional
from typing import Dict, List, Optional

import numpy as np
import pandas as pd
Expand All @@ -14,9 +14,6 @@
# Name of index columns for dataframe.
INDEX_COLUMNS = ["entity_name", "year"]

# Define anomaly types.
ANOMALY_TYPE = Literal["upgrade_change", "time_change", "upgrade_missing"]


def estimate_bard_epsilon(series: pd.Series) -> float:
# Make all values positive, and ignore zeros.
Expand Down Expand Up @@ -56,7 +53,8 @@ def get_long_format_score_df(df_score: pd.DataFrame) -> pd.DataFrame:
class AnomalyDetector:
anomaly_type: str

def get_text(self, entity: str, year: int) -> str:
@staticmethod
def get_text(entity: str, year: int) -> str:
return f"Anomaly happened in {entity} in {year}!"

def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
Expand Down Expand Up @@ -133,6 +131,10 @@ class AnomalyUpgradeMissing(AnomalyDetector):

anomaly_type = "upgrade_missing"

@staticmethod
def get_text(entity: str, year: int) -> str:
return f"There are missing values for {entity}! There might be other data points affected."

def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
# Create a dataframe of zeros.
df_lost = self.get_zeros_df(df, variable_ids)
Expand All @@ -150,6 +152,10 @@ class AnomalyUpgradeChange(AnomalyDetector):

anomaly_type = "upgrade_change"

@staticmethod
def get_text(entity: str, year: int) -> str:
return f"There are abrupt changes for {entity} in {year}! There might be other data points affected."

def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
# Create a dataframe of zeros.
df_version_change = self.get_zeros_df(df, variable_ids)
Expand All @@ -170,6 +176,10 @@ class AnomalyTimeChange(AnomalyDetector):

anomaly_type = "time_change"

@staticmethod
def get_text(entity: str, year: int) -> str:
return f"There are significant changes for {entity} in {year} compared to the old version of the indicator. There might be other data points affected."

def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
# Create a dataframe of zeros.
df_time_change = self.get_zeros_df(df, variable_ids)
Expand Down
11 changes: 10 additions & 1 deletion apps/anomalist/gp_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,15 @@ def _processing_queue(items: list[tuple[str, int]]) -> List[tuple]:
class AnomalyGaussianProcessOutlier(AnomalyDetector):
anomaly_type = "gp_outlier"

def __init__(self, max_time: Optional[float] = None, n_jobs: int = 1):
# TODO: max_time is hard-coded to 100, but it should be configurable in production
def __init__(self, max_time: Optional[float] = 100, n_jobs: int = 1):
self.max_time = max_time
self.n_jobs = n_jobs

@staticmethod
def get_text(entity: str, year: int) -> str:
return f"There are some outliers for {entity}! These were detected using Gaussian processes. There might be other data points affected."

def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
# Convert to long format
df_wide = (
Expand Down Expand Up @@ -126,6 +131,10 @@ def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mappi
log.info("Finished processing", elapsed=round(time.time() - start_time, 2))

df_score_long = pd.concat(results).reset_index()

# Normalize the anomaly scores by mapping interval (0, 3+) to (0, 1)
df_score_long["anomaly_score"] = np.minimum(df_score_long["anomaly_score"] / 3, 1)

return df_score_long

@staticmethod
Expand Down
16 changes: 3 additions & 13 deletions apps/wizard/app_pages/anomalist/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import pandas as pd
import streamlit as st

from apps.anomalist.anomalist_api import anomaly_detection
from apps.anomalist.anomalist_api import anomaly_detection, load_detector
from apps.utils.gpt import OpenAIWrapper, get_cost_and_tokens, get_number_tokens
from apps.wizard.app_pages.anomalist.utils import (
AnomalyTypeEnum,
Expand Down Expand Up @@ -72,8 +72,7 @@
},
}
ANOMALY_TYPE_NAMES = {k: v["tag_name"] for k, v in ANOMALY_TYPES.items()}
# TODO: Remove the `t != AnomalyTypeEnum.GP_OUTLIER.value` bit to also query for GP outliers.
ANOMALY_TYPES_TO_DETECT = tuple(t for t in ANOMALY_TYPES.keys() if t != AnomalyTypeEnum.GP_OUTLIER.value)
ANOMALY_TYPES_TO_DETECT = tuple(ANOMALY_TYPES.keys())

# GPT
MODEL_NAME = "gpt-4o"
Expand Down Expand Up @@ -346,16 +345,7 @@ def show_anomaly_compact(index, df):
indicator_uri = st.session_state.anomalist_indicators.get(indicator_id)

# Generate descriptive text. Only contains information about top-scoring entity.
if an_type == AnomalyTypeEnum.TIME_CHANGE.value:
text = f"There are significant changes for {entity_default} in {year_default} compared to the old version of the indicator. There might be other data points affected."
elif an_type == AnomalyTypeEnum.UPGRADE_CHANGE.value:
text = f"There are abrupt changes for {entity_default} in {year_default}! There might be other data points affected."
elif an_type == AnomalyTypeEnum.UPGRADE_MISSING.value:
text = f"There are missing values for {entity_default}! There might be other data points affected."
elif an_type == AnomalyTypeEnum.GP_OUTLIER.value:
text = f"There are some outliers for {entity_default}! These were detected using Gaussian processes. There might be other data points affected."
else:
raise ValueError(f"Unknown anomaly type: {an_type}")
text = load_detector(an_type).get_text(entity_default, year_default)

# Render
with st.container(border=True):
Expand Down
9 changes: 8 additions & 1 deletion etl/grapher_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from typing import Any, Dict, List, Literal, Optional, Union, get_args, overload

import humps
import numpy as np
import pandas as pd
import requests
import structlog
Expand Down Expand Up @@ -1274,7 +1275,7 @@ def from_id_or_path(
# Multiple path or id
elif isinstance(id_or_path, list):
# Filter the list to ensure only integers are passed
int_ids = [i for i in id_or_path if isinstance(i, int)]
int_ids = [i for i in id_or_path if isinstance(i, (int, np.integer))]
str_ids = [i for i in id_or_path if isinstance(i, str)]
# Multiple IDs
if len(int_ids) == len(id_or_path):
Expand Down Expand Up @@ -1767,6 +1768,12 @@ class Anomaly(Base):
# year: Mapped[int] = mapped_column(Integer)
# rawScore: Mapped[float] = mapped_column(Float)

def __repr__(self) -> str:
return (
f"<Anomaly(id={self.id}, createdAt={self.createdAt}, updatedAt={self.updatedAt}, "
f"datasetId={self.datasetId}, anomalyType={self.anomalyType})>"
)

@hybrid_property
def dfScore(self) -> Optional[pd.DataFrame]: # type: ignore
if self._dfScore is None:
Expand Down
Loading