more adaptive scaling fixes #97
Conversation
Some interesting failures here
I agree. I plan to dig into them further tomorrow. I think the switch to using
@lesteve - may I ask for a review here? If anyone has any ideas on how to move forward here, I would certainly appreciate it. It seems that my change to
        self.start_workers(n - active_and_pending)

    def scale_down(self, workers):
        ''' Close the workers with the given addresses '''
-       logger.debug("Scaling down. Workers: %s" % workers)
+       logger.debug("Scaling down. Workers: %s", workers)
        worker_states = []
        for w in workers:
@mrocklin - I've been working on debugging the failures we're seeing and I may have stumbled onto a real bug. I'll lay out what I can understand.
- when we call scale(0), we correctly end up in the scale down method - however, workers is of type _asyncio.Future and is not a list of workers
- workers.result() yields an empty list
From here, I've tried a bunch of things including:
- removing the loop.add_callback calls in scale
- tweaking the remove/close_workers in retire_workers

But alas, I'm pretty lost. Any pointers would be appreciated.
Yup, lost with reason. That looks like a pretty clear bug. I'm not sure how that ever would have worked.
For context, Scheduler.retire_workers is a coroutine:
@gen.coroutine
def retire_workers(self, comm=None, workers=None, remove=True,
                   close_workers=False, **kwargs):
This means that it is meant to be called within other coroutines, with a yield statement:
@gen.coroutine
def f():
    yield scheduler.retire_workers(...)
This is true of any internal dask scheduler method that communicates with other things or does anything that might take any non-trivial amount of time.
In this way, coroutines are a bit viral, which in our case is bad, because we don't want scale to be a coroutine: users use it, and users get confused by coroutines. Another approach is to call coroutines with add_callback, which says "run this whenever you have a moment, but let's not deal with it now, because it's a bit messy".
So probably the thing to do here is in distributed/deploy/cluster.py::Cluster.scale: we want to call self.scheduler.workers_to_close, which is thankfully just a normal method, then add self.scheduler.retire_workers as a callback with those workers to be run in just a moment, and then call scale_down as we do currently (also as a callback, just in case the cluster object implements it as a coroutine as well).
I can do this, but I'd also be very happy to guide someone else through it in the interests of spreading some of this knowledge around.
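A minimal sketch of what that might look like. The method names follow the description above, but the exact keyword arguments (for example workers_to_close(n=...)) and attributes are assumptions about this version of distributed, not the actual patch:

# Sketch only -- assumes Cluster has .scheduler, .scale_up and .scale_down,
# and that workers_to_close accepts an n= keyword.
def scale(self, n):
    """ Scale the cluster to n workers without making scale a coroutine """
    if n >= len(self.scheduler.workers):
        return self.scale_up(n)
    # workers_to_close is a plain (synchronous) method, safe to call here
    to_close = self.scheduler.workers_to_close(
        n=len(self.scheduler.workers) - n)
    # retire_workers is a coroutine: schedule it on the event loop rather
    # than calling it directly (a direct call just returns a Future)
    self.scheduler.loop.add_callback(
        self.scheduler.retire_workers, workers=to_close)
    # scale_down may itself be a coroutine in some Cluster subclasses,
    # so schedule it as a callback too
    self.scheduler.loop.add_callback(self.scale_down, to_close)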
To be clear, the problem here is that we were calling it as a normal function

def scale(...):
    ...
    to_close = self.scheduler.retire_workers(...)

without a yield, within a function that was not a coroutine. In this case it returns an opaque asyncio/tornado Future object. This is never a good thing to see and in our case almost always signifies a bug.
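For anyone following along, here is a toy reproduction of that failure mode, using tornado directly rather than the dask scheduler (the coroutine below is made up for illustration):

from tornado import gen
from tornado.ioloop import IOLoop

@gen.coroutine
def retire_workers(workers):
    # stand-in for the real Scheduler.retire_workers coroutine
    raise gen.Return(list(workers))

# Called without yield from ordinary (non-coroutine) code, we get a Future
# object back instead of the list -- the opaque object described above.
print(retire_workers(['tcp://10.0.0.1:1234']))

# Called with yield from inside another coroutine, we get the actual value.
@gen.coroutine
def main():
    workers = yield retire_workers(['tcp://10.0.0.1:1234'])
    print(workers)   # ['tcp://10.0.0.1:1234']

IOLoop.current().run_sync(main)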
@mrocklin - I'm happy to push this forward if you can provide some hand holding. https://github.com/jhamman/distributed/commit/ad0256006bc7d5018277da0d6c686224ffaaa8fe is a first attempt based on my reading of what is above. This does cause some tests to fail but I think it is worth sharing in its current form for initial feedback.
So you chose to overload the scale method in LocalCluster. Is this really simpler than handling the active_and_pending jobs in scale_up?
dask_jobqueue/core.py (outdated)

            jobs = list(self.pending_jobs.keys())[to_kill:]
            self.stop_jobs(jobs)
        else:
            # we need to retire some workers (and maybe pending jobs)
I think that here we need to remove all pending jobs, so the comment seems wrong, or is there something I don't understand?
Yes, this comment is a bit misleading. What I meant to say was that all pending jobs will be killed too.
dask_jobqueue/lsf.py (outdated)

        jid = out.split('<')[1].split('>')[0].strip()
        if not jid:
            raise ValueError('Unable to parse jobid from output of %s' % out)
        return jid
Shouldn't we wrap the job-id check inside core.py, when we call _job_id_from_submit_output? It would be simpler.
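Something along these lines, as a standalone sketch (the function and variable names here are illustrative, not the actual dask-jobqueue code):

def parse_job_id(out, job_id_from_submit_output):
    """ Extract a job id from submit-command output and validate it once,
    in core.py, instead of repeating the check in every scheduler class. """
    jid = job_id_from_submit_output(out)
    if not jid:
        raise ValueError('Unable to parse jobid from output of %s' % out)
    return jid

# LSF-style parser as an example
lsf_job_id = lambda out: out.split('<')[1].split('>')[0].strip()
print(parse_job_id('Job <6732209> is submitted to default queue.', lsf_job_id))
# 6732209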
dask_jobqueue/core.py (outdated)

        """
        with log_errors():
            active_and_pending = self._count_active_and_pending_workers()
            if n >= active_and_pending:
These two lines are why I am currently overloading the scale method. We need to know if we want to scale up or scale down, and that depends on the number of pending workers. The current Cluster.scale() method does not have a way to evaluate pending workers.
This part is currently done in scale_up, maybe not correctly, but what prevents us from doing it there? Something like this:
def scale_up(self, n, **kwargs):
    """ Brings total worker count up to ``n`` """
    active_and_pending = self._count_active_and_pending_workers()
    if n >= active_and_pending:
        self.start_workers(n - self._count_active_and_pending_workers())
    else:
        n_to_close = active_and_pending - n
        if n_to_close < self._count_pending_workers():
            # We only need to kill some pending jobs
            to_kill = int(n_to_close / self.worker_processes)
            jobs = list(self.pending_jobs.keys())[to_kill:]
            self.stop_jobs(jobs)
        else:
            # we should never be here in scale_up
            raise RuntimeError('scale_up asked to scale below the number '
                               'of running workers')
Gave a try today to this PR. Works fine! Actually, I don't have any problem on the master branch when using scale and adaptive, and this one works well too (no noticeable change from a user perspective). Just one case in which I see some faulty behaviour (but I see it on master too):

# Create some cluster
cluster = PBSCluster(processes=2, cores=4, memory="20GB")
# Scale workers up
cluster.scale(8)
# Correct number of jobs and workers showing up
# Do things, then scale down
cluster.scale(4)
# Only one job and two workers left in running jobs:
cluster.running_jobs
Out: OrderedDict([('6732209',
  {'dask-worker--6732209---0': <Worker 'tcp://10.135.36.116:40331', memory: 0, processing: 0>,
   'dask-worker--6732209---1': <Worker 'tcp://10.135.36.116:35941', memory: 0, processing: 0>})])

One way or another, it looks like something's wrong with

_adaptive_options = {
    'worker_key': lambda ws: _job_id_from_worker_name(ws.name)}
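To illustrate what that worker_key is meant to buy us: workers from the same job share a key, so adaptive scaling should retire them together rather than leaving a job half-populated. A plain-Python illustration of the grouping (the name parsing below is a simplified guess, not the real _job_id_from_worker_name):

from itertools import groupby

def job_id_from_worker_name(name):
    # worker names above look like 'dask-worker--6732209---0'
    return name.split('--')[1]

names = ['dask-worker--6732209---0', 'dask-worker--6732209---1',
         'dask-worker--6732210---0', 'dask-worker--6732210---1']

for job, group in groupby(sorted(names, key=job_id_from_worker_name),
                          key=job_id_from_worker_name):
    print(job, list(group))
# 6732209 ['dask-worker--6732209---0', 'dask-worker--6732209---1']
# 6732210 ['dask-worker--6732210---0', 'dask-worker--6732210---1']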
Could we enable some debug logs in the test here, to get more insight into why the tests are failing?
So taking another look at this:
@guillaumeeb - thanks for taking a look at this. My attention has obviously been elsewhere lately, so I appreciate the persistence on your part.
So I propose this new way without overriding scale. Not sure how to handle the test, though.
So as discussed previously, the

In order to advance here, I propose to disable the test for the time being and merge this PR, as it provides some nice enhancements to dask-jobqueue. I then propose to implement dask/distributed#2257 in another PR directly in dask-jobqueue, meaning overloading scale or even the upstream Cluster object. This should hopefully fix the failing test here. As discussed in #130, this should provide some interesting lessons for dask/distributed#2235.
Thanks @guillaumeeb. I like the plan you've laid out. Happy to see this merged as is.
@lesteve submitted a review on #63 after we merged. The changes he requested were mostly small so I'm just addressing them here.
closes #112