diff --git a/apps/backport/datasync/data_metadata.py b/apps/backport/datasync/data_metadata.py index 1707164e42a..db46843b1e6 100644 --- a/apps/backport/datasync/data_metadata.py +++ b/apps/backport/datasync/data_metadata.py @@ -5,7 +5,7 @@ import numpy as np import pandas as pd from sqlalchemy import text -from sqlalchemy.orm import Session +from sqlalchemy.ext.asyncio import AsyncSession from structlog import get_logger log = get_logger() @@ -24,7 +24,7 @@ def variable_data(data_df: pd.DataFrame) -> Dict[str, Any]: return data # type: ignore -def _load_variable(session: Session, variable_id: int) -> Dict[str, Any]: +async def _load_variable(session: AsyncSession, variable_id: int) -> Dict[str, Any]: sql = """ SELECT variables.*, @@ -41,14 +41,14 @@ def _load_variable(session: Session, variable_id: int) -> Dict[str, Any]: """ # Using the session to execute raw SQL and fetching one row as a result - result = session.execute(text(sql), {"variable_id": variable_id}).fetchone() + result = (await session.execute(text(sql), {"variable_id": variable_id})).fetchone() # Ensure result exists and convert to dictionary assert result, f"variableId `{variable_id}` not found" return dict(result._mapping) -def _load_topic_tags(session: Session, variable_id: int) -> List[str]: +async def _load_topic_tags(session: AsyncSession, variable_id: int) -> List[str]: sql = """ SELECT tags.name @@ -59,13 +59,13 @@ def _load_topic_tags(session: Session, variable_id: int) -> List[str]: """ # Using the session to execute raw SQL - result = session.execute(text(sql), {"variable_id": variable_id}).fetchall() + result = (await session.execute(text(sql), {"variable_id": variable_id})).fetchall() # Extract tag names from the result and return as a list return [row[0] for row in result] -def _load_faqs(session: Session, variable_id: int) -> List[Dict[str, Any]]: +async def _load_faqs(session: AsyncSession, variable_id: int) -> List[Dict[str, Any]]: sql = """ SELECT gdocId, @@ -76,13 +76,13 @@ def _load_faqs(session: Session, variable_id: int) -> List[Dict[str, Any]]: """ # Using the session to execute raw SQL - result = session.execute(text(sql), {"variable_id": variable_id}).fetchall() + result = (await session.execute(text(sql), {"variable_id": variable_id})).fetchall() # Convert the result rows to a list of dictionaries return [dict(row._mapping) for row in result] -def _load_origins_df(session: Session, variable_id: int) -> pd.DataFrame: +async def _load_origins_df(session: AsyncSession, variable_id: int) -> pd.DataFrame: sql = """ SELECT origins.* @@ -93,7 +93,7 @@ def _load_origins_df(session: Session, variable_id: int) -> pd.DataFrame: """ # Use the session to execute the raw SQL - result_proxy = session.execute(text(sql), {"variable_id": variable_id}) + result_proxy = await session.execute(text(sql), {"variable_id": variable_id}) # Fetch the results into a DataFrame df = pd.DataFrame(result_proxy.fetchall(), columns=result_proxy.keys()) @@ -226,17 +226,22 @@ def _move_population_origin_to_end(origins: List[Dict[str, Any]]) -> List[Dict[s return new_origins -def variable_metadata(session: Session, variable_id: int, variable_data: pd.DataFrame) -> Dict[str, Any]: +async def variable_metadata(session: AsyncSession, variable_id: int, variable_data: pd.DataFrame) -> Dict[str, Any]: """Fetch metadata for a single variable from database. This function was initially based on the one from owid-grapher repository and uses raw SQL commands. It'd be interesting to rewrite it using SQLAlchemy ORM in grapher_model.py. 
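+
+    NOTE: the four loads below are awaited one after another rather than gathered
+    concurrently: a single AsyncSession cannot run overlapping queries, so using
+    asyncio.gather here would require a separate session (and connection) per query.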
""" + task_variable = _load_variable(session, variable_id) + task_origins = _load_origins_df(session, variable_id) + task_topic_tags = _load_topic_tags(session, variable_id) + task_faqs = _load_faqs(session, variable_id) + return _variable_metadata( - db_variable_row=_load_variable(session, variable_id), + db_variable_row=await task_variable, variable_data=variable_data, - db_origins_df=_load_origins_df(session, variable_id), - db_topic_tags=_load_topic_tags(session, variable_id), - db_faqs=_load_faqs(session, variable_id), + db_origins_df=await task_origins, + db_topic_tags=await task_topic_tags, + db_faqs=await task_faqs, ) diff --git a/apps/backport/datasync/datasync.py b/apps/backport/datasync/datasync.py index a5c9a63c537..552203cab1b 100644 --- a/apps/backport/datasync/datasync.py +++ b/apps/backport/datasync/datasync.py @@ -1,3 +1,4 @@ +import asyncio import gzip import json from typing import Any, Dict @@ -17,6 +18,9 @@ config.enable_bugsnag() +R2_UPLOAD_SEMAPHORE = asyncio.Semaphore(10) + + def upload_gzip_dict(d: Dict[str, Any], s3_path: str, private: bool = False) -> None: return upload_gzip_string(json.dumps(d, default=str), s3_path=s3_path, private=private) @@ -46,3 +50,23 @@ def upload_gzip_string(s: str, s3_path: str, private: bool = False) -> None: ContentType="application/json", **extra_args, ) + + +async def upload_gzip_string_async(client: Any, s: str, s3_path: str, private: bool = False) -> None: + """Upload compressed dictionary to S3 and return its URL.""" + body_gzip = gzip.compress(s.encode()) + + bucket, key = s3_utils.s3_bucket_key(s3_path) + + assert not private, "r2 does not support private files yet" + extra_args = {} + + async with R2_UPLOAD_SEMAPHORE: + await client.put_object( + Bucket=bucket, + Body=body_gzip, + Key=key, + ContentEncoding="gzip", + ContentType="application/json", + **extra_args, + ) diff --git a/apps/chart_sync/admin_api.py b/apps/chart_sync/admin_api.py index 5dd3f73eefd..2979e69caa5 100644 --- a/apps/chart_sync/admin_api.py +++ b/apps/chart_sync/admin_api.py @@ -6,6 +6,7 @@ from functools import cache from typing import Any, Dict, List, Optional +import aiohttp import requests import structlog from requests.adapters import HTTPAdapter, Retry @@ -86,21 +87,24 @@ def set_tags(self, chart_id: int, tags: List[Dict[str, Any]], user_id: Optional[ raise AdminAPIError({"error": js["error"], "tags": tags}) return js - def put_grapher_config(self, variable_id: int, grapher_config: Dict[str, Any]) -> dict: + async def put_grapher_config(self, variable_id: int, grapher_config: Dict[str, Any]) -> dict: # If schema is missing, use the default one grapher_config.setdefault("$schema", DEFAULT_GRAPHER_SCHEMA) - # Retry in case we're restarting Admin on staging server - resp = requests_with_retry().put( - self.owid_env.admin_api + f"/variables/{variable_id}/grapherConfigETL", - cookies={"sessionid": self.session_id}, - json=grapher_config, - ) - js = self._json_from_response(resp) - if not js["success"]: - raise AdminAPIError({"error": js["error"], "variable_id": variable_id, "grapher_config": grapher_config}) - return js - + async with aiohttp.ClientSession(cookies={"sessionid": self.session_id}) as session: + async with session.put( + self.owid_env.admin_api + f"/variables/{variable_id}/grapherConfigETL", + json=grapher_config, + ) as resp: + # TODO: make _json_from_response async + js = await resp.json() + if not js["success"]: + raise AdminAPIError( + {"error": js["error"], "variable_id": variable_id, "grapher_config": grapher_config} + ) + return js + 
+ # TODO: make it async def delete_grapher_config(self, variable_id: int) -> dict: resp = requests.delete( self.owid_env.admin_api + f"/variables/{variable_id}/grapherConfigETL", diff --git a/etl/command.py b/etl/command.py index c70b8ec046c..ae9a99badd3 100644 --- a/etl/command.py +++ b/etl/command.py @@ -191,14 +191,9 @@ def main_cli( # make everything single threaded, useful for debugging if not use_threads: - config.GRAPHER_INSERT_WORKERS = 1 config.DIRTY_STEPS_WORKERS = 1 workers = 1 - # GRAPHER_INSERT_WORKERS should be split among workers - if workers > 1: - config.GRAPHER_INSERT_WORKERS = config.GRAPHER_INSERT_WORKERS // workers - kwargs = dict( steps=steps, dry_run=dry_run, @@ -223,7 +218,6 @@ def main_cli( for _ in runs: if ipdb: config.IPDB_ENABLED = True - config.GRAPHER_INSERT_WORKERS = 1 config.DIRTY_STEPS_WORKERS = 1 kwargs["workers"] = 1 with launch_ipdb_on_exception(): @@ -384,9 +378,7 @@ def run_dag( ) return exec_steps(steps, strict=strict) else: - print( - f"--- Running {len(steps)} steps with {workers} processes ({config.GRAPHER_INSERT_WORKERS} threads each):" - ) + print(f"--- Running {len(steps)} steps with {workers} processes:") return exec_steps_parallel(steps, workers, dag=dag, strict=strict) diff --git a/etl/config.py b/etl/config.py index 25d03772b24..d03b49faffc 100644 --- a/etl/config.py +++ b/etl/config.py @@ -178,10 +178,6 @@ def variable_metadata_url(variable_id): # because we're making a lot of HTTP requests DIRTY_STEPS_WORKERS = int(env.get("DIRTY_STEPS_WORKERS", 5)) -# number of workers for grapher inserts to DB, this is for all processes, so if -# --workers is higher than 1, this will be divided among them -GRAPHER_INSERT_WORKERS = int(env.get("GRAPHER_WORKERS", 40)) - # only upsert indicators matching this filter, this is useful for fast development # of data pages for a single indicator GRAPHER_FILTER = env.get("GRAPHER_FILTER", None) diff --git a/etl/db.py b/etl/db.py index a1b40706eb1..95539243056 100644 --- a/etl/db.py +++ b/etl/db.py @@ -10,6 +10,7 @@ from deprecated import deprecated from sqlalchemy import create_engine from sqlalchemy.engine import Engine +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine from sqlalchemy.orm import Session from etl import config @@ -61,6 +62,16 @@ def get_engine(conf: Optional[Dict[str, Any]] = None) -> Engine: return _get_engine_cached(cf, pid) +def get_engine_async(conf: Optional[Dict[str, Any]] = None) -> AsyncEngine: + cf: Any = dict_to_object(conf) if conf else config + engine = create_async_engine( + f"mysql+aiomysql://{cf.DB_USER}:{quote(cf.DB_PASS)}@{cf.DB_HOST}:{cf.DB_PORT}/{cf.DB_NAME}", + pool_size=30, # Increase pool size + max_overflow=50, # Increase overflow limit + ) + return engine + + def dict_to_object(d): return type("DynamicObject", (object,), d)() diff --git a/etl/grapher_import.py b/etl/grapher_import.py index e284b22eaf8..f28dc834193 100644 --- a/etl/grapher_import.py +++ b/etl/grapher_import.py @@ -9,31 +9,33 @@ >>> import_dataset.main(dataset_dir, dataset_namespace) """ +import asyncio import datetime import json import os import warnings -from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from threading import Lock from typing import Dict, List, Optional, cast import pandas as pd import structlog +from aiobotocore.session import get_session from owid import catalog -from owid.catalog import Table, VariableMeta, utils +from owid.catalog import Table, VariableMeta, s3_utils, utils from owid.catalog.utils import hash_any from sqlalchemy 
import select, text, update from sqlalchemy.engine.base import Engine from sqlalchemy.exc import NoResultFound +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker from sqlalchemy.orm import Session from apps.backport.datasync import data_metadata as dm -from apps.backport.datasync.datasync import upload_gzip_string +from apps.backport.datasync.datasync import upload_gzip_string_async from apps.chart_sync.admin_api import AdminAPI from apps.wizard.app_pages.chart_diff.chart_diff import ChartDiffsLoader from etl import config -from etl.db import get_engine, production_or_master_engine +from etl.db import get_engine, get_engine_async, production_or_master_engine from . import grapher_helpers as gh from . import grapher_model as gm @@ -120,23 +122,26 @@ def upsert_dataset( return DatasetUpsertResult(ds.id, source_ids) -def _upsert_source_to_db(session: Session, source: catalog.Source, dataset_id: int) -> int: +async def _upsert_source_to_db(session: AsyncSession, source: catalog.Source, dataset_id: int) -> int: """Upsert source and return its id""" # NOTE: we need the lock because upserts can happen in multiple threads and `sources` table # has no unique constraint on `name`. It can be removed once we switch to variable views # and stop using threads with source_table_lock: - db_source = gm.Source.from_catalog_source(source, dataset_id).upsert(session) + db_source = await gm.Source.from_catalog_source(source, dataset_id).upsert(session) # commit within the lock to make sure other threads get the latest sources - session.commit() + await session.commit() assert db_source.id return db_source.id -def _add_or_update_source( - session: Session, variable_meta: catalog.VariableMeta, column_name: str, dataset_upsert_result: DatasetUpsertResult +async def _add_or_update_source( + session: AsyncSession, + variable_meta: catalog.VariableMeta, + column_name: str, + dataset_upsert_result: DatasetUpsertResult, ) -> Optional[int]: if not variable_meta.sources: assert variable_meta.origins, "Variable must have either sources or origins" @@ -163,11 +168,11 @@ def _add_or_update_source( return source_id -def _add_or_update_origins(session: Session, origins: list[catalog.Origin]) -> list[gm.Origin]: +async def _add_or_update_origins(session: AsyncSession, origins: list[catalog.Origin]) -> list[gm.Origin]: out = [] assert len(origins) == len(set(origins)), "origins must be unique" for origin in origins: - out.append(gm.Origin.from_origin(origin).upsert(session)) + out.append(await gm.Origin.from_origin(origin).upsert(session)) return out @@ -225,8 +230,12 @@ def _check_upserted_table(table: Table) -> None: assert not gh.contains_inf(table.iloc[:, 0]), f"Column `{table.columns[0]}` has inf values" -def upsert_table( - engine: Engine, +semaphore = asyncio.Semaphore(50) + + +async def upsert_table( + engine_async: AsyncEngine, + client, admin_api: AdminAPI, table: Table, dataset_upsert_result: DatasetUpsertResult, @@ -260,78 +269,82 @@ def upsert_table( checksum_data = calculate_checksum_data(df) checksum_metadata = calculate_checksum_metadata(variable_meta, df) - with Session(engine) as session: - # compare checksums - try: - db_variable = gm.Variable.from_catalog_path(session, catalog_path) - except NoResultFound: - db_variable = None - - upsert_metadata_kwargs = dict( - session=session, - df=df, - variable_meta=variable_meta, + # NOTE: this is useful for debugging, will be removed once it is stable + if os.environ.get("SKIP_CHECKSUMS"): + import random + + checksum_metadata = 
str(random.randint(0, 10000)) + checksum_data = str(random.randint(0, 10000)) + + async with semaphore: + log.info( + "upsert_dataset.upsert_table.start", column_name=column_name, - dataset_upsert_result=dataset_upsert_result, - catalog_path=catalog_path, - dimensions=dimensions, - admin_api=admin_api, ) - # create variable if it doesn't exist - if not db_variable: - db_variable = upsert_metadata(**upsert_metadata_kwargs) - upsert_data(df, db_variable.s3_data_path()) + async_session = async_sessionmaker(engine_async, expire_on_commit=False) + async with async_session() as session: + # compare checksums + try: + db_variable = await gm.Variable.load_from_catalog_path_async(session, catalog_path) + except NoResultFound: + db_variable = None + + upsert_metadata_kwargs = dict( + session=session, + df=df, + variable_meta=variable_meta, + column_name=column_name, + dataset_upsert_result=dataset_upsert_result, + catalog_path=catalog_path, + dimensions=dimensions, + admin_api=admin_api, + client=client, + ) + + # create variable if it doesn't exist + if not db_variable: + db_variable = await upsert_metadata(**upsert_metadata_kwargs) + await upsert_data(df, db_variable.s3_data_path(), client) - # variable exists - else: - if db_variable.dataChecksum == checksum_data and db_variable.metadataChecksum == checksum_metadata: - if verbose: - log.info("upsert_table.skipped_no_changes", size=len(df), variable_id=db_variable.id) - return - - # NOTE: sequantial upserts are slower than parallel, but they will be useful once we switch to asyncio - # if db_variable.dataChecksum != checksum_data: - # upsert_data(df, db_variable.s3_data_path()) - # if db_variable.metadataChecksum != checksum_metadata: - # db_variable = upsert_metadata(**upsert_metadata_kwargs) - - futures = {} - with ThreadPoolExecutor() as executor: + # variable exists + else: + if db_variable.dataChecksum == checksum_data and db_variable.metadataChecksum == checksum_metadata: + if verbose: + log.info("upsert_table.skipped_no_changes", size=len(df), variable_id=db_variable.id) + return + + upsert_data_task = None if db_variable.dataChecksum != checksum_data: - futures["data"] = executor.submit(upsert_data, df, db_variable.s3_data_path()) + upsert_data_task = upsert_data(df, db_variable.s3_data_path(), client) if db_variable.metadataChecksum != checksum_metadata: - futures["metadata"] = executor.submit(upsert_metadata, **upsert_metadata_kwargs) + db_variable = await upsert_metadata(**upsert_metadata_kwargs) - if futures: - # Wait for futures to complete in case exceptions are raised - if "data" in futures: - futures["data"].result() - if "metadata" in futures: - db_variable = futures["metadata"].result() + if upsert_data_task: + await upsert_data_task - # Update checksums - db_variable.dataChecksum = checksum_data - db_variable.metadataChecksum = checksum_metadata + # Update checksums + db_variable.dataChecksum = checksum_data + db_variable.metadataChecksum = checksum_metadata - # Commit new checksums - session.add(db_variable) - session.commit() + # Commit new checksums + session.add(db_variable) + await session.commit() - if verbose: - log.info("upsert_table.uploaded_to_s3", size=len(df), variable_id=db_variable.id) + if verbose: + log.info("upsert_table.uploaded_to_s3", size=len(df), variable_id=db_variable.id) -def upsert_data(df: pd.DataFrame, s3_data_path: str): +async def upsert_data(df: pd.DataFrame, s3_data_path: str, client) -> None: # upload data to R2 var_data = dm.variable_data(df) var_data_str = json.dumps(var_data, default=str) - 
upload_gzip_string(var_data_str, s3_data_path) + await upload_gzip_string_async(client, var_data_str, s3_data_path) -def upsert_metadata( - session: Session, +async def upsert_metadata( + session: AsyncSession, df: pd.DataFrame, variable_meta: VariableMeta, column_name: str, @@ -339,15 +352,16 @@ def upsert_metadata( catalog_path: str, dimensions: Optional[gm.Dimensions], admin_api: AdminAPI, + client, ) -> gm.Variable: timespan = _get_timespan(df, variable_meta) - source_id = _add_or_update_source(session, variable_meta, column_name, dataset_upsert_result) + source_id = await _add_or_update_source(session, variable_meta, column_name, dataset_upsert_result) - with origins_table_lock: - db_origins = _add_or_update_origins(session, variable_meta.origins) - # commit within the lock to make sure other threads get the latest sources - session.commit() + # with origins_table_lock: + db_origins = await _add_or_update_origins(session, variable_meta.origins) + # commit within the lock to make sure other threads get the latest sources + await session.commit() # pop grapher_config from variable metadata, later we send it to Admin API if variable_meta.presentation and variable_meta.presentation.grapher_config: @@ -356,7 +370,7 @@ def upsert_metadata( else: grapher_config = None - db_variable = gm.Variable.from_variable_metadata( + db_variable = await gm.Variable.from_variable_metadata( variable_meta, short_name=column_name, timespan=timespan, @@ -373,33 +387,38 @@ def upsert_metadata( db_variable.type = db_variable.infer_type(df["value"]) # update links, we need to do it after we commit deleted relationships above - db_variable.update_links( + update_links_task = db_variable.update_links( session, db_origins, faqs=variable_meta.presentation.faqs if variable_meta.presentation else [], tag_names=variable_meta.presentation.topic_tags if variable_meta.presentation else [], ) - session.add(db_variable) # we need to commit changes because `dm.variable_metadata` pulls all data from MySQL # and sends it to R2 # NOTE: we could optimize this by evading pulling from MySQL and instead constructing JSON files from objects # we have available - session.commit() + await session.commit() # grapher_config needs to be sent to Admin API because it has side effects + put_grapher_config_task = None if grapher_config: - admin_api.put_grapher_config(db_variable_id, grapher_config) + put_grapher_config_task = admin_api.put_grapher_config(db_variable_id, grapher_config) # grapher_config does not exist, but it's still in the database -> delete it elif not grapher_config and db_variable.grapherConfigIdETL: admin_api.delete_grapher_config(db_variable_id) # upload metadata to R2 - var_metadata = dm.variable_metadata(session, db_variable.id, df) + var_metadata = await dm.variable_metadata(session, db_variable.id, df) var_metadata_str = json.dumps(var_metadata, default=str) # upload them to R2 - upload_gzip_string(var_metadata_str, db_variable.s3_metadata_path()) + await upload_gzip_string_async(client, var_metadata_str, db_variable.s3_metadata_path()) + + # await all tasks + await update_links_task + if put_grapher_config_task: + await put_grapher_config_task return db_variable @@ -627,3 +646,64 @@ def _get_timespan(table: pd.DataFrame, variable_meta: VariableMeta) -> str: min_year = min(years) max_year = max(years) return f"{min_year}-{max_year}" + + +async def _upsert_tables_from_dataset_async(step_path, dataset, engine, admin_api, dataset_upsert_results): + i = 0 + catalog_paths = [] + tasks = [] + verbose = True + + 
engine_async = get_engine_async() + + async with get_session().create_client(**s3_utils.r2_config()) as client: + # NOTE: multiple tables will be saved under a single dataset, this could cause problems if someone + # is fetching the whole dataset from data-api as they would receive all tables merged in a single + # table. This won't be a problem after we introduce the concept of "tables" + for table in dataset: + assert not table.empty, f"table {table.metadata.short_name} is empty" + + # if GRAPHER_FILTER is set, only upsert matching columns + if config.GRAPHER_FILTER: + cols = table.filter(regex=config.GRAPHER_FILTER).columns.tolist() + if not cols: + continue + cols += [c for c in table.columns if c in {"year", "date", "country"} and c not in cols] + table = table.loc[:, cols] + + table = gh._adapt_table_for_grapher(table, engine) + + for t in gh._yield_wide_table(table, na_action="drop"): + i += 1 + assert len(t.columns) == 1 + catalog_path = f"{step_path}/{table.metadata.short_name}#{t.columns[0]}" + catalog_paths.append(catalog_path) + + # TODO: switch to asyncio or threads + # stop logging to stop cluttering logs + # if i > 20 and verbose: + # verbose = False + # thread_pool.submit( + # lambda: (time.sleep(10), log.info("upsert_dataset.continue_without_logging")) + # ) + + # generate table with entity_id, year and value for every column + task = upsert_table( + engine_async, + client, + admin_api, + t, + dataset_upsert_results, + catalog_path=catalog_path, + dimensions=(t.iloc[:, 0].metadata.additional_info or {}).get("dimensions"), + verbose=verbose, + ) + + tasks.append(task) + + await asyncio.gather(*tasks) + + await engine_async.dispose() + await client.close() + + return catalog_paths diff --git a/etl/grapher_model.py b/etl/grapher_model.py index 01d78d9c39a..a05355685f7 100644 --- a/etl/grapher_model.py +++ b/etl/grapher_model.py @@ -45,6 +45,7 @@ SmallInteger, String, and_, + delete, func, or_, select, @@ -61,6 +62,7 @@ ) from sqlalchemy.engine import Engine from sqlalchemy.exc import NoResultFound +from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import ( # type: ignore DeclarativeBase, @@ -231,9 +233,9 @@ def load_tags(cls, session: Session) -> List["Tag"]: return list(session.scalars(select(cls).where(cls.slug.isnot(None))).all()) @classmethod - def load_tags_by_names(cls, session: Session, tag_names: List[str]) -> List["Tag"]: + async def load_tags_by_names(cls, session: AsyncSession, tag_names: List[str]) -> List["Tag"]: """Load topic tags by their names in the order given in `tag_names`.""" - tags = session.scalars(select(Tag).where(Tag.name.in_(tag_names))).all() + tags = (await session.scalars(select(Tag).where(Tag.name.in_(tag_names)))).all() if len(tags) != len(tag_names): found_tags = [tag.name for tag in tags] @@ -762,8 +764,8 @@ def _upsert_select(self) -> Select: ] return select(cls).where(*conds) - def upsert(self, session: Session) -> "Source": - ds = session.scalars(self._upsert_select).one_or_none() + async def upsert(self, session: AsyncSession) -> "Source": + ds = (await session.scalars(self._upsert_select)).one_or_none() if not ds: ds = self @@ -772,7 +774,7 @@ def upsert(self, session: Session) -> "Source": ds.description = self.description session.add(ds) - session.flush() # Ensure the object is written to the database and its ID is generated + await session.flush() # Ensure the object is written to the database and its ID is generated return ds @classmethod @@ -884,10 +886,12 @@ class 
OriginsVariablesLink(Base): displayOrder: Mapped[int] = mapped_column(SmallInteger, server_default=text("'0'")) @classmethod - def link_with_variable(cls, session: Session, variable_id: int, new_origin_ids: List[int]) -> None: + async def link_with_variable(cls, session: AsyncSession, variable_id: int, new_origin_ids: List[int]) -> None: """Link the given Variable ID with the given Origin IDs.""" # Fetch current linked Origins for the given Variable ID - existing_links = session.query(cls.originId, cls.displayOrder).filter(cls.variableId == variable_id).all() + existing_links = ( + await session.execute(select(cls.originId, cls.displayOrder).filter_by(variableId=variable_id)) + ).all() existing_origins = {(link.originId, link.displayOrder) for link in existing_links} new_origins = {(origin_id, i) for i, origin_id in enumerate(new_origin_ids)} @@ -898,11 +902,19 @@ def link_with_variable(cls, session: Session, variable_id: int, new_origin_ids: # Delete the obsolete Origin-Variable links for origin_id, display_order in to_delete: - session.query(cls).filter( + # TODO: test that it works + stmt = delete(cls).where( cls.variableId == variable_id, cls.originId == origin_id, cls.displayOrder == display_order, - ).delete(synchronize_session="fetch") + ) + await session.execute(stmt) + + # session.query(cls).filter( + # cls.variableId == variable_id, + # cls.originId == origin_id, + # cls.displayOrder == display_order, + # ).delete(synchronize_session="fetch") # Add the new Origin-Variable links if to_add: @@ -928,10 +940,10 @@ class PostsGdocsVariablesFaqsLink(Base): displayOrder: Mapped[int] = mapped_column(SmallInteger, server_default=text("'0'")) @classmethod - def link_with_variable(cls, session: Session, variable_id: int, new_faqs: List[catalog.FaqLink]) -> None: + async def link_with_variable(cls, session: AsyncSession, variable_id: int, new_faqs: List[catalog.FaqLink]) -> None: """Link the given Variable ID with Faqs""" # Fetch current linked Faqs for the given Variable ID - existing_faqs = session.query(cls).filter(cls.variableId == variable_id).all() + existing_faqs = (await session.execute(select(cls).filter_by(variableId=variable_id))).scalars().all() # Work with tuples instead existing_gdoc_fragment = {(f.gdocId, f.fragmentId, f.displayOrder) for f in existing_faqs} @@ -942,12 +954,20 @@ def link_with_variable(cls, session: Session, variable_id: int, new_faqs: List[c # Delete the obsolete links for gdoc_id, fragment_id, display_order in to_delete: - session.query(cls).filter( + stmt = delete(cls).where( cls.variableId == variable_id, cls.gdocId == gdoc_id, cls.fragmentId == fragment_id, cls.displayOrder == display_order, - ).delete(synchronize_session="fetch") + ) + await session.execute(stmt) + + # session.query(cls).filter( + # cls.variableId == variable_id, + # cls.gdocId == gdoc_id, + # cls.fragmentId == fragment_id, + # cls.displayOrder == display_order, + # ).delete(synchronize_session="fetch") # Add the new links if to_add: @@ -972,12 +992,14 @@ class TagsVariablesTopicTagsLink(Base): displayOrder: Mapped[int] = mapped_column(SmallInteger, server_default=text("'0'")) @classmethod - def link_with_variable(cls, session: Session, variable_id: int, new_tag_ids: List[int]) -> None: + async def link_with_variable(cls, session: AsyncSession, variable_id: int, new_tag_ids: List[int]) -> None: """Link the given Variable ID with the given Tag IDs.""" assert len(new_tag_ids) == len(set(new_tag_ids)), "Tag IDs must be unique" # Fetch current linked tags for the given Variable ID - 
existing_links = session.query(cls.tagId, cls.displayOrder).filter(cls.variableId == variable_id).all() + existing_links = ( + await session.execute(select(cls.tagId, cls.displayOrder).filter_by(variableId=variable_id)) + ).all() existing_tags = {(link.tagId, link.displayOrder) for link in existing_links} new_tags = {(tag_id, i) for i, tag_id in enumerate(new_tag_ids)} @@ -988,11 +1010,17 @@ def link_with_variable(cls, session: Session, variable_id: int, new_tag_ids: Lis # Delete the obsolete links for tag_id, display_order in to_delete: - session.query(cls).filter( + stmt = delete(cls).where( cls.variableId == variable_id, cls.tagId == tag_id, cls.displayOrder == display_order, - ).delete(synchronize_session="fetch") + ) + await session.execute(stmt) + # session.query(cls).filter( + # cls.variableId == variable_id, + # cls.tagId == tag_id, + # cls.displayOrder == display_order, + # ).delete(synchronize_session="fetch") # Add the new links if to_add: @@ -1089,7 +1117,7 @@ class Variable(Base): dataChecksum: Mapped[Optional[str]] = mapped_column(VARCHAR(64), default=None) metadataChecksum: Mapped[Optional[str]] = mapped_column(VARCHAR(64), default=None) - def upsert(self, session: Session) -> "Variable": + async def upsert(self, session: AsyncSession) -> "Variable": assert self.catalogPath assert self.shortName @@ -1106,24 +1134,10 @@ def upsert(self, session: Session) -> "Variable": # try matching on shortName first q = select(cls).where( - or_( - cls.shortName == self.shortName, - # NOTE: we used to slugify shortName which replaced double underscore by a single underscore - # this was a bug, we should have kept the double underscore - # match even those variables and correct their shortName - cls.shortName == self.shortName.replace("__", "_"), - ), + cls.catalogPath == self.catalogPath, cls.datasetId == self.datasetId, ) - ds = session.scalars(q).one_or_none() - - # try matching on name if there was no match on shortName - if not ds: - q = select(cls).where( - cls.name == self.name, - cls.datasetId == self.datasetId, - ) - ds = session.scalars(q).one_or_none() + ds = (await session.scalars(q)).one_or_none() # there's a unique index on `name` which can cause conflict if we swap names of two variables # in that case, we append "(conflict)" to the name of the conflicting variable (it will be cleaned @@ -1136,12 +1150,12 @@ def upsert(self, session: Session) -> "Variable": cls.shortName != self.shortName, cls.datasetId == self.datasetId, ) - conflict = session.scalars(q).one_or_none() + conflict = (await session.scalars(q)).one_or_none() if conflict: # modify the conflicting variable name, it'll be cleaned up later conflict.name = f"{conflict.name} (conflict {random.randint(0, 1000)})" session.add(conflict) - session.commit() + await session.commit() if not ds: ds = self @@ -1183,7 +1197,7 @@ def upsert(self, session: Session) -> "Variable": ds.sort = self.sort session.add(ds) - session.flush() # Ensure the object is written to the database and its ID is generated + await session.flush() # Ensure the object is written to the database and its ID is generated return ds @classmethod @@ -1363,6 +1377,11 @@ def from_id( elif isinstance(variable_id, list): return session.scalars(_select_columns(cls, columns).where(cls.id.in_(variable_id))).all() # type: ignore + @classmethod + async def load_from_catalog_path_async(cls, session: AsyncSession, catalog_path: str) -> "Variable": + assert "#" in catalog_path, "catalog_path should end with #indicator_short_name" + return (await 
session.scalars(select(cls).where(cls.catalogPath == catalog_path))).one() + @classmethod def catalog_paths_to_variable_ids(cls, session: Session, catalog_paths: List[str]) -> Dict[str, int]: """Return a mapping from catalog paths to variable IDs.""" @@ -1374,33 +1393,45 @@ def infer_type(cls, values: pd.Series) -> VARIABLE_TYPE: """Set type and sort fields based on indicator values.""" return _infer_variable_type(values) - def update_links( - self, session: Session, db_origins: List["Origin"], faqs: List[catalog.FaqLink], tag_names: List[str] - ) -> None: - """ - Establishes relationships between the current variable and a list of origins and a list of posts. - """ - assert self.id - - # establish relationships between variables and origins - OriginsVariablesLink.link_with_variable(session, self.id, [origin.id for origin in db_origins]) - + async def _link_faqs(self, session: AsyncSession, faqs: List[catalog.FaqLink]) -> None: # establish relationships between variables and posts required_gdoc_ids = {faq.gdoc_id for faq in faqs} query = select(PostsGdocs).where(PostsGdocs.id.in_(required_gdoc_ids)) - gdoc_posts = session.scalars(query).all() + gdoc_posts = (await session.scalars(query)).all() existing_gdoc_ids = {gdoc_post.id for gdoc_post in gdoc_posts} missing_gdoc_ids = required_gdoc_ids - existing_gdoc_ids if missing_gdoc_ids: log.warning("create_links.missing_faqs", missing_gdoc_ids=missing_gdoc_ids) - PostsGdocsVariablesFaqsLink.link_with_variable( + await PostsGdocsVariablesFaqsLink.link_with_variable( session, self.id, [faq for faq in faqs if faq.gdoc_id in existing_gdoc_ids] ) + async def update_links( + self, + session: AsyncSession, + db_origins: List["Origin"], + faqs: List[catalog.FaqLink], + tag_names: List[str], + ) -> None: + """ + Establishes relationships between the current variable and a list of origins and a list of posts. + """ + assert self.id + + # establish relationships between variables and origins + origins_variables_task = OriginsVariablesLink.link_with_variable( + session, self.id, [origin.id for origin in db_origins] + ) + + link_faqs_task = self._link_faqs(session, faqs) + # establish relationships between variables and tags - tags = Tag.load_tags_by_names(session, tag_names) + tags = await Tag.load_tags_by_names(session, tag_names) - TagsVariablesTopicTagsLink.link_with_variable(session, self.id, [tag.id for tag in tags]) + await TagsVariablesTopicTagsLink.link_with_variable(session, self.id, [tag.id for tag in tags]) + + await origins_variables_task + await link_faqs_task def s3_data_path(self, typ: S3_PATH_TYP = "s3") -> str: """Path to S3 with data in JSON format for Grapher. 
Typically @@ -1561,7 +1592,7 @@ def _upsert_select(self) -> Select: cls.dateAccessed == self.dateAccessed, ) - def upsert(self, session: Session) -> "Origin": + async def upsert(self, session: AsyncSession) -> "Origin": """ # NOTE: this would be an ideal solution if we only stored unique rows in # origins table, but there are weird race conditions and we cannot have @@ -1581,8 +1612,7 @@ def upsert(self, session: Session) -> "Origin": # select added object to get its id return session.scalars(self._upsert_select).one() """ - - origins = session.scalars(self._upsert_select).all() + origins = (await session.scalars(self._upsert_select)).all() if not origins: # create new origin origin = self diff --git a/etl/steps/__init__.py b/etl/steps/__init__.py index de7649cf9b2..4ad05d8d85f 100644 --- a/etl/steps/__init__.py +++ b/etl/steps/__init__.py @@ -2,6 +2,7 @@ # __init__.py # steps # +import asyncio import graphlib import hashlib import json @@ -10,10 +11,9 @@ import subprocess import sys import tempfile -import time import warnings from collections import defaultdict -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field from glob import glob from importlib import import_module @@ -891,58 +891,9 @@ def run(self) -> None: # Passing a BlockManager to Table is deprecated and will raise in a future version. Use public APIs instead. warnings.filterwarnings("ignore", category=DeprecationWarning) - catalog_paths = [] - - with ThreadPoolExecutor(max_workers=config.GRAPHER_INSERT_WORKERS) as thread_pool: - futures = [] - verbose = True - i = 0 - - # NOTE: multiple tables will be saved under a single dataset, this could cause problems if someone - # is fetching the whole dataset from data-api as they would receive all tables merged in a single - # table. 
This won't be a problem after we introduce the concept of "tables" - for table in dataset: - assert not table.empty, f"table {table.metadata.short_name} is empty" - - # if GRAPHER_FILTER is set, only upsert matching columns - if config.GRAPHER_FILTER: - cols = table.filter(regex=config.GRAPHER_FILTER).columns.tolist() - if not cols: - continue - cols += [c for c in table.columns if c in {"year", "date", "country"} and c not in cols] - table = table.loc[:, cols] - - table = gh._adapt_table_for_grapher(table, engine) - - for t in gh._yield_wide_table(table, na_action="drop"): - i += 1 - assert len(t.columns) == 1 - catalog_path = f"{self.path}/{table.metadata.short_name}#{t.columns[0]}" - catalog_paths.append(catalog_path) - - # stop logging to stop cluttering logs - if i > 20 and verbose: - verbose = False - thread_pool.submit( - lambda: (time.sleep(10), log.info("upsert_dataset.continue_without_logging")) - ) - - # generate table with entity_id, year and value for every column - futures.append( - thread_pool.submit( - gi.upsert_table, - engine, - admin_api, - t, - dataset_upsert_results, - catalog_path=catalog_path, - dimensions=(t.iloc[:, 0].metadata.additional_info or {}).get("dimensions"), - verbose=verbose, - ) - ) - - # wait for all tables to be inserted - [future.result() for future in as_completed(futures)] + catalog_paths = asyncio.run( + gi._upsert_tables_from_dataset_async(self.path, dataset, engine, admin_api, dataset_upsert_results) + ) if not config.GRAPHER_FILTER and not config.SUBSET: # cleaning up ghost resources could be unsuccessful if someone renamed short_name of a variable diff --git a/lib/catalog/owid/catalog/s3_utils.py b/lib/catalog/owid/catalog/s3_utils.py index 564306ed695..dbeb97e7a87 100644 --- a/lib/catalog/owid/catalog/s3_utils.py +++ b/lib/catalog/owid/catalog/s3_utils.py @@ -144,10 +144,7 @@ def _read_owid_rclone_config() -> Dict[str, str]: return dict(config["owid-r2"].items()) -def connect_r2() -> BaseClient: - "Return a connection to Cloudflare's R2." - import boto3 - +def r2_config(): # first, get the R2 credentials from dotenv R2_ACCESS_KEY = env.get("R2_ACCESS_KEY") R2_SECRET_KEY = env.get("R2_SECRET_KEY") @@ -165,7 +162,7 @@ def connect_r2() -> BaseClient: except KeyError: pass - client = boto3.client( + return dict( service_name="s3", aws_access_key_id=R2_ACCESS_KEY, aws_secret_access_key=R2_SECRET_KEY, @@ -173,7 +170,12 @@ def connect_r2() -> BaseClient: region_name=R2_REGION_NAME or "auto", ) - return client + +def connect_r2() -> BaseClient: + "Return a connection to Cloudflare's R2." 
+ import boto3 + + return boto3.client(**r2_config()) @lru_cache(maxsize=None) diff --git a/pyproject.toml b/pyproject.toml index 16272913362..f39c9033788 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,7 +52,7 @@ dependencies = [ "html2text>=2020.1.16", "pygithub>=2.3.0", "pandas==2.2.3", - "sqlalchemy>=2.0.30", + "sqlalchemy[asyncio]>=2.0.34", "pymysql>=1.1.1", "tiktoken>=0.7.0", "earthengine-api>=0.1.411", @@ -68,6 +68,8 @@ dependencies = [ "geopy>=2.4.1", "py7zr>=0.22.0", "pyreadr>=0.5.2", + "aiobotocore>=2.14.0", + "aiomysql>=0.2.0" ] [tool.uv.sources] diff --git a/tests/test_grapher_helpers.py b/tests/test_grapher_helpers.py index 661afe0b633..84dd1bfbada 100644 --- a/tests/test_grapher_helpers.py +++ b/tests/test_grapher_helpers.py @@ -186,6 +186,7 @@ def _sample_table() -> Table: def test_adapt_table_for_grapher_multiindex(): with mock.patch("etl.grapher_helpers._get_entities_from_db") as mock_get_entities_from_db: with mock.patch("etl.grapher_io._fetch_entities") as mock_fetch_entities: + # with mock.patch("apps.backport.datasync.data_metadata._fetch_entities") as mock_fetch_entities: mock_get_entities_from_db.return_value = {"Poland": 1, "France": 2} mock_fetch_entities.return_value = pd.DataFrame( { diff --git a/uv.lock b/uv.lock index da42e2623a0..332b45a06bd 100644 --- a/uv.lock +++ b/uv.lock @@ -15,6 +15,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0b/f7/85273299ab57117850cc0a936c64151171fac4da49bc6fba0dad984a7c5f/affine-2.4.0-py3-none-any.whl", hash = "sha256:8a3df80e2b2378aef598a83c1392efd47967afec4242021a0b06b4c7cbc61a92", size = 15662 }, ] +[[package]] +name = "aiobotocore" +version = "2.16.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "aiohttp" }, + { name = "aioitertools" }, + { name = "botocore" }, + { name = "wrapt" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/06/dc/5a44e1cd5e206b11abf67754d47dabcde4f927bb281b93dabdbf77eba3fd/aiobotocore-2.16.0.tar.gz", hash = "sha256:6d6721961a81570e9b920b98778d95eec3d52a9f83b7844c6c5cfdbf2a2d6a11", size = 107433 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c5/63/c03db9dafb0b3b8a90a1714a1949bc1e7db1d0e2c4062400901da35678fe/aiobotocore-2.16.0-py3-none-any.whl", hash = "sha256:eb3641a7b9c51113adbc33a029441de6201ebb026c64ff2e149c7fa802c9abfc", size = 77781 }, +] + [[package]] name = "aiohappyeyeballs" version = "2.4.4" @@ -29,13 +44,14 @@ name = "aiohttp" version = "3.11.10" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "aiohappyeyeballs", marker = "python_full_version >= '3.12'" }, - { name = "aiosignal", marker = "python_full_version >= '3.12'" }, - { name = "attrs", marker = "python_full_version >= '3.12'" }, - { name = "frozenlist", marker = "python_full_version >= '3.12'" }, - { name = "multidict", marker = "python_full_version >= '3.12'" }, - { name = "propcache", marker = "python_full_version >= '3.12'" }, - { name = "yarl", marker = "python_full_version >= '3.12'" }, + { name = "aiohappyeyeballs" }, + { name = "aiosignal" }, + { name = "async-timeout", marker = "python_full_version < '3.11'" }, + { name = "attrs" }, + { name = "frozenlist" }, + { name = "multidict" }, + { name = "propcache" }, + { name = "yarl" }, ] sdist = { url = "https://files.pythonhosted.org/packages/94/c4/3b5a937b16f6c2a0ada842a9066aad0b7a5708427d4a202a07bf09c67cbb/aiohttp-3.11.10.tar.gz", hash = "sha256:b1fc6b45010a8d0ff9e88f9f2418c6fd408c99c211257334aff41597ebece42e", size = 7668832 } wheels = [ @@ -101,12 +117,33 @@ 
wheels = [ { url = "https://files.pythonhosted.org/packages/49/1f/deed34e9fca639a7f873d01150d46925d3e1312051eaa591c1aa1f2e6ddc/aiohttp-3.11.10-cp313-cp313-win_amd64.whl", hash = "sha256:beb39a6d60a709ae3fb3516a1581777e7e8b76933bb88c8f4420d875bb0267c6", size = 435837 }, ] +[[package]] +name = "aioitertools" +version = "0.12.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/06/de/38491a84ab323b47c7f86e94d2830e748780525f7a10c8600b67ead7e9ea/aioitertools-0.12.0.tar.gz", hash = "sha256:c2a9055b4fbb7705f561b9d86053e8af5d10cc845d22c32008c43490b2d8dd6b", size = 19369 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/85/13/58b70a580de00893223d61de8fea167877a3aed97d4a5e1405c9159ef925/aioitertools-0.12.0-py3-none-any.whl", hash = "sha256:fc1f5fac3d737354de8831cbba3eb04f79dd649d8f3afb4c5b114925e662a796", size = 24345 }, +] + +[[package]] +name = "aiomysql" +version = "0.2.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pymysql" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/67/76/2c5b55e4406a1957ffdfd933a94c2517455291c97d2b81cec6813754791a/aiomysql-0.2.0.tar.gz", hash = "sha256:558b9c26d580d08b8c5fd1be23c5231ce3aeff2dadad989540fee740253deb67", size = 114706 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/42/87/c982ee8b333c85b8ae16306387d703a1fcdfc81a2f3f15a24820ab1a512d/aiomysql-0.2.0-py3-none-any.whl", hash = "sha256:b7c26da0daf23a5ec5e0b133c03d20657276e4eae9b73e040b72787f6f6ade0a", size = 44215 }, +] + [[package]] name = "aiosignal" version = "1.3.1" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "frozenlist", marker = "python_full_version >= '3.12'" }, + { name = "frozenlist" }, ] sdist = { url = "https://files.pythonhosted.org/packages/ae/67/0952ed97a9793b4958e5736f6d2b346b414a2cd63e82d05940032f45b32f/aiosignal-1.3.1.tar.gz", hash = "sha256:54cd96e15e1649b75d6c87526a6ff0b6c1b0dd3459f43d9ca11d48c339b68cfc", size = 19422 } wheels = [ @@ -254,6 +291,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/fa/9f/3c3503693386c4b0f245eaf5ca6198e3b28879ca0a40bde6b0e319793453/async_lru-2.0.4-py3-none-any.whl", hash = "sha256:ff02944ce3c288c5be660c42dbcca0742b32c3b279d6dceda655190240b99224", size = 6111 }, ] +[[package]] +name = "async-timeout" +version = "5.0.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/a5/ae/136395dfbfe00dfc94da3f3e136d0b13f394cba8f4841120e34226265780/async_timeout-5.0.1.tar.gz", hash = "sha256:d9321a7a3d5a6a5e187e824d2fa0793ce379a202935782d555d6e9d2735677d3", size = 9274 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fe/ba/e2081de779ca30d473f21f5b30e0e737c438205440784c7dfc81efc2b029/async_timeout-5.0.1-py3-none-any.whl", hash = "sha256:39e3809566ff85354557ec2398b55e096c8364bacac9405a7a1fa429e77fe76c", size = 6233 }, +] + [[package]] name = "attrs" version = "24.2.0" @@ -997,6 +1043,8 @@ name = "etl" version = "0.1.0" source = { editable = "." 
} dependencies = [ + { name = "aiobotocore" }, + { name = "aiomysql" }, { name = "bugsnag" }, { name = "cdsapi" }, { name = "click" }, @@ -1045,7 +1093,7 @@ dependencies = [ { name = "shapely" }, { name = "simplejson" }, { name = "sparqlwrapper" }, - { name = "sqlalchemy" }, + { name = "sqlalchemy", extra = ["asyncio"] }, { name = "structlog" }, { name = "tenacity" }, { name = "tiktoken" }, @@ -1117,6 +1165,8 @@ dev = [ [package.metadata] requires-dist = [ + { name = "aiobotocore", specifier = ">=2.14.0" }, + { name = "aiomysql", specifier = ">=0.2.0" }, { name = "bugsnag", specifier = ">=4.2.1" }, { name = "cdsapi", specifier = ">=0.7.0" }, { name = "click", specifier = ">=8.0.1" }, @@ -1174,7 +1224,7 @@ requires-dist = [ { name = "simplejson", specifier = ">=3.17.6" }, { name = "slack-sdk", marker = "extra == 'api'", specifier = ">=3.26.2" }, { name = "sparqlwrapper", specifier = ">=1.8.5" }, - { name = "sqlalchemy", specifier = ">=2.0.30" }, + { name = "sqlalchemy", extras = ["asyncio"], specifier = ">=2.0.34" }, { name = "statsmodels", marker = "extra == 'wizard'", specifier = ">=0.14.4" }, { name = "streamlit", marker = "extra == 'wizard'", specifier = ">=1.41.0" }, { name = "streamlit-ace", marker = "extra == 'wizard'", specifier = ">=0.1.1" }, @@ -3267,6 +3317,9 @@ wheels = [ name = "multidict" version = "6.1.0" source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions", marker = "python_full_version < '3.11'" }, +] sdist = { url = "https://files.pythonhosted.org/packages/d6/be/504b89a5e9ca731cd47487e91c469064f8ae5af93b7259758dcfc2b9c848/multidict-6.1.0.tar.gz", hash = "sha256:22ae2ebf9b0c69d206c003e2f6a914ea33f0a932d4aa16f236afc049d9958f4a", size = 64002 } wheels = [ { url = "https://files.pythonhosted.org/packages/29/68/259dee7fd14cf56a17c554125e534f6274c2860159692a414d0b402b9a6d/multidict-6.1.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:3380252550e372e8511d49481bd836264c009adb826b23fefcc5dd3c69692f60", size = 48628 }, @@ -6208,6 +6261,11 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b8/49/21633706dd6feb14cd3f7935fc00b60870ea057686035e1a99ae6d9d9d53/SQLAlchemy-2.0.36-py3-none-any.whl", hash = "sha256:fddbe92b4760c6f5d48162aef14824add991aeda8ddadb3c31d56eb15ca69f8e", size = 1883787 }, ] +[package.optional-dependencies] +asyncio = [ + { name = "greenlet" }, +] + [[package]] name = "st-annotated-text" version = "4.0.1" @@ -7473,9 +7531,9 @@ name = "yarl" version = "1.18.3" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "idna", marker = "python_full_version >= '3.12'" }, - { name = "multidict", marker = "python_full_version >= '3.12'" }, - { name = "propcache", marker = "python_full_version >= '3.12'" }, + { name = "idna" }, + { name = "multidict" }, + { name = "propcache" }, ] sdist = { url = "https://files.pythonhosted.org/packages/b7/9d/4b94a8e6d2b51b599516a5cb88e5bc99b4d8d4583e468057eaa29d5f0918/yarl-1.18.3.tar.gz", hash = "sha256:ac1801c45cbf77b6c99242eeff4fffb5e4e73a800b5c4ad4fc0be5def634d2e1", size = 181062 } wheels = [