diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index a1d5ad2abd2..3d630cd31b5 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -82,6 +82,7 @@ ComputeTaskEvent, ExecuteFailureEvent, ExecuteSuccessEvent, + FreeKeysEvent, RemoveReplicasEvent, SerializedTask, StealRequestEvent, @@ -3505,10 +3506,13 @@ def teardown(self, worker): @gen_cluster(client=True, nthreads=[("", 1)]) async def test_execute_preamble_abort_retirement(c, s, a): """Test race condition in the preamble of Worker.execute(), which used to cause a - task to remain permanently in running state in case of very tight timings when + task to remain permanently in executing state in case of very tight timings when exiting the closing_gracefully status. - See https://github.com/dask/distributed/issues/6867 + See also + -------- + https://github.com/dask/distributed/issues/6867 + test_execute_preamble_early_resume """ x = c.submit(inc, 1, key="x") await wait_for_state("x", "executing", a, interval=0) @@ -3525,10 +3529,49 @@ async def test_execute_preamble_abort_retirement(c, s, a): # Run Worker.execute await asyncio.sleep(0) - # The Worker.execute should have detected the closing_gracefully status and - # performed an early exit. + # Worker.execute should have detected the closing_gracefully status and performed an + # early exit. a.handle_worker_status_change("running", stimulus_id="test") # Test that x does not get stuck. assert await x == 2 + + +@gen_cluster(client=True, nthreads=[("", 1)]) +async def test_execute_preamble_early_resume(c, s, a): + """Test race condition in the preamble of Worker.execute(), which used to cause a + task to remain permanently in resumed state in case of very tight timings when + resuming a task. + + See also + -------- + https://github.com/dask/distributed/issues/6869 + test_execute_preamble_abort_retirement + """ + x = c.submit(inc, 1, key="x") + await wait_for_state("x", "executing", a, interval=0) + # The Worker.execute asyncio task has just been created and is scheduled to + # first run somewhere after this test function in the event loop + + # Simulate a cancellation request from the scheduler, which is almost immediately + # reversed. As it would be exceedingly hard to get the right timings with the real + # thing, we're instead tampering directly with the Worker endpoint. + a.handle_stimulus(FreeKeysEvent(keys=["x"], stimulus_id="free")) + assert a.state.tasks["x"].state == "cancelled" + + # Run Worker.execute + await asyncio.sleep(0) + # Worker.execute should have detected the cancelled state and + # performed an early exit. + + # The exit callback of Worker.execute hasn't been run yet + assert a.state.tasks["x"].state == "cancelled" + + # Resume the task. No need for run_spec etc, as the WorkerState will detect that + # the previous asyncio task is still running and it will resume it. + a.handle_stimulus(ComputeTaskEvent.dummy(key="x", stimulus_id="resume")) + assert a.state.tasks["x"].state == "executing" + + # Test that x does not get stuck. + assert await x == 2 diff --git a/distributed/worker.py b/distributed/worker.py index 0222f63dd69..755544f8748 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2153,15 +2153,11 @@ async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent: """ if self.status not in WORKER_ANY_RUNNING: return RescheduleEvent(key=key, stimulus_id=f"worker-closing-{time()}") - # The key *must* be in the state - ts = self.state.tasks[key] + # The key *must* be in the worker state + ts = self.state.tasks[key] if ts.state == "cancelled": - logger.debug( - "Trying to execute task %s which is not in executing state anymore", - ts, - ) - return AlreadyCancelledEvent(key=ts.key, stimulus_id=stimulus_id) + return RescheduleEvent(key=key, stimulus_id=f"already-cancelled-{time()}") try: function, args, kwargs = await self._maybe_deserialize_task(ts)