From 7c414cfb04f1d0e8c98f51256b026e8ce59968ad Mon Sep 17 00:00:00 2001
From: Florian Jetter
Date: Fri, 25 Nov 2022 16:59:17 +0100
Subject: [PATCH] Fix a deadlock when queued tasks are resubmitted quickly in
 succession (#7348)

---
 distributed/scheduler.py            |   2 +-
 distributed/tests/test_scheduler.py | 101 +++++++++++++++++++++++++++
 2 files changed, 100 insertions(+), 3 deletions(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 54731bbd41c..ad2d360a66e 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -4637,7 +4637,7 @@ def stimulus_task_finished(self, key=None, worker=None, stimulus_id=None, **kwar
         ws: WorkerState = self.workers[worker]
 
         ts: TaskState = self.tasks.get(key)
-        if ts is None or ts.state == "released":
+        if ts is None or ts.state in ("released", "queued"):
             logger.debug(
                 "Received already computed task, worker: %s, state: %s"
                 ", key: %s, who_has: %s",
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index af5529f925f..e0aae019371 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -70,7 +70,9 @@
 
 pytestmark = pytest.mark.ci1
 
-
+QUEUING_ON_BY_DEFAULT = math.isfinite(
+    float(dask.config.get("distributed.scheduler.worker-saturation"))
+)
 alice = "alice:1234"
 bob = "bob:1234"
 
@@ -257,7 +259,7 @@ def random(**kwargs):
 
 
 @pytest.mark.skipif(
-    math.isfinite(float(dask.config.get("distributed.scheduler.worker-saturation"))),
+    QUEUING_ON_BY_DEFAULT,
     reason="Not relevant with queuing on; see https://github.com/dask/distributed/issues/7204",
 )
 @gen_cluster(
@@ -4150,3 +4152,98 @@ async def test_transition_waiting_memory(c, s, a, b):
     assert s.tasks["x"].state == "no-worker"
     assert s.tasks["y"].state == "waiting"
     assert_story(s.story("y"), [("y", "waiting", "waiting", {})])
+
+
+@pytest.mark.parametrize(
+    "rootish",
+    [
+        pytest.param(
+            True,
+            marks=pytest.mark.skipif(
+                not QUEUING_ON_BY_DEFAULT,
+                reason="Nothing will be classified as root-ish",
+            ),
+        ),
+        False,
+    ],
+)
+@gen_cluster(client=True, nthreads=[("", 1)])
+async def test_deadlock_resubmit_queued_tasks_fast(c, s, a, rootish):
+    # See https://github.com/dask/distributed/issues/7200
+    block = Event()
+    block2 = Event()
+    executing = Event()
+    executing2 = Event()
+
+    def block_on_event(*args, block, executing):
+        executing.set()
+        block.wait()
+
+    if rootish:
+        ntasks = s.total_nthreads * 2 + 1
+    else:
+        ntasks = 1
+    keys = [f"fut-{i}" for i in range(ntasks)]
+
+    def submit_tasks():
+        # Use case would be a client rescheduling the same or a similar graph
+        # multiple times in quick succession, e.g.
+        # df.head()
+        # df.size.compute()
+        # We're emulating this by submitting the same *keys*
+        return c.map(
+            block_on_event, range(len(keys)), block=block, executing=executing, key=keys
+        )
+
+    def assert_rootish():
+        # Just to verify our assumptions in case the definition changes. This
+        # is currently a bit brittle
+        if rootish:
+            assert all(s.is_rootish(s.tasks[k]) for k in keys)
+        else:
+            assert not any(s.is_rootish(s.tasks[k]) for k in keys)
+
+    f1 = submit_tasks()
+    # Make sure that the worker is properly saturated
+    nblocking_tasks = 5
+
+    # This set of tasks is there to guarantee that the worker is saturated
+    # after releasing the first set of tasks, such that a subsequent
+    # submission runs into queuing
+    fut2 = c.map(
+        block_on_event, range(nblocking_tasks), block=block2, executing=executing2
+    )
+
+    # Once the task is on the threadpool, the client/scheduler may start its
+    # release chain
+    await executing.wait()
+
+    assert len(a.state.tasks)
+    # To trigger this condition, the scheduler needs to receive the
+    # `task-finished` message after it has performed the client release
+    # transitions. Therefore, the worker must not receive the `free-keys`
+    # signal before it can finish the task, since otherwise the worker would
+    # recognize it as cancelled and would forget about it. We emulate this
+    # behavior by blocking the outgoing scheduler stream until that happens,
+    # i.e. this introduces artificial latency
+    with freeze_batched_send(s.stream_comms[a.address]):
+        del f1
+        while any(k in s.tasks for k in keys):
+            await asyncio.sleep(0.005)
+
+        assert len(s.tasks) == nblocking_tasks
+        fut3 = submit_tasks()
+        while len(s.tasks) == nblocking_tasks:
+            await asyncio.sleep(0.005)
+        assert_rootish()
+        if rootish:
+            assert all(s.tasks[k] in s.queued for k in keys)
+        await block.set()
+        # At this point we need to wait for the task-finished message to
+        # arrive on the scheduler. There is no proper hook to wait on, so we
+        # sleep
+        await asyncio.sleep(0.2)
+    # Everything should finish properly after this
+    await block2.set()
+    await c.gather(fut2)
+    await c.gather(fut3)
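
For reference, the client-side pattern the new regression test emulates
(https://github.com/dask/distributed/issues/7200) looks roughly like the
sketch below. This is an illustrative reproduction, not part of the patch:
the single-threaded LocalCluster and the dask.datasets.timeseries() workload
are assumptions chosen for demonstration, and whether this exact snippet hits
the race depends on timing. The essential ingredient is that two computations
sharing most of their keys run back to back, so the second submission
re-creates keys the scheduler has only just released; the test above makes
the race deterministic by freezing the scheduler's batched stream to the
worker instead.

    import dask
    from dask.distributed import Client, LocalCluster

    if __name__ == "__main__":
        # One single-threaded worker, mirroring nthreads=[("", 1)] in the test.
        with LocalCluster(n_workers=1, threads_per_worker=1) as cluster:
            with Client(cluster):
                df = dask.datasets.timeseries()  # assumed stand-in workload
                # Two computations sharing most of their graph, submitted in
                # quick succession: the second re-creates keys the first has
                # just released.
                df.head()
                df.size.compute()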