Skip to content

Commit

Permalink
Send all messages after processing all transitions
Browse files Browse the repository at this point in the history
  • Loading branch information
jakirkham committed Jan 26, 2021
1 parent 2a32851 commit 496c361
Showing 1 changed file with 82 additions and 19 deletions.
101 changes: 82 additions & 19 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5865,46 +5865,79 @@ def _transition(self, key, finish: str, *args, **kwargs):
worker_msgs: dict
client_msgs: dict
msgs: list
new_msgs: list
dependents: set
dependencies: set
try:
recommendations = {}
worker_msgs = {}
client_msgs = {}

ts = parent._tasks.get(key)
if ts is None:
return recommendations
return recommendations, worker_msgs, client_msgs
start = ts._state
if start == finish:
return recommendations
return recommendations, worker_msgs, client_msgs

if self.plugins:
dependents = set(ts._dependents)
dependencies = set(ts._dependencies)

start_finish = (start, finish)
worker_msgs = {}
client_msgs = {}
func = self._transitions.get(start_finish)
if func is not None:
t: tuple = func(key, *args, **kwargs)
recommendations, worker_msgs, client_msgs = t
a: tuple = func(key, *args, **kwargs)
recommendations, worker_msgs, client_msgs = a
elif "released" not in start_finish:
func = self._transitions["released", finish]
assert not args and not kwargs
a: dict = self._transition(key, "released")
v = a.get(key)
a_recs: dict
a_wmsgs: dict
a_cmsgs: dict
a: tuple = self._transition(key, "released")
a_recs, a_wmsgs, a_cmsgs = a
v = a_recs.get(key)
if v is not None:
func = self._transitions["released", v]
b: dict
t: tuple = func(key)
b, worker_msgs, client_msgs = t
recommendations.update(a)
recommendations.update(b)
b_recs: dict
b_wmsgs: dict
b_cmsgs: dict
b: tuple = func(key)
b_recs, b_wmsgs, b_cmsgs = b

recommendations.update(a_recs)
for w, new_msgs in a_wmsgs.items():
msgs = worker_msgs.get(w)
if msgs is not None:
msgs.extend(new_msgs)
else:
worker_msgs[w] = new_msgs
for c, new_msgs in a_cmsgs.items():
msgs = client_msgs.get(c)
if msgs is not None:
msgs.extend(new_msgs)
else:
client_msgs[c] = new_msgs

recommendations.update(b_recs)
for w, new_msgs in b_wmsgs.items():
msgs = worker_msgs.get(w)
if msgs is not None:
msgs.extend(new_msgs)
else:
worker_msgs[w] = new_msgs
for c, new_msgs in b_cmsgs.items():
msgs = client_msgs.get(c)
if msgs is not None:
msgs.extend(new_msgs)
else:
client_msgs[c] = new_msgs

start = "released"
else:
raise RuntimeError("Impossible transition from %r to %r" % start_finish)

self.send_all(client_msgs, worker_msgs)

finish2 = ts._state
self.transition_log.append((key, start, finish2, recommendations, time()))
if parent._validate:
Expand Down Expand Up @@ -5942,7 +5975,7 @@ def _transition(self, key, finish: str, *args, **kwargs):
ts._prefix._groups.remove(tg)
del parent._task_groups[tg._name]

return recommendations
return recommendations, worker_msgs, client_msgs
except Exception as e:
logger.exception("Error transitioning %r from %r to %r", key, start, finish)
if LOG_PDB:
Expand All @@ -5967,7 +6000,13 @@ def transition(self, key, finish: str, *args, **kwargs):
--------
Scheduler.transitions: transitive version of this function
"""
return self._transition(key, finish, *args, **kwargs)
recommendations: dict
worker_msgs: dict
client_msgs: dict
a: tuple = self._transition(key, finish, *args, **kwargs)
recommendations, worker_msgs, client_msgs = a
self.send_all(client_msgs, worker_msgs)
return recommendations

def transitions(self, recommendations: dict):
"""Process transitions until none are left
Expand All @@ -5978,12 +6017,36 @@ def transitions(self, recommendations: dict):
parent: SchedulerState = cast(SchedulerState, self)
keys: set = set()
recommendations = recommendations.copy()
new: dict
worker_msgs: dict = {}
client_msgs: dict = {}
msgs: list
new_msgs: list
new: tuple
new_recs: dict
new_wmsgs: dict
new_cmsgs: dict
while recommendations:
key, finish = recommendations.popitem()
keys.add(key)

new = self._transition(key, finish)
recommendations.update(new)
new_recs, new_wmsgs, new_cmsgs = new

recommendations.update(new_recs)
for w, new_msgs in new_wmsgs.items():
msgs = worker_msgs.get(w)
if msgs is not None:
msgs.extend(new_msgs)
else:
worker_msgs[w] = new_msgs
for c, new_msgs in new_cmsgs.items():
msgs = client_msgs.get(c)
if msgs is not None:
msgs.extend(new_msgs)
else:
client_msgs[c] = new_msgs

self.send_all(client_msgs, worker_msgs)

if parent._validate:
for key in keys:
Expand Down

0 comments on commit 496c361

Please sign in to comment.