diff --git a/apps/anomalist/cli.py b/apps/anomalist/cli.py
index 1bef4b293e4..48aa1e847a0 100644
--- a/apps/anomalist/cli.py
+++ b/apps/anomalist/cli.py
@@ -1,4 +1,5 @@
-from typing import Literal, Optional, get_args
+import json
+from typing import Dict, Literal, Optional, Tuple, get_args
 
 import click
 import pandas as pd
@@ -8,14 +9,13 @@
 from sqlalchemy.engine import Engine
 from sqlalchemy.orm import Session
 
+from apps.anomalist.bard_anomaly import NaNAnomalyDetector
+from apps.anomalist.gp_anomaly import GPAnomalyDetector, SampleAnomalyDetector
 from etl import grapher_model as gm
 from etl.db import get_engine, read_sql
 from etl.grapher_io import variable_data_df_from_s3
 from etl.paths import CACHE_DIR
 
-from .bard_anomaly import NaNAnomalyDetector
-from .gp_anomaly import GPAnomalyDetector, SampleAnomalyDetector
-
 log = structlog.get_logger()
 
 memory = Memory(CACHE_DIR, verbose=0)
@@ -25,26 +25,31 @@
 
 @click.command(name="anomalist", cls=RichCommand, help=__doc__)
 @click.option(
-    "--type",
+    "--anomaly-types",
     type=click.Choice(get_args(ANOMALY_TYPE)),
     multiple=True,
-    help="Type of anomaly detection algorithm to use.",
+    default=None,
+    help="Type (or types) of anomaly detection algorithm to use.",
 )
 @click.option(
-    "--dataset-id",
+    "--dataset-ids",
     type=int,
-    help="Generate anomalies for a specific dataset ID.",
+    multiple=True,
+    default=None,
+    help="Generate anomalies for the variables of a specific dataset ID (or multiple dataset IDs).",
 )
 @click.option(
-    "--previous-dataset-id",
-    type=int,
-    help="Dataset ID of the previous version.",
+    "--variable-mapping",
+    type=str,
+    default=None,
+    help="Optional JSON dictionary mapping variable IDs from a previous to a new version (where at least some of the new variable IDs must belong to the datasets whose IDs were given).",
 )
 @click.option(
-    "--variable-id",
+    "--variable-ids",
     type=int,
     multiple=True,
-    help="Generate anomalies for a list of variable IDs.",
+    default=None,
+    help="Generate anomalies for a list of variable IDs (in addition to the ones from dataset ID, if any dataset was given).",
 )
 @click.option(
     "--dry-run/--no-dry-run",
@@ -59,10 +64,10 @@
     help="Drop anomalies table and recreate it. This is useful for development when the schema changes.",
 )
 def cli(
-    type: Optional[ANOMALY_TYPE],
-    dataset_id: Optional[int],
-    previous_dataset_id: Optional[int],
-    variable_id: Optional[list[int]],
+    anomaly_types: Optional[Tuple[str, ...]],
+    dataset_ids: Optional[list[int]],
+    variable_mapping: Optional[str],  # type: ignore
+    variable_ids: Optional[list[int]],
     dry_run: bool,
     reset_db: bool,
 ) -> None:
@@ -98,73 +103,92 @@ def cli(
         gm.Anomaly.__table__.create(engine)  # type: ignore
         return
 
-    assert type, "Anomaly type must be specified."
-
-    # load metadata
-    variables = _load_variables_meta(engine, dataset_id, variable_id)
-
-    # set dataset_id if we're using variables
-    if not dataset_id:
-        assert set(v.datasetId for v in variables) == {variables[0].datasetId}
-        dataset_id = variables[0].datasetId
+    # If no anomaly types are provided, default to all available types
+    if not anomaly_types:
+        anomaly_types = get_args(ANOMALY_TYPE)
+
+    # Parse the variable_mapping if any provided.
+    if variable_mapping:
+        try:
+            variable_mapping: Dict[int, int] = {
+                int(variable_old): int(variable_new)
+                for variable_old, variable_new in json.loads(variable_mapping).items()
+            }
+        except json.JSONDecodeError:
+            raise ValueError("Invalid JSON format for variable_mapping.")
+    else:
+        variable_mapping = dict()
+
+    # Load metadata for all variables in dataset_ids (if any given) and variable_ids, and new variables in variable_mapping.
+    variable_ids_all = (list(variable_mapping.values()) if variable_mapping else []) + (
+        list(variable_ids) if variable_ids else []
+    )
+    if dataset_ids is None:
+        dataset_ids = []
+    variables = _load_variables_meta(engine, dataset_ids, variable_ids_all)
+
+    # Create a dictionary of all variable_ids for each dataset_id.
+    dataset_variable_ids = {}
+    for variable in variables:
+        if variable.datasetId not in dataset_variable_ids:
+            dataset_variable_ids[variable.datasetId] = []
+        dataset_variable_ids[variable.datasetId].append(variable)
 
     log.info("Detecting anomalies")
     anomalies = []
-    for typ in type:
-        if typ == "gp":
-            detector = GPAnomalyDetector()
-        elif typ == "sample":
-            detector = SampleAnomalyDetector()
-        elif typ == "nan":
-            detector = NaNAnomalyDetector()
-        else:
-            raise ValueError(f"Unsupported anomaly type: {typ}")
-
-        # dataframe with (entityName, year) as index and variableId as columns
-        log.info("Loading data from S3")
-        df = load_data_for_variables(engine, variables)
-
-        # detect anomalies
-        log.info("Detecting anomalies")
-        # the output has the same shape as the input dataframe, but we should make
-        # it possible to return anomalies in a long format (for detectors that return
-        # just a few anomalies)
-        df_score = detector.get_score_df(df, variables)
-
-        # validate format of the output dataframe
-        # TODO
-
-        anomaly = gm.Anomaly(
-            datasetId=dataset_id,
-            anomalyType=detector.anomaly_type,
-        )
-        anomaly.dfScore = df_score
-
-        anomalies.append(anomaly)
-
-    if dry_run:
-        for anomaly in anomalies:
-            log.info(anomaly)
-    else:
-        with Session(engine) as session:
-            log.info("Deleting existing anomalies")
-            session.query(gm.Anomaly).filter(
-                gm.Anomaly.datasetId == dataset_id,
-                gm.Anomaly.anomalyType.in_([a.anomalyType for a in anomalies]),
-            ).delete(synchronize_session=False)
-            session.commit()
-
-            # Insert new anomalies
-            log.info("Writing anomalies to database")
-            session.add_all(anomalies)
-            session.commit()
+    for dataset_id, variables_in_dataset in dataset_variable_ids.items():
+        for anomaly_type in anomaly_types:
+            if anomaly_type == "gp":
+                detector = GPAnomalyDetector()
+            elif anomaly_type == "sample":
+                detector = SampleAnomalyDetector()
+            elif anomaly_type == "nan":
+                detector = NaNAnomalyDetector()
+            else:
+                raise ValueError(f"Unsupported anomaly type: {anomaly_type}")
+
+            # dataframe with (entityName, year) as index and variableId as columns
+            log.info("Loading data from S3")
+            df = load_data_for_variables(engine, variables_in_dataset)
+
+            # TODO: If any of the variables are in variable_mapping, load df_old as well.
+
+            # detect anomalies
+            log.info("Detecting anomalies")
+            # the output has the same shape as the input dataframe, but we should make
+            # it possible to return anomalies in a long format (for detectors that return
+            # just a few anomalies)
+            df_score = detector.get_score_df(df, variables_in_dataset)
+
+            # TODO: validate format of the output dataframe
+
+            anomaly = gm.Anomaly(
+                datasetId=dataset_id,
+                anomalyType=detector.anomaly_type,
+            )
+            anomaly.dfScore = df_score
+
+            if not dry_run:
+                with Session(engine) as session:
+                    # TODO: Is this right? I suppose it should also delete if already existing.
+                    log.info("Deleting existing anomalies")
+                    session.query(gm.Anomaly).filter(
+                        gm.Anomaly.datasetId == dataset_id,
+                        gm.Anomaly.anomalyType.in_([a.anomalyType for a in anomalies]),
+                    ).delete(synchronize_session=False)
+                    session.commit()
+
+                    # Insert new anomalies
+                    log.info("Writing anomalies to database")
+                    session.add_all(anomalies)
+                    session.commit()
 
 
 # @memory.cache
 def load_data_for_variables(engine: Engine, variables: list[gm.Variable]) -> pd.DataFrame:
     # TODO: cache this on disk & re-validate with etags
-    df_long = variable_data_df_from_s3(engine, [v.id for v in variables])
+    df_long = variable_data_df_from_s3(engine, [v.id for v in variables], workers=None)
 
     # pivot dataframe
     df = df_long.pivot(index=["entityName", "year"], columns="variableId", values="value")
@@ -184,23 +208,31 @@ def load_data_for_variables(engine: Engine, variables: list[gm.Variable]) -> pd.
 
 @memory.cache
 def _load_variables_meta(
-    engine: Engine, dataset_id: Optional[int], variable_ids: Optional[list[int]]
+    engine: Engine, dataset_ids: Optional[list[int]], variable_ids: Optional[list[int]]
 ) -> list[gm.Variable]:
-    if dataset_id and variable_ids:
-        raise ValueError("Cannot specify both dataset ID and variable IDs.")
-
-    if variable_ids:
+    if dataset_ids:
         q = """
         select id from variables
-        where id in %(variable_ids)s
+        where datasetId in %(dataset_ids)s
         """
-    elif dataset_id:
+        df_from_dataset_ids = read_sql(q, engine, params={"dataset_ids": dataset_ids})
+    else:
+        df_from_dataset_ids = pd.DataFrame()
+
+    if variable_ids:
         q = """
         select id from variables
-        where datasetId = %(dataset_id)s
+        where id in %(variable_ids)s
         """
-    # load all variables from a random dataset
+        df_from_variable_ids = read_sql(q, engine, params={"variable_ids": variable_ids})
     else:
+        df_from_variable_ids = pd.DataFrame()
+
+    # Combine both dataframes to get all possible variables required.
+    df = pd.concat([df_from_dataset_ids, df_from_variable_ids]).drop_duplicates()
+
+    # load all variables from a random dataset
+    if df.empty:
         q = """
         with t as (
             select id from datasets order by rand() limit 1
@@ -208,8 +240,7 @@ def _load_variables_meta(
         select id from variables where datasetId in (select id from t)
         """
-
-    df = read_sql(q, engine, params={"variable_ids": variable_ids, "dataset_id": dataset_id})
+        df = read_sql(q, engine)
 
     # select all variables using SQLAlchemy
     with Session(engine) as session:
diff --git a/etl/grapher_io.py b/etl/grapher_io.py
index 6281f1d156e..2ad64a884fe 100644
--- a/etl/grapher_io.py
+++ b/etl/grapher_io.py
@@ -264,7 +264,7 @@ def _ensure_variable_ids(
 def variable_data_df_from_s3(
     engine: Engine,
     variable_ids: List[int] = [],
-    workers: int = 1,
+    workers: Optional[int] = 1,
     value_as_str: bool = True,
 ) -> pd.DataFrame:
     """Fetch data from S3 and add entity code and name from DB."""
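
Note for reviewers: a minimal sketch of how the new --variable-mapping value is expected to round-trip through the parsing added in cli.py, and how variables end up grouped per dataset. The IDs, the example command line in the comment, and the tiny Variable stand-in are illustrative assumptions, not taken from this diff.

    import json
    from collections import defaultdict
    from dataclasses import dataclass

    # Hypothetical old -> new variable ID pairs; real IDs come from the grapher database.
    mapping = {101: 201, 102: 202}

    # The CLI receives the mapping as a JSON string, e.g. (illustrative invocation only):
    #   anomalist --dataset-ids 1 --variable-mapping '{"101": 201, "102": 202}'
    raw = json.dumps(mapping)

    # Mirror of the parsing logic added in cli.py: JSON object keys are always strings,
    # so both keys and values are cast back to int.
    parsed = {int(old): int(new) for old, new in json.loads(raw).items()}
    assert parsed == mapping

    # Stand-in for gm.Variable, only to illustrate the grouping step.
    @dataclass
    class Variable:
        id: int
        datasetId: int

    variables = [Variable(201, 1), Variable(202, 1), Variable(301, 2)]

    # Equivalent of the manual dict-building in cli.py, written with defaultdict.
    dataset_variable_ids = defaultdict(list)
    for variable in variables:
        dataset_variable_ids[variable.datasetId].append(variable)
    assert set(dataset_variable_ids) == {1, 2}

The defaultdict form is only a possible simplification of the grouping loop; the explicit version in the diff behaves the same.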