Respect priority in earliest start time heuristic #5253

Open
mrocklin opened this issue Aug 23, 2021 · 22 comments

@mrocklin
Member

mrocklin commented Aug 23, 2021

Background

Today Dask decides where to place a task based on an "earliest start time" heuristic. For a given task it tries to find the worker on which it will start the soonest. It does this by computing two values:

  • The amount of work currently queued on the worker (as measured by its occupancy)
  • The amount of time it would take to transfer to that worker all dependencies it does not already hold

This is what the worker_objective function computes.
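For reference, here is a rough sketch of that computation (simplified; attribute names are approximate and this is not the exact scheduler code):

def worker_objective_sketch(ts, ws, bandwidth):
    # Rough sketch of the earliest-start-time estimate (not the exact
    # scheduler code; attribute names are approximate).
    comm_bytes = sum(
        dts.get_nbytes() for dts in ts.dependencies if ws not in dts.who_has
    )
    stack_time = ws.occupancy / ws.nthreads           # queued work on the worker
    start_time = stack_time + comm_bytes / bandwidth  # plus transfer time for missing deps
    # Ties are broken by preferring the worker holding less data overall
    return (start_time, ws.nbytes)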

Problem

However, this estimate is wrong if the task we're considering has higher priority than some of the tasks already queued on that worker: the new task gets to cut in line, so the worker's full occupancy overstates its wait.

In principle we could count up all of the work that is higher priority than this task, but that might be somewhat expensive, especially when a worker has many queued tasks (which is common). This might be the kind of thing where Cython can save us, but even then I'm not sure.

Proposed solutions

Let's write down a few possible solutions:

  1. Brute force: look at every task in ws.processing and add up the occupancy of those with higher priority.
  2. Ignore: ignore occupancy altogether and just let work stealing take charge.
  3. Middle ground: randomly sample a few tasks in ws.processing (maybe four?) and see where the new task ranks among them. If it ranks below all of them we take the full brunt of occupancy; if it ranks above all of them we take 0%; if it lands in the middle we take 50%, and so on (see the sketch after this list).
  4. Fancy: maintain some sort of t-digest per worker. This seems extreme, but we would only need to track about three quantile values for this to work well most of the time.
  5. Less fancy: maybe we track min/max/mean and blend between them?
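For option 3, a minimal sketch of the sampling idea (a hypothetical helper, not scheduler code; it assumes a smaller priority tuple means the task runs earlier):

import random

def sampled_occupancy(ws_processing, ws_occupancy, priority, k=4):
    # Estimate how much of the worker's occupancy is "ahead" of the new task
    # by ranking its priority against a few randomly sampled queued tasks.
    if not ws_processing:
        return 0.0
    sample = random.choices(list(ws_processing), k=k)
    frac_ahead = sum(other.priority < priority for other in sample) / len(sample)
    return ws_occupancy * frac_ahead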

Options 3 and 5 seem the most plausible. Each has some concerns:

  1. Sampling (option 3): I'm not sure how best to get these items. iter(seq(...)) is ordered these days, and so not a great sample. random.sample is somewhat expensive: %timeit random.sample(list(d), 2) takes 8µs for me on a dict with 1000 items.
  2. min/max/mean (option 5): our priorities are hierarchical, and so a mean (or any quantile) is a little wonky.

Importance

This is, I suspect, especially important in workloads where we have lots of rootish tasks, which are common. The variance among all of those tasks can easily swamp the signal that tasks should stay where their data is.

@fjetter
Member

fjetter commented Sep 7, 2021

A second way our occupancy heuristic is off is that assigned tasks might share dependencies: currently we double-count those transfers, which inflates the occupancy. This is less important for root-ish tasks, though.


Regarding 5, the only thing I can really come up with to deal with the "wonkiness" of our priorities is to keep a sorted list/deque of them. Estimating quantiles would then be a bisect, which should be relatively cheap, but I don't know if it's cheap enough (bisect + insert on a deque to maintain the sort is ~1µs on my machine).
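A minimal illustration of the sorted-sequence idea (the priorities list here is a stand-in, not real scheduler state):

import bisect

# Keep each worker's queued priorities sorted; the quantile of a new task is
# then just its bisect position divided by the current length.
priorities = sorted((0, i) for i in range(1000))

def quantile_of(priority):
    return bisect.bisect_left(priorities, priority) / len(priorities)

quantile_of((0, 250))  # -> 0.25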

@mrocklin
Member Author

mrocklin commented Sep 7, 2021

I'm curious about inserting into a sorted deque; I'm not familiar with this as an option. Can you say more about what you're thinking here?

I did a tiny (/ probably wrong) micro-benchmark with a list, assuming 1000 tasks per worker.

In [1]: L = list(range(1000))

In [2]: import bisect

In [3]: %timeit bisect.bisect(L, 400)
198 ns ± 7.16 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)

In [4]: %%timeit
   ...: L2 = L[:400] + [400] + L[400:]  # insert
   ...: L3 = L[:400] + L[401:]          # delete
   ...: 
   ...: 
7.84 µs ± 139 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

I think that 8µs is probably too much to spend on this improvement.

For comparison, here is the random solution:

In [7]: %timeit random.choices(L, k=5)
978 ns ± 8.76 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

@fjetter
Member

fjetter commented Sep 7, 2021

Deques are implemented as a linked list afaik (or at least something similar). That gives us the opportunity to get (almost; I don't know the exact implementation) constant-time insert performance. Lists have O(N) insert cost since they are backed by an array.

This is also a "wrong benchmark", but it's hard to benchmark the insert properly. If we very naively subtract the copy time, we're back to nanosecond-level performance for the insert. bisect also works on deques.

In [1]: from collections import deque

In [2]: D = deque(range(1_000))

In [3]: %timeit D.copy()
3.61 µs ± 23.1 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

In [4]: %%timeit
   ...: D_c = D.copy()
   ...: D_c.insert(123, 123)
   ...:
   ...:
3.68 µs ± 15.2 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

In [5]: import bisect

In [6]: %timeit bisect.bisect(D, 123)
196 ns ± 1.84 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

@fjetter
Member

fjetter commented Sep 7, 2021

A very naive cumsum occupancy could then look like the following. I don't have a good recipe for removal, though, so I think we would occasionally need to recalculate the cumsum and live with inaccurate numbers for a while:

import bisect

def task_to_processing(ts, ws):
    # ws._processing_deque (a sorted deque of queued priorities) and
    # ws._occupancies (cumulative occupancy per quantile bucket) are hypothetical.
    max_clusters = 4
    ix = bisect.bisect(ws._processing_deque, ts.priority)
    n = len(ws._processing_deque) or 1  # avoid division by zero on an empty deque
    cluster = min(int(ix / n * max_clusters), max_clusters - 1)
    ws._processing_deque.insert(ix, ts.priority)
    for cluster_to_update in range(cluster, max_clusters):
        ws._occupancies[cluster_to_update] += ts.duration

Mathematically speaking this update is wrong, I believe, but if we occasionally recalculate the cumsum entirely we might just get away with it. Maybe we could even drop the incremental update and just periodically recalculate the cumsum.

@mrocklin
Member Author

mrocklin commented Sep 8, 2021

So given this complexity I started thinking, "Gosh, it probably makes sense to just go random here instead", so I did a tiny micro-benchmark:

In [1]: import random

In [2]: L = list(range(1000))

In [3]: %timeit random.choices(L, k=5)
1.34 µs ± 63.4 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

Great, one microsecond seems doable.

But then I realized "No, this happens for every valid worker on this task as we compute the worker objective function, and so this could easily add up to 100us".

Now I'm leaning more towards tracking a distribution of priorities, like mean and standard deviation, assuming that there is a nice way to do that in a streaming population where elements can leave the set.
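One way to do that (a sketch, using simple sum / sum-of-squares counters so that removal is just subtraction; fine for small integer priorities, though not numerically robust in general):

class StreamingMoments:
    # Track mean/std of a changing population where items can be added and removed.
    def __init__(self):
        self.n = 0
        self.total = 0.0
        self.total_sq = 0.0

    def add(self, x):
        self.n += 1
        self.total += x
        self.total_sq += x * x

    def remove(self, x):
        self.n -= 1
        self.total -= x
        self.total_sq -= x * x

    @property
    def mean(self):
        return self.total / self.n if self.n else 0.0

    @property
    def std(self):
        if self.n < 2:
            return 0.0
        var = self.total_sq / self.n - self.mean ** 2
        return max(var, 0.0) ** 0.5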

Thoughts?

@fjetter
Member

fjetter commented Sep 8, 2021

Now I'm leaning more towards tracking a distribution of priorities, like mean and standard deviation, assuming that there is a nice way to do that in a streaming population where elements can leave the set.

I assume you are talking about some abstract "mean of priority" to estimate whether a task has the potential to cut the line? Or are you talking about the mean/stddev of the expected start time? I'm currently a bit confused, tbh.

assuming that there is a nice way to do that in a streaming population where elements can leave the set.

I'm not sure we can find anything significantly simpler than my proposed sorted deque + bisect, but I'm glad to be educated on this topic if I'm wrong.

@fjetter
Member

fjetter commented Sep 8, 2021

FWIW, the mean/quantile of expected start time is what I called cumsum occupancy above. (If that's not clear, I'll need to find a pen and paper :P)

@mrocklin
Member Author

mrocklin commented Sep 8, 2021

I was thinking of tracking the mean/std of the priority of the tasks in the processing state.

Then, we would determine start time by assuming that all processing tasks had equal duration, and looking at how the priority of the new task compared to the mean/std, and taking an appropriate fraction of the total occupancy.

I'm pretty comfortable making pretty broad assumptions here. I think that in most cases where this makes a difference in scheduling, we'll find that the priority is either at the beginning or at the end of the distribution; exact placement within the distribution probably isn't useful. My hope is that by making simplifying assumptions like this we'll be able to simplify and accelerate things considerably.
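Concretely, the adjustment might look something like this (a sketch with hypothetical names, assuming priorities are reduced to a single integer and that a lower value runs earlier):

from statistics import NormalDist

def estimated_stack_time(ws_occupancy, priority_int, prio_mean, prio_std):
    # Fraction of the worker's queued work assumed to be ahead of this task,
    # treating queued priorities as roughly normally distributed.
    if prio_std <= 0:
        frac = 0.0 if priority_int <= prio_mean else 1.0
    else:
        frac = NormalDist(prio_mean, prio_std).cdf(priority_int)
    return ws_occupancy * frac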

@fjetter
Member

fjetter commented Sep 8, 2021

Then, we would determine start time by assuming that all processing tasks had equal duration, and looking at how the priority of the new task compared to the mean/std, and taking an appropriate fraction of the total occupancy.

Got it. Still, that requires us to define a mean of a tuple of things (the tie-breakers are not even integers, are they?). Is it possible for us to ignore some/most of the priority hierarchy? Calculating a mean (even with removal) for integers is trivial; I believe it is also possible for the stddev, but I'll need to do a bit more research on this. Maybe we can calculate a mean + stddev for every hierarchy level and ignore all non-integer levels?

If we need to take the full priority tuple, I'll stick to my bisection proposal to calculate quantiles instead.

@mrocklin
Member Author

mrocklin commented Sep 8, 2021

Most of the priority stack is non-informative for most comparisons. A mean/std per level would probably be fine. Doing comparisons will get a little interesting, though.

@mrocklin
Member Author

mrocklin commented Sep 8, 2021

Interestingly, this problem could also go away if we got very aggressive with speculative task assignment, and if we tried to assign tasks in a depth first manner. The order in which we would assign tasks to workers would more closely match their current priority, and so the current assumption that all tasks on a worker are higher priority would more likely be true.

This fails the first time we're unable to speculatively assign something though (which, in a non-trivial situation will probably happen quickly).

I don't currently think we should go down the path of STA as a cure for all ailments, but I thought I'd bring it up here for completeness.

@gjoseph92
Collaborator

Just thinking about a couple properties we might be able to take advantage of:

  1. Tasks should be assigned to workers in roughly priority order, right? So just appending priorities to a deque as they get assigned would already be somewhat sorted.
  2. Priorities are discrete and unique, so once we have both priority N and N+1 on a worker, we know that no task can ever be assigned which would fall between them in priority. This, combined with point 1, might allow us to efficiently keep a much smaller list of priorities per worker, since we could skip over any runs of consecutive priorities from the perspective of figuring out where a new priority falls within the list.

Basically maybe we could maintain a run-length encoding (kinda) of priorities on each worker? And if the latest task being assigned is 1 greater than the previous priority (common case, for rootish tasks at least, which should make up the vast majority of tasks in ws.processing), then this would be O(1).

Still, that requires us to define a mean of a tuple of things

Obviously, assessing "1 greater than the previous" would require this as well.
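A rough sketch of that bookkeeping, sidestepping the tuple issue by assuming integer priorities (hypothetical; a full version would also merge runs that become adjacent):

import bisect

class PriorityRuns:
    # Consecutive priorities collapse into (start, end) runs, so the common
    # case of assigning priority end+1 is O(1).
    def __init__(self):
        self.runs = []  # sorted, non-overlapping (start, end) pairs

    def add(self, p):
        if self.runs and p == self.runs[-1][1] + 1:
            self.runs[-1] = (self.runs[-1][0], p)  # extend the last run in place
        else:
            ix = bisect.bisect(self.runs, (p, p))
            self.runs.insert(ix, (p, p))

    def rank(self, p):
        # How many tracked priorities are strictly less than p
        count = 0
        for start, end in self.runs:
            if end < p:
                count += end - start + 1
            elif start < p:
                count += p - start
        return count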

@gjoseph92
Collaborator

In fact I think we can narrow this down more. Doesn't this problem only really happen with downstream tasks that need to beat out root-ish tasks? I believe root tasks are the only case where workers can get thousands (or other large numbers) of tasks assigned. Once all root tasks are done, shouldn't len(ws.processing) be O(ws.nthreads), because a new task can't get assigned until an old one completes one of its dependencies (besides work stealing / poor decide_worker assignment)? And furthermore, once we're in this state, I'd think the cases where a new task is runnable and higher-priority than other tasks that are also runnable on the worker are relatively rare, and possibly not as important from a scheduling perspective.

So perhaps we could track the occupancy from root-ish tasks separately for each worker, and if the task to assign is not root-ish, then subtract that occupancy from the total worker occupancy? In doing so, we're assuming that non-root tasks are higher priority than all root tasks, which is approximately true, though false for those tasks' dependencies (though acting like it's true from the perspective of worker assignment still feels correct).
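A minimal sketch of that adjustment (ws.rootish_occupancy is a hypothetical counter that would track the summed expected durations of root-ish tasks queued on the worker):

def adjusted_occupancy(ws, task_is_rootish):
    if task_is_rootish:
        return ws.occupancy
    # Non-root tasks are assumed to cut ahead of all queued root-ish work
    return ws.occupancy - ws.rootish_occupancy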

I think there's also a danger in a perfect "Respect priority in earliest start time heuristic", which is that it would tend to assign downstream tasks to the wrong workers. Consider the graph

2 4    6 8
| |    | |
1 3    5 7

 A      B
^ ideal worker

with 2 workers, each with 2 threads. 1,3 go to A; 5,7 go to B. Worker B completes all its tasks first and gets 6 and 8 to run. Then worker A finishes, and the scheduler has to assign 2 and 4. It gives 2 to worker A. But it gives 4 to worker B, since it thinks 4 can cut in line in front of 6 and 8, which looks faster than waiting behind 2 on worker A. (This would happen if the estimated runtime of the downstream task group was higher than the transfer time of the root task.)

Whereas if we just take the "ignore all root-ish tasks when estimating downstream task start time" approach, it basically levels the playing field across all workers, and the data-transfer metric should take over (I think).

@mrocklin
Member Author

mrocklin commented Sep 8, 2021

So perhaps we could track the occupancy from root-ish tasks separately for each worker, and if the task to assign is not root-ish, then subtract that occupancy from the total worker occupancy?

Hrm, that does indeed seem simple and practical. I have a slight hesitation to encode rootish-ness more deeply throughout the codebase but 🤷‍♂️ it worked pretty well last time :) If we can't think of general purpose cases where this is important then I like this idea more than anything that has been proposed before.

@mrocklin
Member Author

mrocklin commented Sep 8, 2021

As a lesser alternative I'll simplify my mean/std thought and suggest just min/max, assuming a uniform distribution between those two, which might be simpler. I'm not proposing this over the rootish handling; I'm just mentioning it here for completeness.

@gjoseph92
Collaborator

I have a slight hesitation to encode rootish-ness more deeply throughout the codebase but 🤷‍♂️ it worked pretty well last time

They are a special case though, in a way. Scheduling is a recursive process, and root tasks are the base case: they may need to follow different logic in order to bootstrap that recursive process properly. A lot of the properties you can assume about downstream tasks don't apply to root tasks, and vice versa. In a constant-time scheduling system like this, one that's so based on heuristics, it makes sense that root tasks might need to be special-cased: they have different properties, so they need different heuristics.

Though I don't like it much either :)

suggest just min/max, and assume uniform distribution between those two, which might be simpler

Assuming root tasks are our problem, this would probably work too, and might feel a little less weird. (Though we would have to compare priority components as ints.)

Should I try either of these? Probably not too hard to put together.

@mrocklin
Member Author

mrocklin commented Sep 8, 2021

Personally I have slightly more confidence in the rootish task solution. Thoughts?

Should I try either of these? Probably not too hard to put together.

They both seem doable and cheap to me, so yeah, it seems like the time to try one of them and see how it goes. My expectation is that this should take about a day or so. Would you agree?

@gjoseph92
Collaborator

I agree. Seems like a good thing to try next week.

@gjoseph92
Collaborator

Spent some time on this today.

  • Indexing a deque is O(n) at the middle, so bisect on a deque can be O(n log n) (see https://stackoverflow.com/a/60405467). So I don't think you can track the min/max cheaply, because in order to find the new min/max when the current min/max task completes, you have to have all the previous priorities. We thought @fjetter's deque approach would give us an O(n log n) way to do this (n task insertions, at a log n bisect per task and O(1) insertion), but it's actually O(n * (n log n + n)) = O(n^2 log n) (n task insertions, each with an O(n log n) bisect and an O(n) insertion). It would probably be faster to just re-scan all of ws.processing every time the min/max has to change (O(n^2), but without any coefficient, and lower when tasks don't complete in ascending priority order). If you used a list instead, it's O(n * (log n + n)) = O(n^2), which is better but still worse than re-scanning ws.processing.

  • However, tracking the mean and stddev of priority (as an int) would be easy and O(1) per task. Then we have to assume a normal distribution of priorities (very untrue) to get to a percentile (we might need scipy.stats.norm.ppf to do that, though?), which we could multiply by total occupancy. But since priorities are also hierarchical, mixing the distributions of values at each level might start to become meaningless. When there are multiple generations or user priorities in play on a worker, how do you turn that into a fraction of the total occupancy? You could do it hierarchically too (multiply by the percentile of the first level, then multiply that range by the percentile of the second level, etc.) but this starts to feel very wacky.

    We'd probably be better off with random sampling than this. Or what if we just assumed ws.processing was roughly ordered and picked quantiles from sorted([processing_list[(len(processing_list) // 4) * i] for i in range(4)])?

  • I briefly tried out a run-length-ish encoding approach like the one described above. It ended up being a little more complicated than I thought, so I've paused finishing it until we discuss more. But it should be best-case (all tasks assigned to each worker in contiguous priority order) O(1) per task, and worst-case (tasks assigned to each worker in reverse priority order and with gaps between every priority, which is ridiculous and unlikely) O(n log n) per task. Specifically, the complexity depends on the number of priority gaps on a worker, plus how many priority gaps from the end each new gap is. I don't yet know how to reason about how common priority gaps are in task assignment. Interestingly, I think the goal of STA, in a sense, is to make them less common.

The "track root-ish occupancy separately" approach should still work, but it also will not make sense once we have STA. In general, any efficient/clever method of estimating where tasks fall in the priority queue is probably going to depend on assumptions about how tasks are assigned. STA will significantly change those assumptions. It makes me a little hesitant to work on this more besides just tracking min/max with a scan of ws.processing until we know what the STA policies will look like.

@fjetter
Member

fjetter commented Sep 15, 2021

Indexing a deque is O(n) at the middle.

There is no free lunch, after all :( Thanks for doing the research

However tracking the mean and stddev of priority (as an int) would be easy and O(1) per task. Then we have to assume a normal distribution of priorities (very untrue) to get to a percentile

The algorithm you linked is also something I am aware of. I'm not too concerned about the normal distribution since, as was pointed out already, we're not looking for an exact solution. If we can somehow determine a "likely below 10/20/30/...%", we've already won.


I'm back to the min/max/mean approach as suggested in #5253 (comment). The uniform distribution is probably not even far off, considering that our priorities were initially strictly monotonically increasing integers. Either way, I don't think mathematical correctness is required here; even a wrong estimate should not be incredibly harmful, after all.

As a proxy for min/max (with removal) we could track the last-executed / last-finished / currently-executing priority.


Do we understand the value proposition well enough to construct an example to test this? I would suggest using a simple, potentially poorly performing algorithm first (e.g. just calculate quantiles/min/max to get started) to verify the proposal. IMHO, if we cannot construct an example where this will help, it's hard to justify any kind of additional logic. A reproducing example would also help with testing STA down the line.

gjoseph92 added a commit to gjoseph92/distributed that referenced this issue Sep 16, 2021
This doesn't make the test pass, because of exactly what I theorized in dask#5253 (comment): once one of the proper downstream tasks has been assigned to a worker, the next tasks get sent somewhere else, because they can cut to position 0 in the line elsewhere instead of position 1 on that worker. We'd need the communication cost to be higher in order to beat this.

Also this metric is still slightly off, because it's acting as though workers are single-threaded. In reality position 0 and 1 in the queue could maybe both run at (nearly) the same time. Also, using average `ws.occupancy` means if the root tasks are slower than the downstream tasks, we're making `stack_time` too high (when i==1, `stack_time` gets counted as the runtime for a `slowinc` task, not the one fast `inc` task that's actually ahead in the priority queue).
@gjoseph92
Collaborator

I played around with a test for this, and the simplest possible implementation of a solution (maintain an ordered list of all the priorities on each worker): https://github.com/dask/distributed/compare/main...gjoseph92:worker-objective-priority/basic?expand=1

Exactly what I'd been worried about in #5253 (comment) happened: because the data transfer costs are too low (xref #5324), downstream tasks got assigned to the wrong workers, because transferring + cutting in line on a different worker seemed cheaper than waiting in line on the right worker. I also noted some other problems in this commit message: a3be0c1.

It concerns me that the accurate implementation of the heuristic we're talking about causes incorrect behavior. It's possible that a less-accurate estimation would actually lead to better behavior, but that seems like a brittle and hard-to-reason-about thing to rely on.

A more realistic estimate of transfer time would help, but it's still just thresholds competing with each other. Because worker_objective might not actually be wrong: maybe it is faster to transfer, cut in line, and start this task on a less-occupied worker. That choice just isn't considering the opportunity cost of preventing some other, better task from running on that worker instead in the near future. With O(1) scheduling we can't actually determine that. But maybe we should be a little less greedy to get each particular task executing as soon as possible, and let graph structure have a little more weight in the decision?

I'm especially excited about what I proposed at the bottom of #5324. If there's a worker available which already holds all (or nearly all?) of the necessary dependencies, choosing a different worker should be a special event that we're pretty confident will pay off, not just potentially saving a couple ms.

gjoseph92 added a commit to gjoseph92/distributed that referenced this issue Sep 16, 2021
If a dependency is already on every worker—or will end up on every worker regardless, because many things depend on it—we should ignore it when selecting our candidate workers. Otherwise, we'll end up considering every worker as a candidate, which is 1) slow and 2) often leads to poor choices (xref dask#5253, dask#5324).

Just like with root-ish tasks, this is particularly important at the beginning. Say we have a bunch of tasks `x, 0`..`x, 10` that each depend on `root, 0`..`root, 10` respectively, but every `x` also depends on one task called `everywhere`. If `x, 0` is ready first, but `root, 0` and `everywhere` live on different workers, it appears as though we have a legitimate choice to make: do we schedule near `root, 0`, or near `everywhere`? But if we choose to go closer to `everywhere`, we might have a short-term gain, but we've taken a spot that could have gone to better use in the near future. Say that `everywhere` worker is about to complete `root, 6`. Now `x, 6` may run on yet another worker (because `x, 0` is already running where it should have gone). This can cascade through all the `x`s, until we've transferred most `root` tasks to different workers (on top of `everywhere`, which we have to transfer everywhere no matter what).

The principle of this is the same as dask#4967: be more forward-looking in worker assignment and accept a little short-term slowness to ensure that downstream tasks have to transfer less data.

This PR is a binary choice, but I think we could actually generalize to some weight in `worker_objective` like: the more dependents or replicas a task has, the less weight we should give to the workers that hold it. I wonder if, barring significant data transfer imbalance, having stronger affinity for the more "rare" keys will tend to lead to better placement.
@gjoseph92
Collaborator

I also hadn't thought about how this worker_objective problem only happens when a task's dependencies are on multiple workers. (Because decide_worker initially narrows down candidates to only workers holding deps.) It probably doesn't matter so much when there are just a couple workers to choose from, and none of them already have all the data. The reason this is so bad with the shuffle service is because the tasks all share one common dependency, which ends up getting replicated to every worker. So since every worker is a candidate (probably shouldn't be), the error in the worker_objective metric has a chance to mess things up.
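For context, candidate selection works roughly like this (a simplified sketch, not the exact decide_worker code):

def candidate_workers(ts, all_workers):
    # Only workers that already hold at least one dependency are considered;
    # if no dependency is held anywhere, every worker becomes a candidate.
    candidates = {ws for dts in ts.dependencies for ws in dts.who_has}
    return candidates or set(all_workers)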

The graph I'm testing with, to illustrate:

[image: "bad-assignment" task graph]

Based on this, I realized we can sidestep this problem, for the shuffle-service case at least, with #5325.
