Fix a deadlock when queued tasks are resubmitted quickly in succession #7348

Merged
4 commits merged on Nov 25, 2022
2 changes: 1 addition & 1 deletion distributed/scheduler.py
@@ -4702,7 +4702,7 @@ def stimulus_task_finished(self, key=None, worker=None, stimulus_id=None, **kwargs):

        ws: WorkerState = self.workers[worker]
        ts: TaskState = self.tasks.get(key)
-       if ts is None or ts.state == "released":
+       if ts is None or ts.state in ("released", "queued"):
            logger.debug(
                "Received already computed task, worker: %s, state: %s"
                ", key: %s, who_has: %s",
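The scheduler.py change above extends the existing guard in stimulus_task_finished: a `task-finished` message that arrives for a task the scheduler has since released and re-queued is now ignored (and logged) instead of being transitioned, which is what previously wedged the cluster. As a rough illustration, this is the kind of client-side pattern the regression test below emulates — releasing a set of keys and immediately resubmitting the same keys while a `task-finished` message for them may still be in flight. Hypothetical code, not part of the diff; the scheduler address and work function are placeholders, and whether the race actually fires is timing-dependent:

# Hypothetical reproduction pattern (see the test's reference to issue 7200):
# release a set of keys and resubmit the same keys right away.
from distributed import Client


def work(i):
    return i + 1


client = Client("tcp://scheduler-address:8786")  # placeholder address
keys = [f"fut-{i}" for i in range(10)]

futures = client.map(work, range(10), key=keys)  # first submission
del futures                                      # client releases the keys...
futures = client.map(work, range(10), key=keys)  # ...and resubmits them immediately
print(client.gather(futures))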
101 changes: 99 additions & 2 deletions distributed/tests/test_scheduler.py
@@ -70,7 +70,9 @@

pytestmark = pytest.mark.ci1


QUEUING_ON_BY_DEFAULT = math.isfinite(
    float(dask.config.get("distributed.scheduler.worker-saturation"))
)
alice = "alice:1234"
bob = "bob:1234"
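The new module-level constant simply names a check the tests already perform: queuing is considered on by default when `distributed.scheduler.worker-saturation` is a finite number, and off when it is set to infinity. A small illustrative snippet (not part of the diff) showing how the config value maps onto the flag:

# Illustrative only: a finite worker-saturation value means scheduler-side
# queuing is enabled; "inf" disables it.
import math

import dask

with dask.config.set({"distributed.scheduler.worker-saturation": "inf"}):
    queuing_on = math.isfinite(
        float(dask.config.get("distributed.scheduler.worker-saturation"))
    )
    assert not queuing_on  # queuing is off when saturation is infinite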

@@ -257,7 +259,7 @@ def random(**kwargs):


@pytest.mark.skipif(
-   math.isfinite(float(dask.config.get("distributed.scheduler.worker-saturation"))),
+   QUEUING_ON_BY_DEFAULT,
    reason="Not relevant with queuing on; see https://github.com/dask/distributed/issues/7204",
)
@gen_cluster(
@@ -4150,3 +4152,98 @@ async def test_transition_waiting_memory(c, s, a, b):
assert s.tasks["x"].state == "no-worker"
assert s.tasks["y"].state == "waiting"
assert_story(s.story("y"), [("y", "waiting", "waiting", {})])


@pytest.mark.parametrize(
    "rootish",
    [
        pytest.param(
            True,
            marks=pytest.mark.skipif(
                not QUEUING_ON_BY_DEFAULT,
                reason="Nothing will be classified as root-ish",
            ),
        ),
        False,
    ],
)
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_deadlock_resubmit_queued_tasks_fast(c, s, a, rootish):
    # See https://github.com/dask/distributed/issues/7200
    block = Event()
    block2 = Event()
    executing = Event()
    executing2 = Event()

    def block_on_event(*args, block, executing):
        executing.set()
        block.wait()

    if rootish:
        ntasks = s.total_nthreads * 2 + 1
    else:
        ntasks = 1
    keys = [f"fut-{i}" for i in range(ntasks)]

    def submit_tasks():
        # Use case would be a client rescheduling the same or a similar graph
        # multiple times in close succession, e.g.
        # df.head()
        # df.size.compute()
        # We're emulating this by submitting the same *keys*
        return c.map(
            block_on_event, range(len(keys)), block=block, executing=executing, key=keys
        )

    def assert_rootish():
        # Just to verify our assumptions in case the definition changes. This is
        # currently a bit brittle
        if rootish:
            assert all(s.is_rootish(s.tasks[k]) for k in keys)
        else:
            assert not any(s.is_rootish(s.tasks[k]) for k in keys)

    f1 = submit_tasks()
    # Make sure that the worker is properly saturated
    nblocking_tasks = 5

    # This set of tasks is there to guarantee that the worker is saturated after
    # releasing the first set of tasks, such that a subsequent submission would
    # run into queuing
    fut2 = c.map(
        block_on_event, range(nblocking_tasks), block=block2, executing=executing2
    )

    # Once the task is on the threadpool, the client/scheduler may start its
    # release chain
    await executing.wait()

    assert len(a.state.tasks)
    # To trigger this condition, the scheduler needs to receive the
    # `task-finished` message after it performed the client release transitions.
    # Therefore, the worker must not receive the `free-keys` signal before it
    # can finish the task, since otherwise the worker would recognize it as
    # cancelled and would forget about it. We emulate this behavior by blocking
    # the outgoing scheduler stream until that happens, i.e. this introduces
    # artificial latency
    with freeze_batched_send(s.stream_comms[a.address]):
        del f1
        while any(k in s.tasks for k in keys):
            await asyncio.sleep(0.005)

        assert len(s.tasks) == nblocking_tasks
        fut3 = submit_tasks()
        while len(s.tasks) == nblocking_tasks:
            await asyncio.sleep(0.005)
        assert_rootish()
        if rootish:
            assert all(s.tasks[k] in s.queued for k in keys)
        await block.set()
        # At this point we need/want to wait for the task-finished message to
        # arrive on the scheduler. There is no proper hook to wait on, therefore
        # we sleep
        await asyncio.sleep(0.2)
    # Everything should finish properly after this
    await block2.set()
    await c.gather(fut2)
    await c.gather(fut3)
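For reference, one way to run just this new regression test locally, assuming a development install of distributed with pytest available; this invocation is illustrative and not part of the PR:

# Run only the new test, both parametrizations.
import pytest

pytest.main(
    [
        "-v",
        "-k",
        "test_deadlock_resubmit_queued_tasks_fast",
        "distributed/tests/test_scheduler.py",
    ]
)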