diff --git a/leap_data_management_utils/cmip_transforms.py b/leap_data_management_utils/cmip_transforms.py index a0e8f87..122e347 100644 --- a/leap_data_management_utils/cmip_transforms.py +++ b/leap_data_management_utils/cmip_transforms.py @@ -9,6 +9,7 @@ import zarr from google.cloud import bigquery from pangeo_forge_recipes.transforms import Indexed, T +from tqdm.auto import tqdm from leap_data_management_utils.cmip_testing import test_all from leap_data_management_utils.data_management_transforms import BQInterface @@ -136,23 +137,14 @@ def iid_exists(self, iid: str) -> bool: """Check if iid exists in the table""" return self._get_iid_results(iid).exists - def iid_list_exists(self, iids: list[str]) -> list[str]: + def _iid_list_exists_batch(self, iids: list[str]) -> list[str]: """More efficient way to check if a list of iids exists in the table Passes the entire list to a single SQL query. Returns a list of iids that exist in the table - Only supports list up to 10k elements. If you want to check more, you should - work in batches: - ``` - iids = df['instance_id'].tolist() - iids_in_bq = [] - batchsize = 10000 - iid_batches = [iids[i : i + batchsize] for i in range(0, len(iids), batchsize)] - for iids_batch in tqdm(iid_batches): - iids_in_bq_batch = bq.iid_list_exists(iids_batch) - iids_in_bq.extend(iids_in_bq_batch) ``` """ - assert len(iids) <= 10000 + if len(iids) > 10000: + raise ValueError('List of iids is too long. Please work in batches.') # source: https://stackoverflow.com/questions/26441928/how-do-i-check-if-multiple-values-exists-in-database query = f""" @@ -164,6 +156,21 @@ def iid_list_exists(self, iids: list[str]) -> list[str]: # this is a full row iterator, for now just return the iids return list(set([r['instance_id'] for r in results])) + def iid_list_exists(self, iids: list[str]) -> list[str]: + """More efficient way to check if a list of iids exists in the table + Passes the entire list in batches into SQL querys for maximum efficiency. + Returns a list of iids that exist in the table + """ + + # make batches of the input, since bq cannot handle more than 10k elements here + iids_in_bq = [] + batchsize = 10000 + iid_batches = [iids[i : i + batchsize] for i in range(0, len(iids), batchsize)] + for iids_batch in tqdm(iid_batches): + iids_in_bq_batch = self._iid_list_exists_batch(iids_batch) + iids_in_bq.extend(iids_in_bq_batch) + return iids_in_bq + # ---------------------------------------------------------------------------------------------- # apache Beam stages diff --git a/pyproject.toml b/pyproject.toml index 6e7b04e..f31140a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ dependencies = [ [project.optional-dependencies] pangeo-forge=[ + "tqdm", "db_dtypes", "google-api-core", "google-cloud-bigquery",