
added wait until n workers arguments #223

Closed
wants to merge 8 commits

Conversation

@danpf (Contributor) commented Jan 16, 2019

Sometimes my scheduler is very full and it can take me a very long time to get a worker. This way, when I do something like:

cluster.adapt(minimum=10, wait_until_n=2)
...{setup client}...
client.scatter([mything])

I don't get a timeout error on the scatter if it takes me 1hr+ to get those workers.
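For illustration, a fuller sketch of this pattern (a minimal sketch only: PBSCluster, its resource arguments, and the scattered data are placeholders; wait_until_n is the keyword proposed in this PR):

from dask.distributed import Client
from dask_jobqueue import PBSCluster

cluster = PBSCluster(cores=4, memory='8GB')  # placeholder cluster/resources
# Block until at least 2 workers have actually connected, even if the queue
# takes an hour to schedule them, so the scatter below cannot time out
# waiting for its first worker.
cluster.adapt(minimum=10, wait_until_n=2)
client = Client(cluster)
client.scatter([mything])  # mything: placeholder for the data to distribute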

Not sure if you are actually interested in this feature; just posting in case you are.

Related to
dask/distributed#2454

@lesteve (Member) commented Jan 17, 2019

There was another related discussion in distributed in dask/distributed#2138.

I would encourage you to open a PR in distributed, because it feels like something like this belongs there. I haven't thought about the interplay between adapt and scatter; I feel this is slightly orthogonal, but I may be wrong.

@danpf (Contributor, Author) commented Jan 17, 2019

It is the same problem as there, but not quite: since jobqueue ClusterManagers don't inherit from distributed Clusters, both would have to implement this separately.

There is also, I think, a bug here since Adaptive gets the wait_until_n kwarg... but I haven't seen any problems yet.

@lesteve (Member) commented Jan 18, 2019

Since jobqueue ClusterManagers don't inherit from distributed Clusters, they both would have to implement this separately.

I see, good point. Still, I think something like this should be done in distributed because it is applicable in other projects (LocalCluster in distributed, KubernetesCluster in dask-kubernetes, etc.), and I think a PR in distributed would be very welcome.

We can then think about how to have the same functionality in dask-jobqueue with minimal code, deferring to the distributed implementation. Full disclosure: I haven't followed the ClusterManager development very closely, so the situation may be a bit more complicated than this; not 100% sure...

You may do something like this already, but in the meantime I would recommend using a workaround like dask/distributed#2138 (comment).
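(The linked comment is not reproduced here; the general shape of such a workaround, polling the worker count from the client side before submitting work, might look like this:)

import time
from dask.distributed import Client

client = Client(cluster)  # cluster: the already-created JobQueueCluster
n = 2  # number of workers to wait for
# Poll the scheduler until enough workers have registered.
while len(client.scheduler_info()['workers']) < n:
    time.sleep(1)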

@guillaumeeb (Member) commented

For the reasoning behind ClusterManager, see #170. The idea is to use dask-jobqueue as an experimental project for designing the part that will eventually be common to the distributed codebase.

The wait_until_n feature here is another one to add to the ClusterManager features. Maybe it is simple enough to be put in distributed right now; I'm not sure how we want to do things here. @mrocklin, any opinion?

@guillaumeeb (Member) left a comment

sleep_until_n_workers internals should be modified; see comment.

@@ -83,8 +84,14 @@ def __init__(self, adaptive_options={}):
         self._adaptive_options = adaptive_options
         self._adaptive_options.setdefault('worker_key', self.worker_key)

+    def sleep_until_n_workers(self, n):
+        '''Block by sleeping until we have n active workers'''
+        while self._count_active_workers() < n:
@guillaumeeb (Member):

In ClusterManager, we should not rely on functions from JobQueueCluster. You need to find something that can be called on the scheduler remotely at some point, or we need the ClusterManager to be informed of any incoming worker.
Keep in mind that the ClusterManager potentially runs in a different process than the Scheduler, and that it is built to be usable by Cluster implementations other than those of dask-jobqueue.

@guillaumeeb (Member):

So you should use len(self.scheduler.workers), and at some point a remote call to the scheduler process to actually get this information.
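A sketch of the method revised along these lines (the same loop as the PR, but counting the workers known to the scheduler; still synchronous, pending the remote-call version mentioned above):

import time

def sleep_until_n_workers(self, n):
    '''Block by sleeping until we have n active workers'''
    # Count workers registered with the scheduler instead of calling into
    # JobQueueCluster, so ClusterManager stays implementation-agnostic.
    while len(self.scheduler.workers) < n:
        time.sleep(1)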

@mrocklin (Member) commented Jan 22, 2019 via email

@guillaumeeb (Member) commented
Let's work on this in dask-jobqueue first, and we'll see how it can be made available to distributed if needed.

@guillaumeeb (Member) commented
@danpf any interest in finishing this?

@danpf (Contributor, Author) commented Feb 8, 2019

It's pretty much done, pending tests.

@danpf (Contributor, Author) commented Feb 15, 2019

There is some test leakage somehow. I'm not sure why, but if I just move some tests around, as in the last commit, the tests pass.

@guillaumeeb (Member) left a comment

I'm honestly not sure what caused the test failures...

Anyway, this looks good to me, thanks @danpf!

If possible, I'd like for @jhamman, @lesteve or @mrocklin to take a quick look at this one.

@mrocklin (Member) left a comment

I've added some feedback. I have some concerns both with adding this as a keyword argument and with the current lack of async support.

For context, async support is necessary to make it easy to manage clusters in things like the current dask-jupyterlab extension, or for any future dask-hub project.

@@ -122,6 +131,7 @@ def adapt(self, minimum_cores=None, maximum_cores=None,
             kwargs['maximum'] = self._get_nb_workers_from_memory(maximum_memory)
         self._adaptive_options.update(kwargs)
         self._adaptive = Adaptive(self.scheduler, self, **self._adaptive_options)
+        self.wait_until_n_workers(wait_until_n)
@mrocklin (Member):

Adapt can be called within the event loop, so we can't have a while ...: time.sleep() call in there, otherwise things will halt/block.

def wait_until_n_workers(self, n):
    '''Block by sleeping until we have n active workers'''
    while n and len(self.scheduler.workers) < n:
        time.sleep(1)
@mrocklin (Member):

If we keep this function then I would decrease this sleep significantly, probably to around 10ms. It'll still be way less than 1% CPU time.

It's common in Dask to build functions that are async-friendly first, and then wrap around them with self.cluster.sync. If we're supporting Python 2 we do this with @gen.coroutine; if we're not, we use async def.
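A minimal sketch of that pattern (assuming the cluster exposes a sync() helper like distributed's, which runs a coroutine on the event loop and blocks until it finishes, and using the ~10ms sleep suggested above):

from tornado import gen

async def _wait_until_n_workers(self, n):
    # Yield to the event loop between checks so the scheduler can keep
    # registering incoming workers while we wait.
    while n and len(self.scheduler.workers) < n:
        await gen.sleep(0.01)

def wait_until_n_workers(self, n):
    # Synchronous wrapper for non-async users.
    return self.sync(self._wait_until_n_workers, n)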

@@ -200,6 +212,7 @@ def scale(self, n=None, cores=None, memory=None):
         # TODO we should not rely on scheduler loop here, self should have its
         # own loop
         self.scheduler.loop.add_callback(self._scale, n, cores, memory)
+        self.wait_until_n_workers(wait_until_n)
@mrocklin (Member):

I think that having a keyword like this in many methods is a sign that maybe we shouldn't include it, and instead we should expect users to make two calls.

-cluster.scale(10, wait_until_n=10)
+cluster.scale(10)
+cluster.wait_until_n_workers(10)

@mrocklin (Member):

Otherwise keyword arguments like that tend to propagate to many new methods, resulting in an n x m maintenance problem.

@mrocklin (Member) commented Mar 4, 2019

I should have said this first though: this is nice work. People have asked for things like this before, so thank you @danpf for pushing on it.

@mrocklin (Member) commented Mar 4, 2019

I've added a comment to the original issue: dask/distributed#2138 (comment)

@guillaumeeb (Member) commented

@mrocklin, from what I understand of your comment, expecting the user to make an explicit call to the method also solves the problem related to the lack of async support, or am I missing something?

@mrocklin (Member) commented Mar 4, 2019

If you don't call wait within scale or adapt then having a synchronous-only method is fine, as long as people don't try to call it from async contexts. Most people don't use async programming so supporting async isn't a big deal. The implementation in the issue should work out OK though, and hopefully isn't too complex.

@danpf (Contributor, Author) commented Mar 5, 2019

As one of the few people who use async:
I don't have, nor foresee, any reason to asynchronously scale or adapt. Currently, 100% of the time my scripts look like:

  1. the distributed client is initialized with adapt or scale, and waits until workers are up
  2. then the complex async workflows actually start.

Since we depend on distributed workers for actual computations, there's really nothing else we could be doing instead of synchronously sleeping, so it's not a big deal for us.
I understand the need to plan ahead, but figured I would mention my actual usage to provide some context.

@guillaumeeb (Member) commented

Even if we leave the wait_until_n_workers code in scale or adapt, there's no reason dask-labextension would call it with a wait kwarg > 0, so I don't think this would block the loop.

Anyway, I'm really new to async programming, so I leave this discussion to the two of you. @danpf, is there a downside to handling async calls, apart from a few more lines of code?

@mrocklin (Member) commented Mar 5, 2019

there's really nothing else we could be doing instead of synchronously sleeping

Well, there are some things you would be doing, like having the scheduler respond to the workers that are coming in and update its count of current workers. The current loop, if run within the event loop, will never terminate because the scheduler will never be able to acknowledge the new workers.

Making blocking calls within the event loop effectively shuts down the entire system.
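Schematically, the failure mode being described (not the PR's actual code):

# Anti-pattern: running on the event loop thread.
while len(self.scheduler.workers) < n:
    time.sleep(1)  # blocks the loop; the scheduler never processes worker
                   # registrations, so the count never grows and the loop
                   # never exits

# Yielding instead lets the scheduler make progress between checks:
while len(self.scheduler.workers) < n:
    await gen.sleep(0.01)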

@guillaumeeb (Member) commented Mar 6, 2019

Since moving to async as proposed by @mrocklin in dask/distributed#2138 (comment) does not seem to be a big deal, I propose that @danpf implement it, along with a sleep time of 0.01, or 0.1, which is probably responsive enough.

As for not including the call (and the associated keyword) in scale and adapt, I'm still undecided...

@guillaumeeb (Member) commented

@danpf, do you have time to finish this along the lines of what @mrocklin proposes?

@danpf (Contributor, Author) commented Mar 19, 2019

Sorry, I'm really busy for a while; I can probably do this in ~2 weeks. Apologies if that's too late.

@guillaumeeb (Member) commented May 9, 2019

Edit: sorry, wrong GitHub handle...

@danpf, do you think you can follow up on this one in the near future?

@guillaumeeb (Member) commented

So @danpf, I believe this is superseded by dask/distributed#2688, so we can close this one?

@danpf (Contributor, Author) commented May 16, 2019

Yup!

@danpf closed this May 16, 2019