Fix a deadlock when queued tasks are resubmitted quickly in succession
fjetter authored Nov 25, 2022
1 parent b6bff75 commit 7c414cf
Showing 2 changed files with 100 additions and 3 deletions.
distributed/scheduler.py (2 changes: 1 addition & 1 deletion)
@@ -4637,7 +4637,7 @@ def stimulus_task_finished(self, key=None, worker=None, stimulus_id=None, **kwargs):

        ws: WorkerState = self.workers[worker]
        ts: TaskState = self.tasks.get(key)
-        if ts is None or ts.state == "released":
+        if ts is None or ts.state in ("released", "queued"):
            logger.debug(
                "Received already computed task, worker: %s, state: %s"
                ", key: %s, who_has: %s",
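Why the guard must include "queued": when a client releases keys and immediately resubmits the same keys, the scheduler can hold them in the queued state (not yet assigned to any worker) while a worker is still finishing the previous run. The late `task-finished` message for such a key is stale and has to be dropped just like one for a released task; otherwise the scheduler records a completion for a task it never dispatched, and the computation deadlocks (see issue #7200, referenced in the test below). A minimal sketch of the guard, using a hypothetical ToyScheduler rather than the real Scheduler class:

class ToyScheduler:
    """Hypothetical stand-in for the real scheduler, for illustration only."""

    def __init__(self):
        self.tasks = {}  # key -> state string, e.g. "queued", "memory"

    def stimulus_task_finished(self, key):
        state = self.tasks.get(key)
        if state is None or state in ("released", "queued"):
            # Stale completion report from an earlier run of this key:
            # ignore it; a queued task will still be scheduled normally.
            return
        self.tasks[key] = "memory"


toy = ToyScheduler()
toy.tasks["fut-0"] = "queued"          # key was released and resubmitted
toy.stimulus_task_finished("fut-0")    # late report from the first run
assert toy.tasks["fut-0"] == "queued"  # still queued, not wrongly completed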
distributed/tests/test_scheduler.py (101 changes: 99 additions & 2 deletions)
@@ -70,7 +70,9 @@

pytestmark = pytest.mark.ci1


+QUEUING_ON_BY_DEFAULT = math.isfinite(
+    float(dask.config.get("distributed.scheduler.worker-saturation"))
+)
alice = "alice:1234"
bob = "bob:1234"
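The new QUEUING_ON_BY_DEFAULT constant inspects the distributed.scheduler.worker-saturation setting: a finite saturation factor means scheduler-side queuing is active, while "inf" disables it. A small sketch of how the check behaves; dask.config.set is the standard way to override the value, and 1.1 is just an example finite factor:

import math

import dask

def queuing_on():
    # Queuing is "on" when the saturation factor is finite; "inf" turns it off.
    return math.isfinite(
        float(dask.config.get("distributed.scheduler.worker-saturation"))
    )

with dask.config.set({"distributed.scheduler.worker-saturation": "inf"}):
    assert not queuing_on()

with dask.config.set({"distributed.scheduler.worker-saturation": 1.1}):
    assert queuing_on()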

@@ -257,7 +259,7 @@ def random(**kwargs):


@pytest.mark.skipif(
-    math.isfinite(float(dask.config.get("distributed.scheduler.worker-saturation"))),
+    QUEUING_ON_BY_DEFAULT,
    reason="Not relevant with queuing on; see https://github.com/dask/distributed/issues/7204",
)
@gen_cluster(
@@ -4150,3 +4152,98 @@ async def test_transition_waiting_memory(c, s, a, b):
    assert s.tasks["x"].state == "no-worker"
    assert s.tasks["y"].state == "waiting"
    assert_story(s.story("y"), [("y", "waiting", "waiting", {})])


@pytest.mark.parametrize(
    "rootish",
    [
        pytest.param(
            True,
            marks=pytest.mark.skipif(
                not QUEUING_ON_BY_DEFAULT,
                reason="Nothing will be classified as root-ish",
            ),
        ),
        False,
    ],
)
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_deadlock_resubmit_queued_tasks_fast(c, s, a, rootish):
    # See https://github.com/dask/distributed/issues/7200
    block = Event()
    block2 = Event()
    executing = Event()
    executing2 = Event()

    def block_on_event(*args, block, executing):
        executing.set()
        block.wait()

    if rootish:
        ntasks = s.total_nthreads * 2 + 1
    else:
        ntasks = 1
    keys = [f"fut-{i}" for i in range(ntasks)]

    def submit_tasks():
        # The use case would be a client rescheduling the same or a similar
        # graph multiple times in close succession, e.g.
        #   df.head()
        #   df.size.compute()
        # We emulate this by submitting the same *keys*
        return c.map(
            block_on_event, range(len(keys)), block=block, executing=executing, key=keys
        )

    def assert_rootish():
        # Just to verify our assumptions in case the definition changes. This
        # is currently a bit brittle
        if rootish:
            assert all(s.is_rootish(s.tasks[k]) for k in keys)
        else:
            assert not any(s.is_rootish(s.tasks[k]) for k in keys)

    f1 = submit_tasks()
    # Make sure that the worker is properly saturated
    nblocking_tasks = 5

    # This set of tasks guarantees that the worker is saturated after the
    # first set of tasks is released, such that a subsequent submission runs
    # into queuing
    fut2 = c.map(
        block_on_event, range(nblocking_tasks), block=block2, executing=executing2
    )

    # Once the task is on the threadpool, the client/scheduler may start its
    # release chain
    await executing.wait()

    assert len(a.state.tasks)
    # To trigger this condition, the scheduler needs to receive the
    # `task-finished` message after it has performed the client release
    # transitions. Therefore, the worker must not receive the `free-keys`
    # signal before it can finish the task, since otherwise the worker would
    # recognize the task as cancelled and forget about it. We emulate this
    # behavior by blocking the outgoing scheduler stream until that happens,
    # i.e. this introduces artificial latency
    with freeze_batched_send(s.stream_comms[a.address]):
        del f1
        while any(k in s.tasks for k in keys):
            await asyncio.sleep(0.005)

        assert len(s.tasks) == nblocking_tasks
        fut3 = submit_tasks()
        while len(s.tasks) == nblocking_tasks:
            await asyncio.sleep(0.005)
        assert_rootish()
        if rootish:
            assert all(s.tasks[k] in s.queued for k in keys)
        await block.set()
        # At this point we need/want to wait for the task-finished message to
        # arrive on the scheduler. There is no proper hook to wait on, so we
        # sleep
        await asyncio.sleep(0.2)
    # Everything should finish properly after this
    await block2.set()
    await c.gather(fut2)
    await c.gather(fut3)
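The load-bearing trick in the test above is freeze_batched_send (from distributed.utils_test): it holds back the scheduler-to-worker batched stream, so the worker never sees `free-keys` and finishes the old run while the scheduler has already released and re-queued the keys. A condensed sketch of the pattern; the helper name and the release_and_resubmit callback are hypothetical stand-ins for the test body above:

from distributed.utils_test import freeze_batched_send

async def deliver_stale_task_finished(s, a, release_and_resubmit):
    # Hypothetical helper distilling the pattern used by the test above:
    # `s` is the scheduler and `a` a worker, as in @gen_cluster tests.
    with freeze_batched_send(s.stream_comms[a.address]):
        # While frozen, `free-keys` cannot reach the worker, so it keeps
        # executing the old run instead of cancelling it. Releasing and
        # resubmitting the same keys here re-queues them on the scheduler.
        await release_and_resubmit()
        # Unblocking the task at this point makes the worker report
        # `task-finished` for keys the scheduler holds as "queued"; the
        # fixed stimulus_task_finished drops that stale report.
    # On exit, the buffered `free-keys`/`compute-task` messages flush and
    # the resubmitted tasks run to completion.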
