[WIP] Queue root tasks on scheduler, not workers [with co-assignment] #6598
base: main
Conversation
The idea was that if a `SortedSet` of unrunnable tasks is too expensive, then insertion order is probably _approximately_ priority order, since higher-priority (root) tasks will be scheduled first. This would give us O(1) for all necessary operations, instead of O(log n) for adding and removing. Interestingly, the SortedSet implementation could be hacked to support O(1) `pop` and `popleft`, plus O(1) insertion of a new min/max value. In the most common case (root tasks), we're always inserting a value that's greater than the max. Something like this might be the best tradeoff, since it gives us O(1) in the common case but still maintains the sorted guarantee, which is easier to reason about.
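For illustration only, here is a minimal sketch of that kind of structure. The class name, the deque-based storage, and the O(n) fallback path are my own assumptions, not distributed's actual `SortedSet`: O(1) when the new item is a new max or min, O(1) pops at both ends, and a slow path only for rare out-of-order inserts.

```python
import bisect
from collections import deque


class MostlySortedQueue:
    """Hypothetical sketch: a priority queue optimized for the common case of
    appending an item greater than the current maximum (new root tasks arrive
    roughly in priority order)."""

    def __init__(self):
        self._items: deque = deque()  # kept in ascending order

    def add(self, item) -> None:
        if not self._items or item >= self._items[-1]:
            self._items.append(item)  # common case (new max): O(1)
        elif item <= self._items[0]:
            self._items.appendleft(item)  # new min: O(1)
        else:
            # Rare out-of-order insert; O(n) here. A real implementation
            # would fall back to a SortedList instead.
            idx = bisect.bisect(list(self._items), item)
            self._items.insert(idx, item)

    def pop(self):  # remove and return the highest-priority item: O(1)
        return self._items.pop()

    def popleft(self):  # remove and return the lowest-priority item: O(1)
        return self._items.popleft()

    def __len__(self) -> int:
        return len(self._items)
```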
Now task states on the dashboard are listed in the logical order that tasks transition through.
Simpler, though I think basically just an int of 1 may be the most reasonable.
This is just a hack currently, but maybe it would actually be useful?
This reverts commit df11f719b59aad11f39a27ccae7b2fd4dfd9243a.
When there were multiple root task groups, we were just re-using the last worker for every batch because it had nothing processing on it. Unintentionally, this also fixes dask#6597 in some cases (because the first task goes to processing, but we measure queued, so we pick the same worker for both task groups).
This reverts commit fdd5fd9.
The main weakness of this PR is that it all relies on the hacky co-assignment logic. The fact that the current co-assignment logic relies entirely on picking a worker for every task up front means we need these per-worker queues to keep track of that decision. But this makes dealing with workers joining/leaving difficult. If we're going to hold tasks back on the scheduler, it would be much easier if we could pick workers in real time, rather than pre-determining them.

We don't have "real" co-assignment logic. We don't currently have a cheap pure function to determine, given a task, what its neighboring tasks are (or how many there are), and which worker(s) they are assigned to. (This can obviously be computed by traversing the graph, but we were worried about the performance of that long ago; see #4864 (comment).) We want to be able to identify "families" or "neighborhoods" of tasks that should be run together. Specifically, this refers to a set of tasks which all must be in memory at once, on the same worker, to compute a dependent task. It turns out that being able to refer to task families would simplify a number of places.
My point being that I think this PR works, but if anyone feels like "huh, this is starting to add a lot of complexity", I kind of agree. And I think the cause of that complexity is not having a robust way to do co-assignment at any moment, as opposed to only when we're iterating through all the root tasks in order. I think we could take an approach like what I have here as a stopgap, though.
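To make "family" concrete, here is a rough sketch of what such a pure function might look like. The `TaskNode` stand-in, the `task_family` name, and the `max_dependents` cutoff are all hypothetical, not anything that exists in the scheduler today.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass(eq=False)
class TaskNode:
    """Minimal stand-in for a scheduler TaskState (illustrative only)."""

    key: str
    dependencies: set[TaskNode] = field(default_factory=set)
    dependents: set[TaskNode] = field(default_factory=set)


def task_family(ts: TaskNode, max_dependents: int = 5) -> set[TaskNode]:
    """Tasks that must be in memory together with ``ts`` to compute some
    dependent: i.e. the siblings of ``ts`` under each (narrow) dependent.

    Dependents with very many dependencies (e.g. a full reduction) are
    skipped, since "everything" is not a useful family.
    """
    family: set[TaskNode] = set()
    for dependent in ts.dependents:
        if len(dependent.dependencies) > max_dependents:
            continue
        family |= dependent.dependencies
    family.discard(ts)
    return family
```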
I'm not against adding a new state but I would prefer not having to since this typically introduces a lot of difficult-to-test complexity and opens us up to potential deadlocks. The queued state as you are proposing it is also a new type of state (for the scheduler). As you pointed out, the definition of a proper transition would require us to allow passing of (kw)args to transition functions. From my time with the worker, I perceive these kinds of transitions as a code smell that indicate we're trying to use the transitions for something they are not suited for. This is most apparent in the fetch->flight transition on the worker that links to gather_dep which is the most problematic of all the transitions. I would hate to replicate such a problem hotspot on the scheduler. That's not a deal breaker, though; I might be too cautious.
Preassigning workers feels like a vastly different approach from what I had in mind when we first discussed #6560. I understand the reason why you chose to go down that route, but I am concerned about the scope creep, particularly considering the rebalancing.
I'm wondering if the rebalancing would actually be stable. I wouldn't be surprised if work stealing would destroy all our efforts trying to be smart here. See also #6600
The complexity introduced here, accounting for a couple of TODOs that still need to be implemented and a reasonably big uncertainty around stealing + rebalancing, is about what I expect is necessary to implement full STA. At least it's not far off. If this kind of complexity (particularly the rebalancing) is actually necessary to make this work and we agree that STA is the ultimate solution, I would prefer investing this time in STA instead of this stopgap solution.
I'm wondering where we can compromise. How harmful would it be if we didn't have any rebalancing?
(This obviously can be computed by traversing the graph, but we were worried about the performance of that long ago #4864 (comment)).
If this were something we actually needed, we could still do it, e.g. for cases with few dependents. I also see some room to collect additional topological information once during dask.order, but that's of course a different beast (similar to order.graph_metrics). There are of course limitations to what we can actually collect in linear time. Just wanting to point out that we should not consider this door shut.
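As a toy illustration of the kind of information that can be gathered in a single linear pass over the graph (this is not what `order.graph_metrics` computes; the function and the toy graph below are made up):

```python
from collections import defaultdict


def dependents_count(deps: dict[str, set[str]]) -> dict[str, int]:
    """Count how many tasks depend on each key, in one O(tasks + edges) pass.

    ``deps`` maps each key to the set of keys it depends on (a simplified
    stand-in for a real task graph).
    """
    counts: defaultdict[str, int] = defaultdict(int)
    for key, dependencies in deps.items():
        counts[key] += 0  # make sure every key appears, even leaves
        for dep in dependencies:
            counts[dep] += 1
    return dict(counts)


# Two root tasks feeding one reduction:
graph = {"a": set(), "b": set(), "sum": {"a", "b"}}
assert dependents_count(graph) == {"a": 1, "b": 1, "sum": 0}
```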
tasks = []
recommendations: dict[str, str] = {}

# Redistribute the tasks between all worker queues. We bubble tasks off the back of the most-queued
I haven't studied this algorithm, but I'm hesitant about introducing this. This would be the third time we're implementing a bin packing algorithm of some sort (the other two are stealing and rebalance). All of these applications appear similar enough to me that I'm wondering why we cannot reuse anything (different metrics/weights to optimize, different ways to act on the decision of the algorithm, etc.).
If such a function could be factored out, this would allow for a very nice optimization target.
Anyway, this feels like an awful lot of complexity just for the use case of queued-up root tasks in scaling situations.
Agreed that there's probably some repetition. I'm not sure how much that could be factored out. This is a pretty different approach from work stealing AFAIU.
Without this though, I think new workers would be mostly/entirely idle. Since work-stealing doesn't know about queued tasks, and co-assignment means the candidates for downstream dependencies will only be existing workers, I think it's quite likely new workers wouldn't get used at all. That didn't feel acceptable to me.
Anyway, I wrote this just to profile and experiment with the rebalancing approach on actual clusters. Maintaining good co-assignment while scaling up is an interesting problem (that we don't currently handle). It's not necessarily what would actually get merged.
But I do think we'd need something to rebalance queued tasks during scaling as long as we're pre-assigning all tasks to workers upfront. This is the main reason why I'd like to not do that anymore and make co-assignment logic stateless. It makes handling scaling way, way easier.
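For concreteness, here is a deliberately simplified sketch of the "bubble tasks off the back of the most-queued worker" idea. This is not the PR's actual algorithm; it ignores priorities across workers, co-assignment quality, and worker capacities, and the function name and signature are made up.

```python
from collections import deque


def rebalance_queues(
    queues: dict[str, deque], new_workers: list[str]
) -> dict[str, deque]:
    """Move low-priority tasks from the longest per-worker queues onto the
    emptiest ones (e.g. workers that just joined), until all queue lengths
    are within one task of each other.

    Each deque is assumed to hold tasks in priority order, highest priority
    at the front, so popping from the back disturbs co-assignment the least.
    """
    for worker in new_workers:
        queues.setdefault(worker, deque())

    while True:
        longest = max(queues, key=lambda w: len(queues[w]))
        shortest = min(queues, key=lambda w: len(queues[w]))
        if len(queues[longest]) - len(queues[shortest]) <= 1:
            break
        # Bubble one task off the back of the most-queued worker.
        queues[shortest].append(queues[longest].pop())
    return queues


# Example: two loaded workers and one new, empty worker.
queues = {"w1": deque(range(6)), "w2": deque(range(6, 12))}
rebalance_queues(queues, ["w3"])
assert sorted(len(q) for q in queues.values()) == [4, 4, 4]
```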
What did you have in mind then? Since the current co-assignment implementation only works by pre-assigning workers, there's no way to re-use it without pre-assignment. I would very much like to change the co-assignment logic to something stateless, but I considered redesigning it to be out of scope here. I have an idea for how to do it that I think will overall simplify things; I'd be happy to try it, but first I wanted to try the approach of "keep the current co-assignment implementation", since I thought that's what we'd talked about in #6560.
I think there is very little chance we will withhold tasks on the scheduler without introducing a new state for them. Logically, it's clearly a state that doesn't yet exist for the scheduler. In #6584, I co-opted a rarely-used existing state instead. I personally don't feel worried about adding a new state on the scheduler. The scheduler is less prone to deadlocking and much easier to debug when it does. In the process of writing this PR, I got something wrong and created a deadlock; it took about 5 minutes to find and fix.
Maybe this particular implementation is a stopgap, but I think the overall problem of withholding root tasks on the scheduler is not. Even with STA, I think we'd still want to do this; otherwise, work-stealing will become even more of a nightmare. And as discussed in #6600, using work-stealing to load-balance during scale-up doesn't even work well. Holding back a queue of runnable tasks on the scheduler both massively simplifies the consistency problems of work-stealing and probably allows for a better load-balancing algorithm during scale-up anyway.
I don't think that with STA we would withhold anything. Work stealing would not work with STA and would very likely need to be replaced with another system.
This is an approach for withholding root tasks from workers while preserving neighboring task co-assignment, as discussed in #6560.
This adds:
- A `queued` state on the scheduler. `queued` is like `processing` (task is ready; all dependencies are in memory), but rather than being already sent to a worker, we just track the worker we plan to run it on when that worker has availability.
- A `HeapSet` tracking queued tasks per worker in priority order (scheduler-side).
- A `distributed.scheduler.worker-oversaturation` config option to allow some degree of oversaturation (or even undersaturation) if desired. This will be helpful for benchmarking. It also could be a feature flag: if we set the default to `inf`, current behavior would be unchanged.

Due to an unintentional implementation detail, this also avoids #6597 in some very specific situations (see b2e7924 message), but is not in any way actually a fix.
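For example, assuming the option behaves like a standard dask config key (the exact numeric semantics of the value are the PR's, not something asserted here), it could be toggled like this:

```python
import dask

# Allow a limited amount of oversaturation per worker (value semantics per the PR).
dask.config.set({"distributed.scheduler.worker-oversaturation": 1.0})

# Feature-flag style: per the description above, a default of `inf` would
# leave current behavior unchanged.
dask.config.set({"distributed.scheduler.worker-oversaturation": float("inf")})
```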
I have not tested this much on real clusters yet. But I think it works. If people like @TomNicholas want to try it out, I'd be curious to hear feedback.
I didn't get around to pushing this up on Friday and I'll be off tomorrow, but I'm pushing it up now in case @fjetter wants to take a look.
- Closes #6560
- `pre-commit run --all-files`