
Dynamically change task priorities #2860

Closed
p-himik opened this issue Nov 3, 2017 · 11 comments

Comments

@p-himik
Contributor

p-himik commented Nov 3, 2017

Dask is great for the workflow I'm currently using: read a number of tables, run some functions on each, and output the result as a table in which each cell shows either the initial table ID or the result of one of the functions in the pipeline.

Now that I'm implementing a UI for this table, the need to sort and filter it interactively has become apparent, as has the need to show intermediate results as soon as they're ready.

Judging by the source of distributed.scheduler, the current implementation doesn't allow for dynamic task priorities. Are there any plans to implement this? If not, do you have any ideas on how best to implement it myself?

@mrocklin
Member

mrocklin commented Nov 6, 2017

Short term, I have no personal plans to implement it, but you could do it yourself if you're interested. The current policy is described here: http://distributed.readthedocs.io/en/latest/scheduling-policies.html?highlight=priority#choosing-tasks

In the code, you would want to search for priority in client.py and scheduler.py.

@jakirkham
Member

I would be interested to hear your thought process on how you would go about reprioritizing tasks.

@mrocklin
Member

We would probably just send an update-graph message containing new priority values but no graph. This almost certainly doesn't work now, but it could be made to.

The real challenge here is that the scheduler currently auto-increments priority on every new update_graph call, preferring older tasks to newer ones. It's not clear to me how we would change this policy. Maybe there is a third policy that overrides the other two.

(user-defined priority, the scheduler's first-come-first-served priority, and priority from graph placement)
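To make the current ordering concrete, here is a minimal stdlib sketch, not the scheduler's actual code. It assumes that smaller priority tuples run first (as with Python's heapq) and that every update_graph call receives a new, larger generation number. The submit_graph helper and the task names are hypothetical.

```python
import heapq
import itertools

# Toy model: each update_graph call gets the next generation number.
_generation = itertools.count()
ready = []  # min-heap of (priority_tuple, key); the smallest tuple runs first


def submit_graph(graph_priorities):
    """Queue every key of one graph with priority (generation, graph_priority)."""
    generation = next(_generation)
    for key, graph_priority in graph_priorities.items():
        heapq.heappush(ready, ((generation, graph_priority), key))


# User A submits a graph first; user B submits a small graph a moment later.
submit_graph({"a-load": 0, "a-analyze": 1})
submit_graph({"b-summary": 0})

order = [heapq.heappop(ready)[1] for _ in range(len(ready))]
print(order)  # ['a-load', 'a-analyze', 'b-summary']
```

Because generation is compared before graph priority, B's one-task graph waits behind all of A's queued work, whatever its graph priority.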

@p-himik
Contributor Author

p-himik commented Nov 23, 2017

In my implementation, I subclassed the scheduler and added a new message that simply changes the priorities stored on all workers. It doesn't affect already assigned tasks, and it doesn't change priorities in previously submitted graphs, but at this early stage of my project that's perfectly fine.
However, as more users start to use the service, I think it would make sense to be able to alter, or completely disable, the incrementing of the generation priority, so that a user who submits a graph a bit later than another user won't have to wait until the workers have finished all of the other user's tasks.
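A rough sketch of the approach described here: a message handler that rewrites the priorities of tasks still waiting in the queue, while tasks already handed to a worker keep theirs. ToyScheduler and handle_set_priorities are hypothetical names; the real distributed.Scheduler keeps far more state, and this models only a single ready queue, not priorities held on workers.

```python
import heapq


class ToyScheduler:
    """Minimal ready queue; a stand-in, not distributed.Scheduler."""

    def __init__(self):
        self.priority = {}     # key -> priority tuple
        self.queued = set()    # keys waiting for a worker
        self.assigned = set()  # keys already handed to a worker
        self._heap = []

    def add_task(self, key, priority):
        self.priority[key] = priority
        self.queued.add(key)
        heapq.heappush(self._heap, (priority, key))

    def handle_set_priorities(self, new_priorities):
        """Hypothetical message handler: change priorities of queued tasks only."""
        for key, priority in new_priorities.items():
            if key in self.queued:  # assigned tasks are left untouched
                self.priority[key] = priority
        # Rebuild the heap so the new priorities take effect.
        self._heap = [(self.priority[k], k) for k in self.queued]
        heapq.heapify(self._heap)

    def next_task(self):
        """Pop the highest-priority queued task and mark it as assigned."""
        _, key = heapq.heappop(self._heap)
        self.queued.discard(key)
        self.assigned.add(key)
        return key


s = ToyScheduler()
s.add_task("x", (0, 0))
s.add_task("y", (0, 1))
s.add_task("z", (1, 0))
first = s.next_task()  # "x" is assigned before any change arrives
s.handle_set_priorities({"x": (9, 9), "z": (-1, 0)})
second = s.next_task()
third = s.next_task()
print(first, second, third)  # x z y
```

The change to "x" is ignored because it was already assigned, matching the limitation noted above.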

@jakirkham
Member

Copying this comment over as it is relevant to this discussion.

Pointers as to what to look at and try wouldn't hurt. Whether I'll be able to follow through is another question. At least it would give me an opportunity to familiarize myself more with how job scheduling works.

I would start at distributed/scheduler.py::Scheduler.update_graph, in particular this line:

           self.priority[key] = (generation, new_priority[key])  # prefer old

This is where we decide to prefer first-come-first-served first (the generation variable), and then the graph-based priority. We probably want to add a third element, just before generation, that holds the user-defined priority.

The user provides information to this function through the distributed/client.py::Client._graph_to_futures method, which is called by methods like compute and persist.
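A sketch of the suggested three-element key, assuming the tuple stays smaller-runs-first: the user-defined priority goes in front of generation, so a later but urgent submission can overtake earlier work. make_priority is a hypothetical helper, and negating the user value (so that a bigger number means more urgent) is a design choice of this sketch, not something settled in this thread.

```python
def make_priority(user_priority, generation, graph_priority):
    """Build a sort key; smaller tuples run first.

    user_priority is negated (an assumption of this sketch) so that a larger
    user-supplied number means "run sooner", while generation and
    graph_priority keep their smaller-is-sooner meaning.
    """
    return (-user_priority, generation, graph_priority)


tasks = {
    # key: (user_priority, generation, graph_priority)
    "week-long-analysis-0": (0, 0, 0),
    "week-long-analysis-1": (0, 0, 1),
    "quick-summary": (10, 5, 0),  # submitted much later, but urgent
}

order = sorted(tasks, key=lambda k: make_priority(*tasks[k]))
print(order)
# ['quick-summary', 'week-long-analysis-0', 'week-long-analysis-1']
```

Tasks with equal user priority still fall back to first-come-first-served and then graph order, so the existing policy is preserved as the default.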

@jakirkham
Member

it would make sense to have the ability to alter and/or completely disable the increment of the generation priority

I'm not sure I see the use case for altering the generation priority. Do you know of one, @p-himik?

However, the use case for disabling it seems clear. It would be pretty easy to add a flag like inc_generation to update_graph that defaults to True but could be set to False to skip incrementing the generation when update_graph is called. This assumes we agree that update_graph is the place where we want to address this issue.
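A minimal sketch of how such a flag could behave. PriorityBook is a hypothetical stand-in that models only the priority bookkeeping, and inc_generation is the flag proposed in this thread, not an existing update_graph parameter.

```python
class PriorityBook:
    """Stand-in for the scheduler's priority bookkeeping only."""

    def __init__(self):
        self.generation = 0
        self.priority = {}

    def update_graph(self, graph_priorities, inc_generation=True):
        """Record (generation, graph_priority) for each new key.

        inc_generation is the proposed (hypothetical) flag: when False, the
        new graph shares the current generation instead of queueing behind
        everything submitted earlier.
        """
        if inc_generation:
            self.generation += 1
        for key, graph_priority in graph_priorities.items():
            self.priority[key] = (self.generation, graph_priority)


s = PriorityBook()
s.update_graph({"a": 0, "b": 1})
s.update_graph({"c": 0}, inc_generation=False)
s.update_graph({"d": 0})
print(s.priority)
# {'a': (1, 0), 'b': (1, 1), 'c': (1, 0), 'd': (2, 0)}
```

With the flag off, "c" competes with "a" and "b" on graph priority alone instead of waiting behind them, while "d" returns to the default first-come-first-served behavior.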

@p-himik
Contributor Author

p-himik commented Dec 17, 2017

@jakirkham The only case I can think of is high-priority tasks that arrive after a bunch of low-priority tasks have already been scheduled. For example, all workers may be busy with some week-long analysis, but we decide we want some quick summary statistics on the data right now.

@jakirkham
Member

Sure, I understand the need to prioritize tasks independently of when they are submitted. I just think that should be represented by a priority independent of their generation.

@mrocklin
Member

There is a start to this here: dask/distributed#1651

I did it while I had some free time on a plane. I am not planning on continuing this work short term (my todo list is somewhat long) but if someone else wants to take it on that would be very welcome.

@jakirkham
Member

Thanks for starting this. Happy to give it a look once we wrap up PR #2980.

@jakirkham
Member

Opened issue dask/distributed#1753, which is similar to this one except that it would auto-propagate priority changes through user operations on Dask collections that contain Futures.


3 participants