scheduler.py / worker.py code cleanup (#4626)
crusaderky authored Mar 30, 2021
1 parent 93f6b58 commit 77a1fd1
Showing 2 changed files with 120 additions and 132 deletions.
107 changes: 48 additions & 59 deletions distributed/scheduler.py
@@ -3604,29 +3604,27 @@ async def close_worker(self, comm=None, worker=None, safe=None):
     def heartbeat_worker(
         self,
         comm=None,
-        address=None,
-        resolve_address=True,
-        now=None,
-        resources=None,
-        host_info=None,
-        metrics=None,
-        executing=None,
+        *,
+        address,
+        resolve_address: bool = True,
+        now: float = None,
+        resources: dict = None,
+        host_info: dict = None,
+        metrics: dict,
+        executing: dict = None,
     ):
         parent: SchedulerState = cast(SchedulerState, self)
         address = self.coerce_address(address, resolve_address)
         address = normalize_address(address)
-        if address not in parent._workers:
+        ws: WorkerState = parent._workers.get(address)
+        if ws is None:
             return {"status": "missing"}
 
         host = get_address_host(address)
         local_now = time()
-        now = now or time()
-        assert metrics
         host_info = host_info or {}
 
-        dh: dict = parent._host_info.get(host)
-        if dh is None:
-            parent._host_info[host] = dh = dict()
+        dh: dict = parent._host_info.setdefault(host, {})
        dh["last-seen"] = local_now
 
         frac = 1 / len(parent._workers)
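
A minimal standalone sketch (illustration only, not code from this commit; the heartbeat function and values are made up): the bare * above makes every following parameter keyword-only, and giving metrics no default makes it required at call time, which is what lets the old runtime assert metrics be dropped.

def heartbeat(comm=None, *, address, metrics: dict, now: float = None):
    # "address" and "metrics" can only be passed by keyword; "metrics" has no
    # default, so omitting it raises TypeError up front instead of hitting an assert.
    return {"address": address, "n_metrics": len(metrics), "now": now}

print(heartbeat(address="tcp://127.0.0.1:1234", metrics={"cpu": 0.5}))

try:
    heartbeat("tcp://127.0.0.1:1234")  # positional value binds to comm, not address
except TypeError as exc:
    print(exc)  # missing required keyword-only arguments: 'address' and 'metrics'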
@@ -3650,26 +3648,20 @@ def heartbeat_worker(
                     1 - alpha
                 )
 
-        ws: WorkerState = parent._workers[address]
-
-        ws._last_seen = time()
-
+        ws._last_seen = local_now
         if executing is not None:
             ws._executing = {
                 parent._tasks[key]: duration for key, duration in executing.items()
             }
 
-        if metrics:
-            ws._metrics = metrics
+        ws._metrics = metrics
 
         if host_info:
-            dh: dict = parent._host_info.get(host)
-            if dh is None:
-                parent._host_info[host] = dh = dict()
+            dh: dict = parent._host_info.setdefault(host, {})
             dh.update(host_info)
 
-        delay = time() - now
-        ws._time_delay = delay
+        if now:
+            ws._time_delay = local_now - now
 
         if resources:
             self.add_resources(worker=address, resources=resources)
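
Illustration only (toy data, not scheduler state): dict.setdefault(key, default) returns the stored value if the key exists and otherwise inserts and returns the default, which is why it can replace the get()/None-check/assign sequence removed above.

host_info = {}

dh = host_info.setdefault("10.0.0.1", {})   # key absent: inserts {} and returns it
dh["last-seen"] = 123.4
assert host_info == {"10.0.0.1": {"last-seen": 123.4}}

dh2 = host_info.setdefault("10.0.0.1", {})  # key present: returns the existing dict
assert dh2 is dh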
@@ -3678,7 +3670,7 @@ def heartbeat_worker(
 
         return {
             "status": "OK",
-            "time": time(),
+            "time": local_now,
             "heartbeat-interval": heartbeat_interval(len(parent._workers)),
         }
 
@@ -3756,7 +3748,7 @@ async def add_worker(
         parent._total_nthreads += nthreads
         parent._aliases[name] = address
 
-        response = self.heartbeat_worker(
+        self.heartbeat_worker(
            address=address,
            resolve_address=resolve_address,
            now=now,
@@ -5331,7 +5323,7 @@ async def rebalance(self, comm=None, keys=None, workers=None):
                    map(first, sorted(worker_bytes.items(), key=second, reverse=True))
                )
 
-                recipients = iter(reversed(sorted_workers))
+                recipients = reversed(sorted_workers)
                recipient = next(recipients)
                msgs = []  # (sender, recipient, key)
                for sender in sorted_workers[: len(workers) // 2]:
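
Illustration only: reversed() already returns an iterator that supports next(), so wrapping it in iter() (the form removed above) is redundant.

sorted_workers = ["worker-c", "worker-b", "worker-a"]

recipients = reversed(sorted_workers)  # a list_reverseiterator; no iter() needed
assert next(recipients) == "worker-a"
assert next(recipients) == "worker-b"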
@@ -5343,19 +5335,16 @@ async def rebalance(self, comm=None, keys=None, workers=None):
                    )
 
                    try:
-                        while worker_bytes[sender] > avg:
-                            while (
-                                worker_bytes[recipient] < avg
-                                and worker_bytes[sender] > avg
-                            ):
+                        while avg < worker_bytes[sender]:
+                            while worker_bytes[recipient] < avg < worker_bytes[sender]:
                                ts, nb = next(sender_keys)
                                if ts not in tasks_by_worker[recipient]:
                                    tasks_by_worker[recipient].add(ts)
                                    # tasks_by_worker[sender].remove(ts)
                                    msgs.append((sender, recipient, ts))
                                    worker_bytes[sender] -= nb
                                    worker_bytes[recipient] += nb
-                            if worker_bytes[sender] > avg:
+                            if avg < worker_bytes[sender]:
                                recipient = next(recipients)
                    except StopIteration:
                        break
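
Illustration only (toy numbers): Python chains comparisons, so a < b < c is evaluated as a < b and b < c with b evaluated once; that identity is what the tightened inner while condition relies on.

avg = 100
worker_bytes = {"recipient": 40, "sender": 160}

chained = worker_bytes["recipient"] < avg < worker_bytes["sender"]
spelled_out = worker_bytes["recipient"] < avg and avg < worker_bytes["sender"]
assert chained is True
assert chained == spelled_out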
@@ -5386,7 +5375,7 @@ async def rebalance(self, comm=None, keys=None, workers=None):
                    },
                )
 
-                if not all(r["status"] == "OK" for r in result):
+                if any(r["status"] != "OK" for r in result):
                    return {
                        "status": "missing-data",
                        "keys": tuple(
@@ -5687,7 +5676,7 @@ async def retire_workers(
        workers: list (optional)
            List of worker addresses to retire.
            If not provided we call ``workers_to_close`` which finds a good set
-        workers_names: list (optional)
+        names: list (optional)
            List of worker names to retire.
        remove: bool (defaults to True)
            Whether or not to remove the worker metadata immediately or else
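
Illustration only (made-up replies), referring to the if any(...) rewrite in the rebalance hunk above: by De Morgan's laws, not all(status == "OK") is equivalent to any(status != "OK"), so the guard now reads directly as "did any reply fail" without changing behaviour.

replies = [{"status": "OK"}, {"status": "missing-data"}]
assert any(r["status"] != "OK" for r in replies) == (
    not all(r["status"] == "OK" for r in replies)
)

replies = [{"status": "OK"}, {"status": "OK"}]
assert not any(r["status"] != "OK" for r in replies)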
@@ -5715,30 +5704,31 @@ async def retire_workers(
        with log_errors():
            async with self._lock if lock else empty_context:
                if names is not None:
+                    if workers is not None:
+                        raise TypeError("names and workers are mutually exclusive")
                    if names:
                        logger.info("Retire worker names %s", names)
                    names = set(map(str, names))
-                    workers = [
+                    workers = {
                        ws._address
                        for ws in parent._workers_dv.values()
                        if str(ws._name) in names
-                    ]
-                if workers is None:
+                    }
+                elif workers is None:
                    while True:
                        try:
                            workers = self.workers_to_close(**kwargs)
-                            if workers:
-                                workers = await self.retire_workers(
-                                    workers=workers,
-                                    remove=remove,
-                                    close_workers=close_workers,
-                                    lock=False,
-                                )
-                                return workers
-                            else:
+                            if not workers:
                                return {}
+                            return await self.retire_workers(
+                                workers=workers,
+                                remove=remove,
+                                close_workers=close_workers,
+                                lock=False,
+                            )
                        except KeyError:  # keys left during replicate
                            pass
+
                workers = {
                    parent._workers_dv[w] for w in workers if w in parent._workers_dv
                }
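
Illustration only (hypothetical retire() helper, not the Scheduler API): the new guard above follows the usual pattern for mutually exclusive keyword arguments, failing fast with a TypeError instead of silently preferring one of them.

def retire(workers=None, names=None):
    if names is not None and workers is not None:
        raise TypeError("names and workers are mutually exclusive")
    return workers if workers is not None else names

assert retire(names=["alice"]) == ["alice"]

try:
    retire(workers=["tcp://a:1234"], names=["alice"])
except TypeError as exc:
    print(exc)  # names and workers are mutually exclusive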
@@ -5750,22 +5740,21 @@ async def retire_workers(
                keys = set.union(*[w.has_what for w in workers])
                keys = {ts._key for ts in keys if ts._who_has.issubset(workers)}
 
-                other_workers = set(parent._workers_dv.values()) - workers
                if keys:
-                    if other_workers:
-                        logger.info("Moving %d keys to other workers", len(keys))
-                        await self.replicate(
-                            keys=keys,
-                            workers=[ws._address for ws in other_workers],
-                            n=1,
-                            delete=False,
-                            lock=False,
-                        )
-                    else:
+                    other_workers = set(parent._workers_dv.values()) - workers
+                    if not other_workers:
                        return {}
+                    logger.info("Moving %d keys to other workers", len(keys))
+                    await self.replicate(
+                        keys=keys,
+                        workers=[ws._address for ws in other_workers],
+                        n=1,
+                        delete=False,
+                        lock=False,
+                    )
 
                worker_keys = {ws._address: ws.identity() for ws in workers}
-                if close_workers and worker_keys:
+                if close_workers:
                    await asyncio.gather(
                        *[self.close_worker(worker=w, safe=True) for w in worker_keys]
                    )
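
Illustration only (hypothetical move_keys() reduction of the shape above, not the real method): the rewrite swaps a nested if/else for an early-return guard clause, which flattens the replicate call by one indentation level.

def move_keys(keys, other_workers):
    if keys:
        # Guard clause: bail out early instead of nesting the happy path
        # under "if other_workers: ... else: return {}".
        if not other_workers:
            return {}
        print(f"moving {len(keys)} keys to {len(other_workers)} workers")
    return {"status": "OK"}

assert move_keys({"x", "y"}, {"w1"}) == {"status": "OK"}   # prints "moving 2 keys to 1 workers"
assert move_keys({"x"}, set()) == {}                       # no recipients left: abort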
145 changes: 72 additions & 73 deletions distributed/worker.py
