✨ Add anomalist to owidbot #3431

Merged (4 commits) on Oct 21, 2024
43 changes: 39 additions & 4 deletions apps/anomalist/anomalist_api.py
@@ -7,6 +7,7 @@
import structlog
from owid.catalog import find
from sqlalchemy.engine import Engine
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import Session

from apps.anomalist.detectors import (
@@ -211,13 +212,14 @@ def anomaly_detection(
variable_mapping: Optional[dict[int, int]] = None,
variable_ids: Optional[list[int]] = None,
dry_run: bool = False,
force: bool = False,
reset_db: bool = False,
) -> None:
"""Detect anomalies."""
engine = get_engine()

# Ensure the 'anomalies' table exists. Optionally reset it if reset_db is True.
gm.Anomaly.create_table(engine, reset=reset_db)
gm.Anomaly.create_table(engine, if_exists="replace" if reset_db else "skip")

# If no anomaly types are provided, default to all available types
if not anomaly_types:
@@ -254,6 +256,10 @@ def anomaly_detection(
dataset_variable_ids[variable.datasetId].append(variable)

for dataset_id, variables_in_dataset in dataset_variable_ids.items():
# Get dataset's checksum
with Session(engine) as session:
dataset = gm.Dataset.load_dataset(session, dataset_id)

log.info("Loading data from S3")
variables_old = [
variables[variable_id_old]
@@ -268,6 +274,11 @@
if anomaly_type not in ANOMALY_DETECTORS:
raise ValueError(f"Unsupported anomaly type: {anomaly_type}")

if not force:
if not needs_update(engine, dataset, anomaly_type):
log.info(f"Anomaly type {anomaly_type} for dataset {dataset_id} already exists in the database.")
continue

log.info(f"Detecting anomaly type {anomaly_type} for dataset {dataset_id}")

# Instantiate the anomaly detector.
@@ -289,12 +300,17 @@
variable_mapping=variable_mapping_for_current_dataset,
)

if df_score.empty:
log.info("No anomalies detected.`")
continue

# Create a long format score dataframe.
df_score_long = get_long_format_score_df(df_score)

# TODO: validate format of the output dataframe
anomaly = gm.Anomaly(
datasetId=dataset_id,
datasetSourceChecksum=dataset.sourceChecksum,
anomalyType=anomaly_type,
)
anomaly.dfScore = df_score_long
@@ -338,6 +354,22 @@ def anomaly_detection(
session.commit()


def needs_update(engine: Engine, dataset: gm.Dataset, anomaly_type: str) -> bool:
"""If there's an anomaly with the dataset checksum in DB, it doesn't need
to be updated."""
with Session(engine) as session:
try:
anomaly = gm.Anomaly.load(
session,
dataset_id=dataset.id,
anomaly_type=anomaly_type,
)
except NoResultFound:
return True

return anomaly.datasetSourceChecksum != dataset.sourceChecksum


def export_anomalies_file(df: pd.DataFrame, dataset_id: int, anomaly_type: str) -> str:
"""Export anomaly df to local file (and upload to staging server if applicable)."""
filename = f"{dataset_id}_{anomaly_type}.feather"
@@ -372,16 +404,19 @@ def load_data_for_variables(engine: Engine, variables: list[gm.Variable]) -> pd.
# reorder in the same order as variables
df = df[[v.id for v in variables]]

# try converting to numeric
df = df.astype(float)
# TODO: how should we treat non-numeric variables? We can exclude it here, but then we need to
# fix it in detectors
# HACK: set non-numeric variables to zero
df = df.apply(pd.to_numeric, errors="coerce")
df = df.fillna(0)

# TODO:
# remove countries with all nulls or all zeros or constant values
# df = df.loc[:, df.fillna(0).std(axis=0) != 0]

df = df.reset_index().astype({"entity_name": str})

return df
return df # type: ignore


# @memory.cache
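As context for the non-numeric handling above, a minimal, hypothetical sketch (column names made up) of what the errors="coerce" fallback does: every value of a non-numeric indicator becomes NaN and is then zeroed out, which is why the accompanying TODO flags this as a temporary hack.

import pandas as pd

# Toy frame mixing a numeric indicator with a categorical one.
df = pd.DataFrame({"gdp": [1.0, 2.5, 3.0], "income_group": ["Low", "High", "Low"]})

# Column-wise coercion: non-numeric entries become NaN, then are filled with zero.
df = df.apply(pd.to_numeric, errors="coerce").fillna(0)
print(df)
#    gdp  income_group
# 0  1.0           0.0
# 1  2.5           0.0
# 2  3.0           0.0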
8 changes: 8 additions & 0 deletions apps/anomalist/cli.py
@@ -49,6 +49,12 @@
type=bool,
help="Do not write to target database.",
)
@click.option(
"--force",
"-f",
is_flag=True,
help="TBD",
)
@click.option(
"--reset-db/--no-reset-db",
default=False,
@@ -61,6 +67,7 @@ def cli(
variable_mapping: str,
variable_ids: Optional[list[int]],
dry_run: bool,
force: bool,
reset_db: bool,
) -> None:
"""TBD
@@ -111,6 +118,7 @@ def cli(
variable_mapping=variable_mapping_dict,
variable_ids=list(variable_ids) if variable_ids else None,
dry_run=dry_run,
force=force,
reset_db=reset_db,
)

9 changes: 6 additions & 3 deletions apps/anomalist/gp_detector.py
@@ -65,8 +65,8 @@ def _processing_queue(items: list[tuple[str, int]]) -> List[tuple]:
class AnomalyGaussianProcessOutlier(AnomalyDetector):
anomaly_type = "gp_outlier"

# TODO: max_time is hard-coded to 100, but it should be configurable in production
def __init__(self, max_time: Optional[float] = 100, n_jobs: int = 1):
# TODO: max_time is hard-coded to 10, but it should be configurable in production
def __init__(self, max_time: Optional[float] = 10, n_jobs: int = 1):
self.max_time = max_time
self.n_jobs = n_jobs

@@ -100,7 +100,7 @@ def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mappi
group = df_wide.loc[(entity_name, variable_id)]

# Skip if the series has only one or fewer data points
if len(group) <= 1:
if isinstance(group, pd.Series) or len(group) <= 1:
continue

# Prepare the input features (X) and target values (y) for Gaussian Process
Expand Down Expand Up @@ -130,6 +130,9 @@ def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mappi

log.info("Finished processing", elapsed=round(time.time() - start_time, 2))

if not results:
return pd.DataFrame()

df_score_long = pd.concat(results).reset_index()

# Normalize the anomaly scores by mapping interval (0, 3+) to (0, 1)
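A brief, hypothetical illustration of why the added isinstance check is needed: selecting a MultiIndex group that contains a single row returns a Series rather than a DataFrame, and its length counts columns instead of observations, so the len(group) <= 1 guard alone would not skip it.

import pandas as pd

# Made-up frame mimicking df_wide, indexed by (entity_name, variable_id).
df_wide = pd.DataFrame(
    {"year": [2000, 2001, 2000], "value": [1.0, 2.0, 3.0]},
    index=pd.MultiIndex.from_tuples(
        [("France", 1), ("France", 1), ("Spain", 1)], names=["entity_name", "variable_id"]
    ),
)

group = df_wide.loc[("France", 1)]   # DataFrame with 2 rows: processed normally
single = df_wide.loc[("Spain", 1)]   # single observation collapses to a Series
print(type(single), len(single))     # <class 'pandas.core.series.Series'> 2 (len counts columns)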
61 changes: 61 additions & 0 deletions apps/owidbot/anomalist.py
@@ -0,0 +1,61 @@
from structlog import get_logger

from apps.anomalist.anomalist_api import anomaly_detection
from apps.wizard.app_pages.anomalist.utils import load_variable_mapping
from etl import grapher_model as gm
from etl.config import OWIDEnv
from etl.db import Engine, read_sql

from .chart_diff import production_or_master_engine

log = get_logger()


def run(branch: str) -> None:
"""Compute all anomalist for new and updated datasets."""
# Get engines for branch and production
source_engine = OWIDEnv.from_staging(branch).get_engine()
target_engine = production_or_master_engine()

# Create table with anomalist if it doesn't exist
gm.Anomaly.create_table(source_engine, if_exists="skip")

# Load new dataset ids
datasets_new_ids = _load_datasets_new_ids(source_engine, target_engine)

if not datasets_new_ids:
log.info("No new datasets found.")
return

log.info(f"New datasets: {datasets_new_ids}")

# Load all their variables
q = """SELECT id FROM variables WHERE datasetId IN %(dataset_ids)s"""
variable_ids = list(read_sql(q, source_engine, params={"dataset_ids": datasets_new_ids})["id"])

# Load variable mapping
variable_mapping_dict = load_variable_mapping(datasets_new_ids)

# Run anomalist
anomaly_detection(
variable_mapping=variable_mapping_dict,
variable_ids=variable_ids,
)


def _load_datasets_new_ids(source_engine: Engine, target_engine: Engine) -> list[int]:
# Get new datasets
# TODO: replace by real catalogPath when we have it in MySQL
q = """SELECT
id,
CONCAT(namespace, "/", version, "/", shortName) as catalogPath
FROM datasets
"""
source_datasets = read_sql(q, source_engine)
target_datasets = read_sql(q, target_engine)

return list(
source_datasets[
source_datasets.catalogPath.isin(set(source_datasets["catalogPath"]) - set(target_datasets["catalogPath"]))
]["id"]
)
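To illustrate the catalogPath set difference in _load_datasets_new_ids, a small self-contained sketch with made-up paths: only datasets present on the staging branch but missing from the target (production or staging-site-master) are reported as new.

import pandas as pd

source_datasets = pd.DataFrame(
    {"id": [1, 2, 3], "catalogPath": ["ns/2024/a", "ns/2024/b", "ns/2024/c"]}
)
target_datasets = pd.DataFrame(
    {"id": [1, 2], "catalogPath": ["ns/2024/a", "ns/2024/b"]}
)

new_paths = set(source_datasets["catalogPath"]) - set(target_datasets["catalogPath"])
new_ids = list(source_datasets[source_datasets.catalogPath.isin(new_paths)]["id"])
print(new_ids)  # [3] -> only the dataset absent from the target is treated as new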
15 changes: 10 additions & 5 deletions apps/owidbot/chart_diff.py
@@ -3,6 +3,7 @@

from apps.wizard.app_pages.chart_diff.chart_diff import ChartDiffsLoader
from etl.config import OWID_ENV, OWIDEnv, get_container_name
from etl.db import Engine

from . import github_utils as gh_utils

Expand Down Expand Up @@ -66,14 +67,18 @@ def run(branch: str, charts_df: pd.DataFrame) -> str:
return body


def call_chart_diff(branch: str) -> pd.DataFrame:
source_engine = OWIDEnv.from_staging(branch).get_engine()

def production_or_master_engine() -> Engine:
"""Return the production engine if available, otherwise connect to staging-site-master."""
if OWID_ENV.env_remote == "production":
target_engine = OWID_ENV.get_engine()
return OWID_ENV.get_engine()
else:
log.warning("ENV file doesn't connect to production DB, comparing against staging-site-master")
target_engine = OWIDEnv.from_staging("master").get_engine()
return OWIDEnv.from_staging("master").get_engine()


def call_chart_diff(branch: str) -> pd.DataFrame:
source_engine = OWIDEnv.from_staging(branch).get_engine()
target_engine = production_or_master_engine()

df = ChartDiffsLoader(source_engine, target_engine).get_diffs_summary_df(
config=True,
9 changes: 7 additions & 2 deletions apps/owidbot/cli.py
@@ -8,15 +8,15 @@
from rich import print
from rich_click.rich_command import RichCommand

from apps.owidbot import chart_diff, data_diff, grapher
from apps.owidbot import anomalist, chart_diff, data_diff, grapher
from etl.config import get_container_name

from . import github_utils as gh_utils

log = structlog.get_logger()

REPOS = Literal["etl", "owid-grapher"]
SERVICES = Literal["data-diff", "chart-diff", "grapher"]
SERVICES = Literal["data-diff", "chart-diff", "grapher", "anomalist"]


@click.command("owidbot", cls=RichCommand, help=__doc__)
@@ -76,6 +76,11 @@ def cli(

elif service == "grapher":
services_body["grapher"] = grapher.run(branch)

elif service == "anomalist":
# TODO: anomalist could post a summary of anomalies to the PR
anomalist.run(branch)

else:
raise AssertionError("Invalid service")

34 changes: 22 additions & 12 deletions apps/wizard/app_pages/anomalist/utils.py
@@ -1,6 +1,6 @@
"""Utils for chart revision tool."""
from enum import Enum
from typing import Dict, List, Tuple
from typing import Dict, List, Optional, Tuple

import pandas as pd
import streamlit as st
@@ -54,6 +54,21 @@ def get_datasets_and_mapping_inputs() -> Tuple[Dict[int, str], Dict[int, str], D
datasets_new_ids = list(dataset_new_and_old)

# Load mapping created by indicator upgrader (if any).
variable_mapping = load_variable_mapping(datasets_new_ids, dataset_new_and_old)

# For convenience, create a dataset name "[id] Name".
df_datasets["id_name"] = "[" + df_datasets["id"].astype(str) + "] " + df_datasets["name"]
# List all grapher datasets.
datasets_all = df_datasets[["id", "id_name"]].set_index("id").squeeze().to_dict()
# List new datasets.
datasets_new = {k: v for k, v in datasets_all.items() if k in datasets_new_ids}

return datasets_all, datasets_new, variable_mapping # type: ignore


def load_variable_mapping(
datasets_new_ids: List[int], dataset_new_and_old: Optional[Dict[int, Optional[int]]] = None
) -> Dict[int, int]:
mapping = WizardDB.get_variable_mapping_raw()
if len(mapping) > 0:
log.info("Using variable mapping created by indicator upgrader.")
@@ -68,7 +83,7 @@ def get_datasets_and_mapping_inputs() -> Tuple[Dict[int, str], Dict[int, str], D
)
# Create a mapping dictionary.
variable_mapping = mapping.set_index("id_old")["id_new"].to_dict()
else:
elif dataset_new_and_old:
log.info("Inferring variable mapping (since no mapping was created by indicator upgrader).")
# Infer the mapping of the new datasets (assuming no names have changed).
variable_mapping = dict()
@@ -77,24 +92,19 @@ def get_datasets_and_mapping_inputs() -> Tuple[Dict[int, str], Dict[int, str], D
continue
# Infer
variable_mapping.update(infer_variable_mapping(dataset_id_new, dataset_id_old))
else:
# No mapping available.
variable_mapping = dict()

# For convenience, create a dataset name "[id] Name".
df_datasets["id_name"] = "[" + df_datasets["id"].astype(str) + "] " + df_datasets["name"]
# List all grapher datasets.
datasets_all = df_datasets[["id", "id_name"]].set_index("id").squeeze().to_dict()

# List new datasets.
datasets_new = {k: v for k, v in datasets_all.items() if k in datasets_new_ids}

return datasets_all, datasets_new, variable_mapping # type: ignore
return variable_mapping # type: ignore


def create_tables(_owid_env: OWIDEnv = OWID_ENV):
"""Create all required tables.

If exist, nothing is created.
"""
gm.Anomaly.create_table(_owid_env.engine)
gm.Anomaly.create_table(_owid_env.engine, if_exists="skip")


@st.cache_data(show_spinner=False)