Skip to content

Commit

Permalink
[Adaptive] Do not allow workers to downscale if they are running long…
Browse files Browse the repository at this point in the history
…-running tasks (e.g. `worker_client`) (#7481)
  • Loading branch information
fjetter authored Dec 18, 2023
1 parent 415d4fa commit 53e95ec
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
7 changes: 6 additions & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6935,7 +6935,12 @@ def workers_to_close(
if isinstance(key, bytes):
key = pickle.loads(key)

groups = groupby(key, self.workers.values())
# Long running tasks typically use a worker_client to schedule
# other tasks. We should never shut down the worker they're
# running on, as it would cause them to restart from scratch
# somewhere else.
valid_workers = [ws for ws in self.workers.values() if not ws.long_running]
groups = groupby(key, valid_workers)

limit_bytes = {k: sum(ws.memory_limit for ws in v) for k, v in groups.items()}
group_bytes = {k: sum(ws.nbytes for ws in v) for k, v in groups.items()}
Expand Down
32 changes: 32 additions & 0 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1604,6 +1604,38 @@ def key(ws):
assert set(s.workers_to_close(key=key)) == {workers[0].address, workers[1].address}


@pytest.mark.parametrize("reverse", [True, False])
@gen_cluster(client=True)
async def test_workers_to_close_never_close_long_running(c, s, a, b, reverse):
    """Check that ``workers_to_close`` never proposes a worker with a long-running task.

    One worker (``a``) runs a task that calls ``secede()`` and then blocks; the
    other (``b``) is later loaded with ordinary blocking tasks.  Throughout,
    ``a`` must not appear in the result of ``s.workers_to_close()``, regardless
    of the requested ``n``.

    The ``reverse`` parameter swaps the two workers so that each of them takes
    the "long-running" role once.
    """
    if reverse:
        a, b = b, a
    # Shared event that keeps every submitted task blocked until the end of the
    # test.  NOTE(review): presumably ``distributed.Event`` - confirm import.
    wait_evt = Event()

    def executing(evt):
        # Ordinary task: simply blocks until the event is set.
        evt.wait()

    def long_running_secede(evt):
        # Task that secedes first and only then blocks on the event.
        # NOTE(review): presumably ``distributed.secede``; the test relies on
        # this call making ``ws.long_running`` truthy on the scheduler side.
        secede()
        evt.wait()

    # Baseline: with nothing running, both workers are candidates for closing.
    assert a.address in s.workers_to_close()
    assert b.address in s.workers_to_close()
    # Pin the seceding task to worker ``a``.
    long_fut = c.submit(long_running_secede, wait_evt, workers=[a.address])
    wsA = s.workers[a.address]
    # Poll until the scheduler's state for ``a`` reports a long-running task.
    while not wsA.long_running:
        await asyncio.sleep(0.01)
    # ``a`` is now excluded, which leaves ``b`` as the only candidate.
    assert s.workers_to_close() == [b.address]
    # Load worker ``b`` with ten ordinary blocking tasks.
    futs = [c.submit(executing, wait_evt, workers=[b.address]) for _ in range(10)]
    # Even when two workers are requested, ``a`` must not be offered.
    assert a.address not in s.workers_to_close(n=2)
    # Poll until worker ``b`` has at least one task in its own state.
    while not b.state.tasks:
        await asyncio.sleep(0.01)
    # With ``b`` busy, nothing is proposed by default ...
    assert s.workers_to_close() == []
    # ... and an explicit ``n`` yields only ``b``, never ``a`` - even for n=2.
    assert s.workers_to_close(n=1) == [b.address]
    assert s.workers_to_close(n=2) == [b.address]

    # Release all blocked tasks.
    await wait_evt.set()


@gen_cluster(client=True)
async def test_retire_workers_no_suspicious_tasks(c, s, a, b):
future = c.submit(
Expand Down

0 comments on commit 53e95ec

Please sign in to comment.