diff --git a/apps/anomalist/anomalist_api.py b/apps/anomalist/anomalist_api.py
index 39b4966667e..fe59062401c 100644
--- a/apps/anomalist/anomalist_api.py
+++ b/apps/anomalist/anomalist_api.py
@@ -7,6 +7,7 @@
 import structlog
 from owid.catalog import find
 from sqlalchemy.engine import Engine
+from sqlalchemy.exc import NoResultFound
 from sqlalchemy.orm import Session

 from apps.anomalist.detectors import (
@@ -211,13 +212,14 @@ def anomaly_detection(
     variable_mapping: Optional[dict[int, int]] = None,
     variable_ids: Optional[list[int]] = None,
     dry_run: bool = False,
+    force: bool = False,
     reset_db: bool = False,
 ) -> None:
     """Detect anomalies."""
     engine = get_engine()

     # Ensure the 'anomalies' table exists. Optionally reset it if reset_db is True.
-    gm.Anomaly.create_table(engine, reset=reset_db)
+    gm.Anomaly.create_table(engine, if_exists="replace" if reset_db else "skip")

     # If no anomaly types are provided, default to all available types
     if not anomaly_types:
@@ -254,6 +256,10 @@ def anomaly_detection(
         dataset_variable_ids[variable.datasetId].append(variable)

     for dataset_id, variables_in_dataset in dataset_variable_ids.items():
+        # Get dataset's checksum
+        with Session(engine) as session:
+            dataset = gm.Dataset.load_dataset(session, dataset_id)
+
         log.info("Loading data from S3")
         variables_old = [
             variables[variable_id_old]
@@ -268,6 +274,11 @@ def anomaly_detection(
             if anomaly_type not in ANOMALY_DETECTORS:
                 raise ValueError(f"Unsupported anomaly type: {anomaly_type}")

+            if not force:
+                if not needs_update(engine, dataset, anomaly_type):
+                    log.info(f"Anomaly type {anomaly_type} for dataset {dataset_id} already exists in the database.")
+                    continue
+
             log.info(f"Detecting anomaly type {anomaly_type} for dataset {dataset_id}")

             # Instantiate the anomaly detector.
@@ -289,12 +300,17 @@ def anomaly_detection(
                 variable_mapping=variable_mapping_for_current_dataset,
             )

+            if df_score.empty:
+                log.info("No anomalies detected.")
+                continue
+
             # Create a long format score dataframe.
             df_score_long = get_long_format_score_df(df_score)

             # TODO: validate format of the output dataframe
             anomaly = gm.Anomaly(
                 datasetId=dataset_id,
+                datasetSourceChecksum=dataset.sourceChecksum,
                 anomalyType=anomaly_type,
             )
             anomaly.dfScore = df_score_long
@@ -338,6 +354,22 @@ def anomaly_detection(
             session.commit()


+def needs_update(engine: Engine, dataset: gm.Dataset, anomaly_type: str) -> bool:
+    """Return True if anomalies for this dataset and anomaly type need to be (re)computed,
+    i.e. there is no anomaly in the DB matching the dataset's current source checksum."""
+    with Session(engine) as session:
+        try:
+            anomaly = gm.Anomaly.load(
+                session,
+                dataset_id=dataset.id,
+                anomaly_type=anomaly_type,
+            )
+        except NoResultFound:
+            return True
+
+    return anomaly.datasetSourceChecksum != dataset.sourceChecksum
+
+
 def export_anomalies_file(df: pd.DataFrame, dataset_id: int, anomaly_type: str) -> str:
     """Export anomaly df to local file (and upload to staging server if applicable)."""
     filename = f"{dataset_id}_{anomaly_type}.feather"
@@ -372,8 +404,11 @@ def load_data_for_variables(engine: Engine, variables: list[gm.Variable]) -> pd.
     # reorder in the same order as variables
     df = df[[v.id for v in variables]]

-    # try converting to numeric
-    df = df.astype(float)
+    # TODO: how should we treat non-numeric variables? We can exclude them here, but then we need to
+    #   fix it in the detectors
+    # HACK: set non-numeric variables to zero
+    df = df.apply(pd.to_numeric, errors="coerce")
+    df = df.fillna(0)

     # TODO:
     # remove countries with all nulls or all zeros or constant values
@@ -381,7 +416,7 @@ def load_data_for_variables(engine: Engine, variables: list[gm.Variable]) -> pd.

     df = df.reset_index().astype({"entity_name": str})

-    return df
+    return df  # type: ignore


 # @memory.cache
diff --git a/apps/anomalist/cli.py b/apps/anomalist/cli.py
index 5243c169eb6..8c3610b8497 100644
--- a/apps/anomalist/cli.py
+++ b/apps/anomalist/cli.py
@@ -49,6 +49,12 @@
     type=bool,
     help="Do not write to target database.",
 )
+@click.option(
+    "--force",
+    "-f",
+    is_flag=True,
+    help="Recompute anomalies even if they already exist in the database.",
+)
 @click.option(
     "--reset-db/--no-reset-db",
     default=False,
@@ -61,6 +67,7 @@ def cli(
     variable_mapping: str,
     variable_ids: Optional[list[int]],
     dry_run: bool,
+    force: bool,
     reset_db: bool,
 ) -> None:
     """TBD
@@ -111,6 +118,7 @@ def cli(
         variable_mapping=variable_mapping_dict,
         variable_ids=list(variable_ids) if variable_ids else None,
         dry_run=dry_run,
+        force=force,
         reset_db=reset_db,
     )

diff --git a/apps/anomalist/gp_detector.py b/apps/anomalist/gp_detector.py
index 6bd2950b1b6..bde06986f83 100644
--- a/apps/anomalist/gp_detector.py
+++ b/apps/anomalist/gp_detector.py
@@ -65,8 +65,8 @@ def _processing_queue(items: list[tuple[str, int]]) -> List[tuple]:

 class AnomalyGaussianProcessOutlier(AnomalyDetector):
     anomaly_type = "gp_outlier"

-    # TODO: max_time is hard-coded to 100, but it should be configurable in production
-    def __init__(self, max_time: Optional[float] = 100, n_jobs: int = 1):
+    # TODO: max_time is hard-coded to 10, but it should be configurable in production
+    def __init__(self, max_time: Optional[float] = 10, n_jobs: int = 1):
         self.max_time = max_time
         self.n_jobs = n_jobs
@@ -100,7 +100,7 @@ def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mappi
             group = df_wide.loc[(entity_name, variable_id)]

             # Skip if the series has only one or fewer data points
-            if len(group) <= 1:
+            if isinstance(group, pd.Series) or len(group) <= 1:
                 continue

             # Prepare the input features (X) and target values (y) for Gaussian Process
@@ -130,6 +130,9 @@ def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mappi

         log.info("Finished processing", elapsed=round(time.time() - start_time, 2))

+        if not results:
+            return pd.DataFrame()
+
         df_score_long = pd.concat(results).reset_index()

         # Normalize the anomaly scores by mapping interval (0, 3+) to (0, 1)
diff --git a/apps/owidbot/anomalist.py b/apps/owidbot/anomalist.py
new file mode 100644
index 00000000000..dab84205269
--- /dev/null
+++ b/apps/owidbot/anomalist.py
@@ -0,0 +1,61 @@
+from structlog import get_logger
+
+from apps.anomalist.anomalist_api import anomaly_detection
+from apps.wizard.app_pages.anomalist.utils import load_variable_mapping
+from etl import grapher_model as gm
+from etl.config import OWIDEnv
+from etl.db import Engine, read_sql
+
+from .chart_diff import production_or_master_engine
+
+log = get_logger()
+
+
+def run(branch: str) -> None:
+    """Compute anomalies for new and updated datasets."""
+    # Get engines for branch and production
+    source_engine = OWIDEnv.from_staging(branch).get_engine()
+    target_engine = production_or_master_engine()
+
+    # Create the anomalies table if it doesn't exist
+    gm.Anomaly.create_table(source_engine, if_exists="skip")
+
+    # Load new dataset ids
+    datasets_new_ids = _load_datasets_new_ids(source_engine, target_engine)
+
+    if not datasets_new_ids:
+        log.info("No new datasets found.")
+        return
+
+    log.info(f"New datasets: {datasets_new_ids}")
+
+    # Load all their variables
+    q = """SELECT id FROM variables WHERE datasetId IN %(dataset_ids)s"""
+    variable_ids = list(read_sql(q, source_engine, params={"dataset_ids": datasets_new_ids})["id"])
+
+    # Load variable mapping
+    variable_mapping_dict = load_variable_mapping(datasets_new_ids)
+
+    # Run anomalist
+    anomaly_detection(
+        variable_mapping=variable_mapping_dict,
+        variable_ids=variable_ids,
+    )
+
+
+def _load_datasets_new_ids(source_engine: Engine, target_engine: Engine) -> list[int]:
+    # Get new datasets
+    # TODO: replace by real catalogPath when we have it in MySQL
+    q = """SELECT
+        id,
+        CONCAT(namespace, "/", version, "/", shortName) as catalogPath
+    FROM datasets
+    """
+    source_datasets = read_sql(q, source_engine)
+    target_datasets = read_sql(q, target_engine)
+
+    return list(
+        source_datasets[
+            source_datasets.catalogPath.isin(set(source_datasets["catalogPath"]) - set(target_datasets["catalogPath"]))
+        ]["id"]
+    )
diff --git a/apps/owidbot/chart_diff.py b/apps/owidbot/chart_diff.py
index 660e972b9bd..944dffb5fad 100644
--- a/apps/owidbot/chart_diff.py
+++ b/apps/owidbot/chart_diff.py
@@ -3,6 +3,7 @@

 from apps.wizard.app_pages.chart_diff.chart_diff import ChartDiffsLoader
 from etl.config import OWID_ENV, OWIDEnv, get_container_name
+from etl.db import Engine

 from . import github_utils as gh_utils

@@ -66,14 +67,18 @@ def run(branch: str, charts_df: pd.DataFrame) -> str:
     return body


-def call_chart_diff(branch: str) -> pd.DataFrame:
-    source_engine = OWIDEnv.from_staging(branch).get_engine()
-
+def production_or_master_engine() -> Engine:
+    """Return the production engine if available, otherwise connect to staging-site-master."""
     if OWID_ENV.env_remote == "production":
-        target_engine = OWID_ENV.get_engine()
+        return OWID_ENV.get_engine()
     else:
         log.warning("ENV file doesn't connect to production DB, comparing against staging-site-master")
-        target_engine = OWIDEnv.from_staging("master").get_engine()
+        return OWIDEnv.from_staging("master").get_engine()
+
+
+def call_chart_diff(branch: str) -> pd.DataFrame:
+    source_engine = OWIDEnv.from_staging(branch).get_engine()
+    target_engine = production_or_master_engine()

     df = ChartDiffsLoader(source_engine, target_engine).get_diffs_summary_df(
         config=True,
diff --git a/apps/owidbot/cli.py b/apps/owidbot/cli.py
index 498d5533e2a..0e3e8ba0e3e 100644
--- a/apps/owidbot/cli.py
+++ b/apps/owidbot/cli.py
@@ -8,7 +8,7 @@
 from rich import print
 from rich_click.rich_command import RichCommand

-from apps.owidbot import chart_diff, data_diff, grapher
+from apps.owidbot import anomalist, chart_diff, data_diff, grapher
 from etl.config import get_container_name

 from . import github_utils as gh_utils
@@ -16,7 +16,7 @@
 log = structlog.get_logger()

 REPOS = Literal["etl", "owid-grapher"]
-SERVICES = Literal["data-diff", "chart-diff", "grapher"]
+SERVICES = Literal["data-diff", "chart-diff", "grapher", "anomalist"]


 @click.command("owidbot", cls=RichCommand, help=__doc__)
@@ -76,6 +76,11 @@ def cli(

         elif service == "grapher":
             services_body["grapher"] = grapher.run(branch)
+
+        elif service == "anomalist":
+            # TODO: anomalist could post a summary of anomalies to the PR
+            anomalist.run(branch)
+
         else:
             raise AssertionError("Invalid service")

diff --git a/apps/wizard/app_pages/anomalist/utils.py b/apps/wizard/app_pages/anomalist/utils.py
index ae25dd2506c..7fddd684de2 100644
--- a/apps/wizard/app_pages/anomalist/utils.py
+++ b/apps/wizard/app_pages/anomalist/utils.py
@@ -1,6 +1,6 @@
 """Utils for chart revision tool."""
 from enum import Enum
-from typing import Dict, List, Tuple
+from typing import Dict, List, Optional, Tuple

 import pandas as pd
 import streamlit as st
@@ -54,6 +54,21 @@ def get_datasets_and_mapping_inputs() -> Tuple[Dict[int, str], Dict[int, str], D
     datasets_new_ids = list(dataset_new_and_old)

     # Load mapping created by indicator upgrader (if any).
+    variable_mapping = load_variable_mapping(datasets_new_ids, dataset_new_and_old)
+
+    # For convenience, create a dataset name "[id] Name".
+    df_datasets["id_name"] = "[" + df_datasets["id"].astype(str) + "] " + df_datasets["name"]
+    # List all grapher datasets.
+    datasets_all = df_datasets[["id", "id_name"]].set_index("id").squeeze().to_dict()
+    # List new datasets.
+    datasets_new = {k: v for k, v in datasets_all.items() if k in datasets_new_ids}
+
+    return datasets_all, datasets_new, variable_mapping  # type: ignore
+
+
+def load_variable_mapping(
+    datasets_new_ids: List[int], dataset_new_and_old: Optional[Dict[int, Optional[int]]] = None
+) -> Dict[int, int]:
     mapping = WizardDB.get_variable_mapping_raw()
     if len(mapping) > 0:
         log.info("Using variable mapping created by indicator upgrader.")
@@ -68,7 +83,7 @@ def get_datasets_and_mapping_inputs() -> Tuple[Dict[int, str], Dict[int, str], D
         )
         # Create a mapping dictionary.
         variable_mapping = mapping.set_index("id_old")["id_new"].to_dict()
-    else:
+    elif dataset_new_and_old:
         log.info("Inferring variable mapping (since no mapping was created by indicator upgrader).")
         # Infer the mapping of the new datasets (assuming no names have changed).
         variable_mapping = dict()
@@ -77,16 +92,11 @@ def get_datasets_and_mapping_inputs() -> Tuple[Dict[int, str], Dict[int, str], D
                 )
                 continue
             # Infer
             variable_mapping.update(infer_variable_mapping(dataset_id_new, dataset_id_old))
+    else:
+        # No mapping available.
+        variable_mapping = dict()

-    # For convenience, create a dataset name "[id] Name".
-    df_datasets["id_name"] = "[" + df_datasets["id"].astype(str) + "] " + df_datasets["name"]
-    # List all grapher datasets.
-    datasets_all = df_datasets[["id", "id_name"]].set_index("id").squeeze().to_dict()
-
-    # List new datasets.
-    datasets_new = {k: v for k, v in datasets_all.items() if k in datasets_new_ids}
-
-    return datasets_all, datasets_new, variable_mapping  # type: ignore
+    return variable_mapping  # type: ignore


@@ -94,7 +104,7 @@ def create_tables(_owid_env: OWIDEnv = OWID_ENV):
     """Create all required tables.
     If exist, nothing is created.
""" - gm.Anomaly.create_table(_owid_env.engine) + gm.Anomaly.create_table(_owid_env.engine, if_exists="skip") @st.cache_data(show_spinner=False) diff --git a/etl/grapher_model.py b/etl/grapher_model.py index 0b7469f1a6b..e256e0bd5f9 100644 --- a/etl/grapher_model.py +++ b/etl/grapher_model.py @@ -112,13 +112,19 @@ def from_dict(cls, d: Dict[str, Any]) -> Self: return x @classmethod - def create_table(cls, engine: Engine, reset: bool = False) -> None: - if reset: - # Drop the table if it exists + def create_table(cls, engine: Engine, if_exists: Literal["fail", "replace", "skip"] = "fail") -> None: + if if_exists == "replace": + # Drop the table if it exists and create a new one cls.__table__.drop(engine, checkfirst=True) # type: ignore - - # Create table - cls.__table__.create(engine, checkfirst=True) # type: ignore + cls.__table__.create(engine, checkfirst=False) # type: ignore + elif if_exists == "skip": + # Create the table only if it doesn't already exist + cls.__table__.create(engine, checkfirst=True) # type: ignore + elif if_exists == "fail": + # Attempt to create the table; fail if it already exists + cls.__table__.create(engine, checkfirst=False) # type: ignore + else: + raise ValueError(f"Unrecognized value for if_exists: {if_exists}") class Entity(Base): @@ -1759,6 +1765,7 @@ class Anomaly(Base): DateTime, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"), init=False ) datasetId: Mapped[int] = mapped_column(Integer) + datasetSourceChecksum: Mapped[Optional[str]] = mapped_column(VARCHAR(64), default=None) anomalyType: Mapped[str] = mapped_column(VARCHAR(255), default=str) path_file: Mapped[Optional[str]] = mapped_column(VARCHAR(255), default=None) _dfScore: Mapped[Optional[bytes]] = mapped_column("dfScore", LONGBLOB, default=None) @@ -1779,6 +1786,10 @@ def __repr__(self) -> str: f"datasetId={self.datasetId}, anomalyType={self.anomalyType})>" ) + @classmethod + def load(cls, session: Session, dataset_id: int, anomaly_type: str) -> "Anomaly": + return session.scalars(select(cls).where(cls.datasetId == dataset_id, cls.anomalyType == anomaly_type)).one() + @hybrid_property def dfScore(self) -> Optional[pd.DataFrame]: # type: ignore if self._dfScore is None: