Skip to content

Commit

Permalink
Lazy implementation of multi_model_statistics and `ensemble_statist…
Browse files Browse the repository at this point in the history
…ics` preprocessors (#968)

Co-authored-by: Barbara Vreede <b.vreede@esciencecenter.nl>
Co-authored-by: Stef Smeets <s.smeets@esciencecenter.nl>
Co-authored-by: Bouwe Andela <b.andela@esciencecenter.nl>
  • Loading branch information
4 people authored Jun 6, 2023
1 parent 3067ce6 commit 83650df
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 48 deletions.
4 changes: 3 additions & 1 deletion esmvalcore/preprocessor/_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,9 @@ def save(cubes,
"The cube is probably unchanged.", cubes, filename)
return filename

logger.debug("Saving cubes %s to %s", cubes, filename)
for cube in cubes:
logger.debug("Saving cube:\n%s\nwith %s data to %s", cube,
"lazy" if cube.has_lazy_data() else "realized", filename)
if optimize_access:
cube = cubes[0]
if optimize_access == 'map':
Expand Down
110 changes: 68 additions & 42 deletions esmvalcore/preprocessor/_multimodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,18 @@ def _map_to_new_time(cube, time_points):
Missing data inside original bounds is filled with nearest neighbour
Missing data outside original bounds is masked.
"""
time_points = cube.coord('time').units.num2date(time_points)
time_coord = cube.coord('time')

# Try if the required time points can be obtained by slicing the cube.
time_slice = np.isin(time_coord.points, time_points)
if np.any(time_slice) and np.array_equal(time_coord.points[time_slice],
time_points):
time_idx, = cube.coord_dims('time')
indices = tuple(time_slice if i == time_idx else slice(None)
for i in range(cube.ndim))
return cube[indices]

time_points = time_coord.units.num2date(time_points)
sample_points = [('time', time_points)]
scheme = iris.analysis.Nearest(extrapolation_mode='mask')

Expand Down Expand Up @@ -505,43 +516,17 @@ def _compute_eager(
"""Compute statistics one slice at a time."""
_ = [cube.data for cube in cubes] # make sure the cubes' data are realized

result_slices = []
result_slices = iris.cube.CubeList()
for chunk in _compute_slices(cubes):
if chunk is None:
single_model_slices = cubes # scalar cubes
input_slices = cubes # scalar cubes
else:
single_model_slices = [cube[chunk] for cube in cubes]
combined_slice = _combine(single_model_slices)
with warnings.catch_warnings():
warnings.filterwarnings(
'ignore',
message=(
"Collapsing a non-contiguous coordinate. "
f"Metadata may not be fully descriptive for '{CONCAT_DIM}."
),
category=UserWarning,
module='iris',
)
warnings.filterwarnings(
'ignore',
message=(
f"Cannot check if coordinate is contiguous: Invalid "
f"operation for '{CONCAT_DIM}'"
),
category=UserWarning,
module='iris',
)
collapsed_slice = combined_slice.collapsed(CONCAT_DIM, operator,
**kwargs)

# some iris aggregators modify dtype, see e.g.
# https://numpy.org/doc/stable/reference/generated/numpy.ma.average.html
collapsed_slice.data = collapsed_slice.data.astype(np.float32)

result_slices.append(collapsed_slice)
input_slices = [cube[chunk] for cube in cubes]
result_slice = _compute(input_slices, operator=operator, **kwargs)
result_slices.append(result_slice)

try:
result_cube = CubeList(result_slices).concatenate_cube()
result_cube = result_slices.concatenate_cube()
except Exception as excinfo:
raise ValueError(
f"Multi-model statistics failed to concatenate results into a "
Expand All @@ -551,7 +536,45 @@ def _compute_eager(
f"dtypes") from excinfo

result_cube.data = np.ma.array(result_cube.data)

return result_cube


def _compute(cubes: list, *, operator: iris.analysis.Aggregator, **kwargs):
"""Compute statistic."""
cube = _combine(cubes)

with warnings.catch_warnings():
warnings.filterwarnings(
'ignore',
message=(
"Collapsing a non-contiguous coordinate. "
f"Metadata may not be fully descriptive for '{CONCAT_DIM}."
),
category=UserWarning,
module='iris',
)
warnings.filterwarnings(
'ignore',
message=(
f"Cannot check if coordinate is contiguous: Invalid "
f"operation for '{CONCAT_DIM}'"
),
category=UserWarning,
module='iris',
)
# This will always return a masked array
result_cube = cube.collapsed(CONCAT_DIM, operator, **kwargs)

# Remove concatenation dimension added by _combine
result_cube.remove_coord(CONCAT_DIM)
for cube in cubes:
cube.remove_coord(CONCAT_DIM)

# some iris aggregators modify dtype, see e.g.
# https://numpy.org/doc/stable/reference/generated/numpy.ma.average.html
result_cube.data = result_cube.core_data().astype(np.float32)

if result_cube.cell_methods:
cell_method = result_cube.cell_methods[0]
result_cube.cell_methods = None
Expand All @@ -578,12 +601,12 @@ def _multicube_statistics(cubes, statistics, span, ignore_scalar_coords=False):
)

# Avoid modifying inputs
copied_cubes = [cube.copy() for cube in cubes]
cubes = [cube.copy() for cube in cubes]

# Remove scalar coordinates in input cubes if desired to ignore them when
# merging
if ignore_scalar_coords:
for cube in copied_cubes:
for cube in cubes:
for scalar_coord in cube.coords(dimensions=()):
cube.remove_coord(scalar_coord)
logger.debug(
Expand All @@ -595,11 +618,11 @@ def _multicube_statistics(cubes, statistics, span, ignore_scalar_coords=False):

# If all cubes contain a time coordinate, align them. If no cube contains a
# time coordinate, do nothing. Else, raise an exception.
time_coords = [cube.coords('time') for cube in copied_cubes]
time_coords = [cube.coords('time') for cube in cubes]
if all(time_coords):
aligned_cubes = _align_time_coord(copied_cubes, span=span)
cubes = _align_time_coord(cubes, span=span)
elif not any(time_coords):
aligned_cubes = copied_cubes
pass
else:
raise ValueError(
"Multi-model statistics failed to merge input cubes into a single "
Expand All @@ -609,13 +632,14 @@ def _multicube_statistics(cubes, statistics, span, ignore_scalar_coords=False):

# Calculate statistics
statistics_cubes = {}
lazy_input = any(cube.has_lazy_data() for cube in cubes)
for statistic in statistics:
logger.debug('Multicube statistics: computing: %s', statistic)
operator, kwargs = _resolve_operator(statistic)

result_cube = _compute_eager(aligned_cubes,
operator=operator,
**kwargs)
if lazy_input and operator.lazy_func is not None:
result_cube = _compute(cubes, operator=operator, **kwargs)
else:
result_cube = _compute_eager(cubes, operator=operator, **kwargs)
statistics_cubes[statistic] = result_cube

return statistics_cubes
Expand Down Expand Up @@ -720,6 +744,8 @@ def multi_model_statistics(products,
arguments. Except for percentiles, these operators are currently not
supported.
Lazy operation is supported for all statistics, except ``median``.
Parameters
----------
products: list
Expand Down
28 changes: 28 additions & 0 deletions esmvalcore/preprocessor/_regrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pathlib import Path
from typing import Dict

import dask.array as da
import iris
import numpy as np
import stratify
Expand Down Expand Up @@ -650,11 +651,38 @@ def regrid(cube, target_grid, scheme, lat_offset=True, lon_offset=True):
# scheme is a function f(src_cube, grid_cube) -> Cube
cube = loaded_scheme
else:
cube = _rechunk(cube, target_grid)
cube = cube.regrid(target_grid, loaded_scheme)

return cube


def _rechunk(cube, target_grid):
"""Re-chunk cube with optimal chunk sizes for target grid."""
if not cube.has_lazy_data() or cube.ndim < 3:
# Only rechunk lazy multidimensional data
return cube

if 2 * np.prod(cube.shape[-2:]) > np.prod(target_grid.shape):
# Only rechunk if target grid is more than a factor of 2 larger,
# because rechunking will keep the original chunk in memory.
return cube

data = cube.lazy_data()

# Compute a good chunk size for the target array
tgt_shape = data.shape[:-2] + target_grid.shape
tgt_chunks = data.chunks[:-2] + target_grid.shape
tgt_data = da.empty(tgt_shape, dtype=data.dtype, chunks=tgt_chunks)
tgt_data = tgt_data.rechunk({i: "auto" for i in range(cube.ndim - 2)})

# Adjust chunks to source array and rechunk
chunks = tgt_data.chunks[:-2] + data.shape[-2:]
cube.data = data.rechunk(chunks)

return cube


def _horizontal_grid_is_close(cube1, cube2):
"""Check if two cubes have the same horizontal grid definition.
Expand Down
6 changes: 3 additions & 3 deletions tests/sample_data/multimodel_statistics/test_multimodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
SPAN_PARAMS = ('overlap', 'full')


def assert_array_almost_equal(this, other):
def assert_array_almost_equal(this, other, rtol=1e-7):
"""Assert that array `this` almost equals array `other`."""
if np.ma.isMaskedArray(this) or np.ma.isMaskedArray(other):
np.testing.assert_array_equal(this.mask, other.mask)

np.testing.assert_allclose(this, other)
np.testing.assert_allclose(this, other, rtol=rtol)


def assert_coords_equal(this: list, other: list):
Expand Down Expand Up @@ -188,7 +188,7 @@ def multimodel_regression_test(cubes, span, name):
if filename.exists():
reference_cube = iris.load_cube(str(filename))

assert_array_almost_equal(result_cube.data, reference_cube.data)
assert_array_almost_equal(result_cube.data, reference_cube.data, 5e-7)
assert_metadata_equal(result_cube.metadata, reference_cube.metadata)
assert_coords_equal(result_cube.coords(), reference_cube.coords())

Expand Down
2 changes: 0 additions & 2 deletions tests/unit/preprocessor/_multimodel/test_multimodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,6 @@ def test_multimodel_statistics(frequency, span, statistics, expected):
assert_array_allclose(result_cube.data, expected_data)


@pytest.mark.xfail(reason='Lazy data not (yet) supported.')
@pytest.mark.parametrize('span', SPAN_OPTIONS)
def test_lazy_data_consistent_times(span):
"""Test laziness of multimodel statistics with consistent time axis."""
Expand All @@ -362,7 +361,6 @@ def test_lazy_data_consistent_times(span):
assert result_cube.has_lazy_data()


@pytest.mark.xfail(reason='Lazy data not (yet) supported.')
@pytest.mark.parametrize('span', SPAN_OPTIONS)
def test_lazy_data_inconsistent_times(span):
"""Test laziness of multimodel statistics with inconsistent time axis.
Expand Down
59 changes: 59 additions & 0 deletions tests/unit/preprocessor/_regrid/test_regrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import unittest
from unittest import mock

import dask
import dask.array as da
import iris
import numpy as np
import pytest
Expand All @@ -14,6 +16,7 @@
_CACHE,
HORIZONTAL_SCHEMES,
_horizontal_grid_is_close,
_rechunk,
)


Expand Down Expand Up @@ -65,6 +68,7 @@ def setUp(self):
regrid=self.regrid,
dtype=float,
)
self.src_cube.ndim = 1
self.tgt_grid_coord = mock.Mock()
self.tgt_grid = mock.Mock(
spec=iris.cube.Cube, coord=self.tgt_grid_coord)
Expand Down Expand Up @@ -262,5 +266,60 @@ def test_regrid_is_skipped_if_grids_are_the_same():
assert expected_different_cube is not cube


def test_rechunk_on_increased_grid():
"""Test that an increase in grid size rechunks."""
with dask.config.set({'array.chunk-size': '128 M'}):

time_dim = 246
src_grid_dims = (91, 180)
data = da.empty((time_dim, ) + src_grid_dims, dtype=np.float32)

tgt_grid_dims = (361, 720)
tgt_grid = da.empty(tgt_grid_dims, dtype=np.float32)

result = _rechunk(iris.cube.Cube(data), iris.cube.Cube(tgt_grid))

assert result.core_data().chunks == ((123, 123), (91, ), (180, ))


def test_no_rechunk_on_decreased_grid():
"""Test that a decrease in grid size does not rechunk."""
with dask.config.set({'array.chunk-size': '128 M'}):

time_dim = 200
src_grid_dims = (361, 720)
data = da.empty((time_dim, ) + src_grid_dims, dtype=np.float32)

tgt_grid_dims = (91, 180)
tgt_grid = da.empty(tgt_grid_dims, dtype=np.float32)

result = _rechunk(iris.cube.Cube(data), iris.cube.Cube(tgt_grid))

assert result.core_data().chunks == data.chunks


def test_no_rechunk_2d():
"""Test that a 2D cube is not rechunked."""
with dask.config.set({'array.chunk-size': '64 MiB'}):

src_grid_dims = (361, 720)
data = da.empty(src_grid_dims, dtype=np.float32)

tgt_grid_dims = (3601, 7200)
tgt_grid = da.empty(tgt_grid_dims, dtype=np.float32)

result = _rechunk(iris.cube.Cube(data), iris.cube.Cube(tgt_grid))

assert result.core_data().chunks == data.chunks


def test_no_rechunk_non_lazy():
"""Test that a cube with non-lazy data does not crash."""
cube = iris.cube.Cube(np.arange(2 * 4).reshape([1, 2, 4]))
tgt_cube = iris.cube.Cube(np.arange(4 * 8).reshape([4, 8]))
result = _rechunk(cube, tgt_cube)
assert result.data is cube.data


if __name__ == '__main__':
unittest.main()

0 comments on commit 83650df

Please sign in to comment.