From 6b932e465f14cb5af88b7863f3ba5b552fc2d4a1 Mon Sep 17 00:00:00 2001 From: Joseph Gonzalez Date: Mon, 27 Dec 2021 18:32:12 -0400 Subject: [PATCH 1/7] Adding the new algorithm for forward filling --- xarray/core/dask_array_ops.py | 58 ++++++++++++++++++++--------- xarray/tests/test_duck_array_ops.py | 24 ++++++------ 2 files changed, 53 insertions(+), 29 deletions(-) diff --git a/xarray/core/dask_array_ops.py b/xarray/core/dask_array_ops.py index 5eeb22767c8..7e0be89c2b5 100644 --- a/xarray/core/dask_array_ops.py +++ b/xarray/core/dask_array_ops.py @@ -57,24 +57,46 @@ def push(array, n, axis): """ Dask-aware bottleneck.push """ - from bottleneck import push + import numpy as np + import dask.array as da + import bottleneck - if len(array.chunks[axis]) > 1 and n is not None and n < array.shape[axis]: - raise NotImplementedError( - "Cannot fill along a chunked axis when limit is not None." - "Either rechunk to a single chunk along this axis or call .compute() or .load() first." - ) - if all(c == 1 for c in array.chunks[axis]): - array = array.rechunk({axis: 2}) - pushed = array.map_blocks(push, axis=axis, n=n, dtype=array.dtype, meta=array._meta) - if len(array.chunks[axis]) > 1: - pushed = pushed.map_overlap( - push, + def _fill_with_last_one(a, b): + # cumreduction apply the push func over all the blocks first so, the only missing part is filling + # the missing values using the last data of the previous chunk + if isinstance(a, np.ma.masked_array) or isinstance(b, np.ma.masked_array): + a, b = np.ma.getdata(a), np.ma.getdata(b) + values = np.where(~np.isnan(b), b, a) + return np.ma.masked_array(values, mask=np.ma.getmaskarray(b)) + + return np.where(~np.isnan(b), b, a) + + def _ffill(x): + return da.reductions.cumreduction( + func=bottleneck.push, + binop=_fill_with_last_one, + ident=np.nan, + x=x, axis=axis, - n=n, - depth={axis: (1, 0)}, - boundary="none", - dtype=array.dtype, - meta=array._meta, + dtype=x.dtype, + method="sequential", ) - return pushed + + if 
n is not None and n > 0: + arange = da.broadcast_to( + da.arange( + array.shape[axis], + chunks=array.chunks[axis], + dtype=array.dtype + ).reshape( + tuple(size if i == axis else 1 for i, size in enumerate(array.shape)) + ), + array.shape, + array.chunks + ) + valid_arange = da.where(da.notnull(array), arange, np.nan) + valid_limits = (arange - _ffill(valid_arange)) <= n + # omit the forward fill that violate the limit + return da.where(valid_limits, _ffill(array), np.nan) + + return _ffill(array) diff --git a/xarray/tests/test_duck_array_ops.py b/xarray/tests/test_duck_array_ops.py index c032a781e47..e12798b70c9 100644 --- a/xarray/tests/test_duck_array_ops.py +++ b/xarray/tests/test_duck_array_ops.py @@ -884,16 +884,18 @@ def test_push_dask(): import bottleneck import dask.array - array = np.array([np.nan, np.nan, np.nan, 1, 2, 3, np.nan, np.nan, 4, 5, np.nan, 6]) - expected = bottleneck.push(array, axis=0) - for c in range(1, 11): + array = np.array([np.nan, 1, 2, 3, np.nan, np.nan, np.nan, np.nan, 4, 5, np.nan, 6]) + + for n in [None, 1, 2, 3, 4, 5, 11]: + expected = bottleneck.push(array, axis=0, n=n) + for c in range(1, 11): + with raise_if_dask_computes(): + actual = push(dask.array.from_array(array, chunks=c), axis=0, n=n) + np.testing.assert_equal(actual, expected) + + # some chunks of size-1 with NaN with raise_if_dask_computes(): - actual = push(dask.array.from_array(array, chunks=c), axis=0, n=None) + actual = push( + dask.array.from_array(array, chunks=(1, 2, 3, 2, 2, 1, 1)), axis=0, n=n + ) np.testing.assert_equal(actual, expected) - - # some chunks of size-1 with NaN - with raise_if_dask_computes(): - actual = push( - dask.array.from_array(array, chunks=(1, 2, 3, 2, 2, 1, 1)), axis=0, n=None - ) - np.testing.assert_equal(actual, expected) From 151a101488c2a71964544fb0d2175a594c8e0c7c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 27 Dec 2021 22:38:02 +0000 Subject: [PATCH 2/7] 
[pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- xarray/core/dask_array_ops.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/xarray/core/dask_array_ops.py b/xarray/core/dask_array_ops.py index 7e0be89c2b5..f2a927fb13b 100644 --- a/xarray/core/dask_array_ops.py +++ b/xarray/core/dask_array_ops.py @@ -57,9 +57,9 @@ def push(array, n, axis): """ Dask-aware bottleneck.push """ - import numpy as np - import dask.array as da import bottleneck + import dask.array as da + import numpy as np def _fill_with_last_one(a, b): # cumreduction apply the push func over all the blocks first so, the only missing part is filling @@ -85,14 +85,12 @@ def _ffill(x): if n is not None and n > 0: arange = da.broadcast_to( da.arange( - array.shape[axis], - chunks=array.chunks[axis], - dtype=array.dtype + array.shape[axis], chunks=array.chunks[axis], dtype=array.dtype ).reshape( tuple(size if i == axis else 1 for i, size in enumerate(array.shape)) ), array.shape, - array.chunks + array.chunks, ) valid_arange = da.where(da.notnull(array), arange, np.nan) valid_limits = (arange - _ffill(valid_arange)) <= n From 6b09e89ef823cb6a7f48879b28455ce75bdf394a Mon Sep 17 00:00:00 2001 From: Joseph Gonzalez Date: Mon, 27 Dec 2021 20:57:58 -0400 Subject: [PATCH 3/7] Adding the new algorithm for forward filling --- xarray/tests/test_missing.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/xarray/tests/test_missing.py b/xarray/tests/test_missing.py index 69b59a7418c..35b21cbbed2 100644 --- a/xarray/tests/test_missing.py +++ b/xarray/tests/test_missing.py @@ -451,10 +451,6 @@ def test_ffill_bfill_dask(method): expected = numpy_method("time", limit=3) assert_equal(actual, expected) - # limit < axis size - with pytest.raises(NotImplementedError): - actual = dask_method("x", limit=2) - # limit > axis size with raise_if_dask_computes(): actual = dask_method("x", limit=41) From 1c5f9c6d81276130bef6a170aff551b7c2d3a6f1 
Mon Sep 17 00:00:00 2001 From: Joseph Gonzalez Date: Mon, 27 Dec 2021 22:11:46 -0400 Subject: [PATCH 4/7] Adding the new algorithm for forward filling --- xarray/core/dask_array_ops.py | 1 - 1 file changed, 1 deletion(-) diff --git a/xarray/core/dask_array_ops.py b/xarray/core/dask_array_ops.py index f2a927fb13b..8ac7a2e7b94 100644 --- a/xarray/core/dask_array_ops.py +++ b/xarray/core/dask_array_ops.py @@ -79,7 +79,6 @@ def _ffill(x): x=x, axis=axis, dtype=x.dtype, - method="sequential", ) if n is not None and n > 0: From cd5214aa5cc6701a804b552f2c1d0857a17df0de Mon Sep 17 00:00:00 2001 From: Joseph Gonzalez Date: Tue, 28 Dec 2021 00:25:16 -0400 Subject: [PATCH 5/7] Adding the new algorithm for forward filling --- xarray/core/dask_array_ops.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/xarray/core/dask_array_ops.py b/xarray/core/dask_array_ops.py index 8ac7a2e7b94..6375efa77e3 100644 --- a/xarray/core/dask_array_ops.py +++ b/xarray/core/dask_array_ops.py @@ -71,16 +71,6 @@ def _fill_with_last_one(a, b): return np.where(~np.isnan(b), b, a) - def _ffill(x): - return da.reductions.cumreduction( - func=bottleneck.push, - binop=_fill_with_last_one, - ident=np.nan, - x=x, - axis=axis, - dtype=x.dtype, - ) - if n is not None and n > 0: arange = da.broadcast_to( da.arange( @@ -82,8 +82,16 @@ def _ffill(x): array.chunks, ) valid_arange = da.where(da.notnull(array), arange, np.nan) - valid_limits = (arange - _ffill(valid_arange)) <= n + valid_limits = (arange - push(valid_arange, None, axis)) <= n # omit the forward fill that violate the limit - return da.where(valid_limits, _ffill(array), np.nan) + return da.where(valid_limits, push(array, None, axis), np.nan) - return _ffill(array) + # Passing the method parameter makes the tests fail on Python 3.7.
+ return da.reductions.cumreduction( + func=bottleneck.push, + binop=_fill_with_last_one, + ident=np.nan, + x=array, + axis=axis, + dtype=array.dtype, + ) From b498552f59a45f8b5ebb3b9d30d10b531617efd0 Mon Sep 17 00:00:00 2001 From: Joseph Gonzalez Date: Tue, 28 Dec 2021 17:27:40 -0400 Subject: [PATCH 6/7] 1. Deleting the masked array logic from the push method. 2. Adding the test for push with limit 3. Adding the information to the whats-new.rst --- doc/whats-new.rst | 6 ++++-- xarray/core/dask_array_ops.py | 7 +------ xarray/tests/test_missing.py | 6 ++++++ 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 1c4b49097a3..3d9ff344aa2 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -21,7 +21,8 @@ v0.21.0 (unreleased) New Features ~~~~~~~~~~~~ - +- Enable the limit option for dask array in the following methods :py:meth:`DataArray.ffill`, :py:meth:`DataArray.bfill`, :py:meth:`Dataset.ffill` and :py:meth:`Dataset.bfill` + By `Joseph Nowak `_. Breaking changes ~~~~~~~~~~~~~~~~ @@ -33,7 +34,8 @@ Deprecations Bug fixes ~~~~~~~~~ - +- Properly support :py:meth:`DataArray.ffill`, :py:meth:`DataArray.bfill`, :py:meth:`Dataset.ffill` and :py:meth:`Dataset.bfill` along chunked dimensions (:issue:`6112`). + By `Joseph Nowak `_. 
Documentation ~~~~~~~~~~~~~ diff --git a/xarray/core/dask_array_ops.py b/xarray/core/dask_array_ops.py index 6375efa77e3..fa497dbca20 100644 --- a/xarray/core/dask_array_ops.py +++ b/xarray/core/dask_array_ops.py @@ -64,14 +64,9 @@ def push(array, n, axis): def _fill_with_last_one(a, b): # cumreduction apply the push func over all the blocks first so, the only missing part is filling # the missing values using the last data of the previous chunk - if isinstance(a, np.ma.masked_array) or isinstance(b, np.ma.masked_array): - a, b = np.ma.getdata(a), np.ma.getdata(b) - values = np.where(~np.isnan(b), b, a) - return np.ma.masked_array(values, mask=np.ma.getmaskarray(b)) - return np.where(~np.isnan(b), b, a) - if n is not None and n > 0: + if n is not None and 0 < n < array.shape[axis] - 1: arange = da.broadcast_to( da.arange( array.shape[axis], chunks=array.chunks[axis], dtype=array.dtype diff --git a/xarray/tests/test_missing.py b/xarray/tests/test_missing.py index 35b21cbbed2..4121b62a9e8 100644 --- a/xarray/tests/test_missing.py +++ b/xarray/tests/test_missing.py @@ -451,6 +451,12 @@ def test_ffill_bfill_dask(method): expected = numpy_method("time", limit=3) assert_equal(actual, expected) + # limit < axis size + with raise_if_dask_computes(): + actual = dask_method("x", limit=2) + expected = numpy_method("x", limit=2) + assert_equal(actual, expected) + # limit > axis size with raise_if_dask_computes(): actual = dask_method("x", limit=41) From 05cd2d354f4c03e9839b3e33ac61a62904cab695 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 3 Jan 2022 17:00:40 +0000 Subject: [PATCH 7/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- doc/whats-new.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index f3a65f49ad7..5659cb029da 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -46,7 +46,7 
@@ Bug fixes ~~~~~~~~~ - Properly support :py:meth:`DataArray.ffill`, :py:meth:`DataArray.bfill`, :py:meth:`Dataset.ffill` and :py:meth:`Dataset.bfill` along chunked dimensions (:issue:`6112`). By `Joseph Nowak `_. - + - Subclasses of ``byte`` and ``str`` (e.g. ``np.str_`` and ``np.bytes_``) will now serialise to disk rather than raising a ``ValueError: unsupported dtype for netCDF4 variable: object`` as they did previously (:pull:`5264`). By `Zeb Nicholls `_.