✨ anomalist: Improve automatic detection of new datasets #3429

Merged
5 changes: 3 additions & 2 deletions apps/wizard/app_pages/anomalist/app.py
@@ -474,7 +474,8 @@ def _change_chart_selection(df, key_table, key_selection):
# If anomalies for dataset already exist in DB, load them. Warn user that these are being loaded from DB
if st.session_state.anomalist_datasets_submitted:
# 3.1/ Check if anomalies are already there in DB
st.session_state.anomalist_anomalies = WizardDB.load_anomalies(st.session_state.anomalist_datasets_selected)
with st.spinner("Loading anomalies (already detected) from database..."):
st.session_state.anomalist_anomalies = WizardDB.load_anomalies(st.session_state.anomalist_datasets_selected)

# Load indicators in selected datasets
st.session_state.anomalist_indicators = cached.load_variables_display_in_dataset(
@@ -484,7 +485,7 @@ def _change_chart_selection(df, key_table, key_selection):

# Get indicator IDs
variable_ids = list(st.session_state.anomalist_indicators.keys())
st.session_state.anomalist_mapping = get_variable_mapping(variable_ids)
st.session_state.anomalist_mapping = {k: v for k, v in VARIABLE_MAPPING.items() if v in variable_ids}
st.session_state.anomalist_mapping_inv = {v: k for k, v in st.session_state.anomalist_mapping.items()}

# 3.2/ No anomaly found in DB, estimate them
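For context, a minimal sketch of what the new in-place filter above does. `VARIABLE_MAPPING` is assumed here to be a dict of old → new indicator ids available in `app.py`; the ids below are made up purely for illustration and are not part of the diff:

```python
# Illustrative only (hypothetical ids).
VARIABLE_MAPPING = {101: 201, 102: 202, 103: 203}  # old indicator id -> new indicator id
variable_ids = [201, 203]  # indicator ids present in the selected datasets

# Keep only entries whose *new* id belongs to a selected dataset,
# then invert the result for lookups from new id back to old id.
mapping = {old: new for old, new in VARIABLE_MAPPING.items() if new in variable_ids}
mapping_inv = {new: old for old, new in mapping.items()}

assert mapping == {101: 201, 103: 203}
assert mapping_inv == {201: 101, 203: 103}
```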
61 changes: 37 additions & 24 deletions apps/wizard/app_pages/anomalist/utils.py
@@ -4,13 +4,15 @@

import pandas as pd
import streamlit as st
from sqlalchemy.orm import Session
from structlog import get_logger

import etl.grapher_model as gm
from apps.anomalist.anomalist_api import add_auxiliary_scores, combine_and_reduce_scores_df
from apps.wizard.utils.db import WizardDB
from apps.wizard.utils.io import get_steps_df
from apps.wizard.utils.io import get_new_grapher_datasets_and_their_previous_versions
from etl.config import OWID_ENV, OWIDEnv
from etl.db import get_engine

# Logger
log = get_logger()
@@ -24,32 +26,40 @@ class AnomalyTypeEnum(Enum):
# AI = "ai" # Uncomment if needed


# TODO: Consider refactoring the following function, which does too many things.
def infer_variable_mapping(dataset_id_new: int, dataset_id_old: int) -> Dict[int, int]:
engine = get_engine()
with Session(engine) as session:
variables_new = gm.Variable.load_variables_in_datasets(session=session, dataset_ids=[dataset_id_new])
variables_old = gm.Variable.load_variables_in_datasets(session=session, dataset_ids=[dataset_id_old])
# Create a mapping from old ids to new variable ids for variables whose shortNames are identical in the old and new versions.
_variables = {variable.shortName: variable.id for variable in variables_new}
variable_mapping = {
old_variable.id: _variables[old_variable.shortName]
for old_variable in variables_old
if old_variable.shortName in _variables
}
return variable_mapping


@st.cache_data(show_spinner=False)
@st.spinner("Retrieving datasets...")
def get_datasets_and_mapping_inputs() -> Tuple[Dict[int, str], Dict[int, str], Dict[int, int]]:
# Tuple[pd.DataFrame, List[Dict[str, Dict[str, Any]]], Dict[int, int]]:
# NOTE: The following ignores DB datasets that are archived (which is a bit unexpected).
# I had to manually un-archive the testing datasets from the database manually to make things work.
# This could be fixed, but maybe it's not necessary (since we won't archive an old version of a dataset until the
# new has been analyzed).
steps_df_grapher, grapher_changes = get_steps_df(archived=True)

# List new dataset ids based on changes in files.
datasets_new_ids = [ds["new"]["id"] for ds in grapher_changes]
# Get all datasets from DB.
df_datasets = gm.Dataset.load_all_datasets()

# Replace NaN with empty string in etl paths (otherwise dataset won't be shown if 'show step names' is chosen)
steps_df_grapher["step"] = steps_df_grapher["step"].fillna("")
# Detect local files that correspond to new or modified grapher steps, identify their corresponding grapher dataset ids, and the grapher dataset id of the previous version (if any).
dataset_new_and_old = get_new_grapher_datasets_and_their_previous_versions()

# Add a convenient column for "[dataset id] Dataset Name"
steps_df_grapher["id_name"] = [f"[{ds['id']}] {ds['name']}" for ds in steps_df_grapher.to_dict(orient="records")]
# List new dataset ids.
datasets_new_ids = list(dataset_new_and_old)

# Load mapping created by indicator upgrader (if any).
mapping = WizardDB.get_variable_mapping_raw()
if len(mapping) > 0:
log.info("Using variable mapping created by indicator upgrader.")
# Set of ids of new datasets that appear in the mapping generated by indicator upgrader.
datasets_new_mapped = set(mapping["dataset_id_new"])
# Set of ids of datasets that have appear as new datasets in the grapher_changes.
# Set of ids of expected new datasets.
datasets_new_expected = set(datasets_new_ids)
# Sanity check.
if not (datasets_new_mapped <= datasets_new_expected):
@@ -58,17 +68,20 @@ def get_datasets_and_mapping_inputs() -> Tuple[Dict[int, str], Dict[int, str], Dict[int, int]]:
)
# Create a mapping dictionary.
variable_mapping = mapping.set_index("id_old")["id_new"].to_dict()
# Sanity check.
# TODO: Remove this check once we're sure that this works properly (to save time).
assert variable_mapping == WizardDB.get_variable_mapping(), "Unexpected mapping issues."
else:
# NOTE: Here we could also infer the mapping of the new datasets (assuming no names have changed).
# This could be useful if a user wants to compare two arbitrary versions of existing grapher datasets.
log.info("Inferring variable mapping (since no mapping was created by indicator upgrader).")
# Infer the mapping of the new datasets (assuming no names have changed).
variable_mapping = dict()

for dataset_id_new, dataset_id_old in dataset_new_and_old.items():
if dataset_id_old is None:
continue
# Infer
variable_mapping.update(infer_variable_mapping(dataset_id_new, dataset_id_old))

# For convenience, create a dataset name "[id] Name".
df_datasets["id_name"] = "[" + df_datasets["id"].astype(str) + "] " + df_datasets["name"]
# List all grapher datasets.
datasets_all = steps_df_grapher["id_name"].to_list()
datasets_all = steps_df_grapher[["id", "id_name"]].set_index("id").squeeze().to_dict()
datasets_all = df_datasets[["id", "id_name"]].set_index("id").squeeze().to_dict()

# List new datasets.
datasets_new = {k: v for k, v in datasets_all.items() if k in datasets_new_ids}
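To make the fallback path above concrete, here is a hedged sketch of the shortName-based matching that `infer_variable_mapping` performs, using toy stand-ins for the ORM objects (the real function loads `gm.Variable` rows from the database; ids and names below are invented for illustration):

```python
from types import SimpleNamespace

# Hypothetical variables for the old and new version of the same dataset.
variables_old = [SimpleNamespace(id=11, shortName="co2_emissions"),
                 SimpleNamespace(id=12, shortName="gdp")]
variables_new = [SimpleNamespace(id=21, shortName="co2_emissions"),
                 SimpleNamespace(id=22, shortName="gdp_per_capita")]  # renamed -> not mapped

# Map old id -> new id wherever the shortName is identical in both versions.
_variables = {v.shortName: v.id for v in variables_new}
variable_mapping = {v.id: _variables[v.shortName] for v in variables_old if v.shortName in _variables}

assert variable_mapping == {11: 21}  # "gdp" has no identical counterpart, so it is skipped
```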
99 changes: 99 additions & 0 deletions apps/wizard/utils/io.py
@@ -2,12 +2,28 @@

Together with utils.db and utils.cached, it might need some rethinking on where it goes.
"""
from pathlib import Path
from typing import Dict, List, Optional

import pandas as pd
from pymysql import OperationalError
from sqlalchemy.orm import Session
from structlog import get_logger

import etl.grapher_model as gm
from apps.wizard.utils.cached import get_datasets_from_version_tracker
from etl.db import get_engine
from etl.git_helpers import get_changed_files
from etl.grapher_io import get_all_datasets
from etl.paths import BASE_DIR, STEP_DIR

# Initialize logger.
log = get_logger()


########################################################################################################################
# Consider deprecating this function, which is very slow, and possibly an overkill.
# NOTE: I think it can easily be replaced in the context of Anomalist, but unclear yet if it can be replaced in the context of indicator upgrader.
def get_steps_df(archived: bool = True):
"""Get steps_df, and grapher_changes from version tracker."""
# NOTE: The following ignores DB datasets that are archived (which is a bit unexpected).
@@ -41,3 +57,86 @@ def get_steps_df(archived: bool = True):
assert steps_df_grapher["namespace"].notna().all(), "NaNs found in `namespace`"

return steps_df_grapher, grapher_changes


########################################################################################################################


def get_changed_grapher_steps(files_changed: Dict[str, Dict[str, str]]) -> List[str]:
"""Get list of new grapher steps with their corresponding old steps."""
grapher_steps = []
for file_path, file_status in files_changed.items():
# File status can be: D (deleted), A (added), M (modified).
# NOTE: In principle, we could select only "A" files. But it is possible that the user adds a new grapher step, and then commits changes to it, in which case (I think) the status would be "M".

# If deleted, skip loop iteration
if file_status == "D":
# Skip deleted files.
continue

# Identify grapher data steps, and ignore the rest.
if file_path.startswith(STEP_DIR.relative_to(BASE_DIR).as_posix()) and file_path.endswith(".py"):
if Path(file_path).with_suffix("").as_posix().split("/")[-4] == "grapher":
grapher_steps.append(file_path)
else:
continue

return grapher_steps


def get_new_grapher_datasets_and_their_previous_versions() -> Dict[int, Optional[int]]:
"""Detect which local grapher step files have changed, identify their corresponding grapher dataset ids, and the grapher dataset id of the previous version (if any).

The result is a dictionary {dataset_id (of the new dataset): previous_dataset_id or None (if there is no previous version)}.
"""
# Get list of all files changed locally.
files_changed = get_changed_files()
# Select new (or modified) grapher steps.
grapher_steps = get_changed_grapher_steps(files_changed)
# Get properties of the modified grapher steps.
namespaces = sorted(set([step.split("/")[-3] for step in grapher_steps]))
short_names = sorted(set([step.split("/")[-1].replace(".py", "") for step in grapher_steps]))
# Initialize database engine.
engine = get_engine()

# Load all relevant grapher datasets from DB.
with Session(engine) as session:
datasets = (
session.query(gm.Dataset)
.filter(
gm.Dataset.namespace.in_(namespaces),
gm.Dataset.shortName.in_(short_names),
)
.all()
)
df_datasets = pd.DataFrame(datasets)
# For each modified grapher step, check if the corresponding dataset is the latest version.
# If there is no dataset, raise a warning (either it has not been run yet, or it was deleted).
new_datasets = dict()
for grapher_step in grapher_steps:
namespace, version, short_name = grapher_step.replace(".py", "").split("/")[-3:]
selected_datasets = df_datasets[
(df_datasets["namespace"] == namespace) & (df_datasets["shortName"] == short_name)
].sort_values("version", ascending=False)
if (len(selected_datasets) == 0) or (version not in selected_datasets["version"].tolist()):
log.warning(
f"Warning: No grapher dataset found for {grapher_step}. It might not have been run yet, or it was deleted from DB."
)
continue

# Check if the dataset is the latest version.
if selected_datasets["version"].iloc[0] == version:
# Find the dataset id of the current grapher dataset.
ds_id = selected_datasets["id"].iloc[0]
# This is a new grapher dataset and will be added to the dictionary.
# But let's also find out if there is a previous version.
if len(selected_datasets) > 1:
# Get the dataset id of the previous version.
previous_dataset = selected_datasets["id"].iloc[1]
else:
# There was no previous version.
previous_dataset = None
# Add the dataset to the dictionary.
new_datasets[ds_id] = previous_dataset

return new_datasets
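As a sanity check on the path filtering in `get_changed_grapher_steps`, a small sketch follows. The example path is hypothetical and assumes the usual `etl/steps/data/grapher/<namespace>/<version>/<short_name>.py` layout, with `STEP_DIR.relative_to(BASE_DIR)` resolving to `etl/steps`:

```python
from pathlib import Path

# Hypothetical changed file, relative to the repository root.
file_path = "etl/steps/data/grapher/energy/2024-11-01/electricity_mix.py"

parts = Path(file_path).with_suffix("").as_posix().split("/")
# parts == ["etl", "steps", "data", "grapher", "energy", "2024-11-01", "electricity_mix"]
assert parts[-4] == "grapher"  # identifies the file as a grapher step

# The same split is used later to recover the step's properties.
namespace, version, short_name = parts[-3:]
assert (namespace, version, short_name) == ("energy", "2024-11-01", "electricity_mix")
```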
5 changes: 5 additions & 0 deletions etl/grapher_model.py
@@ -673,6 +673,11 @@ def load_datasets_uri(cls, session: Session):
"""
return read_sql(query, session)

@classmethod
def load_all_datasets(cls) -> pd.DataFrame:
"""Get all the content of the grapher `datasets` table in DB as a dataframe."""
return read_sql("select * from datasets")


class SourceDescription(TypedDict, total=False):
link: Optional[str]
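Finally, a hedged usage sketch of the new `Dataset.load_all_datasets` classmethod, mirroring how `utils.py` builds its "[id] Name" labels above (the `id` and `name` columns are assumed to be present in the grapher `datasets` table):

```python
import etl.grapher_model as gm

# Load the full content of the grapher `datasets` table as a dataframe.
df_datasets = gm.Dataset.load_all_datasets()

# Build the convenience "[id] Name" label used by Anomalist's dataset picker.
df_datasets["id_name"] = "[" + df_datasets["id"].astype(str) + "] " + df_datasets["name"]
datasets_all = df_datasets[["id", "id_name"]].set_index("id").squeeze().to_dict()
```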