Dynamically adjusting priority via Futures #1753
Comments
There are a few things that would have to happen here:
cc @philippjfr (as this may be of interest given recent Dask support in HoloViews)
A small snippet that recurses through the priorities of tasks on the scheduler. This came up when discussing this topic with @jakrikham:
    def bump_priority(self, ts, priority):
        # Nothing to do for tasks that are already finished
        if ts.status in ('memory', 'released', 'erred', ...):
            return
        if priority < ts.priority[0]:
            raise ...
        ts.priority[0] = priority
        # Everything this task depends on has to become just as urgent
        for dep in ts.dependencies:
            self.bump_priority(dep, priority)
        # Tell the worker if the task has already been sent there
        if ts.status == 'processing':
            self.send_to_worker(ts.processing_on, {'op': 'bump-priority',
                                                   'priority': ts.priority})
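To make the recursion above concrete, here is a minimal self-contained sketch that runs outside the scheduler. `ToyTask`, its fields, and the `messages` list are hypothetical stand-ins, not the scheduler's real data structures, and the convention that a larger number means more urgent is an assumption of this sketch. Instead of raising when the new priority is lower, it simply never lowers a priority.

```python
from dataclasses import dataclass, field

# States in which a task no longer needs scheduling (assumed for this sketch)
FINISHED = {"memory", "released", "erred"}


@dataclass
class ToyTask:
    """Hypothetical stand-in for a scheduler task record."""
    key: str
    priority: int = 0            # larger number = more urgent (assumption)
    state: str = "waiting"
    dependencies: list = field(default_factory=list)


def bump_priority(task, priority, messages):
    """Raise the priority of `task` and, recursively, of its dependencies."""
    if task.state in FINISHED:
        return
    # Never lower a priority that is already higher
    task.priority = max(task.priority, priority)
    for dep in task.dependencies:
        bump_priority(dep, priority, messages)
    if task.state == "processing":
        # Stand-in for notifying the worker that is running the task
        messages.append((task.key, task.priority))


load = ToyTask("load", state="memory")
clean = ToyTask("clean", state="processing", dependencies=[load])
total = ToyTask("total", dependencies=[clean])

messages = []
bump_priority(total, 10, messages)
print(total.priority, clean.priority, load.priority, messages)
```

The finished `load` task is left untouched, while the in-flight `clean` task produces one message for its worker.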
I'm trying to manually adjust the priorities of futures (e.g. for a tree reduction I want to assign higher priority to tasks that are closer to the final node), but I don't know how to find out the priority of a given future. From the snippet above it looks like it was possible to get it. This is my code:
    from dask.distributed import Client

    client = Client()

    def sq(x):
        return x**2

    futs = client.map(sq, range(3))
    print(futs[0].priority)
    # AttributeError: 'Future' object has no attribute 'priority'
Those are TaskState objects and are internal to the scheduler. There is no mechanism today to adjust priorities dynamically. This is what this issue is proposing.
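For the tree-reduction case mentioned above, priorities do not need to be read back from a future if they are chosen up front. The sketch below is plain Python and does not touch Dask. The helper name `tree_reduction_priorities` and its `fan_in` parameter are made up for illustration. It assigns each level of a reduction tree a priority equal to its height, so tasks closer to the final node get larger values.

```python
def tree_reduction_priorities(n_leaves, fan_in=2):
    """Return one list of priorities per tree level, leaves first.

    Hypothetical helper: every task on a level gets the level index as
    its priority, so the final node ends up with the largest value.
    """
    if n_leaves < 1:
        raise ValueError("n_leaves must be at least 1")
    if fan_in < 2:
        raise ValueError("fan_in must be at least 2")

    widths = [n_leaves]
    while widths[-1] > 1:
        # Ceiling division: each task combines up to `fan_in` inputs
        widths.append(-(-widths[-1] // fan_in))

    return [[level] * width for level, width in enumerate(widths)]


levels = tree_reduction_priorities(8)
for level, priorities in enumerate(levels):
    print(level, priorities)
```

With distributed, values like these could be passed through the `priority` keyword of `Client.submit` as each level is built. That sets a static priority at submission time only and does not provide the dynamic adjustment this issue asks for.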
Basically copied from this comment in PR ( #1651 ). Also related to issue ( dask/dask#2860 ).
One thing that would be nice is to be able to propagate priority changes from Futures themselves. Namely, if one tries to call result, it would be good to bump the priority. Maybe this could be configurable if not generally desirable.
To provide an example use case, imagine one has a Dask object (Array, DataFrame, etc.) and has called persist on the object. Then one wants to inspect a small piece of it while the computation proceeds. So they select out that piece and call compute. It would be good if this already was interpreted as the user wanting this piece of the result sooner than the rest. That way the user can play with this piece as the rest of the computation completes. If needed, this process can be repeated with other pieces.
cc @mrocklin @p-himik
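The behaviour requested here can be illustrated with a toy queue, independent of Dask. `ToyQueue` and its methods are hypothetical. The sketch assumes that a larger number means more urgent and uses the common `heapq` pattern of pushing a fresh entry and skipping stale ones on pop. Asking for one chunk early corresponds to a `bump` call.

```python
import heapq
import itertools


class ToyQueue:
    """Hypothetical priority queue whose entries can be bumped after insertion."""

    def __init__(self):
        self._heap = []
        self._priority = {}                 # key -> current priority
        self._counter = itertools.count()   # keeps insertion order among ties

    def add(self, key, priority=0):
        self._priority[key] = priority
        # Negate so that the largest priority is popped first
        heapq.heappush(self._heap, (-priority, next(self._counter), key))

    def bump(self, key, priority):
        # Only raise priorities; the old heap entry becomes stale
        if key in self._priority and priority > self._priority[key]:
            self.add(key, priority)

    def pop(self):
        while self._heap:
            neg_priority, _, key = heapq.heappop(self._heap)
            # Skip entries that were superseded by a later bump
            if self._priority.get(key) == -neg_priority:
                del self._priority[key]
                return key
        raise IndexError("pop from an empty queue")


queue = ToyQueue()
for i in range(4):
    queue.add(f"chunk-{i}")       # the persisted pieces, all equal priority

queue.bump("chunk-2", 1)          # the user asks for this piece right away
order = [queue.pop() for _ in range(4)]
print(order)
```

The bumped chunk comes out first and the remaining chunks keep their original order. A real scheduler would also have to bump the dependencies of the requested piece, as in the recursive snippet earlier in the thread.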