From c760eed78a7ab4d0d7bc5ae203c1507c241db3e1 Mon Sep 17 00:00:00 2001 From: fjetter Date: Mon, 17 Oct 2022 11:29:25 +0200 Subject: [PATCH 1/6] Algo for cogroups --- distributed/_coassignment_group.py | 42 ++++ distributed/scheduler.py | 14 +- distributed/system_monitor.py | 2 +- distributed/tests/test_coassignmnet_group.py | 214 +++++++++++++++++++ 4 files changed, 270 insertions(+), 2 deletions(-) create mode 100644 distributed/_coassignment_group.py create mode 100644 distributed/tests/test_coassignmnet_group.py diff --git a/distributed/_coassignment_group.py b/distributed/_coassignment_group.py new file mode 100644 index 00000000000..1731e033573 --- /dev/null +++ b/distributed/_coassignment_group.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from typing import Sequence + +from distributed.scheduler import TaskState + + +def coassignmnet_groups( + tasks: Sequence[TaskState], start: int = 0 +) -> dict[int, set[TaskState]]: + groups = {} + group = start + ix = 0 + min_prio = None + max_prio = None + while ix < len(tasks): + current = tasks[ix] + if min_prio is None: + min_prio = ix + + if not current.dependents: + min_prio = None + max_prio = None + ix += 1 + continue + # There is a way to implement this faster by just continuing to iterate + # over ix and check if the next is a dependent or not. I chose to go + # this route because this is what we wrote down initially + next = min(current.dependents, key=lambda ts: ts.priority) + next_ix = tasks.index(next) + if next_ix != ix + 1: + # Detect a jump + max_prio = next_ix + groups[group] = set(tasks[min_prio : max_prio + 1]) + group += 1 + ix = max_prio + 1 + min_prio = None + max_prio = None + else: + ix = next_ix + + return groups diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 3316a324efd..f1b5d5c448f 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -63,6 +63,7 @@ from distributed import cluster_dump, preloading, profile from distributed import versions as version_module +from distributed._coassignmnet_group import coassignmnet_groups from distributed._stories import scheduler_story from distributed.active_memory_manager import ActiveMemoryManagerExtension, RetireWorker from distributed.batched import BatchedSend @@ -1307,6 +1308,8 @@ class TaskState: #: Task annotations annotations: dict[str, Any] + cogroup: int + #: Cached hash of :attr:`~TaskState.client_key` _hash: int @@ -4573,7 +4576,16 @@ def update_graph( # Compute recommendations recommendations: dict = {} - for ts in sorted(runnables, key=operator.attrgetter("priority"), reverse=True): + sorted_tasks = sorted( + runnables, key=operator.attrgetter("priority"), reverse=True + ) + + self.cogroups = coassignmnet_groups(sorted_tasks, start=max(self.cogroups) + 1) + for gr_ix, tss in self.cogroups.items(): + for ts in tss: + ts.cogroup = gr_ix + + for ts in sorted_tasks: if ts.state == "released" and ts.run_spec: recommendations[ts.key] = "waiting" diff --git a/distributed/system_monitor.py b/distributed/system_monitor.py index d92dac6c119..d6e98f7cb48 100644 --- a/distributed/system_monitor.py +++ b/distributed/system_monitor.py @@ -70,7 +70,7 @@ def __init__( else: if disk_ioc is None: # pragma: nocover # diskless machine - monitor_disk_io = False + monitor_disk_io = False # type: ignore[unreachable] else: self._last_disk_io_counters = disk_ioc self.quantities["host_disk_io.read_bps"] = deque(maxlen=maxlen) diff --git a/distributed/tests/test_coassignmnet_group.py b/distributed/tests/test_coassignmnet_group.py new file mode 
100644 index 00000000000..e68212b781e --- /dev/null +++ b/distributed/tests/test_coassignmnet_group.py @@ -0,0 +1,214 @@ +from __future__ import annotations + +import pytest + +import dask + +from distributed._coassignment_group import coassignmnet_groups +from distributed.scheduler import TaskGroup, TaskState + + +def f(*args): + return None + + +@pytest.fixture(params=["abcde", "edcba"]) +def abcde(request): + return request.param + + +def dummy_dsk_to_taskstate(dsk: dict) -> list[TaskState]: + task_groups: dict[str, TaskGroup] = {} + tasks = dict() + priority = dask.order.order(dsk) + for key in dsk: + tasks[key] = ts = TaskState(key, None, "released") + ts.group = task_groups.get(ts.group_key, TaskGroup(ts.group_key)) + task_groups[ts.group_key] = ts.group + ts.priority = priority[key] + for key, (_, *deps) in dsk.items(): + for d in deps: + if d not in tasks: + raise ValueError(f"Malformed example. {d} not part of dsk") + tasks[key].add_dependency(tasks[d]) + return sorted(tasks.values(), key=lambda ts: ts.priority) + + +def test_tree_reduce(abcde): + r""" """ + a, b, c, _, _ = abcde + a1, a2, a3, a4, a5, a6, a7, a8, a9 = (a + i for i in "123456789") + b1, b2, b3, b4 = (b + i for i in "1234") + dsk = { + a1: (f,), + a2: (f,), + a3: (f,), + b1: (f, a1, a2, a3), + a4: (f,), + a5: (f,), + a6: (f,), + b2: (f, a4, a5, a6), + a7: (f,), + a8: (f,), + a9: (f,), + b3: (f, a7, a8, a9), + c: (f, b1, b2, b3), + } + tasks = dummy_dsk_to_taskstate(dsk) + assert isinstance(tasks, list) + cogroups = coassignmnet_groups(tasks) + assert len(cogroups) == 3 + + +def test_nearest_neighbor(abcde): + r""" + a1 a2 a3 a4 a5 a6 a7 a8 a9 + \ | / \ | / \ | / \ | / + b1 b2 b3 b4 + + No co-groups + """ + a, b, c, _, _ = abcde + a1, a2, a3, a4, a5, a6, a7, a8, a9 = (a + i for i in "123456789") + b1, b2, b3, b4 = (b + i for i in "1234") + + dsk = { + b1: (f,), + b2: (f,), + b3: (f,), + b4: (f,), + a1: (f, b1), + a2: (f, b1), + a3: (f, b1, b2), + a4: (f, b2), + a5: (f, b2, b3), + a6: (f, b3), + a7: (f, b3, b4), + a8: (f, b4), + a9: (f, b4), + } + tasks = dummy_dsk_to_taskstate(dsk) + assert isinstance(tasks, list) + cogroups = coassignmnet_groups(tasks) + assert len(cogroups) == 0 + + +def test_deep_bases_win_over_dependents(abcde): + r""" + It's not clear who should run first, e or d + + 1. d is nicer because it exposes parallelism + 2. e is nicer (hypothetically) because it will be sooner released + (though in this case we need d to run first regardless) + + Regardless of e or d first, we should run b before c. + + a + / | \ . 
+ b c | + / \ | / + e d + """ + a, b, c, d, e = abcde + dsk = {a: (f, b, c, d), b: (f, d, e), c: (f, d), d: (f,), e: (f,)} + + tasks = dummy_dsk_to_taskstate(dsk) + assert isinstance(tasks, list) + cogroups = coassignmnet_groups(tasks) + assert len(cogroups) == 1 + assert len(cogroups[0]) == 3 + + +def test_base_of_reduce_preferred(abcde): + r""" + a3 + /| + a2 | + /| | + a1 | | + /| | | + a0 | | | + | | | | + b0 b1 b2 b3 + \ \ / / + c + + """ + a, b, c, d, e = abcde + dsk = {(a, i): (f, (a, i - 1), (b, i)) for i in [1, 2, 3]} + dsk[(a, 0)] = (f, (b, 0)) + dsk.update({(b, i): (f, c) for i in [0, 1, 2, 3]}) + dsk[c] = (f,) + + tasks = dummy_dsk_to_taskstate(dsk) + assert isinstance(tasks, list) + cogroups = coassignmnet_groups(tasks) + + assert len(cogroups) == 2 + assert {ts.key for ts in cogroups[0]} == { + c, + (b, 0), + (b, 1), + (a, 0), + (a, 1), + } + assert sum(map(len, cogroups.values())) == len(tasks) + + +def test_map_overlap(abcde): + r""" + b1 b3 b5 + |\ / | \ / | + c1 c2 c3 c4 c5 + |/ | \ | / | \| + d1 d2 d3 d4 d5 + | | | + e1 e3 e5 + + Want to finish b1 before we start on e5 + """ + a, b, c, d, e = abcde + dsk = { + (e, 1): (f,), + (d, 1): (f, (e, 1)), + (c, 1): (f, (d, 1)), + (b, 1): (f, (c, 1), (c, 2)), + (d, 2): (f,), + (c, 2): (f, (d, 1), (d, 2), (d, 3)), + (e, 3): (f,), + (d, 3): (f, (e, 3)), + (c, 3): (f, (d, 3)), + (b, 3): (f, (c, 2), (c, 3), (c, 4)), + (d, 4): (f,), + (c, 4): (f, (d, 3), (d, 4), (d, 5)), + (e, 5): (f,), + (d, 5): (f, (e, 5)), + (c, 5): (f, (d, 5)), + (b, 5): (f, (c, 4), (c, 5)), + } + + tasks = dummy_dsk_to_taskstate(dsk) + assert isinstance(tasks, list) + cogroups = coassignmnet_groups(tasks) + assert len(cogroups) == 2 + + assert {ts.key for ts in cogroups[0]} == { + (e, 1), + (d, 1), + (c, 1), + (c, 2), + (d, 3), + (d, 2), + (e, 3), + } + assert {ts.key for ts in cogroups[1]} == { + # Why is this not part of the group? Maybe linked to order output + # (b, 5), + (c, 5), + (d, 5), + (e, 5), + (c, 4), + (d, 4), + } + # Not all belong to a cogroup + assert sum(map(len, cogroups.values())) != len(tasks) From 3579451135b09040d1f83d87ea4a240bff461c70 Mon Sep 17 00:00:00 2001 From: fjetter Date: Mon, 17 Oct 2022 15:13:04 +0200 Subject: [PATCH 2/6] Simple decide_worker based on cogroups --- distributed/_coassignment_group.py | 42 ------- distributed/scheduler.py | 113 ++++++++++++++++--- distributed/tests/test_client.py | 19 ++++ distributed/tests/test_coassignmnet_group.py | 98 ++++++++++++++-- distributed/tests/test_scheduler.py | 92 +++++++++++++++ 5 files changed, 298 insertions(+), 66 deletions(-) delete mode 100644 distributed/_coassignment_group.py diff --git a/distributed/_coassignment_group.py b/distributed/_coassignment_group.py deleted file mode 100644 index 1731e033573..00000000000 --- a/distributed/_coassignment_group.py +++ /dev/null @@ -1,42 +0,0 @@ -from __future__ import annotations - -from typing import Sequence - -from distributed.scheduler import TaskState - - -def coassignmnet_groups( - tasks: Sequence[TaskState], start: int = 0 -) -> dict[int, set[TaskState]]: - groups = {} - group = start - ix = 0 - min_prio = None - max_prio = None - while ix < len(tasks): - current = tasks[ix] - if min_prio is None: - min_prio = ix - - if not current.dependents: - min_prio = None - max_prio = None - ix += 1 - continue - # There is a way to implement this faster by just continuing to iterate - # over ix and check if the next is a dependent or not. 
I chose to go - # this route because this is what we wrote down initially - next = min(current.dependents, key=lambda ts: ts.priority) - next_ix = tasks.index(next) - if next_ix != ix + 1: - # Detect a jump - max_prio = next_ix - groups[group] = set(tasks[min_prio : max_prio + 1]) - group += 1 - ix = max_prio + 1 - min_prio = None - max_prio = None - else: - ix = next_ix - - return groups diff --git a/distributed/scheduler.py b/distributed/scheduler.py index f1b5d5c448f..dda3b28a8fd 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -63,7 +63,6 @@ from distributed import cluster_dump, preloading, profile from distributed import versions as version_module -from distributed._coassignmnet_group import coassignmnet_groups from distributed._stories import scheduler_story from distributed.active_memory_manager import ActiveMemoryManagerExtension, RetireWorker from distributed.batched import BatchedSend @@ -1308,7 +1307,7 @@ class TaskState: #: Task annotations annotations: dict[str, Any] - cogroup: int + cogroup: int | None #: Cached hash of :attr:`~TaskState.client_key` _hash: int @@ -1354,6 +1353,7 @@ def __init__(self, key: str, run_spec: object, state: TaskStateState): self.metadata = {} self.annotations = {} self.erred_on = set() + self.cogroup = None TaskState._instances.add(self) def __hash__(self) -> int: @@ -1471,6 +1471,7 @@ class SchedulerState: """ bandwidth: int + cogroups: dict[int, set[TaskState]] #: Clients currently connected to the scheduler clients: dict[str, ClientState] @@ -1626,7 +1627,7 @@ def __init__( ws for ws in self.workers.values() if ws.status == Status.running } self.plugins = {} if not plugins else {_get_plugin_name(p): p for p in plugins} - + self.cogroups = {} # Variables from dask.config, cached by __init__ for performance self.UNKNOWN_TASK_DURATION = parse_timedelta( dask.config.get("distributed.scheduler.unknown-task-duration") @@ -2069,6 +2070,40 @@ def transition_no_worker_memory( pdb.set_trace() raise + def cogroup_objective(self, cogroup: int, ws: WorkerState) -> tuple: + # Cogroups are not always connected subgraphs but if we assume they + # were, only the top prio task would need a transfer + tasks_in_group = self.cogroups[cogroup] + # TODO: this could be made more efficient / we should remeber max if it is required + ts_top_prio = max(tasks_in_group, key=lambda ts: ts.priority) + dts: TaskState + comm_bytes: int = 0 + cotasks_on_worker = 0 + for ts in tasks_in_group: + if ts in ws.processing or ws in ts.who_has: + cotasks_on_worker += 1 + for dts in ts_top_prio.dependencies: + if ( + # This is new compared to worker_objective + (dts not in tasks_in_group or dts not in ws.processing) + and ws not in dts.who_has + ): + nbytes = dts.get_nbytes() + comm_bytes += nbytes + + stack_time: float = ws.occupancy / ws.nthreads + start_time: float = stack_time + comm_bytes / self.bandwidth + + if ts_top_prio.actor: + raise NotImplementedError("Cogroup assignment for actors not implemented") + else: + return (-cotasks_on_worker, start_time, ws.nbytes) + + def decide_worker_cogroup(self, ts) -> WorkerState | None: + assert ts.cogroup is not None + pool = self.running + return min(pool, key=partial(self.cogroup_objective, ts.cogroup)) + def decide_worker_rootish_queuing_disabled( self, ts: TaskState ) -> WorkerState | None: @@ -2254,21 +2289,13 @@ def transition_waiting_processing(self, key, stimulus_id): """ try: ts: TaskState = self.tasks[key] - - if self.is_rootish(ts): - # NOTE: having two root-ish methods is temporary. 
When the feature flag is removed, - # there should only be one, which combines co-assignment and queuing. - # Eventually, special-casing root tasks might be removed entirely, with better heuristics. - if math.isinf(self.WORKER_SATURATION): - if not (ws := self.decide_worker_rootish_queuing_disabled(ts)): - return {ts.key: "no-worker"}, {}, {} - else: - if not (ws := self.decide_worker_rootish_queuing_enabled()): - return {ts.key: "queued"}, {}, {} + if ts.cogroup is not None: + decider = self.decide_worker_cogroup else: - if not (ws := self.decide_worker_non_rootish(ts)): - return {ts.key: "no-worker"}, {}, {} + decider = self.decide_worker_non_rootish + if not (ws := decider(ts)): + return {ts.key: "no-worker"}, {}, {} worker_msgs = _add_to_processing(self, ts, ws) return {}, {}, worker_msgs except Exception as e: @@ -4580,7 +4607,12 @@ def update_graph( runnables, key=operator.attrgetter("priority"), reverse=True ) - self.cogroups = coassignmnet_groups(sorted_tasks, start=max(self.cogroups) + 1) + if self.cogroups: + start = max(self.cogroups.keys()) + 1 + else: + start = 0 + cogroups = coassignmnet_groups(sorted_tasks[::-1], start=start) + self.cogroups.update(cogroups) for gr_ix, tss in self.cogroups.items(): for ts in tss: ts.cogroup = gr_ix @@ -8420,3 +8452,50 @@ def transition( self.metadata[key] = ts.metadata self.state[key] = finish self.keys.discard(key) + + +def coassignmnet_groups( + tasks: Sequence[TaskState], start: int = 0 +) -> dict[int, set[TaskState]]: + groups = {} + group = start + ix = 0 + min_prio = None + max_prio = None + + group_dependents_seen = set() + while ix < len(tasks): + current = tasks[ix] + if min_prio is None: + min_prio = ix + + if current in group_dependents_seen or not current.dependents: + min_prio = None + max_prio = None + ix += 1 + continue + # There is a way to implement this faster by just continuing to iterate + # over ix and check if the next is a dependent or not. 
I chose to go + # this route because this is what we wrote down initially + next = min(current.dependents, key=lambda ts: ts.priority) + next_ix = tasks.index(next) + + # Detect a jump + if next_ix != ix + 1: + while len(next.dependents) == 1: + dep = list(next.dependents)[0] + if len(dep.dependencies) != 1: + # This algorithm has the shortcoming that groups may grow too large if the dependent of a group + group_dependents_seen.add(dep) + break + next = dep + max_prio = tasks.index(next) + 1 + groups[group] = set(tasks[min_prio:max_prio]) + group += 1 + ix = max_prio + min_prio = None + max_prio = None + else: + ix = next_ix + + return groups diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index fcaa35e5bd3..998d7305508 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -7628,3 +7628,22 @@ async def test_wait_for_workers_n_workers_value_check(c, s, a, b, value, excepti ctx = nullcontext() with ctx: await c.wait_for_workers(value) + + +@gen_cluster(client=True) +async def test_repartition_coassignment(c, s, a, b): + ddf = dask.datasets.timeseries( + start="2000-01-01", + end="2000-01-17", + dtypes={"x": float, "y": float}, + freq="1d", + ) + assert ddf.npartitions == 16 + ddf_repart = ddf.repartition(npartitions=ddf.npartitions // 2) + + fut = c.compute(ddf_repart) + + while not a.state.tasks and b.state.tasks: + await asyncio.sleep(0.1) + + assert len(a.state.tasks) == len(b.state.tasks) diff --git a/distributed/tests/test_coassignmnet_group.py b/distributed/tests/test_coassignmnet_group.py index e68212b781e..fe431fc063b 100644 --- a/distributed/tests/test_coassignmnet_group.py +++ b/distributed/tests/test_coassignmnet_group.py @@ -4,15 +4,21 @@ import dask -from distributed._coassignment_group import coassignmnet_groups -from distributed.scheduler import TaskGroup, TaskState +from distributed.scheduler import TaskGroup, TaskState, coassignmnet_groups def f(*args): return None -@pytest.fixture(params=["abcde", "edcba"]) +def assert_disjoint_sets(cogroups): + seen = set() + for values in cogroups.values(): + assert not (seen & values) + seen.update(values) + + +@pytest.fixture(params=["abcde"]) def abcde(request): return request.param @@ -26,8 +32,14 @@ def dummy_dsk_to_taskstate(dsk: dict) -> list[TaskState]: ts.group = task_groups.get(ts.group_key, TaskGroup(ts.group_key)) task_groups[ts.group_key] = ts.group ts.priority = priority[key] - for key, (_, *deps) in dsk.items(): - for d in deps: + for key, vals in dsk.items(): + stack = list(vals[1:]) + while stack: + d = stack.pop() + if isinstance(d, list): + stack.extend(d) + continue + assert isinstance(d, (str, tuple, int)) if d not in tasks: raise ValueError(f"Malformed example. 
{d} not part of dsk") tasks[key].add_dependency(tasks[d]) @@ -57,6 +69,7 @@ def test_tree_reduce(abcde): tasks = dummy_dsk_to_taskstate(dsk) assert isinstance(tasks, list) cogroups = coassignmnet_groups(tasks) + assert_disjoint_sets(cogroups) assert len(cogroups) == 3 @@ -90,6 +103,7 @@ def test_nearest_neighbor(abcde): tasks = dummy_dsk_to_taskstate(dsk) assert isinstance(tasks, list) cogroups = coassignmnet_groups(tasks) + assert_disjoint_sets(cogroups) assert len(cogroups) == 0 @@ -115,6 +129,7 @@ def test_deep_bases_win_over_dependents(abcde): tasks = dummy_dsk_to_taskstate(dsk) assert isinstance(tasks, list) cogroups = coassignmnet_groups(tasks) + assert_disjoint_sets(cogroups) assert len(cogroups) == 1 assert len(cogroups[0]) == 3 @@ -143,8 +158,9 @@ def test_base_of_reduce_preferred(abcde): tasks = dummy_dsk_to_taskstate(dsk) assert isinstance(tasks, list) cogroups = coassignmnet_groups(tasks) + assert_disjoint_sets(cogroups) - assert len(cogroups) == 2 + assert len(cogroups) == 1 assert {ts.key for ts in cogroups[0]} == { c, (b, 0), @@ -152,7 +168,6 @@ def test_base_of_reduce_preferred(abcde): (a, 0), (a, 1), } - assert sum(map(len, cogroups.values())) == len(tasks) def test_map_overlap(abcde): @@ -190,6 +205,7 @@ def test_map_overlap(abcde): tasks = dummy_dsk_to_taskstate(dsk) assert isinstance(tasks, list) cogroups = coassignmnet_groups(tasks) + assert_disjoint_sets(cogroups) assert len(cogroups) == 2 assert {ts.key for ts in cogroups[0]} == { @@ -212,3 +228,71 @@ def test_map_overlap(abcde): } # Not all belong to a cogroup assert sum(map(len, cogroups.values())) != len(tasks) + + +def test_repartition(): + ddf = dask.datasets.timeseries( + start="2000-01-01", + end="2000-01-17", + dtypes={"x": float, "y": float}, + freq="1d", + ) + assert ddf.npartitions == 16 + ddf_repart = ddf.repartition(npartitions=ddf.npartitions // 2) + dsk = ddf_repart.dask.to_dict() + for k, _ in list(dsk.items()): + if k[0].startswith("make-timeseries"): + dsk[k] = (f,) + tasks = dummy_dsk_to_taskstate(dsk) + cogroups = coassignmnet_groups(tasks) + assert_disjoint_sets(cogroups) + + assert len(cogroups) == ddf.npartitions // 2 + + +def test_repartition_reduce(abcde): + a, b, c, d, e = abcde + a1, a2, a3, a4, a5, a6, a7, a8 = (a + i for i in "12345678") + b1, b2, b3, b4 = (b + i for i in "1234") + c1, c2, c3, c4 = (c + i for i in "1234") + d1, d2 = (d + i for i in "12") + + dsk = { + # Roots + a1: (f,), + a2: (f,), + a3: (f,), + a4: (f,), + a5: (f,), + a6: (f,), + a7: (f,), + a8: (f,), + # Trivial reduce, e.g. 
repartition + b1: (f, a1, a2), + b2: (f, a3, a4), + b3: (f, a5, a6), + b4: (f, a7, a8), + # A linear chain + c1: (f, b1), + c2: (f, b2), + c3: (f, b3), + c4: (f, b4), + # Tree reduce + d1: (f, c1, c2), + d2: (f, c3, c4), + e: (f, d1, d2), + } + tasks = dummy_dsk_to_taskstate(dsk) + cogroups = coassignmnet_groups(tasks) + assert_disjoint_sets(cogroups) + + assert all( + [1 == sum([ts.key.startswith("b") for ts in gr]) for gr in cogroups.values()] + ) + assert all( + [1 == sum([ts.key.startswith("c") for ts in gr]) for gr in cogroups.values()] + ) + assert all( + [2 == sum([ts.key.startswith("a") for ts in gr]) for gr in cogroups.values()] + ) + assert len(cogroups) == 4 diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index ed87b1fb5fe..f97c50cb910 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -17,6 +17,7 @@ import psutil import pytest from tlz import concat, first, merge, valmap +from toolz import partition from tornado.ioloop import IOLoop, PeriodicCallback import dask @@ -4009,3 +4010,94 @@ async def test_count_task_prefix(c, s, a, b): assert s.task_prefixes["inc"].state_counts["memory"] == 20 assert s.task_prefixes["inc"].state_counts["erred"] == 0 + + +@gen_cluster( + client=True, + nthreads=[("", 1)] * 6, + config={"distributed.scheduler.worker-saturation": 1.0}, +) +async def test_utilization_over_co_assignment(c, s, *workers): + event = Event() + roots = [delayed(event.wait)(5, dask_key_name=f"r-{i}") for i in range(6)] + aggs = [ + delayed(list)(rs, dask_key_name=f"a-{i}") + for i, rs in enumerate(partition(2, roots)) + ] + fs = c.compute(aggs) + + await async_wait_for(lambda: any(w.state.tasks for w in workers), timeout=500) + + # All workers should be used, even though it breaks up co-assignment + assert not s.idle + rts = [s.tasks[r.key] for r in roots] + assert {ts.processing_on for ts in rts} == set(s.workers.values()) + + await event.set() + await wait(fs) + + +@gen_cluster( + client=True, + nthreads=[("", 2)] * 2, + config={"distributed.scheduler.worker-saturation": 1.0}, +) +async def test_co_assign_scale_up(c, s, a, b): + event = Event() + devent = delayed(event) + roots = [devent.wait(5, dask_key_name=f"r-{i}") for i in range(16)] + aggs = [ + delayed(list)(rs, dask_key_name=f"a-{i}") + for i, rs in enumerate(partition(4, roots)) + ] + fs = c.compute(aggs) + + await async_wait_for(lambda: s.queued, timeout=5) + + # Each family of roots should be processing on the same worker, or not at all + for agg in aggs: + tss = s.tasks[agg.key].dependencies + proc = [ts.processing_on for ts in tss] + assert proc == proc[:1] * len(proc) + + async with Worker(s.address, nthreads=2) as w: + await async_wait_for(lambda: w.state.tasks, timeout=5) + assert len(w.state.tasks) == 5 # 4 `r` + the Event + + # Each family of roots should be processing on the same worker, or not at all + for agg in aggs: + tss = s.tasks[agg.key].dependencies + proc = [ts.processing_on for ts in tss] + assert len(set(proc)) == 1, proc + + await event.set() + await wait(fs) + + +@gen_cluster( + client=True, + nthreads=[("", 2)] * 3, + config={"distributed.scheduler.worker-saturation": 1.0}, +) +async def test_co_assign_scale_down(c, s, *workers): + event = Event() + roots = [delayed(event.wait)(5, dask_key_name=f"r-{i}") for i in range(16)] + aggs = [ + delayed(list)(rs, dask_key_name=f"a-{i}") + for i, rs in enumerate(partition(4, roots)) + ] + # pin roots so we can check where they are at the end + fs = c.compute(aggs + roots) 
+ + await async_wait_for(lambda: s.queued, timeout=5) + + await workers[0].close() + await event.set() + await wait(fs) + + for r in roots: + ts = s.tasks[r.key] + assert len(ts.who_has) == 1, ts.who_has + + for w in workers: + assert not w.transfer_incoming_log From 1344f9c8ccfd5589fd8060d5f45e1caceba29a56 Mon Sep 17 00:00:00 2001 From: fjetter Date: Mon, 17 Oct 2022 15:16:46 +0200 Subject: [PATCH 3/6] Fix comment --- distributed/scheduler.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index dda3b28a8fd..00e1b89d0d7 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -8485,7 +8485,13 @@ def coassignmnet_groups( while len(next.dependents) == 1: dep = list(next.dependents)[0] if len(dep.dependencies) != 1: - # This algorithm has the shortcoming that groups may grow too large if the dependent of a group + # This algorithm has the shortcoming that groups may grow + # too large if we walk straight to the dependent of a group. + # Especially in staged reductions (e.g. tree reductions, the + # next "jump" would combine multiple cogroups). For now, + # just ignore these. This means that we'll practically only + # have cogroups at the bottom of a graph but this is where + # it matters the most anyhow group_dependents_seen.add(dep) break next = dep From 4e82242ebdb3ceeaa7f6b529cbde607f6d10ce38 Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 19 Oct 2022 10:26:13 +0200 Subject: [PATCH 4/6] Rename function --- distributed/scheduler.py | 4 ++-- distributed/system_monitor.py | 2 +- distributed/tests/test_coassignmnet_group.py | 16 ++++++++-------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 00e1b89d0d7..27165c1552e 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4611,7 +4611,7 @@ def update_graph( start = max(self.cogroups.keys()) + 1 else: start = 0 - cogroups = coassignmnet_groups(sorted_tasks[::-1], start=start) + cogroups = coassignment_groups(sorted_tasks[::-1], start=start) self.cogroups.update(cogroups) for gr_ix, tss in self.cogroups.items(): for ts in tss: @@ -8454,7 +8454,7 @@ def transition( self.keys.discard(key) -def coassignmnet_groups( +def coassignment_groups( tasks: Sequence[TaskState], start: int = 0 ) -> dict[int, set[TaskState]]: groups = {} diff --git a/distributed/system_monitor.py b/distributed/system_monitor.py index d6e98f7cb48..d92dac6c119 100644 --- a/distributed/system_monitor.py +++ b/distributed/system_monitor.py @@ -70,7 +70,7 @@ def __init__( else: if disk_ioc is None: # pragma: nocover # diskless machine - monitor_disk_io = False # type: ignore[unreachable] + monitor_disk_io = False else: self._last_disk_io_counters = disk_ioc self.quantities["host_disk_io.read_bps"] = deque(maxlen=maxlen) diff --git a/distributed/tests/test_coassignmnet_group.py b/distributed/tests/test_coassignmnet_group.py index fe431fc063b..4c3cf08bb59 100644 --- a/distributed/tests/test_coassignmnet_group.py +++ b/distributed/tests/test_coassignmnet_group.py @@ -4,7 +4,7 @@ import dask -from distributed.scheduler import TaskGroup, TaskState, coassignmnet_groups +from distributed.scheduler import TaskGroup, TaskState, coassignment_groups def f(*args): @@ -68,7 +68,7 @@ def test_tree_reduce(abcde): } tasks = dummy_dsk_to_taskstate(dsk) assert isinstance(tasks, list) - cogroups = coassignmnet_groups(tasks) + cogroups = coassignment_groups(tasks) assert_disjoint_sets(cogroups) assert len(cogroups) == 
3 @@ -102,7 +102,7 @@ def test_nearest_neighbor(abcde): } tasks = dummy_dsk_to_taskstate(dsk) assert isinstance(tasks, list) - cogroups = coassignmnet_groups(tasks) + cogroups = coassignment_groups(tasks) assert_disjoint_sets(cogroups) assert len(cogroups) == 0 @@ -128,7 +128,7 @@ def test_deep_bases_win_over_dependents(abcde): tasks = dummy_dsk_to_taskstate(dsk) assert isinstance(tasks, list) - cogroups = coassignmnet_groups(tasks) + cogroups = coassignment_groups(tasks) assert_disjoint_sets(cogroups) assert len(cogroups) == 1 assert len(cogroups[0]) == 3 @@ -157,7 +157,7 @@ def test_base_of_reduce_preferred(abcde): tasks = dummy_dsk_to_taskstate(dsk) assert isinstance(tasks, list) - cogroups = coassignmnet_groups(tasks) + cogroups = coassignment_groups(tasks) assert_disjoint_sets(cogroups) assert len(cogroups) == 1 @@ -204,7 +204,7 @@ def test_map_overlap(abcde): tasks = dummy_dsk_to_taskstate(dsk) assert isinstance(tasks, list) - cogroups = coassignmnet_groups(tasks) + cogroups = coassignment_groups(tasks) assert_disjoint_sets(cogroups) assert len(cogroups) == 2 @@ -244,7 +244,7 @@ def test_repartition(): if k[0].startswith("make-timeseries"): dsk[k] = (f,) tasks = dummy_dsk_to_taskstate(dsk) - cogroups = coassignmnet_groups(tasks) + cogroups = coassignment_groups(tasks) assert_disjoint_sets(cogroups) assert len(cogroups) == ddf.npartitions // 2 @@ -283,7 +283,7 @@ def test_repartition_reduce(abcde): e: (f, d1, d2), } tasks = dummy_dsk_to_taskstate(dsk) - cogroups = coassignmnet_groups(tasks) + cogroups = coassignment_groups(tasks) assert_disjoint_sets(cogroups) assert all( From d451823331fb2139585cc59ac7b83969fbb1b5f2 Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 19 Oct 2022 10:27:17 +0200 Subject: [PATCH 5/6] Add vorticity test case --- distributed/tests/test_coassignmnet_group.py | 38 ++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/distributed/tests/test_coassignmnet_group.py b/distributed/tests/test_coassignmnet_group.py index 4c3cf08bb59..3b6704458d1 100644 --- a/distributed/tests/test_coassignmnet_group.py +++ b/distributed/tests/test_coassignmnet_group.py @@ -296,3 +296,41 @@ def test_repartition_reduce(abcde): [2 == sum([ts.key.startswith("a") for ts in gr]) for gr in cogroups.values()] ) assert len(cogroups) == 4 + + +def test_vorticity(abcde): + # See https://gist.github.com/TomNicholas/fe9c6b6c415d4fa42523216c87e2fff2 + # https://github.com/dask/distributed/discussions/7128#discussioncomment-3910328 + a, b, c, d, e = abcde + d1, d2, d3, d4, d5, d6, d7 = (d + i for i in "1234567") + e1, e2, e3, e4, e5 = (e + i for i in "12345") + + def _gen_branch(ix): + return { + f"a{ix}": (f,), + f"b{ix}": (f, f"a{ix}"), + f"c{ix}": (f, f"a{ix}"), + f"d{ix}": (f, f"b{ix}", f"c{ix}"), + } + + dsk = {} + for ix in range(1, 8): + dsk.update(_gen_branch(ix)) + + dsk.update( + { + e1: (f, d1, d2, d3, d4, d5), + e2: (f, d3, d4, d5, d6, d7), + } + ) + from dask.base import visualize + + visualize(dsk, color="order") + ordered = dask.order.order(dsk) + tasks = dummy_dsk_to_taskstate(dsk) + cogroups = coassignment_groups(tasks) + assert_disjoint_sets(cogroups) + + assert len(cogroups) == 7 + assert all(len(group) == 4 for group in cogroups.values()) + assert not any(e1 in group or e2 in group for group in cogroups.values()) From d06ce1a38287843f47a868e913fee41d9d86df89 Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 19 Oct 2022 10:28:21 +0200 Subject: [PATCH 6/6] Remove list.index calls --- distributed/scheduler.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 
deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 27165c1552e..995c0067587 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -8478,12 +8478,14 @@ def coassignment_groups( # over ix and check if the next is a dependent or not. I chose to go # this route because this is what we wrote down initially next = min(current.dependents, key=lambda ts: ts.priority) - next_ix = tasks.index(next) + next_ix = ix + while tasks[next_ix] is not next: + next_ix += 1 # Detect a jump if next_ix != ix + 1: while len(next.dependents) == 1: - dep = list(next.dependents)[0] + (dep,) = next.dependents if len(dep.dependencies) != 1: # This algorithm has the shortcoming that groups may grow # too large if we walk straight to the dependent of a group. @@ -8495,7 +8497,9 @@ def coassignment_groups( group_dependents_seen.add(dep) break next = dep - max_prio = tasks.index(next) + 1 + while tasks[next_ix] is not next: + next_ix += 1 + max_prio = next_ix + 1 groups[group] = set(tasks[min_prio:max_prio]) group += 1 ix = max_prio
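Below is a minimal, self-contained sketch of the priority-jump grouping walk, in case you want to play with the idea outside the scheduler. It follows the version from [PATCH 1/6] (before the `group_dependents_seen` refinement and the `list.index` removal), and everything in it is hypothetical illustration: `FakeTask` and `simple_cogroups` are stand-ins that do not exist in `distributed`, and priorities are hand-assigned in topological order instead of coming from `dask.order.order`, which the real code relies on. It reproduces the tree-reduce case from `test_tree_reduce`: three groups of three roots plus their reducer, with the final combine `c` left ungrouped.

from __future__ import annotations

from dataclasses import dataclass, field


@dataclass(eq=False)
class FakeTask:
    """Stand-in for TaskState: only the fields the grouping walk needs."""

    key: str
    priority: int
    dependents: set["FakeTask"] = field(default_factory=set)


def simple_cogroups(tasks: list[FakeTask]) -> dict[int, set[FakeTask]]:
    """Walk tasks in priority order; whenever the highest-priority dependent
    of the current task is not the immediate next task (a "jump"), close a
    group spanning from the group's start up to and including that dependent."""
    groups: dict[int, set[FakeTask]] = {}
    group = 0
    ix = 0
    start_ix: int | None = None
    while ix < len(tasks):
        current = tasks[ix]
        if start_ix is None:
            start_ix = ix
        if not current.dependents:
            # Tasks without dependents (the final combine) never start a group
            start_ix = None
            ix += 1
            continue
        nxt = min(current.dependents, key=lambda ts: ts.priority)
        next_ix = tasks.index(nxt)
        if next_ix != ix + 1:  # jump detected: close the group at the dependent
            groups[group] = set(tasks[start_ix : next_ix + 1])
            group += 1
            ix = next_ix + 1
            start_ix = None
        else:  # dependent is the very next task: keep extending the group
            ix = next_ix
    return groups


# Tree reduce: a1..a3 -> b1, a4..a6 -> b2, a7..a9 -> b3, b1..b3 -> c,
# with priorities assigned in the order dask.order would plausibly visit them.
keys = ["a1", "a2", "a3", "b1", "a4", "a5", "a6", "b2",
        "a7", "a8", "a9", "b3", "c"]
tasks = [FakeTask(k, prio) for prio, k in enumerate(keys)]
by_key = {t.key: t for t in tasks}
for reducer, inputs in [("b1", "a1 a2 a3"), ("b2", "a4 a5 a6"),
                        ("b3", "a7 a8 a9"), ("c", "b1 b2 b3")]:
    for k in inputs.split():
        by_key[k].dependents.add(by_key[reducer])

groups = simple_cogroups(tasks)
assert len(groups) == 3
assert {t.key for t in groups[0]} == {"a1", "a2", "a3", "b1"}
print({g: sorted(t.key for t in ts) for g, ts in groups.items()})

Running it prints the three root-plus-reducer groups, mirroring the `len(cogroups) == 3` expectation in `test_tree_reduce`; the final commits in this series only change how the jump target is located, not this basic walk.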