-
-
Notifications
You must be signed in to change notification settings - Fork 720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add C APIs for transition_*
functions
#4650
Changes from all commits
9ddf362
52450c1
2f077b9
aedd565
9f554ee
dc40037
b6a67fe
333ec79
934996f
015af69
85b729f
f2c3e34
5eeffde
fea65f7
4766207
6712923
45b5c94
ab951c6
fbdf587
d15d82e
33fc4af
653f53d
feb5285
1d3025f
7ae22ad
1c55db6
08ed07f
eec1849
6c92b6c
57a949b
328d7fc
bb3e5cf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -144,6 +144,15 @@ def nogil(func): | |
return func | ||
|
||
|
||
@cfunc | ||
@inline | ||
def sys_intern(s): | ||
if compiled: | ||
return intern(s) # noqa: F821 | ||
else: | ||
return sys.intern(s) | ||
|
||
|
||
if sys.version_info < (3, 8): | ||
try: | ||
import pickle5 as pickle | ||
|
@@ -1751,7 +1760,6 @@ class SchedulerState: | |
_task_metadata: dict | ||
_total_nthreads: Py_ssize_t | ||
_total_occupancy: double | ||
_transitions_table: dict | ||
_unknown_durations: dict | ||
_unrunnable: set | ||
_validate: bint | ||
|
@@ -1804,23 +1812,6 @@ def __init__( | |
self._task_metadata = dict() | ||
self._total_nthreads = 0 | ||
self._total_occupancy = 0 | ||
self._transitions_table = { | ||
("released", "waiting"): self.transition_released_waiting, | ||
("waiting", "released"): self.transition_waiting_released, | ||
("waiting", "processing"): self.transition_waiting_processing, | ||
("waiting", "memory"): self.transition_waiting_memory, | ||
("processing", "released"): self.transition_processing_released, | ||
("processing", "memory"): self.transition_processing_memory, | ||
("processing", "erred"): self.transition_processing_erred, | ||
("no-worker", "released"): self.transition_no_worker_released, | ||
("no-worker", "waiting"): self.transition_no_worker_waiting, | ||
("released", "forgotten"): self.transition_released_forgotten, | ||
("memory", "forgotten"): self.transition_memory_forgotten, | ||
("erred", "forgotten"): self.transition_released_forgotten, | ||
("erred", "released"): self.transition_erred_released, | ||
("memory", "released"): self.transition_memory_released, | ||
("released", "erred"): self.transition_released_erred, | ||
} | ||
self._unknown_durations = dict() | ||
if unrunnable is not None: | ||
self._unrunnable = unrunnable | ||
|
@@ -1975,7 +1966,50 @@ def new_task(self, key: str, spec: object, state: str) -> TaskState: | |
# State Transitions # | ||
##################### | ||
|
||
def _transition(self, key, finish: str, *args, **kwargs): | ||
def _transition_dispatch(self, start: str, finish: str, *args, **kwargs): | ||
start_finish: str = sys_intern(f"{start}_{finish}".replace("-", "_")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd be interested in a speed test of this approach versus |
||
|
||
if start_finish is "released_waiting": # noqa: F632 | ||
return self.transition_released_waiting(*args, **kwargs) | ||
elif start_finish is "released_forgotten": # noqa: F632 | ||
return self.transition_released_forgotten(*args, **kwargs) | ||
elif start_finish is "released_erred": # noqa: F632 | ||
return self.transition_released_erred(*args, **kwargs) | ||
elif start_finish is "waiting_released": # noqa: F632 | ||
return self.transition_waiting_released(*args, **kwargs) | ||
elif start_finish is "waiting_processing": # noqa: F632 | ||
return self.transition_waiting_processing(*args, **kwargs) | ||
elif start_finish is "waiting_memory": # noqa: F632 | ||
return self.transition_waiting_memory(*args, **kwargs) | ||
elif start_finish is "processing_released": # noqa: F632 | ||
return self.transition_processing_released(*args, **kwargs) | ||
elif start_finish is "processing_memory": # noqa: F632 | ||
return self.transition_processing_memory(*args, **kwargs) | ||
elif start_finish is "processing_erred": # noqa: F632 | ||
return self.transition_processing_erred(*args, **kwargs) | ||
elif start_finish is "no_worker_released": # noqa: F632 | ||
return self.transition_no_worker_released(*args, **kwargs) | ||
elif start_finish is "no_worker_waiting": # noqa: F632 | ||
return self.transition_no_worker_waiting(*args, **kwargs) | ||
elif start_finish is "memory_released": # noqa: F632 | ||
return self.transition_memory_released(*args, **kwargs) | ||
elif start_finish is "memory_forgotten": # noqa: F632 | ||
return self.transition_memory_forgotten(*args, **kwargs) | ||
elif start_finish is "erred_released": # noqa: F632 | ||
return self.transition_erred_released(*args, **kwargs) | ||
elif start_finish is "erred_forgotten": # noqa: F632 | ||
return self.transition_released_forgotten(*args, **kwargs) | ||
|
||
def _transition( | ||
self, | ||
key, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doesn't |
||
finish: str, | ||
*args, | ||
status: str = None, | ||
thread: Py_ssize_t = -1, | ||
metadata: dict = None, | ||
**kwargs, | ||
): | ||
"""Transition a key from its current state to the finish state | ||
|
||
Examples | ||
|
@@ -1994,7 +2028,6 @@ def _transition(self, key, finish: str, *args, **kwargs): | |
parent: SchedulerState = cast(SchedulerState, self) | ||
ts: TaskState | ||
start: str | ||
start_finish: tuple | ||
finish2: str | ||
recommendations: dict | ||
worker_msgs: dict | ||
|
@@ -2019,12 +2052,10 @@ def _transition(self, key, finish: str, *args, **kwargs): | |
dependents = set(ts._dependents) | ||
dependencies = set(ts._dependencies) | ||
|
||
start_finish = (start, finish) | ||
func = self._transitions_table.get(start_finish) | ||
if func is not None: | ||
a: tuple = func(key, *args, **kwargs) | ||
a: tuple = self._transition_dispatch(start, finish, key, *args, **kwargs) | ||
if a is not None: | ||
recommendations, client_msgs, worker_msgs = a | ||
elif "released" not in start_finish: | ||
elif start != "released" and finish != "released": | ||
assert not args and not kwargs | ||
a_recs: dict | ||
a_cmsgs: dict | ||
|
@@ -2033,11 +2064,12 @@ def _transition(self, key, finish: str, *args, **kwargs): | |
a_recs, a_cmsgs, a_wmsgs = a | ||
|
||
v = a_recs.get(key, finish) | ||
func = self._transitions_table["released", v] | ||
b_recs: dict | ||
b_cmsgs: dict | ||
b_wmsgs: dict | ||
b: tuple = func(key) | ||
b: tuple = self._transition_dispatch( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't need |
||
"released", v, key, *args, **kwargs | ||
) | ||
b_recs, b_cmsgs, b_wmsgs = b | ||
|
||
recommendations.update(a_recs) | ||
|
@@ -2070,7 +2102,7 @@ def _transition(self, key, finish: str, *args, **kwargs): | |
|
||
start = "released" | ||
else: | ||
raise RuntimeError("Impossible transition from %r to %r" % start_finish) | ||
raise RuntimeError(f"Impossible transition from {start} to {finish}") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would change the logic a little, but would make sense to me to have |
||
|
||
finish2 = ts._state | ||
self.transition_log.append((key, start, finish2, recommendations, time())) | ||
|
@@ -2091,7 +2123,16 @@ def _transition(self, key, finish: str, *args, **kwargs): | |
parent._tasks[ts._key] = ts | ||
for plugin in list(self.plugins): | ||
try: | ||
plugin.transition(key, start, finish2, *args, **kwargs) | ||
plugin.transition( | ||
key, | ||
start, | ||
finish2, | ||
*args, | ||
status=status, | ||
thread=thread, | ||
metadata=metadata, | ||
**kwargs, | ||
) | ||
except Exception: | ||
logger.info("Plugin failed with exception", exc_info=True) | ||
if ts._state == "forgotten": | ||
|
@@ -2158,7 +2199,9 @@ def _transitions(self, recommendations: dict, client_msgs: dict, worker_msgs: di | |
for key in keys: | ||
self.validate_key(key) | ||
|
||
def transition_released_waiting(self, key): | ||
@ccall | ||
@exceptval(check=False) | ||
def transition_released_waiting(self, key) -> tuple: | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
dts: TaskState | ||
|
@@ -2213,7 +2256,9 @@ def transition_released_waiting(self, key): | |
pdb.set_trace() | ||
raise | ||
|
||
def transition_no_worker_waiting(self, key): | ||
@ccall | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why all |
||
@exceptval(check=False) | ||
def transition_no_worker_waiting(self, key) -> tuple: | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
dts: TaskState | ||
|
@@ -2336,7 +2381,9 @@ def set_duration_estimate(self, ts: TaskState, ws: WorkerState) -> double: | |
ws._processing[ts] = total_duration | ||
return total_duration | ||
|
||
def transition_waiting_processing(self, key): | ||
@ccall | ||
@exceptval(check=False) | ||
def transition_waiting_processing(self, key) -> tuple: | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
dts: TaskState | ||
|
@@ -2383,9 +2430,11 @@ def transition_waiting_processing(self, key): | |
pdb.set_trace() | ||
raise | ||
|
||
@ccall | ||
@exceptval(check=False) | ||
def transition_waiting_memory( | ||
self, key, nbytes=None, type=None, typename: str = None, worker=None, **kwargs | ||
): | ||
self, key, nbytes=None, type=None, typename: str = None, worker=None | ||
) -> tuple: | ||
try: | ||
ws: WorkerState = self._workers_dv[worker] | ||
ts: TaskState = self._tasks[key] | ||
|
@@ -2423,27 +2472,28 @@ def transition_waiting_memory( | |
pdb.set_trace() | ||
raise | ||
|
||
@ccall | ||
@exceptval(check=False) | ||
def transition_processing_memory( | ||
self, | ||
key, | ||
nbytes=None, | ||
type=None, | ||
typename: str = None, | ||
worker=None, | ||
worker: str = None, | ||
startstops=None, | ||
**kwargs, | ||
): | ||
) -> tuple: | ||
ws: WorkerState | ||
wws: WorkerState | ||
recommendations: dict = {} | ||
client_msgs: dict = {} | ||
worker_msgs: dict = {} | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
assert worker | ||
assert isinstance(worker, str) | ||
|
||
if self._validate: | ||
assert worker | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good move |
||
assert isinstance(worker, str) | ||
assert ts._processing_on | ||
ws = ts._processing_on | ||
assert ts in ws._processing | ||
|
@@ -2544,7 +2594,9 @@ def transition_processing_memory( | |
pdb.set_trace() | ||
raise | ||
|
||
def transition_memory_released(self, key, safe: bint = False): | ||
@ccall | ||
@exceptval(check=False) | ||
def transition_memory_released(self, key, safe: bint = False) -> tuple: | ||
ws: WorkerState | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
|
@@ -2619,7 +2671,9 @@ def transition_memory_released(self, key, safe: bint = False): | |
pdb.set_trace() | ||
raise | ||
|
||
def transition_released_erred(self, key): | ||
@ccall | ||
@exceptval(check=False) | ||
def transition_released_erred(self, key) -> tuple: | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
dts: TaskState | ||
|
@@ -2664,7 +2718,9 @@ def transition_released_erred(self, key): | |
pdb.set_trace() | ||
raise | ||
|
||
def transition_erred_released(self, key): | ||
@ccall | ||
@exceptval(check=False) | ||
def transition_erred_released(self, key) -> tuple: | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
dts: TaskState | ||
|
@@ -2704,7 +2760,9 @@ def transition_erred_released(self, key): | |
pdb.set_trace() | ||
raise | ||
|
||
def transition_waiting_released(self, key): | ||
@ccall | ||
@exceptval(check=False) | ||
def transition_waiting_released(self, key) -> tuple: | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
recommendations: dict = {} | ||
|
@@ -2741,7 +2799,9 @@ def transition_waiting_released(self, key): | |
pdb.set_trace() | ||
raise | ||
|
||
def transition_processing_released(self, key): | ||
@ccall | ||
@exceptval(check=False) | ||
def transition_processing_released(self, key) -> tuple: | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
dts: TaskState | ||
|
@@ -2786,9 +2846,18 @@ def transition_processing_released(self, key): | |
pdb.set_trace() | ||
raise | ||
|
||
@ccall | ||
@exceptval(check=False) | ||
def transition_processing_erred( | ||
self, key, cause=None, exception=None, traceback=None, **kwargs | ||
): | ||
self, | ||
key, | ||
cause=None, | ||
exception=None, | ||
traceback=None, | ||
worker=None, | ||
text=None, | ||
startstops=None, | ||
) -> tuple: | ||
ws: WorkerState | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
|
@@ -2864,7 +2933,9 @@ def transition_processing_erred( | |
pdb.set_trace() | ||
raise | ||
|
||
def transition_no_worker_released(self, key): | ||
@ccall | ||
@exceptval(check=False) | ||
def transition_no_worker_released(self, key) -> tuple: | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
dts: TaskState | ||
|
@@ -2907,7 +2978,9 @@ def remove_key(self, key): | |
ts._exception_blame = ts._exception = ts._traceback = None | ||
self._task_metadata.pop(key, None) | ||
|
||
def transition_memory_forgotten(self, key): | ||
@ccall | ||
@exceptval(check=False) | ||
def transition_memory_forgotten(self, key) -> tuple: | ||
ws: WorkerState | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
|
@@ -2949,7 +3022,9 @@ def transition_memory_forgotten(self, key): | |
pdb.set_trace() | ||
raise | ||
|
||
def transition_released_forgotten(self, key): | ||
@ccall | ||
@exceptval(check=False) | ||
def transition_released_forgotten(self, key) -> tuple: | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
recommendations: dict = {} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@optimize.use_switch(True)
is default, I believe, but maybe worth declaring explicitly.