Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Passing a distributed.Future to the kwargs of apply_ufunc should resolve the future #6803

Closed
alessioarena opened this issue Jul 18, 2022 · 10 comments

Comments

@alessioarena
Copy link

What is your issue?

I am trying to scatter an large array and pass it as keyword argument to a function applied using apply_ufunc but that is currently not working.
The same function works if providing the actual array, but if providing the Future linked to the scatter data the task fails.

Here is a minimal example to reproduce this issue

import dask.array as da
import xarray as xr
import numpy as np

data = xr.DataArray(data=da.random.random((15, 15, 20)), coords={'x': range(15), 'y': range(15), 'z': range(20)}, dims=('x', 'y', 'z'))

test = np.full((20,), 30)
test_future = client.scatter(test, broadcast=True)

def _copy_test(d, test=None):
    return test


new_data_actual = xr.apply_ufunc(
    _copy_test,
    data, 
    input_core_dims=[['z']],
    output_core_dims=[['new_z']],
    vectorize=True,
    dask='parallelized',
    output_dtypes="float64",
    kwargs={'test':test},
    dask_gufunc_kwargs = {'output_sizes':{'new_z':20}}
)

new_data_future = xr.apply_ufunc(
    _copy_test,
    data, 
    input_core_dims=[['z']],
    output_core_dims=[['new_z']],
    vectorize=True,
    dask='parallelized',
    output_dtypes="float64",
    kwargs={'test':test_future},
    dask_gufunc_kwargs = {'output_sizes':{'new_z':20}}
)

data[0, 0].compute()
#[0.3034994 , 0.08172002, 0.34731092, ...]

new_data_actual[0, 0].compute()
#[30.0, 30.0, 30.0, ...]

new_data_future[0,0].compute()
#KilledWorker

I tried different versions of this, going from explicitly calling test.result() to change the way the Future was passed, but nothing worked.
I also tried to raise exceptions within the function and various way to print information, but that also did not work. This last issue makes me think that if passing a Future I actually don't get to the scope of that function

Am I trying to do something completely silly? or is this an unexpected behavior?

@alessioarena alessioarena added the needs triage Issue that has not been reviewed by xarray team member label Jul 18, 2022
@TomNicholas TomNicholas added topic-dask and removed needs triage Issue that has not been reviewed by xarray team member labels Jul 19, 2022
@alessioarena
Copy link
Author

This is still an issue.
I noticed that the documentation of map_blocks states:
kwargs (mapping) – Passed verbatim to func after unpacking. xarray objects, if any, will not be subset to blocks. Passing dask collections in kwargs is not allowed.

Is this the case for apply_ufunc as well? if yes than it is not documented.
Is there another recommended way to pass data to workers without clogging the scheduler for this application?

@alessioarena
Copy link
Author

I think I may have narrowed down the problem to a limitation in dask using dask_gateway.

If passing a Future to a worker, the worker will try to unpickle that Future, and as part of that unpickle the Client object passed when creating such Future.

