Skip to content

Commit

Permalink
remove suffix __recs
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Jul 14, 2021
1 parent 2ae1fbe commit d416520
Showing 1 changed file with 45 additions and 45 deletions.
90 changes: 45 additions & 45 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,25 +466,25 @@ def __init__(
validate = dask.config.get("distributed.scheduler.validate")
self.validate = validate
self._transitions_table = {
("constrained", "executing"): self.transition_constrained_executing__recs,
("executing", "error"): self.transition_executing_error__recs,
("long-running", "error"): self.transition_executing_error__recs,
("executing", "long-running"): self.transition_executing_long_running__recs,
("executing", "memory"): self.transition_executing_memory__recs,
("long-running", "memory"): self.transition_executing_memory__recs,
("fetch", "flight"): self.transition_fetch_flight__recs,
("fetch", "released"): self.transition_fetch_released__recs,
("flight", "error"): self.transition_flight_error__recs,
("flight", "fetch"): self.transition_flight_fetch__recs,
("flight", "memory"): self.transition_flight_memory__recs,
("flight", "released"): self.transition_flight_released__recs,
("memory", "released"): self.transition_memoery_released__recs,
("ready", "error"): self.transition_generic_error__recs,
("ready", "executing"): self.transition_ready_executing__recs,
("released", "fetch"): self.transition_released_fetch__recs,
("released", "waiting"): self.transition_released_waiting__recs,
("waiting", "constrained"): self.transition_waiting_constrained__recs,
("waiting", "ready"): self.transition_waiting_ready__recs,
("constrained", "executing"): self.transition_constrained_executing,
("executing", "error"): self.transition_executing_error,
("long-running", "error"): self.transition_executing_error,
("executing", "long-running"): self.transition_executing_long_running,
("executing", "memory"): self.transition_executing_memory,
("long-running", "memory"): self.transition_executing_memory,
("fetch", "flight"): self.transition_fetch_flight,
("fetch", "released"): self.transition_fetch_released,
("flight", "error"): self.transition_flight_error,
("flight", "fetch"): self.transition_flight_fetch,
("flight", "memory"): self.transition_flight_memory,
("flight", "released"): self.transition_flight_released,
("memory", "released"): self.transition_memoery_released,
("ready", "error"): self.transition_generic_error,
("ready", "executing"): self.transition_ready_executing,
("released", "fetch"): self.transition_released_fetch,
("released", "waiting"): self.transition_released_waiting,
("waiting", "constrained"): self.transition_waiting_constrained,
("waiting", "ready"): self.transition_waiting_ready,
}

self._transition_counter = 0
Expand Down Expand Up @@ -1691,7 +1691,7 @@ def compute_task(
for key, value in nbytes.items():
self.tasks[key].nbytes = value

def transition_released_fetch__recs(self, ts, *, stimulus_id):
def transition_released_fetch(self, ts, *, stimulus_id):
if self.validate:
assert ts.state == "released"
assert ts.runspec is None
Expand All @@ -1702,7 +1702,7 @@ def transition_released_fetch__recs(self, ts, *, stimulus_id):
heapq.heappush(self.data_needed, (ts.priority, ts.key))
return {}, []

def transition_released_waiting__recs(self, ts, *, stimulus_id):
def transition_released_waiting(self, ts, *, stimulus_id):
if self.validate:
assert ts.state == "released"
assert all(d.key in self.tasks for d in ts.dependencies)
Expand All @@ -1721,7 +1721,7 @@ def transition_released_waiting__recs(self, ts, *, stimulus_id):
ts.state = "waiting"
return recommendations, []

def transition_fetch_flight__recs(self, ts, worker, *, stimulus_id):
def transition_fetch_flight(self, ts, worker, *, stimulus_id):
if self.validate:
assert ts.state == "fetch"
assert ts.who_has
Expand All @@ -1732,13 +1732,13 @@ def transition_fetch_flight__recs(self, ts, worker, *, stimulus_id):
self.in_flight_tasks += 1
return {}, []

def transition_memoery_released__recs(self, ts, *, stimulus_id):
def transition_memoery_released(self, ts, *, stimulus_id):
ts.state = "released"
ts.protected = False
self.release_key(ts.key)
return {}, []

def transition_waiting_constrained__recs(self, ts, *, stimulus_id):
def transition_waiting_constrained(self, ts, *, stimulus_id):
if self.validate:
assert ts.state == "waiting"
assert not ts.waiting_for_data
Expand All @@ -1752,7 +1752,7 @@ def transition_waiting_constrained__recs(self, ts, *, stimulus_id):
self.constrained.append(ts.key)
return {}, []

def transition_waiting_ready__recs(self, ts, *, stimulus_id):
def transition_waiting_ready(self, ts, *, stimulus_id):
if self.validate:
assert ts.state == "waiting"
assert not ts.waiting_for_data
Expand All @@ -1767,22 +1767,20 @@ def transition_waiting_ready__recs(self, ts, *, stimulus_id):

return {}, []

def transition_generic_error__recs(self, ts, exception, traceback, *, stimulus_id):
def transition_generic_error(self, ts, exception, traceback, *, stimulus_id):
ts.exception = exception
ts.traceback = traceback
smsgs = [self.get_task_state_for_scheduler(ts)]
ts.state = "error"
return {}, smsgs

def transition_executing_error__recs(
self, ts, exception, traceback, *, stimulus_id
):
def transition_executing_error(self, ts, exception, traceback, *, stimulus_id):
self.executing_count -= 1
return self.transition_generic_error__recs(
return self.transition_generic_error(
ts, exception, traceback, stimulus_id=stimulus_id
)

def transition_executing_memory__recs(self, ts, value=no_value, *, stimulus_id):
def transition_executing_memory(self, ts, value=no_value, *, stimulus_id):
if self.validate:
assert ts.state == "executing" or ts.key in self.long_running
assert not ts.waiting_for_data
Expand All @@ -1805,7 +1803,7 @@ def transition_executing_memory__recs(self, ts, value=no_value, *, stimulus_id):
s_msgs.append(self.get_task_state_for_scheduler(ts))
return recommendations, s_msgs

def transition_constrained_executing__recs(self, ts, *, stimulus_id):
def transition_constrained_executing(self, ts, *, stimulus_id):
if self.validate:
assert not ts.waiting_for_data
assert ts.key not in self.data
Expand All @@ -1822,7 +1820,7 @@ def transition_constrained_executing__recs(self, ts, *, stimulus_id):
self.loop.add_callback(self.execute, ts.key, stimulus_id=stimulus_id)
return {}, []

def transition_ready_executing__recs(self, ts, *, stimulus_id):
def transition_ready_executing(self, ts, *, stimulus_id):
if self.validate:
assert not ts.waiting_for_data
assert ts.key not in self.data
Expand All @@ -1837,7 +1835,7 @@ def transition_ready_executing__recs(self, ts, *, stimulus_id):
self.loop.add_callback(self.execute, ts.key, stimulus_id=stimulus_id)
return {}, []

def transition_flight_fetch__recs(self, ts, *, stimulus_id):
def transition_flight_fetch(self, ts, *, stimulus_id):
if self.validate:
assert ts.state == "flight"

Expand All @@ -1851,14 +1849,14 @@ def transition_flight_fetch__recs(self, ts, *, stimulus_id):

return {}, []

def transition_flight_error__recs(self, ts, exception, traceback, *, stimulus_id):
def transition_flight_error(self, ts, exception, traceback, *, stimulus_id):
self.in_flight_tasks -= 1
ts.coming_from = None
return self.transition_generic_error__recs(
return self.transition_generic_error(
ts, exception, traceback, stimulus_id=stimulus_id
)

def transition_flight_released__recs(self, ts, *, stimulus_id):
def transition_flight_released(self, ts, *, stimulus_id):
if self.validate:
assert ts.state == "flight"

Expand All @@ -1875,7 +1873,7 @@ def transition_flight_released__recs(self, ts, *, stimulus_id):

return recommendations, scheduler_msgs

def transition_fetch_released__recs(self, ts, *, stimulus_id):
def transition_fetch_released(self, ts, *, stimulus_id):
if self.validate:
assert ts.state == "fetch"

Expand All @@ -1890,9 +1888,7 @@ def transition_fetch_released__recs(self, ts, *, stimulus_id):

return recommendations, scheduler_msgs

def transition_executing_long_running__recs(
self, ts, compute_duration, *, stimulus_id
):
def transition_executing_long_running(self, ts, compute_duration, *, stimulus_id):

if self.validate:
assert ts.state == "executing"
Expand All @@ -1910,7 +1906,7 @@ def transition_executing_long_running__recs(
self.io_loop.add_callback(self.ensure_computing)
return {}, scheduler_msgs

def transition_flight_memory__recs(self, ts, value, *, stimulus_id):
def transition_flight_memory(self, ts, value, *, stimulus_id):
if self.validate:
assert ts.state == "flight"

Expand Down Expand Up @@ -2157,7 +2153,7 @@ def get_task_state_for_scheduler(self, ts):
"status": "OK",
"key": ts.key,
"nbytes": ts.nbytes,
"thread": self.threads.get(ts.key, list(self.threads.values())[0]),
"thread": self.threads.get(ts.key),
"type": typ_serialized,
"typename": typename(typ),
"metadata": ts.metadata,
Expand All @@ -2167,7 +2163,7 @@ def get_task_state_for_scheduler(self, ts):
"op": "task-erred",
"status": "error",
"key": ts.key,
"thread": self.threads.get(ts.key, list(self.threads.values())[0]),
"thread": self.threads.get(ts.key),
"exception": ts.exception,
"traceback": ts.traceback,
}
Expand Down Expand Up @@ -2275,8 +2271,12 @@ async def gather_dep(
# Keep namespace clean since this func is long and has many
# dep*, *ts* variables

# This is awkward, see FIXME below
for dep_key in to_gather:
# For diagnostics we want to attach the transfer to a single
# task. this task is typically the next to be executed but
# since we're fetching tasks for potentially many
# dependents, an exact match is not possible.
# If there are no dependents, this is a pure replica fetch
dep_ts = cause = self.tasks[dep_key]
for dependent in dep_ts.dependents:
cause = dependent
Expand Down

0 comments on commit d416520

Please sign in to comment.