Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

boundary options for rolling.construct #3587

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ Breaking changes

New Features
~~~~~~~~~~~~
- Some boundary options ('edge' | 'reflect' | 'symmetric' | 'wrap') are
newly supported in :py:meth:`~xarray.DataArray.rolling.construct`.
(:issue:`2007`)
By `Keisuke Fujii <https://github.com/fujiisoup>`_.
- :py:meth:`Dataset.quantile`, :py:meth:`DataArray.quantile` and ``GroupBy.quantile``
now work with dask Variables.
By `Deepak Cherian <https://github.com/dcherian>`_.
Expand Down
105 changes: 104 additions & 1 deletion xarray/core/dask_array_ops.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from distutils import version
import numpy as np

from . import dtypes, nputils
Expand Down Expand Up @@ -27,7 +28,109 @@ def dask_rolling_wrapper(moving_func, a, window, min_count=None, axis=-1):
return result


def rolling_window(a, axis, window, center, fill_value):
def rolling_window(a, axis, window, center, fill_value, mode):
import dask

if version.LooseVersion(dask.__version__) < "1.7":
if mode is not None:
raise NotImplementedError(
"dask >= 1.7 is required for mode={}.".format(mode)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the right version number for dask? I can't find a 1.7 release.

Also note that we only need to support versions of dask < 6 months old, per http://xarray.pydata.org/en/stable/installing.html. If we can just delete the old code entirely, that might be cleaner.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dask API page says it came at the version 1.7, but I just noticed this doc is just copied from numpy...
I'll fix it.

)
return _rolling_window_old_dask(a, axis, window, center, fill_value)

return _rolling_window(a, axis, window, center, fill_value, mode)


def _rolling_window(a, axis, window, center, fill_value, mode):
"""Dask's equivalence to np.utils.rolling_window
"""
import dask.array as da

orig_shape = a.shape
if axis < 0:
axis = a.ndim + axis
depth = {d: 0 for d in range(a.ndim)}
depth[axis] = int(window / 2)
# For evenly sized window, we need to crop the first point of each block.
offset = 1 if window % 2 == 0 else 0

if depth[axis] > min(a.chunks[axis]):
raise ValueError(
"For window size %d, every chunk should be larger than %d, "
"but the smallest chunk size is %d. Rechunk your array\n"
"with a larger chunk size or a chunk size that\n"
"more evenly divides the shape of your array."
% (window, depth[axis], min(a.chunks[axis]))
)

# Although da.overlap pads values to boundaries of the array,
# the size of the generated array is smaller than what we want
# if center == False.
if center:
start = int(window / 2) # 10 -> 5, 9 -> 4
end = window - 1 - start
else:
start, end = window - 1, 0
pad_size = max(start, end) + offset - depth[axis]
drop_size = 0
# pad_size becomes more than 0 when the overlapped array is smaller than
# needed. In this case, we need to enlarge the original array by padding
# before overlapping.
if pad_size > 0:
if pad_size < depth[axis]:
# overlapping requires each chunk larger than depth. If pad_size is
# smaller than the depth, we enlarge this and truncate it later.
drop_size = depth[axis] - pad_size
pad_size = depth[axis]

# pad manually
pads = (
((0, 0),) * axis
+ ((pad_size + depth[axis], depth[axis]),)
+ ((0, 0),) * (a.ndim - axis - 1)
)
if mode is None:
a = da.pad(a, pads, mode="constant", constant_values=fill_value)
else:
a = da.pad(a, pads, mode=mode)

# merge the padded part into the edge block
new_chunks = list(a.chunks[axis])
first_chunk = new_chunks.pop(0)
last_chunk = new_chunks.pop(-1)
if len(new_chunks) == 0:
new_chunks = (first_chunk + last_chunk,)
else:
new_chunks[0] += first_chunk
new_chunks[-1] += last_chunk

chunks = list(a.chunks)
chunks[axis] = tuple(new_chunks)
a = da.rechunk(a, tuple(chunks))

# create overlap arrays without boundary
boundary = {d: "none" for d in range(a.ndim)}
ag = da.overlap.overlap(a, depth=depth, boundary=boundary)

