
Forward Fill not working when there are all-NaN chunks #6112

Closed
josephnowak opened this issue Dec 27, 2021 · 6 comments · Fixed by #6118

Comments

@josephnowak
Contributor

josephnowak commented Dec 27, 2021

What happened: I'm working with a report dataset that only has data in some specific periods of time. The problem is that when I use the forward-fill method it returns many NaNs, even in the last cells (it's a forward fill without a limit).

What you expected to happen: The array should not have NaNs in the last cells if it has data in any earlier cell, or there should be a warning somewhere.

Minimal Complete Verifiable Example:

import numpy as np
import xarray as xr

xr.DataArray(
    [1, 2, np.nan, np.nan, np.nan, np.nan],
    dims=['a']
).chunk(
    2
).ffill(
    'a'
).compute()

output: array([ 1., 2., 2., 2., nan, nan])

Anything else we need to know?: I looked a little at xarray's internal code for forward filling with dask. I think the problem is that the algorithm first forward-fills within each block and then uses map_overlap to fill across chunks; if a chunk is entirely NaN, the overlap takes that chunk's last value, which is NaN, so nothing propagates past it (hope this helps).
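To make the failure mode concrete, here is a minimal, hypothetical NumPy-only sketch of the two-step scheme described above (blockwise fill, then a one-element overlap from the neighboring block). `ffill_1d` is a stand-in for bottleneck.push, not xarray's actual dask code; it reproduces the reported output because the all-NaN middle chunk's last value, which is what the overlap carries forward, is itself NaN:

```python
import numpy as np

def ffill_1d(a):
    # simple NumPy forward fill (stand-in for bottleneck.push)
    idx = np.where(~np.isnan(a), np.arange(len(a)), 0)
    np.maximum.accumulate(idx, out=idx)
    return a[idx]

data = np.array([1, 2, np.nan, np.nan, np.nan, np.nan])
chunks = [data[0:2].copy(), data[2:4].copy(), data[4:6].copy()]

# step 1: blockwise forward fill (what map_blocks does)
filled = [ffill_1d(c) for c in chunks]

# step 2: fill across chunk boundaries using one element of overlap taken
# from the blockwise-filled neighbor (what map_overlap with depth (1, 0) does)
result = [filled[0]]
for prev, cur in zip(filled, filled[1:]):
    result.append(ffill_1d(np.concatenate([prev[-1:], cur]))[1:])
out = np.concatenate(result)
print(out)  # [ 1.  2.  2.  2. nan nan] -- the trailing NaNs survive because
            # the all-NaN middle chunk's last value (the overlap) is NaN
```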
Environment:

Output of xr.show_versions()

INSTALLED VERSIONS

commit: None
python: 3.10.0 | packaged by conda-forge | (default, Nov 20 2021, 02:25:18) [GCC 9.4.0]
python-bits: 64
OS: Linux
OS-release: 5.4.0-1025-aws
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: en_US.UTF-8
LANG: en_US.UTF-8
LOCALE: ('en_US', 'UTF-8')
libhdf5: None
libnetcdf: None

xarray: 0.20.2
pandas: 1.3.5
numpy: 1.21.4
scipy: None
netCDF4: None
pydap: None
h5netcdf: None
h5py: None
Nio: None
zarr: 2.10.3
cftime: None
nc_time_axis: None
PseudoNetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: 1.3.2
dask: 2021.12.0
distributed: 2021.12.0
matplotlib: None
cartopy: None
seaborn: None
numbagg: None
fsspec: 2021.11.1
cupy: None
pint: None
sparse: None
setuptools: 59.4.0
pip: 21.3.1
conda: None
pytest: 6.2.5
IPython: 7.30.1
sphinx: None

@josephnowak josephnowak changed the title Forward Fill not working when there is more than 1 empty chunk Forward Fill not working when there are empty chunks Dec 27, 2021
@josephnowak
Contributor Author

Probably the forward fill can be implemented using the same logic as dask's cumsum and cumprod. I looked a little at the dask-related code in xarray and apparently none of it uses the HighLevelGraph, so if the idea is to avoid building the graph manually, I think dask's cumreduction function can do the work (there may be a better dask function for this kind of computation, but I haven't found it).

import dask.array as da
import numpy as np
import xarray as xr
from bottleneck import push


def ffill(x: xr.DataArray, dim: str, limit=None):

    def _fill_with_last_one(a, b):
        # cumreduction applies the push func over all the blocks first, so
        # the only missing part is filling the remaining NaNs using the
        # last value of each previous block
        if isinstance(a, np.ma.masked_array) or isinstance(b, np.ma.masked_array):
            a = np.ma.getdata(a)
            b = np.ma.getdata(b)
            values = np.where(~np.isnan(b), b, a)
            return np.ma.masked_array(values, mask=np.ma.getmaskarray(b))

        return np.where(~np.isnan(b), b, a)

    return xr.DataArray(
        da.reductions.cumreduction(
            func=push,
            binop=_fill_with_last_one,
            ident=np.nan,
            x=x.data,
            axis=x.dims.index(dim),
            dtype=x.dtype,
            method="sequential",
        ),
        dims=x.dims,
        coords=x.coords,
    )

@dcherian
Contributor

Thanks @josephnowak. This is a great idea! 👏🏾 👏🏾 Can you send in a pull request, please? We'll need to add the example from your first post as a test.

I think you can replace this dask_array_ops.push with your version:

def push(array, n, axis):
    """
    Dask-aware bottleneck.push
    """
    from bottleneck import push

    if len(array.chunks[axis]) > 1 and n is not None and n < array.shape[axis]:
        raise NotImplementedError(
            "Cannot fill along a chunked axis when limit is not None. "
            "Either rechunk to a single chunk along this axis or call .compute() or .load() first."
        )
    if all(c == 1 for c in array.chunks[axis]):
        array = array.rechunk({axis: 2})
    pushed = array.map_blocks(push, axis=axis, n=n, dtype=array.dtype, meta=array._meta)
    if len(array.chunks[axis]) > 1:
        pushed = pushed.map_overlap(
            push,
            axis=axis,
            n=n,
            depth={axis: (1, 0)},
            boundary="none",
            dtype=array.dtype,
            meta=array._meta,
        )
    return pushed

This function is expected to return a dask array, so you can just return the result of cumreduction instead of wrapping it up in a DataArray.
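As an aside, the essence of the cumreduction approach can be sketched in plain NumPy (this is a hypothetical illustration of the scan semantics, not dask's actual machinery): push within each block, then sweep across blocks carrying the last filled value forward, combining with "take b where valid, else a". Unlike the overlap scheme, an all-NaN block simply passes the carry through:

```python
import numpy as np

def ffill_1d(a):
    # simple NumPy forward fill (stand-in for bottleneck.push)
    idx = np.where(~np.isnan(a), np.arange(len(a)), 0)
    np.maximum.accumulate(idx, out=idx)
    return a[idx]

def fill_with_last_one(a, b):
    # combine step: keep b's values where present, fall back to a
    return np.where(~np.isnan(b), b, a)

chunks = [np.array([1.0, 2.0]), np.array([np.nan] * 2), np.array([np.nan] * 2)]
filled = [ffill_1d(c) for c in chunks]  # blockwise push

# sequential scan across blocks: carry the last value seen so far
carry = np.full(1, np.nan)
out = []
for blk in filled:
    blk = fill_with_last_one(np.broadcast_to(carry, blk.shape), blk)
    out.append(blk)
    carry = blk[-1:]
result = np.concatenate(out)
print(result)  # [1. 2. 2. 2. 2. 2.] -- the all-NaN blocks are filled correctly
```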

@josephnowak
Contributor Author

josephnowak commented Dec 27, 2021

Yes, of course. By the way, would it be possible to add something like the following code for the case where there is a limit? I know this code generates about 4x more tasks, but at least it does the job, so probably a warning would be sufficient. (If it is not good enough to be added, no problem; building the graph manually would probably be a better option than this algorithm for a forward fill with a limit.)

import dask.array as da
import numpy as np
import xarray as xr
from bottleneck import push


def ffill(x: xr.DataArray, dim: str, limit=None):

    def _fill_with_last_one(a, b):
        # cumreduction applies the push func over all the blocks first, so
        # the only missing part is filling the remaining NaNs using the
        # last value of each previous block
        if isinstance(a, np.ma.masked_array) or isinstance(b, np.ma.masked_array):
            a = np.ma.getdata(a)
            b = np.ma.getdata(b)
            values = np.where(~np.isnan(b), b, a)
            return np.ma.masked_array(values, mask=np.ma.getmaskarray(b))

        return np.where(~np.isnan(b), b, a)

    def _ffill(arr):
        return xr.DataArray(
            da.reductions.cumreduction(
                func=push,
                binop=_fill_with_last_one,
                ident=np.nan,
                x=arr.data,
                axis=arr.dims.index(dim),
                dtype=arr.dtype,
                method="sequential",
            ),
            dims=arr.dims,
            coords=arr.coords,
        )

    if limit is not None:
        axis = x.dims.index(dim)
        arange = xr.DataArray(
            da.broadcast_to(
                da.arange(
                    x.shape[axis],
                    chunks=x.chunks[axis],
                    dtype=x.dtype,
                ).reshape(
                    tuple(size if i == axis else 1 for i, size in enumerate(x.shape))
                ),
                x.shape,
                x.chunks,
            ),
            coords=x.coords,
            dims=x.dims,
        )
        valid_limits = (arange - _ffill(arange.where(x.notnull(), np.nan))) <= limit
        return _ffill(x).where(valid_limits, np.nan)

    return _ffill(x)

@josephnowak
Contributor Author

Two questions:

  1. Would it be possible to set the array used for test_push_dask to np.array([np.nan, 1, 2, 3, np.nan, np.nan, np.nan, np.nan, 4, 5, np.nan, 6])? Using that array you can validate the test case that I put in this issue without creating another array (it's the original array, permuted).
  2. Can I remove the conditional that checks for the case where all the chunks have size 1? I think it is not necessary with the new method.
    # I think this is only necessary because the previous method used map_overlap.
    if all(c == 1 for c in array.chunks[axis]):
        array = array.rechunk({axis: 2})

@dcherian
Contributor

Both sound good to me.

Your code for limit looks OK, though I didn't look closely. It looks very similar to

# algorithm from https://github.com/pydata/xarray/pull/3302#discussion_r324707072
arange = ones_like(obj) * index
valid = obj.notnull()
valid_arange = arange.where(valid)
cumulative_nans = valid_arange.ffill(dim=dim).fillna(index[0])
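The trick in both versions is the same and can be illustrated in plain NumPy (a hypothetical sketch, using a simple `ffill_1d` in place of bottleneck.push): forward-fill the *indices* of the valid entries, so each position knows the index of the most recent non-NaN value, then mask any position whose distance to that index exceeds `limit`:

```python
import numpy as np

def ffill_1d(a):
    # simple NumPy forward fill (stand-in for bottleneck.push)
    idx = np.where(~np.isnan(a), np.arange(len(a)), 0)
    np.maximum.accumulate(idx, out=idx)
    return a[idx]

x = np.array([1.0, np.nan, np.nan, np.nan, 5.0, np.nan])
limit = 2

arange = np.arange(len(x), dtype=float)
valid_arange = np.where(~np.isnan(x), arange, np.nan)
last_valid = ffill_1d(valid_arange)           # index of the most recent non-NaN
within_limit = (arange - last_valid) <= limit  # gap to that index is small enough

filled = np.where(within_limit, ffill_1d(x), np.nan)
print(filled)  # [ 1.  1.  1. nan  5.  5.] -- the fourth cell is 3 steps past
               # the last valid value, beyond limit=2, so it stays NaN
```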

@josephnowak
Contributor Author

josephnowak commented Dec 27, 2021

I will be on the lookout for any changes that may be required.

@dcherian dcherian changed the title Forward Fill not working when there are empty chunks Forward Fill not working when there are all-NaN chunks Dec 28, 2021