P2P shuffle restart may deadlock if no running workers exist #8088

Closed
hendrikmakait opened this issue Aug 9, 2023 · 1 comment · Fixed by #8091 or #8094
Labels
bug (Something is broken) · deadlock (The cluster appears to not make any progress) · shuffle

Comments

@hendrikmakait
Member

An edge case exists where attempting to restart a P2P shuffle may deadlock.

In this scenario, some non-running workers hold inputs to the shuffle, but no running workers are left when the shuffle is restarted. This causes a processing shuffle-transfer task to transition processing -> released -> waiting -> no-worker -> released. The task then remains stuck in released, causing a deadlock.

There are two possible ways of fixing this:

  1. Carefully design the restarting mechanism to avoid this transition chain.
  2. Recommend waiting when transitioning a task from no-worker -> released if it has tasks or clients waiting on its results (similar to the existing processing -> released and waiting -> released transitions); see the sketch below.
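
As a rough illustration of option 2, a minimal, hypothetical sketch follows. This is not the actual distributed scheduler code: the function name and the use of ts.who_wants / ts.waiters are assumptions made for illustration, and the real transition machinery works with recommendation dicts rather than plain return values.

def transition_no_worker_released(ts):
    # Hypothetical sketch: if anything still needs this task's result, recommend
    # "waiting" so the task becomes schedulable again once a worker is running,
    # mirroring the existing processing -> released and waiting -> released handling.
    if ts.who_wants or ts.waiters:
        return "waiting"
    # Nothing depends on the result anymore; releasing the task is safe.
    return "released"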

Reproducer:

import asyncio

import dask
import dask.dataframe as dd

from distributed import Nanny, Status, Worker
from distributed.utils_test import gen_cluster

# wait_until_worker_has_tasks is a helper from distributed's shuffle test suite.


@gen_cluster(client=True, nthreads=[])
async def test_restart_deadlock(c, s):
    async with Worker(s.address) as a:
        async with Nanny(s.address) as b:
            # Pin the input partitions to worker a so that it holds shuffle inputs.
            with dask.annotate(workers=[a.address]):
                df = dask.datasets.timeseries(
                    start="2000-01-01",
                    end="2000-03-01",
                    dtypes={"x": float, "y": float},
                    freq="10 s",
                )
            out = dd.shuffle.shuffle(df, "x", shuffle="p2p")
            fut = c.compute(out.x.size)
            await wait_until_worker_has_tasks(
                "shuffle-transfer", b.worker_address, 1, s
            )
            # Pause a so that it stops being a running worker but keeps its data.
            a.status = Status.paused
            while len(s.running) > 1:
                await asyncio.sleep(0.01)
            # Kill the nanny's worker process, triggering a shuffle restart
            # while no running workers are left.
            await b.process.process.kill()

            while s.running:
                await asyncio.sleep(0.01)

            # Resume a; without a fix, this future never completes.
            a.status = Status.running
            await fut
hendrikmakait added the bug, deadlock, and shuffle labels on Aug 9, 2023
hendrikmakait reopened this on Aug 10, 2023
@hendrikmakait
Member Author

There's another race hidden in this edge case that causes the new test to flake due to a problem with the transition logic: the scheduler receives a task-finished message for a task it still considers to be in no-worker, and the generic no-worker -> memory transition rejects the extra worker kwargs, tripping the assertion in _transition:

2023-08-10 19:53:39,698 - distributed.core - ERROR - ({'worker': 'tcp://127.0.0.1:55522', 'nbytes': 28, 'type': b'\x80\x05\x95\x14\x00\x00\x00\x00\x00\x00\x00\x8c\x08builtins\x94\x8c\x03int\x94\x93\x94.', 'typename': 'int', 'metadata': {}, 'thread': 10872008704, 'startstops': ({'action': 'compute', 'start': 1691690019.318779, 'stop': 1691690019.393408},), 'status': 'OK'}, 'no-worker', 'memory')
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/utils.py", line 803, in wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 4311, in add_worker
    await self.handle_worker(comm, address)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 5669, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 1011, in handle_stream
    handler(**merge(extra, msg))
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 5528, in handle_task_finished
    r: tuple = self.stimulus_task_finished(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 4845, in stimulus_task_finished
    r: tuple = self._transition(
               ^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 1928, in _transition
    assert not kwargs, (kwargs, start, finish)
AssertionError: ({'worker': 'tcp://127.0.0.1:55522', 'nbytes': 28, 'type': b'\x80\x05\x95\x14\x00\x00\x00\x00\x00\x00\x00\x8c\x08builtins\x94\x8c\x03int\x94\x93\x94.', 'typename': 'int', 'metadata': {}, 'thread': 10872008704, 'startstops': ({'action': 'compute', 'start': 1691690019.318779, 'stop': 1691690019.393408},), 'status': 'OK'}, 'no-worker', 'memory')
