Commit: wip
Marigold committed Dec 19, 2024
1 parent 9517673 commit 4e90fe5
Showing 10 changed files with 110 additions and 284 deletions.
4 changes: 1 addition & 3 deletions apps/backport/datasync/data_metadata.py
@@ -212,9 +212,7 @@ def _variable_metadata(
return variableMetadata


def _move_population_origin_to_end(
origins: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
def _move_population_origin_to_end(origins: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Move population origin to the end of the list of origins. This way it gets displayed last on data page."""
new_origins = []
pop_origin = None
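The body of `_move_population_origin_to_end` is collapsed above. As a reading aid only, here is a minimal sketch of how such a reordering could look; it is not the repository's implementation, and the `title`-based test for spotting the population origin is an assumption.

```python
# Hypothetical sketch, not the code hidden behind the collapsed diff.
from typing import Any, Dict, List, Optional


def move_population_origin_to_end(origins: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Move a population origin to the end so it is displayed last on the data page."""
    new_origins: List[Dict[str, Any]] = []
    pop_origin: Optional[Dict[str, Any]] = None
    for origin in origins:
        # Assumption: a population origin is recognisable by its title.
        if "population" in str(origin.get("title", "")).lower():
            pop_origin = origin
        else:
            new_origins.append(origin)
    if pop_origin is not None:
        new_origins.append(pop_origin)
    return new_origins


print(move_population_origin_to_end([{"title": "Population (UN)"}, {"title": "GDP"}]))
# -> the GDP origin comes first, the population origin is moved to the end
```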
27 changes: 6 additions & 21 deletions apps/chart_sync/admin_api.py
@@ -87,36 +87,21 @@ def set_tags(self, chart_id: int, tags: List[Dict[str, Any]], user_id: Optional[
raise AdminAPIError({"error": js["error"], "tags": tags})
return js

def put_grapher_config(self, variable_id: int, grapher_config: Dict[str, Any]) -> dict:
async def put_grapher_config(self, variable_id: int, grapher_config: Dict[str, Any]) -> dict:
# If schema is missing, use the default one
grapher_config.setdefault("$schema", DEFAULT_GRAPHER_SCHEMA)

# Retry in case we're restarting Admin on staging server
resp = requests_with_retry().put(
self.owid_env.admin_api + f"/variables/{variable_id}/grapherConfigETL",
cookies={"sessionid": self.session_id},
json=grapher_config,
)
js = self._json_from_response(resp)
if not js["success"]:
raise AdminAPIError(
{
"error": js["error"],
"variable_id": variable_id,
"grapher_config": grapher_config,
}
)
return js

async def put_grapher_config(self, variable_id: int, grapher_config: Dict[str, Any]) -> dict:
async with aiohttp.ClientSession(cookies={"sessionid": self.session_id}) as session:
async with session.put(
self.base_url + f"/admin/api/variables/{variable_id}/grapherConfigETL",
self.owid_env.admin_api + f"/variables/{variable_id}/grapherConfigETL",
json=grapher_config,
) as resp:
# TODO: make _json_from_response async
js = await resp.json()
assert js["success"]
if not js["success"]:
raise AdminAPIError(
{"error": js["error"], "variable_id": variable_id, "grapher_config": grapher_config}
)
return js

# TODO: make it async
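The new `put_grapher_config` above replaces a blocking `requests` call with `aiohttp`. A self-contained sketch of that pattern follows; `AdminAPIError`, `base_url` and `session_id` are stand-ins here rather than the module's real attributes, and only the endpoint path is taken from the diff.

```python
# Sketch of the async PUT-with-error-handling pattern; names are simplified stand-ins.
from typing import Any, Dict

import aiohttp


class AdminAPIError(RuntimeError):
    pass


async def put_grapher_config(
    base_url: str, session_id: str, variable_id: int, grapher_config: Dict[str, Any]
) -> Dict[str, Any]:
    async with aiohttp.ClientSession(cookies={"sessionid": session_id}) as session:
        async with session.put(
            f"{base_url}/variables/{variable_id}/grapherConfigETL",
            json=grapher_config,
        ) as resp:
            js = await resp.json()
    # Raising (rather than asserting) keeps the error payload visible to callers,
    # which is what the diff changes the old `assert js["success"]` into.
    if not js["success"]:
        raise AdminAPIError({"error": js["error"], "variable_id": variable_id})
    return js


# Usage requires an event loop, e.g.:
# asyncio.run(put_grapher_config("https://admin.example.org/api", "abc", 1, {"title": "..."}))
```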
8 changes: 1 addition & 7 deletions etl/db.py
@@ -92,13 +92,7 @@ def read_sql(sql: str, engine: Optional[Engine | Session] = None, *args, **kwarg
raise ValueError(f"Unsupported engine type {type(engine)}")


def to_sql(
df: pd.DataFrame,
name: str,
engine: Optional[Engine | Session] = None,
*args,
**kwargs,
):
def to_sql(df: pd.DataFrame, name: str, engine: Optional[Engine | Session] = None, *args, **kwargs):
"""Wrapper around pd.to_sql that creates a connection and closes it after reading the data.
This adds overhead, so if you need performance, reuse the same connection and cursor.
"""
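The `to_sql` change above is only a signature reflow, and the body stays collapsed. For orientation, here is a hedged sketch of how a wrapper like this can accept either an `Engine` or a `Session` and otherwise open and close its own connection; the SQLite fallback is an assumption used to keep the example runnable, not the project's behaviour.

```python
# Illustrative sketch only; the real body is hidden behind the collapsed diff.
from typing import Optional

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session


def to_sql(df: pd.DataFrame, name: str, engine: Optional[Engine | Session] = None, *args, **kwargs):
    """Write `df` to table `name`, creating and closing a connection when none is supplied."""
    if engine is None:
        # Assumption: the project would build a default engine from its settings;
        # an in-memory SQLite engine keeps this example self-contained.
        engine = create_engine("sqlite://")
    if isinstance(engine, Session):
        # Reuse the connection already bound to the session.
        return df.to_sql(name, engine.connection(), *args, **kwargs)
    elif isinstance(engine, Engine):
        # Short-lived transactional connection: opened, used for the write, then closed.
        with engine.begin() as con:
            return df.to_sql(name, con, *args, **kwargs)
    else:
        raise ValueError(f"Unsupported engine type {type(engine)}")


to_sql(pd.DataFrame({"a": [1, 2]}), "demo", index=False)
```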
11 changes: 2 additions & 9 deletions etl/grapher_helpers.py
@@ -121,10 +121,7 @@ def _yield_wide_table(
if dim_names:
# `dropna=False` makes sure we don't drop NaN values from index
grouped = table.groupby(
dim_names if len(dim_names) > 1 else dim_names[0],
as_index=False,
observed=True,
dropna=False,
dim_names if len(dim_names) > 1 else dim_names[0], as_index=False, observed=True, dropna=False
)
else:
# a situation when there's only year and entity_id in index with no additional dimensions
@@ -146,11 +143,7 @@
# If all values are null, skip variable
if table_to_yield[column].isnull().all():
if warn_null_variables:
log.warning(
"yield_wide_table.null_variable",
column=column,
dim_dict=dim_dict,
)
log.warning("yield_wide_table.null_variable", column=column, dim_dict=dim_dict)
continue

# Safety check to see if the metadata is still intact
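The `dropna=False` comment in the first hunk above is worth unpacking: by default pandas silently drops groups whose key is NaN. A small, self-contained demonstration:

```python
# Why `dropna=False` matters for dimension columns that may contain NaN.
import numpy as np
import pandas as pd

df = pd.DataFrame({"sex": ["female", "male", np.nan], "value": [1, 2, 3]})

print(df.groupby("sex").size())
# -> only the "female" and "male" groups appear; the NaN row is silently dropped

print(df.groupby("sex", dropna=False).size())
# -> a NaN group is kept as well, so no rows are lost when building the wide table
```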
104 changes: 54 additions & 50 deletions etl/grapher_import.py
@@ -269,67 +269,71 @@ async def upsert_table(
checksum_data = calculate_checksum_data(df)
checksum_metadata = calculate_checksum_metadata(variable_meta, df)

with Session(engine) as session:
# compare checksums
try:
db_variable = gm.Variable.from_catalog_path(session, catalog_path)
except NoResultFound:
db_variable = None

upsert_metadata_kwargs = dict(
session=session,
df=df,
variable_meta=variable_meta,
# NOTE: this is useful for debugging, will be removed once it is stable
if os.environ.get("SKIP_CHECKSUMS"):
import random

checksum_metadata = str(random.randint(0, 10000))
checksum_data = str(random.randint(0, 10000))

async with semaphore:
log.info(
"upsert_dataset.upsert_table.start",
column_name=column_name,
dataset_upsert_result=dataset_upsert_result,
catalog_path=catalog_path,
dimensions=dimensions,
admin_api=admin_api,
)

# create variable if it doesn't exist
if not db_variable:
db_variable = upsert_metadata(**upsert_metadata_kwargs)
upsert_data(df, db_variable.s3_data_path())
async_session = async_sessionmaker(engine_async, expire_on_commit=False)
async with async_session() as session:
# compare checksums
try:
db_variable = await gm.Variable.load_from_catalog_path_async(session, catalog_path)
except NoResultFound:
db_variable = None

upsert_metadata_kwargs = dict(
session=session,
df=df,
variable_meta=variable_meta,
column_name=column_name,
dataset_upsert_result=dataset_upsert_result,
catalog_path=catalog_path,
dimensions=dimensions,
admin_api=admin_api,
client=client,
)

# create variable if it doesn't exist
if not db_variable:
db_variable = await upsert_metadata(**upsert_metadata_kwargs)
await upsert_data(df, db_variable.s3_data_path(), client)

# variable exists
else:
if db_variable.dataChecksum == checksum_data and db_variable.metadataChecksum == checksum_metadata:
if verbose:
log.info("upsert_table.skipped_no_changes", size=len(df), variable_id=db_variable.id)
return

# NOTE: sequential upserts are slower than parallel, but they will be useful once we switch to asyncio
# if db_variable.dataChecksum != checksum_data:
# upsert_data(df, db_variable.s3_data_path())
# if db_variable.metadataChecksum != checksum_metadata:
# db_variable = upsert_metadata(**upsert_metadata_kwargs)

futures = {}
with ThreadPoolExecutor() as executor:
# variable exists
else:
if db_variable.dataChecksum == checksum_data and db_variable.metadataChecksum == checksum_metadata:
if verbose:
log.info("upsert_table.skipped_no_changes", size=len(df), variable_id=db_variable.id)
return

upsert_data_task = None
if db_variable.dataChecksum != checksum_data:
futures["data"] = executor.submit(upsert_data, df, db_variable.s3_data_path())
upsert_data_task = upsert_data(df, db_variable.s3_data_path(), client)

if db_variable.metadataChecksum != checksum_metadata:
futures["metadata"] = executor.submit(upsert_metadata, **upsert_metadata_kwargs)
db_variable = await upsert_metadata(**upsert_metadata_kwargs)

if futures:
# Wait for futures to complete in case exceptions are raised
if "data" in futures:
futures["data"].result()
if "metadata" in futures:
db_variable = futures["metadata"].result()
if upsert_data_task:
await upsert_data_task

# Update checksums
db_variable.dataChecksum = checksum_data
db_variable.metadataChecksum = checksum_metadata
# Update checksums
db_variable.dataChecksum = checksum_data
db_variable.metadataChecksum = checksum_metadata

# Commit new checksums
session.add(db_variable)
session.commit()
# Commit new checksums
session.add(db_variable)
await session.commit()

if verbose:
log.info("upsert_table.uploaded_to_s3", size=len(df), variable_id=db_variable.id)
if verbose:
log.info("upsert_table.uploaded_to_s3", size=len(df), variable_id=db_variable.id)


async def upsert_data(df: pd.DataFrame, s3_data_path: str, client) -> None:
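The `upsert_table` rewrite above swaps a `ThreadPoolExecutor` plus blocking `Session` for `await`-based calls guarded by a semaphore and an async SQLAlchemy session. Below is a simplified, self-contained sketch of that concurrency pattern; the function and variable names are stand-ins, not the module's API, and the aiosqlite URL is an assumption to keep the example runnable.

```python
# Sketch of limiting concurrent async upserts with a semaphore and async sessions.
import asyncio

from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

# Assumption: any async driver URL works here; aiosqlite keeps the example self-contained.
engine_async = create_async_engine("sqlite+aiosqlite://")
semaphore = asyncio.Semaphore(10)  # cap how many upserts touch the DB at once


async def upsert_one(variable: str) -> None:
    async with semaphore:
        async_session = async_sessionmaker(engine_async, expire_on_commit=False)
        async with async_session() as session:
            # ...compare checksums, upsert data and metadata, update checksum columns...
            await session.commit()


async def main() -> None:
    # All upserts are scheduled together; the semaphore bounds real concurrency.
    await asyncio.gather(*(upsert_one(v) for v in ["var_a", "var_b", "var_c"]))


asyncio.run(main())
```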
10 changes: 3 additions & 7 deletions etl/grapher_io.py
@@ -559,9 +559,7 @@ def variable_data_table_from_catalog(


def get_dataset_id(
dataset_name: str,
db_conn: Optional[pymysql.Connection] = None,
version: Optional[str] = None,
dataset_name: str, db_conn: Optional[pymysql.Connection] = None, version: Optional[str] = None
) -> Any:
"""Get the dataset ID of a specific dataset name from database.
@@ -606,9 +604,7 @@ def get_dataset_id(

@deprecated("This function is deprecated. Its logic will be soon moved to etl.grapher_model.Dataset.")
def get_variables_in_dataset(
dataset_id: int,
only_used_in_charts: bool = False,
db_conn: Optional[pymysql.Connection] = None,
dataset_id: int, only_used_in_charts: bool = False, db_conn: Optional[pymysql.Connection] = None
) -> Any:
"""Get all variables data for a specific dataset ID from database.
@@ -664,7 +660,7 @@ def get_all_datasets(archived: bool = True, db_conn: Optional[pymysql.Connection
if db_conn is None:
db_conn = get_connection()

query = " SELECT namespace, name, id, updatedAt FROM datasets"
query = " SELECT namespace, name, id, updatedAt, isArchived FROM datasets"
if not archived:
query += " WHERE isArchived = 0"
datasets = pd.read_sql(query, con=db_conn)
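The only substantive change in `get_all_datasets` is that the query now also selects `isArchived`. A hedged example of what that enables for callers; the DataFrame below is made up, only the column names come from the diff.

```python
# Example: with isArchived in the result, callers can filter without a second query.
import pandas as pd

datasets = pd.DataFrame(
    {
        "namespace": ["energy", "health"],
        "name": ["Energy mix", "Old life expectancy"],
        "id": [1, 2],
        "updatedAt": pd.to_datetime(["2024-12-01", "2020-01-01"]),
        "isArchived": [0, 1],
    }
)

active = datasets[datasets["isArchived"] == 0]
print(active[["namespace", "name", "id"]])
# -> only the non-archived dataset remains
```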
(Diffs for the remaining 4 changed files did not load.)
