From 34c506a8e66ae92d15cd3114738b4d396c03b980 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 23 Sep 2022 17:45:54 -0600 Subject: [PATCH 01/11] Fix decide_worker_rootish_queuing_disabled assert --- distributed/scheduler.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index cf240240cfb..3a4bb76968a 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2104,6 +2104,8 @@ def decide_worker_rootish_queuing_disabled( ): # Last-used worker is full or unknown; pick a new worker for the next few tasks ws = min(pool, key=partial(self.worker_objective, ts)) + if self.validate: + assert ws in self.running, (ws, self.running) tg.last_worker_tasks_left = math.floor( (len(tg) / self.total_nthreads) * ws.nthreads ) @@ -2118,7 +2120,8 @@ def decide_worker_rootish_queuing_disabled( if self.validate and ws is not None: assert self.workers.get(ws.address) is ws - assert ws in self.running, (ws, self.running) + # NOTE: `ws` may not be running currently. Though not ideal, it's necessary for co-assignment: + # we need to send all tasks in order to this worker, and let it decide what to do with them. return ws From 86c26b0359f0597dccc31113d6ed1288734b886b Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 25 Oct 2022 12:18:23 -0400 Subject: [PATCH 02/11] Revert "Fix decide_worker_rootish_queuing_disabled assert" This reverts commit f8b84c08ac6a388324ec22731aa03961fb326153. --- distributed/scheduler.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 3a4bb76968a..cf240240cfb 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2104,8 +2104,6 @@ def decide_worker_rootish_queuing_disabled( ): # Last-used worker is full or unknown; pick a new worker for the next few tasks ws = min(pool, key=partial(self.worker_objective, ts)) - if self.validate: - assert ws in self.running, (ws, self.running) tg.last_worker_tasks_left = math.floor( (len(tg) / self.total_nthreads) * ws.nthreads ) @@ -2120,8 +2118,7 @@ def decide_worker_rootish_queuing_disabled( if self.validate and ws is not None: assert self.workers.get(ws.address) is ws - # NOTE: `ws` may not be running currently. Though not ideal, it's necessary for co-assignment: - # we need to send all tasks in order to this worker, and let it decide what to do with them. + assert ws in self.running, (ws, self.running) return ws From e318f472a05bbf9abe605def0b44ad66336c24ea Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 25 Oct 2022 12:38:00 -0400 Subject: [PATCH 03/11] Don't use last worker if it's not running The assertion probably did make sense. We shouldn't assign tasks to non-running workers. 
--- distributed/scheduler.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index cf240240cfb..802b32e6f59 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2100,9 +2100,12 @@ def decide_worker_rootish_queuing_disabled( tg = ts.group lws = tg.last_worker if not ( - lws and tg.last_worker_tasks_left and self.workers.get(lws.address) is lws + lws + and tg.last_worker_tasks_left + and self.workers.get(lws.address) is lws + and lws.status == Status.running ): - # Last-used worker is full or unknown; pick a new worker for the next few tasks + # Last-used worker is full, unknown, or non-running; pick a new worker for the next few tasks ws = min(pool, key=partial(self.worker_objective, ts)) tg.last_worker_tasks_left = math.floor( (len(tg) / self.total_nthreads) * ws.nthreads From 3d33954ad11888e89c54ff18be42908e5f983578 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 25 Oct 2022 12:39:39 -0400 Subject: [PATCH 04/11] improve readability --- distributed/scheduler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 802b32e6f59..2e6f98d521b 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2099,19 +2099,19 @@ def decide_worker_rootish_queuing_disabled( tg = ts.group lws = tg.last_worker - if not ( + if ( lws and tg.last_worker_tasks_left - and self.workers.get(lws.address) is lws and lws.status == Status.running + and self.workers.get(lws.address) is lws ): + ws = lws + else: # Last-used worker is full, unknown, or non-running; pick a new worker for the next few tasks ws = min(pool, key=partial(self.worker_objective, ts)) tg.last_worker_tasks_left = math.floor( (len(tg) / self.total_nthreads) * ws.nthreads ) - else: - ws = lws # Record `last_worker`, or clear it on the final task tg.last_worker = ( From 5ada8d4ef320e262f9591c206683610b88008c8c Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 27 Oct 2022 11:08:23 +0100 Subject: [PATCH 05/11] test assertions on queuing --- distributed/scheduler.py | 3 ++ distributed/tests/test_scheduler.py | 68 +++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index d7f596c04f3..ab45c6836a2 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2090,6 +2090,9 @@ def decide_worker_rootish_queuing_disabled( returns None, in which case the task should be transitioned to ``no-worker``. 
""" + print( + f"decide_worker_rootish_queuing_disabled({ts.key}) tg={ts.group} lws={ts.group.last_worker}" + ) if self.validate: # See root-ish-ness note below in `decide_worker_rootish_queuing_enabled` assert math.isinf(self.WORKER_SATURATION) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 3c353901b74..7e6ce6d2896 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -43,6 +43,7 @@ from distributed.utils import TimeoutError from distributed.utils_test import ( NO_AMM, + BlockedGatherDep, BrokenComm, async_wait_for, captured_logger, @@ -253,6 +254,73 @@ def random(**kwargs): test_decide_worker_coschedule_order_neighbors_() +@gen_cluster( + client=True, + nthreads=[("", 1)], + config={"distributed.scheduler.work-stealing": False}, + timeout=3, +) +async def test_decide_worker_rootish_while_last_worker_is_retiring(c, s, a): + """https://github.com/dask/distributed/issues/7063""" + # Put a task in memory on the worker to be retired and prevent the other from + # acquiring a replica. This will cause a to be stuck in closing_gracefully later on, + # until we set b.block_gather_dep. + m = (await c.scatter({"m": 1}, workers=[a.address]))["m"] + + evx = [Event() for _ in range(3)] + evy = Event() + + async with BlockedGatherDep(s.address, nthreads=1) as b: + xs = [ + c.submit(lambda ev: ev.wait(), evx[0], key="x-0", workers=[a.address]), + c.submit(lambda ev: ev.wait(), evx[1], key="x-1", workers=[a.address]), + c.submit(lambda ev: ev.wait(), evx[2], key="x-2", workers=[b.address]), + ] + ys = [ + c.submit(lambda x, ev: ev.wait(), xs[0], evy, key="y-0"), + c.submit(lambda x, ev: ev.wait(), xs[0], evy, key="y-1"), + c.submit(lambda x, ev: ev.wait(), xs[1], evy, key="y-2"), + c.submit(lambda x, ev: ev.wait(), xs[2], evy, key="y-3"), + c.submit(lambda x, ev: ev.wait(), xs[2], evy, key="y-4"), + c.submit(lambda x, ev: ev.wait(), xs[2], evy, key="y-5"), + ] + + while a.state.executing_count != 1 or b.state.executing_count != 1: + await asyncio.sleep(0.01) + + # - y-2 has no restrictions + # - TaskGroup(y) has more than 4 tasks (total_nthreads * 2) + # - TaskGroup(y) has less than 5 dependency groups + # - TaskGroup(y) has less than 5 dependency tasks + assert s.is_rootish(s.tasks["y-2"]) + + await evx[0].set() + await wait_for_state("y-0", "processing", s) + await wait_for_state("y-1", "processing", s) + assert s.tasks["y-2"].group.last_worker == s.workers[a.address] + assert s.tasks["y-2"].group.last_worker_tasks_left == 1 + + # Take a out of the running pool, but without removing it from the cluster + # completely + retire_task = asyncio.create_task(c.retire_workers([a.address])) + # Wait until AMM sends AcquireReplicasEvent to b to move away m + await b.in_gather_dep.wait() + assert s.workers[a.address].status == Status.closing_gracefully + + # Transition y-2 to processing. 
Normally, it would be scheduled on a, but it's + # not a running worker, so we must choose b + await evx[1].set() + await wait_for_state("y-2", "processing", s) + await wait_for_state("y-2", "ready", b) + + # Cleanup + b.block_gather_dep.set() + await evx[2].set() + await evy.set() + await retire_task + await wait(xs + ys) + + @pytest.mark.slow @gen_cluster( nthreads=[("", 2)] * 4, From 5b7869cde672fed0f97a4e01e6623963a1aa428e Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 27 Oct 2022 11:12:16 +0100 Subject: [PATCH 06/11] tweak --- distributed/tests/test_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 7e6ce6d2896..bc18c6600b7 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -311,7 +311,7 @@ async def test_decide_worker_rootish_while_last_worker_is_retiring(c, s, a): # not a running worker, so we must choose b await evx[1].set() await wait_for_state("y-2", "processing", s) - await wait_for_state("y-2", "ready", b) + await wait_for_state("y-2", "waiting", b) # Cleanup b.block_gather_dep.set() From 06fff7c368a64e555e654085a3092f0df19058dd Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 27 Oct 2022 11:20:14 +0100 Subject: [PATCH 07/11] polish --- distributed/scheduler.py | 6 ++---- distributed/tests/test_scheduler.py | 3 +-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 70b13652919..88f45724a42 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2090,9 +2090,6 @@ def decide_worker_rootish_queuing_disabled( returns None, in which case the task should be transitioned to ``no-worker``. """ - print( - f"decide_worker_rootish_queuing_disabled({ts.key}) tg={ts.group} lws={ts.group.last_worker}" - ) if self.validate: # See root-ish-ness note below in `decide_worker_rootish_queuing_enabled` assert math.isinf(self.WORKER_SATURATION) @@ -2111,7 +2108,8 @@ def decide_worker_rootish_queuing_disabled( ): ws = lws else: - # Last-used worker is full, unknown, or non-running; pick a new worker for the next few tasks + # Last-used worker is full, unknown, retiring, or paused; + # pick a new worker for the next few tasks ws = min(pool, key=partial(self.worker_objective, ts)) tg.last_worker_tasks_left = math.floor( (len(tg) / self.total_nthreads) * ws.nthreads diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index bc18c6600b7..b529e49634d 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -258,7 +258,6 @@ def random(**kwargs): client=True, nthreads=[("", 1)], config={"distributed.scheduler.work-stealing": False}, - timeout=3, ) async def test_decide_worker_rootish_while_last_worker_is_retiring(c, s, a): """https://github.com/dask/distributed/issues/7063""" @@ -311,7 +310,7 @@ async def test_decide_worker_rootish_while_last_worker_is_retiring(c, s, a): # not a running worker, so we must choose b await evx[1].set() await wait_for_state("y-2", "processing", s) - await wait_for_state("y-2", "waiting", b) + await wait_for_state("y-2", "waiting", b) # x-1 is in memory on a # Cleanup b.block_gather_dep.set() From 7eb4649bb61c6e464eea61292a2696a7eb200cd8 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 27 Oct 2022 11:38:50 -0600 Subject: [PATCH 08/11] skip test when queuing is active --- distributed/tests/test_scheduler.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git 
a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index b529e49634d..1ede12b486b 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -254,6 +254,10 @@ def random(**kwargs): test_decide_worker_coschedule_order_neighbors_() +@pytest.mark.skipif( + math.isfinite(dask.config.get("distributed.scheduler.worker-saturation")), + reason="Not relevant with queuing on; see https://github.com/dask/distributed/issues/7204", +) @gen_cluster( client=True, nthreads=[("", 1)], From d82d48737c79fa021a46681110cf4300591c8397 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 27 Oct 2022 16:00:44 -0600 Subject: [PATCH 09/11] Fix flaky test_include_communication_in_occupancy --- distributed/tests/test_scheduler.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 1ede12b486b..6e84db227d9 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -1543,7 +1543,12 @@ async def test_balance_many_workers_2(c, s, *workers): assert {len(w.has_what) for w in s.workers.values()} == {3} -@gen_cluster(client=True) +@gen_cluster( + client=True, + worker_kwargs={ + "heartbeat_interval": "10s", # prevent worker from updating executing task durations + }, +) async def test_include_communication_in_occupancy(c, s, a, b): x = c.submit(operator.mul, b"0", int(s.bandwidth) * 2, workers=a.address) y = c.submit(operator.mul, b"1", int(s.bandwidth * 3), workers=b.address) From 92595194330a9c105bc2d87ff3b1f9386915d3a4 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 28 Oct 2022 12:28:17 +0100 Subject: [PATCH 10/11] Fix test_stress_creation_and_deletion --- .github/workflows/tests.yaml | 3 +-- distributed/tests/test_stress.py | 11 +++-------- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index a53e9ec80b6..d093aac348f 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -162,8 +162,7 @@ jobs: set -o pipefail mkdir reports - pytest distributed \ - -m "not avoid_ci and ${{ matrix.partition }}" --runslow \ + pytest distributed/tests/test_stress.py::test_stress_creation_and_deletion --runslow --count=20 \ --leaks=fds,processes,threads \ --junitxml reports/pytest.xml -o junit_suite_name=$TEST_ID \ --cov=distributed --cov-report=xml \ diff --git a/distributed/tests/test_stress.py b/distributed/tests/test_stress.py index a0a880737ce..324412a6ee9 100644 --- a/distributed/tests/test_stress.py +++ b/distributed/tests/test_stress.py @@ -11,7 +11,7 @@ from dask import delayed -from distributed import Client, Nanny, wait +from distributed import Client, Nanny, Worker, wait from distributed.chaos import KillWorker from distributed.compatibility import WINDOWS from distributed.metrics import time @@ -91,14 +91,10 @@ def test_cancel_stress_sync(loop): c.cancel(f) -@pytest.mark.xfail( - reason="Flaky and re-fails on rerun. 
See https://github.com/dask/distributed/issues/5388" -) @pytest.mark.slow @gen_cluster( nthreads=[], client=True, - timeout=180, scheduler_kwargs={"allowed_failures": 100_000}, ) async def test_stress_creation_and_deletion(c, s): @@ -113,13 +109,12 @@ async def test_stress_creation_and_deletion(c, s): async def create_and_destroy_worker(delay): start = time() while time() < start + 5: - async with Nanny(s.address, nthreads=2) as n: + async with Worker(s.address, nthreads=2) as n: await asyncio.sleep(delay) - print("Killed nanny") await asyncio.gather(*(create_and_destroy_worker(0.1 * i) for i in range(20))) - async with Nanny(s.address, nthreads=2): + async with Worker(s.address, nthreads=2): assert await c.compute(z) == 8000884.93 From f34b72ad39a010327f1c14c64d8c69adfb784c59 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 28 Oct 2022 13:30:29 +0100 Subject: [PATCH 11/11] revert stress test --- .github/workflows/tests.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index d093aac348f..a53e9ec80b6 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -162,7 +162,8 @@ jobs: set -o pipefail mkdir reports - pytest distributed/tests/test_stress.py::test_stress_creation_and_deletion --runslow --count=20 \ + pytest distributed \ + -m "not avoid_ci and ${{ matrix.partition }}" --runslow \ --leaks=fds,processes,threads \ --junitxml reports/pytest.xml -o junit_suite_name=$TEST_ID \ --cov=distributed --cov-report=xml \
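
For reference, below is a minimal standalone sketch of the co-assignment rule that PATCH 03/04/07 converge on. It is not the actual Scheduler code: FakeWorkerState, FakeTaskGroup, the string-valued status field, and the objective callable are invented stand-ins, and the counter bookkeeping at the end is simplified. Only the selection rule mirrors the patched decide_worker_rootish_queuing_disabled: reuse the task group's last worker while it is still registered, still running, and still owed tasks; otherwise rank the running pool and reset the counter to floor(len(tg) / total_nthreads * ws.nthreads).

from __future__ import annotations

import math
from dataclasses import dataclass


@dataclass
class FakeWorkerState:
    """Stand-in for the scheduler's WorkerState (illustration only)."""
    address: str
    nthreads: int
    status: str = "running"  # the real code compares against Status.running


@dataclass
class FakeTaskGroup:
    """Stand-in for the TaskGroup co-assignment bookkeeping."""
    n_tasks: int
    last_worker: FakeWorkerState | None = None
    last_worker_tasks_left: int = 0


def pick_rootish_worker_queuing_disabled(
    tg: FakeTaskGroup,
    workers: dict[str, FakeWorkerState],
    total_nthreads: int,
    objective,
) -> FakeWorkerState | None:
    """Reuse ``tg.last_worker`` only while it is still registered, running, and
    owed tasks; otherwise pick the best running worker and reset the counter."""
    pool = [w for w in workers.values() if w.status == "running"]
    if not pool:
        return None  # caller would transition the task to no-worker

    lws = tg.last_worker
    if (
        lws
        and tg.last_worker_tasks_left
        and lws.status == "running"  # the extra check these patches introduce
        and workers.get(lws.address) is lws
    ):
        ws = lws
    else:
        # Last-used worker is full, unknown, retiring, or paused: pick a new one
        ws = min(pool, key=objective)
        # Give it roughly its fair share of the task group before rotating again
        tg.last_worker_tasks_left = math.floor(tg.n_tasks / total_nthreads * ws.nthreads)

    # Record the worker for the next sibling task, or clear it on the last one
    tg.last_worker_tasks_left -= 1
    tg.last_worker = ws if tg.last_worker_tasks_left > 0 else None
    return ws


if __name__ == "__main__":
    # Two one-thread workers; "a" was the last worker but is now retiring,
    # so the next root-ish task must land on "b" instead (the scenario the
    # new test exercises with retire_workers).
    a = FakeWorkerState("tcp://a", nthreads=1, status="closing_gracefully")
    b = FakeWorkerState("tcp://b", nthreads=1)
    tg = FakeTaskGroup(n_tasks=6, last_worker=a, last_worker_tasks_left=1)
    ws = pick_rootish_worker_queuing_disabled(
        tg, {"tcp://a": a, "tcp://b": b}, total_nthreads=2, objective=lambda w: 0
    )
    assert ws is b and tg.last_worker is b

The point of the extra status check is that co-assignment should keep sibling root-ish tasks together only on workers that can actually run them; a worker stuck in closing_gracefully or paused stays registered in self.workers, so the earlier registration check alone was not enough.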