diff --git a/dask_drmaa/adaptive.py b/dask_drmaa/adaptive.py
index d09ab70..6c193ec 100644
--- a/dask_drmaa/adaptive.py
+++ b/dask_drmaa/adaptive.py
@@ -38,8 +38,9 @@ def __init__(self, scheduler=None, cluster=None, interval=1000, startup_cost=1):
             scheduler = cluster.scheduler
         self.scheduler = scheduler
         self.startup_cost = startup_cost
-        self._adapt_callback = PeriodicCallback(self._adapt, interval,
-                                                self.scheduler.loop)
+        self._adapt_callback = PeriodicCallback(callback=self._adapt,
+                                                callback_time=interval,
+                                                io_loop=self.scheduler.loop)
         self.scheduler.loop.add_callback(self._adapt_callback.start)
         self._adapting = False
 
@@ -63,20 +64,35 @@ def _adapt(self):
 
         self._adapting = True
         try:
+            busy = {w for w in s.workers
+                    if len(s.processing[w]) > 2 * s.ncores[w]
+                    and s.occupancy[w] > self.startup_cost * 2}
+            if s.unrunnable or busy:
+                if any(get_session().jobStatus(jid) == 'queued_active' for
+                       jid in self.cluster.workers):  # TODO: is this slow?
+                    return
+                if len(s.workers) < len(self.cluster.workers):
+                    # TODO: this depends on reliable cleanup of closed workers
+                    return
             if s.unrunnable:
-                key = first(s.unrunnable)
-                memory = s.resource_restrictions[key]['memory']
-
-                # We need a worker with more resources. See if one has already been requested.
-                for worker, resources in self.cluster.workers.items():
-                    if (resources.get("memory", 0) >= memory * 2 and
-                            get_session().jobStatus(worker) in ('running', 'queued_active')):
-                        #There is already an existing valid worker requested with the necessary
-                        # resources to run this task. If the worker has any other status (like DONE, HOLD, etc.), scheduler another task.
-                        break
+                duration = 0
+                memory = []
+                for key in s.unrunnable:
+                    duration += s.task_duration.get(key, 0.1)
+                    if key in s.resource_restrictions:
+                        m = s.resource_restrictions[key].get('memory')
+                        if m:
+                            memory.append(m)
+
+                if memory:
+                    workers = self.cluster.start_workers(1, memory=max(memory) * 2)
                 else:
-                    logger.info("Starting worker for unrunnable {}".format(key))
-                    self.cluster.start_workers(1, memory=memory * 2)
+                    workers = self.cluster.start_workers(1)
+                logger.info("Starting workers due to resource constraints: %s", workers)
+
+            if busy and not s.idle:
+                workers = self.cluster.start_workers(len(busy))
+                logger.info("Starting workers due to over-saturation: %s", workers)
 
             yield self._retire_workers()
         finally:
diff --git a/dask_drmaa/tests/test_adaptive.py b/dask_drmaa/tests/test_adaptive.py
index 83a4d5a..ff42dd9 100644
--- a/dask_drmaa/tests/test_adaptive.py
+++ b/dask_drmaa/tests/test_adaptive.py
@@ -1,11 +1,12 @@
 from time import sleep, time
 
 import pytest
+from toolz import first
 
 from dask_drmaa import SGECluster
 from dask_drmaa.adaptive import Adaptive
 from distributed import Client
-from distributed.utils_test import loop, inc
+from distributed.utils_test import loop, inc, slowinc
 
 
 def test_adaptive_memory(loop):
@@ -25,9 +26,82 @@
         sleep(0.3)
         assert time() < start + 10
 
-    """ # TODO: jobs aren't shutting down when process ends
+    """ # TODO: jobs aren't shutting down when process endst
     start = time()
     while cluster.workers:
         sleep(0.1)
     assert time() < start + 60
     """
+
+
+def test_adaptive_normal_tasks(loop):
+    with SGECluster(scheduler_port=0) as cluster:
+        adapt = Adaptive(cluster=cluster)
+        with Client(cluster, loop=loop) as client:
+            future = client.submit(inc, 1)
+            assert future.result() == 2
+
+
+@pytest.mark.parametrize('interval', [50, 1000])
+def test_dont_over_request(loop, interval):
+    with SGECluster(scheduler_port=0) as cluster:
+        adapt = Adaptive(cluster=cluster)
+        with Client(cluster, loop=loop) as client:
+            future = client.submit(inc, 1)
+            assert future.result() == 2
+            assert len(cluster.scheduler.workers) == 1
+
+            for i in range(5):
+                sleep(0.2)
+                assert len(cluster.scheduler.workers) == 1
+
+
+def test_request_more_than_one(loop):
+    with SGECluster(scheduler_port=0) as cluster:
+        adapt = Adaptive(cluster=cluster)
+        with Client(cluster, loop=loop) as client:
+            futures = client.map(slowinc, range(1000), delay=0.2)
+            while len(cluster.scheduler.workers) < 3:
+                sleep(0.1)
+
+
+def test_dont_request_if_idle(loop):
+    with SGECluster(scheduler_port=0) as cluster:
+        cluster.start_workers(1)
+        with Client(cluster, loop=loop) as client:
+            while not cluster.scheduler.workers:
+                sleep(0.1)
+            futures = client.map(slowinc, range(1000), delay=0.2,
+                                 workers=first(cluster.scheduler.workers))
+            adapt = Adaptive(cluster=cluster, interval=2000)
+
+            for i in range(60):
+                sleep(0.1)
+                assert len(cluster.workers) < 5
+
+
+def test_dont_request_if_not_enough_tasks(loop):
+    with SGECluster(scheduler_port=0) as cluster:
+        adapt = Adaptive(cluster=cluster)
+        with Client(cluster, loop=loop) as client:
+            cluster.scheduler.task_duration['slowinc'] = 1000
+            future = client.submit(slowinc, 1, delay=1000)
+
+            for i in range(50):
+                sleep(0.1)
+                assert len(cluster.workers) < 2
+
+
+def test_dont_request_on_many_short_tasks(loop):
+    with SGECluster(scheduler_port=0) as cluster:
+        adapt = Adaptive(cluster=cluster, interval=50, startup_cost=10)
+        with Client(cluster, loop=loop) as client:
+            cluster.scheduler.task_duration['slowinc'] = 0.001
+            futures = client.map(slowinc, range(1000), delay=0.001)
+
+            while not cluster.scheduler.workers:
+                sleep(0.01)
+
+            for i in range(20):
+                sleep(0.1)
+                assert len(cluster.workers) < 2
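
For reference, below is a minimal, standalone sketch of the scale-up heuristic this patch adds to Adaptive._adapt, with the DRMAA session checks (queued jobs, workers not yet connected) stripped out so the decision logic can be read and exercised on its own. SchedulerState, busy_workers and should_scale_up are hypothetical names used only for illustration; the thresholds (processing depth > 2 * ncores, occupancy > 2 * startup_cost) are taken from the diff above.

from collections import namedtuple

# Hypothetical container mirroring the scheduler attributes the patch reads.
SchedulerState = namedtuple(
    'SchedulerState',
    ['workers', 'processing', 'ncores', 'occupancy', 'unrunnable', 'idle'])


def busy_workers(s, startup_cost=1):
    # A worker counts as "busy" when its task queue is more than twice its
    # core count and its estimated occupancy exceeds twice the startup cost.
    return {w for w in s.workers
            if len(s.processing[w]) > 2 * s.ncores[w]
            and s.occupancy[w] > startup_cost * 2}


def should_scale_up(s, startup_cost=1):
    # Mirrors the decision in Adaptive._adapt: request workers for unrunnable
    # (resource-restricted) tasks, or when workers are over-saturated and no
    # worker is idle.  The DRMAA guards are omitted in this sketch.
    busy = busy_workers(s, startup_cost)
    if not (s.unrunnable or busy):
        return False
    return bool(s.unrunnable) or bool(busy and not s.idle)


# Example: one worker, ten queued expensive tasks, no idle peers -> scale up.
state = SchedulerState(
    workers={'w1'},
    processing={'w1': ['task-%d' % i for i in range(10)]},
    ncores={'w1': 2},
    occupancy={'w1': 30.0},
    unrunnable=set(),
    idle=set())
print(should_scale_up(state, startup_cost=1))  # True

Keeping the decision pure like this also makes it easier to see why the new tests (test_dont_over_request, test_dont_request_if_idle, test_dont_request_on_many_short_tasks) expect no extra workers to be requested.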