From 950e4bc4baa752c7275f0fd8e19066b4786e3f6c Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 30 Jun 2022 12:27:23 +0100 Subject: [PATCH] Break the state machine --- distributed/tests/test_worker.py | 41 ++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 57b746448df..17c2e2482f4 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -79,8 +79,10 @@ from distributed.worker_state_machine import ( AcquireReplicasEvent, ComputeTaskEvent, + Execute, ExecuteFailureEvent, ExecuteSuccessEvent, + FreeKeysEvent, RemoveReplicasEvent, RescheduleEvent, SerializedTask, @@ -1206,6 +1208,45 @@ def f(x): assert all(f.key in b.data for f in futures) +@gen_cluster(client=True, nthreads=[("", 1)]) +async def test_reschedule_released(c, s, a): + """By the time the task raises Reschedule(), the client has released the task.""" + ev1 = Event() + ev2 = Event() + + def f(ev1, ev2): + ev1.set() + ev2.wait() + raise Reschedule() + + x = c.submit(f, ev1, ev2, key="x") + await ev1.wait() + x.release() + while "x" in s.tasks: + await asyncio.sleep(0.01) + + with captured_logger("distributed.scheduler") as logger: + await ev2.set() + while "x" in a.tasks: + await asyncio.sleep(0.01) + assert ( + "Attempting to reschedule task x, which was not found on the scheduler" + in logger.getvalue() + ) + + +def test_reschedule_released_worker_state(ws): + """Same as test_reschedule_released""" + instructions = ws.handle_stimulus( + ComputeTaskEvent.dummy(key="x", stimulus_id="s1"), + FreeKeysEvent(keys=["x"], stimulus_id="s2"), + RescheduleEvent(key="x", stimulus_id="s3"), + ) + # There's no RescheduleMsg + assert instructions == [Execute(key="x", stimulus_id="s1")] + assert not ws.tasks + + @gen_cluster(nthreads=[]) async def test_deque_handler(s): from distributed.worker import logger