🔨 engineering: switch from threads to asyncio in grapher upserts #3255

Closed · wants to merge 6 commits
33 changes: 19 additions & 14 deletions apps/backport/datasync/data_metadata.py
@@ -5,7 +5,7 @@
 import numpy as np
 import pandas as pd
 from sqlalchemy import text
-from sqlalchemy.orm import Session
+from sqlalchemy.ext.asyncio import AsyncSession
 from structlog import get_logger

 log = get_logger()
@@ -24,7 +24,7 @@ def variable_data(data_df: pd.DataFrame) -> Dict[str, Any]:
     return data  # type: ignore


-def _load_variable(session: Session, variable_id: int) -> Dict[str, Any]:
+async def _load_variable(session: AsyncSession, variable_id: int) -> Dict[str, Any]:
     sql = """
         SELECT
             variables.*,
@@ -41,14 +41,14 @@ def _load_variable(session: Session, variable_id: int) -> Dict[str, Any]:
     """

     # Using the session to execute raw SQL and fetching one row as a result
-    result = session.execute(text(sql), {"variable_id": variable_id}).fetchone()
+    result = (await session.execute(text(sql), {"variable_id": variable_id})).fetchone()

     # Ensure result exists and convert to dictionary
     assert result, f"variableId `{variable_id}` not found"
     return dict(result._mapping)


-def _load_topic_tags(session: Session, variable_id: int) -> List[str]:
+async def _load_topic_tags(session: AsyncSession, variable_id: int) -> List[str]:
     sql = """
         SELECT
             tags.name
@@ -59,13 +59,13 @@ def _load_topic_tags(session: Session, variable_id: int) -> List[str]:
     """

     # Using the session to execute raw SQL
-    result = session.execute(text(sql), {"variable_id": variable_id}).fetchall()
+    result = (await session.execute(text(sql), {"variable_id": variable_id})).fetchall()

     # Extract tag names from the result and return as a list
     return [row[0] for row in result]


-def _load_faqs(session: Session, variable_id: int) -> List[Dict[str, Any]]:
+async def _load_faqs(session: AsyncSession, variable_id: int) -> List[Dict[str, Any]]:
     sql = """
         SELECT
             gdocId,
@@ -76,13 +76,13 @@ def _load_faqs(session: Session, variable_id: int) -> List[Dict[str, Any]]:
     """

     # Using the session to execute raw SQL
-    result = session.execute(text(sql), {"variable_id": variable_id}).fetchall()
+    result = (await session.execute(text(sql), {"variable_id": variable_id})).fetchall()

     # Convert the result rows to a list of dictionaries
     return [dict(row._mapping) for row in result]


-def _load_origins_df(session: Session, variable_id: int) -> pd.DataFrame:
+async def _load_origins_df(session: AsyncSession, variable_id: int) -> pd.DataFrame:
     sql = """
         SELECT
             origins.*
@@ -93,7 +93,7 @@ def _load_origins_df(session: Session, variable_id: int) -> pd.DataFrame:
     """

     # Use the session to execute the raw SQL
-    result_proxy = session.execute(text(sql), {"variable_id": variable_id})
+    result_proxy = await session.execute(text(sql), {"variable_id": variable_id})

     # Fetch the results into a DataFrame
     df = pd.DataFrame(result_proxy.fetchall(), columns=result_proxy.keys())
@@ -226,17 +226,22 @@ def _move_population_origin_to_end(origins: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
     return new_origins


-def variable_metadata(session: Session, variable_id: int, variable_data: pd.DataFrame) -> Dict[str, Any]:
+async def variable_metadata(session: AsyncSession, variable_id: int, variable_data: pd.DataFrame) -> Dict[str, Any]:
     """Fetch metadata for a single variable from database. This function was initially based on the
     one from owid-grapher repository and uses raw SQL commands. It'd be interesting to rewrite it
     using SQLAlchemy ORM in grapher_model.py.
     """
+    task_variable = _load_variable(session, variable_id)
+    task_origins = _load_origins_df(session, variable_id)
+    task_topic_tags = _load_topic_tags(session, variable_id)
+    task_faqs = _load_faqs(session, variable_id)
+
     return _variable_metadata(
-        db_variable_row=_load_variable(session, variable_id),
+        db_variable_row=await task_variable,
         variable_data=variable_data,
-        db_origins_df=_load_origins_df(session, variable_id),
-        db_topic_tags=_load_topic_tags(session, variable_id),
-        db_faqs=_load_faqs(session, variable_id),
+        db_origins_df=await task_origins,
+        db_topic_tags=await task_topic_tags,
+        db_faqs=await task_faqs,
     )
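For orientation, a minimal sketch of how the now-async variable_metadata might be driven end to end, assuming get_engine_async from etl/db.py further down in this diff (fetch_metadata and the variable id are illustrative, not part of the PR):

import asyncio

import pandas as pd
from sqlalchemy.ext.asyncio import AsyncSession

from apps.backport.datasync.data_metadata import variable_metadata
from etl.db import get_engine_async


async def fetch_metadata(variable_id: int, data_df: pd.DataFrame) -> dict:
    # One AsyncSession per coroutine; a session must not be shared across concurrent tasks.
    async with AsyncSession(get_engine_async()) as session:
        return await variable_metadata(session, variable_id, data_df)


# meta = asyncio.run(fetch_metadata(123456, data_df))  # illustrative call

Note that the four _load_* coroutines above share one session and are awaited one after another, so their queries still run sequentially; the concurrency gain presumably comes from upserting many variables at once.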


24 changes: 24 additions & 0 deletions apps/backport/datasync/datasync.py
@@ -1,3 +1,4 @@
+import asyncio
 import gzip
 import json
 from typing import Any, Dict
@@ -17,6 +18,9 @@
 config.enable_bugsnag()


+R2_UPLOAD_SEMAPHORE = asyncio.Semaphore(10)
+
+
 def upload_gzip_dict(d: Dict[str, Any], s3_path: str, private: bool = False) -> None:
     return upload_gzip_string(json.dumps(d, default=str), s3_path=s3_path, private=private)

@@ -46,3 +50,23 @@ def upload_gzip_string(s: str, s3_path: str, private: bool = False) -> None:
         ContentType="application/json",
         **extra_args,
     )
+
+
+async def upload_gzip_string_async(client: Any, s: str, s3_path: str, private: bool = False) -> None:
+    """Upload a gzip-compressed string to R2."""
+    body_gzip = gzip.compress(s.encode())
+
+    bucket, key = s3_utils.s3_bucket_key(s3_path)
+
+    assert not private, "r2 does not support private files yet"
+    extra_args = {}
+
+    async with R2_UPLOAD_SEMAPHORE:
+        await client.put_object(
+            Bucket=bucket,
+            Body=body_gzip,
+            Key=key,
+            ContentEncoding="gzip",
+            ContentType="application/json",
+            **extra_args,
+        )
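The diff does not show how the async client is constructed; a hedged sketch of a caller, assuming an aioboto3 S3 client pointed at R2 (the endpoint URL and the payloads mapping are placeholders):

import asyncio
from typing import Dict

import aioboto3

from apps.backport.datasync.datasync import upload_gzip_string_async


async def upload_all(payloads: Dict[str, str]) -> None:
    session = aioboto3.Session()
    # Placeholder endpoint; real credentials and endpoint come from configuration.
    async with session.client("s3", endpoint_url="https://<account>.r2.cloudflarestorage.com") as client:
        # Fan out every upload; R2_UPLOAD_SEMAPHORE above caps concurrent PUTs at 10.
        await asyncio.gather(*(upload_gzip_string_async(client, body, path) for path, body in payloads.items()))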
28 changes: 16 additions & 12 deletions apps/chart_sync/admin_api.py
@@ -6,6 +6,7 @@
 from functools import cache
 from typing import Any, Dict, List, Optional

+import aiohttp
 import requests
 import structlog
 from requests.adapters import HTTPAdapter, Retry
@@ -86,21 +87,24 @@ def set_tags(self, chart_id: int, tags: List[Dict[str, Any]], user_id: Optional[
             raise AdminAPIError({"error": js["error"], "tags": tags})
         return js

-    def put_grapher_config(self, variable_id: int, grapher_config: Dict[str, Any]) -> dict:
+    async def put_grapher_config(self, variable_id: int, grapher_config: Dict[str, Any]) -> dict:
         # If schema is missing, use the default one
         grapher_config.setdefault("$schema", DEFAULT_GRAPHER_SCHEMA)

-        # Retry in case we're restarting Admin on staging server
-        resp = requests_with_retry().put(
-            self.owid_env.admin_api + f"/variables/{variable_id}/grapherConfigETL",
-            cookies={"sessionid": self.session_id},
-            json=grapher_config,
-        )
-        js = self._json_from_response(resp)
-        if not js["success"]:
-            raise AdminAPIError({"error": js["error"], "variable_id": variable_id, "grapher_config": grapher_config})
-        return js
-
+        async with aiohttp.ClientSession(cookies={"sessionid": self.session_id}) as session:
+            async with session.put(
+                self.owid_env.admin_api + f"/variables/{variable_id}/grapherConfigETL",
+                json=grapher_config,
+            ) as resp:
+                # TODO: make _json_from_response async
+                js = await resp.json()
+                if not js["success"]:
+                    raise AdminAPIError(
+                        {"error": js["error"], "variable_id": variable_id, "grapher_config": grapher_config}
+                    )
+                return js
+
+    # TODO: make it async
     def delete_grapher_config(self, variable_id: int) -> dict:
         resp = requests.delete(
             self.owid_env.admin_api + f"/variables/{variable_id}/grapherConfigETL",
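Since put_grapher_config is now a coroutine, callers can fan out many config upserts at once; a sketch, assuming api is an AdminAPI instance and configs is a hypothetical mapping of variable ids to grapher configs:

import asyncio
from typing import Any, Dict


async def upsert_grapher_configs(api: Any, configs: Dict[int, Dict[str, Any]]) -> list:
    # Each put_grapher_config call opens its own aiohttp.ClientSession.
    return await asyncio.gather(*(api.put_grapher_config(vid, cfg) for vid, cfg in configs.items()))

One session per PUT works, though a ClientSession shared across calls would reuse connections for large batches; worth weighing alongside the TODO above.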
10 changes: 1 addition & 9 deletions etl/command.py
@@ -191,14 +191,9 @@ def main_cli(

     # make everything single threaded, useful for debugging
     if not use_threads:
-        config.GRAPHER_INSERT_WORKERS = 1
         config.DIRTY_STEPS_WORKERS = 1
         workers = 1

-    # GRAPHER_INSERT_WORKERS should be split among workers
-    if workers > 1:
-        config.GRAPHER_INSERT_WORKERS = config.GRAPHER_INSERT_WORKERS // workers
-
     kwargs = dict(
         steps=steps,
         dry_run=dry_run,
@@ -223,7 +218,6 @@ def main_cli(
     for _ in runs:
         if ipdb:
             config.IPDB_ENABLED = True
-            config.GRAPHER_INSERT_WORKERS = 1
             config.DIRTY_STEPS_WORKERS = 1
             kwargs["workers"] = 1
             with launch_ipdb_on_exception():
@@ -384,9 +378,7 @@ def run_dag(
         )
         return exec_steps(steps, strict=strict)
     else:
-        print(
-            f"--- Running {len(steps)} steps with {workers} processes ({config.GRAPHER_INSERT_WORKERS} threads each):"
-        )
+        print(f"--- Running {len(steps)} steps with {workers} processes:")
         return exec_steps_parallel(steps, workers, dag=dag, strict=strict)


4 changes: 0 additions & 4 deletions etl/config.py
@@ -178,10 +178,6 @@ def variable_metadata_url(variable_id):
 # because we're making a lot of HTTP requests
 DIRTY_STEPS_WORKERS = int(env.get("DIRTY_STEPS_WORKERS", 5))

-# number of workers for grapher inserts to DB, this is for all processes, so if
-# --workers is higher than 1, this will be divided among them
-GRAPHER_INSERT_WORKERS = int(env.get("GRAPHER_WORKERS", 40))
-
 # only upsert indicators matching this filter, this is useful for fast development
 # of data pages for a single indicator
 GRAPHER_FILTER = env.get("GRAPHER_FILTER", None)
11 changes: 11 additions & 0 deletions etl/db.py
@@ -10,6 +10,7 @@
 from deprecated import deprecated
 from sqlalchemy import create_engine
 from sqlalchemy.engine import Engine
+from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
 from sqlalchemy.orm import Session

 from etl import config
@@ -61,6 +62,16 @@ def get_engine(conf: Optional[Dict[str, Any]] = None) -> Engine:
     return _get_engine_cached(cf, pid)


+def get_engine_async(conf: Optional[Dict[str, Any]] = None) -> AsyncEngine:
+    cf: Any = dict_to_object(conf) if conf else config
+    engine = create_async_engine(
+        f"mysql+aiomysql://{cf.DB_USER}:{quote(cf.DB_PASS)}@{cf.DB_HOST}:{cf.DB_PORT}/{cf.DB_NAME}",
+        pool_size=30,  # Increase pool size
+        max_overflow=50,  # Increase overflow limit
+    )
+    return engine
+
+
 def dict_to_object(d):
     return type("DynamicObject", (object,), d)()
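A quick smoke test for the new async engine, assuming the aiomysql driver is installed (ping is illustrative):

import asyncio

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from etl.db import get_engine_async


async def ping() -> None:
    engine = get_engine_async()
    async with AsyncSession(engine) as session:
        assert (await session.execute(text("SELECT 1"))).scalar() == 1
    await engine.dispose()  # return pooled aiomysql connections


# asyncio.run(ping())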
