Skip to content

Commit

Permalink
Use .append instead of .extend in transition
Browse files Browse the repository at this point in the history
  • Loading branch information
jakirkham committed Jan 26, 2021
1 parent 5a86761 commit be4e152
Showing 1 changed file with 41 additions and 30 deletions.
71 changes: 41 additions & 30 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1965,7 +1965,7 @@ def transition_waiting_processing(self, key):

# logger.debug("Send job to worker: %s, %s", worker, key)

worker_msgs[worker] = [_task_to_msg(self, ts)]
worker_msgs[worker] = _task_to_msg(self, ts)

return {}, worker_msgs, client_msgs
except Exception as e:
Expand Down Expand Up @@ -2172,13 +2172,11 @@ def transition_memory_released(self, key, safe: bint = False):
ws._has_what.remove(ts)
ws._nbytes -= ts.get_nbytes()
ts._group._nbytes_in_memory -= ts.get_nbytes()
worker_msgs[ws._address] = [
{
"op": "delete-data",
"keys": [key],
"report": False,
}
]
worker_msgs[ws._address] = {
"op": "delete-data",
"keys": [key],
"report": False,
}

ts._who_has.clear()

Expand All @@ -2187,7 +2185,7 @@ def transition_memory_released(self, key, safe: bint = False):
report_msg = {"op": "lost-data", "key": key}
cs: ClientState
for cs in ts._who_wants:
client_msgs[cs._client_key] = [report_msg]
client_msgs[cs._client_key] = report_msg

if not ts._run_spec: # pure data
recommendations[key] = "forgotten"
Expand Down Expand Up @@ -2240,7 +2238,7 @@ def transition_released_erred(self, key):
}
cs: ClientState
for cs in ts._who_wants:
client_msgs[cs._client_key] = [report_msg]
client_msgs[cs._client_key] = report_msg

ts.state = "erred"

Expand Down Expand Up @@ -2282,7 +2280,7 @@ def transition_erred_released(self, key):
report_msg = {"op": "task-retried", "key": key}
cs: ClientState
for cs in ts._who_wants:
client_msgs[cs._client_key] = [report_msg]
client_msgs[cs._client_key] = report_msg

ts.state = "released"

Expand Down Expand Up @@ -2349,7 +2347,7 @@ def transition_processing_released(self, key):

w: str = _remove_from_processing(self, ts)
if w:
worker_msgs[w] = [{"op": "release-task", "key": key}]
worker_msgs[w] = {"op": "release-task", "key": key}

ts.state = "released"

Expand Down Expand Up @@ -2438,7 +2436,7 @@ def transition_processing_erred(
}
cs: ClientState
for cs in ts._who_wants:
client_msgs[cs._client_key] = [report_msg]
client_msgs[cs._client_key] = report_msg

cs = self._clients["fire-and-forget"]
if ts in cs._wants_what:
Expand Down Expand Up @@ -5867,7 +5865,6 @@ def _transition(self, key, finish: str, *args, **kwargs):
worker_msgs: dict
client_msgs: dict
msgs: list
new_msgs: list
dependents: set
dependencies: set
try:
Expand All @@ -5890,7 +5887,21 @@ def _transition(self, key, finish: str, *args, **kwargs):
func = self._transitions.get(start_finish)
if func is not None:
a: tuple = func(key, *args, **kwargs)
recommendations, worker_msgs, client_msgs = a
wmsgs: dict
cmsgs: dict
recommendations, wmsgs, cmsgs = a
for w, new_msg in wmsgs.items():
msgs = worker_msgs.get(w)
if msgs is not None:
msgs.append(new_msg)
else:
worker_msgs[w] = [new_msg]
for c, new_msg in cmsgs.items():
msgs = client_msgs.get(c)
if msgs is not None:
msgs.append(new_msg)
else:
client_msgs[c] = [new_msg]
elif "released" not in start_finish:
func = self._transitions["released", finish]
assert not args and not kwargs
Expand All @@ -5909,32 +5920,32 @@ def _transition(self, key, finish: str, *args, **kwargs):
b_recs, b_wmsgs, b_cmsgs = b

recommendations.update(a_recs)
for w, new_msgs in a_wmsgs.items():
for w, new_msg in a_wmsgs.items():
msgs = worker_msgs.get(w)
if msgs is not None:
msgs.extend(new_msgs)
msgs.append(new_msg)
else:
worker_msgs[w] = new_msgs
for c, new_msgs in a_cmsgs.items():
worker_msgs[w] = [new_msg]
for c, new_msg in a_cmsgs.items():
msgs = client_msgs.get(c)
if msgs is not None:
msgs.extend(new_msgs)
msgs.append(new_msg)
else:
client_msgs[c] = new_msgs
client_msgs[c] = [new_msg]

recommendations.update(b_recs)
for w, new_msgs in b_wmsgs.items():
for w, new_msg in b_wmsgs.items():
msgs = worker_msgs.get(w)
if msgs is not None:
msgs.extend(new_msgs)
msgs.append(new_msg)
else:
worker_msgs[w] = new_msgs
for c, new_msgs in b_cmsgs.items():
worker_msgs[w] = [new_msg]
for c, new_msg in b_cmsgs.items():
msgs = client_msgs.get(c)
if msgs is not None:
msgs.extend(new_msgs)
msgs.append(new_msg)
else:
client_msgs[c] = new_msgs
client_msgs[c] = [new_msg]

start = "released"
else:
Expand Down Expand Up @@ -6630,7 +6641,7 @@ def _add_to_memory(
report_msg["type"] = type

for cs in ts._who_wants:
client_msgs[cs._client_key] = [report_msg]
client_msgs[cs._client_key] = report_msg

ts.state = "memory"
ts._type = typename
Expand Down Expand Up @@ -6684,7 +6695,7 @@ def _propagate_forgotten(
ws._nbytes -= ts.get_nbytes()
w: str = ws._address
if w in state._workers_dv: # in case worker has died
worker_msgs[w] = [{"op": "delete-data", "keys": [key], "report": False}]
worker_msgs[w] = {"op": "delete-data", "keys": [key], "report": False}
ts._who_has.clear()


Expand Down Expand Up @@ -6791,7 +6802,7 @@ def _task_to_client_msgs(state: SchedulerState, ts: TaskState) -> dict:

client_msgs: dict = {}
for k in client_keys:
client_msgs[k] = [report_msg]
client_msgs[k] = report_msg

return client_msgs

Expand Down

0 comments on commit be4e152

Please sign in to comment.