Skip to content

Commit

Permalink
Request 144 + testing new pangeo-forge-esgf client (#145)
Browse files Browse the repository at this point in the history
* Request 144

* Update iids_pr.yaml

* Update iids_pr.yaml

* Try new esgf client

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* switch pangeo-forge-esgf dependency to pr branch

* Update recipe.py

* Update recipe.py

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update recipe.py

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update recipe.py

* Update recipe.py

* Update recipe.py

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update recipe.py

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update recipe.py

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update recipe.py

* Update recipe.py

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update recipe.py

* Update iids.yaml

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update iids_pr.yaml

* Update iids_pr.yaml

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
jbusecke and pre-commit-ci[bot] authored May 7, 2024
1 parent 017b1fc commit 2c66656
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 48 deletions.
4 changes: 4 additions & 0 deletions feedstock/iids.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# from https://github.com/Timh37/CMIP6cex/issues/2
- "CMIP6.*.*.*.[historical, ssp245, ssp585].*.day.[psl, pr, sfcWind].*.*"
# from
- 'CMIP6.HighResMIP.CMCC.CMCC-CM2-VHR4.[hist-1950,highres-future].r1i1p1f1.6hrPlevPt.[vas,uas,psl].gn.*'
- 'CMIP6.HighResMIP.EC-Earth-Consortium.EC-Earth3P-HR.[hist-1950,highres-future].r1i1p2f1.6hrPlevPt.[vas,uas,psl].gr.*'
- 'CMIP6.HighResMIP.MOHC.HadGEM3-GC31-HM.[hist-1950,highres-future].r1i1p1f1.[3hr, E3hr].[vas,uas,psl].gn.*'
# from https://github.com/pangeo-forge/cmip6-feedstock/issues/22
- "CMIP6.*.*.*.[historical, ssp126, ssp245, ssp585].*.Omon.zmeso.*.*"
# PMIP velocities
Expand Down
2 changes: 1 addition & 1 deletion feedstock/iids_pr.yaml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
- "CMIP6.*.*.[CNRM-CM6-1,CanESM5].historical.*.Omon.[tos, so].*.*"
- "CMIP6.*.*.[CNRM-CM6-1,CanESM5].historical.*.Omon.[tos, so].*.*"
100 changes: 54 additions & 46 deletions feedstock/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
"""Modified transforms from Pangeo Forge"""

import apache_beam as beam
from typing import List, Dict
from typing import Dict
from dask.utils import parse_bytes
from pangeo_forge_esgf import get_urls_from_esgf, setup_logging
from pangeo_forge_esgf import setup_logging
from leap_data_management_utils import CMIPBQInterface, LogCMIPToBigQuery
from leap_data_management_utils.data_management_transforms import Copy, InjectAttrs
from leap_data_management_utils.cmip_transforms import TestDataset, Preprocessor
from pangeo_forge_esgf.parsing import parse_instance_ids
from pangeo_forge_esgf.client import ESGFClient
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
OpenURLWithFSSpec,
Expand All @@ -17,7 +17,6 @@
ConsolidateMetadata,
ConsolidateDimensionCoordinates,
)
import asyncio
import logging
import os
import xarray as xr
Expand Down Expand Up @@ -79,33 +78,26 @@
iids_raw = yaml.safe_load(f)
iids_raw = [iid for iid in iids_raw if iid]


def parse_wildcards(iids: List[str]) -> List[str]:
"""iterate through each list element and
if it contains wilcards apply the wildcard parser
"""
iids_parsed = []
for iid in iids:
if "*" in iid:
iids_parsed += parse_instance_ids(iid)
else:
iids_parsed.append(iid)
return iids_parsed


# parse out wildcard iids using pangeo-forge-esgf
print(f"{iids_raw = }")
iids = parse_wildcards(iids_raw)
print(f"{iids = }")

# exclude dupes
iids = list(set(iids))
# parse out wildcard/square brackets using pangeo-forge-esgf
logger.debug(f"{iids_raw = }")

client = ESGFClient(
file_output_fields=[
"pid",
"tracking_id",
"further_info_url",
"citation_url",
"checksum",
"checksum_type",
],
dataset_output_fields=["pid", "tracking_id", "further_info_url", "citation_url"],
)
iids = client.expand_instance_id_list(iids_raw)
logger.info(f"{iids = }")

