Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore widely-shared dependencies in decide_worker #5325

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

gjoseph92
Copy link
Collaborator

@gjoseph92 gjoseph92 commented Sep 16, 2021

If a dependency is already on every worker—or will end up on every worker regardless, because many things depend on it—we should ignore it when selecting our candidate workers. Otherwise, we'll end up considering every worker as a candidate, which is 1) slow and 2) often leads to poor choices (xref #5253, #5324).

The principle of this is the same as root-ish tasks in #4967: be more forward-looking in worker assignment and accept a little short-term slowness to ensure that downstream tasks have to transfer less data.

Say we have a bunch of tasks x, 0..x, 10 that each depend on root, 0..root, 10 respectively, but every x also depends on one task called everywhere. If x, 0 is ready first, but root, 0 and everywhere live on different workers, it appears as though we have a legitimate choice to make: do we schedule near root, 0, or near everywhere? But if we choose to go closer to everywhere, we might have a short-term gain, but we've taken a spot that could have gone to better use in the near future. Say that everywhere worker is about to complete root, 6. Now x, 6 may run on yet another worker (because x, 0 is already running where it should have gone). This can cascade through all the xs, until we've transferred most root tasks to different workers (on top of everywhere, which we have to transfer everywhere no matter what).

This PR is a binary choice, but I think we could actually generalize to some weight in worker_objective like: the more dependents or replicas a task has, the less weight we should give to the workers that hold it. I wonder if, barring significant data transfer inbalance, having stronger affinity for the more "rare" keys will tend to lead to better placement. UPDATE: see #5326 for this.

  • Tests added / passed
  • Passes black distributed / flake8 distributed / isort distributed

If a dependency is already on every worker—or will end up on every worker regardless, because many things depend on it—we should ignore it when selecting our candidate workers. Otherwise, we'll end up considering every worker as a candidate, which is 1) slow and 2) often leads to poor choices (xref dask#5253, dask#5324).

Just like with root-ish tasks, this is particularly important at the beginning. Say we have a bunch of tasks `x, 0`..`x, 10` that each depend on `root, 0`..`root, 10` respectively, but every `x` also depends on one task called `everywhere`. If `x, 0` is ready first, but `root, 0` and `everywhere` live on different workers, it appears as though we have a legitimate choice to make: do we schedule near `root, 0`, or near `everywhere`? But if we choose to go closer to `everywhere`, we might have a short-term gain, but we've taken a spot that could have gone to better use in the near future. Say that `everywhere` worker is about to complete `root, 6`. Now `x, 6` may run on yet another worker (because `x, 0` is already running where it should have gone). This can cascade through all the `x`s, until we've transferred most `root` tasks to different workers (on top of `everywhere`, which we have to transfer everywhere no matter what).

The principle of this is the same as dask#4967: be more forward-looking in worker assignment and accept a little short-term slowness to ensure that downstream tasks have to transfer less data.

This PR is a binary choice, but I think we could actually generalize to some weight in `worker_objective` like: the more dependents or replicas a task has, the less weight we should give to the workers that hold it. I wonder if, barring significant data transfer inbalance, having stronger affinity for the more "rare" keys will tend to lead to better placement.
Copy link
Member

@fjetter fjetter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of weighting with the number of replicas. If possible I would stick with the binary choice for now, though. The binary choice could maybe be a bit relaxed, e.g. if the task is already on 80% (or whatever value) of workers, this heuristic still makes sense to me

Comment on lines 271 to 278
for k in keys.values():
assert k["root_keys"] == k["dep_keys"]

