Wait for workers to join before continuing #2138

Closed
jacobtomlinson opened this issue Jul 23, 2018 · 17 comments · Fixed by #2688

Comments

@jacobtomlinson
Member

Moved from dask/dask-kubernetes#87

It would be useful to have the cluster.scale() method block until the workers join. This was originally raised in dask-kubernetes but is applicable to dask-jobqueue and dask-yarn, so it would make more sense to implement here.

Suggestions from @mrocklin:

One solution here might be to use SchedulerPlugins to explicitly trigger events whenever a new worker reaches the scheduler. Calling wait on the cluster object might register a tornado.locks.Event or tornado.locks.Condition that gets triggered when the right number of workers have arrived.

@jhamman implemented a SchedulerPlugin for dask-jobqueue in order to improve adaptive operation (he wanted adaptive to be aware of pending jobs). This work happened in dask/dask-jobqueue#63 and might be the sort of thing we could think about generalizing. Any thoughts on this @jhamman?
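
A minimal sketch of how such a plugin might look (hypothetical code; only SchedulerPlugin and tornado.locks.Event are existing pieces, the class and attribute names here are made up):

from tornado.locks import Event
from distributed.diagnostics.plugin import SchedulerPlugin

class WorkerArrivalPlugin(SchedulerPlugin):
    """Set an event once the scheduler knows about at least `target` workers."""

    def __init__(self, scheduler, target):
        self.scheduler = scheduler
        self.target = target
        self.event = Event()

    def add_worker(self, scheduler=None, worker=None, **kwargs):
        # Called by the scheduler whenever a new worker registers.
        if len(self.scheduler.workers) >= self.target:
            self.event.set()

A cluster-level wait could then presumably register this via scheduler.add_plugin and await plugin.event.wait() until enough workers have arrived.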

@guillaumeeb
Member

Hi there. From the dask-jobqueue perspective, I don't think it would be a good thing to wait until all workers join. Maybe block until one worker is there, but with a job scheduler, waiting for all of them could take hours if not more. Maybe add a kwarg to scale()?

From what I remember, using the plugin as suggested by @mrocklin looks simple enough.

@jacobtomlinson
Member Author

You make a good point. I'm making a few assumptions here.

The first is that I'm using a cluster that doesn't make me wait very long for workers (the cloud). I'm talking about interactive jobs rather than batch jobs.

I'm also assuming that I'm doing something brittle which needs all the workers before starting. The primary example of this for me right now is doing a live demo: I don't want it to run on one worker, as that would be underwhelming. I want to scale and wait until I have a nice big cluster before plowing through my graph.

I appreciate that might not be a good enough reason to do this. I could write a little script that blocks until the worker count reaches n for live demos.

@mrocklin
Member

mrocklin commented Aug 3, 2018

I think it would make sense to have an explicit wait method or a wait= keyword to scale that would allow a user to opt in to blocking semantics. I would use this for benchmarking.

@jacobtomlinson
Member Author

Perhaps we could have wait=bool and/or wait_for=int so you could specify a minimum number of workers to wait for before starting. This may fit better with jobqueue.
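
For illustration, roughly those semantics written as a standalone helper on top of the current API (neither keyword exists yet; the helper name and polling interval are made up):

from time import sleep

def scale_and_wait(cluster, client, n, wait=False, wait_for=None):
    # Request n workers, then optionally block until enough have joined.
    cluster.scale(n)
    if wait or wait_for is not None:
        target = wait_for if wait_for is not None else n
        while len(client.scheduler_info()["workers"]) < target:
            sleep(1.0)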

@guillaumeeb
Member

Yep, I had something like this in mind. Being able to wait for a user-defined number of workers to be online is something that would definitely be useful!

@jakirkham
Member

FWIW, code like this has served this purpose quite well for us. It would be easy enough for someone to add if they wish.

from time import sleep
# Poll until the scheduler reports the desired number of workers.
while client.status == "running" and len(client.scheduler_info()["workers"]) < nworkers:
    sleep(1.0)

@guillaumeeb
Member

Could we use a non-active-wait solution for that?

Maybe using http://distributed.dask.org/en/latest/plugins.html#scheduler-plugins and some callback?

Do you have a suggestion on this, @mrocklin?

@mrocklin
Member

mrocklin commented Mar 4, 2019

Adding some constraints here:

Currently the scale method is fast so that it can be reliably used within the event loop. Typically scale starts some jobs and then returns immediately. If we decide that it should block, then it has to block in an async-friendly way. This means that scale has to become a coroutine if we're in the event loop (it has to return a future), and a blocking function if we're not. This will require some finesse.

We might solve this in a couple of ways:

  1. We could change the contract of scale across all of the projects. This might be necessary at some point anyway.

  2. We could skip the wait= keyword on scale and instead add a second method, .wait(), which would follow the coroutine semantics commonly used in the dask.distributed codebase:

    from tornado import gen

    async def _wait(self, n):
        # `condition` is a placeholder for "at least n workers have joined".
        while not condition():
            await gen.sleep(0.01)

    def wait(self, n):
        # Blocking when called synchronously, awaitable inside the event loop.
        return self.sync(self._wait, n)

@guillaumeeb
Member

Just for my understanding, if we do not modify scale and instead just add a wait method, why would we want it to follow the common coroutine semantics?

@mrocklin
Member

mrocklin commented Mar 4, 2019

In case someone wanted to use wait from within an asynchronous environment.

async with SGECluster(...) as cluster:
    cluster.scale(10)
    await cluster.wait()
    x.compute()
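
If such a method were added, the blocking counterpart outside the event loop would presumably look like this (hypothetical: cluster.wait() does not exist, and the SGECluster arguments are placeholders):

from dask_jobqueue import SGECluster
from distributed import Client

cluster = SGECluster(cores=1, memory="2GB")  # placeholder resources
cluster.scale(10)
cluster.wait(10)  # hypothetical: block until 10 workers have joined
client = Client(cluster)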

@elliottslaughter

elliottslaughter commented Apr 12, 2019

As another data point, this is impacting my use of Dask in an HPC environment (and some thoughts on CLI integration below).

My jobs are started by a job scheduler (SLURM), and at the start of the job I want to spin up a scheduler and set of workers. For reasons not worth mentioning here, I'm doing this by calling dask-scheduler and dask-worker rather than dask-ssh or dask-mpi, but I believe the same issue appears no matter which route I go. Conceptually the setup looks like:

dask-scheduler &               # start the scheduler in the background
srun ... dask-worker ... &     # launch the workers under SLURM
python my_dask_script.py       # the client script may start before all workers join

There is a race between the three steps. This is not a problem for correctness, since the worker knows to retry if the scheduler isn't up yet, and the scheduler knows to wait for at least one worker to arrive before allowing the client to start running. But in my case it is a performance issue, because I want to do accurate timing of execution, and if the client starts before all workers have arrived then the run isn't reflective of steady-state performance.

Currently I'm doing a fairly awful workaround based on parsing the log files from the scheduler and counting the number of times it reports workers. You can see my script below. I'm not sure how exactly the API mentioned in the original post would integrate with my workflow; presumably this would need to be exposed via the CLI somehow. I would prefer not to have to write custom scripts to start Dask, but I suppose I can do that if absolutely necessary.

https://github.com/elliottslaughter/task-bench/blob/dask/experiments/cori_metg_compute/metg_dask.sh#L18-L43

@martindurant
Member

Could you create the client in the script, and then poll client.ncores() until you see as many workers as you are happy with?
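
Something along these lines, where the scheduler address and expected worker count are placeholders for whatever the launch script already knows:

from time import sleep
from distributed import Client

client = Client("tcp://127.0.0.1:8786")  # placeholder scheduler address
expected_workers = 8                     # e.g. the number of nodes requested from SLURM
while len(client.ncores()) < expected_workers:
    sleep(1.0)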

@elliottslaughter

I think I could make that work. In my case I know how many nodes I'm planning to boot, so I can just tell that to the client.

Is there API documentation for this? I was trying to figure out if e.g. the num_workers keyword argument to compute was applicable to the distributed scheduler, but never found anything beyond this page (which only mentions multiprocessing and threaded schedulers): https://docs.dask.org/en/latest/scheduler-overview.html#configuring-the-schedulers

@martindurant
Member

https://distributed.readthedocs.io/en/latest/api.html#distributed.Client.ncores

The method just reports the scheduler's view of the cluster.

@mrocklin
Member

mrocklin commented Apr 12, 2019 via email

@mrocklin
Member

mrocklin commented Apr 12, 2019 via email

@guillaumeeb
Member

@elliottslaughter I guess you are using dask-jobqueue? Have you seen this PR: dask/dask-jobqueue#223 by @danpf?

It is not quite finished, but it may give you some insights.
