From add1fa29727b6572ce9fcbf7fd4bdcb9d2952406 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 14 Dec 2023 18:23:12 +0100 Subject: [PATCH 01/10] Allow tasks to override `is_rootish` heuristic (#8412) --- distributed/scheduler.py | 15 ++++++++++++++- distributed/tests/test_scheduler.py | 24 ++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 882bcf0a7b..3a4686010a 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1373,6 +1373,15 @@ class TaskState: #: be rejected. run_id: int | None + #: Whether to consider this task rootish in the context of task queueing + #: True + #: Always consider this task rootish + #: False + #: Never consider this task rootish + #: None + #: Use a heuristic to determine whether this task should be considered rootish + _rootish: bool | None + #: Cached hash of :attr:`~TaskState.client_key` _hash: int @@ -1430,6 +1439,7 @@ def __init__( self.metadata = None self.annotations = None self.erred_on = None + self._rootish = None self.run_id = None TaskState._instances.add(self) @@ -2909,8 +2919,11 @@ def is_rootish(self, ts: TaskState) -> bool: Whether ``ts`` is a root or root-like task. Root-ish tasks are part of a group that's much larger than the cluster, - and have few or no dependencies. + and have few or no dependencies. Tasks may also be explicitly marked as rootish + to override this heuristic. """ + if ts._rootish is not None: + return ts._rootish if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions: return False tg = ts.group diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 5485d44e63..d7935a2ba3 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -279,6 +279,30 @@ def random(**kwargs): test_decide_worker_coschedule_order_neighbors_() +@gen_cluster( + client=True, + nthreads=[], +) +async def test_override_is_rootish(c, s): + x = c.submit(lambda x: x + 1, 1, key="x") + await async_poll_for(lambda: "x" in s.tasks, timeout=5) + ts_x = s.tasks["x"] + assert ts_x._rootish is None + assert s.is_rootish(ts_x) + + ts_x._rootish = False + assert not s.is_rootish(ts_x) + + y = c.submit(lambda y: y + 1, 1, key="y", workers=["not-existing"]) + await async_poll_for(lambda: "y" in s.tasks, timeout=5) + ts_y = s.tasks["y"] + assert ts_y._rootish is None + assert not s.is_rootish(ts_y) + + ts_y._rootish = True + assert s.is_rootish(ts_y) + + @pytest.mark.skipif( QUEUING_ON_BY_DEFAULT, reason="Not relevant with queuing on; see https://github.com/dask/distributed/issues/7204", From 78050ccfc25baa84d55466f57965b1b2c04d41de Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 14 Dec 2023 18:27:38 +0100 Subject: [PATCH 02/10] Avoid `RecursionError` when failing to pickle key in `SpillBuffer` and using `tblib=3` (#8404) --- distributed/spill.py | 26 ++++++++++++++----------- distributed/tests/test_spill.py | 5 +++-- distributed/tests/test_worker_memory.py | 3 ++- 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/distributed/spill.py b/distributed/spill.py index 9b9d72b58b..cfbd093c40 100644 --- a/distributed/spill.py +++ b/distributed/spill.py @@ -136,25 +136,24 @@ def _handle_errors(self, key: Key | None) -> Iterator[None]: logger.error("Spill to disk failed; keeping data in memory", exc_info=True) raise HandledError() except PickleError as e: - key_e, orig_e = e.args - assert key_e in self.fast - assert key_e not in self.slow - if 
key_e == key: + assert e.key in self.fast + assert e.key not in self.slow + if e.key == key: assert key is not None # The key we just inserted failed to serialize. # This happens only when the key is individually larger than target. # The exception will be caught by Worker and logged; the status of # the task will be set to error. del self[key] - raise orig_e + raise else: # The key we just inserted is smaller than target, but it caused # another, unrelated key to be spilled out of the LRU, and that key # failed to serialize. There's nothing wrong with the new key. The older # key is still in memory. - if key_e not in self.logged_pickle_errors: - logger.error("Failed to pickle %r", key_e, exc_info=True) - self.logged_pickle_errors.add(key_e) + if e.key not in self.logged_pickle_errors: + logger.error("Failed to pickle %r", e.key, exc_info=True) + self.logged_pickle_errors.add(e.key) raise HandledError() def __setitem__(self, key: Key, value: object) -> None: @@ -267,8 +266,13 @@ class MaxSpillExceeded(Exception): pass -class PickleError(Exception): - pass +class PickleError(TypeError): + def __str__(self) -> str: + return f"Failed to pickle {self.key!r}" + + @property + def key(self) -> Key: + return self.args[0] class HandledError(Exception): @@ -324,7 +328,7 @@ def __setitem__(self, key: Key, value: object) -> None: # zict.LRU ensures that the key remains in fast if we raise. # Wrap the exception so that it's recognizable by SpillBuffer, # which will then unwrap it. - raise PickleError(key, e) + raise PickleError(key) from e # Thanks to Buffer.__setitem__, we never update existing # keys in slow, but always delete them and reinsert them. diff --git a/distributed/tests/test_spill.py b/distributed/tests/test_spill.py index ef02a2a27f..604f7b98c7 100644 --- a/distributed/tests/test_spill.py +++ b/distributed/tests/test_spill.py @@ -219,9 +219,10 @@ def test_spillbuffer_fail_to_serialize(tmp_path): a = Bad(size=201) # Exception caught in the worker - with pytest.raises(TypeError, match="Could not serialize"): + with pytest.raises(TypeError, match="Failed to pickle 'a'") as e: with captured_logger("distributed.spill") as logs_bad_key: buf["a"] = a + assert isinstance(e.value.__cause__.__cause__, MyError) # spill.py must remain silent because we're already logging in worker.py assert not logs_bad_key.getvalue() @@ -240,7 +241,7 @@ def test_spillbuffer_fail_to_serialize(tmp_path): # worker.py won't intercept the exception here, so spill.py must dump the traceback logs_value = logs_bad_key_mem.getvalue() - assert "Failed to pickle" in logs_value # from distributed.spill + assert "Failed to pickle 'b'" in logs_value # from distributed.spill assert "Traceback" in logs_value # from distributed.spill assert_buf(buf, tmp_path, {"b": b, "c": c}, {}) diff --git a/distributed/tests/test_worker_memory.py b/distributed/tests/test_worker_memory.py index 2e02c98a53..fc473ba8d1 100644 --- a/distributed/tests/test_worker_memory.py +++ b/distributed/tests/test_worker_memory.py @@ -174,8 +174,9 @@ async def test_fail_to_pickle_execute_1(c, s, a, b): assert x.status == "error" - with pytest.raises(TypeError, match="Could not serialize"): + with pytest.raises(TypeError, match="Failed to pickle 'x'") as e: await x + assert isinstance(e.value.__cause__.__cause__, CustomError) await assert_basic_futures(c) From 5b0dfa4f4544001475e2265b21fcfa67f6f73812 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 14 Dec 2023 19:14:42 +0100 Subject: [PATCH 03/10] Rewrite `test_subprocess_cluster_does_not_depend_on_logging` 
(#8409) --- distributed/deploy/tests/test_subprocess.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/distributed/deploy/tests/test_subprocess.py b/distributed/deploy/tests/test_subprocess.py index e401fb88dc..bbf344b258 100644 --- a/distributed/deploy/tests/test_subprocess.py +++ b/distributed/deploy/tests/test_subprocess.py @@ -1,6 +1,5 @@ from __future__ import annotations -import asyncio import logging import pytest @@ -76,21 +75,21 @@ async def test_raise_if_scheduler_fails_to_start(): @pytest.mark.skipif(WINDOWS, reason="distributed#7434") +@pytest.mark.slow @gen_test() async def test_subprocess_cluster_does_not_depend_on_logging(): - async def _start(): + with new_config_file( + {"distributed": {"logging": {"distributed": logging.CRITICAL + 1}}} + ): async with SubprocessCluster( asynchronous=True, dashboard_address=":0", scheduler_kwargs={"idle_timeout": "5s"}, worker_kwargs={"death_timeout": "5s"}, - ): - pass - - with new_config_file( - {"distributed": {"logging": {"distributed": logging.CRITICAL + 1}}} - ): - await asyncio.wait_for(_start(), timeout=2) + ) as cluster: + async with Client(cluster, asynchronous=True) as client: + result = await client.submit(lambda x: x + 1, 10) + assert result == 11 @pytest.mark.skipif( From 049759b550473bee768109d95b85f7cd771c08ad Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 15 Dec 2023 15:09:40 +0100 Subject: [PATCH 04/10] Do not reuse shuffles between merges (#8416) --- distributed/shuffle/_merge.py | 16 ++++++++++------ distributed/shuffle/tests/test_merge.py | 10 +++++++--- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/distributed/shuffle/_merge.py b/distributed/shuffle/_merge.py index 99490bde8e..945a0011c4 100644 --- a/distributed/shuffle/_merge.py +++ b/distributed/shuffle/_merge.py @@ -361,18 +361,22 @@ def cull(self, keys: Iterable[str], all_keys: Any) -> tuple[HashJoinP2PLayer, di def _construct_graph(self) -> dict[tuple | str, tuple]: token_left = tokenize( - "hash-join", + # Include self.name to ensure that shuffle IDs are unique for individual + # merge operations. Reusing shuffles between merges is dangerous because of + # required coordination and complexity introduced through dynamic clusters. + self.name, self.name_input_left, self.left_on, - self.npartitions, - self.parts_out, + self.left_index, ) token_right = tokenize( - "hash-join", + # Include self.name to ensure that shuffle IDs are unique for individual + # merge operations. Reusing shuffles between merges is dangerous because of + # required coordination and complexity introduced through dynamic clusters. + self.name, self.name_input_right, self.right_on, - self.npartitions, - self.parts_out, + self.right_index, ) dsk: dict[tuple | str, tuple] = {} name_left = "hash-join-transfer-" + token_left diff --git a/distributed/shuffle/tests/test_merge.py b/distributed/shuffle/tests/test_merge.py index 6c68e5e426..284519e675 100644 --- a/distributed/shuffle/tests/test_merge.py +++ b/distributed/shuffle/tests/test_merge.py @@ -112,7 +112,9 @@ async def test_merge_p2p_shuffle_reused_dataframe_with_different_parameters(c, s # Vary the number of output partitions for the shuffles of dd2 .repartition(20).merge(ddf2, left_on="b", right_on="x", shuffle="p2p") ) - # Generate unique shuffle IDs if the input frame is the same but parameters differ + # Generate unique shuffle IDs if the input frame is the same but + # parameters differ. 
Reusing shuffles in merges is dangerous because of the + # required coordination and complexity introduced through dynamic clusters. assert sum(id_from_key(k) is not None for k in out.dask) == 4 result = await c.compute(out) expected = pdf1.merge(pdf2, left_on="a", right_on="x").merge( @@ -147,8 +149,10 @@ async def test_merge_p2p_shuffle_reused_dataframe_with_same_parameters(c, s, a, right_on="b", shuffle="p2p", ) - # Generate the same shuffle IDs if the input frame is the same and all its parameters match - assert sum(id_from_key(k) is not None for k in out.dask) == 3 + # Generate unique shuffle IDs if the input frame is the same and all its + # parameters match. Reusing shuffles in merges is dangerous because of the + # required coordination and complexity introduced through dynamic clusters. + assert sum(id_from_key(k) is not None for k in out.dask) == 4 result = await c.compute(out) expected = pdf2.merge( pdf1.merge(pdf2, left_on="a", right_on="x"), left_on="x", right_on="b" From 3882bc622e2f277c351cdba8c47b555f26a41b6d Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 15 Dec 2023 15:10:49 +0100 Subject: [PATCH 05/10] Ensure that P2P output tasks are never rootish (#8414) --- distributed/shuffle/_scheduler_plugin.py | 27 ++++++++++++++++ distributed/shuffle/tests/test_shuffle.py | 39 +++++++++++++++++++++++ 2 files changed, 66 insertions(+) diff --git a/distributed/shuffle/_scheduler_plugin.py b/distributed/shuffle/_scheduler_plugin.py index ab54b154f6..f43d4db322 100644 --- a/distributed/shuffle/_scheduler_plugin.py +++ b/distributed/shuffle/_scheduler_plugin.py @@ -145,6 +145,7 @@ def get_or_create( self._raise_if_barrier_unknown(spec.id) self._raise_if_task_not_processing(key) worker_for = self._calculate_worker_for(spec) + self._ensure_output_tasks_are_non_rootish(spec) state = spec.create_new_run(worker_for) self.active_shuffles[spec.id] = state self._shuffles[spec.id].add(state) @@ -194,6 +195,32 @@ def _calculate_worker_for(self, spec: ShuffleSpec) -> dict[Any, str]: mapping[partition] = worker return mapping + def _ensure_output_tasks_are_non_rootish(self, spec: ShuffleSpec) -> None: + """Output tasks are created without worker restrictions and run once with the + only purpose of setting the worker restriction and then raising Reschedule, and + then running again properly on the correct worker. It would be non-trivial to + set the worker restriction before they're first run due to potential task + fusion. + + Most times, this lack of initial restrictions would cause output tasks to be + labelled as rootish on their first (very fast) run, which in turn would break + the design assumption that the worker-side queue of rootish tasks will last long + enough to cover the round-trip to the scheduler to receive more tasks, which in + turn would cause a measurable slowdown on the overall runtime of the shuffle + operation. + + This method ensures that, given M output tasks and N workers, each worker-side + queue is pre-loaded with M/N output tasks which can be flushed very fast as + they all raise Reschedule() in quick succession. 
+ + See Also + -------- + ShuffleRun._ensure_output_worker + """ + barrier = self.scheduler.tasks[barrier_key(spec.id)] + for dependent in barrier.dependents: + dependent._rootish = False + @log_errors() def _set_restriction(self, ts: TaskState, worker: str) -> None: if ts.annotations and "shuffle_original_restrictions" in ts.annotations: diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py index c04ab49f82..c34a85cd9f 100644 --- a/distributed/shuffle/tests/test_shuffle.py +++ b/distributed/shuffle/tests/test_shuffle.py @@ -2478,6 +2478,45 @@ async def test_unpack_gets_rescheduled_from_non_participating_worker(c, s, a): dd.assert_eq(result, expected) +class BlockedBarrierShuffleSchedulerPlugin(ShuffleSchedulerPlugin): + def __init__(self, scheduler: Scheduler): + super().__init__(scheduler) + self.in_barrier = asyncio.Event() + self.block_barrier = asyncio.Event() + + async def barrier(self, id: ShuffleId, run_id: int, consistent: bool) -> None: + self.in_barrier.set() + await self.block_barrier.wait() + return await super().barrier(id, run_id, consistent) + + +@gen_cluster(client=True) +async def test_unpack_is_non_rootish(c, s, a, b): + with pytest.warns(UserWarning): + scheduler_plugin = BlockedBarrierShuffleSchedulerPlugin(s) + df = dask.datasets.timeseries( + start="2000-01-01", + end="2000-01-21", + dtypes={"x": float, "y": float}, + freq="10 s", + ) + df = df.shuffle("x") + result = c.compute(df) + + await scheduler_plugin.in_barrier.wait() + + unpack_tss = [ts for key, ts in s.tasks.items() if key_split(key) == "shuffle_p2p"] + assert len(unpack_tss) == 20 + assert not any(s.is_rootish(ts) for ts in unpack_tss) + del unpack_tss + scheduler_plugin.block_barrier.set() + result = await result + + await check_worker_cleanup(a) + await check_worker_cleanup(b) + await check_scheduler_cleanup(s) + + class FlakyConnectionPool(ConnectionPool): def __init__(self, *args, failing_connects=0, **kwargs): self.attempts = 0 From c76b4be834ff4650d0b301ba608b659c68c7682c Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 15 Dec 2023 17:49:19 +0100 Subject: [PATCH 06/10] Avoid deadlock in P2P hash join if worker joins late (#8415) --- distributed/shuffle/_scheduler_plugin.py | 48 ++++++++++++++++++-- distributed/shuffle/tests/test_merge.py | 56 +++++++++++++++++++++++- 2 files changed, 99 insertions(+), 5 deletions(-) diff --git a/distributed/shuffle/_scheduler_plugin.py b/distributed/shuffle/_scheduler_plugin.py index f43d4db322..3f2f4dc50b 100644 --- a/distributed/shuffle/_scheduler_plugin.py +++ b/distributed/shuffle/_scheduler_plugin.py @@ -171,6 +171,11 @@ def _raise_if_task_not_processing(self, key: Key) -> None: def _calculate_worker_for(self, spec: ShuffleSpec) -> dict[Any, str]: """Pin the outputs of a P2P shuffle to specific workers. + The P2P implementation of a hash join combines the loading of shuffled output + partitions for the left and right side with the actual merge operation into a + single output task. As a consequence, we need to make sure that shuffles with + shared output tasks align on the output mapping. + Parameters ---------- id: ID of the shuffle to pin @@ -181,7 +186,7 @@ def _calculate_worker_for(self, spec: ShuffleSpec) -> dict[Any, str]: This function assumes that the barrier task and the output tasks share the same worker restrictions. 
""" - mapping = {} + existing: dict[Any, str] = {} shuffle_id = spec.id barrier = self.scheduler.tasks[barrier_key(shuffle_id)] @@ -190,10 +195,45 @@ def _calculate_worker_for(self, spec: ShuffleSpec) -> dict[Any, str]: else: workers = list(self.scheduler.workers) + # Check if this shuffle shares output tasks with a different shuffle that has + # already been initialized and needs to be taken into account when + # mapping output partitions to workers. + # Naively, you could delete this whole paragraph and just call + # spec.pick_worker; it would return two identical sets of results on both calls + # of this method... until the set of available workers changes between the two + # calls, which would cause misaligned shuffle outputs and a deadlock. + seen = {barrier} + for dependent in barrier.dependents: + for possible_barrier in dependent.dependencies: + if possible_barrier in seen: + continue + seen.add(possible_barrier) + if not (other_barrier_key := id_from_key(possible_barrier.key)): + continue + if not (shuffle := self.active_shuffles.get(other_barrier_key)): + continue + current_worker_for = shuffle.run_spec.worker_for + # This is a fail-safe for future three-ways merges. At the moment there + # should only ever be at most one other shuffle that shares output + # tasks, so existing will always be empty. + if existing: # pragma: nocover + for shared_key in existing.keys() & current_worker_for.keys(): + if existing[shared_key] != current_worker_for[shared_key]: + raise RuntimeError( + f"Failed to initialize shuffle {spec.id} because " + "it cannot align output partition mappings between " + f"existing shuffles {seen}. " + f"Mismatch encountered for output partition {shared_key!r}: " + f"{existing[shared_key]} != {current_worker_for[shared_key]}." 
+ ) + existing.update(current_worker_for) + + worker_for = {} for partition in spec.output_partitions: - worker = spec.pick_worker(partition, workers) - mapping[partition] = worker - return mapping + if (worker := existing.get(partition, None)) is None: + worker = spec.pick_worker(partition, workers) + worker_for[partition] = worker + return worker_for def _ensure_output_tasks_are_non_rootish(self, spec: ShuffleSpec) -> None: """Output tasks are created without worker restrictions and run once with the diff --git a/distributed/shuffle/tests/test_merge.py b/distributed/shuffle/tests/test_merge.py index 284519e675..7ed6e942c3 100644 --- a/distributed/shuffle/tests/test_merge.py +++ b/distributed/shuffle/tests/test_merge.py @@ -1,12 +1,18 @@ from __future__ import annotations +import asyncio import contextlib +from typing import Any from unittest import mock import pytest -from distributed.shuffle._core import id_from_key +from dask.typing import Key + +from distributed import Worker +from distributed.shuffle._core import ShuffleId, ShuffleSpec, id_from_key from distributed.shuffle._merge import hash_join +from distributed.shuffle._worker_plugin import ShuffleRun, _ShuffleRunManager from distributed.utils_test import gen_cluster dd = pytest.importorskip("dask.dataframe") @@ -478,3 +484,51 @@ async def test_index_merge_p2p(c, s, a, b, how): ), pdf_right.merge(pdf_left, how=how, right_index=True, left_on="a"), ) + + +class LimitedGetOrCreateShuffleRunManager(_ShuffleRunManager): + seen: set[ShuffleId] + block_get_or_create: asyncio.Event + blocking_get_or_create: asyncio.Event + + def __init__(self, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) + self.seen = set() + self.limit = 1 + self.blocking_get_or_create = asyncio.Event() + self.block_get_or_create = asyncio.Event() + + async def get_or_create(self, spec: ShuffleSpec, key: Key) -> ShuffleRun: + if len(self.seen) >= self.limit and spec.id not in self.seen: + self.blocking_get_or_create.set() + await self.block_get_or_create.wait() + self.seen.add(spec.id) + return await super().get_or_create(spec, key) + + +@mock.patch( + "distributed.shuffle._worker_plugin._ShuffleRunManager", + LimitedGetOrCreateShuffleRunManager, +) +@gen_cluster(client=True, nthreads=[("", 1)]) +async def test_merge_does_not_deadlock_if_worker_joins(c, s, a): + """Regression test for https://github.com/dask/distributed/issues/8411""" + pdf1 = pd.DataFrame({"a": range(100), "b": range(0, 200, 2)}) + pdf2 = pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50}) + df1 = dd.from_pandas(pdf1, npartitions=10) + df2 = dd.from_pandas(pdf2, npartitions=20) + + run_manager_A = a.plugins["shuffle"].shuffle_runs + + joined = dd.merge(df1, df2, left_on="a", right_on="x", shuffle="p2p") + result = c.compute(joined) + + await run_manager_A.blocking_get_or_create.wait() + + async with Worker(s.address) as b: + run_manager_A.block_get_or_create.set() + run_manager_B = b.plugins["shuffle"].shuffle_runs + run_manager_B.block_get_or_create.set() + result = await result + expected = pd.merge(pdf1, pdf2, left_on="a", right_on="x") + assert_eq(result, expected, check_index=False) From 89183a1cb94447ea8046c13760000c0fa8de8230 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 15 Dec 2023 14:40:22 -0600 Subject: [PATCH 07/10] bump version to 2023.12.1 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index bae4d4d442..f3b385363f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,7 @@ 
requires-python = ">=3.9" dependencies = [ "click >= 8.0", "cloudpickle >= 1.5.0", - "dask == 2023.12.0", + "dask == 2023.12.1", "jinja2 >= 2.10.3", "locket >= 1.0.0", "msgpack >= 1.0.0", From 415d4fa5048edb49d0523caa33269b34bcb4eca1 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 18 Dec 2023 09:52:47 +0000 Subject: [PATCH 08/10] Fix flaky test_subprocess_cluster_does_not_depend_on_logging (#8417) --- distributed/deploy/tests/test_subprocess.py | 12 ++++-------- distributed/utils_test.py | 20 +++++++++----------- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/distributed/deploy/tests/test_subprocess.py b/distributed/deploy/tests/test_subprocess.py index bbf344b258..9033e5e22e 100644 --- a/distributed/deploy/tests/test_subprocess.py +++ b/distributed/deploy/tests/test_subprocess.py @@ -82,14 +82,10 @@ async def test_subprocess_cluster_does_not_depend_on_logging(): {"distributed": {"logging": {"distributed": logging.CRITICAL + 1}}} ): async with SubprocessCluster( - asynchronous=True, - dashboard_address=":0", - scheduler_kwargs={"idle_timeout": "5s"}, - worker_kwargs={"death_timeout": "5s"}, - ) as cluster: - async with Client(cluster, asynchronous=True) as client: - result = await client.submit(lambda x: x + 1, 10) - assert result == 11 + asynchronous=True, dashboard_address=":0" + ) as cluster, Client(cluster, asynchronous=True) as client: + result = await client.submit(lambda x: x + 1, 10) + assert result == 11 @pytest.mark.skipif( diff --git a/distributed/utils_test.py b/distributed/utils_test.py index f94ae16559..37df9ba57a 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1500,7 +1500,7 @@ def new_config(new_config): @contextmanager -def new_config_file(c): +def new_config_file(c: dict[str, Any]) -> Iterator[None]: """ Temporarily change configuration file to match dictionary *c*. """ @@ -1508,18 +1508,16 @@ def new_config_file(c): old_file = os.environ.get("DASK_CONFIG") fd, path = tempfile.mkstemp(prefix="dask-config") + with os.fdopen(fd, "w") as f: + yaml.dump(c, f) + os.environ["DASK_CONFIG"] = path try: - with os.fdopen(fd, "w") as f: - f.write(yaml.dump(c)) - os.environ["DASK_CONFIG"] = path - try: - yield - finally: - if old_file: - os.environ["DASK_CONFIG"] = old_file - else: - del os.environ["DASK_CONFIG"] + yield finally: + if old_file: + os.environ["DASK_CONFIG"] = old_file + else: + del os.environ["DASK_CONFIG"] os.remove(path) From 53e95ec8f359e7e8cea1e6e06da7129de2742ee6 Mon Sep 17 00:00:00 2001 From: Florian Jetter Date: Mon, 18 Dec 2023 12:32:01 +0100 Subject: [PATCH 09/10] [Adaptive] Do not allow workers to downscale if they are running long-running tasks (e.g. `worker_client`) (#7481) --- distributed/scheduler.py | 7 ++++++- distributed/tests/test_scheduler.py | 32 +++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 3a4686010a..e7325b7a88 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -6935,7 +6935,12 @@ def workers_to_close( if isinstance(key, bytes): key = pickle.loads(key) - groups = groupby(key, self.workers.values()) + # Long running tasks typically use a worker_client to schedule + # other tasks. We should never shut down the worker they're + # running on, as it would cause them to restart from scratch + # somewhere else. 
+ valid_workers = [ws for ws in self.workers.values() if not ws.long_running] + groups = groupby(key, valid_workers) limit_bytes = {k: sum(ws.memory_limit for ws in v) for k, v in groups.items()} group_bytes = {k: sum(ws.nbytes for ws in v) for k, v in groups.items()} diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index d7935a2ba3..0fb0974b5e 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -1604,6 +1604,38 @@ def key(ws): assert set(s.workers_to_close(key=key)) == {workers[0].address, workers[1].address} +@pytest.mark.parametrize("reverse", [True, False]) +@gen_cluster(client=True) +async def test_workers_to_close_never_close_long_running(c, s, a, b, reverse): + if reverse: + a, b = b, a + wait_evt = Event() + + def executing(evt): + evt.wait() + + def long_running_secede(evt): + secede() + evt.wait() + + assert a.address in s.workers_to_close() + assert b.address in s.workers_to_close() + long_fut = c.submit(long_running_secede, wait_evt, workers=[a.address]) + wsA = s.workers[a.address] + while not wsA.long_running: + await asyncio.sleep(0.01) + assert s.workers_to_close() == [b.address] + futs = [c.submit(executing, wait_evt, workers=[b.address]) for _ in range(10)] + assert a.address not in s.workers_to_close(n=2) + while not b.state.tasks: + await asyncio.sleep(0.01) + assert s.workers_to_close() == [] + assert s.workers_to_close(n=1) == [b.address] + assert s.workers_to_close(n=2) == [b.address] + + await wait_evt.set() + + @gen_cluster(client=True) async def test_retire_workers_no_suspicious_tasks(c, s, a, b): future = c.submit( From 8c3eb6f0bf47d124c887c543599d80ff09c3f5ed Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Mon, 18 Dec 2023 03:32:19 -0800 Subject: [PATCH 10/10] Fix inconsistent hashing for Nanny-spawned workers (#8400) --- distributed/nanny.py | 7 +++++++ distributed/tests/test_dask_collections.py | 13 +++++++++++++ distributed/tests/test_nanny.py | 5 ++++- 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/distributed/nanny.py b/distributed/nanny.py index 021b41edfc..2c14224d36 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -233,6 +233,13 @@ def __init__( # type: ignore[no-untyped-def] self.Worker = Worker if worker_class is None else worker_class self.pre_spawn_env = _get_env_variables("distributed.nanny.pre-spawn-environ") + # To get consistent hashing on subprocesses, we need to set a consistent seed for + # the Python hash algorithm; xref https://github.com/dask/distributed/pull/8400 + if self.pre_spawn_env.get("PYTHONHASHSEED") in (None, "0"): + # This number is arbitrary; it was chosen to commemorate + # https://github.com/dask/dask/issues/6640. 
+ self.pre_spawn_env.update({"PYTHONHASHSEED": "6640"}) + self.env = merge( self.pre_spawn_env, _get_env_variables("distributed.nanny.environ"), diff --git a/distributed/tests/test_dask_collections.py b/distributed/tests/test_dask_collections.py index 12b9417d9a..8a9829c536 100644 --- a/distributed/tests/test_dask_collections.py +++ b/distributed/tests/test_dask_collections.py @@ -13,6 +13,7 @@ import dask.dataframe as dd from distributed.client import wait +from distributed.nanny import Nanny from distributed.utils_test import gen_cluster dfs = [ @@ -124,6 +125,18 @@ async def test_bag_groupby_tasks_default(c, s, a, b): assert not any("partd" in k[0] for k in b2.dask) +@gen_cluster(client=True, Worker=Nanny) +async def test_bag_groupby_key_hashing(c, s, a, b): + # https://github.com/dask/distributed/issues/4141 + dsk = {("x", 0): (range, 5), ("x", 1): (range, 5), ("x", 2): (range, 5)} + grouped = db.Bag(dsk, "x", 3).groupby(lambda x: "even" if x % 2 == 0 else "odd") + remote = c.compute(grouped) + result = await remote + assert len(result) == 2 + assert ("odd", [1, 3] * 3) in result + assert ("even", [0, 2, 4] * 3) in result + + @pytest.mark.parametrize("wait", [wait, lambda x: None]) def test_dataframe_set_index_sync(wait, client): df = dask.datasets.timeseries( diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index f0e824d3ed..74d049a35d 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -340,10 +340,13 @@ async def test_environment_variable_config(c, s, monkeypatch): }, ) async def test_environment_variable_pre_post_spawn(c, s, n): - assert n.env == {"PRE-SPAWN": "1", "POST-SPAWN": "2"} + assert n.env == {"PRE-SPAWN": "1", "POST-SPAWN": "2", "PYTHONHASHSEED": "6640"} results = await c.run(lambda: os.environ) assert results[n.worker_address]["PRE-SPAWN"] == "1" assert results[n.worker_address]["POST-SPAWN"] == "2" + # if unset in pre-spawn-environ config, PYTHONHASHSEED defaults to "6640" to ensure + # consistent hashing across workers; https://github.com/dask/distributed/issues/4141 + assert results[n.worker_address]["PYTHONHASHSEED"] == "6640" del os.environ["PRE-SPAWN"] assert "POST-SPAWN" not in os.environ
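
Illustrative sketch (not part of the patch series above; the helper name child_hash and the script itself are made up for illustration): the last commit pins PYTHONHASHSEED in the Nanny's pre-spawn environment because CPython randomizes str/bytes hashing per interpreter process unless that variable is set, so hash-partitioned keys (for example dask.bag groupby keys, see dask/distributed#4141) can otherwise be routed differently by different worker processes. The standalone snippet below demonstrates the underlying behaviour; "6640" is simply the seed value chosen in the patch.

import os
import subprocess
import sys


def child_hash(seed=None):
    """Spawn a fresh interpreter and report hash("even") as computed inside it."""
    # Start from the current environment but drop any inherited PYTHONHASHSEED.
    env = {k: v for k, v in os.environ.items() if k != "PYTHONHASHSEED"}
    if seed is not None:
        env["PYTHONHASHSEED"] = seed
    out = subprocess.run(
        [sys.executable, "-c", "print(hash('even'))"],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return int(out.stdout)


# Unpinned: each subprocess draws its own hash seed, so the two values almost
# always differ, which is how the same groupby key could end up in different
# partitions depending on which worker process hashed it.
print(child_hash(), child_hash())

# Pinned, as the Nanny's pre-spawn environment now does: the values always match,
# so hash-based partitioning is consistent across Nanny-spawned workers.
print(child_hash("6640"), child_hash("6640"))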