Factor out all custom stages (#142)
* Factor out all custom stages

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update requirements.txt

* Update recipe.py

* Update requirements.txt

* Update recipe.py

* Update recipe.py

* Update recipe.py

* Reduce PR iids

* Update recipe.py

* Update requirements.txt

* Update iids_pr.yaml

* Update requirements.txt

* Update recipe.py

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
jbusecke and pre-commit-ci[bot] authored May 1, 2024
1 parent 3986455 commit 017b1fc
Showing 3 changed files with 16 additions and 101 deletions.
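Note for context: the three Beam stages that recipe.py previously defined inline (Preprocessor, TestDataset, Copy) now ship in the shared leap-data-management-utils package, so feedstocks import them instead of copy-pasting. A minimal, hypothetical sketch of the pattern this enables; the stage name and the attribute tweak below are invented for illustration and are not code from this commit:

import apache_beam as beam


class AddCleanFlag(beam.PTransform):
    """Toy stand-in for a factored-out stage: tag each (index, attrs) element."""

    def expand(self, pcoll):
        return pcoll | beam.Map(lambda kv: (kv[0], {**kv[1], "clean": True}))


# Once such a stage lives in a shared package, every recipe composes it the
# same way the real pipeline below composes Preprocessor, Copy, and TestDataset.
with beam.Pipeline() as pipeline:
    (
        pipeline
        | beam.Create([("iid-0", {"variable_id": "tos"})])
        | AddCleanFlag()
        | beam.Map(print)
    )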
2 changes: 1 addition & 1 deletion feedstock/iids_pr.yaml
@@ -1 +1 @@
- "CMIP6.*.*.*.historical.*.Omon.[tos, so].*.*"
- "CMIP6.*.*.[CNRM-CM6-1,CanESM5].historical.*.Omon.[tos, so].*.*"
113 changes: 14 additions & 99 deletions feedstock/recipe.py
@@ -2,20 +2,18 @@
"""Modified transforms from Pangeo Forge"""

import apache_beam as beam
from dataclasses import dataclass
from typing import List, Dict
from dask.utils import parse_bytes
from pangeo_forge_esgf import get_urls_from_esgf, setup_logging
from leap_data_management_utils import CMIPBQInterface, LogCMIPToBigQuery
from leap_data_management_utils.cmip_testing import test_all
from leap_data_management_utils.data_management_transforms import Copy, InjectAttrs
from leap_data_management_utils.cmip_transforms import TestDataset, Preprocessor
from pangeo_forge_esgf.parsing import parse_instance_ids
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
OpenURLWithFSSpec,
OpenWithXarray,
StoreToZarr,
Indexed,
T,
ConsolidateMetadata,
ConsolidateDimensionCoordinates,
)
@@ -24,108 +22,21 @@
 import os
 import xarray as xr
 import yaml
-import zarr

 logger = logging.getLogger(__name__)

-# Custom Beam Transforms
-
-
-@dataclass
-class Preprocessor(beam.PTransform):
-    """
-    Preprocessor for xarray datasets.
-    Set all data_variables except for `variable_id` attrs to coord
-    Add additional information
-    """
-
-    @staticmethod
-    def _keep_only_variable_id(item: Indexed[T]) -> Indexed[T]:
-        """
-        Many netcdfs contain variables other than the one specified in the `variable_id` facet.
-        Set them all to coords
-        """
-        index, ds = item
-        print(f"Preprocessing before {ds =}")
-        new_coords_vars = [
-            var for var in ds.data_vars if var != ds.attrs["variable_id"]
-        ]
-        ds = ds.set_coords(new_coords_vars)
-        print(f"Preprocessing after {ds =}")
-        return index, ds
-
-    @staticmethod
-    def _sanitize_attrs(item: Indexed[T]) -> Indexed[T]:
-        """Removes non-ascii characters from attributes see https://github.com/pangeo-forge/pangeo-forge-recipes/issues/586"""
-        index, ds = item
-        for att, att_value in ds.attrs.items():
-            if isinstance(att_value, str):
-                new_value = att_value.encode("utf-8", "ignore").decode()
-                if new_value != att_value:
-                    print(
-                        f"Sanitized datasets attributes field {att}: \n {att_value} \n ----> \n {new_value}"
-                    )
-                    ds.attrs[att] = new_value
-        return index, ds
-
-    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
-        return (
-            pcoll
-            | "Fix coordinates" >> beam.Map(self._keep_only_variable_id)
-            | "Sanitize Attrs" >> beam.Map(self._sanitize_attrs)
-        )
-
-
-@dataclass
-class TestDataset(beam.PTransform):
-    """
-    Test stage for data written to zarr store
-    """
-
-    iid: str
-
-    def _test(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore:
-        test_all(store, self.iid)
-        return store
-
-    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
-        return pcoll | "Testing - Running all tests" >> beam.Map(self._test)
-
-
-@dataclass
-class Copy(beam.PTransform):
-    target_prefix: str
-
-    def _copy(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore:
-        # We do need the gs:// prefix?
-        # TODO: Determine this dynamically from zarr.storage.FSStore
-        source = f"gs://{os.path.normpath(store.path)}/"  # FIXME more elegant. `.copytree` needs trailing slash
-        target = os.path.join(*[self.target_prefix] + source.split("/")[-3:])
-        # gcs = gcsio.GcsIO()
-        # gcs.copytree(source, target)
-        print(f"HERE: Copying {source} to {target}")
-        import gcsfs
-
-        fs = gcsfs.GCSFileSystem()
-        fs.cp(source, target, recursive=True)
-        # return a new store with the new path that behaves exactly like the input
-        # to this stage (so we can slot this stage right before testing/logging stages)
-        return zarr.storage.FSStore(target)
-
-    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
-        return pcoll | "Copying Store" >> beam.Map(self._copy)
-
-
 ## Create recipes
 is_test = (
     os.environ["IS_TEST"] == "true"
 )  # There must be a better way to do this, but for now this will do
 print(f"{is_test =}")

+run_id = os.environ["GITHUB_RUN_ID"]
+run_attempt = os.environ["GITHUB_RUN_ATTEMPT"]
+
 if is_test:
     setup_logging("DEBUG")
-    copy_target_bucket = "gs://leap-scratch/data-library/cmip6-pr-copied/"
+    copy_target_prefix = "gs://leap-scratch/data-library/cmip6-pr-copied/"
     iid_file = "feedstock/iids_pr.yaml"
     prune_iids = True
     prune_submission = (
@@ -146,7 +57,7 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:

 else:
     setup_logging("INFO")
-    copy_target_bucket = "gs://cmip6/cmip6-pgf-ingestion-test/zarr_stores/"
+    copy_target_prefix = "gs://cmip6/cmip6-pgf-ingestion-test/zarr_stores/"
     iid_file = "feedstock/iids.yaml"
     prune_iids = False
     prune_submission = (
@@ -157,7 +68,7 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
print(f"{table_id = } {prune_submission = } {iid_file = }")

print("Running with the following parameters:")
print(f"{copy_target_bucket = }")
print(f"{copy_target_prefix = }")
print(f"{iid_file = }")
print(f"{prune_iids = }")
print(f"{prune_submission = }")
@@ -321,7 +232,6 @@ def dynamic_chunking_func(ds: xr.Dataset) -> Dict[str, int]:
 recipes = {}

 for iid, urls in url_dict.items():
-    print(f"{copy_target_bucket = }")
     pattern = pattern_from_file_sequence(urls, concat_dim="time")
     recipes[iid] = (
         f"Creating {iid}" >> beam.Create(pattern.items())
@@ -334,9 +244,14 @@ def dynamic_chunking_func(ds: xr.Dataset) -> Dict[str, int]:
             combine_dims=pattern.combine_dim_keys,
             dynamic_chunking_fn=dynamic_chunking_func,
         )
+        | InjectAttrs()
         | ConsolidateDimensionCoordinates()
         | ConsolidateMetadata()
-        | Copy(target_prefix=copy_target_bucket)
+        | Copy(
+            target=os.path.join(
+                copy_target_prefix, f"{run_id}_{run_attempt}", f"{iid}.zarr"
+            )
+        )
         | "Logging to bigquery (non-QC)"
         >> LogCMIPToBigQuery(iid=iid, table_id=table_id, tests_passed=False)
         | TestDataset(iid=iid)
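Design note on the Copy change above: the old local stage received a target_prefix and derived the destination from the source store path; the upstream stage is handed a fully resolved target, so every CI run writes to a unique, run-scoped location. Illustration with hypothetical values (the run numbers and iid below are made up):

import os

copy_target_prefix = "gs://leap-scratch/data-library/cmip6-pr-copied/"  # test value from recipe.py
run_id, run_attempt = "8892479160", "1"  # from GITHUB_RUN_ID / GITHUB_RUN_ATTEMPT in CI
iid = "CMIP6.CMIP.CCCma.CanESM5.historical.r1i1p1f1.Omon.tos.gn.v20190429"

target = os.path.join(copy_target_prefix, f"{run_id}_{run_attempt}", f"{iid}.zarr")
print(target)
# gs://leap-scratch/data-library/cmip6-pr-copied/8892479160_1/CMIP6.CMIP.CCCma.CanESM5.historical.r1i1p1f1.Omon.tos.gn.v20190429.zarr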
2 changes: 1 addition & 1 deletion feedstock/requirements.txt
@@ -1,4 +1,4 @@
-leap-data-management-utils==0.0.2
+leap-data-management-utils==0.0.7
 pangeo-forge-esgf==0.2.0
 dynamic-chunks==0.0.3
 git+https://github.com/ranchodeluxe/xarray@ranchodeluxe-patch-1#egg=xarray
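For reference: the unchanged last requirement is a pip VCS spec, so xarray is built from the ranchodeluxe fork's ranchodeluxe-patch-1 branch rather than a PyPI release. Equivalent one-off install, shown only for illustration:

pip install "git+https://github.com/ranchodeluxe/xarray@ranchodeluxe-patch-1#egg=xarray"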
