Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make weighted aggregation lazy again #5341

Merged
merged 9 commits into from
Jun 26, 2023
4 changes: 4 additions & 0 deletions docs/src/whatsnew/3.6.rst
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ This document explains the changes made to Iris for this release
properly updated.
(:issue:`5339`, :pull:`5340`)

#. `@schlunma`_ fixed a bug which realized all weights during weighted
aggregation. Now weighted aggregation is fully lazy again.
(:issue:`5338`, :pull:`5341`)

🚀 **Performance Enhancements**

#. `@sloosvel`_ improved :meth:`~iris.cube.CubeList.concatenate_cube` and
Expand Down
148 changes: 76 additions & 72 deletions lib/iris/analysis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
"PEAK",
"PERCENTILE",
"PROPORTION",
"PercentileAggregator",
"PointInCell",
"RMS",
"STD_DEV",
Expand All @@ -89,6 +90,7 @@
"VARIANCE",
"WPERCENTILE",
"WeightedAggregator",
"WeightedPercentileAggregator",
"clear_phenomenon_identity",
"create_weighted_aggregator_fn",
)
Expand Down Expand Up @@ -488,7 +490,7 @@ def __init__(
aggregation. Note that, it need not support all features of the
main operation, but should raise an error in unhandled cases.

Additional kwargs::
Additional kwargs:
Passed through to :data:`call_func`, :data:`lazy_func`, and
:data:`units_func`.

Expand Down Expand Up @@ -719,9 +721,12 @@ def __init__(self, units_func=None, **kwargs):
If provided, called to convert a cube's units.
Returns an :class:`cf_units.Unit`, or a
value that can be made into one.
To ensure backwards-compatibility, also accepts a callable with
call signature (units).

Additional kwargs::
Passed through to :data:`call_func` and :data:`lazy_func`.
Additional kwargs:
Passed through to :data:`call_func`, :data:`lazy_func`, and
:data:`units_func`.

This aggregator can used by cube aggregation methods such as
:meth:`~iris.cube.Cube.collapsed` and
Expand Down Expand Up @@ -960,14 +965,27 @@ def __init__(self, units_func=None, lazy_func=None, **kwargs):
If provided, called to convert a cube's units.
Returns an :class:`cf_units.Unit`, or a
value that can be made into one.
To ensure backwards-compatibility, also accepts a callable with
call signature (units).

trexfeathers marked this conversation as resolved.
Show resolved Hide resolved
If the aggregator is used by a cube aggregation method (e.g.,
trexfeathers marked this conversation as resolved.
Show resolved Hide resolved
:meth:`~iris.cube.Cube.collapsed`,
:meth:`~iris.cube.Cube.aggregated_by`,
:meth:`~iris.cube.Cube.rolling_window`), a keyword argument
`_weights_units` is provided to this function to allow updating
units based on the weights. `_weights_units` is determined from the
`weights` given to the aggregator (``None`` if no weights are
given). See :ref:`user guide <cube-statistics-collapsing-average>`
for an example of weighted aggregation that changes units.

* lazy_func (callable or None):
An alternative to :data:`call_func` implementing a lazy
aggregation. Note that, it need not support all features of the
main operation, but should raise an error in unhandled cases.

Additional kwargs::
Passed through to :data:`call_func` and :data:`lazy_func`.
Additional kwargs:
Passed through to :data:`call_func`, :data:`lazy_func`, and
:data:`units_func`.

This aggregator can used by cube aggregation methods such as
:meth:`~iris.cube.Cube.collapsed` and
Expand Down Expand Up @@ -1090,7 +1108,7 @@ class WeightedAggregator(Aggregator):
def __init__(
self, cell_method, call_func, units_func=None, lazy_func=None, **kwargs
):
"""
r"""
Create a weighted aggregator for the given :data:`call_func`.

Args:
Expand All @@ -1099,20 +1117,38 @@ def __init__(
Cell method string that supports string format substitution.

* call_func (callable):
Data aggregation function. Call signature `(data, axis, **kwargs)`.
Data aggregation function. Call signature `(data, axis,
\**kwargs)`.

Kwargs:

* units_func (callable):
Units conversion function.
| *Call signature*: (units, \**kwargs)

If provided, called to convert a cube's units.
Returns an :class:`cf_units.Unit`, or a
value that can be made into one.
To ensure backwards-compatibility, also accepts a callable with
call signature (units).

If the aggregator is used by a cube aggregation method (e.g.,
:meth:`~iris.cube.Cube.collapsed`,
:meth:`~iris.cube.Cube.aggregated_by`,
:meth:`~iris.cube.Cube.rolling_window`), a keyword argument
`_weights_units` is provided to this function to allow updating
units based on the weights. `_weights_units` is determined from the
`weights` given to the aggregator (``None`` if no weights are
given). See :ref:`user guide <cube-statistics-collapsing-average>`
for an example of weighted aggregation that changes units.

* lazy_func (callable or None):
An alternative to :data:`call_func` implementing a lazy
aggregation. Note that, it need not support all features of the
main operation, but should raise an error in unhandled cases.

Additional kwargs:
Passed through to :data:`call_func` and :data:`lazy_func`.
Passed through to :data:`call_func`, :data:`lazy_func`, and
:data:`units_func`.

"""
Aggregator.__init__(
Expand Down Expand Up @@ -1187,20 +1223,18 @@ def post_process(self, collapsed_cube, data_result, coords, **kwargs):
return result


class _Weights(np.ndarray):
class _Weights:
"""Class for handling weights for weighted aggregation.

This subclasses :class:`numpy.ndarray`; thus, all methods and properties of
:class:`numpy.ndarray` (e.g., `shape`, `ndim`, `view()`, etc.) are
available.
Provides the following two attributes:

Details on subclassing :class:`numpy.ndarray` are given here:
https://numpy.org/doc/stable/user/basics.subclassing.html
* ``array``: Lazy or non-lazy array of weights.
* ``units``: Units associated with the weights.

"""

def __new__(cls, weights, cube, units=None):
"""Create class instance.
def __init__(self, weights, cube):
"""Initialize class instance.

Args:

Expand All @@ -1212,83 +1246,47 @@ def __new__(cls, weights, cube, units=None):
one of :meth:`iris.cube.Cube.coords`,
:meth:`iris.cube.Cube.cell_measures`, or
:meth:`iris.cube.Cube.ancillary_variables`). If given as an
array-like object, use this directly and assume units of `1`. If
`units` is given, ignore all units derived above and use the ones
given by `units`.
array-like object, use this directly and assume units of `1`. Note:
this does **not** create a copy of the input array.
* cube (Cube):
Input cube for aggregation. If weights is given as :obj:`str` or
:class:`iris.coords._DimensionalMetadata`, try to extract the
:class:`iris.coords._DimensionalMetadata` object and corresponding
dimensional mappings from this cube. Otherwise, this argument is
ignored.
* units (string, Unit):
If ``None``, use units derived from `weights`. Otherwise, overwrite
the units derived from `weights` and use `units`.

"""
# `weights` is a cube
# Note: to avoid circular imports of Cube we use duck typing using the
# "hasattr" syntax here
# --> Extract data and units from cube
if hasattr(weights, "add_aux_coord"):
obj = np.asarray(weights.data).view(cls)
obj.units = weights.units
derived_array = weights.core_data()
derived_units = weights.units

# `weights`` is a string or _DimensionalMetadata object
# --> Extract _DimensionalMetadata object from cube, broadcast it to
# correct shape using the corresponding dimensional mapping, and use
# its data and units
elif isinstance(weights, (str, _DimensionalMetadata)):
dim_metadata = cube._dimensional_metadata(weights)
arr = dim_metadata._values
derived_array = dim_metadata._core_values()
if dim_metadata.shape != cube.shape:
arr = iris.util.broadcast_to_shape(
arr,
derived_array = iris.util.broadcast_to_shape(
derived_array,
cube.shape,
dim_metadata.cube_dims(cube),
)
obj = np.asarray(arr).view(cls)
obj.units = dim_metadata.units
derived_units = dim_metadata.units

# Remaining types (e.g., np.ndarray): try to convert to ndarray.
# Remaining types (e.g., np.ndarray, dask.array.core.Array, etc.)
# --> Use array directly and assign units of "1"
else:
obj = np.asarray(weights).view(cls)
obj.units = Unit("1")

# Overwrite units from units argument if necessary
if units is not None:
obj.units = units
derived_array = weights
derived_units = Unit("1")

return obj

def __array_finalize__(self, obj):
"""See https://numpy.org/doc/stable/user/basics.subclassing.html.

Note
----
`obj` cannot be `None` here since ``_Weights.__new__`` does not call
``super().__new__`` explicitly.

"""
self.units = getattr(obj, "units", Unit("1"))

@classmethod
def update_kwargs(cls, kwargs, cube):
"""Update ``weights`` keyword argument in-place.

Args:

* kwargs (dict):
Keyword arguments that will be updated in-place if a `weights`
keyword is present which is not ``None``.
* cube (Cube):
Input cube for aggregation. If weights is given as :obj:`str`, try
to extract a cell measure with the corresponding name from this
cube. Otherwise, this argument is ignored.

"""
if kwargs.get("weights") is not None:
kwargs["weights"] = cls(kwargs["weights"], cube)
self.array = derived_array
self.units = derived_units


def create_weighted_aggregator_fn(aggregator_fn, axis, **kwargs):
Expand Down Expand Up @@ -1752,11 +1750,17 @@ def _sum(array, **kwargs):
def _sum_units_func(units, **kwargs):
"""Multiply original units with weight units if possible."""
weights = kwargs.get("weights")
if weights is None: # no weights given or weights are None
result = units
elif hasattr(weights, "units"): # weights are _Weights
result = units * weights.units
else: # weights are regular np.ndarrays
weights_units = kwargs.get("_weights_units")
multiply_by_weights_units = all(
[
weights is not None,
weights_units is not None,
weights_units != "1",
]
)
if multiply_by_weights_units:
result = units * weights_units
else:
result = units
return result

Expand Down
52 changes: 29 additions & 23 deletions lib/iris/cube.py
Original file line number Diff line number Diff line change
Expand Up @@ -3836,7 +3836,10 @@ def collapsed(self, coords, aggregator, **kwargs):
"""
# Update weights kwargs (if necessary) to handle different types of
# weights
_Weights.update_kwargs(kwargs, self)
trexfeathers marked this conversation as resolved.
Show resolved Hide resolved
weights_info = None
if kwargs.get("weights") is not None:
weights_info = _Weights(kwargs["weights"], self)
kwargs["weights"] = weights_info.array

# Convert any coordinate names to coordinates
coords = self._as_list_of_coords(coords)
Expand Down Expand Up @@ -3981,7 +3984,11 @@ def collapsed(self, coords, aggregator, **kwargs):
)

aggregator.update_metadata(
collapsed_cube, coords, axis=collapse_axis, **kwargs
collapsed_cube,
coords,
axis=collapse_axis,
_weights_units=getattr(weights_info, "units", None),
**kwargs,
)
result = aggregator.post_process(
collapsed_cube, data_result, coords, **kwargs
Expand Down Expand Up @@ -4074,7 +4081,10 @@ def aggregated_by(
"""
# Update weights kwargs (if necessary) to handle different types of
# weights
_Weights.update_kwargs(kwargs, self)
weights_info = None
if kwargs.get("weights") is not None:
weights_info = _Weights(kwargs["weights"], self)
kwargs["weights"] = weights_info.array

groupby_coords = []
dimension_to_groupby = None
Expand Down Expand Up @@ -4114,16 +4124,10 @@ def aggregated_by(
f"that is aggregated, got {len(weights):d}, expected "
f"{self.shape[dimension_to_groupby]:d}"
)

# iris.util.broadcast_to_shape does not preserve _Weights type
weights = _Weights(
iris.util.broadcast_to_shape(
weights,
self.shape,
(dimension_to_groupby,),
),
self,
units=weights.units,
weights = iris.util.broadcast_to_shape(
weights,
self.shape,
(dimension_to_groupby,),
)
if weights.shape != self.shape:
raise ValueError(
Expand Down Expand Up @@ -4274,7 +4278,11 @@ def aggregated_by(

# Add the aggregation meta data to the aggregate-by cube.
aggregator.update_metadata(
aggregateby_cube, groupby_coords, aggregate=True, **kwargs
aggregateby_cube,
groupby_coords,
aggregate=True,
_weights_units=getattr(weights_info, "units", None),
**kwargs,
)
# Replace the appropriate coordinates within the aggregate-by cube.
(dim_coord,) = self.coords(
Expand Down Expand Up @@ -4413,7 +4421,10 @@ def rolling_window(self, coord, aggregator, window, **kwargs):
"""
# Update weights kwargs (if necessary) to handle different types of
# weights
_Weights.update_kwargs(kwargs, self)
weights_info = None
if kwargs.get("weights") is not None:
weights_info = _Weights(kwargs["weights"], self)
kwargs["weights"] = weights_info.array

coord = self._as_list_of_coords(coord)[0]

Expand Down Expand Up @@ -4500,6 +4511,7 @@ def rolling_window(self, coord, aggregator, window, **kwargs):
new_cube,
[coord],
action="with a rolling window of length %s over" % window,
_weights_units=getattr(weights_info, "units", None),
**kwargs,
)
# and perform the data transformation, generating weights first if
Expand All @@ -4516,14 +4528,8 @@ def rolling_window(self, coord, aggregator, window, **kwargs):
"as the window."
)
kwargs = dict(kwargs)

# iris.util.broadcast_to_shape does not preserve _Weights type
kwargs["weights"] = _Weights(
iris.util.broadcast_to_shape(
weights, rolling_window_data.shape, (dimension + 1,)
),
self,
units=weights.units,
kwargs["weights"] = iris.util.broadcast_to_shape(
weights, rolling_window_data.shape, (dimension + 1,)
)
data_result = aggregator.aggregate(
rolling_window_data, axis=dimension + 1, **kwargs
Expand Down
Loading