Transition queued->memory causes AssertionError #7200

Closed
Tracked by #7213
crusaderky opened this issue Oct 26, 2022 · 15 comments · Fixed by #7205 or #7348
Labels: bug (Something is broken), deadlock (The cluster appears to not make any progress), scheduler

@crusaderky (Collaborator) commented Oct 26, 2022

While trying to reproduce #7063, I came across a different error, this one with queueing enabled.
The below reproducer is NOT minimal - there is likely quite a bit of simplification possible.

# Presumed imports from distributed's test utilities (not part of the original snippet):
import asyncio

from distributed import Event, Status, wait
from distributed.utils_test import BlockedGatherDep, gen_cluster, inc, wait_for_state


@gen_cluster(
    client=True,
    nthreads=[("", 1)],
    config={"distributed.scheduler.worker-saturation": 1.5},
)
async def test_steal_rootish_while_retiring(c, s, a):
    """https://github.com/dask/distributed/issues/7063

    Note that this applies to both tasks that raise Reschedule as well as work stealing.
    """
    ev = Event()

    # Put a task in memory on a, which will be retired, and prevent b from acquiring
    # a replica. This will cause a to be stuck in closing_gracefully state until we
    # set b.block_gather_dep.
    m = c.submit(inc, 1, key="m", workers=[a.address])
    await wait(m)

    async with BlockedGatherDep(s.address, nthreads=1) as b:
        # Large number of tasks to make sure they're rootish
        futures = c.map(
            lambda i, ev: ev.wait(), range(10), ev=ev, key=[f"x-{i}" for i in range(10)]
        )

        while a.state.executing_count != 1 or b.state.executing_count != 1:
            await asyncio.sleep(0.01)

        assert s.is_rootish(s.tasks[futures[0].key])

        retire_task = asyncio.create_task(c.retire_workers([a.address]))
        # Wait until AMM sends AcquireReplicasEvent to b to move away m
        await b.in_gather_dep.wait()
        assert s.workers[a.address].status == Status.closing_gracefully

        # Steal any of the tasks on a
        steal_key = next(iter(a.state.executing)).key
        s.reschedule(steal_key, stimulus_id="steal")
        await ev.set()

        # The stolen task can now complete on the other worker
        await wait_for_state(steal_key, "memory", b)
        await wait_for_state(steal_key, "memory", s)

        # Let graceful retirement of a complete.
        # This in turn reschedules whatever tasks were still processing on a to b.
        b.block_gather_dep.set()
        await retire_task
        await wait(futures)

The test is green; however, I see the following in the log:

  File "/home/crusaderky/github/distributed/distributed/scheduler.py", line 5284, in handle_task_finished
    r: tuple = self.stimulus_task_finished(
  File "/home/crusaderky/github/distributed/distributed/scheduler.py", line 4649, in stimulus_task_finished
    r: tuple = self._transition(
  File "/home/crusaderky/github/distributed/distributed/scheduler.py", line 1813, in _transition
    assert not args and not kwargs, (args, kwargs, start, finish)
AssertionError: ((), {'worker': 'tcp://127.0.0.1:45929', 'nbytes': 28, 'type': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x08builtins\x94\x8c\x04bool\x94\x93\x94.', 'typename': 'bool', 'metadata': {}, 'thread': 139862053221952, 'startstops': ({'action': 'compute', 'start': 1666802403.9580944, 'stop': 1666802403.9590282},), 'status': 'OK'}, 'queued', 'memory')

What is happening:

  1. steal_key is processing on a
  2. steal_key is rescheduled, which causes the scheduler to send a free-keys message to a and put the task back in queue
  3. before the free-keys message can reach a, steal_key finishes on a
  4. steal_key transitions to memory on a, sending a TaskFinishedMsg to the scheduler.
  5. a queued->memory transition happens which, I suspect, is otherwise untested.

This is timing-sensitive: if free-keys had reached a before the task ended, steal_key would have been cancelled and would have transitioned to forgotten, without any messaging when it finished.
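
For intuition, here is a toy model of the dispatch behaviour visible in the traceback (illustrative only; the real logic lives in Scheduler._transition and differs in detail): dedicated transition functions consume the task-finished payload, while the generic fallback path accepts no extra arguments.

# Toy model only -- not the actual distributed code.
from typing import Any, Callable

TRANSITIONS: dict[tuple[str, str], Callable[..., str]] = {
    # dedicated handlers accept the task-finished payload (worker, nbytes, ...)
    ("processing", "memory"): lambda key, **kwargs: "recommendations",
    ("waiting", "memory"): lambda key, **kwargs: "recommendations",
    # ("queued", "memory") was never registered
}

def _transition(key: str, start: str, finish: str, *args: Any, **kwargs: Any):
    func = TRANSITIONS.get((start, finish))
    if func is not None:
        return func(key, *args, **kwargs)
    # Generic fallback: route through "released". It cannot forward the
    # worker/nbytes/... payload, hence the assertion seen in the log above.
    assert not args and not kwargs, (args, kwargs, start, finish)

try:
    _transition("x-3", "queued", "memory", worker="tcp://127.0.0.1:45929", nbytes=28)
except AssertionError as exc:
    print("AssertionError:", exc)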

@gjoseph92 (Collaborator)

I somewhat intentionally didn't add a queued->memory transition since it seemed like a messed-up case to me, and I didn't want to cargo-cult it in just because other transitions of the same form were there. Seems it is in fact a messed-up case, but a possible one.

So I guess we should just add the transition, which should be straightforward. Minimizing this test, on the other hand, might not be.

crusaderky added the bug label on Oct 27, 2022
@crusaderky (Collaborator, Author)

I can work on minimising the test

@crusaderky (Collaborator, Author)

I'm working on this.
I think we have a much wider problem.

  • transition_processing_memory (with unexpected worker),
  • transition_waiting_memory, and
  • transition_no_worker_memory

All of them implement the same logic to accept the task output from wherever it comes. However, the only case I can think of where this happens is when a cancellation request is already in flight but hasn't yet reached the worker, so the worker is going to lose the task immediately afterwards. I think they should all be fixed to simply disregard the transition.
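
A minimal sketch of what "just disregard the transition" could look like (purely hypothetical names, not the scheduler's actual API): when a task-finished arrives from a worker we no longer expect, keep scheduler state untouched and rely on the free-keys message already in flight to clean up the worker side.

import logging
from typing import Optional

logger = logging.getLogger("sketch")

def on_task_finished(key: str, worker: str, expected_worker: Optional[str]) -> dict:
    """Return scheduler recommendations for a task-finished message (sketch only)."""
    if worker != expected_worker:
        # The scheduler already sent (or is about to send) free-keys to this
        # worker; ignoring the message avoids a bogus queued->memory transition.
        logger.debug(
            "Ignoring task-finished for %r from %r (expected %r)",
            key, worker, expected_worker,
        )
        return {}  # no recommendations, no state change
    return {key: "memory"}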

CC @fjetter

@fjetter (Member) commented Oct 27, 2022

# Steal any of the tasks on a
steal_key = next(iter(a.state.executing)).key
s.reschedule(steal_key, stimulus_id="steal")
await ev.set()

This is not how stealing works. Stealing will be rejected if a key is already executing and nothing will be transitioned.

I'm not sure how such a reschedule would actually occur. If it did, we'd intentionally be computing a key on two workers simultaneously.

@fjetter (Member) commented Oct 27, 2022

I'm currently struggling to come up with a case where this transition would actually occur. I believe we eradicated these types of transitions when removing worker reconnect. However, I'm not entirely sure.

I do not consider calling Scheduler.reschedule directly to be valid API use. It is used internally when:

  • Stealing made a mistake and needs to correct it, because the key is not running anywhere
  • The user raises a Reschedule exception; in this case, by definition, the key is already done

@gjoseph92 (Collaborator)

Won't it occur just by calling Scheduler.reschedule directly in the circumstance Guido has created, where the task is root-ish and all workers are full?

The implementation of Scheduler.reschedule seems odd to me. In work stealing, when we want to reschedule a task in processing, we first talk to the worker, get confirmation it's cancelled the task, and only then change state on the scheduler.

Scheduler.reschedule just blindly releases the state on the scheduler, even though the task may be happily executing on a worker.

It seems like the implementation of reschedule is targeted very much towards being the handler for Reschedule exceptions. That's a very specific use-case, where we can safely make the assumption that:

  1. the task is no longer actually processing on the worker we thought it was, and that worker has already released the task internally
  2. we are not going to receive any subsequent messages from that worker about that task (it's not going to go to memory immediately after)

That specific case doesn't seem like something that should be a public API. I would guess that the only reason it works to call it in work stealing is that the situation is equivalent, because move_task_request has already cancelled the task on the original worker, so we know no more updates about the task will come from there.

If Scheduler.reschedule were actually a generic, public API, I would expect it to implement the same move_task_request, move_task_confirm pattern that work-stealing does. Otherwise it leaves the door open for this inconsistency where a task is executing on two workers simultaneously.
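
For illustration, a minimal sketch of that request/confirm handshake, using hypothetical stub classes (this is not the work-stealing implementation): the scheduler only releases and requeues the task after the worker confirms it gave the task up.

import asyncio

class WorkerStub:
    """Stand-in for a worker's RPC interface (hypothetical)."""

    def __init__(self, executing: set):
        self.executing = executing

    async def cancel_task_request(self, key: str) -> bool:
        # Refuse if the task is already executing, mirroring how work
        # stealing rejects requests for running tasks.
        return key not in self.executing

class SchedulerStub:
    def __init__(self):
        self.queued = []

    def release_and_requeue(self, key: str) -> None:
        self.queued.append(key)

async def reschedule_with_confirmation(scheduler, worker, key: str) -> bool:
    """Touch scheduler state only after the worker confirms it released the task."""
    if not await worker.cancel_task_request(key):
        return False  # worker kept the task; scheduler state stays consistent
    scheduler.release_and_requeue(key)
    return True

async def demo():
    s, w = SchedulerStub(), WorkerStub(executing={"x-1"})
    print(await reschedule_with_confirmation(s, w, "x-1"))  # False: still running on w
    print(await reschedule_with_confirmation(s, w, "x-2"))  # True: safely requeued

asyncio.run(demo())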

I am leaning towards #7205 (comment) that reschedule should not be a public API, and we should add a comment to note the specific case in which it can be used internally.

@crusaderky (Collaborator, Author)

So if I understand correctly, if I replace the contents of

  • transition_processing_memory (with unexpected worker)
  • transition_waiting_memory
  • transition_no_worker_memory

with breakpoint(), I should catch exclusively use cases that none of us know about?

@crusaderky (Collaborator, Author) commented Oct 27, 2022

Yikes.
I did the above (crusaderky@d71804a) and I got ONE waiting->memory in test_stress_creation_and_deletion on localhost, and ZERO on CI 😨
These transitions are virtually untested.

story of test_stress_creation_and_deletion (breakpoint in transition_waiting_memory; edited for readability):

[
    ("('sum-partial-0', 9, 7)", 'released', 'waiting', {}, 'update-graph-1666879902.9497185', 1666879902.9718037),
    ("('sum-0', 19, 14)", 'processing', 'memory', {"('sum-partial-0', 9, 7)": 'processing', "('random_sample-0', 19, 14)": 'released', "('random_sample-0', 14, 19)": 'released'}, 'task-finished-1666879904.9479938', 1666879904.956455)
    ("('sum-partial-0', 9, 7)", 'waiting', 'processing', {}, 'task-finished-1666879904.9479938', 1666879904.9565284
    ("('sum-0', 19, 14)", 'memory', 'released', {"('sum-0', 19, 14)": 'waiting', "('sum-partial-0', 9, 7)": 'waiting'}, 'handle-worker-cleanup-1666879904.976327', 1666879904.9796247)
    ("('sum-partial-0', 9, 7)", 'processing', 'released', {"('sum-partial-0', 9, 7)": 'waiting'}, 'handle-worker-cleanup-1666879904.976327', 1666879904.9796507)
    ("('sum-partial-0', 9, 7)", 'released', 'waiting', {"('sum-partial-0', 9, 7)": 'waiting', "('sum-0', 19, 14)": 'waiting'}, 'handle-worker-cleanup-1666879904.976327', 1666879904.979665)
    ("('sum-partial-0', 9, 7)", 'waiting', 'memory', <breakpoint>
]

What I think I'm reading here:

  1. sum-partial-0 is processing on a worker
  2. the worker abruptly leaves the cluster (handle-worker-cleanup)
  3. this causes the task to transition from processing to waiting (because we also just lost its dependency sum-0)
  4. after that happens, a worker (which, when I inspected it during the transition, had status=running) tells the scheduler that it completed the task. I do not know whether it's the same worker that left the cluster before or not.

@crusaderky (Collaborator, Author)

My opinion: none of this is new.
I'd suggest merging a PR now (WIP) that implements transition_queued_memory as a copy-paste of the other equivalent transitions, without any tests - the others don't have any either - and then opening a follow-up issue for investigation and test coverage.

@gjoseph92 (Collaborator) commented Oct 27, 2022

I think that's probably the best approach for now, but I find this concerning.

reschedule should not be a public API

@crusaderky do you think we should also do this? It seems like the right thing to me. EDIT:

I do not know if it's the same worker that left the cluster before or not.

Either one of these is concerning.

  • If it's a new worker, that implies that two workers were sent the task even though there's only one 'waiting' -> 'processing' transition in the story. This would be extremely concerning and feels pretty much impossible to me (barring some oddity around all workers running async in the same process and some message getting crossed somehow?).
  • If it's the same worker, that implies the worker somehow manages to send more messages even after we think we've closed communications to it. Sort of feels like Eliminate partially-removed-worker state on scheduler (comms open, state removed) #6390, except in this case all the actions in question should be happening over the batched comm, which we know is already closed by the time we get to handle-worker-cleanup here.

One thing that does come to mind is that if an async function is a stream handler, the handler will be executed in the background:

handler = self.stream_handlers[op]
if iscoroutinefunction(handler):
    self._ongoing_background_tasks.call_soon(
        handler, **merge(extra, msg)
    )
    await asyncio.sleep(0)
else:
    handler(**merge(extra, msg))

So though we think of batched stream messages as being processed in order, async handlers are not necessarily processed in order, so it might be possible for part of an async handler to run after the batched stream has already closed. (xref #5443 a little.)
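
A self-contained toy example of that ordering hazard using plain asyncio (it mimics the call_soon + sleep(0) pattern above; it is not distributed's comm machinery): an async handler for an earlier message can finish after a synchronous handler for a later one.

import asyncio

async def main():
    order = []

    async def async_handler(msg):
        await asyncio.sleep(0)  # any await point inside the handler
        order.append(f"async handler finished: {msg}")

    def sync_handler(msg):
        order.append(f"sync handler finished: {msg}")

    # Messages arrive in order 1, 2. The async handler is only *scheduled*,
    # so the sync handler for message 2 can complete before it does.
    asyncio.get_running_loop().create_task(async_handler("msg 1"))
    await asyncio.sleep(0)
    sync_handler("msg 2")
    await asyncio.sleep(0.1)
    print(order)  # ['sync handler finished: msg 2', 'async handler finished: msg 1']

asyncio.run(main())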

The problem with this theory is that handle_task_finished is synchronous.

@crusaderky do you have a cluster dump of this case? Or at least a stimulus ID for the waiting->memory transition?

@fjetter (Member) commented Oct 28, 2022

I can reproduce this locally as well. Pretty easily, actually.

do you have a cluster dump of this case

A cluster dump is difficult because most of the workers shut down very quickly in this test. I can observe with a debugger, though.

Or at least a stimulus ID for the waiting->memory transition?

The stimulus ID in my case is just another task-finished-* stimulus.

What I could gather so far:

  • The release transition was caused by a different worker than the one currently finishing, i.e. it's not the same worker coming back!
  • The worker currently finishing is properly in state running
  • The worker that finished the key indeed received a ComputeTaskEvent instructing it to compute it

Still investigating. This test is currently xfailed. I think we should invest some time to make this work properly.

@fjetter (Member) commented Oct 28, 2022

Below is the scheduler transition log. FOO and BAR are two keys such that FOO depends on BAR.

BAR is calculated on WorkerA (I changed the stimulus IDs slightly to include the worker address).

The transition waiting -> memory is triggered by a task-finished from WorkerB.

I think what happens is...

  • WorkerA finishes BAR
  • FOO is transitioned to processing and assigned to WorkerB
  • WorkerB fetches BAR and sends an add_keys message to the scheduler
  • In the meantime, WorkerA dies, which causes BAR to be scheduled back to released/waiting. The scheduler queues up a free-keys message intended for WorkerB to cancel FOO
  • The add_keys from WorkerB only arrives after WorkerA has been removed and all tasks have been transitioned. This should trigger a remove-replicas (see the snippet below) but I haven't confirmed it:

            else:
                redundant_replicas.append(key)

        if redundant_replicas:
            if not stimulus_id:
                stimulus_id = f"redundant-replicas-{time()}"
            self.worker_send(
                worker,
                {
                    "op": "remove-replicas",
                    "keys": redundant_replicas,
                    "stimulus_id": stimulus_id,
                },
            )

  • WorkerB computes FOO and submits its result to the scheduler
  • WorkerB receives the free-keys message from the initial cancellation
[
    (
        "FOO",
        "released",
        "waiting",
        {},
        "update-graph-1666955210.246825",
        1666955210.291511,
    ),
    (
        "BAR",
        "processing",
        "memory",
        {...},
        "task-finished-1666955230.056821",
        1666955230.1450088,
    ),
    (
        "FOO",
        "waiting",
        "processing",
        {},
        "task-finished-1666955230.056821",
        1666955230.145199,
    ),
    (
        "BAR",
        "memory",
        "released",
        {...},
        "handle-worker-cleanup-WorkerA-1666955230.323402",
        1666955230.327744,
    ),
    (
        "FOO",
        "processing",
        "released",
        {...},
        "handle-worker-cleanup-WorkerA-1666955230.323402",
        1666955230.327806,
    ),
    (
        "FOO",
        "released",
        "waiting",
        {...},
        "handle-worker-cleanup-WorkerA-1666955230.323402",
        1666955230.3278348,
    ),
    (
        "FOO",
        "waiting",
        "memory",
        {...},
        "task-finished-1666955230.232949",
        1666955261.059616,
    ),
    (
        "FOO",
        "memory",
        "released",
        {...},
        "handle-worker-cleanup-WorkerA-1666955230.323402",
        1666955262.808954,
    ),
    (
        "FOO",
        "released",
        "waiting",
        {...},
        "handle-worker-cleanup-WorkerA-1666955230.323402",
        1666955262.809029,
    ),
]

@crusaderky (Collaborator, Author) commented Oct 31, 2022

@fjetter your analysis for waiting->memory is correct. I reproduced it deterministically in #7205.
I'm now tinkering with variations of the same use case to reproduce processing->memory from unexpected worker and queued->memory.

I cannot think of any way to reproduce no-worker->memory.
I tried with scatter:

f = c.submit(inc, 1, key="x", workers=["notexist"])
await asyncio.sleep(1)
await c.scatter({"x": 1})

but it won't cause the task to transition out of no-worker.

@crusaderky (Collaborator, Author)

I cannot, for the life of me, think of any legit use case for the other transitions. I've changed them to raise in the PR.

@fjetter (Member) commented Nov 24, 2022

This is resurfacing again in #7326 (comment)

Reproducer

@gen_cluster(client=True, nthreads=[("", 1)] * 3, timeout=5)
async def test_closed_worker_between_repeats(c, s, w1, w2, w3):
    df = dask.datasets.timeseries(
        start="2000-01-01",
        end="2000-01-10",
        dtypes={"x": float, "y": float},
        freq="100 s",
        seed=42,
    )
    out = dd.shuffle.shuffle(df, "x")
    await c.compute(out.head(compute=False))
    await w3.close()
    await c.compute(out.tail(compute=False))
    await w2.close()
    await c.compute(out.head(compute=False))

cc @crusaderky

This doesn't fail 100% deterministically, but pretty reliably (roughly 1 out of 50 runs).

The story indicates that we're in the third compute and that the scheduler receives a task-finished from w1 after the task was transitioned to queued in computation 3.

I couldn't determine what's going on specifically yet.

fjetter reopened this on Nov 24, 2022
fjetter added the deadlock label on Nov 24, 2022