
Scheduler.reschedule() does not work #6340

Closed
crusaderky opened this issue May 13, 2022 · 8 comments · Fixed by #6339
@crusaderky (Collaborator)

Scheduler.reschedule() is called:

  1. from Worker.transition_long_running_rescheduled
  2. from Worker.transition_executing_rescheduled
  3. from stealing.py
  4. directly from a bunch of unit tests

If you add "assert False" at the top of the method, only the tests that invoke it directly fail.
Notably, test_reschedule is among them; in other words, nothing at the moment verifies that worker transitions and work stealing successfully use the method.
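
For concreteness, the probe described above can also be done as a quick monkeypatch instead of editing distributed/scheduler.py; this is only a sketch of the experiment, not part of any fix:

```python
# Sketch of the probe: make Scheduler.reschedule fail loudly, then run the
# test suite and see which tests actually reach it. Equivalent to adding
# "assert False" at the top of the method in distributed/scheduler.py.
from distributed.scheduler import Scheduler


def _probed_reschedule(self, *args, **kwargs):
    assert False, "Scheduler.reschedule() was reached"


Scheduler.reschedule = _probed_reschedule
```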

crusaderky added the tests (Unit tests and/or continuous integration) label on May 13, 2022
@crusaderky (Collaborator, Author)

Correction: there is one test that triggers rescheduling from the worker, test_worker.py::test_reschedule.

At the time of writing, the call to reschedule() from the Worker does NOT work due to a mismatched function signature; eagle-eyed users can spot the resulting TypeError in the log.
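
To illustrate the failure mode (the exact mismatched argument is not spelled out in this thread, so the payload below is purely illustrative): stream handlers receive the message payload as keyword arguments, so any key the handler does not accept surfaces as a TypeError.

```python
def reschedule(key=None, worker=None):
    """Stand-in for a scheduler-side handler with a narrower signature."""


# A worker message whose payload carries a key the handler does not accept:
msg = {"key": "x1", "worker": "tcp://10.0.0.1:1234", "stimulus_id": "illustrative"}

try:
    reschedule(**msg)  # stream messages are dispatched to handlers like this
except TypeError as e:
    print(e)  # reschedule() got an unexpected keyword argument 'stimulus_id'
```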
Once #6339 fixed the call, the worker state machine broke:

Traceback (most recent call last):
  File "/home/crusaderky/github/distributed/distributed/worker.py", line 4277, in validate_task
    self.validate_task_cancelled(ts)
  File "/home/crusaderky/github/distributed/distributed/worker.py", line 4245, in validate_task_cancelled
    assert ts._next is None  # We'll always transition to released after it is done
AssertionError

crusaderky changed the title from "Scheduler.reschedule() lacks integration testing" to "Scheduler.reschedule() works only by accident" on May 13, 2022
crusaderky added the deadlock (The cluster appears to not make any progress) label and removed the tests (Unit tests and/or continuous integration) label on May 16, 2022
@crusaderky (Collaborator, Author)

CC @fjetter

fjetter added the bug (Something is broken) label on May 25, 2022
fjetter changed the title from "Scheduler.reschedule() works only by accident" to "Scheduler.reschedule() does not work" on May 25, 2022
crusaderky self-assigned this on Jun 24, 2022
@crusaderky (Collaborator, Author) commented on Jun 28, 2022

The reason why raising Reschedule() in a task still results in the scheduler effectively rescheduling it is the following chain (a code sketch follows the log below):

  1. the worker sends the scheduler a reschedule message, which doesn't match the signature of Scheduler.reschedule
  2. which causes Scheduler.handle_stream to raise TypeError
  3. which causes the batched stream channel to collapse
  4. which causes the worker to automatically reconnect to the scheduler
  5. the connection event causes the scheduler to transition the task processing->released->waiting->processing

Scheduler log (note the stimulus ids):

- ('x1', 'released', 'waiting', {'x1': 'processing'}, 'update-graph-1656417887.9150004', 1656417887.9152453)
- ('x1', 'waiting', 'processing', {}, 'update-graph-1656417887.9150004', 1656417887.9161289)
- ('x1', 'processing', 'released', {'x1': 'waiting'}, 'worker-connect-1656417887.7426143', 1656417888.023502)
- ('x1', 'released', 'waiting', {'x1': 'processing'}, 'worker-connect-1656417887.7426143', 1656417888.0235095)
- ('x1', 'waiting', 'processing', {}, 'worker-connect-1656417887.7426143', 1656417888.0235472)
- ('x1', 'processing', 'memory', {}, 'task-finished-1656417888.223756', 1656417888.2246292)

... 🤔 ...

I mean, if it works, don't touch it, amirite? 😆 😆 😆 😆 😆
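
For the record, here is a heavily simplified sketch (not the real distributed code) of the dispatch loop behind steps 2-4 above: the handler exception escapes the loop and the finally clause tears down the batched comm.

```python
async def handle_stream(comm, handlers):
    """Simplified stand-in for Scheduler.handle_stream."""
    try:
        while True:
            msgs = await comm.read()     # batched messages from the worker
            for msg in msgs:
                op = msg.pop("op")
                handlers[op](**msg)      # mismatched signature -> TypeError escapes here
    finally:
        await comm.close()               # step 3: the batched stream collapses


# Steps 4-5 happen outside this coroutine: the worker reacts to the closed
# stream by reconnecting, and the scheduler's connection handling transitions
# the task processing -> released -> waiting -> processing.
```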

crusaderky removed the deadlock (The cluster appears to not make any progress) label on Jun 28, 2022
@crusaderky (Collaborator, Author)

> Once #6339 fixed the call, the worker state machine broke:
>
> Traceback (most recent call last):
>   File "/home/crusaderky/github/distributed/distributed/worker.py", line 4277, in validate_task
>     self.validate_task_cancelled(ts)
>   File "/home/crusaderky/github/distributed/distributed/worker.py", line 4245, in validate_task_cancelled
>     assert ts._next is None  # We'll always transition to released after it is done
> AssertionError

I can no longer reproduce this.

@fjetter (Member) commented on Jun 28, 2022

> which causes the worker to automatically reconnect to the scheduler

We removed the worker reconnect in #6361 🤔

@crusaderky (Collaborator, Author)

> worker-connect-

> which causes the worker to automatically reconnect to the scheduler
>
> We removed the worker reconnect in #6361 🤔

Well, something is clearly calling Worker._register_with_scheduler after Scheduler.handle_stream crashes and calls await comm.close() in its finally clause.

@crusaderky (Collaborator, Author)

Whoops. I just added a test for the edge case where a task raises Reschedule(), but the client has already released the future... and the worker state machine just horribly fell apart 🥳

  [Previous line repeated 2941 more times]
  File "/home/crusaderky/github/distributed/distributed/worker_state_machine.py", line 2299, in _transition
    recs, instructions = self._transition(
  File "/home/crusaderky/github/distributed/distributed/worker_state_machine.py", line 2279, in _transition
    func = self._TRANSITIONS_TABLE.get((start, finish))
RecursionError: maximum recursion depth exceeded in comparison
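
For reference, the test described above probably looks roughly like this sketch (the name and exact structure are my guess, not the test that was actually added; it only assumes distributed's gen_cluster, Event and Reschedule helpers):

```python
import asyncio

from distributed import Event, Reschedule
from distributed.utils_test import gen_cluster


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_reschedule_after_release(c, s, a):
    ev_started = Event()
    ev_release = Event()

    def f():
        ev_started.set()    # signal that the task is running on the worker
        ev_release.wait()   # block until the client has released the future
        raise Reschedule()

    fut = c.submit(f, key="x")
    await ev_started.wait()   # the task is now executing
    fut.release()             # the client forgets the future mid-execution
    await ev_release.set()    # now let the task raise Reschedule()
    await asyncio.sleep(0.1)  # give the worker time to digest the outcome
```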

@crusaderky (Collaborator, Author)

> worker-connect-
>
> which causes the worker to automatically reconnect to the scheduler
>
> We removed the worker reconnect in #6361 🤔
>
> Well something is clearly calling Worker._register_with_scheduler after Scheduler.handle_stream crashes and calls await comm.close() in its finally clause.

Mystery solved: the original worker died and was not replaced. The finally clause of Scheduler.handle_worker calls remove_worker, passing the original stimulus_id, which is potentially hours old and had not been used anywhere before.
Remediation to follow.
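
A toy sketch of the remediation direction (my assumption, not the actual patch): the cleanup path should mint a fresh stimulus_id for the removal instead of reusing the one captured when the worker first connected.

```python
import asyncio
from time import time


class MiniScheduler:
    """Toy stand-in for the real Scheduler, only to show the pattern."""

    async def remove_worker(self, address, *, stimulus_id):
        print(f"removing {address} (stimulus_id={stimulus_id})")

    async def handle_worker(self, worker, stimulus_id):
        try:
            await asyncio.sleep(0)  # stand-in for handle_stream, which may run for hours
        finally:
            # mint a fresh id instead of reusing the potentially hours-old argument
            await self.remove_worker(worker, stimulus_id=f"handle-worker-cleanup-{time()}")


asyncio.run(MiniScheduler().handle_worker("tcp://10.0.0.1:1234", "worker-connect-123"))
```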
