diff --git a/apps/wizard/app_pages/anomalist/app.py b/apps/wizard/app_pages/anomalist/app.py
index eb97d64f556..a389b6cf77e 100644
--- a/apps/wizard/app_pages/anomalist/app.py
+++ b/apps/wizard/app_pages/anomalist/app.py
@@ -474,7 +474,8 @@ def _change_chart_selection(df, key_table, key_selection):
 # If anomalies for dataset already exist in DB, load them. Warn user that these are being loaded from DB
 if st.session_state.anomalist_datasets_submitted:
     # 3.1/ Check if anomalies are already there in DB
-    st.session_state.anomalist_anomalies = WizardDB.load_anomalies(st.session_state.anomalist_datasets_selected)
+    with st.spinner("Loading anomalies (already detected) from database..."):
+        st.session_state.anomalist_anomalies = WizardDB.load_anomalies(st.session_state.anomalist_datasets_selected)

     # Load indicators in selected datasets
     st.session_state.anomalist_indicators = cached.load_variables_display_in_dataset(
@@ -484,7 +485,7 @@ def _change_chart_selection(df, key_table, key_selection):

     # Get indicator IDs
     variable_ids = list(st.session_state.anomalist_indicators.keys())
-    st.session_state.anomalist_mapping = get_variable_mapping(variable_ids)
+    st.session_state.anomalist_mapping = {k: v for k, v in VARIABLE_MAPPING.items() if v in variable_ids}
     st.session_state.anomalist_mapping_inv = {v: k for k, v in st.session_state.anomalist_mapping.items()}

     # 3.2/ No anomaly found in DB, estimate them
diff --git a/apps/wizard/app_pages/anomalist/utils.py b/apps/wizard/app_pages/anomalist/utils.py
index 5c23af409c8..ae25dd2506c 100644
--- a/apps/wizard/app_pages/anomalist/utils.py
+++ b/apps/wizard/app_pages/anomalist/utils.py
@@ -4,13 +4,15 @@

 import pandas as pd
 import streamlit as st
+from sqlalchemy.orm import Session
 from structlog import get_logger

 import etl.grapher_model as gm
 from apps.anomalist.anomalist_api import add_auxiliary_scores, combine_and_reduce_scores_df
 from apps.wizard.utils.db import WizardDB
-from apps.wizard.utils.io import get_steps_df
+from apps.wizard.utils.io import get_new_grapher_datasets_and_their_previous_versions
 from etl.config import OWID_ENV, OWIDEnv
+from etl.db import get_engine

 # Logger
 log = get_logger()
@@ -24,32 +26,40 @@ class AnomalyTypeEnum(Enum):
     # AI = "ai"  # Uncomment if needed


-# TODO: Consider refactoring the following function, which does too many things.
+def infer_variable_mapping(dataset_id_new: int, dataset_id_old: int) -> Dict[int, int]:
+    engine = get_engine()
+    with Session(engine) as session:
+        variables_new = gm.Variable.load_variables_in_datasets(session=session, dataset_ids=[dataset_id_new])
+        variables_old = gm.Variable.load_variables_in_datasets(session=session, dataset_ids=[dataset_id_old])
+    # Create a mapping from old ids to new variable ids for variables whose shortNames are identical in the old and new versions.
+    _variables = {variable.shortName: variable.id for variable in variables_new}
+    variable_mapping = {
+        old_variable.id: _variables[old_variable.shortName]
+        for old_variable in variables_old
+        if old_variable.shortName in _variables
+    }
+    return variable_mapping
+
+
 @st.cache_data(show_spinner=False)
 @st.spinner("Retrieving datasets...")
 def get_datasets_and_mapping_inputs() -> Tuple[Dict[int, str], Dict[int, str], Dict[int, int]]:
-    # Tuple[pd.DataFrame, List[Dict[str, Dict[str, Any]]], Dict[int, int]]:
-    # NOTE: The following ignores DB datasets that are archived (which is a bit unexpected).
-    # I had to manually un-archive the testing datasets from the database manually to make things work.
-    # This could be fixed, but maybe it's not necessary (since we won't archive an old version of a dataset until the
-    # new has been analyzed).
-    steps_df_grapher, grapher_changes = get_steps_df(archived=True)
-
-    # List new dataset ids based on changes in files.
-    datasets_new_ids = [ds["new"]["id"] for ds in grapher_changes]
+    # Get all datasets from DB.
+    df_datasets = gm.Dataset.load_all_datasets()

-    # Replace NaN with empty string in etl paths (otherwise dataset won't be shown if 'show step names' is chosen)
-    steps_df_grapher["step"] = steps_df_grapher["step"].fillna("")
+    # Detect local files that correspond to new or modified grapher steps, identify their corresponding grapher dataset ids, and the grapher dataset id of the previous version (if any).
+    dataset_new_and_old = get_new_grapher_datasets_and_their_previous_versions()

-    # Add a convenient column for "[dataset id] Dataset Name"
-    steps_df_grapher["id_name"] = [f"[{ds['id']}] {ds['name']}" for ds in steps_df_grapher.to_dict(orient="records")]
+    # List new dataset ids.
+    datasets_new_ids = list(dataset_new_and_old)

     # Load mapping created by indicator upgrader (if any).
     mapping = WizardDB.get_variable_mapping_raw()
     if len(mapping) > 0:
+        log.info("Using variable mapping created by indicator upgrader.")
         # Set of ids of new datasets that appear in the mapping generated by indicator upgrader.
         datasets_new_mapped = set(mapping["dataset_id_new"])
-        # Set of ids of datasets that have appear as new datasets in the grapher_changes.
+        # Set of ids of expected new datasets.
         datasets_new_expected = set(datasets_new_ids)
         # Sanity check.
         if not (datasets_new_mapped <= datasets_new_expected):
@@ -58,17 +68,20 @@ def get_datasets_and_mapping_inputs() -> Tuple[Dict[int, str], Dict[int, str], D
             )
         # Create a mapping dictionary.
         variable_mapping = mapping.set_index("id_old")["id_new"].to_dict()
-        # Sanity check.
-        # TODO: Remove this check once we're sure that this works properly (to save time).
-        assert variable_mapping == WizardDB.get_variable_mapping(), "Unexpected mapping issues."
     else:
-        # NOTE: Here we could also infer the mapping of the new datasets (assuming no names have changed).
-        # This could be useful if a user wants to compare two arbitrary versions of existing grapher datasets.
+        log.info("Inferring variable mapping (since no mapping was created by indicator upgrader).")
+        # Infer the mapping of the new datasets (assuming no names have changed).
         variable_mapping = dict()
-
+        for dataset_id_new, dataset_id_old in dataset_new_and_old.items():
+            if dataset_id_old is None:
+                continue
+            # Infer the mapping between the old and new versions of this dataset.
+            variable_mapping.update(infer_variable_mapping(dataset_id_new, dataset_id_old))
+
+    # For convenience, create a dataset name "[id] Name".
+    df_datasets["id_name"] = "[" + df_datasets["id"].astype(str) + "] " + df_datasets["name"]
     # List all grapher datasets.
-    datasets_all = steps_df_grapher["id_name"].to_list()
-    datasets_all = steps_df_grapher[["id", "id_name"]].set_index("id").squeeze().to_dict()
+    datasets_all = df_datasets[["id", "id_name"]].set_index("id").squeeze().to_dict()

     # List new datasets.
     datasets_new = {k: v for k, v in datasets_all.items() if k in datasets_new_ids}
diff --git a/apps/wizard/utils/io.py b/apps/wizard/utils/io.py
index d8a7b2367d8..f2e0c6bb82d 100644
--- a/apps/wizard/utils/io.py
+++ b/apps/wizard/utils/io.py
@@ -2,12 +2,28 @@
 Together with utils.db and utils.cached, it might need some rethinking on where it goes.
""" +from pathlib import Path +from typing import Dict, List, Optional + +import pandas as pd from pymysql import OperationalError +from sqlalchemy.orm import Session +from structlog import get_logger +import etl.grapher_model as gm from apps.wizard.utils.cached import get_datasets_from_version_tracker +from etl.db import get_engine +from etl.git_helpers import get_changed_files from etl.grapher_io import get_all_datasets +from etl.paths import BASE_DIR, STEP_DIR + +# Initialize logger. +log = get_logger() +######################################################################################################################## +# Consider deprecating this function, which is very slow, and possibly an overkill. +# NOTE: I think it can easily be replaced in the context of Anomalist, but unclear yet if it can be replace in the context of indicator upgrader. def get_steps_df(archived: bool = True): """Get steps_df, and grapher_changes from version tracker.""" # NOTE: The following ignores DB datasets that are archived (which is a bit unexpected). @@ -41,3 +57,86 @@ def get_steps_df(archived: bool = True): assert steps_df_grapher["namespace"].notna().all(), "NaNs found in `namespace`" return steps_df_grapher, grapher_changes + + +######################################################################################################################## + + +def get_changed_grapher_steps(files_changed: Dict[str, Dict[str, str]]) -> List[str]: + """Get list of new grapher steps with their corresponding old steps.""" + grapher_steps = [] + for file_path, file_status in files_changed.items(): + # File status can be: D (deleted), A (added), M (modified). + # NOTE: In principle, we could select only "A" files. But it is possible that the user adds a new grapher step, and then commits changes to it, in which case (I think) the status would be "M". + + # If deleted, skip loop iteration + if file_status == "D": + # Skip deleted files. + continue + + # Identify grapher data steps, and ignore the rest. + if file_path.startswith(STEP_DIR.relative_to(BASE_DIR).as_posix()) and file_path.endswith(".py"): + if Path(file_path).with_suffix("").as_posix().split("/")[-4] == "grapher": + grapher_steps.append(file_path) + else: + continue + + return grapher_steps + + +def get_new_grapher_datasets_and_their_previous_versions() -> Dict[int, Optional[int]]: + """Detect which local grapher step files have changed, identify their corresponding grapher dataset ids, and the grapher dataset id of the previous version (if any). + + The result is a dictionary {dataset_id (of the new dataset): previous_dataset_id or None (if there is no previous version)}. + """ + # Get list of all files changed locally. + files_changed = get_changed_files() + # Select new (or modified) grapher steps. + grapher_steps = get_changed_grapher_steps(files_changed) + # Get properties of the modified grapher steps. + namespaces = sorted(set([step.split("/")[-3] for step in grapher_steps])) + short_names = sorted(set([step.split("/")[-1].replace(".py", "") for step in grapher_steps])) + # Initialize database engine. + engine = get_engine() + + # Load all relevant grapher datasets from DB. + with Session(engine) as session: + datasets = ( + session.query(gm.Dataset) + .filter( + gm.Dataset.namespace.in_(namespaces), + gm.Dataset.shortName.in_(short_names), + ) + .all() + ) + df_datasets = pd.DataFrame(datasets) + # For each modified grapher step, check if the corresponding dataset is the latest version. 
+    # If there is no dataset, raise a warning (either it has not been run yet, or it was deleted).
+    new_datasets = dict()
+    for grapher_step in grapher_steps:
+        namespace, version, short_name = grapher_step.replace(".py", "").split("/")[-3:]
+        selected_datasets = df_datasets[
+            (df_datasets["namespace"] == namespace) & (df_datasets["shortName"] == short_name)
+        ].sort_values("version", ascending=False)
+        if (len(selected_datasets) == 0) or (version not in selected_datasets["version"].tolist()):
+            log.warning(
+                f"Warning: No grapher dataset found for {grapher_step}. It might not have been run yet, or it was deleted from DB."
+            )
+            continue
+
+        # Check if the dataset is the latest version.
+        if selected_datasets["version"].iloc[0] == version:
+            # Find the dataset id of the current grapher dataset.
+            ds_id = selected_datasets["id"].iloc[0]
+            # This is a new grapher dataset and will be added to the dictionary.
+            # But let's also find out if there is a previous version.
+            if len(selected_datasets) > 1:
+                # Get the dataset id of the previous version.
+                previous_dataset = selected_datasets["id"].iloc[1]
+            else:
+                # There was no previous version.
+                previous_dataset = None
+            # Add the dataset to the dictionary.
+            new_datasets[ds_id] = previous_dataset
+
+    return new_datasets
diff --git a/etl/grapher_model.py b/etl/grapher_model.py
index 316c7aeb460..0b7469f1a6b 100644
--- a/etl/grapher_model.py
+++ b/etl/grapher_model.py
@@ -673,6 +673,11 @@ def load_datasets_uri(cls, session: Session):
         """
         return read_sql(query, session)

+    @classmethod
+    def load_all_datasets(cls) -> pd.DataFrame:
+        """Get all the content of the grapher `datasets` table in DB as a dataframe."""
+        return read_sql("select * from datasets")
+

 class SourceDescription(TypedDict, total=False):
     link: Optional[str]
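
Note (illustrative, not part of the diff): the core idea of infer_variable_mapping in apps/wizard/app_pages/anomalist/utils.py is to match variables of the old and new dataset versions by their shortName. Below is a minimal, self-contained sketch of that matching logic, using made-up shortNames and ids instead of gm.Variable rows loaded from MySQL.

    from typing import Dict

    def infer_mapping_from_short_names(new_vars: Dict[str, int], old_vars: Dict[str, int]) -> Dict[int, int]:
        # Map old variable id -> new variable id for every shortName present in both versions.
        return {old_id: new_vars[name] for name, old_id in old_vars.items() if name in new_vars}

    # Hypothetical example: "gdp" keeps its shortName across versions, "pop" was renamed to "population",
    # so only "gdp" can be mapped automatically.
    old_vars = {"gdp": 101, "pop": 102}
    new_vars = {"gdp": 201, "population": 202}
    print(infer_mapping_from_short_names(new_vars, old_vars))  # {101: 201}

Variables whose shortName changed between versions stay unmapped, which is why the mapping produced by indicator upgrader (when one exists in WizardDB) is used instead of the inferred one.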
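Also illustrative (not part of the diff): get_new_grapher_datasets_and_their_previous_versions recovers namespace, version and short name from the last three path components of each changed grapher step file. A small sketch with a hypothetical path that satisfies the checks in get_changed_grapher_steps (under STEP_DIR, ending in ".py", with "grapher" as the fourth-from-last component):

    # Hypothetical changed grapher step file path.
    step_path = "etl/steps/data/grapher/energy/2024-05-08/electricity_mix.py"
    namespace, version, short_name = step_path.replace(".py", "").split("/")[-3:]
    print(namespace, version, short_name)  # energy 2024-05-08 electricity_mix

The DB dataset with that namespace and shortName whose latest version matches the step is treated as the new dataset, and the next-most-recent version (if any) as its previous dataset.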