def func(x, window, axis=-1):
x = np.asarray(x)
rolling = nputils._rolling_window(x, window, axis)
return rolling[(slice(None),) * axis + (slice(offset, None),)]

# we need a correct shape
chunks = list(ag.chunks)
chunks[axis] = tuple([c - (window - 1 + offset) for c in chunks[axis]])
chunks.append(window)
out = ag.map_blocks(
func, dtype=a.dtype, new_axis=a.ndim, chunks=chunks, window=window, axis=axis
)

# crop boundary.
index = (slice(None),) * axis + (slice(drop_size, drop_size + orig_shape[axis]),)
return out[index]


def _rolling_window_old_dask(a, axis, window, center, fill_value):
"""Dask's equivalence to np.utils.rolling_window
"""
import dask.array as da
Expand Down
8 changes: 5 additions & 3 deletions xarray/core/duck_array_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,12 +501,14 @@ def last(values, axis, skipna=None):
return take(values, -1, axis=axis)


def rolling_window(array, axis, window, center, fill_value):
def rolling_window(array, axis, window, center, fill_value, mode):
"""
Make an ndarray with a rolling window of axis-th dimension.
The rolling dimension will be placed at the last dimension.
"""
if isinstance(array, dask_array_type):
return dask_array_ops.rolling_window(array, axis, window, center, fill_value)
return dask_array_ops.rolling_window(
array, axis, window, center, fill_value, mode
)
else: # np.ndarray
return nputils.rolling_window(array, axis, window, center, fill_value)
return nputils.rolling_window(array, axis, window, center, fill_value, mode)
7 changes: 5 additions & 2 deletions xarray/core/nputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def __setitem__(self, key, value):
self._array[key] = np.moveaxis(value, vindex_positions, mixed_positions)


def rolling_window(a, axis, window, center, fill_value):
def rolling_window(a, axis, window, center, fill_value, mode):
""" rolling window with padding. """
pads = [(0, 0) for s in a.shape]
if center:
Expand All @@ -149,7 +149,10 @@ def rolling_window(a, axis, window, center, fill_value):
pads[axis] = (start, end)
else:
pads[axis] = (window - 1, 0)
a = np.pad(a, pads, mode="constant", constant_values=fill_value)
if mode is None:
a = np.pad(a, pads, mode="constant", constant_values=fill_value)
else:
a = np.pad(a, pads, mode=mode)
return _rolling_window(a, window, axis)


Expand Down
22 changes: 18 additions & 4 deletions xarray/core/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def __iter__(self):

yield (label, window)

def construct(self, window_dim, stride=1, fill_value=dtypes.NA):
def construct(self, window_dim, stride=1, fill_value=dtypes.NA, mode=None):
"""
Convert this rolling object to xr.DataArray,
where the window dimension is stacked as a new dimension
Expand All @@ -206,6 +206,11 @@ def construct(self, window_dim, stride=1, fill_value=dtypes.NA):
Size of stride for the rolling window.
fill_value: optional. Default dtypes.NA
Filling value to match the dimension size.
mode: optional. Default None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mode seems a little vague here in the context of constructing a rolling window. Maybe we could call this pad_mode or boundary_mode?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pad_mode seems better since we should eventually have da.pad(mode="constant").

One of None | 'edge' | 'reflect' | 'symmetric' | 'wrap'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the default should be constant, which would match np.pad?

For the details of the mode, see
https://docs.scipy.org/doc/numpy/reference/generated/numpy.pad.html
If it is not None, fill_value is ignored.

Returns
-------
Expand Down Expand Up @@ -234,7 +239,12 @@ def construct(self, window_dim, stride=1, fill_value=dtypes.NA):
from .dataarray import DataArray

window = self.obj.variable.rolling_window(
self.dim, self.window, window_dim, self.center, fill_value=fill_value
self.dim,
self.window,
window_dim,
self.center,
fill_value=fill_value,
mode=mode,
)
result = DataArray(
window, dims=self.obj.dims + (window_dim,), coords=self.obj.coords
Expand Down Expand Up @@ -466,7 +476,7 @@ def _numpy_or_bottleneck_reduce(
**kwargs,
)

def construct(self, window_dim, stride=1, fill_value=dtypes.NA):
def construct(self, window_dim, stride=1, fill_value=dtypes.NA, mode=None):
"""
Convert this rolling object to xr.Dataset,
where the window dimension is stacked as a new dimension
Expand All @@ -479,6 +489,10 @@ def construct(self, window_dim, stride=1, fill_value=dtypes.NA):
size of stride for the rolling window.
fill_value: optional. Default dtypes.NA
Filling value to match the dimension size.
mode: optional. Default None
One of None | 'edge' | 'reflect' | 'symmetric' | 'wrap'
For the details of the mode, see
https://docs.scipy.org/doc/numpy/reference/generated/numpy.pad.html

