Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request 144 + testing new pangeo-forge-esgf client #145

Merged
merged 27 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
2de6125
Request 144
jbusecke May 1, 2024
298cea7
Update iids_pr.yaml
jbusecke May 1, 2024
b8bdda6
Update iids_pr.yaml
jbusecke May 1, 2024
0df0d7a
Try new esgf client
jbusecke May 6, 2024
ea05400
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 6, 2024
a41bd6d
switch pangeo-forge-esgf dependency to pr branch
jbusecke May 6, 2024
214fc07
Update recipe.py
jbusecke May 6, 2024
17edbd4
Update recipe.py
jbusecke May 6, 2024
7deaa97
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 6, 2024
347fb74
Update recipe.py
jbusecke May 6, 2024
51c0ad2
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 6, 2024
fb34aa8
Update recipe.py
jbusecke May 6, 2024
029cf49
Update recipe.py
jbusecke May 6, 2024
e35c720
Update recipe.py
jbusecke May 7, 2024
951819c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 7, 2024
6b521b8
Update recipe.py
jbusecke May 7, 2024
ed871b5
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 7, 2024
7aade7d
Update recipe.py
jbusecke May 7, 2024
2eb595d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 7, 2024
2dc6553
Update recipe.py
jbusecke May 7, 2024
1e3c797
Update recipe.py
jbusecke May 7, 2024
d20efa8
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 7, 2024
5bb730d
Update recipe.py
jbusecke May 7, 2024
e3954b7
Update iids.yaml
jbusecke May 7, 2024
1a1ba32
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 7, 2024
e701a19
Update iids_pr.yaml
jbusecke May 7, 2024
9323229
Update iids_pr.yaml
jbusecke May 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions feedstock/iids.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# from https://github.com/Timh37/CMIP6cex/issues/2
- "CMIP6.*.*.*.[historical, ssp245, ssp585].*.day.[psl, pr, sfcWind].*.*"
# from
- 'CMIP6.HighResMIP.CMCC.CMCC-CM2-VHR4.[hist-1950,highres-future].r1i1p1f1.6hrPlevPt.[vas,uas,psl].gn.*'
- 'CMIP6.HighResMIP.EC-Earth-Consortium.EC-Earth3P-HR.[hist-1950,highres-future].r1i1p2f1.6hrPlevPt.[vas,uas,psl].gr.*'
- 'CMIP6.HighResMIP.MOHC.HadGEM3-GC31-HM.[hist-1950,highres-future].r1i1p1f1.[3hr, E3hr].[vas,uas,psl].gn.*'
# from https://github.com/pangeo-forge/cmip6-feedstock/issues/22
- "CMIP6.*.*.*.[historical, ssp126, ssp245, ssp585].*.Omon.zmeso.*.*"
# PMIP velocities
Expand Down
2 changes: 1 addition & 1 deletion feedstock/iids_pr.yaml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
- "CMIP6.*.*.[CNRM-CM6-1,CanESM5].historical.*.Omon.[tos, so].*.*"
- "CMIP6.*.*.[CNRM-CM6-1,CanESM5].historical.*.Omon.[tos, so].*.*"
100 changes: 54 additions & 46 deletions feedstock/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
"""Modified transforms from Pangeo Forge"""

import apache_beam as beam
from typing import List, Dict
from typing import Dict
from dask.utils import parse_bytes
from pangeo_forge_esgf import get_urls_from_esgf, setup_logging
from pangeo_forge_esgf import setup_logging
from leap_data_management_utils import CMIPBQInterface, LogCMIPToBigQuery
from leap_data_management_utils.data_management_transforms import Copy, InjectAttrs
from leap_data_management_utils.cmip_transforms import TestDataset, Preprocessor
from pangeo_forge_esgf.parsing import parse_instance_ids
from pangeo_forge_esgf.client import ESGFClient
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
OpenURLWithFSSpec,
Expand All @@ -17,7 +17,6 @@
ConsolidateMetadata,
ConsolidateDimensionCoordinates,
)
import asyncio
import logging
import os
import xarray as xr
Expand Down Expand Up @@ -79,33 +78,26 @@
iids_raw = yaml.safe_load(f)
iids_raw = [iid for iid in iids_raw if iid]


def parse_wildcards(iids: List[str]) -> List[str]:
"""iterate through each list element and
if it contains wilcards apply the wildcard parser
"""
iids_parsed = []
for iid in iids:
if "*" in iid:
iids_parsed += parse_instance_ids(iid)
else:
iids_parsed.append(iid)
return iids_parsed


# parse out wildcard iids using pangeo-forge-esgf
print(f"{iids_raw = }")
iids = parse_wildcards(iids_raw)
print(f"{iids = }")

