Add worker_key parameter to Adaptive #1992

Merged · 1 commit · May 30, 2018
distributed/deploy/adaptive.py (9 additions, 3 deletions)

@@ -41,6 +41,9 @@ class Adaptive(object):
     target_duration: timedelta or str, default "5s"
         Amount of time we want a computation to take.
         This affects how aggressively we scale up.
+    worker_key: Callable[WorkerState]
+        Function to group workers together when scaling down
+        See Scheduler.workers_to_close for more information
     minimum: int
         Minimum number of workers to keep around
     maximum: int
@@ -84,8 +87,9 @@ class Adaptive(object):

     def __init__(self, scheduler, cluster, interval='1s', startup_cost='1s',
                  scale_factor=2, minimum=0, maximum=None, wait_count=3,
-                 target_duration='5s', **kwargs):
+                 target_duration='5s', worker_key=lambda x: x, **kwargs):
         interval = parse_timedelta(interval, default='ms')
+        self.worker_key = worker_key
         self.scheduler = scheduler
         self.cluster = cluster
         self.startup_cost = parse_timedelta(startup_cost, default='s')
@@ -223,7 +227,8 @@ def workers_to_close(self, **kwargs):
     @gen.coroutine
     def _retire_workers(self, workers=None):
         if workers is None:
-            workers = self.workers_to_close()
+            workers = self.workers_to_close(key=self.worker_key,
+                                            minimum=self.minimum)
Member:
what is the logic for passing in minimum here? Should you also pass in maximum?

Member Author:
Minimum here corresponds to the minimum number of workers that we want to keep. The workers_to_close function is a slightly nicer place to handle this than outside of it. For example, if workers_to_close tells us that we need to close all of our workers, and minimum says that we need to keep one, which worker should we keep? And which other workers in the same group should we keep along with it? Suddenly we would have to replicate the grouping logic outside.

Maximum doesn't make sense here because we're only reducing the number of workers, not increasing them.

         if not workers:
             raise gen.Return(workers)
         with log_errors():
@@ -275,7 +280,8 @@ def _adapt(self):
         self._adapting = True
         try:
             should_scale_up = self.should_scale_up()
-            workers = set(self.workers_to_close())
+            workers = set(self.workers_to_close(key=self.worker_key,
+                                                minimum=self.minimum))
             if should_scale_up and workers:
                 logger.info("Attempting to scale up and scale down simultaneously.")
                 return
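To make the grouping-and-minimum interaction from the thread above concrete, here is a minimal, self-contained sketch (not the actual scheduler code; the helper name, the workers mapping, and its idle flags are invented for illustration) of retiring whole groups until closing the next group would drop the cluster below minimum:

from collections import defaultdict

def pick_groups_to_close(workers, worker_key, minimum):
    """workers maps worker name -> is_idle; returns the names to retire."""
    # Group workers by the user-supplied key, e.g. 'a-1' and 'a-2' -> 'a'
    groups = defaultdict(list)
    for name in workers:
        groups[worker_key(name)].append(name)

    to_close = []
    n_remain = len(workers)
    for members in groups.values():
        if not all(workers[name] for name in members):
            continue                      # group has busy workers: keep it
        if n_remain - len(members) < minimum:
            break                         # closing it would violate minimum
        to_close.extend(members)
        n_remain -= len(members)
    return to_close

# Four idle workers in two groups; minimum=1 retires one whole group only
workers = {'a-1': True, 'a-2': True, 'b-1': True, 'b-2': True}
print(pick_groups_to_close(workers, lambda name: name.split('-')[0], 1))
# ['a-1', 'a-2'] -- the 'b' group survives to satisfy minimum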
distributed/deploy/local.py (17 additions, 4 deletions)

@@ -1,6 +1,7 @@
 from __future__ import print_function, division, absolute_import

 import atexit
+from datetime import timedelta
 import logging
 import math
 from time import sleep
@@ -12,7 +13,7 @@
 from .cluster import Cluster
 from ..core import CommClosedError
 from ..utils import (sync, ignoring, All, silence_logging, LoopRunner,
-                     log_errors)
+                     log_errors, thread_state)
 from ..nanny import Nanny
 from ..scheduler import Scheduler
 from ..worker import Worker, _ncores
@@ -145,12 +146,24 @@ def __repr__(self):
     def __await__(self):
         return self._started.__await__()

+    def sync(self, func, *args, **kwargs):
+        asynchronous = kwargs.pop('asynchronous', None)
+        if asynchronous or self._asynchronous or getattr(thread_state, 'asynchronous', False):
+            callback_timeout = kwargs.pop('callback_timeout', None)
+            future = func(*args, **kwargs)
+            if callback_timeout is not None:
+                future = gen.with_timeout(timedelta(seconds=callback_timeout),
+                                          future)
+            return future
+        else:
+            return sync(self.loop, func, *args, **kwargs)
Member:
Is this related to the grouped worker issue? (I'm not following)

Member Author:
Things were slightly nicer once we switched to using self.sync here (this is a common pattern in the Client). I decided to clean this up as part of this PR.


     def start(self, **kwargs):
         self._loop_runner.start()
         if self._asynchronous:
             self._started = self._start(**kwargs)
         else:
-            sync(self.loop, self._start, **kwargs)
+            self.sync(self._start, **kwargs)

     @gen.coroutine
     def _start(self, ip=None, n_workers=0):
@@ -219,7 +232,7 @@ def start_worker(self, **kwargs):
         -------
         The created Worker or Nanny object. Can be discarded.
         """
-        return sync(self.loop, self._start_worker, **kwargs)
+        return self.sync(self._start_worker, **kwargs)

     @gen.coroutine
     def _stop_worker(self, w):
@@ -236,7 +249,7 @@ def stop_worker(self, w):
         >>> w = c.start_worker(ncores=2) # doctest: +SKIP
         >>> c.stop_worker(w) # doctest: +SKIP
         """
-        sync(self.loop, self._stop_worker, w)
+        self.sync(self._stop_worker, w)

     @gen.coroutine
     def _close(self):
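For context on the self.sync changes above: the pattern is a single dispatch point that hands back an awaitable when the cluster is in asynchronous mode (or the calling thread is already on the event loop) and otherwise blocks until the coroutine finishes. Below is a rough sketch of the idea, using asyncio in place of the Tornado machinery the real code uses; ClusterAPI and its methods are invented names:

import asyncio

class ClusterAPI:
    """Toy illustration of the sync-or-async dispatch pattern."""

    def __init__(self, asynchronous=False):
        self._asynchronous = asynchronous
        self._loop = asyncio.new_event_loop()

    def sync(self, coro_func, *args, **kwargs):
        if self._asynchronous:
            # Event-loop callers get an awaitable to yield/await themselves
            return coro_func(*args, **kwargs)
        # Blocking callers drive the coroutine to completion right here
        return self._loop.run_until_complete(coro_func(*args, **kwargs))

    async def _start_worker(self, name):
        await asyncio.sleep(0)       # stand-in for real startup work
        return name

    def start_worker(self, name):
        # One public method serves both calling styles
        return self.sync(self._start_worker, name)

api = ClusterAPI()                   # blocking mode
print(api.start_worker('w-1'))       # runs the coroutine, prints 'w-1'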
distributed/deploy/tests/test_adaptive.py (34 additions, 2 deletions)

@@ -131,9 +131,9 @@ def __init__(self, *args, **kwargs):
         self.min_size = kwargs.pop("min_size", 0)
         Adaptive.__init__(self, *args, **kwargs)

-    def workers_to_close(self):
+    def workers_to_close(self, **kwargs):
         num_workers = len(self.scheduler.workers)
-        to_close = self.scheduler.workers_to_close()
+        to_close = self.scheduler.workers_to_close(**kwargs)
         if num_workers - len(to_close) < self.min_size:
             to_close = to_close[:num_workers - self.min_size]

@@ -360,3 +360,35 @@ def test_target_duration():
     finally:
         yield client._close()
         yield cluster._close()
+
+
+@gen_test(timeout=None)
+def test_worker_keys():
""" Ensure that redefining adapt with a lower maximum removes workers """
+    cluster = yield LocalCluster(0, asynchronous=True, processes=False,
+                                 scheduler_port=0, silence_logs=False,
+                                 diagnostics_port=None)
+
+    try:
+        yield [cluster.start_worker(name='a-1'),
+               cluster.start_worker(name='a-2'),
+               cluster.start_worker(name='b-1'),
+               cluster.start_worker(name='b-2')]
+
+        while len(cluster.scheduler.workers) != 4:
+            yield gen.sleep(0.01)
+
+        def key(ws):
+            return ws.name.split('-')[0]
+        cluster._adaptive_options = {'worker_key': key}
+
+        adaptive = cluster.adapt(minimum=1)
+        yield adaptive._adapt()
+
+        while len(cluster.scheduler.workers) == 4:
+            yield gen.sleep(0.01)
+
+        names = {ws.name for ws in cluster.scheduler.workers.values()}
+        assert names == {'a-1', 'a-2'} or names == {'b-1', 'b-2'}
+    finally:
+        yield cluster._close()
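A hedged usage sketch of what the test above exercises: this assumes Adaptive can be constructed directly against a cluster's scheduler, as its signature in the first diff suggests, and grouping by ws.host (the host a WorkerState runs on) is one natural choice of worker_key:

from distributed import LocalCluster
from distributed.deploy.adaptive import Adaptive

# Start a small local cluster in ordinary blocking mode
cluster = LocalCluster(n_workers=4, processes=False)

# Retire workers host by host: workers sharing a host form one group,
# and the policy always keeps at least one worker around.
adaptive = Adaptive(cluster.scheduler, cluster,
                    minimum=1,
                    worker_key=lambda ws: ws.host)  # ws is a WorkerState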
distributed/scheduler.py (14 additions, 8 deletions)

@@ -2522,7 +2522,8 @@ def replicate(self, comm=None, keys=None, n=None, workers=None,
                         'key-count': len(keys),
                         'branching-factor': branching_factor})

-    def workers_to_close(self, memory_ratio=None, n=None, key=None):
+    def workers_to_close(self, memory_ratio=None, n=None, key=None,
+                         minimum=None):
         """
         Find workers that we can close with low cost

@@ -2542,6 +2543,8 @@ def workers_to_close(self, memory_ratio=None, n=None, key=None):
             currently have data.
         n: int
             Number of workers to close
+        minimum: int
+            Minimum number of workers to keep around
         key: Callable(WorkerState)
             An optional callable mapping a WorkerState object to a group
             affiliation. Groups will be closed together. This is useful when
@@ -2597,26 +2600,29 @@ def key(group):
             is_idle = not any(ws.processing for ws in groups[group])
             bytes = -group_bytes[group]
             return (is_idle, bytes)

         idle = sorted(groups, key=key)

         to_close = []
+        n_remain = len(self.workers)

         while idle:
             group = idle.pop()
             if n is None and any(ws.processing for ws in groups[group]):
                 break

-            limit -= limit_bytes[group]
+            if minimum and n_remain - len(groups[group]) < minimum:
+                break

-            if n is not None and len(to_close) < n:
-                to_close.append(group)
-                continue
+            limit -= limit_bytes[group]

-            if memory_ratio is not None and limit >= memory_ratio * total:
+            if ((n is not None and len(to_close) < n) or
+                    (memory_ratio is not None and limit >= memory_ratio * total)):
                 to_close.append(group)
-                continue
+                n_remain -= len(groups[group])

-            break
+            else:
+                break

         result = [ws.address for g in to_close for ws in groups[g]]
         if result:
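Finally, a self-contained trace of the selection loop above, with the memory_ratio branch omitted for brevity (group names and sizes are invented; the real code sorts groups by idleness and stored bytes):

def selection_sketch(group_sizes, total_workers, n=None, minimum=None):
    """Toy loop: retire up to n groups while keeping >= minimum workers."""
    idle = sorted(group_sizes)       # stand-in for the idleness/bytes sort
    to_close = []
    n_remain = total_workers
    while idle:
        group = idle.pop()           # best closing candidate comes last
        size = group_sizes[group]
        if minimum and n_remain - size < minimum:
            break                    # would dip below minimum: stop early
        if n is not None and len(to_close) < n:
            to_close.append(group)
            n_remain -= size
        else:
            break
    return to_close

# Three idle groups of two workers; ask for two groups, keep >= 3 workers:
print(selection_sketch({'a': 2, 'b': 2, 'c': 2}, 6, n=2, minimum=3))
# ['c'] -- retiring a second group would leave only 2 workers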