This repository has been archived by the owner on Feb 10, 2021. It is now read-only.

Subclass distributed.Adaptive #35

Merged
merged 2 commits into from Aug 29, 2017
dask_drmaa/adaptive.py: 129 changes (63 additions, 66 deletions)
@@ -1,28 +1,29 @@
from __future__ import print_function, division, absolute_import

import logging
from distributed.utils import log_errors
import warnings

from toolz import first
from distributed import Scheduler
from distributed.utils import log_errors
from distributed.deploy import adaptive
from tornado import gen
from tornado.ioloop import PeriodicCallback

from .core import get_session

logger = logging.getLogger(__file__)


class Adaptive(object):
class Adaptive(adaptive.Adaptive):
'''
Adaptively allocate workers based on scheduler load. A superclass.

Contains logic to dynamically resize a Dask cluster based on current use.

Parameters
----------
scheduler: distributed.Scheduler
cluster: object
Must have scale_up and scale_down methods/coroutines
scheduler: distributed.Scheduler

Examples
--------
@@ -32,71 +33,67 @@ class Adaptive(object):
... def scale_down(self, workers):
... """ Remove worker addresses from cluster """
'''
def __init__(self, cluster=None, scheduler=None, interval=1000, startup_cost=1):
self.cluster = cluster
def __init__(self, cluster=None, scheduler=None, interval=1000,
startup_cost=1, scale_factor=2):
if cluster is None:
raise TypeError("`Adaptive.__init__() missing required argument: "
"`cluster`")

if isinstance(cluster, Scheduler):
warnings.warn("The ``cluster`` and ``scheduler`` arguments to "
"Adaptive.__init__ will switch positions in a future"
" release. Please use keyword arguments.",
FutureWarning)
cluster, scheduler = scheduler, cluster
if scheduler is None:
scheduler = cluster.scheduler
self.scheduler = scheduler
self.startup_cost = startup_cost
self._adapt_callback = PeriodicCallback(callback=self._adapt,
callback_time=interval,
io_loop=self.scheduler.loop)
self.scheduler.loop.add_callback(self._adapt_callback.start)
self._adapting = False

super(Adaptive, self).__init__(scheduler, cluster, interval,
startup_cost=startup_cost,
scale_factor=scale_factor)

def get_busy_workers(self):
s = self.scheduler
busy = {w for w in s.workers
if len(s.processing[w]) > 2 * s.ncores[w]
and s.occupancy[w] > self.startup_cost * 2}
return busy

def needs_cpu(self):
# don't want to call super(), since it ignores number of tasks
s = self.scheduler
busy = self.get_busy_workers()
if s.unrunnable or busy:
if any(get_session().jobStatus(jid) == 'queued_active' for
jid in self.cluster.workers): # TODO: is this slow?
return False
if len(s.workers) < len(self.cluster.workers):
# TODO: this depends on reliable cleanup of closed workers
return False
return True

def get_scale_up_kwargs(self):
instances = max(1, len(self.scheduler.ncores) * self.scale_factor)
kwargs = {'n': max(instances, len(self.get_busy_workers()))}
memory = []
if self.scheduler.unrunnable:
for key in self.scheduler.unrunnable:
duration = 0
memory = []
duration += self.scheduler.task_duration.get(key, 0.1)

if key in self.scheduler.resource_restrictions:
m = self.scheduler.resource_restrictions[key].get('memory')
if m:
memory.append(m)
if memory:
kwargs['memory'] = max(memory) * 4
logger.info("Starting workers due to resource constraints: %s",
kwargs['n'])
return kwargs

@gen.coroutine
def _retire_workers(self):
"""
Get the cluster scheduler to cleanup any workers it decides can retire
"""
with log_errors():
workers = yield self.scheduler.retire_workers(close=True)
workers = yield self.scheduler.retire_workers(close_workers=True)
logger.info("Retiring workers {}".format(workers))

@gen.coroutine
def _adapt(self):
logger.info("Adapting")
with log_errors():
if self._adapting: # Semaphore to avoid overlapping adapt calls
return

s = self.scheduler

self._adapting = True
try:
busy = {w for w in s.workers
if len(s.processing[w]) > 2 * s.ncores[w]
and s.occupancy[w] > self.startup_cost * 2}
if s.unrunnable or busy:
if any(get_session().jobStatus(jid) == 'queued_active' for
jid in self.cluster.workers): # TODO: is this slow?
return
if len(s.workers) < len(self.cluster.workers):
# TODO: this depends on reliable cleanup of closed workers
return
if s.unrunnable:
duration = 0
memory = []
for key in s.unrunnable:
duration += s.task_duration.get(key, 0.1)
if key in s.resource_restrictions:
m = s.resource_restrictions[key].get('memory')
if m:
memory.append(m)

if memory:
workers = self.cluster.start_workers(1, memory=max(memory) * 4)
else:
workers = self.cluster.start_workers(1)
logger.info("Starting workers due to resource constraints: %s", workers)

if busy and not s.idle:
workers = self.cluster.start_workers(len(busy))
logger.info("Starting workers due to over-saturation: %s", workers)

yield self._retire_workers()
finally:
self._adapting = False

def adapt(self):
self.scheduler.loop.add_callback(self._adapt)
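
For readers skimming the diff: the hand-rolled _adapt coroutine and adapt method shown last are dropped because distributed's adaptive.Adaptive now drives the cycle, with this subclass only supplying hooks. A rough sketch of that control flow (illustrative only, not distributed's actual implementation; it assumes nothing beyond the hook names visible in this diff) looks like:

    from tornado import gen

    @gen.coroutine
    def one_adapt_cycle(adaptive):
        # Illustrative: ask the subclass whether more CPU is needed, scale the
        # cluster up with whatever kwargs it computes, then let the scheduler
        # retire workers it no longer needs.
        if adaptive.needs_cpu():
            kwargs = adaptive.get_scale_up_kwargs()  # e.g. {'n': 4, 'memory': 4e9}
            yield adaptive.cluster.scale_up(**kwargs)
        yield adaptive._retire_workers()

The n in those kwargs comes from get_scale_up_kwargs above, which multiplies the current number of workers by scale_factor (and never goes below the number of busy workers), while memory is four times the largest memory restriction found among unrunnable tasks.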
dask_drmaa/core.py: 10 changes (10 additions, 0 deletions)
@@ -189,6 +189,16 @@ def stop_workers(self, worker_ids, sync=False):
if sync:
get_session().synchronize(worker_ids, dispose=True)

@gen.coroutine
def scale_up(self, n, **kwargs):
yield [self.start_workers(**kwargs)
for _ in range(n - len(self.workers))]

@gen.coroutine
def scale_down(self, workers):
workers = set(workers)
yield self.scheduler.retire_workers(workers=workers)

def close(self):
logger.info("Closing DRMAA cluster")
self.local_cluster.close()
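
The two coroutines added here are what the Adaptive docstring means by "Must have scale_up and scale_down methods/coroutines". A minimal stand-alone stub of that contract (illustrative only; MyCluster and start_worker are hypothetical names, not part of this PR) could look like:

    from tornado import gen

    class MyCluster(object):
        """Toy cluster exposing the interface Adaptive relies on."""

        def __init__(self, scheduler):
            self.scheduler = scheduler
            self.workers = {}  # worker id -> job handle

        @gen.coroutine
        def scale_up(self, n, **kwargs):
            # Bring the worker count up to n; extra kwargs (e.g. memory=...)
            # are forwarded to whatever actually launches workers.
            yield [self.start_worker(**kwargs)
                   for _ in range(n - len(self.workers))]

        @gen.coroutine
        def scale_down(self, workers):
            # Ask the scheduler to retire the given worker addresses.
            yield self.scheduler.retire_workers(workers=set(workers))

        @gen.coroutine
        def start_worker(self, **kwargs):
            raise NotImplementedError("launch a single worker here")

The real scale_up above differs only in delegating to start_workers, and scale_down retires workers through the scheduler in exactly the same way.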
dask_drmaa/tests/test_adaptive.py: 9 changes (8 additions, 1 deletion)
@@ -11,7 +11,7 @@

def test_adaptive_memory(loop):
with SGECluster(scheduler_port=0, cleanup_interval=100) as cluster:
adapt = Adaptive(cluster)
adapt = Adaptive(cluster, cluster.scheduler)
with Client(cluster, loop=loop) as client:
future = client.submit(inc, 1, resources={'memory': 1e9})
assert future.result() == 2
@@ -103,3 +103,10 @@ def test_dont_request_on_many_short_tasks(loop):
for i in range(20):
sleep(0.1)
assert len(cluster.workers) < 2


def test_order_warns(loop):
with SGECluster(scheduler_port=0) as cluster:
scheduler = cluster.scheduler
with pytest.warns(FutureWarning):
adapt = Adaptive(scheduler, cluster)
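
Usage note, a sketch based on the two tests above (the import paths are assumed here, not shown in this diff): keyword arguments sidestep the planned swap of the cluster and scheduler positions, while passing the scheduler first is detected, swapped, and warned about, as test_order_warns checks.

    import warnings

    from dask_drmaa import SGECluster
    from dask_drmaa.adaptive import Adaptive

    with SGECluster(scheduler_port=0) as cluster:
        # Preferred: keywords are immune to the future change in argument order.
        adapt = Adaptive(cluster=cluster, scheduler=cluster.scheduler)

        # Scheduler-first positional calls are detected, swapped, and warned about.
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            Adaptive(cluster.scheduler, cluster)
        assert any(issubclass(w.category, FutureWarning) for w in caught)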