
Dynamic adaption of average task duration #3516

Open
gsailer opened this issue Feb 24, 2020 · 5 comments

gsailer (Contributor) commented Feb 24, 2020

We are currently experiencing an issue with very long-running tasks when using adaptive_target to figure out how many workers the Dask graph needs.
The estimate made by adaptive_target is wrong as long as no task of the TaskPrefix has finished yet, because there is no measured duration to base it on.
Could we change the estimation to correct itself dynamically and propagate a new average across the cluster as soon as the execution of one task takes, for example, twice the estimated time?
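For illustration, here is a minimal sketch of the kind of estimate at stake; the function, the 5 s target, and the 0.5 s default for unmeasured prefixes are assumptions for the example, not the actual adaptive_target code:

import math

# Hypothetical sketch: estimate the number of workers from per-prefix duration
# averages; prefixes that have never finished a task fall back to a default guess.
def estimate_workers(task_counts, prefix_averages, target_duration=5.0,
                     default_duration=0.5):
    expected_runtime = sum(
        count * prefix_averages.get(prefix, default_duration)
        for prefix, count in task_counts.items()
    )
    return max(1, math.ceil(expected_runtime / target_duration))

# 100 ten-minute tasks with no measured average look like 50 seconds of work,
# so the target stays far too low until the first task of the prefix finishes.
print(estimate_workers({"slow-prefix": 100}, {}))                    # 10
print(estimate_workers({"slow-prefix": 100}, {"slow-prefix": 600}))  # 12000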

gsailer (Contributor, Author) commented Mar 19, 2020

Do you see any chance of adding timing logic around the ThreadPoolExecutor on the worker and adding a new endpoint on the scheduler that receives updated timing info for tasks and then propagates it to all other workers? Something like this:

import time
from tornado.ioloop import PeriodicCallback

# Record when the task starts executing and check once per second whether it is
# running longer than expected; some_boundary, update_average, and
# notify_scheduler are placeholders for the pieces that would need to exist.
start_time = time.time()
future = executor.submit(function, *args, **kwargs)
pc = PeriodicCallback(lambda: self.threadpool_timing_callback(start_time), 1000)
pc.start()

def threadpool_timing_callback(self, start_time):
    if time.time() - start_time > some_boundary:
        update_average()
        notify_scheduler(task_key, new_average)

https://github.com/dask/distributed/blob/master/distributed/worker.py#L2309-L2319

mrocklin (Member) commented

> Could we change the estimation to correct itself dynamically and propagate a new average across the cluster as soon as the execution of one task takes, for example, twice the estimated time?

Sure. I think that we currently do something similar with work stealing. Maybe?

It is a little bit hard because we don't know when the first task of a prefix has actually started running, only when it has been sent to a worker.

Or perhaps this is the motivation for your second comment, about including some signal in the worker that talks back to the scheduler once a task has started processing? Was this what you meant?

mrocklin (Member) commented

If so, the concern here is that we would be increasing communication by a non-trivial amount. Sending a new message for every task is doable, but it does add real cost.
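For illustration, one way to keep that cost down could be to send a notice only for tasks that clearly overrun their estimate, and to piggyback it on the worker's existing batched stream to the scheduler rather than opening a new comm. A rough sketch; the op name, the 2x rule, and the scheduler-side handler are assumptions, not existing distributed messages:

import time

# Hypothetical sketch: only tasks that clearly exceed the known average trigger a
# message, and the message rides on the existing batched worker-to-scheduler stream.
def maybe_report_long_task(worker, key, prefix, start_time, expected_duration):
    elapsed = time.time() - start_time
    if elapsed <= 2 * expected_duration:
        return False
    worker.batched_stream.send({
        "op": "long-task-duration",  # hypothetical op; needs a matching scheduler handler
        "key": key,
        "prefix": prefix,
        "elapsed": elapsed,
    })
    return True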

gsailer (Contributor, Author) commented Mar 21, 2020

> Or perhaps this is the motivation for your second comment, about including some signal in the worker that talks back to the scheduler once a task has started processing? Was this what you meant?

Yes, timing around the executor would be a way to solve the problem of knowing when tasks actually start running.

Assuming that task durations within one TaskPrefix are roughly normally distributed, I think the number of additional messages could be limited to O(prefixes * duration corrections * workers): a worker only sends a message when its currently executing task runs past the boundary, the boundary adapts with an exponential backoff, and the scheduler then propagates the new expected duration to all other workers.
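A minimal sketch of that check, assuming a placeholder notify callback that forwards the new estimate to the scheduler:

import time

# Hypothetical sketch of the exponential-backoff reporting described above: report
# once each time the running task crosses the current boundary, then double it.
class DurationWatcher:
    def __init__(self, prefix, estimated_duration, notify):
        self.prefix = prefix
        self.boundary = 2 * estimated_duration   # first report at twice the estimate
        self.notify = notify                     # e.g. sends (prefix, elapsed) to the scheduler
        self.start_time = time.time()

    def check(self):
        elapsed = time.time() - self.start_time
        if elapsed > self.boundary:
            self.notify(self.prefix, elapsed)    # scheduler rebroadcasts the new average
            self.boundary *= 2                   # back off before the next report

Driven by something like the PeriodicCallback above, a 600-second task with a 1-second estimate would send roughly nine messages in total rather than one per check interval.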

I will have to look into how work stealing is done. In general, this problem seems pretty complex to me, and I would be very happy to get help with the overall design and integration, since I am not experienced with distributed systems, their design approaches, or the pitfalls that come with them.

mrocklin (Member) commented

Yes, I agree that this seems complicated. It also isn't a common issue (although I acknowledge that it is serious when it does occur). Because it is not so common, any solution would need to avoid adding much code complexity, especially in the common case.

> I would be very happy to get help with the overall design and integration, since I am not experienced with distributed systems

In principle I would love to be able to offer this help, but my maintainer time is limited.
