Ignore widely-shared dependencies in decide_worker #5325

Open · wants to merge 6 commits into base: main
Changes from 2 commits
distributed/scheduler.py (8 additions, 1 deletion)
@@ -7982,7 +7982,14 @@ def decide_worker(
     if ts._actor:
         candidates = set(all_workers)
     else:
-        candidates = {wws for dts in deps for wws in dts._who_has}
+        candidates = {
+            wws
+            for dts in deps
+            for wws in dts._who_has
+            # Ignore dependencies that will need to be, or already are, copied to all workers
+            if max(len(dts._who_has), len(dts._dependents))
Member:

I understand why who_has is part of this check, but not necessarily why dependents is. Having many dependents does not tell us much about where the keys will end up, does it?

Collaborator (author):

Yeah, dependents is speculative/heuristic. If we don't include it, the test fails and this PR basically does nothing, except when you've already broadcast a key to every worker before submitting some other tasks.

When the first root task completes and a dep task is getting assigned, the everywhere task is still only on one worker. It's actually the process of running the dep tasks that eventually copies the everywhere key to every worker. So without len(dts._dependents), this check won't kick in until we've already made n_workers bad decisions.

The idea is that if a task has lots of dependents, it's quite likely those dependents will run on many different workers. We're trying to guess when this situation will happen before it happens.

There are ways this heuristic can be wrong:

  • The many dependents don't have any other dependencies (so running them all on the same worker might actually be a good idea)
    • Though in some cases, they might get identified as root-ish tasks and spread across workers anyway
  • The many dependents have worker/resource restrictions

Here is a graph which currently behaves well, but under this PR will allow the deps to be assigned to any worker on a 4-worker cluster:
[graph image: fan-tree]
Because each root has >4 dependents, the location of the root is ignored when assigning the dependents and they get scattered all over.
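
For concreteness, here is a minimal sketch of that kind of fan-tree graph; the shape follows the description above, while the function, key names, and the fan-out of 6 are illustrative choices rather than anything taken from the PR:

```python
import dask
from dask import delayed


def noop(*args):
    return None


# 4 roots, each fanned out to 6 dependents: more dependents per root than there
# are workers on a 4-worker cluster, so len(root._dependents) > n_workers and
# the check in this PR drops each root's location when assigning its dependents.
roots = [delayed(noop)(dask_key_name=f"root-{i}") for i in range(4)]
fans = [
    delayed(noop)(root, dask_key_name=f"fan-{i}-{j}")
    for i, root in enumerate(roots)
    for j in range(6)
]

dask.visualize(*fans)  # draws the fan-tree shape (needs graphviz installed)
```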

I think this heuristic needs more thought. I have another idea I'm going to try out.

gjoseph92 (Collaborator, author), Sep 16, 2021:

The other idea (amortizing transfer cost by number of dependencies #5326) didn't work (though it may still be a good idea). The above case works, but then the original test in this PR still fails, because transfer cost is just low regardless, and occupancy is all that really matters.

Interestingly, the above case would probably be handled by STA. I still feel like we should tackle STA first, since so many other things will change that it's hard to think about heuristics right now.

Collaborator (author):

Okay, I got this case working with e1fd58b, which I don't love, but maybe it's okay enough?

+            < len(valid_workers if valid_workers is not None else all_workers)
Member:

Maybe add a / 2 here? "almost everywhere" might be enough of a condition here.
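
Illustratively, that suggestion would amount to something like the following in the comprehension's filter (a sketch of the reviewer's idea, not part of this diff; it reuses the deps, valid_workers, and all_workers names from decide_worker):

```python
# Sketch of the "/ 2" suggestion: treat a dependency as effectively-everywhere
# once at least half of the workers already hold it or will need it.
n_workers = len(valid_workers if valid_workers is not None else all_workers)
candidates = {
    wws
    for dts in deps
    for wws in dts._who_has
    if max(len(dts._who_has), len(dts._dependents)) < n_workers / 2
}
```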

+        }
     if valid_workers is None:
         if not candidates:
             candidates = set(all_workers)
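
Since the review threads split the hunk above, here is the changed block reassembled as it reads with this diff applied (content copied from the hunk, nothing new):

```python
if ts._actor:
    candidates = set(all_workers)
else:
    candidates = {
        wws
        for dts in deps
        for wws in dts._who_has
        # Ignore dependencies that will need to be, or already are, copied to all workers
        if max(len(dts._who_has), len(dts._dependents))
        < len(valid_workers if valid_workers is not None else all_workers)
    }
if valid_workers is None:
    if not candidates:
        candidates = set(all_workers)
```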
distributed/tests/test_scheduler.py (41 additions, 0 deletions)
@@ -237,6 +237,47 @@ def random(**kwargs):
    test()


@gen_cluster(
    client=True,
    nthreads=[("127.0.0.1", 1)] * 4,
    config={"distributed.scheduler.work-stealing": False},
)
async def test_decide_worker_common_dep_ignored(client, s, *workers):
    roots = [
        delayed(slowinc)(1, 0.1 / (i + 1), dask_key_name=f"root-{i}") for i in range(16)
Member:

Any reason for choosing delayed over futures?

Collaborator (author):

Not really. It made it easy to visualize the graph though. Would we prefer futures?

Member:

Makes sense. Typically I would prefer futures, since if something goes wrong and we need to debug, futures contain fewer layers. However, the visualization argument is strong. I would recommend leaving an in-code comment to avoid overly eager refactoring down the line.

    ]
    # This shared dependency will get copied to all workers, eventually making all workers valid candidates for each dep
    everywhere = delayed(None, name="everywhere")
    deps = [
        delayed(lambda x, y: None)(r, everywhere, dask_key_name=f"dep-{i}")
        for i, r in enumerate(roots)
    ]

    rs, ds = dask.persist(roots, deps)
    await wait(ds)

    keys = {
        worker.name: dict(
            root_keys=sorted(
                [int(k.split("-")[1]) for k in worker.data if k.startswith("root")]
            ),
            dep_keys=sorted(
                [int(k.split("-")[1]) for k in worker.data if k.startswith("dep")]
            ),
        )
        for worker in workers
    }

    for k in keys.values():
        assert k["root_keys"] == k["dep_keys"]

    for worker in workers:
        log = worker.incoming_transfer_log
        if log:
            assert len(log) == 1
            assert list(log[0]["keys"]) == ["everywhere"]
Member:

I like this test. Regardless of how to achieve this result, I think it makes a lot of sense for this computation pattern to not transfer any other keys 👍



@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
async def test_move_data_over_break_restrictions(client, s, a, b, c):
    [x] = await client.scatter([1], workers=b.address)