Support parallel regridding with dask-distributed #71

Open
jhamman opened this issue Oct 19, 2019 · 5 comments · Fixed by pangeo-data/xESMF#39
Comments

@jhamman

jhamman commented Oct 19, 2019

Previous issues have discussed supporting dask-enabled parallel regridding (e.g. #3). This seems to be working for the threaded scheduler but not for the distributed scheduler. It should be doable at this point, with some work to resolve the serialization problems shown below.

Current behavior

If you run the current dask regridding example in this repo's binder setup with dask-distributed, you get a bunch of serialization errors:

result = ds_out['air'].compute()  # actually applies regridding
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/core.py", line 44, in dumps
    for key, value in data.items()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/core.py", line 45, in <dictcomp>
    if type(value) is Serialize
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 167, in serialize
    for obj in x
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 167, in <listcomp>
    for obj in x
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 210, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type SubgraphCallable.', 'subgraph_callable')
distributed.comm.utils - ERROR - ('Could not serialize object of type SubgraphCallable.', 'subgraph_callable')
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/utils.py", line 29, in _to_frames
    msg, serializers=serializers, on_error=on_error, context=context
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/core.py", line 44, in dumps
    for key, value in data.items()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/core.py", line 45, in <dictcomp>
    if type(value) is Serialize
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 167, in serialize
    for obj in x
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 167, in <listcomp>
    for obj in x
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 210, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type SubgraphCallable.', 'subgraph_callable')
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/batched.py", line 93, in _background_send
    payload, serializers=self.serializers, on_error="raise"
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/gen.py", line 742, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py", line 227, in write
    context={"sender": self._local_addr, "recipient": self._peer_addr},
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/gen.py", line 742, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/utils.py", line 37, in to_frames
    res = yield offload(_to_frames)
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/gen.py", line 742, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/utils.py", line 1370, in offload
    return (yield _offload_executor.submit(fn, *args, **kwargs))
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/srv/conda/envs/notebook/lib/python3.7/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/srv/conda/envs/notebook/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/srv/conda/envs/notebook/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/utils.py", line 29, in _to_frames
    msg, serializers=serializers, on_error=on_error, context=context
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/core.py", line 44, in dumps
    for key, value in data.items()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/core.py", line 45, in <dictcomp>
    if type(value) is Serialize
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 167, in serialize
    for obj in x
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 167, in <listcomp>
    for obj in x
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 210, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type SubgraphCallable.', 'subgraph_callable')

From what I can tell, dask is trying to serialize some object that can't be pickled. Has anyone looked into diagnosing why this is happening?
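
For reference, a minimal sketch of the setup that triggers this (the dataset, target grid, and chunking here are illustrative, following the repo's dask example):

import xarray as xr
import xesmf as xe
from dask.distributed import Client

client = Client()  # distributed scheduler; the threaded scheduler works fine

# dask-backed input and an illustrative 1.5-degree global target grid
ds = xr.tutorial.open_dataset('air_temperature', chunks={'time': 500})
regridder = xe.Regridder(ds, xe.util.grid_global(1.5, 1.5), 'bilinear')

ds_out = regridder(ds['air'])  # lazy
result = ds_out.compute()      # raises the serialization errors above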

@JiaweiZhuang
Owner

JiaweiZhuang commented Oct 19, 2019

In pangeo-data/pangeo#334 (comment), I got dask-distributed working on a synthetic regridding problem (just a sparse matrix multiply) by using dask.array.map_blocks. Just switching to the distributed scheduler generally won't work, due to the relatively large size of the regridding weights (often > 100 MB), which take a very long time to serialize and often kill the cluster, as detailed here.

To make dask-distributed work, there needs to be an explicit call to broadcast the weights to all workers, weights_future = client.scatter(weights, broadcast=True), and the future then gets passed as an additional argument: da.map_blocks(apply_weights, input_data, weights_future, ...). See the full code in this notebook, and the sketch below.
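
A minimal, self-contained sketch of that pattern (the sizes are synthetic, and apply_weights is an illustrative stand-in, not the actual xESMF internals):

import dask.array as da
import scipy.sparse
from dask.distributed import Client

N_IN, N_OUT = 96 * 192, 180 * 360  # flattened grid sizes (illustrative)
weights = scipy.sparse.random(N_OUT, N_IN, density=1e-4, format='csr')
input_data = da.random.random((1000, N_IN), chunks=(100, N_IN))

def apply_weights(block, w):
    # regrid one chunk: sparse matrix multiply over the flattened spatial dim
    return w.dot(block.T).T

client = Client()

# broadcast the (often >100 MB) weight matrix to every worker once,
# instead of shipping a copy inside every task
weights_future = client.scatter(weights, broadcast=True)

# pass the future as an extra argument; each worker resolves it locally
result = da.map_blocks(apply_weights, input_data, weights_future,
                       dtype=input_data.dtype, chunks=(100, N_OUT))
result.compute()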

I haven't tested dask-distributed on xarray DataArray/Dataset yet, and your error might be due to a different issue, one associated with xarray metadata rather than dask. A quick way to test this is to pass in the raw data instead, i.e. regridder(ds['air'].data), since xESMF also works on pure dask arrays.
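
Something like this (illustrative):

# strip the xarray wrapper and regrid the underlying dask array directly
raw_result = regridder(ds['air'].data)
raw_result.compute()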

@JiaweiZhuang
Owner

Hmm, with regridder(ds['air'].data), I got ValueError: ctypes objects containing pointers cannot be pickled at the end of this long traceback:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/worker.py in dumps_function(func)
   3173     try:
-> 3174         result = cache[func]
   3175     except KeyError:

/srv/conda/envs/notebook/lib/python3.7/site-packages/zict/lru.py in __getitem__(self, key)
     47     def __getitem__(self, key):
---> 48         result = self.d[key]
     49         self.i += 1

TypeError: unhashable type: 'SubgraphCallable'

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/pickle.py in dumps(x)
     37     try:
---> 38         result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     39         if len(result) < 1000:

ValueError: ctypes objects containing pointers cannot be pickled

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
<timed exec> in <module>

/srv/conda/envs/notebook/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    163         dask.base.compute
    164         """
--> 165         (result,) = compute(self, traverse=False, **kwargs)
    166         return result
    167 

/srv/conda/envs/notebook/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    434     keys = [x.__dask_keys__() for x in collections]
    435     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 436     results = schedule(dsk, keys, **kwargs)
    437     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    438 

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2532             retries=retries,
   2533             user_priority=priority,
-> 2534             actors=actors,
   2535         )
   2536         packed = pack_data(keys, futures)

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
   2459                 {
   2460                     "op": "update-graph",
-> 2461                     "tasks": valmap(dumps_task, dsk3),
   2462                     "dependencies": dependencies,
   2463                     "keys": list(flatkeys),

/srv/conda/envs/notebook/lib/python3.7/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

/srv/conda/envs/notebook/lib/python3.7/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/worker.py in dumps_task(task)
   3209             return d
   3210         elif not any(map(_maybe_complex, task[1:])):
-> 3211             return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
   3212     return to_serialize(task)
   3213 

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/worker.py in dumps_function(func)
   3178             cache[func] = result
   3179     except TypeError:
-> 3180         result = pickle.dumps(func)
   3181     return result
   3182 

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/pickle.py in dumps(x)
     49     except Exception:
     50         try:
---> 51             return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     52         except Exception as e:
     53             logger.info("Failed to serialize %s. Exception: %s", x, e)

/srv/conda/envs/notebook/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
   1123     try:
   1124         cp = CloudPickler(file, protocol=protocol)
-> 1125         cp.dump(obj)
   1126         return file.getvalue()
   1127     finally:

/srv/conda/envs/notebook/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
    480         self.inject_addons()
    481         try:
--> 482             return Pickler.dump(self, obj)
    483         except RuntimeError as e:
    484             if 'recursion' in e.args[0]:

/srv/conda/envs/notebook/lib/python3.7/pickle.py in dump(self, obj)
    435         if self.proto >= 4:
    436             self.framer.start_framing()
--> 437         self.save(obj)
    438         self.write(STOP)
    439         self.framer.end_framing()

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547 
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 
    551     def persistent_id(self, obj):

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    636         else:
    637             save(func)
--> 638             save(args)
    639             write(REDUCE)
    640 

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save_tuple(self, obj)
    784         write(MARK)
    785         for element in obj:
--> 786             save(element)
    787 
    788         if id(obj) in memo:

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

/srv/conda/envs/notebook/lib/python3.7/pickle.py in _batch_setitems(self, items)
    885                 k, v = tmp[0]
    886                 save(k)
--> 887                 save(v)
    888                 write(SETITEM)
    889             # else tmp is empty, and we're done

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save_tuple(self, obj)
    769         if n <= 3 and self.proto >= 2:
    770             for element in obj:
--> 771                 save(element)
    772             # Subtle.  Same as in the big comment below.
    773             if id(obj) in memo:

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/srv/conda/envs/notebook/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_instancemethod(self, obj)
    888         else:
    889             if PY3:  # pragma: no branch
--> 890                 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
    891             else:
    892                 self.save_reduce(

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    636         else:
    637             save(func)
--> 638             save(args)
    639             write(REDUCE)
    640 

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save_tuple(self, obj)
    769         if n <= 3 and self.proto >= 2:
    770             for element in obj:
--> 771                 save(element)
    772             # Subtle.  Same as in the big comment below.
    773             if id(obj) in memo:

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547 
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 
    551     def persistent_id(self, obj):

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    660 
    661         if state is not None:
--> 662             save(state)
    663             write(BUILD)
    664 

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

/srv/conda/envs/notebook/lib/python3.7/pickle.py in _batch_setitems(self, items)
    880                 for k, v in tmp:
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)
    884             elif n:

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547 
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 
    551     def persistent_id(self, obj):

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    660 
    661         if state is not None:
--> 662             save(state)
    663             write(BUILD)
    664 

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

/srv/conda/envs/notebook/lib/python3.7/pickle.py in _batch_setitems(self, items)
    880                 for k, v in tmp:
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)
    884             elif n:

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    522             reduce = getattr(obj, "__reduce_ex__", None)
    523             if reduce is not None:
--> 524                 rv = reduce(self.proto)
    525             else:
    526                 reduce = getattr(obj, "__reduce__", None)

ValueError: ctypes objects containing pointers cannot be pickled

@JiaweiZhuang
Owner

JiaweiZhuang commented Oct 19, 2019

Aha, problem solved. Just set these two attributes before applying the regridder to the data:

regridder._grid_in = None
regridder._grid_out = None

regridder._grid_in was linked to ESMF objects that involve f2py and ctypes, and dask was having trouble pickling them. In the next version I will make sure that the Regridder class does not hold references to any ESMF objects.
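
Put together, a sketch of the workaround (the grid choices are illustrative; _grid_in/_grid_out are private attributes and may change):

import xarray as xr
import xesmf as xe
from dask.distributed import Client

client = Client()
ds = xr.tutorial.open_dataset('air_temperature', chunks={'time': 500})
regridder = xe.Regridder(ds, xe.util.grid_global(1.5, 1.5), 'bilinear')

# drop the references to ESMF grid objects (f2py/ctypes handles that
# pickle cannot serialize) before the graph is shipped to workers
regridder._grid_in = None
regridder._grid_out = None

result = regridder(ds['air']).compute()  # no more pickling errors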

@JiaweiZhuang
Owner

The explicit broadcasting of regridding weights is still TBD, though.

In pangeo-data/pangeo#334 (comment), I was thinking about adding an explicit regridder.set_distributed(client=client) call to send the weights to all worker nodes. regridder.weights would then become a dask future pointing to the distributed weights on all workers.

Or maybe there is a cleverer way to hide this explicit call from users. Any suggestions & PRs are extremely welcome!
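
For concreteness, a rough sketch of what that could look like (entirely hypothetical; no such method exists yet):

from dask.distributed import Client

def set_distributed(regridder, client):
    # hypothetical helper: broadcast the weight matrix to all workers
    # and keep a future to it, so later tasks ship only a small reference
    regridder.weights = client.scatter(regridder.weights, broadcast=True)

# intended usage:
# set_distributed(regridder, Client())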

@dgergel

dgergel commented May 14, 2020

@JiaweiZhuang in the example notebook you include above, what versions of xESMF and dask-distributed are you using? I am still getting the ...cannot be pickled... error when I replicate your sample workflow.
