Transitioning the dependency of a queued task from memory to released causes deadlock #8702

Closed · hendrikmakait opened this issue Jun 19, 2024 · 0 comments · Fixed by #8703
Labels: bug (Something is broken), deadlock (The cluster appears to not make any progress), scheduler

hendrikmakait (Member) commented:
Reproducer

@gen_cluster(client=True, nthreads=[("", 1)])
async def test_deadlock_dependency_of_queued_released(c, s, a):
    @delayed
    def inc(input):
        return input + 1
    
    @delayed
    def block_on_event(input, block, executing):
        executing.set()
        block.wait()
        return input

    block = Event()
    executing = Event()
    
    dep = inc(0)
    futs = [block_on_event(dep, block, executing, dask_key_name=("rootish", i)) for i in range(s.total_nthreads * 2 + 1)]
    del dep
    futs = c.compute(futs)
    await executing.wait()
    assert s.queued
    await s.remove_worker(address=a.address, stimulus_id="test")

    # Instead of deadlocking, this will fail the test
    # s.validate_state()

    await block.set()
    await executing.clear()
    
    async with Worker(s.address) as b:
        await c.gather(*futs)

The reproducer deadlocks with the following scheduler logs:


2024-06-19 08:45:18,446 - distributed.scheduler - ERROR - Error transitioning ('rootish', 0) from 'queued' to 'processing'
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 2009, in _transition
    recommendations, client_msgs, worker_msgs = func(
                                                ^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 2854, in _transition_queued_processing
    return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 3252, in _add_to_processing
    self._validate_ready(ts)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 3245, in _validate_ready
    assert all(dts.who_has for dts in ts.dependencies)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError
2024-06-19 08:45:18,447 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:62311', status: running, memory: 0, processing: 1> (stimulus_id='handle-worker-cleanup-1718779518.447378')
2024-06-19 08:45:18,447 - distributed.scheduler - INFO - Lost all workers
2024-06-19 08:45:18,450 - distributed.scheduler - ERROR - 
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 4490, in add_worker
    await self.handle_worker(comm, address)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 6002, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 1053, in handle_stream
    handler(**merge(extra, msg))
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 5948, in handle_worker_status_change
    self.stimulus_queue_slots_maybe_opened(stimulus_id=stimulus_id)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 5083, in stimulus_queue_slots_maybe_opened
    self.transitions({qts.key: "processing"}, stimulus_id)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 8057, in transitions
    self._transitions(recommendations, client_msgs, worker_msgs, stimulus_id)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 2116, in _transitions
    new_recs, new_cmsgs, new_wmsgs = self._transition(key, finish, stimulus_id)
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 2009, in _transition
    recommendations, client_msgs, worker_msgs = func(
                                                ^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 2854, in _transition_queued_processing
    return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 3252, in _add_to_processing
    self._validate_ready(ts)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 3245, in _validate_ready
    assert all(dts.who_has for dts in ts.dependencies)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError
2024-06-19 08:45:18,451 - distributed.core - ERROR - Exception while handling op register-worker
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 970, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 4490, in add_worker
    await self.handle_worker(comm, address)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 6002, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 1053, in handle_stream
    handler(**merge(extra, msg))
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 5948, in handle_worker_status_change
    self.stimulus_queue_slots_maybe_opened(stimulus_id=stimulus_id)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 5083, in stimulus_queue_slots_maybe_opened
    self.transitions({qts.key: "processing"}, stimulus_id)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 8057, in transitions
    self._transitions(recommendations, client_msgs, worker_msgs, stimulus_id)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 2116, in _transitions
    new_recs, new_cmsgs, new_wmsgs = self._transition(key, finish, stimulus_id)
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 2009, in _transition
    recommendations, client_msgs, worker_msgs = func(
                                                ^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 2854, in _transition_queued_processing
    return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 3252, in _add_to_processing
    self._validate_ready(ts)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 3245, in _validate_ready
    assert all(dts.who_has for dts in ts.dependencies)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

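What seems to happen: dep is computed once and held in memory only on worker a, while the block_on_event tasks are root-ish (more tasks than threads) and mostly queued. Removing a transitions dep from memory to released, but its queued dependents are neither transitioned back to waiting nor is dep recomputed for them, so every later attempt to move them from queued to processing trips the assertion in _validate_ready shown above, and the tasks never run even after a new worker joins. A rough way to see the inconsistent state right after the remove_worker call (purely illustrative; it only reads the scheduler internals that also appear in the traceback):

# Illustrative only: dump each queued task and its dependencies right after
# `await s.remove_worker(...)` in the reproducer above.
for qts in s.queued:
    for dts in qts.dependencies:
        # validate_task_state expects a queued task to either still be waiting
        # on the dependency or for the dependency to be in memory somewhere
        # (dts.who_has non-empty). Here neither holds.
        print(qts.key, qts.state, dts.key, dts.state, dts.who_has, qts.waiting_on)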
Uncommenting s.validate_state() in the reproducer causes the test to fail with:

______________________________ test_deadlock_dependency_of_queued_released _______________________________

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:62334', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:62335', name: 0, status: closed, stored: 1, running: 1/1, ready: 1, comm: 0, waiting: 0>

    @gen_cluster(client=True, nthreads=[("", 1)])
    async def test_deadlock_dependency_of_queued_released(c, s, a):
        @delayed
        def inc(input):
            return input + 1
    
        @delayed
        def block_on_event(input, block, executing):
            executing.set()
            block.wait()
            return input
    
        block = Event()
        executing = Event()
    
        dep = inc(0)
        futs = [block_on_event(dep, block, executing, dask_key_name=("rootish", i)) for i in range(s.total_nthreads * 2 + 1)]
        del dep
        futs = c.compute(futs)
        await executing.wait()
        assert s.queued
        await s.remove_worker(address=a.address, stimulus_id="test")
    
>       s.validate_state()

distributed/tests/test_scheduler.py:4676: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/scheduler.py:5615: in validate_state
    validate_state(self.tasks, self.workers, self.clients)
distributed/scheduler.py:8908: in validate_state
    validate_task_state(ts)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

ts = <TaskState ('rootish', 0) forgotten>

    def validate_task_state(ts: TaskState) -> None:
        """Validate the given TaskState"""
        assert ts.state in ALL_TASK_STATES, ts
    
        if ts.waiting_on:
            assert ts.waiting_on.issubset(ts.dependencies), (
                "waiting not subset of dependencies",
                str(ts.waiting_on),
                str(ts.dependencies),
            )
        if ts.waiters:
            assert ts.waiters.issubset(ts.dependents), (
                "waiters not subset of dependents",
                str(ts.waiters),
                str(ts.dependents),
            )
    
        for dts in ts.waiting_on or ():
            assert not dts.who_has, ("waiting on in-memory dep", str(ts), str(dts))
            assert dts.state != "released", ("waiting on released dep", str(ts), str(dts))
        for dts in ts.dependencies:
            assert ts in dts.dependents, (
                "not in dependency's dependents",
                str(ts),
                str(dts),
                str(dts.dependents),
            )
            if ts.state in ("waiting", "queued", "processing", "no-worker"):
>               assert ts.waiting_on and dts in ts.waiting_on or dts.who_has, (
                    "dep missing",
                    str(ts),
                    str(dts),
                )
E               AssertionError: ('dep missing', "<TaskState ('rootish', 0) queued>", "<TaskState 'inc-6b29e2a4-23ce-4000-81ae-908fcdeb4278' queued>")

distributed/scheduler.py:8799: AssertionError
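
For reference, the invariant that breaks can be restated in isolation (a minimal sketch; dep_satisfied and the literal values are made up for illustration and are not scheduler API):

# Minimal restatement of the check from validate_task_state, applied to the
# pair named in the assertion error above (names and values illustrative only).
def dep_satisfied(ts_waiting_on: set, dts_key: str, dts_who_has: set) -> bool:
    # A waiting/queued/processing/no-worker task must either still be waiting
    # on the dependency or the dependency must have at least one replica.
    return bool(ts_waiting_on and dts_key in ts_waiting_on) or bool(dts_who_has)

# ('rootish', 0) is queued, waits on nothing, and its inc-* dependency lost its
# only replica when the worker was removed, so the invariant is violated:
assert dep_satisfied(set(), "inc-...", set()) is False

Presumably a fix has to make one of the two sides hold again for queued dependents once their dependency drops out of memory; see #8703 for the actual change.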