diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py index 038f9d9337..6d24d4312d 100644 --- a/lib/iris/_lazy_data.py +++ b/lib/iris/_lazy_data.py @@ -47,7 +47,9 @@ def is_lazy_data(data): return result -def _optimum_chunksize(chunks, shape, limit=None, dtype=np.dtype("f4")): +def _optimum_chunksize( + chunks, shape, limit=None, dtype=np.dtype("f4"), dims_fixed=None +): """ Reduce or increase an initial chunk shape to get close to a chosen ideal size, while prioritising the splitting of the earlier (outer) dimensions @@ -55,7 +57,7 @@ def _optimum_chunksize(chunks, shape, limit=None, dtype=np.dtype("f4")): Args: - * chunks (tuple of int, or None): + * chunks (tuple of int): Pre-existing chunk shape of the target data : None if unknown. * shape (tuple of int): The full array shape of the target data. @@ -63,6 +65,11 @@ def _optimum_chunksize(chunks, shape, limit=None, dtype=np.dtype("f4")): The 'ideal' target chunk size, in bytes. Default from dask.config. * dtype (np.dtype): Numpy dtype of target data. + * dims_fixed (list of bool): + If set, a list of values equal in length to 'chunks' or 'shape'. + 'True' values indicate a dimension that can not be changed, i.e. that + element of the result must equal the corresponding value in 'chunks' or + data.shape. Returns: * chunk (tuple of int): @@ -83,6 +90,9 @@ def _optimum_chunksize(chunks, shape, limit=None, dtype=np.dtype("f4")): "chunks = [c[0] for c in normalise_chunks('auto', ...)]". """ + if chunks is None: + chunks = list(shape) + # Set the chunksize limit. if limit is None: # Fetch the default 'optimal' chunksize from the dask config. @@ -92,61 +102,93 @@ def _optimum_chunksize(chunks, shape, limit=None, dtype=np.dtype("f4")): point_size_limit = limit / dtype.itemsize - # Create result chunks, starting with a copy of the input. - result = list(chunks) - - if np.prod(result) < point_size_limit: - # If size is less than maximum, expand the chunks, multiplying later - # (i.e. inner) dims first. - i_expand = len(shape) - 1 - while np.prod(result) < point_size_limit and i_expand >= 0: - factor = np.floor(point_size_limit * 1.0 / np.prod(result)) - new_dim = result[i_expand] * int(factor) - if new_dim >= shape[i_expand]: - # Clip to dim size : chunk dims must not exceed the full shape. - new_dim = shape[i_expand] - else: - # 'new_dim' is less than the relevant dim of 'shape' -- but it - # is also the largest possible multiple of the input-chunks, - # within the size limit. - # So : 'i_expand' is the outer (last) dimension over which we - # will multiply the input chunks, and 'new_dim' is a value that - # ensures the fewest possible chunks within that dim. - - # Now replace 'new_dim' with the value **closest to equal-size - # chunks**, for the same (minimum) number of chunks. - # More-equal chunks are practically better. - # E.G. : "divide 8 into multiples of 2, with a limit of 7", - # produces new_dim=6, which would mean chunks of sizes (6, 2). - # But (4, 4) is clearly better for memory and time cost. - - # Calculate how many (expanded) chunks fit into this dimension. - dim_chunks = np.ceil(shape[i_expand] * 1.0 / new_dim) - # Get "ideal" (equal) size for that many chunks. - ideal_equal_chunk_size = shape[i_expand] / dim_chunks - # Use the nearest whole multiple of input chunks >= ideal. - new_dim = int( - result[i_expand] - * np.ceil(ideal_equal_chunk_size / result[i_expand]) - ) - - result[i_expand] = new_dim - i_expand -= 1 + if dims_fixed is not None: + if not np.any(dims_fixed): + dims_fixed = None + + if dims_fixed is None: + # Get initial result chunks, starting with a copy of the input. + working = list(chunks) + else: + # Adjust the operation to ignore the 'fixed' dims. + # (We reconstruct the original later, before return). + chunks = np.array(chunks) + dims_fixed_arr = np.array(dims_fixed) + # Reduce the target size by the fixed size of all the 'fixed' dims. + point_size_limit = point_size_limit // np.prod(chunks[dims_fixed_arr]) + # Work on only the 'free' dims. + original_shape = tuple(shape) + shape = tuple(np.array(shape)[~dims_fixed_arr]) + working = list(chunks[~dims_fixed_arr]) + + if len(working) >= 1: + if np.prod(working) < point_size_limit: + # If size is less than maximum, expand the chunks, multiplying + # later (i.e. inner) dims first. + i_expand = len(shape) - 1 + while np.prod(working) < point_size_limit and i_expand >= 0: + factor = np.floor(point_size_limit * 1.0 / np.prod(working)) + new_dim = working[i_expand] * int(factor) + if new_dim >= shape[i_expand]: + # Clip to dim size : must not exceed the full shape. + new_dim = shape[i_expand] + else: + # 'new_dim' is less than the relevant dim of 'shape' -- but + # it is also the largest possible multiple of the + # input-chunks, within the size limit. + # So : 'i_expand' is the outer (last) dimension over which + # we will multiply the input chunks, and 'new_dim' is a + # value giving the fewest possible chunks within that dim. + + # Now replace 'new_dim' with the value **closest to + # equal-size chunks**, for the same (minimum) number of + # chunks. More-equal chunks are practically better. + # E.G. : "divide 8 into multiples of 2, with a limit of 7", + # produces new_dim=6, meaning chunks of sizes (6, 2). + # But (4, 4) is clearly better for memory and time cost. + + # Calculate how many (expanded) chunks fit in this dim. + dim_chunks = np.ceil(shape[i_expand] * 1.0 / new_dim) + # Get "ideal" (equal) size for that many chunks. + ideal_equal_chunk_size = shape[i_expand] / dim_chunks + # Use the nearest whole multiple of input chunks >= ideal. + new_dim = int( + working[i_expand] + * np.ceil(ideal_equal_chunk_size / working[i_expand]) + ) + + working[i_expand] = new_dim + i_expand -= 1 + else: + # Similarly, reduce if too big, reducing earlier (outer) dims first. + i_reduce = 0 + while np.prod(working) > point_size_limit: + factor = np.ceil(np.prod(working) / point_size_limit) + new_dim = int(working[i_reduce] / factor) + if new_dim < 1: + new_dim = 1 + working[i_reduce] = new_dim + i_reduce += 1 + + working = tuple(working) + + if dims_fixed is None: + result = working else: - # Similarly, reduce if too big, reducing earlier (outer) dims first. - i_reduce = 0 - while np.prod(result) > point_size_limit: - factor = np.ceil(np.prod(result) / point_size_limit) - new_dim = int(result[i_reduce] / factor) - if new_dim < 1: - new_dim = 1 - result[i_reduce] = new_dim - i_reduce += 1 + # Reconstruct the original form + result = [] + for i_dim in range(len(original_shape)): + if dims_fixed[i_dim]: + dim = chunks[i_dim] + else: + dim = working[0] + working = working[1:] + result.append(dim) - return tuple(result) + return result -def as_lazy_data(data, chunks=None, asarray=False): +def as_lazy_data(data, chunks=None, asarray=False, dims_fixed=None): """ Convert the input array `data` to a dask array. @@ -165,6 +207,11 @@ def as_lazy_data(data, chunks=None, asarray=False): If True, then chunks will be converted to instances of `ndarray`. Set to False (default) to pass passed chunks through unchanged. + * dims_fixed (list of bool): + If set, a list of values equal in length to 'chunks' or data.ndim. + 'True' values indicate a dimension which can not be changed, i.e. the + result for that index must equal the value in 'chunks' or data.shape. + Returns: The input array converted to a dask array. @@ -186,7 +233,9 @@ def as_lazy_data(data, chunks=None, asarray=False): # PPDataProxy of "raw" landsea-masked fields, which have a shape of (0, 0). if all(elem > 0 for elem in data.shape): # Expand or reduce the basic chunk shape to an optimum size. - chunks = _optimum_chunksize(chunks, shape=data.shape, dtype=data.dtype) + chunks = _optimum_chunksize( + chunks, shape=data.shape, dtype=data.dtype, dims_fixed=dims_fixed + ) if isinstance(data, ma.core.MaskedConstant): data = ma.masked_array(data.data, mask=data.mask) diff --git a/lib/iris/fileformats/netcdf.py b/lib/iris/fileformats/netcdf.py index 100ab29daa..f86e3438be 100644 --- a/lib/iris/fileformats/netcdf.py +++ b/lib/iris/fileformats/netcdf.py @@ -14,11 +14,15 @@ import collections import collections.abc +from contextlib import contextmanager +from copy import deepcopy from itertools import repeat, zip_longest import os import os.path import re import string +import threading +from typing import Iterable, Mapping, Union import warnings import cf_units @@ -549,10 +553,13 @@ def _get_actual_dtype(cf_var): def _get_cf_var_data(cf_var, filename): + global CHUNK_CONTROL + # Get lazy chunked data out of a cf variable. + # Creates Dask wrappers around data arrays for any cube components which + # can have lazy values, e.g. Cube, Coord, CellMeasure, AuxiliaryVariable. dtype = _get_actual_dtype(cf_var) - # Create cube with deferred data, but no metadata fill_value = getattr( cf_var.cf_data, "_FillValue", @@ -566,8 +573,28 @@ def _get_cf_var_data(cf_var, filename): chunks = cf_var.cf_data.chunking() # In the "contiguous" case, pass chunks=None to 'as_lazy_data'. if chunks == "contiguous": - chunks = None - return as_lazy_data(proxy, chunks=chunks) + # Equivalent to chunks=None, but value required by chunking control + chunks = list(cf_var.shape) + + # Modify the chunking in the context of an active chunking control. + # N.B. settings specific to this named var override global ('*') ones. + dim_chunks = CHUNK_CONTROL.var_dim_chunksizes.get( + cf_var.name + ) or CHUNK_CONTROL.var_dim_chunksizes.get("*") + if not dim_chunks: + dims_fixed = None + else: + # Modify the chunks argument, and pass in a list of 'fixed' dims, for + # any of our dims which are controlled. + dims = cf_var.cf_data.dimensions + dims_fixed = np.zeros(len(dims), dtype=bool) + for i_dim, dim_name in enumerate(dims): + dim_chunksize = dim_chunks.get(dim_name) + if dim_chunksize: + chunks[i_dim] = dim_chunksize + dims_fixed[i_dim] = True + + return as_lazy_data(proxy, chunks=chunks, dims_fixed=dims_fixed) class OrderedAddableList(list): @@ -588,6 +615,18 @@ def add(self, msg): def _load_cube(engine, cf, cf_var, filename): + global CHUNK_CONTROL + + # Translate dimension chunk-settings specific to this cube (i.e. named by + # it's data-var) into global ones, for the duration of this load. + # Thus, by default, we will create any AuxCoords, CellMeasures et al with + # any per-dimension chunksizes specified for the cube. + these_settings = CHUNK_CONTROL.var_dim_chunksizes.get(cf_var.name, {}) + with CHUNK_CONTROL.set(**these_settings): + return _load_cube_inner(engine, cf, cf_var, filename) + + +def _load_cube_inner(engine, cf, cf_var, filename): from iris.cube import Cube """Create the cube associated with the CF-netCDF data variable.""" @@ -3206,3 +3245,114 @@ def is_valid_packspec(p): # Add conventions attribute. sman.update_global_attributes(Conventions=conventions) + + +class ChunkControl(threading.local): + def __init__(self, var_dim_chunksizes=None): + """ + Provide user control of Dask chunking. + + The netcdf loader is controlled by the single instance of this : the + :data:`~iris.fileformats.netcdf.CHUNK_CONTROL` object. + + A chunksize can be set for a specific (named) file dimension, when + loading specific (named) variables, or for all variables. + + When a selected variable is a CF data-variable, which loads as a + cube, then the given dimension chunksize is *also* fixed for all + variables which are components of that cube, i.e. its Coords, + CellMeasures, Ancillary variables, etc. + This can be overriden, if required, by variable-specific settings. + + For this purpose, Mesh coordinates and connectivities are *not* cube + components, and a chunk control on a cube data-variable will not affect + them. + + """ + self.var_dim_chunksizes = var_dim_chunksizes or {} + + @contextmanager + def set( + self, + var_names: Union[str, Iterable[str]] = None, + **dimension_chunksizes: Mapping[str, int], + ) -> None: + """ + Control the Dask chunksizes applied to netcdf variables during loading. + + Parameters + ---------- + var_names : str or list of str + apply the ``dimension_chunksizes`` controls only to these variables, + or when building cubes from these data variables. + If None (the default), settings apply to all loaded variables. + dimension_chunksizes : dict: str --> int + Kwargs specifying chunksizes for dimensions of file variables. + Each key-value pair defines a chunksize for a named file + dimension, e.g. {'time': 10, 'model_levels':1}. + + Notes + ----- + This function acts as a contextmanager, for use in a 'with' block. + + Example: + + >>> from iris.fileformats.netcdf import CHUNK_CONTROL + >>> with CHUNK_CONTROL.set('var1', model_level=1, time=50): + ... cubes = iris.load(filename) + + When ``var_names`` is present, the chunksize adjustments are applied + only to the selected variables. However, for a CF data variable, this + extends to all components of the (raw) cube created from it. + + **Un**-adjusted dimensions have chunk sizes set in the 'usual' way. + That is, according to the normal behaviour of + :meth:`iris._lazy_data.as_lazy_data`, which is : chunksize is based on + the file variable chunking, or full variable shape; this is scaled up + or down by integer factors to best match the Dask "default chunksize", + i.e. the setting configured by + ``dask.config.set({'array.chunk-size': '250MiB'})``. + + """ + old_settings = deepcopy(self.var_dim_chunksizes) + if var_names is None: + var_names = ["*"] + elif isinstance(var_names, str): + var_names = [var_names] + try: + for var_name in var_names: + # Note: here we simply treat '*' as another name. + # A specific name match should override a '*' setting, but + # that is implemented elsewhere. + if not isinstance(var_name, str): + msg = ( + "'var_names' should be an iterable of strings, " + f"not {var_names!r}." + ) + raise ValueError(msg) + dim_chunks = self.var_dim_chunksizes.setdefault(var_name, {}) + for dim_name, chunksize in dimension_chunksizes.items(): + if not ( + isinstance(dim_name, str) + and isinstance(chunksize, int) + ): + msg = ( + "'dimension_chunksizes' kwargs should be an iterable " + f"of (str, int) pairs, not {dimension_chunksizes!r}." + ) + raise ValueError(msg) + dim_chunks[dim_name] = chunksize + yield + finally: + self.var_dim_chunksizes = old_settings + + +# Note: the CHUNK_CONTROL object controls chunk sizing in the +# :meth:`_get_cf_var_data` method. +# N.B. :meth:`_load_cube` also modifies this when loading each cube, +# introducing an additional context in which any cube-specific settings are +# 'promoted' into being global ones. + +#: A :class:`ChunkControl` object providing user-control of Dask chunking +#: when Iris loads netcdf files. +CHUNK_CONTROL: ChunkControl = ChunkControl() diff --git a/lib/iris/tests/integration/test_netcdf__chunk_control.py b/lib/iris/tests/integration/test_netcdf__chunk_control.py new file mode 100644 index 0000000000..413af50a23 --- /dev/null +++ b/lib/iris/tests/integration/test_netcdf__chunk_control.py @@ -0,0 +1,94 @@ +# Copyright Iris contributors +# +# This file is part of Iris and is released under the LGPL license. +# See COPYING and COPYING.LESSER in the root of the repository for full +# licensing details. +"""Integration tests for loading and saving netcdf files.""" + +# Import iris.tests first so that some things can be initialised before +# importing anything else. +import iris.tests as tests # isort:skip + +from pathlib import Path +import shutil +import tempfile + +import iris +from iris.fileformats.netcdf import CHUNK_CONTROL +import iris.tests.stock as istk + + +class TestChunking(tests.IrisTest): + @classmethod + def setUpClass(cls): + cls.temp_dir = tempfile.mkdtemp() + cube = istk.simple_4d_with_hybrid_height() + cls.cube = cube + cls.cube_varname = "my_var" + cls.sigma_varname = "my_sigma" + cube.var_name = cls.cube_varname + cube.coord("sigma").var_name = cls.sigma_varname + cube.coord("sigma").guess_bounds() + cls.tempfile_path = Path(cls.temp_dir) / "tmp.nc" + iris.save(cls.cube, cls.tempfile_path) + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.temp_dir) + + def test_default(self): + cube = iris.load_cube(self.tempfile_path, self.cube_varname) + self.assertEqual((3, 4, 5, 6), cube.shape) + self.assertEqual((3, 4, 5, 6), cube.lazy_data().chunksize) + sigma = cube.coord("sigma") + self.assertEqual((4,), sigma.shape) + self.assertEqual((4,), sigma.lazy_points().chunksize) + self.assertEqual((4, 2), sigma.lazy_bounds().chunksize) + + def test_control_global(self): + with CHUNK_CONTROL.set(model_level_number=2): + cube = iris.load_cube(self.tempfile_path, self.cube_varname) + self.assertEqual((3, 4, 5, 6), cube.shape) + self.assertEqual((3, 2, 5, 6), cube.lazy_data().chunksize) + sigma = cube.coord("sigma") + self.assertEqual((4,), sigma.shape) + self.assertEqual((2,), sigma.lazy_points().chunksize) + self.assertEqual((2, 2), sigma.lazy_bounds().chunksize) + + def test_control_sigma_only(self): + with CHUNK_CONTROL.set(self.sigma_varname, model_level_number=2): + cube = iris.load_cube(self.tempfile_path, self.cube_varname) + self.assertEqual((3, 4, 5, 6), cube.shape) + self.assertEqual((3, 4, 5, 6), cube.lazy_data().chunksize) + sigma = cube.coord("sigma") + self.assertEqual((4,), sigma.shape) + self.assertEqual((2,), sigma.lazy_points().chunksize) + # N.B. this does not apply to bounds array + self.assertEqual((4, 2), sigma.lazy_bounds().chunksize) + + def test_control_cube_var(self): + with CHUNK_CONTROL.set(self.cube_varname, model_level_number=2): + cube = iris.load_cube(self.tempfile_path, self.cube_varname) + self.assertEqual((3, 4, 5, 6), cube.shape) + self.assertEqual((3, 2, 5, 6), cube.lazy_data().chunksize) + sigma = cube.coord("sigma") + self.assertEqual((4,), sigma.shape) + self.assertEqual((2,), sigma.lazy_points().chunksize) + # N.B. this does not apply to bounds array + self.assertEqual((2, 2), sigma.lazy_bounds().chunksize) + + def test_control_multiple(self): + with CHUNK_CONTROL.set( + self.cube_varname, model_level_number=2 + ), CHUNK_CONTROL.set(self.sigma_varname, model_level_number=3): + cube = iris.load_cube(self.tempfile_path, self.cube_varname) + self.assertEqual((3, 4, 5, 6), cube.shape) + self.assertEqual((3, 2, 5, 6), cube.lazy_data().chunksize) + sigma = cube.coord("sigma") + self.assertEqual((4,), sigma.shape) + self.assertEqual((3,), sigma.lazy_points().chunksize) + self.assertEqual((2, 2), sigma.lazy_bounds().chunksize) + + +if __name__ == "__main__": + tests.main()