Unfortunately, in a dask_gateway context the client is behind a gateway connection that is not understood by the worker as normally does not have to deal with a gateway at all.
In my case I do not get any error message, just the task failing and retrying over and over, but fiddling around I managed to get the same error as this post (https://stackoverflow.com/questions/70775315/scattering-data-to-dask-cluster-workers-unknown-address-scheme-gateway)

@crusaderky
Copy link
Contributor

This is still an issue. I noticed that the documentation of map_blocks states: kwargs (mapping) – Passed verbatim to func after unpacking. xarray objects, if any, will not be subset to blocks. Passing dask collections in kwargs is not allowed.

Is this the case for apply_ufunc as well?

test_future is not a dask collection. It's a distributed.Future, which points to an arbitrary, opaque data blob that xarray has no means to know about.

FWIW, I could reproduce the issue, where the future in the kwargs is not resolved to the data it points to as one would expect.
Minimal reproducer:

import distributed
import xarray

client = distributed.Client(processes=False)
x = xarray.DataArray([1, 2]).chunk()
test_future = client.scatter("Hello World")


def f(d, test):
    print(test)
    return d


y = xarray.apply_ufunc(
    f,
    x,
    dask='parallelized',
    output_dtypes="float64",
    kwargs={'test':test_future},
)
y.compute()

Expected print output: Hello World
Actual print output: <Future: finished, type: str, key: str-b012273bcde56eadf364cd3ce9b4ca26>

@crusaderky crusaderky changed the title Unable to use dask.scatter with apply_ufunc Passing a distributed.Future to the kwargs of apply_ufunc should resolve the future Oct 17, 2022
@alessioarena
Copy link
Author

I can add that this problem is augmented in a dask_gateway system where the task just fails.

With apply_ufunc I never received an error but in similar context I obtained something very similar to dask/dask-gateway#404.

My interpretation is that the Future is resolved at the worker (or in case of apply_ufunc a thread of this worker) and embeds a reference to the Client object. This last however uses a gateway connection that is not understood by the worker as generally is the scheduler dealing with those

@crusaderky
Copy link
Contributor

Having said the above, your design is... contrived.

There isn't, as of today, a straightforward way to scatter a local dask collection (persist() will push the whole thing through the scheduler and likely send it out of memory).

Workaround:

test = np.full((20,), 30)
a = da.from_array(test)
dsk = client.scatter(dict(a.dask), broadcast=True)
a = da.Array(dsk, name=a.name, chunks=a.chunks, dtype=a.dtype, meta=a._meta, shape=a.shape)
a_x = xarray.DataArray(a, dims=["new_z"])

Once you have a_x, you just pass it to the args (not kwargs) of apply_ufunc.

@alessioarena
Copy link
Author

I'm not sure I understand the code above.

In my case I have an array of approximately 300k elements that each and every function call needs to have access.
I can pass it as a kwargs in its numpy form, but once I scale up the calculation across a large dataset (many large chunks) such array gets replicated for every task pushing the scheduler out of memory.

That is why I tried to send the dataset to the cluster beforehand using scatter, but I cannot resolve the Future at the workers

@crusaderky
Copy link
Contributor

crusaderky commented Oct 17, 2022

new_data_future = xr.apply_ufunc(
    _copy_test,
    data, 
    a_x,
    ...
)

instead of using kwargs.

I've opened dask/distributed#7140 to simplify this. With it implemented, my snippet

test = np.full((20,), 30)
a = da.from_array(test)
dsk = client.scatter(dict(a.dask), broadcast=True)
a = da.Array(dsk, name=a.name, chunks=a.chunks, dtype=a.dtype, meta=a._meta, shape=a.shape)
a_x = xarray.DataArray(a, dims=["new_z"])

would become

test = np.full((20,), 30)
a_x = xarray.DataArray(test, dims=["new_z"]).chunk()
a_x = client.scatter(a_x)

@alessioarena
Copy link
Author

I will try that.
I still find it weird that I need to wrap a numpy object into a task/xarray object to be able to send it to workers when there is dask.scatter made for exactly that purpose.

Thanks for opening that issue. I do feel there is the need to revisit scatter functionality and role particularly around dynamic clusters.

Having a better look at your initial comment, that may still work if you call Future.result() method inside the function applied. That in theory should retrieve the data associated with that Future, in that case "Hello World". However, in a dark gateway setup that will fail

@dcherian
Copy link
Contributor

Not sure there's anything actionable here

@ollie-bell
Copy link

ollie-bell commented Jan 9, 2024

I think this thread is related to my problem, but not 100% sure.

I have a single xarray dataset (holding multiple dataarrays) which I want to load into worker memory across a dask cluster, and then I do a bunch of different computations on the same data.

I guess it's up to dask to work out how it wants to distribute the chunks across worker memory, but one scheme I imagine could be each worker loads N_chunks / N_workers number of chunks for each dataarray in the dataset. e.g. if there are 5 dataarray's in the dataset and each dataaray is 20 chunks, if there are 10 workers then each worker would load into memory 2 chunks from each dataarray.

Is this, or something like it, what a simple ds.persist() achieves? When I do this I get a warning from dask:

/home/ec2-user/miniforge3/envs/py311/lib/python3.11/site-packages/distributed/client.py:3162: UserWarning: Sending large graph of size 39.75 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.

(Note I have not done anything more than xr.open_mfdataset with chunking)

The data loading seems pretty slow, wondering if I should be heeding this warning and using scatter...?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

5 participants