Returns
-------
Expand All @@ -491,7 +505,7 @@ def construct(self, window_dim, stride=1, fill_value=dtypes.NA):
for key, da in self.obj.data_vars.items():
if self.dim in da.dims:
dataset[key] = self.rollings[key].construct(
window_dim, fill_value=fill_value
window_dim, fill_value=fill_value, mode=mode
)
else:
dataset[key] = da
Expand Down
6 changes: 5 additions & 1 deletion xarray/core/variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -1806,7 +1806,7 @@ def rank(self, dim, pct=False):
return Variable(self.dims, ranked)

def rolling_window(
self, dim, window, window_dim, center=False, fill_value=dtypes.NA
self, dim, window, window_dim, center=False, fill_value=dtypes.NA, mode=None
):
"""
Make a rolling_window along dim and add a new_dim to the last place.
Expand All @@ -1824,6 +1824,9 @@ def rolling_window(
of the axis.
fill_value:
value to be filled.
mode: optional. Default None
One of None | 'edge' | 'reflect' | 'symmetric' | 'wrap'
If it is not None, fill_value is ignored.

Returns
-------
Expand Down Expand Up @@ -1861,6 +1864,7 @@ def rolling_window(
window=window,
center=center,
fill_value=fill_value,
mode=mode,
),
)

Expand Down
8 changes: 8 additions & 0 deletions xarray/tests/test_dataarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -4257,6 +4257,14 @@ def test_rolling_wrapped_dask_nochunk(center):
assert_allclose(actual, expected)


@pytest.mark.parametrize("da", (1, 2), indirect=True)
@pytest.mark.parametrize("center", [True, False])
@pytest.mark.parametrize("mode", ["edge", "symmetric", "reflect", "wrap"])
def test_rolling_mode(da, center, mode):
# just a working test
da.rolling(time=7, center=center).construct("constructed", mode=mode)


@pytest.mark.parametrize("center", (True, False))
@pytest.mark.parametrize("min_periods", (None, 1, 2, 3))
@pytest.mark.parametrize("window", (1, 2, 3, 4))
Expand Down
8 changes: 5 additions & 3 deletions xarray/tests/test_duck_array_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,10 +503,10 @@ def test_dask_rolling(axis, window, center):
dx = da.from_array(x, chunks=[(6, 30, 30, 20, 14), 8])

expected = rolling_window(
x, axis=axis, window=window, center=center, fill_value=np.nan
x, axis=axis, window=window, center=center, fill_value=np.nan, mode=None
)
actual = rolling_window(
dx, axis=axis, window=window, center=center, fill_value=np.nan
dx, axis=axis, window=window, center=center, fill_value=np.nan, mode=None
)
assert isinstance(actual, da.Array)
assert_array_equal(actual, expected)
Expand All @@ -515,7 +515,9 @@ def test_dask_rolling(axis, window, center):
# we need to take care of window size if chunk size is small
# window/2 should be smaller than the smallest chunk size.
with pytest.raises(ValueError):
rolling_window(dx, axis=axis, window=100, center=center, fill_value=np.nan)
rolling_window(
dx, axis=axis, window=100, center=center, fill_value=np.nan, mode=None
)


@pytest.mark.skipif(not has_dask, reason="This is for dask.")
Expand Down
24 changes: 21 additions & 3 deletions xarray/tests/test_nputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,35 @@ def test_vindex():
def test_rolling():
x = np.array([1, 2, 3, 4], dtype=float)

actual = rolling_window(x, axis=-1, window=3, center=True, fill_value=np.nan)
actual = rolling_window(
x, axis=-1, window=3, center=True, fill_value=np.nan, mode=None
)
expected = np.array(
[[np.nan, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4, np.nan]], dtype=float
)
assert_array_equal(actual, expected)

actual = rolling_window(x, axis=-1, window=3, center=False, fill_value=0.0)
actual = rolling_window(
x, axis=-1, window=3, center=False, fill_value=0.0, mode=None
)
expected = np.array([[0, 0, 1], [0, 1, 2], [1, 2, 3], [2, 3, 4]], dtype=float)
assert_array_equal(actual, expected)

actual = rolling_window(
x, axis=-1, window=3, center=True, fill_value=None, mode="wrap"
)
expected = np.array([[4, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4, 1]], dtype=float)
assert_array_equal(actual, expected)

actual = rolling_window(
x, axis=-1, window=3, center=True, fill_value=None, mode="reflect"
)
expected = np.array([[2, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4, 3]], dtype=float)
assert_array_equal(actual, expected)

x = np.stack([x, x * 1.1])
actual = rolling_window(x, axis=-1, window=3, center=False, fill_value=0.0)
actual = rolling_window(
x, axis=-1, window=3, center=True, fill_value=None, mode="reflect"
)
expected = np.stack([expected, expected * 1.1], axis=0)
assert_array_equal(actual, expected)
31 changes: 30 additions & 1 deletion xarray/tests/test_variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from xarray.core.pycompat import dask_array_type
from xarray.core.utils import NDArrayMixin
from xarray.core.variable import as_compatible_data, as_variable
from xarray.tests import requires_bottleneck
from xarray.tests import requires_bottleneck, LooseVersion

from . import (
assert_allclose,
Expand Down Expand Up @@ -821,6 +821,11 @@ def test_pad(self):
assert_array_equal(actual, expected)

def test_rolling_window(self):
v = self.cls(["x", "y", "z"], np.arange(40 * 30 * 2).reshape(40, 30, 2))
self._test_rolling_window(v)
self._test_rolling_window_mode(v)

def _test_rolling_window(self, v):
# Just a working test. See test_nputils for the algorithm validation
v = self.cls(["x", "y", "z"], np.arange(40 * 30 * 2).reshape(40, 30, 2))
for (d, w) in [("x", 3), ("y", 5)]:
Expand All @@ -836,6 +841,19 @@ def test_rolling_window(self):
v_loaded = v.load().rolling_window(d, w, d + "_window", center=True)
assert_array_equal(v_rolling, v_loaded)

def _test_rolling_window_mode(self, v):
# Just a working test. See test_nputils for the algorithm validation
v = self.cls(["x", "y", "z"], np.arange(40 * 30 * 2).reshape(40, 30, 2))
for (d, w) in [("x", 3), ("y", 5)]:
# dask and numpy result should be the same
v_rolling = v.rolling_window(
d, w, d + "_window", center=True, mode="symmetric"
)
v_loaded = v.load().rolling_window(
d, w, d + "_window", center=True, mode="symmetric"
)
assert_array_equal(v_rolling, v_loaded)

# numpy backend should not be over-written
if isinstance(v._data, np.ndarray):
with pytest.raises(ValueError):
Expand Down Expand Up @@ -1871,6 +1889,17 @@ def test_getitem_with_mask_nd_indexer(self):
self.cls(("x", "y"), [[0, -1], [-1, 2]]),
)

def test_rolling_window(self):
v = self.cls(["x", "y", "z"], np.arange(40 * 30 * 2).reshape(40, 30, 2))
self._test_rolling_window(v)
self._test_rolling_window(v.chunk({"x": 4, "y": 5, "z": -1}))

import dask

if LooseVersion(dask.__version__) >= "1.7":
self._test_rolling_window_mode(v)
self._test_rolling_window_mode(v.chunk({"x": 4, "y": 5, "z": -1}))


@requires_sparse
class TestVariableWithSparse:
Expand Down