Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Testing newest changes to API client #149

Merged
merged 11 commits into from
May 7, 2024
43 changes: 29 additions & 14 deletions feedstock/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import os
import xarray as xr
import yaml
from tqdm.auto import tqdm

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -92,14 +93,23 @@
],
dataset_output_fields=["pid", "tracking_id", "further_info_url", "citation_url"],
)
iids = client.expand_instance_id_list(iids_raw)
iid_info_dict = client.get_instance_id_input(iids_raw)
iids = list(iid_info_dict.keys())
logger.info(f"{iids = }")

# Prune the url dict to only include items that have not been logged to BQ yet
logger.info("Pruning iids that already exist")
bq_interface = CMIPBQInterface(table_id=table_id)
# get lists of the iids already logged
iids_in_table = bq_interface.iid_list_exists(iids)

# TODO: Move this back to the BQ client https://github.com/leap-stc/leap-data-management-utils/issues/33
# Since we have more than 10k iids to check against the BigQuery database,
# we need to run this in batches (BigQuery does not accept more than 10k inputs per query).
iids_in_table = []
batchsize = 10000
iid_batches = [iids[i : i + batchsize] for i in range(0, len(iids), batchsize)]
for iids_batch in tqdm(iid_batches):
iids_in_table_batch = bq_interface.iid_list_exists(iids_batch)
iids_in_table.extend(iids_in_table_batch)

# manual overrides (these will be rewritten each time as long as they exist here)
overwrite_iids = [
Expand All @@ -121,7 +131,21 @@


if prune_iids:
iids_filtered = iids_filtered[0:200]
iids_filtered = iids_filtered[0:20]


# Now that we have the iids that are not yet ingested, we can prune the full iid_info_dict and extract the 'id' field
iid_info_dict_filtered = {k: v for k, v in iid_info_dict.items() if k in iids_filtered}
dataset_ids_filtered = [v["id"] for v in iid_info_dict_filtered.values()]

print(f"🚀 Requesting a total of {len(dataset_ids_filtered)} datasets")
input_dict = client.get_recipe_inputs_from_dataset_ids(dataset_ids_filtered)

logger.debug(f"{input_dict=}")
input_dict_flat = {
iid: [(k, v) for k, v in data.items()] for iid, data in input_dict.items()
}
logger.debug(f"{input_dict_flat=}")


def combine_dicts(dicts):
Expand All @@ -135,23 +159,14 @@ def combine_dicts(dicts):
return result


print(f"🚀 Requesting a total of {len(iids_filtered)} iids")
input_dict = client.get_recipe_inputs_from_iid_list(iids_filtered)
logger.debug(f"{input_dict=}")
input_dict_flat = {
iid: [(k, v) for k, v in data.items()] for iid, data in input_dict.items()
}
logger.debug(f"{input_dict_flat=}")
recipe_dict = {
iid: combine_dicts([i[1] for i in sorted(data)])
for iid, data in input_dict_flat.items()
}
logger.debug(f"{recipe_dict=}")

if prune_submission:
recipe_dict = {
iid: {k: v[0:10] for k, v in data.items()} for iid, data in recipe_dict.items()
}
recipe_dict = {iid: recipe_dict[iid] for iid in list(recipe_dict.keys())[0:5]}

print(f"🚀 Submitting a total of {len(recipe_dict)} iids")

Expand Down
Loading