for worker in workers:
log = worker.incoming_transfer_log
if log:
assert len(log) == 1
assert list(log[0]["keys"]) == ["everywhere"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this test. Regardless of how to achieve this result, I think it makes a lot of sense for this computation pattern to not transfer any other keys 👍

)
async def test_decide_worker_common_dep_ignored(client, s, *workers):
roots = [
delayed(slowinc)(1, 0.1 / (i + 1), dask_key_name=f"root-{i}") for i in range(16)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for choosing delayed over futures?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really. It made it easy to visualize the graph though. Would we prefer futures?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense. Typically I would prefer futures since if something goes wrong and we'd need to debug, futures contain fewer layers. However, the visualization argument is strong. I would recommend leaving a comment in-code to avoid overly eager refactoring down the line

distributed/tests/test_scheduler.py Show resolved Hide resolved
for dts in deps
for wws in dts._who_has
# Ignore dependencies that will need to be, or already are, copied to all workers
if max(len(dts._who_has), len(dts._dependents))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand why who_has is part of this check but not necessarily why dependents is part of this. Having many dependents does not tell us much about where the keys will end up, does it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, dependents is speculative/heuristic. If we don't include that, the test fails and this PR basically does nothing, except if you've already broadcast a key to every worker before submitting some other tasks.

When the first root task completes and a dep task is getting assigned, the everywhere task is still only on one worker. It's actually the process of running the dep tasks that eventually copies everywhere everywhere. So without len(dts._dependents), this check won't kick in until we've already made n_workers bad decisions.

The idea is that if a task has lots of dependents, it's quite likely those dependents will run on many different workers. We're trying to guess when this situation will happen before it happens.

There are times ways this heuristic is wrong:

  • The many dependents don't have any other dependencies (so running them all on the same worker might actually be a good idea)
    • Though in some cases, they might get identified as root-ish tasks and spread across workers anyway
  • The many dependents have worker/resource restrictions

Here is a graph which currently behaves well, but under this PR will allow the deps to be assigned to any worker on a 4-worker cluster:
fantree
Because each root has >4 dependents, the location of the root is ignored when assigning the dependents and they get scattered all over.

I think this heuristic needs more thought. I have another idea I'm going to try out.

Copy link
Collaborator Author

@gjoseph92 gjoseph92 Sep 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other idea (amortizing transfer cost by number of dependencies #5326) didn't work (though it may still be a good idea). The above case works, but then the original test in this PR still fails, because transfer cost is just low regardless, and occupancy is all that really matters.

Interestingly the above case would probably be handled by STA. I still feel like we should tackle STA first, since so many other things will change that it's hard to think about heuristics right now.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay I got this case working with e1fd58b, which I don't love, but maybe is okay enough?

@fjetter
Copy link
Member

fjetter commented Sep 16, 2021

There is a genuine test failure in test_dont_steal_fast_tasks_compute_time. I believe this test assumes that the initial assignment of tasks is different.

for wws in dts._who_has
# Ignore dependencies that will need to be, or already are, copied to all workers
if max(len(dts._who_has), len(dts._dependents))
< len(valid_workers if valid_workers is not None else all_workers)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a / 2 here? "almost everywhere" might be enough of a condition here.

@mrocklin
Copy link
Member

This will help with the initial placement, but will work stealing still move things around suboptimally? Do we also need something like #5243 ?

Copy link
Collaborator Author

@gjoseph92 gjoseph92 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will work stealing still move things around suboptimally? Do we also need something like #5243 ?

Yes we still need that too

)
async def test_decide_worker_common_dep_ignored(client, s, *workers):
roots = [
delayed(slowinc)(1, 0.1 / (i + 1), dask_key_name=f"root-{i}") for i in range(16)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really. It made it easy to visualize the graph though. Would we prefer futures?

distributed/tests/test_scheduler.py Show resolved Hide resolved
for dts in deps
for wws in dts._who_has
# Ignore dependencies that will need to be, or already are, copied to all workers
if max(len(dts._who_has), len(dts._dependents))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, dependents is speculative/heuristic. If we don't include that, the test fails and this PR basically does nothing, except if you've already broadcast a key to every worker before submitting some other tasks.

When the first root task completes and a dep task is getting assigned, the everywhere task is still only on one worker. It's actually the process of running the dep tasks that eventually copies everywhere everywhere. So without len(dts._dependents), this check won't kick in until we've already made n_workers bad decisions.

The idea is that if a task has lots of dependents, it's quite likely those dependents will run on many different workers. We're trying to guess when this situation will happen before it happens.

There are times ways this heuristic is wrong:

  • The many dependents don't have any other dependencies (so running them all on the same worker might actually be a good idea)
    • Though in some cases, they might get identified as root-ish tasks and spread across workers anyway
  • The many dependents have worker/resource restrictions

Here is a graph which currently behaves well, but under this PR will allow the deps to be assigned to any worker on a 4-worker cluster:
fantree
Because each root has >4 dependents, the location of the root is ignored when assigning the dependents and they get scattered all over.

I think this heuristic needs more thought. I have another idea I'm going to try out.

This addresses the issue in dask#5325 (comment). It feels a little hacky since it can still be wrong (what if there are multiple root groups that have large subtrees?). We're trying to infer global graph structure (how mnay sibling tasks are there) using TaskGroups, which don't necessarily reflect graph structure.

It's also hard to explain the intuition for why this is right-ish (besides "well we need the `len(dts._dependents)` number to be smaller if it has siblings".)
The goal is to identify a specific situation: fan-outs where the group is larger than the cluster, but the dependencies are (much) smaller than the cluster. When this is the case, scheduling near the dependencies is pointless, since you know those workers will get filled up and the dependencies will have to get copied everywhere anyway. So you want to instead schedule in a methodical order which ends up keeping neighbors together.

But the key is really crossing that boundary of cluster size. Hence these changes:
* `total_nthreads * 2` -> `total_nthreads`: so long as every thread will be saturated by this group, we know every worker will need all its dependencies. The 2x requirement is too restrictive.
* Switch magic 5 to `min(5, len(self.workers))`: if you have 3 workers, and your group has 3 dependencies, you actually _should_ try to schedule near those dependencies. Then each worker only needs 1 dependency, instead of copying all 3 dependencies to all 3 workers. If you had 20 workers, duplicating the dependencies would be unavoidable (without leaving most of the workers idle). But here, it is avoidable while maintaining parallelism, so avoid it.

  I'm actually wondering if we should get rid of magic 5 entirely, and just use a cluster-size metric. Like `len(self.workers) / 4` or something. If you have 1,000 workers, and a multi-thousand group has 20 dependencies, maybe you do want to copy those 20 dependencies to all workers up front. But if you only had 30 workers, you'd be much better off considering locality.
gjoseph92 added a commit to gjoseph92/distributed that referenced this pull request Sep 17, 2021
As discussed in dask#5325. The idea is that if a key we need has many dependents, we should amortize the cost of transferring it to a new worker, since those other dependencies could then run on the new worker more cheaply. "We'll probably have to move this at some point anyway, might as well do it now."

This isn't actually intended to encourage transfers though. It's more meant to discourage transferring keys that could have just stayed in one place. The goal is that if A and B are on different workers, and we're the only task that will ever need A, but plenty of other tasks will need B, we should schedule alongside A even if B is a bit larger to move.

But this is all a theory and needs some tests.
Comment on lines +301 to +303
........ ........ ........ ........
\\\\//// \\\\//// \\\\//// \\\\////
a b c d
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haven't put much thought into this question, therefore a short answer is more than enough.

Would your logic be impacted if the subtrees flow together again, i.e. they have a common dependent or set of dependents as in a tree reduction.

If the answer is "no, this logic doesn't go that deep into the graph" (which is what I'm currently guessing), that's fine.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it doesn't go any further into the graph.

wws
for dts in deps
# Ignore dependencies that will need to be, or already are, copied to all workers
if max(len(dts._dependents) / len(dts._group), len(dts._who_has))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this intentionally the number of dependents of a task over the length of a group. Wouldn't the behaviour here be drastically different if I have 2 subtrees vs 200 subtrees even though the topology is otherwise identical?

Reusing your ascii art

    ........  ........ 
    \\\\////  \\\\////
       a         b

and

    ........  ........  ........  ........    ........  ........  ........  ........    ........  ........  ........  ........
    \\\\////  \\\\////  \\\\////  \\\\////    \\\\////  \\\\////  \\\\////  \\\\////    \\\\////  \\\\////  \\\\////  \\\\////
       a         b         c         d       a1         b1         c1         d1       a2         b2         c2         d2

should behave equally

Copy link
Member

@fjetter fjetter Sep 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my intuition about groups might be off. in In my mind, this graph contains two groups. All ... are a group and all {a*, b*, c*} are a group. The length of that group is then the number of tasks in a group.

IF that is correct, the len(dependents)/len(group) would evaluate to

1st graph: #dependents(8) / len(group)(2) == 4
2nd graph: #dependents(8) / len(group)(12) == 2/3

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spent about 1 minute coming up with this metric and then 1 hour trying to explain or figure out if it actually made sense (beyond that it works in this case). It may not. But here's what I'm thinking (this does not answer the question at all, just trying to communicate a lot of thoughts):

Your intuition is correct. The behavior would and should be different if you have 2 subtrees vs 200—depending on the number of workers. Imagine we have 5 workers:

  • In the first graph we have fewer sub-trees (groups) than workers. (This actually should fall in the root-ish task case. More on this later.) So unless we're going to leave 3 workers idle, copying a and b to more workers will improve parallelism. So we don't care where a and b live while scheduling ... because we expect a and b to get transferred around anyway.
  • In the second graph we have more sub-trees than workers. So transferring the a*,b*,c*s won't get things done any faster[^1]—the cluster is already saturated. Therefore, we do want to consider where the root tasks live when scheduling ..., because we can avoid them getting copied around unnecessarily.

So even though local topology is identical, we should schedule these differently depending on cluster size. If we had 100 workers, we'd schedule both graphs the same way (ignore the root task locations, because copying to new workers will give us more parallelism). And if we had 2 workers we'd also schedule both graphs the same way (consider root task locations because every worker already has a root task).

But I actually think all of these cases should be handled by the root task logic. I wrote more notes in e5175ce, but I think we should think of that as "fan-out logic" instead of "root-ish task logic", since it's really about this special case where we're crossing the boundary of cluster size. When we fan out from having fewer keys than workers to more keys than workers, the placement of those few pieces of input data is a poor guide for where the many pieces of output data should go. We have to copy the input data around anyway, so it's a good opportunity to reorganize things—that's what the current root-ish task logic is all about.

This PR is really about when some dependencies make a task seem root-ish, and others don't. Imagine overlaying the graphs

. . . . . . . .
| | | | | | | |
* * * * * * * *

and

. . . . . . . .
\ \ \ \ / / / /
       a    

so you have

. . . . . . . .   . . . . . . . .
|\|\|\|\|/|/|/|   |\|\|\|\|/|/|/|
| | | | a | | |   | | | | b | | |
* * * * * * * *   * * * * * * * *

The .s should never be considered root-ish tasks now—as linear chains, they should definitely take into account the location of the * tasks when scheduling. But if we have 5 workers, every worker will have a * task on it, but only 2 workers will have an a or b. In scheduling the first few .s, there's a tug-of-war between the a and the *—which do we want to schedule near? We want a way to disregard the a.

But—unlike in the simpler case above where we just have a tree—if we have 12 of these subtrees and only 5 workers, we still want to disregard where the as are, because they're much much less important than where the *s are.

And therein is the overall problem with this PR. In one case, we need to pay attention to the as (when they're the only dependency); in other cases, we want to ignore them (when there are other dependencies with few dependents and replicas). Dividing by the group length was an effective but kind of nonsensical way to do this. I think there are probably other better ways that are more logical, but might involve more change.

I'll note that I think amortizing transfer cost (#5326) is a more sensible way to do this. That actually makes sense. The problem is, it doesn't do much here, because transfer costs (even adding the 10ms penalty) are minuscule relative to occupancy. Weighing those two things, occupancy is usually far more significant.

So I also tried a couple ideas to counteract that. What if we could amortize occupancy in some way too? Basically to account for the opportunity cost of taking a spot on a worker that lots of other tasks could possibly run on, that might be better suited.

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 1c810f4a..4c8d97b9 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -3418,15 +3418,27 @@ class SchedulerState:
         nbytes: Py_ssize_t
         comm_bytes: double = 0
         xfers: Py_ssize_t = 0
+        alternative_tasks: Py_ssize_t = 0
         for dts in ts._dependencies:
-            if ws not in dts._who_has:
+            if ws in dts._who_has:
+                alternative_tasks += len(dts._waiters) - 1
+            else:
                 nbytes = dts.get_nbytes()
                 # amortize transfer cost over all waiters
                 comm_bytes += nbytes / len(dts._waiters)
                 xfers += 1
 
+        # If there are many other tasks that could run on this worker,
+        # consider how much we are displacing a better task that could run soon
+        # (unless we are that "better task").
+        # TODO wrong duration kinda
+        opportunity_cost: double = (
+            (alternative_tasks * self.get_task_duration(ts) / 2) if xfers else 0.0
+        )
         stack_time: double = ws._occupancy / ws._nthreads
-        start_time: double = stack_time + comm_bytes / self._bandwidth + xfers * 0.01
+        start_time: double = (
+            stack_time + opportunity_cost + comm_bytes / self._bandwidth + xfers * 0.01
+        )
 
         if ts._actor:
             return (len(ws._actors), start_time, ws._nbytes)

This didn't help much. It worked at the beginning, but once the a task was in lots of places, the opportunity_cost showed up on every worker.

I also tried something similar with stack_time *= alternative_tasks / len(ws.processing) to let tasks that could run in fewer other places cut in line more—this is basically another form of #5253. Didn't work either.

The more I've worked on this, the more I find myself fighting against occupancy. It's hard to find enough incentives or penalties to make a worker with 0.1s occupancy ever beat an idle one.

For fun, I ignored occupancy and just removed stack_time from worker_objective entirely. With only that, plus #5326 and gjoseph92@e5175ce, it passes all the tests in this PR.

As I think about it more, I feel more skeptical of occupancy as a metric to use in the worker_objective. The greedy goal of "get this task running as soon as possible" is sometimes counterproductive to running the whole graph smoothly, and therefore quickly.

Especially as we're thinking about STA, it feels maybe we should be more simple, dumb, and optimistic in initial task placement. Just follow graph structure. Then the load-balancing decisions to minimize occupancy, like worker_objective is trying to do eagerly right now, are more the responsibility of stealing. We also may be able to make these decisions better once things are already running and we've collected more information (runtime, memory, etc.).

None of this is really an answer, these are just all the things I've thought about trying to work on this.

[^1] well some workers will have 3 a*,b*,c* and others will only have 2. So once the ones with only 2 finish all their ...s they could maybe take some load from the ones with 3—but that's a work-stealing question.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your thorough problem description. I will try to address some of the points above but it will not be an exhaustive review.

So even though local topology is identical, we should schedule these differently depending on cluster size. If we had 100 workers, we'd schedule both graphs the same way (ignore the root task locations, because copying to new workers will give us more parallelism). And if we had 2 workers we'd also schedule both graphs the same way (consider root task locations because every worker already has a root task).

I agree. I want to point out, though, that parallelism is mostly not one of our concerns. In my mind, memory efficient scheduling will trump parallelism if we ever needed to make that call. However, I think the case where we are not parallelizing enough is happening less frequently.

In particular, there is also the case where transfer costs would kill us and just executing stuff sequentially on few workers will be faster. This case can only be handled by including runtime metrics, of course.

But I actually think all of these cases should be handled by the root task logic. I wrote more notes in e5175ce, but I think we should think of that as "fan-out logic" instead of "root-ish task logic"

I would love it if we had a relatively high level description documented somewhere about our strategic goals of scheduling. That would also include a description of what we mean when talking about root-ish task scheduling since I'm still struggling to classify those task. I imagine something similar to what we do with dask.order, see https://github.com/dask/dask/blob/9fc5777f3d83f1084360adf982da301ed4afe13b/dask/order.py#L1-L77

I believe https://distributed.dask.org/en/latest/journey.html#step-3-select-a-worker goes into this direction but is not thorough enough and likely out of date.

I'll note that I think amortizing transfer cost (#5326) is a more sensible way to do this.

I agree that that some kind of amortization or weighing might be a good approach to solve this. I'm still not convinced where to apply weights and what the correct weights are, though.

I also see another component factoring in, namely if we knew which dependencies are intended to be fetched on a worker, that could influence our decision as well. Maybe, instead of tracking combined occupancy, we should track "comm occupancy" and try to minimize this instead of total occupancy? Can we detect a "too small parallelism" scenario and switch metrics based on that?

To make things even more messy, I also want to point out that I believe we're double counting comm cost which might blow occupancy up unnecessarily, see

for tts in s:
if tts._processing_on is not None:
wws = tts._processing_on
comm: double = self.get_comm_cost(tts, wws)
old: double = wws._processing[tts]
new: double = avg_duration + comm
diff: double = new - old
wws._processing[tts] = new
wws._occupancy += diff
self._total_occupancy += diff
I might be wrong, though.

Basically to account for the opportunity cost of taking a spot on a worker that lots of other tasks could possibly run on, that

I don't dislike opportunity costs but at this point in time it is much too unclear how that cost factor would even look like. I would suggest to delay this for now since I don't think this will be strong enough to counteract the occupancy and will make this even less comprehensible.
If we had a good handle on what opportunity cost would actually be that might be a different story.

As I think about it more, I feel more skeptical of occupancy as a metric to use in the worker_objective. The greedy goal of "get this task running as soon as possible" is sometimes counterproductive to running the whole graph smoothly, and therefore quickly.

I'm open to reconsider this metric for initial task placement although this feels like a long term thing since it may be a highly disruptive change. FWIW we will likely need to reconsider the entire worker_objective for STA since it is based 100% on runtime information.

You mentioned that it would work good enough if we left out stack_time from the worker_objective. This sounds quite similar to what is proposed in #5253 since disabling it entirely without replacement would probably introduce many other problems.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to point out, though, that parallelism is mostly not one of our concerns.

Totally agree. But taken to the extreme (never consider parallelism), then we'd do things like assign every singe task to the same one worker in a 100-worker cluster, leaving 99 others idle, because that worker holds one root dependency. I know you're not proposing that at all—just saying that since we do sometimes consider parallelism, we have to have some mechanism to decide where that "sometimes" line is. Agreed that having documentation of this would help.

As you said, it's hard to do without runtime metrics. I think what I'm proposing is that, when we only have graph metrics, a reasonable threshold for adding parallelism is when we have more tasks to schedule than total threads, yet purely following dependencies would leave some workers totally idle.

Maybe, instead of tracking combined occupancy, we should track "comm occupancy" and try to minimize this instead of total occupancy?

I've often wanted this metric. (Adds a bit of communication for the workers to tell the scheduler every time they fetch a key from a peer though, right?) And I think it's a good thing to minimize, though what we really want to minimize is memory. However, tracking this data would allow us to more preemptively avoid transferring to a worker which already has a lot of (bytes of) pending incoming transfers.

Can we detect a "too small parallelism" scenario and switch metrics based on that?

Probably not? Because tasks will prefer to schedule on the worker with no transfers, that worker's comm occupancy won't ever go up, so we'll keep scheduling there. The graph-structural approach would hopefully still avoid this though?

You mentioned that it would work good enough if we left out stack_time from the worker_objective. This sounds quite similar to what is proposed in #5253 since disabling it entirely without replacement would probably introduce many other problems.

With #5253 we do still consider occupancy, just more accurately. But as I showed in #5253 (comment) that causes a different sort of bad behavior. That's a good example of why I think earliest start time isn't quite the right metric. With #5253 we correctly identify that we can actually start the task sooner by transferring to a different worker and cutting in the priority line, because the transfer is so cheap. We do successfully minimize task start time. The problem is that it disrupts most of the subsequent tasks by shoving in line like that. The short-term gains come with much bigger long-term costs.

FWIW we will likely need to reconsider the entire worker_objective for STA since it is based 100% on runtime information.

Agreed. STA still feels like the place to start. I'm still intrigued by the idea of eagerly and optimistically assigning tasks by graph structure (since that's probably reasonable in the common case) and then load-balancing with runtime metrics.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fjetter tried slightly rewording this metric in d0f0955 and it at least vaguely makes sense, though I'm still not sure we should be doing this PR at all vs just trying STA.

Here we're assuming that all tasks in the group have a similar number of dependents / degree of fan-out. Then if this dependency is widely used enough to fill the cluster, and there are not nearly enough like it to fill the cluster, then we should be okay with copying it around to enable parallelism (ignoring it, since other dependencies of the task are likely more important).
Comment on lines +821 to +833
name: str
prefix: TaskPrefix | None
states: dict[str, int]
dependencies: set[TaskGroup]
nbytes_total: int
duration: float
types: set[str]
start: float
stop: float
all_durations: defaultdict[str, float]
last_worker: WorkerState | None
last_worker_tasks_left: int

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more thoroughly dealt with in #6253

@@ -1955,7 +1967,8 @@ def decide_worker(self, ts: TaskState) -> WorkerState: # -> WorkerState | None
type(ws),
ws,
)
assert ws.address in self.workers
if ws:
assert ws.address in self.workers
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also in #6253

Comment on lines +7529 to +7531
and not (
len(dts.dependents) >= n_workers
and len(dts.group) < n_workers // 2
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
and not (
len(dts.dependents) >= n_workers
and len(dts.group) < n_workers // 2
and (
len(dts.dependents) < n_workers
or len(dts.group) >= n_workers // 2

@github-actions
Copy link
Contributor

github-actions bot commented May 5, 2022

Unit Test Results

       16 files  +       4         16 suites  +4   7h 53m 35s ⏱️ + 2h 34m 54s
  2 759 tests +       3    2 676 ✔️ +     13       79 💤  -   13    4 +  3 
22 034 runs  +5 524  20 999 ✔️ +5 285  1 024 💤 +229  11 +10 

For more details on these failures, see this check.

Results for commit fd7e790. ± Comparison against base commit 7bd6442.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants