Skip to content

Commit

Permalink
Handle multiple large subtrees
Browse files Browse the repository at this point in the history
This addresses the issue in #5325 (comment). It feels a little hacky since it can still be wrong (what if there are multiple root groups that have large subtrees?). We're trying to infer global graph structure (how mnay sibling tasks are there) using TaskGroups, which don't necessarily reflect graph structure.

It's also hard to explain the intuition for why this is right-ish (besides "well we need the `len(dts._dependents)` number to be smaller if it has siblings".)
  • Loading branch information
gjoseph92 committed Sep 17, 2021
1 parent aaa12a3 commit e1fd58b
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 9 deletions.
4 changes: 2 additions & 2 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7985,10 +7985,10 @@ def decide_worker(
candidates = {
wws
for dts in deps
for wws in dts._who_has
# Ignore dependencies that will need to be, or already are, copied to all workers
if max(len(dts._who_has), len(dts._dependents))
if max(len(dts._dependents) / len(dts._group), len(dts._who_has))
< len(valid_workers if valid_workers is not None else all_workers)
for wws in dts._who_has
}
if valid_workers is None:
if not candidates:
Expand Down
120 changes: 113 additions & 7 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,24 @@ def random(**kwargs):
test()


@gen_cluster(
client=True,
nthreads=[("127.0.0.1", 1)] * 4,
config={"distributed.scheduler.work-stealing": False},
)
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 4)
async def test_decide_worker_common_dep_ignored(client, s, *workers):
r"""
When we have basic linear chains, but all the downstream tasks also share a common dependency, ignore that dependency.
i j k l m n o p
\__\__\__\___/__/__/__/
| | | | | | | | |
| | | | X | | | |
a b c d e f g h
^ Ignore the location of X when picking a worker for i..p.
It will end up being copied to all workers anyway.
If a dependency will end up on every worker regardless, because many things depend on it,
we should ignore it when selecting our candidate workers. Otherwise, we'll end up considering
every worker as a candidate, which is 1) slow and 2) often leads to poor choices.
"""
roots = [
delayed(slowinc)(1, 0.1 / (i + 1), dask_key_name=f"root-{i}") for i in range(16)
]
Expand All @@ -261,15 +273,15 @@ async def test_decide_worker_common_dep_ignored(client, s, *workers):
root_keys=sorted(
[int(k.split("-")[1]) for k in worker.data if k.startswith("root")]
),
dep_keys=sorted(
deps_of_root=sorted(
[int(k.split("-")[1]) for k in worker.data if k.startswith("dep")]
),
)
for worker in workers
}

for k in keys.values():
assert k["root_keys"] == k["dep_keys"]
assert k["root_keys"] == k["deps_of_root"]

for worker in workers:
log = worker.incoming_transfer_log
Expand All @@ -278,6 +290,100 @@ async def test_decide_worker_common_dep_ignored(client, s, *workers):
assert list(log[0]["keys"]) == ["everywhere"]


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 4)
async def test_decide_worker_large_subtrees_colocated(client, s, *workers):
r"""
Ensure that the above "ignore common dependencies" logic doesn't affect wide (but isolated) subtrees.
........ ........ ........ ........
\\\\//// \\\\//// \\\\//// \\\\////
a b c d
Each one of a, b, etc. has more dependents than there are workers. But just because a has
lots of dependents doesn't necessarily mean it will end up copied to every worker.
Because a also has a few siblings, a's dependents shouldn't spread out over the whole cluster.
"""
roots = [delayed(inc)(i, dask_key_name=f"root-{i}") for i in range(len(workers))]
deps = [
delayed(inc)(r, dask_key_name=f"dep-{i}-{j}")
for i, r in enumerate(roots)
for j in range(len(workers) * 2)
]

rs, ds = dask.persist(roots, deps)
await wait(ds)

keys = {
worker.name: dict(
root_keys=set(
int(k.split("-")[1]) for k in worker.data if k.startswith("root")
),
deps_of_root=set(
int(k.split("-")[1]) for k in worker.data if k.startswith("dep")
),
)
for worker in workers
}

for k in keys.values():
assert k["root_keys"] == k["deps_of_root"]
assert len(k["root_keys"]) == len(roots) / len(workers)

for worker in workers:
assert not worker.incoming_transfer_log


@gen_cluster(
client=True,
nthreads=[("127.0.0.1", 1)] * 4,
config={"distributed.scheduler.work-stealing": False},
)
async def test_decide_worker_large_multiroot_subtrees_colocated(client, s, *workers):
r"""
Same as the above test, but also check isolated trees with multiple roots.
........ ........ ........ ........
\\\\//// \\\\//// \\\\//// \\\\////
a b c d e f g h
"""
roots = [
delayed(inc)(i, dask_key_name=f"root-{i}") for i in range(len(workers) * 2)
]
deps = [
delayed(lambda x, y: None)(
r, roots[i * 2 + 1], dask_key_name=f"dep-{i * 2}-{j}"
)
for i, r in enumerate(roots[::2])
for j in range(len(workers) * 2)
]

rs, ds = dask.persist(roots, deps)
await wait(ds)

keys = {
worker.name: dict(
root_keys=set(
int(k.split("-")[1]) for k in worker.data if k.startswith("root")
),
deps_of_root=set().union(
*(
(int(k.split("-")[1]), int(k.split("-")[1]) + 1)
for k in worker.data
if k.startswith("dep")
)
),
)
for worker in workers
}

for k in keys.values():
assert k["root_keys"] == k["deps_of_root"]
assert len(k["root_keys"]) == len(roots) / len(workers)

for worker in workers:
assert not worker.incoming_transfer_log


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
async def test_move_data_over_break_restrictions(client, s, a, b, c):
[x] = await client.scatter([1], workers=b.address)
Expand Down

0 comments on commit e1fd58b

Please sign in to comment.