# Prune the url dict to only include items that have not been logged to BQ yet
print("Pruning iids that already exist")

logger.info("Pruning iids that already exist")
bq_interface = CMIPBQInterface(table_id=table_id)

# get lists of the iids already logged
iids_in_table = bq_interface.iid_list_exists(iids)

Expand All @@ -121,35 +113,50 @@ def parse_wildcards(iids: List[str]) -> List[str]:
del bq_interface

# Maybe I want a more finegrained check here at some point, but for now this will prevent logged iids from rerunning
print(f"{overwrite_iids =}")
logger.debug(f"{overwrite_iids =}")
iids_to_skip = set(iids_in_table) - set(overwrite_iids)
print(f"{iids_to_skip =}")
logger.debug(f"{iids_to_skip =}")
iids_filtered = list(set(iids) - iids_to_skip)
print(f"Pruned {len(iids) - len(iids_filtered)}/{len(iids)} iids from input list")
logger.info(f"Pruned {len(iids) - len(iids_filtered)}/{len(iids)} iids from input list")


if prune_iids:
iids_filtered = iids_filtered[0:200]

print(f"🚀 Requesting a total of {len(iids_filtered)} iids")

# Get the urls from ESGF at Runtime (only for the pruned list to save time)
url_dict = asyncio.run(
get_urls_from_esgf(
iids_filtered,
limit_per_host=20,
max_concurrency=20,
max_concurrency_response=20,
)
)
def combine_dicts(dicts):
result = {}
for d in dicts:
for key, value in d.items():
if key in result:
result[key].append(value)
else:
result[key] = [value]
return result


print(f"🚀 Requesting a total of {len(iids_filtered)} iids")
input_dict = client.get_recipe_inputs_from_iid_list(iids_filtered)
logger.debug(f"{input_dict=}")
input_dict_flat = {
iid: [(k, v) for k, v in data.items()] for iid, data in input_dict.items()
}
logger.debug(f"{input_dict_flat=}")
recipe_dict = {
iid: combine_dicts([i[1] for i in sorted(data)])
for iid, data in input_dict_flat.items()
}
logger.debug(f"{recipe_dict=}")

if prune_submission:
url_dict = {iid: url_dict[iid] for iid in list(url_dict.keys())[0:10]}
recipe_dict = {
iid: {k: v[0:10] for k, v in data.items()} for iid, data in recipe_dict.items()
}

print(f"🚀 Submitting a total of {len(url_dict)} iids")
print(f"🚀 Submitting a total of {len(recipe_dict)} iids")

# Print the actual urls
print(f"{url_dict = }")
logger.debug(f"{recipe_dict = }")


## Dynamic Chunking Wrapper
Expand Down Expand Up @@ -231,7 +238,8 @@ def dynamic_chunking_func(ds: xr.Dataset) -> Dict[str, int]:
## Create the recipes
recipes = {}

for iid, urls in url_dict.items():
for iid, data in recipe_dict.items():
urls = data["url"]
pattern = pattern_from_file_sequence(urls, concat_dim="time")
recipes[iid] = (
f"Creating {iid}" >> beam.Create(pattern.items())
Expand All @@ -244,7 +252,7 @@ def dynamic_chunking_func(ds: xr.Dataset) -> Dict[str, int]:
combine_dims=pattern.combine_dim_keys,
dynamic_chunking_fn=dynamic_chunking_func,
)
| InjectAttrs()
| InjectAttrs({"pangeo_forge_file_data": data})
| ConsolidateDimensionCoordinates()
| ConsolidateMetadata()
| Copy(
Expand Down
3 changes: 2 additions & 1 deletion feedstock/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
leap-data-management-utils==0.0.7
pangeo-forge-esgf==0.2.0
#pangeo-forge-esgf==0.2.0
git+https://github.com/jbusecke/pangeo-forge-esgf.git@new-request-scheme
dynamic-chunks==0.0.3
git+https://github.com/ranchodeluxe/xarray@ranchodeluxe-patch-1#egg=xarray
git+https://github.com/ranchodeluxe/rioxarray
Expand Down

0 comments on commit 2c66656

Please sign in to comment.