Skip to content

Commit

Permalink
Break the state machine
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Jun 30, 2022
1 parent c857e92 commit 950e4bc
Showing 1 changed file with 41 additions and 0 deletions.
41 changes: 41 additions & 0 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,10 @@
from distributed.worker_state_machine import (
AcquireReplicasEvent,
ComputeTaskEvent,
Execute,
ExecuteFailureEvent,
ExecuteSuccessEvent,
FreeKeysEvent,
RemoveReplicasEvent,
RescheduleEvent,
SerializedTask,
Expand Down Expand Up @@ -1206,6 +1208,45 @@ def f(x):
assert all(f.key in b.data for f in futures)


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_reschedule_released(c, s, a):
"""By the time the task raises Reschedule(), the client has released the task."""
ev1 = Event()
ev2 = Event()

def f(ev1, ev2):
ev1.set()
ev2.wait()
raise Reschedule()

x = c.submit(f, ev1, ev2, key="x")
await ev1.wait()
x.release()
while "x" in s.tasks:
await asyncio.sleep(0.01)

with captured_logger("distributed.scheduler") as logger:
await ev2.set()
while "x" in a.tasks:
await asyncio.sleep(0.01)
assert (
"Attempting to reschedule task x, which was not found on the scheduler"
in logger.getvalue()
)


def test_reschedule_released_worker_state(ws):
"""Same as test_reschedule_released"""
instructions = ws.handle_stimulus(
ComputeTaskEvent.dummy(key="x", stimulus_id="s1"),
FreeKeysEvent(keys=["x"], stimulus_id="s2"),
RescheduleEvent(key="x", stimulus_id="s3"),
)
# There's no RescheduleMsg
assert instructions == [Execute(key="x", stimulus_id="s1")]
assert not ws.tasks


@gen_cluster(nthreads=[])
async def test_deque_handler(s):
from distributed.worker import logger
Expand Down

0 comments on commit 950e4bc

Please sign in to comment.