From 6959d74acd5683962231d453c2c20c221a1e0105 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Mon, 21 May 2018 14:32:57 -0400 Subject: [PATCH] Add worker_key parameter to Adaptive This allows adaptive clusters to intelligently close down groups of workers based on some logical association. See https://github.com/dask/dask-jobqueue/pull/63 for motivation --- distributed/deploy/adaptive.py | 12 ++++++-- distributed/deploy/local.py | 21 ++++++++++--- distributed/deploy/tests/test_adaptive.py | 36 +++++++++++++++++++++-- distributed/scheduler.py | 22 +++++++++----- 4 files changed, 74 insertions(+), 17 deletions(-) diff --git a/distributed/deploy/adaptive.py b/distributed/deploy/adaptive.py index ae73e301a39..d7ae6ea5997 100644 --- a/distributed/deploy/adaptive.py +++ b/distributed/deploy/adaptive.py @@ -41,6 +41,9 @@ class Adaptive(object): target_duration: timedelta or str, default "5s" Amount of time we want a computation to take. This affects how aggressively we scale up. + worker_key: Callable[WorkerState] + Function to group workers together when scaling down + See Scheduler.workers_to_close for more information minimum: int Minimum number of workers to keep around maximum: int @@ -84,8 +87,9 @@ class Adaptive(object): def __init__(self, scheduler, cluster, interval='1s', startup_cost='1s', scale_factor=2, minimum=0, maximum=None, wait_count=3, - target_duration='5s', **kwargs): + target_duration='5s', worker_key=lambda x: x, **kwargs): interval = parse_timedelta(interval, default='ms') + self.worker_key = worker_key self.scheduler = scheduler self.cluster = cluster self.startup_cost = parse_timedelta(startup_cost, default='s') @@ -223,7 +227,8 @@ def workers_to_close(self, **kwargs): @gen.coroutine def _retire_workers(self, workers=None): if workers is None: - workers = self.workers_to_close() + workers = self.workers_to_close(key=self.worker_key, + minimum=self.minimum) if not workers: raise gen.Return(workers) with log_errors(): @@ -275,7 +280,8 @@ def _adapt(self): self._adapting = True try: should_scale_up = self.should_scale_up() - workers = set(self.workers_to_close()) + workers = set(self.workers_to_close(key=self.worker_key, + minimum=self.minimum)) if should_scale_up and workers: logger.info("Attempting to scale up and scale down simultaneously.") return diff --git a/distributed/deploy/local.py b/distributed/deploy/local.py index ab83073de3e..399a4746b51 100644 --- a/distributed/deploy/local.py +++ b/distributed/deploy/local.py @@ -1,6 +1,7 @@ from __future__ import print_function, division, absolute_import import atexit +from datetime import timedelta import logging import math from time import sleep @@ -12,7 +13,7 @@ from .cluster import Cluster from ..core import CommClosedError from ..utils import (sync, ignoring, All, silence_logging, LoopRunner, - log_errors) + log_errors, thread_state) from ..nanny import Nanny from ..scheduler import Scheduler from ..worker import Worker, _ncores @@ -145,12 +146,24 @@ def __repr__(self): def __await__(self): return self._started.__await__() + def sync(self, func, *args, **kwargs): + asynchronous = kwargs.pop('asynchronous', None) + if asynchronous or self._asynchronous or getattr(thread_state, 'asynchronous', False): + callback_timeout = kwargs.pop('callback_timeout', None) + future = func(*args, **kwargs) + if callback_timeout is not None: + future = gen.with_timeout(timedelta(seconds=callback_timeout), + future) + return future + else: + return sync(self.loop, func, *args, **kwargs) + def start(self, **kwargs): self._loop_runner.start() if self._asynchronous: self._started = self._start(**kwargs) else: - sync(self.loop, self._start, **kwargs) + self.sync(self._start, **kwargs) @gen.coroutine def _start(self, ip=None, n_workers=0): @@ -219,7 +232,7 @@ def start_worker(self, **kwargs): ------- The created Worker or Nanny object. Can be discarded. """ - return sync(self.loop, self._start_worker, **kwargs) + return self.sync(self._start_worker, **kwargs) @gen.coroutine def _stop_worker(self, w): @@ -236,7 +249,7 @@ def stop_worker(self, w): >>> w = c.start_worker(ncores=2) # doctest: +SKIP >>> c.stop_worker(w) # doctest: +SKIP """ - sync(self.loop, self._stop_worker, w) + self.sync(self._stop_worker, w) @gen.coroutine def _close(self): diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py index 443d30813d8..9b73756bcfb 100644 --- a/distributed/deploy/tests/test_adaptive.py +++ b/distributed/deploy/tests/test_adaptive.py @@ -131,9 +131,9 @@ def __init__(self, *args, **kwargs): self.min_size = kwargs.pop("min_size", 0) Adaptive.__init__(self, *args, **kwargs) - def workers_to_close(self): + def workers_to_close(self, **kwargs): num_workers = len(self.scheduler.workers) - to_close = self.scheduler.workers_to_close() + to_close = self.scheduler.workers_to_close(**kwargs) if num_workers - len(to_close) < self.min_size: to_close = to_close[:num_workers - self.min_size] @@ -360,3 +360,35 @@ def test_target_duration(): finally: yield client._close() yield cluster._close() + + +@gen_test(timeout=None) +def test_worker_keys(): + """ Ensure that redefining adapt with a lower maximum removes workers """ + cluster = yield LocalCluster(0, asynchronous=True, processes=False, + scheduler_port=0, silence_logs=False, + diagnostics_port=None) + + try: + yield [cluster.start_worker(name='a-1'), + cluster.start_worker(name='a-2'), + cluster.start_worker(name='b-1'), + cluster.start_worker(name='b-2')] + + while len(cluster.scheduler.workers) != 4: + yield gen.sleep(0.01) + + def key(ws): + return ws.name.split('-')[0] + cluster._adaptive_options = {'worker_key': key} + + adaptive = cluster.adapt(minimum=1) + yield adaptive._adapt() + + while len(cluster.scheduler.workers) == 4: + yield gen.sleep(0.01) + + names = {ws.name for ws in cluster.scheduler.workers.values()} + assert names == {'a-1', 'a-2'} or names == {'b-1', 'b-2'} + finally: + yield cluster._close() diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 29b69dc6c68..bfe579ebf80 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2522,7 +2522,8 @@ def replicate(self, comm=None, keys=None, n=None, workers=None, 'key-count': len(keys), 'branching-factor': branching_factor}) - def workers_to_close(self, memory_ratio=None, n=None, key=None): + def workers_to_close(self, memory_ratio=None, n=None, key=None, + minimum=None): """ Find workers that we can close with low cost @@ -2542,6 +2543,8 @@ def workers_to_close(self, memory_ratio=None, n=None, key=None): currently have data. n: int Number of workers to close + minimum: int + Minimum number of workers to keep around key: Callable(WorkerState) An optional callable mapping a WorkerState object to a group affiliation. Groups will be closed together. This is useful when @@ -2597,26 +2600,29 @@ def key(group): is_idle = not any(ws.processing for ws in groups[group]) bytes = -group_bytes[group] return (is_idle, bytes) + idle = sorted(groups, key=key) to_close = [] + n_remain = len(self.workers) while idle: group = idle.pop() if n is None and any(ws.processing for ws in groups[group]): break - limit -= limit_bytes[group] + if minimum and n_remain - len(groups[group]) < minimum: + break - if n is not None and len(to_close) < n: - to_close.append(group) - continue + limit -= limit_bytes[group] - if memory_ratio is not None and limit >= memory_ratio * total: + if ((n is not None and len(to_close) < n) or + (memory_ratio is not None and limit >= memory_ratio * total)): to_close.append(group) - continue + n_remain -= len(groups[group]) - break + else: + break result = [ws.address for g in to_close for ws in groups[g]] if result: