diff --git a/feedstock/iids.yaml b/feedstock/iids.yaml index 91efaf7a..19d0af24 100644 --- a/feedstock/iids.yaml +++ b/feedstock/iids.yaml @@ -1,5 +1,9 @@ # from https://github.com/Timh37/CMIP6cex/issues/2 - "CMIP6.*.*.*.[historical, ssp245, ssp585].*.day.[psl, pr, sfcWind].*.*" + # from + - 'CMIP6.HighResMIP.CMCC.CMCC-CM2-VHR4.[hist-1950,highres-future].r1i1p1f1.6hrPlevPt.[vas,uas,psl].gn.*' + - 'CMIP6.HighResMIP.EC-Earth-Consortium.EC-Earth3P-HR.[hist-1950,highres-future].r1i1p2f1.6hrPlevPt.[vas,uas,psl].gr.*' + - 'CMIP6.HighResMIP.MOHC.HadGEM3-GC31-HM.[hist-1950,highres-future].r1i1p1f1.[3hr, E3hr].[vas,uas,psl].gn.*' # from https://github.com/pangeo-forge/cmip6-feedstock/issues/22 - "CMIP6.*.*.*.[historical, ssp126, ssp245, ssp585].*.Omon.zmeso.*.*" # PMIP velocities diff --git a/feedstock/iids_pr.yaml b/feedstock/iids_pr.yaml index 03aba3d6..ea10b8ca 100644 --- a/feedstock/iids_pr.yaml +++ b/feedstock/iids_pr.yaml @@ -1 +1 @@ - - "CMIP6.*.*.[CNRM-CM6-1,CanESM5].historical.*.Omon.[tos, so].*.*" + - "CMIP6.*.*.[CNRM-CM6-1,CanESM5].historical.*.Omon.[tos, so].*.*" diff --git a/feedstock/recipe.py b/feedstock/recipe.py index 28993457..b5daac14 100644 --- a/feedstock/recipe.py +++ b/feedstock/recipe.py @@ -2,13 +2,13 @@ """Modified transforms from Pangeo Forge""" import apache_beam as beam -from typing import List, Dict +from typing import Dict from dask.utils import parse_bytes -from pangeo_forge_esgf import get_urls_from_esgf, setup_logging +from pangeo_forge_esgf import setup_logging from leap_data_management_utils import CMIPBQInterface, LogCMIPToBigQuery from leap_data_management_utils.data_management_transforms import Copy, InjectAttrs from leap_data_management_utils.cmip_transforms import TestDataset, Preprocessor -from pangeo_forge_esgf.parsing import parse_instance_ids +from pangeo_forge_esgf.client import ESGFClient from pangeo_forge_recipes.patterns import pattern_from_file_sequence from pangeo_forge_recipes.transforms import ( OpenURLWithFSSpec, @@ -17,7 +17,6 @@ ConsolidateMetadata, ConsolidateDimensionCoordinates, ) -import asyncio import logging import os import xarray as xr @@ -79,33 +78,26 @@ iids_raw = yaml.safe_load(f) iids_raw = [iid for iid in iids_raw if iid] - -def parse_wildcards(iids: List[str]) -> List[str]: - """iterate through each list element and - if it contains wilcards apply the wildcard parser - """ - iids_parsed = [] - for iid in iids: - if "*" in iid: - iids_parsed += parse_instance_ids(iid) - else: - iids_parsed.append(iid) - return iids_parsed - - -# parse out wildcard iids using pangeo-forge-esgf -print(f"{iids_raw = }") -iids = parse_wildcards(iids_raw) -print(f"{iids = }") - -# exclude dupes -iids = list(set(iids)) +# parse out wildcard/square brackets using pangeo-forge-esgf +logger.debug(f"{iids_raw = }") + +client = ESGFClient( + file_output_fields=[ + "pid", + "tracking_id", + "further_info_url", + "citation_url", + "checksum", + "checksum_type", + ], + dataset_output_fields=["pid", "tracking_id", "further_info_url", "citation_url"], +) +iids = client.expand_instance_id_list(iids_raw) +logger.info(f"{iids = }") # Prune the url dict to only include items that have not been logged to BQ yet -print("Pruning iids that already exist") - +logger.info("Pruning iids that already exist") bq_interface = CMIPBQInterface(table_id=table_id) - # get lists of the iids already logged iids_in_table = bq_interface.iid_list_exists(iids) @@ -121,35 +113,50 @@ def parse_wildcards(iids: List[str]) -> List[str]: del bq_interface # Maybe I want a more finegrained check here at some point, but for now this will prevent logged iids from rerunning -print(f"{overwrite_iids =}") +logger.debug(f"{overwrite_iids =}") iids_to_skip = set(iids_in_table) - set(overwrite_iids) -print(f"{iids_to_skip =}") +logger.debug(f"{iids_to_skip =}") iids_filtered = list(set(iids) - iids_to_skip) -print(f"Pruned {len(iids) - len(iids_filtered)}/{len(iids)} iids from input list") +logger.info(f"Pruned {len(iids) - len(iids_filtered)}/{len(iids)} iids from input list") if prune_iids: iids_filtered = iids_filtered[0:200] -print(f"🚀 Requesting a total of {len(iids_filtered)} iids") -# Get the urls from ESGF at Runtime (only for the pruned list to save time) -url_dict = asyncio.run( - get_urls_from_esgf( - iids_filtered, - limit_per_host=20, - max_concurrency=20, - max_concurrency_response=20, - ) -) +def combine_dicts(dicts): + result = {} + for d in dicts: + for key, value in d.items(): + if key in result: + result[key].append(value) + else: + result[key] = [value] + return result + + +print(f"🚀 Requesting a total of {len(iids_filtered)} iids") +input_dict = client.get_recipe_inputs_from_iid_list(iids_filtered) +logger.debug(f"{input_dict=}") +input_dict_flat = { + iid: [(k, v) for k, v in data.items()] for iid, data in input_dict.items() +} +logger.debug(f"{input_dict_flat=}") +recipe_dict = { + iid: combine_dicts([i[1] for i in sorted(data)]) + for iid, data in input_dict_flat.items() +} +logger.debug(f"{recipe_dict=}") if prune_submission: - url_dict = {iid: url_dict[iid] for iid in list(url_dict.keys())[0:10]} + recipe_dict = { + iid: {k: v[0:10] for k, v in data.items()} for iid, data in recipe_dict.items() + } -print(f"🚀 Submitting a total of {len(url_dict)} iids") +print(f"🚀 Submitting a total of {len(recipe_dict)} iids") # Print the actual urls -print(f"{url_dict = }") +logger.debug(f"{recipe_dict = }") ## Dynamic Chunking Wrapper @@ -231,7 +238,8 @@ def dynamic_chunking_func(ds: xr.Dataset) -> Dict[str, int]: ## Create the recipes recipes = {} -for iid, urls in url_dict.items(): +for iid, data in recipe_dict.items(): + urls = data["url"] pattern = pattern_from_file_sequence(urls, concat_dim="time") recipes[iid] = ( f"Creating {iid}" >> beam.Create(pattern.items()) @@ -244,7 +252,7 @@ def dynamic_chunking_func(ds: xr.Dataset) -> Dict[str, int]: combine_dims=pattern.combine_dim_keys, dynamic_chunking_fn=dynamic_chunking_func, ) - | InjectAttrs() + | InjectAttrs({"pangeo_forge_file_data": data}) | ConsolidateDimensionCoordinates() | ConsolidateMetadata() | Copy( diff --git a/feedstock/requirements.txt b/feedstock/requirements.txt index 333e2616..5efdfa90 100644 --- a/feedstock/requirements.txt +++ b/feedstock/requirements.txt @@ -1,5 +1,6 @@ leap-data-management-utils==0.0.7 -pangeo-forge-esgf==0.2.0 +#pangeo-forge-esgf==0.2.0 +git+https://github.com/jbusecke/pangeo-forge-esgf.git@new-request-scheme dynamic-chunks==0.0.3 git+https://github.com/ranchodeluxe/xarray@ranchodeluxe-patch-1#egg=xarray git+https://github.com/ranchodeluxe/rioxarray