From 1c41702200e6a8d6ee490a91225ec979a1c9c800 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 16 Sep 2021 00:32:41 -0600 Subject: [PATCH 1/5] Ignore widely-shared dependencies in `decide_worker` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If a dependency is already on every worker—or will end up on every worker regardless, because many things depend on it—we should ignore it when selecting our candidate workers. Otherwise, we'll end up considering every worker as a candidate, which is 1) slow and 2) often leads to poor choices (xref https://github.com/dask/distributed/issues/5253, https://github.com/dask/distributed/issues/5324). Just like with root-ish tasks, this is particularly important at the beginning. Say we have a bunch of tasks `x, 0`..`x, 10` that each depend on `root, 0`..`root, 10` respectively, but every `x` also depends on one task called `everywhere`. If `x, 0` is ready first, but `root, 0` and `everywhere` live on different workers, it appears as though we have a legitimate choice to make: do we schedule near `root, 0`, or near `everywhere`? But if we choose to go closer to `everywhere`, we might have a short-term gain, but we've taken a spot that could have gone to better use in the near future. Say that `everywhere` worker is about to complete `root, 6`. Now `x, 6` may run on yet another worker (because `x, 0` is already running where it should have gone). This can cascade through all the `x`s, until we've transferred most `root` tasks to different workers (on top of `everywhere`, which we have to transfer everywhere no matter what). The principle of this is the same as https://github.com/dask/distributed/pull/4967: be more forward-looking in worker assignment and accept a little short-term slowness to ensure that downstream tasks have to transfer less data. This PR is a binary choice, but I think we could actually generalize to some weight in `worker_objective` like: the more dependents or replicas a task has, the less weight we should give to the workers that hold it. I wonder if, barring significant data transfer inbalance, having stronger affinity for the more "rare" keys will tend to lead to better placement. --- distributed/scheduler.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 706e85c3b24..7ac88b2981b 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7982,7 +7982,14 @@ def decide_worker( if ts._actor: candidates = set(all_workers) else: - candidates = {wws for dts in deps for wws in dts._who_has} + candidates = { + wws + for dts in deps + for wws in dts._who_has + # Ignore dependencies that will need to be, or already are, copied to all workers + if max(len(dts._who_has), len(dts._dependents)) + < len(valid_workers if valid_workers is not None else all_workers) + } if valid_workers is None: if not candidates: candidates = set(all_workers) From aaa12a37dcf95427ea4aefe278db2465a100486d Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 16 Sep 2021 00:36:01 -0600 Subject: [PATCH 2/5] WIP test. This maybe could be more targeted. --- distributed/tests/test_scheduler.py | 41 +++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 23f59431cf0..bf1c890a9bd 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -237,6 +237,47 @@ def random(**kwargs): test() +@gen_cluster( + client=True, + nthreads=[("127.0.0.1", 1)] * 4, + config={"distributed.scheduler.work-stealing": False}, +) +async def test_decide_worker_common_dep_ignored(client, s, *workers): + roots = [ + delayed(slowinc)(1, 0.1 / (i + 1), dask_key_name=f"root-{i}") for i in range(16) + ] + # This shared dependency will get copied to all workers, eventually making all workers valid candidates for each dep + everywhere = delayed(None, name="everywhere") + deps = [ + delayed(lambda x, y: None)(r, everywhere, dask_key_name=f"dep-{i}") + for i, r in enumerate(roots) + ] + + rs, ds = dask.persist(roots, deps) + await wait(ds) + + keys = { + worker.name: dict( + root_keys=sorted( + [int(k.split("-")[1]) for k in worker.data if k.startswith("root")] + ), + dep_keys=sorted( + [int(k.split("-")[1]) for k in worker.data if k.startswith("dep")] + ), + ) + for worker in workers + } + + for k in keys.values(): + assert k["root_keys"] == k["dep_keys"] + + for worker in workers: + log = worker.incoming_transfer_log + if log: + assert len(log) == 1 + assert list(log[0]["keys"]) == ["everywhere"] + + @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3) async def test_move_data_over_break_restrictions(client, s, a, b, c): [x] = await client.scatter([1], workers=b.address) From e1fd58b90bee03003b6f734ce10c780262307d8f Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 16 Sep 2021 20:41:06 -0600 Subject: [PATCH 3/5] Handle multiple large subtrees This addresses the issue in https://github.com/dask/distributed/pull/5325#discussion_r710396643. It feels a little hacky since it can still be wrong (what if there are multiple root groups that have large subtrees?). We're trying to infer global graph structure (how mnay sibling tasks are there) using TaskGroups, which don't necessarily reflect graph structure. It's also hard to explain the intuition for why this is right-ish (besides "well we need the `len(dts._dependents)` number to be smaller if it has siblings".) --- distributed/scheduler.py | 4 +- distributed/tests/test_scheduler.py | 120 ++++++++++++++++++++++++++-- 2 files changed, 115 insertions(+), 9 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 7ac88b2981b..5c3a69f788c 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7985,10 +7985,10 @@ def decide_worker( candidates = { wws for dts in deps - for wws in dts._who_has # Ignore dependencies that will need to be, or already are, copied to all workers - if max(len(dts._who_has), len(dts._dependents)) + if max(len(dts._dependents) / len(dts._group), len(dts._who_has)) < len(valid_workers if valid_workers is not None else all_workers) + for wws in dts._who_has } if valid_workers is None: if not candidates: diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index bf1c890a9bd..3210413d642 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -237,12 +237,24 @@ def random(**kwargs): test() -@gen_cluster( - client=True, - nthreads=[("127.0.0.1", 1)] * 4, - config={"distributed.scheduler.work-stealing": False}, -) +@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 4) async def test_decide_worker_common_dep_ignored(client, s, *workers): + r""" + When we have basic linear chains, but all the downstream tasks also share a common dependency, ignore that dependency. + + i j k l m n o p + \__\__\__\___/__/__/__/ + | | | | | | | | | + | | | | X | | | | + a b c d e f g h + + ^ Ignore the location of X when picking a worker for i..p. + It will end up being copied to all workers anyway. + + If a dependency will end up on every worker regardless, because many things depend on it, + we should ignore it when selecting our candidate workers. Otherwise, we'll end up considering + every worker as a candidate, which is 1) slow and 2) often leads to poor choices. + """ roots = [ delayed(slowinc)(1, 0.1 / (i + 1), dask_key_name=f"root-{i}") for i in range(16) ] @@ -261,7 +273,7 @@ async def test_decide_worker_common_dep_ignored(client, s, *workers): root_keys=sorted( [int(k.split("-")[1]) for k in worker.data if k.startswith("root")] ), - dep_keys=sorted( + deps_of_root=sorted( [int(k.split("-")[1]) for k in worker.data if k.startswith("dep")] ), ) @@ -269,7 +281,7 @@ async def test_decide_worker_common_dep_ignored(client, s, *workers): } for k in keys.values(): - assert k["root_keys"] == k["dep_keys"] + assert k["root_keys"] == k["deps_of_root"] for worker in workers: log = worker.incoming_transfer_log @@ -278,6 +290,100 @@ async def test_decide_worker_common_dep_ignored(client, s, *workers): assert list(log[0]["keys"]) == ["everywhere"] +@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 4) +async def test_decide_worker_large_subtrees_colocated(client, s, *workers): + r""" + Ensure that the above "ignore common dependencies" logic doesn't affect wide (but isolated) subtrees. + + ........ ........ ........ ........ + \\\\//// \\\\//// \\\\//// \\\\//// + a b c d + + Each one of a, b, etc. has more dependents than there are workers. But just because a has + lots of dependents doesn't necessarily mean it will end up copied to every worker. + Because a also has a few siblings, a's dependents shouldn't spread out over the whole cluster. + """ + roots = [delayed(inc)(i, dask_key_name=f"root-{i}") for i in range(len(workers))] + deps = [ + delayed(inc)(r, dask_key_name=f"dep-{i}-{j}") + for i, r in enumerate(roots) + for j in range(len(workers) * 2) + ] + + rs, ds = dask.persist(roots, deps) + await wait(ds) + + keys = { + worker.name: dict( + root_keys=set( + int(k.split("-")[1]) for k in worker.data if k.startswith("root") + ), + deps_of_root=set( + int(k.split("-")[1]) for k in worker.data if k.startswith("dep") + ), + ) + for worker in workers + } + + for k in keys.values(): + assert k["root_keys"] == k["deps_of_root"] + assert len(k["root_keys"]) == len(roots) / len(workers) + + for worker in workers: + assert not worker.incoming_transfer_log + + +@gen_cluster( + client=True, + nthreads=[("127.0.0.1", 1)] * 4, + config={"distributed.scheduler.work-stealing": False}, +) +async def test_decide_worker_large_multiroot_subtrees_colocated(client, s, *workers): + r""" + Same as the above test, but also check isolated trees with multiple roots. + + ........ ........ ........ ........ + \\\\//// \\\\//// \\\\//// \\\\//// + a b c d e f g h + """ + roots = [ + delayed(inc)(i, dask_key_name=f"root-{i}") for i in range(len(workers) * 2) + ] + deps = [ + delayed(lambda x, y: None)( + r, roots[i * 2 + 1], dask_key_name=f"dep-{i * 2}-{j}" + ) + for i, r in enumerate(roots[::2]) + for j in range(len(workers) * 2) + ] + + rs, ds = dask.persist(roots, deps) + await wait(ds) + + keys = { + worker.name: dict( + root_keys=set( + int(k.split("-")[1]) for k in worker.data if k.startswith("root") + ), + deps_of_root=set().union( + *( + (int(k.split("-")[1]), int(k.split("-")[1]) + 1) + for k in worker.data + if k.startswith("dep") + ) + ), + ) + for worker in workers + } + + for k in keys.values(): + assert k["root_keys"] == k["deps_of_root"] + assert len(k["root_keys"]) == len(roots) / len(workers) + + for worker in workers: + assert not worker.incoming_transfer_log + + @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3) async def test_move_data_over_break_restrictions(client, s, a, b, c): [x] = await client.scatter([1], workers=b.address) From e5175ce4dbee3771d78b945659a80cb2e7223957 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 16 Sep 2021 21:02:04 -0600 Subject: [PATCH 4/5] Root-ish tasks logic updates The goal is to identify a specific situation: fan-outs where the group is larger than the cluster, but the dependencies are (much) smaller than the cluster. When this is the case, scheduling near the dependencies is pointless, since you know those workers will get filled up and the dependencies will have to get copied everywhere anyway. So you want to instead schedule in a methodical order which ends up keeping neighbors together. But the key is really crossing that boundary of cluster size. Hence these changes: * `total_nthreads * 2` -> `total_nthreads`: so long as every thread will be saturated by this group, we know every worker will need all its dependencies. The 2x requirement is too restrictive. * Switch magic 5 to `min(5, len(self.workers))`: if you have 3 workers, and your group has 3 dependencies, you actually _should_ try to schedule near those dependencies. Then each worker only needs 1 dependency, instead of copying all 3 dependencies to all 3 workers. If you had 20 workers, duplicating the dependencies would be unavoidable (without leaving most of the workers idle). But here, it is avoidable while maintaining parallelism, so avoid it. I'm actually wondering if we should get rid of magic 5 entirely, and just use a cluster-size metric. Like `len(self.workers) / 4` or something. If you have 1,000 workers, and a multi-thousand group has 20 dependencies, maybe you do want to copy those 20 dependencies to all workers up front. But if you only had 30 workers, you'd be much better off considering locality. --- distributed/scheduler.py | 9 +++++---- distributed/tests/test_scheduler.py | 3 +++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5c3a69f788c..fbc64fe0845 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2485,12 +2485,13 @@ def decide_worker(self, ts: TaskState) -> WorkerState: ts.state = "no-worker" return ws - # Group is larger than cluster with few dependencies? Minimize future data transfers. + # Group fills the cluster and dependencies are much smaller than cluster? Minimize future data transfers. + ndeps_cutoff: Py_ssize_t = min(5, len(self._workers_dv)) if ( valid_workers is None - and len(group) > self._total_nthreads * 2 - and len(group._dependencies) < 5 - and sum(map(len, group._dependencies)) < 5 + and len(group) >= self._total_nthreads + and len(group._dependencies) < ndeps_cutoff + and sum(map(len, group._dependencies)) < ndeps_cutoff ): ws: WorkerState = group._last_worker diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 3210413d642..4e1079fbe22 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -138,6 +138,9 @@ async def test_decide_worker_with_restrictions(client, s, a, b, c): ], ) def test_decide_worker_coschedule_order_neighbors(ndeps, nthreads): + if ndeps >= len(nthreads): + pytest.skip() + @gen_cluster( client=True, nthreads=nthreads, From d0f0955e6436fcf9536c93d24ddc558529de234d Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 20 Sep 2021 12:51:00 -0500 Subject: [PATCH 5/5] WIP slightly different metric Here we're assuming that all tasks in the group have a similar number of dependents / degree of fan-out. Then if this dependency is widely used enough to fill the cluster, and there are not nearly enough like it to fill the cluster, then we should be okay with copying it around to enable parallelism (ignoring it, since other dependencies of the task are likely more important). --- distributed/scheduler.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index fbc64fe0845..b44156d4e3d 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7979,6 +7979,9 @@ def decide_worker( dts: TaskState deps: set = ts._dependencies candidates: set + n_workers: Py_ssize_t = len( + valid_workers if valid_workers is not None else all_workers + ) assert all([dts._who_has for dts in deps]) if ts._actor: candidates = set(all_workers) @@ -7987,8 +7990,18 @@ def decide_worker( wws for dts in deps # Ignore dependencies that will need to be, or already are, copied to all workers - if max(len(dts._dependents) / len(dts._group), len(dts._who_has)) - < len(valid_workers if valid_workers is not None else all_workers) + if len(dts._who_has) < n_workers + and not ( + len(dts._dependents) >= n_workers + and len(dts._group) < n_workers // 2 + # Really want something like: + # map(len, dts._group._dependents) >= nthreads and len(dts._group) < n_workers // 2 + # Or at least + # len(dts._dependents) * len(dts._group) >= nthreads and len(dts._group) < n_workers // 2 + # But `nthreads` is O(k) to calcualte if given `valid_workers`. + # and the `map(len, dts._group._dependents)` could be extremely expensive since we can't put + # much of an upper bound on it. + ) for wws in dts._who_has } if valid_workers is None: