✨ anomalist: improve utils #3385

Merged · 18 commits · Oct 9, 2024
2 changes: 1 addition & 1 deletion api/v1/__init__.py
@@ -102,7 +102,7 @@ def _load_and_validate_indicator(catalog_path: str) -> gm.Variable:
# update YAML file
with Session(engine) as session:
try:
db_indicator = gm.Variable.load_from_catalog_path(session, catalog_path)
db_indicator = gm.Variable.from_id_or_path(session, catalog_path)
except NoResultFound:
raise HTTPException(
404,
2 changes: 1 addition & 1 deletion apps/backport/backport.py
@@ -14,14 +14,14 @@
from apps.backport.datasync.data_metadata import (
_variable_metadata,
variable_data,
variable_data_df_from_s3,
)
from apps.backport.datasync.datasync import upload_gzip_dict
from etl import config, paths
from etl import grapher_model as gm
from etl.backport_helpers import GrapherConfig
from etl.db import get_engine, read_sql
from etl.files import checksum_str
from etl.grapher_io import variable_data_df_from_s3
from etl.snapshot import Snapshot, SnapshotMeta

from . import utils
134 changes: 1 addition & 133 deletions apps/backport/datasync/data_metadata.py
@@ -1,148 +1,16 @@
import concurrent.futures
import json
from copy import deepcopy
from http.client import RemoteDisconnected
from typing import Any, Dict, List, Union, cast
from urllib.error import HTTPError, URLError
from typing import Any, Dict, List, Union

import numpy as np
import pandas as pd
import requests
from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session
from structlog import get_logger
from tenacity import Retrying
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_fixed

from etl import config
from etl.config import OWIDEnv
from etl.db import read_sql

log = get_logger()


def _fetch_data_df_from_s3(variable_id: int):
try:
# Cloudflare limits us to 600 requests per minute, retry in case we hit the limit
# NOTE: increase wait time or attempts if we hit the limit too often
for attempt in Retrying(
wait=wait_fixed(2),
stop=stop_after_attempt(3),
retry=retry_if_exception_type((URLError, RemoteDisconnected)),
):
with attempt:
return (
pd.read_json(config.variable_data_url(variable_id))
.rename(
columns={
"entities": "entityId",
"values": "value",
"years": "year",
}
)
.assign(variableId=variable_id)
)
# no data on S3
except HTTPError:
return pd.DataFrame(columns=["variableId", "entityId", "year", "value"])


def variable_data_df_from_s3(
engine: Engine,
variable_ids: List[int] = [],
workers: int = 1,
value_as_str: bool = True,
) -> pd.DataFrame:
"""Fetch data from S3 and add entity code and name from DB."""
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
results = list(executor.map(_fetch_data_df_from_s3, variable_ids))

if isinstance(results, list) and all(isinstance(df, pd.DataFrame) for df in results):
df = pd.concat(cast(List[pd.DataFrame], results))
else:
raise TypeError(f"results must be a list of pd.DataFrame, got {type(results)}")

# we work with strings and convert to specific types later
if value_as_str:
df["value"] = df["value"].astype("string")

with Session(engine) as session:
res = add_entity_code_and_name(session, df)
return res


def _fetch_metadata_from_s3(variable_id: int, env: OWIDEnv | None = None) -> Dict[str, Any] | None:
try:
# Cloudflare limits us to 600 requests per minute, retry in case we hit the limit
# NOTE: increase wait time or attempts if we hit the limit too often
for attempt in Retrying(
wait=wait_fixed(2),
stop=stop_after_attempt(3),
retry=retry_if_exception_type((URLError, RemoteDisconnected)),
):
with attempt:
if env is not None:
url = env.indicator_metadata_url(variable_id)
else:
url = config.variable_metadata_url(variable_id)
return requests.get(url).json()
# no data on S3
except HTTPError:
return {}


def variable_metadata_df_from_s3(
variable_ids: List[int] = [],
workers: int = 1,
env: OWIDEnv | None = None,
) -> List[Dict[str, Any]]:
"""Fetch data from S3 and add entity code and name from DB."""
args = [variable_ids]
if env:
args += [[env for _ in range(len(variable_ids))]]

with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
results = list(executor.map(_fetch_metadata_from_s3, *args))

if not (isinstance(results, list) and all(isinstance(res, dict) for res in results)):
raise TypeError(f"results must be a list of dictionaries, got {type(results)}")

return results # type: ignore


def _fetch_entities(session: Session, entity_ids: List[int]) -> pd.DataFrame:
# Query entities from the database
q = """
SELECT
id AS entityId,
name AS entityName,
code AS entityCode
FROM entities
WHERE id in %(entity_ids)s
"""
return read_sql(q, session, params={"entity_ids": entity_ids})


def add_entity_code_and_name(session: Session, df: pd.DataFrame) -> pd.DataFrame:
if df.empty:
df["entityName"] = []
df["entityCode"] = []
return df

unique_entities = df["entityId"].unique()

entities = _fetch_entities(session, list(unique_entities))

if set(unique_entities) - set(entities.entityId):
missing_entities = set(unique_entities) - set(entities.entityId)
raise ValueError(f"Missing entities in the database: {missing_entities}")

return pd.merge(df, entities.astype({"entityName": "category", "entityCode": "category"}), on="entityId")


def variable_data(data_df: pd.DataFrame) -> Dict[str, Any]:
data_df = data_df.rename(
columns={
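These S3 helpers are not removed outright — they move to etl.grapher_io, which the import changes in the files below pick up. A minimal sketch of the new import path, assuming the moved functions keep the signatures deleted above:

from etl.db import get_engine
from etl.grapher_io import variable_data_df_from_s3, variable_metadata_df_from_s3

engine = get_engine()
# Variable ids below are placeholders for illustration only.
data = variable_data_df_from_s3(engine, variable_ids=[123, 456], workers=4)
metadata = variable_metadata_df_from_s3(variable_ids=[123, 456], workers=4)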
2 changes: 1 addition & 1 deletion apps/explorer_update/cli.py
@@ -6,7 +6,7 @@
from structlog import get_logger
from tqdm.auto import tqdm

from etl.db import get_variables_data
from etl.grapher_io import get_variables_data
from etl.paths import EXPLORERS_DIR
from etl.version_tracker import VersionTracker

2 changes: 1 addition & 1 deletion apps/metadata_migrate/cli.py
@@ -120,7 +120,7 @@ def cli(
var_id = grapher_config["dimensions"][0]["variableId"]

with Session(engine) as session:
variable = gm.Variable.load_variable(session, var_id)
variable = gm.Variable.from_id_or_path(session, var_id)

assert variable.catalogPath, f"Variable {var_id} does not come from ETL. Migrate it there first."

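Taken together with the api/v1 change above, this shows the point of the consolidation: Variable.from_id_or_path replaces both load_from_catalog_path and load_variable, accepting either a catalog path or a numeric variable id. A minimal sketch of both call forms, assuming only what these call sites show (path and id values are placeholders):

from sqlalchemy.orm import Session

from etl import grapher_model as gm
from etl.db import get_engine

with Session(get_engine()) as session:
    # Placeholder path and id; either form resolves the same kind of Variable.
    by_path = gm.Variable.from_id_or_path(session, "grapher/namespace/version/dataset/table#indicator")
    by_id = gm.Variable.from_id_or_path(session, 123456)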
17 changes: 6 additions & 11 deletions apps/wizard/app_pages/anomalist.py
@@ -4,15 +4,10 @@
from pydantic import BaseModel, Field, ValidationError

from apps.utils.gpt import OpenAIWrapper, get_cost_and_tokens
from apps.wizard.utils import cached
from apps.wizard.utils.components import grapher_chart, st_horizontal
from apps.wizard.utils.dataset import load_datasets_uri_from_db
from apps.wizard.utils.indicator import (
load_indicator_uris_from_db,
load_variable_data_cached,
)

# load_variable_metadata_cached,
from etl.config import OWID_ENV
from etl.grapher_io import load_variables_in_dataset

# PAGE CONFIG
st.set_page_config(
@@ -47,7 +42,7 @@
)
st.session_state.datasets_selected = st.multiselect(
"Select datasets",
options=load_datasets_uri_from_db(),
options=cached.load_dataset_uris(),
max_selections=3,
)

@@ -59,7 +54,7 @@
# GET INDICATORS
if len(st.session_state.datasets_selected) > 0:
# Get indicator uris for all selected datasets
indicators = load_indicator_uris_from_db(st.session_state.datasets_selected)
indicators = load_variables_in_dataset(st.session_state.datasets_selected)

for indicator in indicators:
catalog_path = cast(str, indicator.catalogPath)
@@ -124,10 +119,10 @@ def get_anomaly_gpt(indicator_id: str, indicator_uri: str, dataset_name: str, in
# Open AI (do first to catch possible errors in ENV)
# Prepare messages for Insighter

data = load_variable_data_cached(variable_id=int(indicator_id))
data = cached.load_variable_data(variable_id=int(indicator_id))
data_1 = data.pivot(index="years", columns="entity", values="values") # .head(20)
data_1 = data_1.dropna(axis=1, how="all")
data_1_str = data_1.to_csv().replace(".0,", ",")
data_1_str = cast(str, data_1.to_csv()).replace(".0,", ",")

num_anomalies = 3

7 changes: 3 additions & 4 deletions apps/wizard/app_pages/anomalist_2.py
@@ -1,9 +1,8 @@
import pandas as pd
import streamlit as st

from apps.wizard.utils import cached
from apps.wizard.utils.components import grapher_chart, st_horizontal
from apps.wizard.utils.dataset import load_datasets_uri_from_db
from apps.wizard.utils.indicator import load_indicator_uris_from_db

# PAGE CONFIG
st.set_page_config(
@@ -39,7 +38,7 @@
with st.form(key="dataset_search"):
st.session_state.datasets_selected = st.multiselect(
"Select datasets",
options=load_datasets_uri_from_db(),
options=cached.load_dataset_uris(),
max_selections=1,
)

@@ -51,7 +50,7 @@
st.markdown("##### Filter Parameters")
options = []
if len(st.session_state.datasets_selected) > 0:
st.session_state.indicators = load_indicator_uris_from_db(st.session_state.datasets_selected)
st.session_state.indicators = cached.load_variables_in_dataset(st.session_state.datasets_selected)
options = [o.catalogPath for o in st.session_state.indicators]

st.session_state.filter_indicators = st.multiselect(
2 changes: 1 addition & 1 deletion apps/wizard/app_pages/chart_diff/chart_diff_show.py
@@ -14,14 +14,14 @@
import etl.grapher_model as gm
from apps.backport.datasync.data_metadata import (
filter_out_fields_in_metadata_for_checksum,
variable_metadata_df_from_s3,
)
from apps.utils.gpt import OpenAIWrapper, get_cost_and_tokens
from apps.wizard.app_pages.chart_diff.chart_diff import ChartDiff, ChartDiffsLoader
from apps.wizard.app_pages.chart_diff.conflict_resolver import ChartDiffConflictResolver
from apps.wizard.app_pages.chart_diff.utils import SOURCE, TARGET, prettify_date
from apps.wizard.utils.components import grapher_chart
from etl.config import OWID_ENV
from etl.grapher_io import variable_metadata_df_from_s3

# How to display the various chart review statuses
DISPLAY_STATE_OPTIONS = {
@@ -7,7 +7,6 @@
import streamlit as st
from structlog import get_logger

from apps.backport.datasync.data_metadata import variable_data_df_from_s3
from apps.wizard.app_pages.indicator_upgrade.explore_mode import st_explore_indicator
from apps.wizard.app_pages.indicator_upgrade.utils import (
find_mapping_suggestions_cached,
@@ -17,6 +16,7 @@
from apps.wizard.utils import Pagination, set_states
from etl.config import OWID_ENV
from etl.db import get_engine, read_sql
from etl.grapher_io import variable_data_df_from_s3

# Logger
log = get_logger()
4 changes: 3 additions & 1 deletion apps/wizard/app_pages/indicator_upgrade/utils.py
@@ -8,8 +8,10 @@
from structlog import get_logger

from apps.utils.map_datasets import get_grapher_changes
from etl.db import config, get_all_datasets, get_connection, get_dataset_charts, get_variables_in_dataset
from etl import config
from etl.db import get_connection
from etl.git_helpers import get_changed_files
from etl.grapher_io import get_all_datasets, get_dataset_charts, get_variables_in_dataset
from etl.match_variables import find_mapping_suggestions, preliminary_mapping
from etl.version_tracker import VersionTracker

2 changes: 1 addition & 1 deletion apps/wizard/app_pages/map_brackets.py
@@ -113,7 +113,7 @@ def load_variable_from_id(variable_id: int):
@st.cache_data
def load_variable_from_catalog_path(catalog_path: str):
with Session(OWID_ENV.engine) as session:
variable = Variable.load_from_catalog_path(session=session, catalog_path=catalog_path)
variable = Variable.from_catalog_path(session=session, catalog_path=catalog_path)

return variable

2 changes: 1 addition & 1 deletion apps/wizard/app_pages/metaplay.py
@@ -71,7 +71,7 @@ def get_data_page_url() -> str:
# The following port is defined in one of owid-grapher's config files.
HOST = "localhost:3030"
with get_session() as session:
VARIABLE_ID = gm.Variable.load_from_catalog_path(session, CATALOG_PATH).id
VARIABLE_ID = gm.Variable.from_catalog_path(session, CATALOG_PATH).id
url = f"http://{HOST}/admin/datapage-preview/{VARIABLE_ID}"
return url

52 changes: 52 additions & 0 deletions apps/wizard/utils/cached.py
@@ -0,0 +1,52 @@
from typing import Any, Dict, List, Optional

import pandas as pd
import streamlit as st

from etl import grapher_io as io
from etl.config import OWID_ENV, OWIDEnv
from etl.grapher_model import Variable


@st.cache_data
def load_dataset_uris() -> List[str]:
    # Delegate to etl.grapher_io; the original self-call here would recurse forever.
    return io.load_dataset_uris()


@st.cache_data
def load_variables_in_dataset(
    dataset_uri: List[str],
    _owid_env: OWIDEnv = OWID_ENV,
) -> List[Variable]:
    """Load Variable objects that belong to a dataset with URI `dataset_uri`."""
    # Delegate to etl.grapher_io rather than recursing into this cached wrapper;
    # assumes grapher_io exposes the same helper, as the other wrappers below do.
    return io.load_variables_in_dataset(dataset_uri, owid_env=_owid_env)


@st.cache_data
def load_variable_metadata(
    catalog_path: Optional[str] = None,
    variable_id: Optional[int] = None,
    variable: Optional[Variable] = None,
    _owid_env: OWIDEnv = OWID_ENV,
) -> Dict[str, Any]:
    return io.load_variable_metadata(
        catalog_path=catalog_path,
        variable_id=variable_id,
        variable=variable,
        owid_env=_owid_env,
    )


@st.cache_data
def load_variable_data(
    catalog_path: Optional[str] = None,
    variable_id: Optional[int] = None,
    variable: Optional[Variable] = None,
    _owid_env: OWIDEnv = OWID_ENV,
) -> pd.DataFrame:
    return io.load_variable_data(
        catalog_path=catalog_path,
        variable_id=variable_id,
        variable=variable,
        owid_env=_owid_env,
    )
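For context, this is how the Wizard pages above consume these wrappers — a sketch assembled from the anomalist.py and anomalist_2.py changes in this PR:

import streamlit as st

from apps.wizard.utils import cached

datasets = st.multiselect("Select datasets", options=cached.load_dataset_uris(), max_selections=1)
if datasets:
    indicators = cached.load_variables_in_dataset(datasets)
    options = [o.catalogPath for o in indicators]
    # Per-indicator data, cached across Streamlit reruns.
    data = cached.load_variable_data(variable_id=indicators[0].id)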