diff --git a/distributed/scheduler.py b/distributed/scheduler.py index fb9610c53d5..5c3bc4f59fa 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -144,6 +144,15 @@ def nogil(func): return func +@cfunc +@inline +def sys_intern(s): + if compiled: + return intern(s) # noqa: F821 + else: + return sys.intern(s) + + if sys.version_info < (3, 8): try: import pickle5 as pickle @@ -1751,7 +1760,6 @@ class SchedulerState: _task_metadata: dict _total_nthreads: Py_ssize_t _total_occupancy: double - _transitions_table: dict _unknown_durations: dict _unrunnable: set _validate: bint @@ -1804,23 +1812,6 @@ def __init__( self._task_metadata = dict() self._total_nthreads = 0 self._total_occupancy = 0 - self._transitions_table = { - ("released", "waiting"): self.transition_released_waiting, - ("waiting", "released"): self.transition_waiting_released, - ("waiting", "processing"): self.transition_waiting_processing, - ("waiting", "memory"): self.transition_waiting_memory, - ("processing", "released"): self.transition_processing_released, - ("processing", "memory"): self.transition_processing_memory, - ("processing", "erred"): self.transition_processing_erred, - ("no-worker", "released"): self.transition_no_worker_released, - ("no-worker", "waiting"): self.transition_no_worker_waiting, - ("released", "forgotten"): self.transition_released_forgotten, - ("memory", "forgotten"): self.transition_memory_forgotten, - ("erred", "forgotten"): self.transition_released_forgotten, - ("erred", "released"): self.transition_erred_released, - ("memory", "released"): self.transition_memory_released, - ("released", "erred"): self.transition_released_erred, - } self._unknown_durations = dict() if unrunnable is not None: self._unrunnable = unrunnable @@ -1975,7 +1966,50 @@ def new_task(self, key: str, spec: object, state: str) -> TaskState: # State Transitions # ##################### - def _transition(self, key, finish: str, *args, **kwargs): + def _transition_dispatch(self, start: str, finish: str, *args, **kwargs): + start_finish: str = sys_intern(f"{start}_{finish}".replace("-", "_")) + + if start_finish is "released_waiting": # noqa: F632 + return self.transition_released_waiting(*args, **kwargs) + elif start_finish is "released_forgotten": # noqa: F632 + return self.transition_released_forgotten(*args, **kwargs) + elif start_finish is "released_erred": # noqa: F632 + return self.transition_released_erred(*args, **kwargs) + elif start_finish is "waiting_released": # noqa: F632 + return self.transition_waiting_released(*args, **kwargs) + elif start_finish is "waiting_processing": # noqa: F632 + return self.transition_waiting_processing(*args, **kwargs) + elif start_finish is "waiting_memory": # noqa: F632 + return self.transition_waiting_memory(*args, **kwargs) + elif start_finish is "processing_released": # noqa: F632 + return self.transition_processing_released(*args, **kwargs) + elif start_finish is "processing_memory": # noqa: F632 + return self.transition_processing_memory(*args, **kwargs) + elif start_finish is "processing_erred": # noqa: F632 + return self.transition_processing_erred(*args, **kwargs) + elif start_finish is "no_worker_released": # noqa: F632 + return self.transition_no_worker_released(*args, **kwargs) + elif start_finish is "no_worker_waiting": # noqa: F632 + return self.transition_no_worker_waiting(*args, **kwargs) + elif start_finish is "memory_released": # noqa: F632 + return self.transition_memory_released(*args, **kwargs) + elif start_finish is "memory_forgotten": # noqa: F632 + return self.transition_memory_forgotten(*args, **kwargs) + elif start_finish is "erred_released": # noqa: F632 + return self.transition_erred_released(*args, **kwargs) + elif start_finish is "erred_forgotten": # noqa: F632 + return self.transition_released_forgotten(*args, **kwargs) + + def _transition( + self, + key, + finish: str, + *args, + status: str = None, + thread: Py_ssize_t = -1, + metadata: dict = None, + **kwargs, + ): """Transition a key from its current state to the finish state Examples @@ -1994,7 +2028,6 @@ def _transition(self, key, finish: str, *args, **kwargs): parent: SchedulerState = cast(SchedulerState, self) ts: TaskState start: str - start_finish: tuple finish2: str recommendations: dict worker_msgs: dict @@ -2019,12 +2052,10 @@ def _transition(self, key, finish: str, *args, **kwargs): dependents = set(ts._dependents) dependencies = set(ts._dependencies) - start_finish = (start, finish) - func = self._transitions_table.get(start_finish) - if func is not None: - a: tuple = func(key, *args, **kwargs) + a: tuple = self._transition_dispatch(start, finish, key, *args, **kwargs) + if a is not None: recommendations, client_msgs, worker_msgs = a - elif "released" not in start_finish: + elif start != "released" and finish != "released": assert not args and not kwargs a_recs: dict a_cmsgs: dict @@ -2033,11 +2064,12 @@ def _transition(self, key, finish: str, *args, **kwargs): a_recs, a_cmsgs, a_wmsgs = a v = a_recs.get(key, finish) - func = self._transitions_table["released", v] b_recs: dict b_cmsgs: dict b_wmsgs: dict - b: tuple = func(key) + b: tuple = self._transition_dispatch( + "released", v, key, *args, **kwargs + ) b_recs, b_cmsgs, b_wmsgs = b recommendations.update(a_recs) @@ -2070,7 +2102,7 @@ def _transition(self, key, finish: str, *args, **kwargs): start = "released" else: - raise RuntimeError("Impossible transition from %r to %r" % start_finish) + raise RuntimeError(f"Impossible transition from {start} to {finish}") finish2 = ts._state self.transition_log.append((key, start, finish2, recommendations, time())) @@ -2091,7 +2123,16 @@ def _transition(self, key, finish: str, *args, **kwargs): parent._tasks[ts._key] = ts for plugin in list(self.plugins): try: - plugin.transition(key, start, finish2, *args, **kwargs) + plugin.transition( + key, + start, + finish2, + *args, + status=status, + thread=thread, + metadata=metadata, + **kwargs, + ) except Exception: logger.info("Plugin failed with exception", exc_info=True) if ts._state == "forgotten": @@ -2158,7 +2199,9 @@ def _transitions(self, recommendations: dict, client_msgs: dict, worker_msgs: di for key in keys: self.validate_key(key) - def transition_released_waiting(self, key): + @ccall + @exceptval(check=False) + def transition_released_waiting(self, key) -> tuple: try: ts: TaskState = self._tasks[key] dts: TaskState @@ -2213,7 +2256,9 @@ def transition_released_waiting(self, key): pdb.set_trace() raise - def transition_no_worker_waiting(self, key): + @ccall + @exceptval(check=False) + def transition_no_worker_waiting(self, key) -> tuple: try: ts: TaskState = self._tasks[key] dts: TaskState @@ -2336,7 +2381,9 @@ def set_duration_estimate(self, ts: TaskState, ws: WorkerState) -> double: ws._processing[ts] = total_duration return total_duration - def transition_waiting_processing(self, key): + @ccall + @exceptval(check=False) + def transition_waiting_processing(self, key) -> tuple: try: ts: TaskState = self._tasks[key] dts: TaskState @@ -2383,9 +2430,11 @@ def transition_waiting_processing(self, key): pdb.set_trace() raise + @ccall + @exceptval(check=False) def transition_waiting_memory( - self, key, nbytes=None, type=None, typename: str = None, worker=None, **kwargs - ): + self, key, nbytes=None, type=None, typename: str = None, worker=None + ) -> tuple: try: ws: WorkerState = self._workers_dv[worker] ts: TaskState = self._tasks[key] @@ -2423,16 +2472,17 @@ def transition_waiting_memory( pdb.set_trace() raise + @ccall + @exceptval(check=False) def transition_processing_memory( self, key, nbytes=None, type=None, typename: str = None, - worker=None, + worker: str = None, startstops=None, - **kwargs, - ): + ) -> tuple: ws: WorkerState wws: WorkerState recommendations: dict = {} @@ -2440,10 +2490,10 @@ def transition_processing_memory( worker_msgs: dict = {} try: ts: TaskState = self._tasks[key] - assert worker - assert isinstance(worker, str) if self._validate: + assert worker + assert isinstance(worker, str) assert ts._processing_on ws = ts._processing_on assert ts in ws._processing @@ -2544,7 +2594,9 @@ def transition_processing_memory( pdb.set_trace() raise - def transition_memory_released(self, key, safe: bint = False): + @ccall + @exceptval(check=False) + def transition_memory_released(self, key, safe: bint = False) -> tuple: ws: WorkerState try: ts: TaskState = self._tasks[key] @@ -2619,7 +2671,9 @@ def transition_memory_released(self, key, safe: bint = False): pdb.set_trace() raise - def transition_released_erred(self, key): + @ccall + @exceptval(check=False) + def transition_released_erred(self, key) -> tuple: try: ts: TaskState = self._tasks[key] dts: TaskState @@ -2664,7 +2718,9 @@ def transition_released_erred(self, key): pdb.set_trace() raise - def transition_erred_released(self, key): + @ccall + @exceptval(check=False) + def transition_erred_released(self, key) -> tuple: try: ts: TaskState = self._tasks[key] dts: TaskState @@ -2704,7 +2760,9 @@ def transition_erred_released(self, key): pdb.set_trace() raise - def transition_waiting_released(self, key): + @ccall + @exceptval(check=False) + def transition_waiting_released(self, key) -> tuple: try: ts: TaskState = self._tasks[key] recommendations: dict = {} @@ -2741,7 +2799,9 @@ def transition_waiting_released(self, key): pdb.set_trace() raise - def transition_processing_released(self, key): + @ccall + @exceptval(check=False) + def transition_processing_released(self, key) -> tuple: try: ts: TaskState = self._tasks[key] dts: TaskState @@ -2786,9 +2846,18 @@ def transition_processing_released(self, key): pdb.set_trace() raise + @ccall + @exceptval(check=False) def transition_processing_erred( - self, key, cause=None, exception=None, traceback=None, **kwargs - ): + self, + key, + cause=None, + exception=None, + traceback=None, + worker=None, + text=None, + startstops=None, + ) -> tuple: ws: WorkerState try: ts: TaskState = self._tasks[key] @@ -2864,7 +2933,9 @@ def transition_processing_erred( pdb.set_trace() raise - def transition_no_worker_released(self, key): + @ccall + @exceptval(check=False) + def transition_no_worker_released(self, key) -> tuple: try: ts: TaskState = self._tasks[key] dts: TaskState @@ -2907,7 +2978,9 @@ def remove_key(self, key): ts._exception_blame = ts._exception = ts._traceback = None self._task_metadata.pop(key, None) - def transition_memory_forgotten(self, key): + @ccall + @exceptval(check=False) + def transition_memory_forgotten(self, key) -> tuple: ws: WorkerState try: ts: TaskState = self._tasks[key] @@ -2949,7 +3022,9 @@ def transition_memory_forgotten(self, key): pdb.set_trace() raise - def transition_released_forgotten(self, key): + @ccall + @exceptval(check=False) + def transition_released_forgotten(self, key) -> tuple: try: ts: TaskState = self._tasks[key] recommendations: dict = {}