diff --git a/leap_data_management_utils/cmip_transforms.py b/leap_data_management_utils/cmip_transforms.py
index 122e347..7b8f7c9 100644
--- a/leap_data_management_utils/cmip_transforms.py
+++ b/leap_data_management_utils/cmip_transforms.py
@@ -3,10 +3,19 @@
 """
 
 import datetime
+import logging
+import warnings
 from dataclasses import dataclass
 
 import apache_beam as beam
+import xarray as xr
 import zarr
+from dask.utils import parse_bytes
+from dynamic_chunks.algorithms import (
+    NoMatchingChunks,
+    even_divisor_algo,
+    iterative_ratio_increase_algo,
+)
 from google.cloud import bigquery
 from pangeo_forge_recipes.transforms import Indexed, T
 from tqdm.auto import tqdm
@@ -14,6 +23,70 @@
 from leap_data_management_utils.cmip_testing import test_all
 from leap_data_management_utils.data_management_transforms import BQInterface
 
+# TODO: I am not sure the chunking function belongs here, but it clutters the recipe
+# and I did not want to open a whole file just for it.
+logger = logging.getLogger(__name__)
+
+
+## Dynamic Chunking Wrapper
+def dynamic_chunking_func(ds: xr.Dataset) -> dict[str, int]:
+    logger.info(f'Input Dataset for dynamic chunking {ds =}')
+
+    target_chunk_size = '150MB'
+    target_chunks_aspect_ratio = {
+        'time': 10,
+        'x': 1,
+        'i': 1,
+        'ni': 1,
+        'xh': 1,
+        'nlon': 1,
+        'lon': 1,  # TODO: Maybe import all the known spatial dimensions from xmip?
+        'y': 1,
+        'j': 1,
+        'nj': 1,
+        'yh': 1,
+        'nlat': 1,
+        'lat': 1,
+    }
+    size_tolerance = 0.5
+
+    # Some datasets are smaller than the target chunk size and should not be chunked at all
+    if ds.nbytes < parse_bytes(target_chunk_size):
+        target_chunks = dict(ds.dims)
+
+    else:
+        try:
+            target_chunks = even_divisor_algo(
+                ds,
+                target_chunk_size,
+                target_chunks_aspect_ratio,
+                size_tolerance,
+                allow_extra_dims=True,
+            )
+
+        except NoMatchingChunks:
+            warnings.warn(
+                'Primary algorithm using even divisors along each dimension failed. '
+                'Trying secondary algorithm. '
+                f'Input {ds=}'
+            )
+            try:
+                target_chunks = iterative_ratio_increase_algo(
+                    ds,
+                    target_chunk_size,
+                    target_chunks_aspect_ratio,
+                    size_tolerance,
+                    allow_extra_dims=True,
+                )
+            except NoMatchingChunks:
+                raise ValueError(
+                    'Could not find any chunk combinations satisfying '
+                    'the size constraint with either algorithm. '
+                    f'Input {ds=}'
+                )
+    logger.info(f'Dynamic Chunking determined {target_chunks =}')
+    return target_chunks
+
 
 @dataclass
 class IIDEntry:
diff --git a/leap_data_management_utils/tests/test_cmip_transforms.py b/leap_data_management_utils/tests/test_cmip_transforms.py
index 72cc6b7..f7f6389 100644
--- a/leap_data_management_utils/tests/test_cmip_transforms.py
+++ b/leap_data_management_utils/tests/test_cmip_transforms.py
@@ -1,6 +1,8 @@
+import numpy as np
 import pytest
+import xarray as xr
 
-from leap_data_management_utils.cmip_transforms import IIDEntry
+from leap_data_management_utils.cmip_transforms import IIDEntry, dynamic_chunking_func
 
 
 class TestIIDEntry:
@@ -24,4 +26,11 @@ def test_too_long(self):
             IIDEntry(iid, store, retracted, tests_passed)
 
 
+class TestDynamicChunks:
+    def test_too_small(self):
+        ds = xr.DataArray(np.random.rand(4, 6)).to_dataset(name='data')
+        chunks = dynamic_chunking_func(ds)
+        assert chunks == {'dim_0': 4, 'dim_1': 6}
+
+
 # TODO: It's super hard to test anything involving BigQuery, because AFAIK there is no way to mock it.
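For context on how this helper is meant to be consumed: recent `pangeo-forge-recipes` releases expose a `dynamic_chunking_fn` hook on `StoreToZarr`, which receives the combined `xr.Dataset` and must return a `{dim: chunksize}` mapping, matching `dynamic_chunking_func`'s signature. A minimal sketch of that wiring, assuming the hook is available; the file URLs, concat dim, and store name below are placeholders, not part of this PR:

```python
# Illustrative recipe only: URLs, concat_dim, and store_name are placeholders.
import apache_beam as beam
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr

from leap_data_management_utils.cmip_transforms import dynamic_chunking_func

pattern = pattern_from_file_sequence(
    ['https://example.com/file_01.nc', 'https://example.com/file_02.nc'],
    concat_dim='time',
)

recipe = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec()
    | OpenWithXarray()
    | StoreToZarr(
        store_name='example.zarr',  # target_root is typically injected by the runner
        combine_dims=pattern.combine_dim_keys,
        # The hook is called once on the combined dataset, so chunk sizes are
        # decided at runtime per dataset rather than hardcoded in the recipe.
        dynamic_chunking_fn=dynamic_chunking_func,
    )
)
```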
diff --git a/pyproject.toml b/pyproject.toml
index f31140a..e3e9eb8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -39,6 +39,8 @@ pangeo-forge=[
    "pangeo-forge-esgf",
    "pangeo-forge-recipes",
    "apache-beam",
+    "dynamic-chunks",
+    "dask",
 ]
 
 catalog = [
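Side note on the dependency bump: `dask` is needed only for the `dask.utils.parse_bytes` import, and `dynamic-chunks` supplies the two chunking algorithms used above. Since both land in the existing `pangeo-forge` extra, users would pick them up with something like `pip install 'leap-data-management-utils[pangeo-forge]'` (assuming the published distribution name matches the repo name).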