Skip to content

Commit

Permalink
Add & use _transition_dispatch function
Browse files Browse the repository at this point in the history
  • Loading branch information
jakirkham committed Apr 14, 2021
1 parent 1d3025f commit c978bc0
Showing 1 changed file with 60 additions and 27 deletions.
87 changes: 60 additions & 27 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@ def nogil(func):
return func


@cfunc
@inline
def sys_intern(s):
if compiled:
return intern(s)
else:
return sys.intern(s)


if sys.version_info < (3, 8):
try:
import pickle5 as pickle
Expand Down Expand Up @@ -1606,7 +1615,6 @@ class SchedulerState:
_task_metadata: dict
_total_nthreads: Py_ssize_t
_total_occupancy: double
_transitions_table: dict
_unknown_durations: dict
_unrunnable: set
_validate: bint
Expand Down Expand Up @@ -1659,23 +1667,6 @@ def __init__(
self._task_metadata = dict()
self._total_nthreads = 0
self._total_occupancy = 0
self._transitions_table = {
("released", "waiting"): self.transition_released_waiting,
("waiting", "released"): self.transition_waiting_released,
("waiting", "processing"): self.transition_waiting_processing,
("waiting", "memory"): self.transition_waiting_memory,
("processing", "released"): self.transition_processing_released,
("processing", "memory"): self.transition_processing_memory,
("processing", "erred"): self.transition_processing_erred,
("no-worker", "released"): self.transition_no_worker_released,
("no-worker", "waiting"): self.transition_no_worker_waiting,
("released", "forgotten"): self.transition_released_forgotten,
("memory", "forgotten"): self.transition_memory_forgotten,
("erred", "forgotten"): self.transition_released_forgotten,
("erred", "released"): self.transition_erred_released,
("memory", "released"): self.transition_memory_released,
("released", "erred"): self.transition_released_erred,
}
self._unknown_durations = dict()
if unrunnable is not None:
self._unrunnable = unrunnable
Expand Down Expand Up @@ -1826,6 +1817,50 @@ def new_task(self, key: str, spec: object, state: str) -> TaskState:
# State Transitions #
#####################

def _transition_dispatch(self, start: str, finish: str, *args, **kwargs):
start = sys_intern(start)
finish = sys_intern(finish)

result: tuple = None
if start is "released": # noqa: F632
if finish is "waiting": # noqa: F632
result = self.transition_released_waiting(*args, **kwargs)
elif finish is "forgotten": # noqa: F632
result = self.transition_released_forgotten(*args, **kwargs)
elif finish is "erred": # noqa: F632
result = self.transition_released_erred(*args, **kwargs)
elif start is "waiting": # noqa: F632
if finish is "released": # noqa: F632
result = self.transition_waiting_released(*args, **kwargs)
elif finish is "processing": # noqa: F632
result = self.transition_waiting_processing(*args, **kwargs)
elif finish is "memory": # noqa: F632
result = self.transition_waiting_memory(*args, **kwargs)
elif start is "processing": # noqa: F632
if finish is "released": # noqa: F632
result = self.transition_processing_released(*args, **kwargs)
elif finish is "memory": # noqa: F632
result = self.transition_processing_memory(*args, **kwargs)
elif finish is "erred": # noqa: F632
result = self.transition_processing_erred(*args, **kwargs)
elif start is "no-worker": # noqa: F632
if finish is "released": # noqa: F632
result = self.transition_no_worker_released(*args, **kwargs)
elif finish is "waiting": # noqa: F632
result = self.transition_no_worker_waiting(*args, **kwargs)
elif start is "memory": # noqa: F632
if finish is "released": # noqa: F632
result = self.transition_memory_released(*args, **kwargs)
elif finish is "forgotten": # noqa: F632
result = self.transition_memory_forgotten(*args, **kwargs)
elif start is "erred": # noqa: F632
if finish is "released": # noqa: F632
result = self.transition_erred_released(*args, **kwargs)
elif finish is "forgotten": # noqa: F632
result = self.transition_released_forgotten(*args, **kwargs)

return result

def _transition(
self,
key,
Expand Down Expand Up @@ -1854,7 +1889,6 @@ def _transition(
parent: SchedulerState = cast(SchedulerState, self)
ts: TaskState
start: str
start_finish: tuple
finish2: str
recommendations: dict
worker_msgs: dict
Expand All @@ -1879,12 +1913,10 @@ def _transition(
dependents = set(ts._dependents)
dependencies = set(ts._dependencies)

start_finish = (start, finish)
func = self._transitions_table.get(start_finish)
if func is not None:
a: tuple = func(key, *args, **kwargs)
a: tuple = self._transition_dispatch(start, finish, key, *args, **kwargs)
if a is not None:
recommendations, client_msgs, worker_msgs = a
elif "released" not in start_finish:
elif start != "released" and finish != "released":
assert not args and not kwargs
a_recs: dict
a_cmsgs: dict
Expand All @@ -1893,11 +1925,12 @@ def _transition(
a_recs, a_cmsgs, a_wmsgs = a

v = a_recs.get(key, finish)
func = self._transitions_table["released", v]
b_recs: dict
b_cmsgs: dict
b_wmsgs: dict
b: tuple = func(key)
b: tuple = self._transition_dispatch(
"released", v, key, *args, **kwargs
)
b_recs, b_cmsgs, b_wmsgs = b

recommendations.update(a_recs)
Expand Down Expand Up @@ -1930,7 +1963,7 @@ def _transition(

start = "released"
else:
raise RuntimeError("Impossible transition from %r to %r" % start_finish)
raise RuntimeError(f"Impossible transition from {start} to {finish}")

finish2 = ts._state
self.transition_log.append((key, start, finish2, recommendations, time()))
Expand Down

0 comments on commit c978bc0

Please sign in to comment.