diff --git a/apps/anomalist/anomalist_api.py b/apps/anomalist/anomalist_api.py
index 47194c29015..82f545a69d6 100644
--- a/apps/anomalist/anomalist_api.py
+++ b/apps/anomalist/anomalist_api.py
@@ -24,7 +24,7 @@
 from etl.config import OWID_ENV
 from etl.db import get_engine, read_sql
 from etl.files import create_folder, upload_file_to_server
-from etl.grapher_io import variable_data_df_from_s3
+from etl.grapher_io import variable_data_df_from_catalog
 
 log = structlog.get_logger()
 
@@ -261,7 +261,7 @@ def anomaly_detection(
     with Session(engine) as session:
         dataset = gm.Dataset.load_dataset(session, dataset_id)
 
-    log.info("loading_data_from_s3.start")
+    log.info("loading_data.start")
     variables_old = [
         variables[variable_id_old]
         for variable_id_old in variable_mapping.keys()
@@ -270,7 +270,7 @@ def anomaly_detection(
     variables_old_and_new = variables_in_dataset + variables_old
     t = time.time()
     df = load_data_for_variables(engine=engine, variables=variables_old_and_new)
-    log.info("loading_data_from_s3.end", t=time.time() - t)
+    log.info("loading_data.end", t=time.time() - t)
 
     for anomaly_type in anomaly_types:
         # Instantiate the anomaly detector.
@@ -396,15 +396,10 @@ def export_anomalies_file(df: pd.DataFrame, dataset_id: int, anomaly_type: str)
     return path_str
 
 
-# @memory.cache
 def load_data_for_variables(engine: Engine, variables: list[gm.Variable]) -> pd.DataFrame:
-    # TODO: cache this on disk & re-validate with etags
-    df_long = variable_data_df_from_s3(engine, [v.id for v in variables], workers=None)
-
-    df_long = df_long.rename(columns={"variableId": "variable_id", "entityName": "entity_name"})
-
-    # pivot dataframe
-    df = df_long.pivot(index=["entity_name", "year"], columns="variable_id", values="value")
+    # Load data from local catalog
+    df = variable_data_df_from_catalog(engine, variables=variables)
+    df = df.rename(columns={"country": "entity_name"}).set_index(["entity_name", "year"])
 
     # reorder in the same order as variables
     df = df[[v.id for v in variables]]
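
Note on the `load_data_for_variables` change above: the S3 path returned a long frame keyed by `variableId`/`entityName` that had to be pivoted, while the catalog path already returns a wide frame with one column per variable id. A minimal sketch of the equivalence, with made-up ids and values:

```
import pandas as pd

# Old shape: long format from variable_data_df_from_s3 (toy data)
df_long = pd.DataFrame(
    {
        "variableId": [101, 101, 102],
        "entityName": ["France", "Spain", "France"],
        "year": [2000, 2000, 2000],
        "value": [1.0, 2.0, 3.0],
    }
)
df_old = df_long.rename(columns={"variableId": "variable_id", "entityName": "entity_name"}).pivot(
    index=["entity_name", "year"], columns="variable_id", values="value"
)

# New shape: wide format from variable_data_df_from_catalog (toy data)
df_new = pd.DataFrame({"country": ["France", "Spain"], "year": [2000, 2000], 101: [1.0, 2.0], 102: [3.0, None]})
df_new = df_new.rename(columns={"country": "entity_name"}).set_index(["entity_name", "year"])

# Same data, one pivot less
pd.testing.assert_frame_equal(df_old, df_new[df_old.columns], check_names=False)
```
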
q = """ select id from variables @@ -123,5 +137,26 @@ def cli( ) +def load_datasets_new_ids(source_engine: Engine) -> list[int]: + # Compare against production or staging-site-master + target_engine = production_or_master_engine() + + # Get new datasets + # TODO: replace by real catalogPath when we have it in MySQL + q = """SELECT + id, + CONCAT(namespace, "/", version, "/", shortName) as catalogPath + FROM datasets + """ + source_datasets = read_sql(q, source_engine) + target_datasets = read_sql(q, target_engine) + + return list( + source_datasets[ + source_datasets.catalogPath.isin(set(source_datasets["catalogPath"]) - set(target_datasets["catalogPath"])) + ]["id"] + ) + + if __name__ == "__main__": cli() diff --git a/apps/anomalist/gp_detector.py b/apps/anomalist/gp_detector.py index bec71936713..266446ad9d8 100644 --- a/apps/anomalist/gp_detector.py +++ b/apps/anomalist/gp_detector.py @@ -1,3 +1,4 @@ +import os import random import time import warnings @@ -24,6 +25,11 @@ memory = Memory(CACHE_DIR, verbose=0) +# Maximum time for processing in seconds +ANOMALIST_MAX_TIME = int(os.environ.get("ANOMALIST_MAX_TIME", 10)) +# Number of jobs for parallel processing +ANOMALIST_N_JOBS = int(os.environ.get("ANOMALIST_N_JOBS", 1)) + @memory.cache def _load_population(): @@ -65,8 +71,7 @@ def _processing_queue(items: list[tuple[str, int]]) -> List[tuple]: class AnomalyGaussianProcessOutlier(AnomalyDetector): anomaly_type = "gp_outlier" - # TODO: max_time is hard-coded to 10, but it should be configurable in production - def __init__(self, max_time: Optional[float] = 10, n_jobs: int = 1): + def __init__(self, max_time: Optional[float] = ANOMALIST_MAX_TIME, n_jobs: int = ANOMALIST_N_JOBS): self.max_time = max_time self.n_jobs = n_jobs @@ -76,7 +81,7 @@ def get_text(entity: str, year: int) -> str: def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame: # Convert to long format - df_wide = df.melt(id_vars=["entity_name", "year"]) + df_wide = df.melt(id_vars=["entity_name", "year"], var_name="variable_id") # Filter to only include the specified variable IDs. 
diff --git a/apps/anomalist/gp_detector.py b/apps/anomalist/gp_detector.py
index bec71936713..266446ad9d8 100644
--- a/apps/anomalist/gp_detector.py
+++ b/apps/anomalist/gp_detector.py
@@ -1,3 +1,4 @@
+import os
 import random
 import time
 import warnings
@@ -24,6 +25,11 @@
 
 memory = Memory(CACHE_DIR, verbose=0)
 
+# Maximum time for processing in seconds
+ANOMALIST_MAX_TIME = int(os.environ.get("ANOMALIST_MAX_TIME", 10))
+# Number of jobs for parallel processing
+ANOMALIST_N_JOBS = int(os.environ.get("ANOMALIST_N_JOBS", 1))
+
 
 @memory.cache
 def _load_population():
@@ -65,8 +71,7 @@ def _processing_queue(items: list[tuple[str, int]]) -> List[tuple]:
 class AnomalyGaussianProcessOutlier(AnomalyDetector):
     anomaly_type = "gp_outlier"
 
-    # TODO: max_time is hard-coded to 10, but it should be configurable in production
-    def __init__(self, max_time: Optional[float] = 10, n_jobs: int = 1):
+    def __init__(self, max_time: Optional[float] = ANOMALIST_MAX_TIME, n_jobs: int = ANOMALIST_N_JOBS):
        self.max_time = max_time
        self.n_jobs = n_jobs
 
@@ -76,7 +81,7 @@ def get_text(entity: str, year: int) -> str:
 
     def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
         # Convert to long format
-        df_wide = df.melt(id_vars=["entity_name", "year"])
+        df_wide = df.melt(id_vars=["entity_name", "year"], var_name="variable_id")
         # Filter to only include the specified variable IDs.
         df_wide = (
             df_wide[df_wide["variable_id"].isin(variable_ids)]
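
The hard-coded `max_time` is replaced by module-level defaults that can be overridden per process. A sketch of the pattern in isolation (the shell values are hypothetical):

```
# Shell (hypothetical values):
#   ANOMALIST_MAX_TIME=60 ANOMALIST_N_JOBS=4 etl anomalist --anomaly-type gp
import os

# Same pattern in isolation: read an env var, fall back to a default.
ANOMALIST_MAX_TIME = int(os.environ.get("ANOMALIST_MAX_TIME", 10))  # seconds
ANOMALIST_N_JOBS = int(os.environ.get("ANOMALIST_N_JOBS", 1))  # parallel workers
```

One design consequence: the values are evaluated once at import time, so they must be set in the environment before `apps.anomalist.gp_detector` is imported; mutating `os.environ` afterwards will not change the defaults.
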
diff --git a/apps/owidbot/anomalist.py b/apps/owidbot/anomalist.py
index dab84205269..f62f17cd606 100644
--- a/apps/owidbot/anomalist.py
+++ b/apps/owidbot/anomalist.py
@@ -1,12 +1,13 @@
+import time
+
 from structlog import get_logger
 
 from apps.anomalist.anomalist_api import anomaly_detection
+from apps.anomalist.cli import load_datasets_new_ids
 from apps.wizard.app_pages.anomalist.utils import load_variable_mapping
 from etl import grapher_model as gm
 from etl.config import OWIDEnv
-from etl.db import Engine, read_sql
-
-from .chart_diff import production_or_master_engine
+from etl.db import read_sql
 
 log = get_logger()
 
@@ -15,13 +16,12 @@ def run(branch: str) -> None:
     """Compute all anomalist for new and updated datasets."""
     # Get engines for branch and production
     source_engine = OWIDEnv.from_staging(branch).get_engine()
-    target_engine = production_or_master_engine()
 
     # Create table with anomalist if it doesn't exist
     gm.Anomaly.create_table(source_engine, if_exists="skip")
 
     # Load new dataset ids
-    datasets_new_ids = _load_datasets_new_ids(source_engine, target_engine)
+    datasets_new_ids = load_datasets_new_ids(source_engine)
 
     if not datasets_new_ids:
         log.info("No new datasets found.")
@@ -36,26 +36,13 @@ def run(branch: str) -> None:
     # Load variable mapping
     variable_mapping_dict = load_variable_mapping(datasets_new_ids)
 
+    log.info("owidbot.anomalist.start", n_variables=len(variable_ids))
+    t = time.time()
+
     # Run anomalist
     anomaly_detection(
         variable_mapping=variable_mapping_dict,
         variable_ids=variable_ids,
     )
-
-
-def _load_datasets_new_ids(source_engine: Engine, target_engine: Engine) -> list[int]:
-    # Get new datasets
-    # TODO: replace by real catalogPath when we have it in MySQL
-    q = """SELECT
-        id,
-        CONCAT(namespace, "/", version, "/", shortName) as catalogPath
-    FROM datasets
-    """
-    source_datasets = read_sql(q, source_engine)
-    target_datasets = read_sql(q, target_engine)
-
-    return list(
-        source_datasets[
-            source_datasets.catalogPath.isin(set(source_datasets["catalogPath"]) - set(target_datasets["catalogPath"]))
-        ]["id"]
-    )
+
+    log.info("owidbot.anomalist.end", n_variables=len(variable_ids), t=time.time() - t)
diff --git a/apps/owidbot/chart_diff.py b/apps/owidbot/chart_diff.py
index 944dffb5fad..8b5838cbd0e 100644
--- a/apps/owidbot/chart_diff.py
+++ b/apps/owidbot/chart_diff.py
@@ -2,8 +2,8 @@
 from structlog import get_logger
 
 from apps.wizard.app_pages.chart_diff.chart_diff import ChartDiffsLoader
-from etl.config import OWID_ENV, OWIDEnv, get_container_name
-from etl.db import Engine
+from etl.config import OWIDEnv, get_container_name
+from etl.db import production_or_master_engine
 
 from . import github_utils as gh_utils
 
@@ -67,15 +67,6 @@ def run(branch: str, charts_df: pd.DataFrame) -> str:
     return body
 
 
-def production_or_master_engine() -> Engine:
-    """Return the production engine if available, otherwise connect to staging-site-master."""
-    if OWID_ENV.env_remote == "production":
-        return OWID_ENV.get_engine()
-    else:
-        log.warning("ENV file doesn't connect to production DB, comparing against staging-site-master")
-        return OWIDEnv.from_staging("master").get_engine()
-
-
 def call_chart_diff(branch: str) -> pd.DataFrame:
     source_engine = OWIDEnv.from_staging(branch).get_engine()
     target_engine = production_or_master_engine()
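
With `_load_datasets_new_ids` removed, owidbot and the CLI now share one implementation. A condensed sketch of the resulting flow in `run()` (the branch name is hypothetical):

```
from apps.anomalist.cli import load_datasets_new_ids
from etl.config import OWIDEnv

source_engine = OWIDEnv.from_staging("my-branch").get_engine()
# The comparison target (production or staging-site-master) is now resolved
# inside the helper via etl.db.production_or_master_engine(), so callers no
# longer pass a target engine explicitly.
datasets_new_ids = load_datasets_new_ids(source_engine)
```
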
diff --git a/etl/db.py b/etl/db.py
index 2541f8f8472..a1b40706eb1 100644
--- a/etl/db.py
+++ b/etl/db.py
@@ -95,3 +95,14 @@ def to_sql(df: pd.DataFrame, name: str, engine: Optional[Engine | Session] = Non
         return df.to_sql(name, engine.bind, *args, **kwargs)
     else:
         raise ValueError(f"Unsupported engine type {type(engine)}")
+
+
+def production_or_master_engine() -> Engine:
+    """Return the production engine if available, otherwise connect to staging-site-master."""
+    if config.OWID_ENV.env_remote == "production":
+        return config.OWID_ENV.get_engine()
+    elif config.ENV_FILE_PROD:
+        return config.OWIDEnv.from_env_file(config.ENV_FILE_PROD).get_engine()
+    else:
+        log.warning("ENV file doesn't connect to production DB, comparing against staging-site-master")
+        return config.OWIDEnv.from_staging("master").get_engine()
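
The consolidated helper also gains a branch the owidbot copy lacked: an explicit `ENV_FILE_PROD` fallback. The decision order, mirrored on plain strings (a sketch, not the real config objects):

```
from typing import Optional

def pick_target(env_remote: str, env_file_prod: Optional[str]) -> str:
    # Mirrors the branch order of production_or_master_engine
    if env_remote == "production":
        return "production"
    elif env_file_prod:
        return f"env file: {env_file_prod}"
    else:
        return "staging-site-master"

assert pick_target("production", None) == "production"
assert pick_target("staging", "/path/to/.env.prod") == "env file: /path/to/.env.prod"
assert pick_target("staging", None) == "staging-site-master"
```
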
diff --git a/etl/grapher_io.py b/etl/grapher_io.py
index 11d2dbf0864..6663f7b5d14 100644
--- a/etl/grapher_io.py
+++ b/etl/grapher_io.py
@@ -8,8 +8,10 @@
 """
 
 import concurrent.futures
+import functools as ft
 import io
 import warnings
+from collections import defaultdict
 from http.client import RemoteDisconnected
 from typing import Any, Dict, List, Optional, cast
 from urllib.error import HTTPError, URLError
@@ -20,6 +22,7 @@
 import structlog
 import validators
 from deprecated import deprecated
+from owid.catalog import Dataset
 from sqlalchemy.engine import Engine
 from sqlalchemy.orm import Session
 from tenacity import Retrying
@@ -28,10 +31,10 @@
 from tenacity.wait import wait_fixed
 
 from etl import config
+from etl import grapher_model as gm
 from etl.config import OWID_ENV, OWIDEnv
 from etl.db import get_connection, read_sql
-from etl.grapher_model import Dataset, Entity, Variable
-from etl.paths import CACHE_DIR
+from etl.paths import CACHE_DIR, DATA_DIR
 
 log = structlog.get_logger()
 
@@ -46,7 +49,7 @@ def load_dataset_uris(
 ) -> List[str]:
     """Get list of dataset URIs from the database."""
     with Session(owid_env.engine) as session:
-        datasets = Dataset.load_datasets_uri(session)
+        datasets = gm.Dataset.load_datasets_uri(session)
 
     return list(datasets["dataset_uri"])
 
@@ -55,10 +58,10 @@ def load_variables_in_dataset(
     dataset_uri: Optional[List[str]] = None,
     dataset_id: Optional[List[int]] = None,
     owid_env: OWIDEnv = OWID_ENV,
-) -> List[Variable]:
+) -> List[gm.Variable]:
     """Load Variable objects that belong to a dataset with URI `dataset_uri`."""
     with Session(owid_env.engine) as session:
-        indicators = Variable.load_variables_in_datasets(
+        indicators = gm.Variable.load_variables_in_datasets(
             session=session,
             dataset_uris=dataset_uri,
             dataset_ids=dataset_id,
@@ -71,7 +74,7 @@ def load_variable(
     id_or_path: str | int,
     owid_env: OWIDEnv = OWID_ENV,
-) -> Variable:
+) -> gm.Variable:
     """Load variable.
 
     If id_or_path is str, it'll be used as catalog path.
 
@@ -83,7 +86,7 @@ def load_variable(
         pass
 
     with Session(owid_env.engine) as session:
-        variable = Variable.from_id_or_path(
+        variable = gm.Variable.from_id_or_path(
             session=session,
             id_or_path=id_or_path,
         )
@@ -95,7 +98,7 @@ def load_variable(
 def load_variables(
     ids_or_paths: List[str | int],
     owid_env: OWIDEnv = OWID_ENV,
-) -> List[Variable]:
+) -> List[gm.Variable]:
     """Load variable.
 
     If id_or_path is str, it'll be used as catalog path.
@@ -103,7 +106,7 @@ def load_variables(
     TODO: this should be merged with load_variable!
     """
     with Session(owid_env.engine) as session:
-        variable = Variable.from_id_or_path(
+        variable = gm.Variable.from_id_or_path(
             session=session,
             id_or_path=ids_or_paths,
         )
@@ -121,7 +124,7 @@ def load_variables(
 def load_variable_metadata(
     catalog_path: Optional[str] = None,
     variable_id: Optional[int] = None,
-    variable: Optional[Variable] = None,
+    variable: Optional[gm.Variable] = None,
     owid_env: OWIDEnv = OWID_ENV,
 ) -> Dict[str, Any]:
     """Get metadata for an indicator based on its catalog path or variable id.
@@ -147,7 +150,7 @@ def load_variable_metadata(
 def load_variable_data(
     catalog_path: Optional[str] = None,
     variable_id: Optional[int] = None,
-    variable: Optional[Variable] = None,
+    variable: Optional[gm.Variable] = None,
     owid_env: OWIDEnv = OWID_ENV,
     set_entity_names: bool = True,
 ) -> pd.DataFrame:
@@ -180,9 +183,9 @@ def load_variable_data(
 def ensure_load_variable(
     catalog_path: Optional[str] = None,
     variable_id: Optional[int] = None,
-    variable: Optional[Variable] = None,
+    variable: Optional[gm.Variable] = None,
     owid_env: OWIDEnv = OWID_ENV,
-) -> Variable:
+) -> gm.Variable:
     if variable is None:
         if catalog_path is not None:
             variable = load_variable(id_or_path=catalog_path, owid_env=owid_env)
@@ -203,7 +206,7 @@ def ensure_load_variable(
 def load_variables_data(
     catalog_paths: Optional[List[str]] = None,
     variable_ids: Optional[List[int]] = None,
-    variables: Optional[List[Variable]] = None,
+    variables: Optional[List[gm.Variable]] = None,
     owid_env: OWIDEnv = OWID_ENV,
     workers: int = 1,
     value_as_str: bool = True,
@@ -239,7 +242,7 @@ def load_variables_data(
 def load_variables_metadata(
     catalog_paths: Optional[List[str]] = None,
     variable_ids: Optional[List[int]] = None,
-    variables: Optional[List[Variable]] = None,
+    variables: Optional[List[gm.Variable]] = None,
     owid_env: OWIDEnv = OWID_ENV,
     workers: int = 1,
 ) -> List[Dict[str, Any]]:
@@ -273,11 +276,11 @@ def _ensure_variable_ids(
     engine: Engine,
     catalog_paths: Optional[List[str]] = None,
     variable_ids: Optional[List[int]] = None,
-    variables: Optional[List[Variable]] = None,
+    variables: Optional[List[gm.Variable]] = None,
 ) -> List[int]:
     if catalog_paths is not None:
         with Session(engine) as session:
-            mapping = Variable.catalog_paths_to_variable_ids(session, catalog_paths=catalog_paths)
+            mapping = gm.Variable.catalog_paths_to_variable_ids(session, catalog_paths=catalog_paths)
         variable_ids = [int(i) for i in mapping.values()]
     elif (variable_ids is None) and (variables is not None):
         variable_ids = [variable.id for variable in variables]
@@ -449,11 +452,83 @@ def _fetch_metadata_from_s3(variable_id: int, env: OWIDEnv | None = None) -> Dic
 def load_entity_mapping(entity_ids: Optional[List[int]] = None, owid_env: OWIDEnv = OWID_ENV) -> Dict[int, str]:
     # Fetch the mapping of entity ids to names.
     with Session(owid_env.engine) as session:
-        entity_id_to_name = Entity.load_entity_mapping(session=session, entity_ids=entity_ids)
+        entity_id_to_name = gm.Entity.load_entity_mapping(session=session, entity_ids=entity_ids)
 
     return entity_id_to_name
 
 
+def variable_data_df_from_catalog(
+    engine: Engine, variables: Optional[List[gm.Variable]] = None, variable_ids: Optional[List[int | str]] = None
+) -> pd.DataFrame:
+    """Load all variables for a given dataset from local catalog."""
+    if variable_ids:
+        assert not variables, "Only one of variables or variable_ids must be provided"
+        with Session(engine) as session:
+            variables = gm.Variable.from_id_or_path(session, variable_ids, columns=["id", "shortName", "dimensions"])
+    elif variables:
+        assert not variable_ids, "Only one of variables or variable_ids must be provided"
+    else:
+        raise ValueError("Either variables or variable_ids must be provided")
+
+    to_read = defaultdict(list)
+
+    # Group variables by dataset and table
+    # TODO: use CatalogPath object
+    for variable in variables:
+        assert variable.catalogPath, f"Variable {variable.id} has no catalogPath"
+        path, short_name = variable.catalogPath.split("#")
+        ds_path, table_name = path.rsplit("/", 1)
+        to_read[(ds_path, table_name)].append(variable)
+
+    # Read the table and load all its variables
+    tbs = []
+    for (ds_path, table_name), variables in to_read.items():
+        try:
+            tb = Dataset(DATA_DIR / ds_path).read_table(table_name)
+        except FileNotFoundError as e:
+            raise FileNotFoundError(f"Dataset {ds_path} not found in local catalog.") from e
+
+        # Simple case with no dimensions
+        if not variables[0].dimensions:
+            variable_names = [variable.shortName for variable in variables]
+            tb = tb[["country", "year"] + variable_names]
+
+            # Rename from shortName to id
+            tb = tb.rename(columns={variable.shortName: variable.id for variable in variables})
+            tbs.append(tb)
+
+        # Dimensional case
+        else:
+            # NOTE: example of `filters`
+            # [
+            #     {'name': 'question', 'value': 'mh1 - Importance of mental health for well-being'},
+            #     {'name': 'answer', 'value': 'As important'},
+            #     {'name': 'gender', 'value': 'all'},
+            #     {'name': 'age_group', 'value': '15-29'}
+            # ]
+            filters = variables[0].dimensions["filters"]
+            dim_names = [f["name"] for f in filters]
+            tb_pivoted = tb.pivot(index=["country", "year"], columns=dim_names)
+
+            labels = []
+            for variable in variables:
+                assert variable.dimensions, f"Variable {variable.id} has no dimensions"
+                labels.append(
+                    tuple(
+                        [variable.dimensions["originalShortName"]]
+                        + [f["value"] for f in variable.dimensions["filters"]]
+                    )
+                )
+
+            tb = tb_pivoted.loc[:, labels]
+
+            tb.columns = [variable.id for variable in variables]
+            tbs.append(tb.reset_index())
+
+    # TODO: this could be slow for datasets with a lot of tables
+    return ft.reduce(lambda left, right: pd.merge(left, right, on=["country", "year"], how="outer"), tbs)
+
+
 #######################################################################################################
 
 # TO BE REVIEWED:
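
The dimensional branch of `variable_data_df_from_catalog` selects one pivoted column per variable, keyed by the tuple `(originalShortName, *filter values)`. A toy illustration with a single gender dimension (the table, dimension, and id are all made up):

```
import pandas as pd

tb = pd.DataFrame(
    {
        "country": ["France", "France"],
        "year": [2020, 2020],
        "gender": ["all", "male"],
        "share": [10.0, 12.0],
    }
)

# One variable with dimensions:
#   {"originalShortName": "share", "filters": [{"name": "gender", "value": "male"}]}
tb_pivoted = tb.pivot(index=["country", "year"], columns=["gender"])
label = ("share", "male")  # (originalShortName, *filter values)
out = tb_pivoted.loc[:, [label]]
out.columns = [123]  # assign the (hypothetical) variable id
```
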
diff --git a/etl/grapher_model.py b/etl/grapher_model.py
index 1fc78b1c1cb..6513f36e263 100644
--- a/etl/grapher_model.py
+++ b/etl/grapher_model.py
@@ -1256,18 +1256,14 @@ def load_variables(cls, session: Session, variables_id: List[int]) -> List["Vari
 
     @overload
     @classmethod
     def from_id_or_path(
-        cls,
-        session: Session,
-        id_or_path: str | int,
+        cls, session: Session, id_or_path: str | int, columns: Optional[List[str]] = None
     ) -> "Variable":
         ...
 
     @overload
     @classmethod
     def from_id_or_path(
-        cls,
-        session: Session,
-        id_or_path: List[str | int],
+        cls, session: Session, id_or_path: List[str | int], columns: Optional[List[str]] = None
     ) -> List["Variable"]:
         ...
 
@@ -1276,14 +1272,15 @@ def from_id_or_path(
         cls,
         session: Session,
         id_or_path: int | str | List[str | int],
+        columns: Optional[List[str]] = None,
     ) -> "Variable" | List["Variable"]:
         """Load a variable from the database by its catalog path or variable ID."""
         # Single id
         if isinstance(id_or_path, int):
-            return cls.from_id(session=session, variable_id=id_or_path)
+            return cls.from_id(session=session, variable_id=id_or_path, columns=columns)
         # Single path
         elif isinstance(id_or_path, str):
-            return cls.from_catalog_path(session=session, catalog_path=id_or_path)
+            return cls.from_catalog_path(session=session, catalog_path=id_or_path, columns=columns)
 
         # Multiple path or id
         elif isinstance(id_or_path, list):
@@ -1292,10 +1289,10 @@ def from_id_or_path(
             int_ids = [i for i in id_or_path if isinstance(i, int)]
             str_ids = [i for i in id_or_path if isinstance(i, str)]
             # Multiple IDs
             if len(int_ids) == len(id_or_path):
-                return cls.from_id(session=session, variable_id=int_ids)
+                return cls.from_id(session=session, variable_id=int_ids, columns=columns)
             # Multiple paths
             elif len(str_ids) == len(id_or_path):
-                return cls.from_catalog_path(session=session, catalog_path=str_ids)
+                return cls.from_catalog_path(session=session, catalog_path=str_ids, columns=columns)
             else:
                 raise TypeError("All elements in the list must be integers")
@@ -1312,40 +1309,46 @@ def from_id_or_path(
 
     @overload
     @classmethod
-    def from_catalog_path(cls, session: Session, catalog_path: str) -> "Variable":
+    def from_catalog_path(cls, session: Session, catalog_path: str, columns: Optional[List[str]] = None) -> "Variable":
         ...
 
     @overload
     @classmethod
-    def from_catalog_path(cls, session: Session, catalog_path: List[str]) -> List["Variable"]:
+    def from_catalog_path(
+        cls, session: Session, catalog_path: List[str], columns: Optional[List[str]] = None
+    ) -> List["Variable"]:
         ...
 
     @classmethod
-    def from_catalog_path(cls, session: Session, catalog_path: str | List[str]) -> "Variable" | List["Variable"]:
+    def from_catalog_path(
+        cls, session: Session, catalog_path: str | List[str], columns: Optional[List[str]] = None
+    ) -> "Variable" | List["Variable"]:
         """Load a variable from the DB by its catalog path."""
         assert "#" in catalog_path, "catalog_path should end with #indicator_short_name"
         if isinstance(catalog_path, str):
-            return session.scalars(select(cls).where(cls.catalogPath == catalog_path)).one()
+            return session.scalars(_select_columns(cls, columns).where(cls.catalogPath == catalog_path)).one()
         elif isinstance(catalog_path, list):
-            return session.scalars(select(cls).where(cls.catalogPath.in_(catalog_path))).all()  # type: ignore
+            return session.scalars(_select_columns(cls, columns).where(cls.catalogPath.in_(catalog_path))).all()  # type: ignore
 
     @overload
     @classmethod
-    def from_id(cls, session: Session, variable_id: int) -> "Variable":
+    def from_id(cls, session: Session, variable_id: int, columns: Optional[List[str]] = None) -> "Variable":
         ...
 
     @overload
     @classmethod
-    def from_id(cls, session: Session, variable_id: List[int]) -> List["Variable"]:
+    def from_id(cls, session: Session, variable_id: List[int], columns: Optional[List[str]] = None) -> List["Variable"]:
         ...
 
     @classmethod
-    def from_id(cls, session: Session, variable_id: int | List[int]) -> "Variable" | List["Variable"]:
+    def from_id(
+        cls, session: Session, variable_id: int | List[int], columns: Optional[List[str]] = None
+    ) -> "Variable" | List["Variable"]:
         """Load a variable (or list of variables) from the DB by its ID path."""
         if isinstance(variable_id, int):
-            return session.scalars(select(cls).where(cls.id == variable_id)).one()
+            return session.scalars(_select_columns(cls, columns).where(cls.id == variable_id)).one()
         elif isinstance(variable_id, list):
-            return session.scalars(select(cls).where(cls.id.in_(variable_id))).all()  # type: ignore
+            return session.scalars(_select_columns(cls, columns).where(cls.id.in_(variable_id))).all()  # type: ignore
 
     @classmethod
     def catalog_paths_to_variable_ids(cls, session: Session, catalog_paths: List[str]) -> Dict[str, int]:
@@ -1965,3 +1968,13 @@ def _fetch_entities(
     df = df.rename(columns=column_renames)
 
     return df
+
+
+def _select_columns(cls, columns: Optional[list[str]] = None) -> Select:
+    # Select only the specified columns, or all if not specified
+    if columns:
+        # Use getattr to dynamically select the columns
+        columns_to_select = [getattr(cls, col) for col in columns]
+        return select(*columns_to_select)
+    else:
+        return select(cls)
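
A hedged sketch of how the new `columns` parameter threads through `from_id_or_path` (the ids are hypothetical; `variable_data_df_from_catalog` in the grapher_io diff above uses exactly this projection to avoid fetching full rows):

```
from sqlalchemy.orm import Session

from etl import grapher_model as gm
from etl.db import get_engine

with Session(get_engine()) as session:
    # Full ORM entities, as before (columns defaults to None)
    variable = gm.Variable.from_id_or_path(session, 123456)

    # Lightweight projection, as used by variable_data_df_from_catalog
    variables = gm.Variable.from_id_or_path(
        session, [123456, 123457], columns=["id", "shortName", "dimensions"]
    )
```
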