# exclude dupes
iids = list(set(iids))
# parse out wildcard/square brackets using pangeo-forge-esgf
logger.debug(f"{iids_raw = }")

client = ESGFClient(
file_output_fields=[
"pid",
"tracking_id",
"further_info_url",
"citation_url",
"checksum",
"checksum_type",
],
dataset_output_fields=["pid", "tracking_id", "further_info_url", "citation_url"],
)
iids = client.expand_instance_id_list(iids_raw)
logger.info(f"{iids = }")

# Prune the url dict to only include items that have not been logged to BQ yet
print("Pruning iids that already exist")

logger.info("Pruning iids that already exist")
bq_interface = CMIPBQInterface(table_id=table_id)

# get lists of the iids already logged
iids_in_table = bq_interface.iid_list_exists(iids)

Expand All @@ -121,35 +113,50 @@ def parse_wildcards(iids: List[str]) -> List[str]:
del bq_interface

# Maybe I want a more finegrained check here at some point, but for now this will prevent logged iids from rerunning
print(f"{overwrite_iids =}")
logger.debug(f"{overwrite_iids =}")
iids_to_skip = set(iids_in_table) - set(overwrite_iids)
print(f"{iids_to_skip =}")
logger.debug(f"{iids_to_skip =}")
iids_filtered = list(set(iids) - iids_to_skip)
print(f"Pruned {len(iids) - len(iids_filtered)}/{len(iids)} iids from input list")
logger.info(f"Pruned {len(iids) - len(iids_filtered)}/{len(iids)} iids from input list")


if prune_iids:
iids_filtered = iids_filtered[0:200]

print(f"🚀 Requesting a total of {len(iids_filtered)} iids")

# Get the urls from ESGF at Runtime (only for the pruned list to save time)
url_dict = asyncio.run(
get_urls_from_esgf(
iids_filtered,
limit_per_host=20,
max_concurrency=20,
max_concurrency_response=20,
)
)
def combine_dicts(dicts):
result = {}
for d in dicts:
for key, value in d.items():
if key in result:
result[key].append(value)
else:
result[key] = [value]
return result


print(f"🚀 Requesting a total of {len(iids_filtered)} iids")
input_dict = client.get_recipe_inputs_from_iid_list(iids_filtered)
logger.debug(f"{input_dict=}")
input_dict_flat = {
iid: [(k, v) for k, v in data.items()] for iid, data in input_dict.items()
}
logger.debug(f"{input_dict_flat=}")
recipe_dict = {
iid: combine_dicts([i[1] for i in sorted(data)])
for iid, data in input_dict_flat.items()
}
logger.debug(f"{recipe_dict=}")

if prune_submission:
url_dict = {iid: url_dict[iid] for iid in list(url_dict.keys())[0:10]}
recipe_dict = {
iid: {k: v[0:10] for k, v in data.items()} for iid, data in recipe_dict.items()
}

print(f"🚀 Submitting a total of {len(url_dict)} iids")
print(f"🚀 Submitting a total of {len(recipe_dict)} iids")

# Print the actual urls
print(f"{url_dict = }")
logger.debug(f"{recipe_dict = }")


## Dynamic Chunking Wrapper
Expand Down Expand Up @@ -231,7 +238,8 @@ def dynamic_chunking_func(ds: xr.Dataset) -> Dict[str, int]:
## Create the recipes
recipes = {}

for iid, urls in url_dict.items():
for iid, data in recipe_dict.items():
urls = data["url"]
pattern = pattern_from_file_sequence(urls, concat_dim="time")
recipes[iid] = (
f"Creating {iid}" >> beam.Create(pattern.items())
Expand All @@ -244,7 +252,7 @@ def dynamic_chunking_func(ds: xr.Dataset) -> Dict[str, int]:
combine_dims=pattern.combine_dim_keys,
dynamic_chunking_fn=dynamic_chunking_func,
)
| InjectAttrs()
| InjectAttrs({"pangeo_forge_file_data": data})
| ConsolidateDimensionCoordinates()
| ConsolidateMetadata()
| Copy(
Expand Down
3 changes: 2 additions & 1 deletion feedstock/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
leap-data-management-utils==0.0.7
pangeo-forge-esgf==0.2.0
#pangeo-forge-esgf==0.2.0
git+https://github.com/jbusecke/pangeo-forge-esgf.git@new-request-scheme
dynamic-chunks==0.0.3
git+https://github.com/ranchodeluxe/xarray@ranchodeluxe-patch-1#egg=xarray
git+https://github.com/ranchodeluxe/rioxarray
Expand Down
Loading