From c978bc054d713875a297e059d6a00afbbc157066 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Tue, 13 Apr 2021 22:58:02 -0700 Subject: [PATCH] Add & use `_transition_dispatch` function --- distributed/scheduler.py | 87 +++++++++++++++++++++++++++------------- 1 file changed, 60 insertions(+), 27 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index dc64e3cc953..97f61a0e381 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -144,6 +144,15 @@ def nogil(func): return func +@cfunc +@inline +def sys_intern(s): + if compiled: + return intern(s) + else: + return sys.intern(s) + + if sys.version_info < (3, 8): try: import pickle5 as pickle @@ -1606,7 +1615,6 @@ class SchedulerState: _task_metadata: dict _total_nthreads: Py_ssize_t _total_occupancy: double - _transitions_table: dict _unknown_durations: dict _unrunnable: set _validate: bint @@ -1659,23 +1667,6 @@ def __init__( self._task_metadata = dict() self._total_nthreads = 0 self._total_occupancy = 0 - self._transitions_table = { - ("released", "waiting"): self.transition_released_waiting, - ("waiting", "released"): self.transition_waiting_released, - ("waiting", "processing"): self.transition_waiting_processing, - ("waiting", "memory"): self.transition_waiting_memory, - ("processing", "released"): self.transition_processing_released, - ("processing", "memory"): self.transition_processing_memory, - ("processing", "erred"): self.transition_processing_erred, - ("no-worker", "released"): self.transition_no_worker_released, - ("no-worker", "waiting"): self.transition_no_worker_waiting, - ("released", "forgotten"): self.transition_released_forgotten, - ("memory", "forgotten"): self.transition_memory_forgotten, - ("erred", "forgotten"): self.transition_released_forgotten, - ("erred", "released"): self.transition_erred_released, - ("memory", "released"): self.transition_memory_released, - ("released", "erred"): self.transition_released_erred, - } self._unknown_durations = dict() if unrunnable is not None: self._unrunnable = unrunnable @@ -1826,6 +1817,50 @@ def new_task(self, key: str, spec: object, state: str) -> TaskState: # State Transitions # ##################### + def _transition_dispatch(self, start: str, finish: str, *args, **kwargs): + start = sys_intern(start) + finish = sys_intern(finish) + + result: tuple = None + if start is "released": # noqa: F632 + if finish is "waiting": # noqa: F632 + result = self.transition_released_waiting(*args, **kwargs) + elif finish is "forgotten": # noqa: F632 + result = self.transition_released_forgotten(*args, **kwargs) + elif finish is "erred": # noqa: F632 + result = self.transition_released_erred(*args, **kwargs) + elif start is "waiting": # noqa: F632 + if finish is "released": # noqa: F632 + result = self.transition_waiting_released(*args, **kwargs) + elif finish is "processing": # noqa: F632 + result = self.transition_waiting_processing(*args, **kwargs) + elif finish is "memory": # noqa: F632 + result = self.transition_waiting_memory(*args, **kwargs) + elif start is "processing": # noqa: F632 + if finish is "released": # noqa: F632 + result = self.transition_processing_released(*args, **kwargs) + elif finish is "memory": # noqa: F632 + result = self.transition_processing_memory(*args, **kwargs) + elif finish is "erred": # noqa: F632 + result = self.transition_processing_erred(*args, **kwargs) + elif start is "no-worker": # noqa: F632 + if finish is "released": # noqa: F632 + result = self.transition_no_worker_released(*args, **kwargs) + elif finish is "waiting": # noqa: F632 + result = self.transition_no_worker_waiting(*args, **kwargs) + elif start is "memory": # noqa: F632 + if finish is "released": # noqa: F632 + result = self.transition_memory_released(*args, **kwargs) + elif finish is "forgotten": # noqa: F632 + result = self.transition_memory_forgotten(*args, **kwargs) + elif start is "erred": # noqa: F632 + if finish is "released": # noqa: F632 + result = self.transition_erred_released(*args, **kwargs) + elif finish is "forgotten": # noqa: F632 + result = self.transition_released_forgotten(*args, **kwargs) + + return result + def _transition( self, key, @@ -1854,7 +1889,6 @@ def _transition( parent: SchedulerState = cast(SchedulerState, self) ts: TaskState start: str - start_finish: tuple finish2: str recommendations: dict worker_msgs: dict @@ -1879,12 +1913,10 @@ def _transition( dependents = set(ts._dependents) dependencies = set(ts._dependencies) - start_finish = (start, finish) - func = self._transitions_table.get(start_finish) - if func is not None: - a: tuple = func(key, *args, **kwargs) + a: tuple = self._transition_dispatch(start, finish, key, *args, **kwargs) + if a is not None: recommendations, client_msgs, worker_msgs = a - elif "released" not in start_finish: + elif start != "released" and finish != "released": assert not args and not kwargs a_recs: dict a_cmsgs: dict @@ -1893,11 +1925,12 @@ def _transition( a_recs, a_cmsgs, a_wmsgs = a v = a_recs.get(key, finish) - func = self._transitions_table["released", v] b_recs: dict b_cmsgs: dict b_wmsgs: dict - b: tuple = func(key) + b: tuple = self._transition_dispatch( + "released", v, key, *args, **kwargs + ) b_recs, b_cmsgs, b_wmsgs = b recommendations.update(a_recs) @@ -1930,7 +1963,7 @@ def _transition( start = "released" else: - raise RuntimeError("Impossible transition from %r to %r" % start_finish) + raise RuntimeError(f"Impossible transition from {start} to {finish}") finish2 = ts._state self.transition_log.append((key, start, finish2, recommendations, time()))