How do I start Dask from within an MPI4Py workflow #28

Open

mrocklin opened this issue Mar 1, 2019 · 0 comments
mrocklin commented Mar 1, 2019

I'm sitting with @zonca and he's asking how to start a Dask application from within an mpi4py application. I'll give a brief explanation, and then some code snippets.

You can start a scheduler, client, and workers from within your Python script. After your other work finishes (perhaps after a barrier), rank 0 starts a scheduler, rank 1 starts a client (along with a bunch of other dask array/dataframe/delayed code), and all of the other ranks start workers.

There are docs on how to start dask schedulers and workers from Python here: https://docs.dask.org/en/latest/setup/python-advanced.html .
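For example, here is a rough sketch of that layout, following the Python-API pattern in those docs (the scheduler address, the port, and the barrier placement are assumptions for illustration; newer versions of distributed use async/await instead):

# run as e.g.: mpirun -n 4 python my_script.py
from threading import Thread
from mpi4py import MPI
from tornado.ioloop import IOLoop

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# ... existing MPI work ...
comm.Barrier()  # everyone finishes the MPI phase before Dask starts

scheduler_address = 'tcp://127.0.0.1:8786'  # assumption: rank 0's address is known to all ranks

loop = IOLoop.current()
Thread(target=loop.start, daemon=True).start()  # run the Tornado event loop in the background

if rank == 0:
    from distributed import Scheduler
    s = Scheduler(loop=loop)
    s.start('tcp://:8786')          # listen on TCP port 8786
elif rank == 1:
    from dask.distributed import Client
    client = Client(scheduler_address)
    # ... dask array/dataframe/delayed code goes here ...
else:
    from distributed import Worker
    w = Worker(scheduler_address, loop=loop, name='rank-' + str(rank))
    w.start()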

Then @zonca asks

Well how do I get my data from the existing process?

There are many ways to do that, but a simple (if perhaps inelegant) way would be to attach the data on each process to its worker, and then run a task on that worker that collects the data, something like the following:

# on each worker rank (rank and data come from the surrounding mpi4py code)
from dask.distributed import Worker

w = Worker('tcp://127.0.0.1:8786', name='rank-' + str(rank))
w.my_special_data = data  # <--- we add this line to the docs mentioned above
w.start()

# on the client rank
from dask.distributed import get_worker

def get_local_data():
    worker = get_worker()  # get the worker object on which this task runs
    return worker.my_special_data

# run that function once on every rank that holds a worker
futures = [
    client.submit(get_local_data, pure=False, workers=['rank-' + str(rank)])
    for rank in range(2, comm_size)
]

Then you can do whatever you like with those futures.
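For instance, a small sketch of two common follow-ups (the variable names here are only illustrative):

# pull the per-rank data back to the client process
results = client.gather(futures)

# or keep the data on the workers and submit further work against the futures
sizes = client.map(len, futures)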

Then @zonca asks

Great, well then how do I cleanly shut down my Dask workers and continue on with my MPI execution?

I thought that calling client.retire_workers() would do this, but apparently it didn't. It looks like the Worker shuts down but the Tornado IOLoop keeps running, which blocks the process. We can probably add a keyword argument to some method to improve this, or you can work around it by calling something like:

import tornado.ioloop

client.run(lambda: tornado.ioloop.IOLoop.current().stop())

Though this is somewhat rude to the workers :)
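For completeness, a rough sketch of that hand-off back to MPI (assuming comm is the mpi4py communicator from earlier; the barrier is just one way to resynchronize the ranks):

# on the client rank, once the Dask phase is done
import tornado.ioloop
client.run(lambda: tornado.ioloop.IOLoop.current().stop())  # stop each worker's event loop so those ranks can move on
client.close()

# on every rank: resynchronize, then continue with the MPI part of the program
comm.Barrier()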
