diff --git a/xarray/core/computation.py b/xarray/core/computation.py
index b4a268b5c98..3d0a19030e0 100644
--- a/xarray/core/computation.py
+++ b/xarray/core/computation.py
@@ -525,6 +525,8 @@ def apply_groupby_func(func, *args):
     from .variable import Variable
 
     groupbys = [arg for arg in args if isinstance(arg, GroupBy)]
+    for g in groupbys:
+        g._initialize_old()
     assert groupbys, "must have at least one groupby to iterate over"
     first_groupby = groupbys[0]
     if any(not first_groupby._group.equals(gb._group) for gb in groupbys[1:]):
diff --git a/xarray/core/groupby.py b/xarray/core/groupby.py
index 54499c16e90..f02db5c80e9 100644
--- a/xarray/core/groupby.py
+++ b/xarray/core/groupby.py
@@ -9,12 +9,14 @@
 from . import dtypes, duck_array_ops, nputils, ops
 from ._reductions import DataArrayGroupByReductions, DatasetGroupByReductions
+from .alignment import align
 from .arithmetic import DataArrayGroupbyArithmetic, DatasetGroupbyArithmetic
 from .concat import concat
+from .duck_array_ops import isnull
 from .formatting import format_array_flat
 from .indexes import create_default_index_implicit, filter_indexes_from_coords
 from .options import _get_keep_attrs
-from .pycompat import integer_types
+from .pycompat import integer_types, is_duck_dask_array
 from .utils import (
     either_dict_or_kwargs,
     hashable,
@@ -175,10 +177,18 @@ def ndim(self):
     def values(self):
         return range(self.size)
 
+    @property
+    def data(self):
+        return self.values
+
     @property
     def shape(self):
         return (self.size,)
 
+    @property
+    def sizes(self):
+        return {self.name: self.size}
+
     def __getitem__(self, key):
         if isinstance(key, tuple):
             key = key[0]
@@ -257,7 +267,7 @@ class GroupBy:
         "_inserted_dims",
         "_group",
         "_group_dim",
-        "_group_indices",
+        "_cached_group_indices",
         "_groups",
         "_obj",
         "_restore_coord_dims",
@@ -265,9 +275,10 @@
         "_unique_coord",
         "_dims",
         "_squeeze",
+        "_grouper",
         # Save unstacked object for flox
         "_original_obj",
-        "_unstacked_group",
+        "_original_group",
         "_bins",
     )
@@ -330,46 +341,95 @@ def __init__(
         if getattr(group, "name", None) is None:
            group.name = "group"
 
+        if is_duck_dask_array(group.data):
+            warnings.warn(
+                "Grouping by a dask array computes that array. "
+                "This will raise an error in the future. "
+                "Use `.groupby(group.compute())` instead.",
+                UserWarning,
+                stacklevel=3,
+            )
+            group = group.compute()
+
         self._original_obj = obj
-        self._unstacked_group = group
-        self._bins = bins
+        self._original_group = group
+        full_index = None
 
-        group, obj, stacked_dim, inserted_dims = _ensure_1d(group, obj)
-        (group_dim,) = group.dims
+        for dim in group.dims:
+            if group.sizes[dim] != obj.sizes[dim]:
+                raise ValueError(
+                    "the group variable's length does not "
+                    f"match the length along dimension {dim}"
+                )
 
-        expected_size = obj.sizes[group_dim]
-        if group.size != expected_size:
+        if bins is not None:
+            if isnull(bins).all():
+                raise ValueError("All bin edges are NaN.")
+
+            binned, bins = pd.cut(
+                np.ravel(group.values), bins, **cut_kwargs, retbins=True
+            )
+            full_index = binned.categories
+
+            # Can't do this without factorizing eagerly
+            if (binned.codes == -1).all():
+                raise ValueError(
+                    f"None of the data falls within bins with edges {bins!r}"
+                )
+
+        if not is_duck_dask_array(group.data) and isnull(group.data).all():
             raise ValueError(
-                "the group variable's length does not "
-                "match the length of this variable along its "
-                "dimension"
+                "Failed to group data. Grouping by a variable that is all NaN."
             )
 
+        # specification for the groupby operation
+        self._obj = obj
+        self._group = None
+
+        self._group_dim = None
+        self._cached_group_indices = None
+        self._unique_coord = None
+        self._stacked_dim = None
+        self._inserted_dims = None
+        self._full_index = full_index
+        self._restore_coord_dims = restore_coord_dims
+        self._bins = bins
+        self._squeeze = squeeze
+        self._grouper = grouper
+
+        # cached attributes
+        self._groups = None
+        self._dims = None
+
+    def _initialize_old(self):
+
+        # resample factorizes early by default
+        if self._grouper is not None:
+            return
+
+        if self._group is not None and self._group_dim is not None:
+            return
+
+        from .dataarray import DataArray
+
+        assert not is_duck_dask_array(self._original_group.data)
+        full_index = None
+        group, obj, stacked_dim, inserted_dims = _ensure_1d(
+            self._original_group, self._original_obj
+        )
+        (group_dim,) = group.dims
 
-        if bins is not None:
-            if duck_array_ops.isnull(bins).all():
-                raise ValueError("All bin edges are NaN.")
-            binned, bins = pd.cut(group.values, bins, **cut_kwargs, retbins=True)
+        if self._bins is not None:
+            binned = pd.cut(group.values, self._bins)
             new_dim_name = group.name + "_bins"
             group = DataArray(binned, group.coords, name=new_dim_name)
             full_index = binned.categories
 
-        if grouper is not None:
-            index = safe_cast_to_index(group)
-            if not index.is_monotonic_increasing:
-                # TODO: sort instead of raising an error
-                raise ValueError("index must be monotonic for resampling")
-            full_index, first_items = self._get_index_and_items(index, grouper)
-            sbins = first_items.values.astype(np.int64)
-            group_indices = [slice(i, j) for i, j in zip(sbins[:-1], sbins[1:])] + [
-                slice(sbins[-1], None)
-            ]
-            unique_coord = IndexVariable(group.name, first_items.index)
-        elif group.dims == (group.name,) and _unique_and_monotonic(group):
+        if group.dims == (group.name,) and _unique_and_monotonic(group):
             # no need to factorize
             group_indices = np.arange(group.size)
-            if not squeeze:
+            if not self._squeeze:
                 # use slices to do views instead of fancy indexing
                 # equivalent to: group_indices = group_indices.reshape(-1, 1)
                 group_indices = [slice(i, i + 1) for i in group_indices]
@@ -384,41 +444,36 @@ def __init__(
             # look through group to find the unique values
             group_as_index = safe_cast_to_index(group)
-            sort = bins is None and (not isinstance(group_as_index, pd.MultiIndex))
+            sort = self._bins is None and (
+                not isinstance(group_as_index, pd.MultiIndex)
+            )
             unique_values, group_indices = unique_value_groups(
                 group_as_index, sort=sort
             )
             unique_coord = IndexVariable(group.name, unique_values)
 
-        if len(group_indices) == 0:
-            if bins is not None:
-                raise ValueError(
-                    f"None of the data falls within bins with edges {bins!r}"
-                )
-            else:
-                raise ValueError(
-                    "Failed to group data. Are you grouping by a variable that is all NaN?"
-                )
-
-        # specification for the groupby operation
+        # TODO: rename to _group_1d, _obj_1d
         self._obj = obj
         self._group = group
         self._group_dim = group_dim
-        self._group_indices = group_indices
+
+        self._full_index = full_index
+
+        self._cached_group_indices = group_indices
         self._unique_coord = unique_coord
         self._stacked_dim = stacked_dim
         self._inserted_dims = inserted_dims
-        self._full_index = full_index
-        self._restore_coord_dims = restore_coord_dims
-        self._bins = bins
-        self._squeeze = squeeze
 
-        # cached attributes
-        self._groups = None
-        self._dims = None
+    @property
+    def _group_indices(self):
+        if self._cached_group_indices is None:
+            self._initialize_old()
+        return self._cached_group_indices
 
     @property
     def dims(self):
+        if self._group_indices is None:
+            self._initialize_old()
         if self._dims is None:
             self._dims = self._obj.isel(
                 **{self._group_dim: self._group_indices[0]}
@@ -433,6 +488,7 @@ def groups(self):
         """
         # provided to mimic pandas.groupby
         if self._groups is None:
+            self._initialize_old()
             self._groups = dict(zip(self._unique_coord.values, self._group_indices))
         return self._groups
 
@@ -440,15 +496,23 @@ def __getitem__(self, key):
         """
         Get DataArray or Dataset corresponding to a particular group label.
         """
+        if self._group_dim is None:
+            self._initialize_old()
         return self._obj.isel({self._group_dim: self.groups[key]})
 
     def __len__(self):
+        if self._unique_coord is None:
+            self._initialize_old()
         return self._unique_coord.size
 
     def __iter__(self):
+        if self._unique_coord is None:
+            self._initialize_old()
         return zip(self._unique_coord.values, self._iter_grouped())
 
     def __repr__(self):
+        if self._unique_coord is None:
+            self._initialize_old()
         return "{}, grouped over {!r}\n{!r} groups with labels {}.".format(
             self.__class__.__name__,
             self._unique_coord.name,
@@ -494,11 +558,26 @@ def _binary_op(self, other, f, reflexive=False):
 
         g = f if not reflexive else lambda x, y: f(y, x)
 
-        obj = self._obj
-        group = self._group
-        dim = self._group_dim
+        if self._bins is None:
+            obj = self._original_obj
+            group = self._original_group
+            dims = group.dims
+        else:
+            self._initialize_old()
+            obj = self._maybe_unstack(self._obj)
+            group = self._maybe_unstack(self._group)
+            dims = (self._group_dim,)
+
         if isinstance(group, _DummyGroup):
-            group = obj[dim]
+            group = obj[group.name]
+            coord = group
+        else:
+            # TODO: only need output coordinate here.
+            self._initialize_old()
+            coord = self._unique_coord
+            assert coord is not None
+            if not isinstance(coord, DataArray):
+                coord = DataArray(self._unique_coord)
         name = group.name
 
         if not isinstance(other, (Dataset, DataArray)):
@@ -515,37 +594,19 @@
                 "is not a dimension on the other argument"
             )
 
-        try:
-            expanded = other.sel({name: group})
-        except KeyError:
-            # some labels are absent i.e. other is not aligned
-            # so we align by reindexing and then rename dimensions.
-
-            # Broadcast out scalars for backwards compatibility
-            # TODO: get rid of this when fixing GH2145
-            for var in other.coords:
-                if other[var].ndim == 0:
-                    other[var] = (
-                        other[var].drop_vars(var).expand_dims({name: other.sizes[name]})
-                    )
-            expanded = (
-                other.reindex({name: group.data})
-                .rename({name: dim})
-                .assign_coords({dim: obj[dim]})
-            )
+        # Broadcast out scalars for backwards compatibility
+        # TODO: get rid of this when fixing GH2145
+        for var in other.coords:
+            if other[var].ndim == 0:
+                other[var] = (
+                    other[var].drop_vars(var).expand_dims({name: other.sizes[name]})
+                )
 
-        if self._bins is not None and name == dim and dim not in obj.xindexes:
-            # When binning by unindexed coordinate we need to reindex obj.
-            # _full_index is IntervalIndex, so idx will be -1 where
-            # a value does not belong to any bin. Using IntervalIndex
-            # accounts for any non-default cut_kwargs passed to the constructor
-            idx = pd.cut(group, bins=self._full_index).codes
-            obj = obj.isel({dim: np.arange(group.size)[idx != -1]})
+        other, _ = align(other, coord, join="outer")
+        expanded = other.sel({name: group})
 
         result = g(obj, expanded)
 
-        result = self._maybe_unstack(result)
-        group = self._maybe_unstack(group)
         if group.ndim > 1:
             # backcompat:
             # TODO: get rid of this when fixing GH2145
@@ -555,8 +616,9 @@
 
         if isinstance(result, Dataset) and isinstance(obj, Dataset):
             for var in set(result):
-                if dim not in obj[var].dims:
-                    result[var] = result[var].transpose(dim, ...)
+                for d in dims:
+                    if d not in obj[var].dims:
+                        result[var] = result[var].transpose(d, ...)
         return result
 
     def _maybe_restore_empty_groups(self, combined):
@@ -608,20 +670,23 @@ def _flox_reduce(self, dim, keep_attrs=None, **kwargs):
         # weird backcompat
         # reducing along a unique indexed dimension with squeeze=True
         # should raise an error
+        group_name = self._original_group.name
         if (
-            dim is None or dim == self._group.name
-        ) and self._group.name in obj.xindexes:
-            index = obj.indexes[self._group.name]
+            (dim is None or dim == group_name)
+            and group_name in obj.xindexes
+            and self._bins is None
+        ):
+            index = obj.indexes[group_name]
             if index.is_unique and self._squeeze:
-                raise ValueError(f"cannot reduce over dimensions {self._group.name!r}")
+                raise ValueError(f"cannot reduce over dimensions {group_name!r}")
 
         # group is only passed by resample
         group = kwargs.pop("group", None)
         if group is None:
-            if isinstance(self._unstacked_group, _DummyGroup):
-                group = self._unstacked_group.name
+            if isinstance(self._original_group, _DummyGroup):
+                group = self._original_group.name
             else:
-                group = self._unstacked_group
+                group = self._original_group
 
         unindexed_dims = tuple()
         if isinstance(group, str):
@@ -658,11 +723,11 @@
             # flox's default would not set np.nan for integer dtypes
             kwargs.setdefault("fill_value", np.nan)
         else:
-            expected_groups = (self._unique_coord.values,)
+            expected_groups = None
             isbin = False
 
         result = xarray_reduce(
-            self._original_obj.drop_vars(non_numeric),
+            obj.drop_vars(non_numeric),
             group,
             dim=dim,
             expected_groups=expected_groups,
@@ -683,18 +748,24 @@
         )
 
         if self._bins is not None:
+            # TODO: pass cut_kwargs to flox to avoid this
            # bins provided to flox are at full precision
             # the bin edge labels have a default precision of 3
             # reassign to fix that.
-            new_coord = [
-                pd.Interval(inter.left, inter.right) for inter in self._full_index
-            ]
-            result[self._group.name] = new_coord
+            if self._full_index is not None:
+                new_coord = [
+                    pd.Interval(inter.left, inter.right) for inter in self._full_index
+                ]
+                result[group_name + "_bins"] = new_coord
 
         # Fix dimension order when binning a dimension coordinate
         # Needed as long as we do a separate code path for pint;
         # For some reason Datasets and DataArrays behave differently!
-        if isinstance(self._obj, Dataset) and self._group_dim in self._obj.dims:
-            result = result.transpose(self._group.name, ...)
+        if (
+            isinstance(self._obj, Dataset)
+            and len(group.dims) == 1
+            and group.dims[0] in self._obj.dims
+        ):
+            result = result.transpose(f"{group_name}_bins", ...)
 
         return result
 
@@ -847,6 +918,8 @@ def quantile(
            "Sample quantiles in statistical packages,"
            The American Statistician, 50(4), pp. 361-365, 1996
         """
+        self._initialize_old()
+
         if dim is None:
             dim = self._group_dim
@@ -1080,6 +1153,7 @@ def reduce(
             Array with summarized data and the indicated dimension(s) removed.
         """
+        self._initialize_old()
         if dim is None:
             dim = self._group_dim
@@ -1209,6 +1283,8 @@ def reduce(
             Array with summarized data and the indicated dimension(s) removed.
         """
+        if self._group_dim is None:
+            self._initialize_old()
         if dim is None:
             dim = self._group_dim
diff --git a/xarray/core/resample.py b/xarray/core/resample.py
index e38deb3e440..d4f7e870a3c 100644
--- a/xarray/core/resample.py
+++ b/xarray/core/resample.py
@@ -6,7 +6,13 @@
 import numpy as np
 
 from ._reductions import DataArrayResampleReductions, DatasetResampleReductions
-from .groupby import DataArrayGroupByBase, DatasetGroupByBase, GroupBy
+from .groupby import (
+    DataArrayGroupByBase,
+    DatasetGroupByBase,
+    GroupBy,
+    safe_cast_to_index,
+)
+from .variable import IndexVariable
 
 RESAMPLE_DIM = "__resample_dim__"
 
@@ -25,27 +31,44 @@ class Resample(GroupBy):
     """
 
+    def __init__(self, obj, group, **kwargs):
+        super().__init__(obj, group, **kwargs)
+
+        (self._group_dim,) = group.dims
+        self._group = group
+        index = safe_cast_to_index(group)
+        if not index.is_monotonic_increasing:
+            # TODO: sort instead of raising an error
+            raise ValueError("index must be monotonic for resampling")
+        self._full_index, first_items = self._get_index_and_items(index, self._grouper)
+        sbins = first_items.values.astype(np.int64)
+        self._cached_group_indices = [
+            slice(i, j) for i, j in zip(sbins[:-1], sbins[1:])
+        ] + [slice(sbins[-1], None)]
+        self._unique_coord = IndexVariable(group.name, first_items.index)
+
     def _flox_reduce(self, dim, **kwargs):
         from .dataarray import DataArray
 
         kwargs.setdefault("method", "cohorts")
 
-        # now create a label DataArray since resample doesn't do that somehow
+        assert len(self._group.dims) == 1
+        (group_dim,) = self._group.dims
+
         repeats = []
+        # now create a label DataArray since resample doesn't do that somehow
         for slicer in self._group_indices:
             stop = (
-                slicer.stop
-                if slicer.stop is not None
-                else self._obj.sizes[self._group_dim]
+                slicer.stop if slicer.stop is not None else self._obj.sizes[group_dim]
             )
             repeats.append(stop - slicer.start)
         labels = np.repeat(self._unique_coord.data, repeats)
-        group = DataArray(labels, dims=(self._group_dim,), name=self._unique_coord.name)
+        group = DataArray(labels, dims=(group_dim,), name=self._unique_coord.name)
 
         result = super()._flox_reduce(dim=dim, group=group, **kwargs)
         result = self._maybe_restore_empty_groups(result)
-        result = result.rename({RESAMPLE_DIM: self._group_dim})
+        result = result.rename({RESAMPLE_DIM: group_dim})
         return result
 
     def _upsample(self, method, *args, **kwargs):
diff --git a/xarray/tests/test_groupby.py b/xarray/tests/test_groupby.py
index f0b16bc42c7..e1594022715 100644
--- a/xarray/tests/test_groupby.py
+++ b/xarray/tests/test_groupby.py
@@ -504,6 +504,13 @@ def test_groupby_repr_datetime(obj) -> None:
     assert actual == expected
 
 
+@requires_dask
+def test_groupby_dask_array_raises_warning():
+    ds = Dataset({"foo": ("x", np.arange(10)), "baz": ("x", np.arange(10))}).chunk()
+    with pytest.warns(UserWarning):
+        ds.groupby("baz")
+
+
 def test_groupby_drops_nans() -> None:
     # GH2383
     # nan in 2D data variable (requires stacking)
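
Usage note: a minimal sketch of the deprecation behavior introduced above, assuming this branch of xarray plus dask and pytest are installed (it mirrors test_groupby_dask_array_raises_warning):

    import numpy as np
    import pytest
    import xarray as xr

    # "baz" is dask-backed after .chunk(); grouping by it now emits a
    # UserWarning and computes the group variable eagerly.
    ds = xr.Dataset(
        {"foo": ("x", np.arange(10)), "baz": ("x", np.arange(10))}
    ).chunk()
    with pytest.warns(UserWarning):
        ds.groupby("baz")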