
open_mfdataset fails with cftime index when using parallel and dask delayed client #6226


Closed
aidanheerdegen opened this issue Feb 1, 2022 · 6 comments · Fixed by #6249


aidanheerdegen commented Feb 1, 2022

What happened?

A call to open_mfdataset with parallel=True fails when using a dask.distributed Client with newer versions of cftime and xarray. This happens with cftime==1.5.2 and xarray==0.20.2, but not with cftime==1.5.1 and xarray==0.20.2.

What did you expect to happen?

I expected the call to open_mfdataset to work without error with parallel=True, as it does with parallel=False or with a previous version of cftime.

Minimal Complete Verifiable Example

import xarray as xr
import numpy as np
from dask.distributed import Client

# Need a main routine for dask.distributed if run as script
if __name__ == "__main__":

    client = Client(n_workers=1) 

    t = xr.cftime_range('20010101','20010501', closed='left', calendar='noleap')
    x = np.arange(100)
    v = np.random.random((t.size,x.size))

    da = xr.DataArray(v, coords=[('time',t), ('x',x)])
    da.to_netcdf('sample.nc')

    # Works
    xr.open_mfdataset('sample.nc', parallel=False)

    # Throws TypeError exception
    xr.open_mfdataset('sample.nc', parallel=True)

Relevant log output

distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):                                                                                                                                                                        
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/core.py", line 111, in loads
    return msgpack.loads(                                                                                                                                                                                 
  File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/core.py", line 103, in _decode_default                                                  
    return merge_and_deserialize(
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 488, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 417, in deserialize
    return loads(header, frames)                    
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 96, in pickle_loads
    return pickle.loads(x, buffers=new)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/pandas/core/indexes/base.py", line 255, in _new_Index
    return cls.__new__(cls, **d)
TypeError: __new__() got an unexpected keyword argument 'dtype'
Traceback (most recent call last):
  File "/g/data/v45/aph502/notebooks/test_pickle.py", line 21, in <module>
    xr.open_mfdataset('sample.nc', parallel=True)                               
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/xarray/backends/api.py", line 916, in open_mfdataset
    datasets, closers = dask.compute(datasets, closers)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/dask/base.py", line 571, in compute
    results = schedule(dsk, keys, **kwargs)           
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/client.py", line 2746, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/client.py", line 1946, in gather
    return self.sync(                             
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/utils.py", line 310, in sync
    return sync(                
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/utils.py", line 364, in sync
    raise exc.with_traceback(tb)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/utils.py", line 349, in f
    result[0] = yield future
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/client.py", line 1840, in _gather
    response = await future
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/client.py", line 1891, in _gather_remote
    response = await retry_operation(self.scheduler.gather, keys=keys)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/utils_comm.py", line 385, in retry_operation
    return await retry(
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/utils_comm.py", line 370, in retry
    return await coro()
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/core.py", line 900, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/core.py", line 669, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/comm/tcp.py", line 232, in read
    msg = await from_frames(
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/comm/utils.py", line 78, in from_frames
    res = _from_frames()
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/comm/utils.py", line 61, in _from_frames
    return protocol.loads(
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/core.py", line 111, in loads
    return msgpack.loads(
  File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/core.py", line 103, in _decode_default
    return merge_and_deserialize(
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 488, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 417, in deserialize
    return loads(header, frames)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 96, in pickle_loads
    return pickle.loads(x)                                                                                                    
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)                                                                                                    
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/pandas/core/indexes/base.py", line 255, in _new_Index
    return cls.__new__(cls, **d)                                                                                                          
TypeError: __new__() got an unexpected keyword argument 'dtype'

Anything else we need to know?

It seems similar to previous pickling issues (#5686), which were fixed in cftime (Unidata/cftime#252), but the tests from those previous issues still pass, so it isn't exactly the same.

Environment


INSTALLED VERSIONS
------------------
commit: None
python: 3.9.9 | packaged by conda-forge | (main, Dec 20 2021, 02:41:03) 
[GCC 9.4.0]
python-bits: 64
OS: Linux
OS-release: 4.18.0-348.2.1.el8.nci.x86_64
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: en_AU.utf8
LANG: en_AU.ISO8859-1
LOCALE: ('en_US', 'UTF-8')
libhdf5: 1.10.6
libnetcdf: 4.7.4

xarray: 0.20.2
pandas: 1.4.0
numpy: 1.22.1
scipy: 1.7.3
netCDF4: 1.5.6
pydap: installed
h5netcdf: 0.13.1
h5py: 3.6.0
Nio: None
zarr: 2.10.3
cftime: 1.5.2
nc_time_axis: 1.4.0
PseudoNetCDF: None
rasterio: 1.2.6
cfgrib: 0.9.9.1
iris: 3.1.0
bottleneck: 1.3.2
dask: 2022.01.0
distributed: 2022.01.0
matplotlib: 3.5.1
cartopy: 0.19.0.post1
seaborn: 0.11.2
numbagg: None
fsspec: 2022.01.0
cupy: 10.1.0
pint: 0.18
sparse: 0.13.0
setuptools: 59.8.0
pip: 21.3.1
conda: 4.11.0
pytest: 6.2.5
IPython: 8.0.1
sphinx: 4.4.0

aidanheerdegen commented Feb 1, 2022

Update: it is pandas that is the critical package. Pinning distributed<2022.01.0, xarray<0.21.0 and cftime<1.5.2 didn't fix it, but adding pandas<1.4.0 makes the above test pass. I will now try unpinning the other packages to confirm that pandas is the issue.

Edit: confirmed that pandas==1.4.0 causes this issue. The following version combination does not produce the error:

INSTALLED VERSIONS
------------------
commit: None
python: 3.9.10 | packaged by conda-forge | (main, Jan 30 2022, 18:04:04) 
[GCC 9.4.0]
python-bits: 64
OS: Linux
OS-release: 4.18.0-348.2.1.el8.nci.x86_64
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: en_AU.utf8
LANG: en_AU.ISO8859-1
LOCALE: ('en_US', 'UTF-8')
libhdf5: 1.10.6
libnetcdf: 4.7.4

xarray: 0.21.0
pandas: 1.3.5
numpy: 1.22.1
scipy: 1.7.3
netCDF4: 1.5.6
pydap: installed
h5netcdf: 0.13.1
h5py: 3.6.0
Nio: None
zarr: 2.10.3
cftime: 1.5.2
nc_time_axis: 1.4.0
PseudoNetCDF: None
rasterio: 1.2.6
cfgrib: 0.9.10.0
iris: 3.1.0
bottleneck: 1.3.2
dask: 2022.01.1
distributed: 2022.01.1
matplotlib: 3.5.1
cartopy: 0.19.0.post1
seaborn: 0.11.2
numbagg: None
fsspec: 2022.01.0
cupy: 10.1.0
pint: 0.18
sparse: 0.13.0
setuptools: 59.8.0
pip: 21.3.1
conda: 4.11.0
pytest: 6.2.5
IPython: 8.0.1
sphinx: 4.4.0


mathause commented Feb 1, 2022

A smaller reproduction:

import xarray as xr
import pickle
t = xr.cftime_range("20010101", "20010520")
pickle.loads(pickle.dumps(t))

Looks like pandas now passes dtype on to __new__ which CFTimeIndex.__new__ does not accept:

def __new__(cls, data, name=None):

Might be pandas-dev/pandas#43188. So CFTimeIndex.__new__ might need to accept dtype? @spencerkclark
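For illustration, the mismatch can be reproduced without pandas at all. OldStyleIndex below is a hypothetical stand-in for an Index subclass (like CFTimeIndex) whose __new__ has no dtype parameter, called the way pandas 1.4's _new_Index calls cls.__new__(cls, **d):

```python
# Hypothetical stand-in for an Index subclass whose __new__ signature
# predates pandas 1.4 and therefore has no ``dtype`` parameter.
class OldStyleIndex:
    def __new__(cls, data, name=None):
        obj = super().__new__(cls)
        obj.data, obj.name = list(data), name
        return obj

# pandas 1.4's _new_Index injects "dtype" into the saved state before
# calling cls.__new__(cls, **d); simulate that reconstruction here.
state = {"data": ["2001-01-01", "2001-01-02"], "name": "time", "dtype": "O"}

try:
    OldStyleIndex.__new__(OldStyleIndex, **state)
    error = None
except TypeError as err:
    error = err

print(error)  # e.g. __new__() got an unexpected keyword argument 'dtype'
```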

@mathause mathause added topic-cftime topic-internals and removed needs triage Issue that has not been reviewed by xarray team member labels Feb 1, 2022
@antarcticrainforest

I just ran into the very same issue. Are you sure that this is a problem with pandas? I've had a look at the pandas changes between 1.3.x and 1.4.x. Apparently the _new_Index method, which gets invoked when deserialising the index object, has been changed:

    elif "dtype" not in d and "data" in d:
        # Prevent Index.__new__ from conducting inference;
        #  "data" key not in RangeIndex
        d["dtype"] = d["data"].dtype
    return cls.__new__(cls, **d)

The problem is that __new__ doesn't accept a dtype argument. I tried adding a dtype argument and it works. So I guess, since this class inherits from pd.Index, it needs to be updated?

@spencerkclark

Thanks @antarcticrainforest -- I think that's exactly what @mathause is getting at. It seems fairly safe to add a new keyword argument to CFTimeIndex.__new__, and the example @mathause uses would make a nice test. Would either of you be up to make a PR?


antarcticrainforest commented Feb 1, 2022

Are we expecting the CFTimeIndex object to always have "O" as its dtype? If so, the solution is straightforward and I can create a PR.

@spencerkclark

Awesome, thanks @antarcticrainforest -- yes, it will always have an object dtype.
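Given that the dtype is always object, a minimal sketch of such a fix (using a hypothetical pandas-free stand-in, not the actual patch merged in #6249) would accept the dtype keyword and validate it:

```python
# Minimal sketch of the proposed fix, using a hypothetical stand-in class:
# __new__ now accepts the ``dtype`` keyword that pandas 1.4's _new_Index
# passes in, and validates that it is the object ("O") dtype.
class PatchedIndex:
    def __new__(cls, data, name=None, dtype=None):
        if dtype is not None and str(dtype) not in ("O", "object"):
            raise ValueError("a CFTime-style index must have object dtype")
        obj = super().__new__(cls)
        obj.data, obj.name = list(data), name
        return obj

# The reconstruction call that previously raised TypeError now succeeds.
state = {"data": ["2001-01-01", "2001-01-02"], "name": "time", "dtype": "O"}
idx = PatchedIndex.__new__(PatchedIndex, **state)
```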
