This repository has been archived by the owner on Feb 10, 2021. It is now read-only.

Clean up adapting #13

Merged
4 commits merged on Jan 18, 2017
44 changes: 30 additions & 14 deletions dask_drmaa/adaptive.py
@@ -38,8 +38,9 @@ def __init__(self, scheduler=None, cluster=None, interval=1000, startup_cost=1):
scheduler = cluster.scheduler
self.scheduler = scheduler
self.startup_cost = startup_cost
self._adapt_callback = PeriodicCallback(self._adapt, interval,
self.scheduler.loop)
self._adapt_callback = PeriodicCallback(callback=self._adapt,
callback_time=interval,
io_loop=self.scheduler.loop)
self.scheduler.loop.add_callback(self._adapt_callback.start)
self._adapting = False
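
The switch to keyword arguments above matches the Tornado 4.x PeriodicCallback signature, PeriodicCallback(callback, callback_time, io_loop=None), where callback_time is in milliseconds. A minimal sketch of the same wiring in isolation, assuming Tornado 4.x (the io_loop argument was removed in later Tornado releases); the adapt function here is only a stand-in for Adaptive._adapt:

from tornado.ioloop import IOLoop, PeriodicCallback

def adapt():
    # Stand-in for Adaptive._adapt: runs once per callback_time interval.
    print("checking cluster load")

loop = IOLoop.current()
pc = PeriodicCallback(callback=adapt, callback_time=1000, io_loop=loop)
# Start the callback from inside the loop's thread, as the diff does with
# self.scheduler.loop.add_callback(self._adapt_callback.start).
loop.add_callback(pc.start)
# loop.start()  # uncomment to actually run the loop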

@@ -63,20 +64,35 @@ def _adapt(self):

self._adapting = True
try:
busy = {w for w in s.workers
if len(s.processing[w]) > 2 * s.ncores[w]
and s.occupancy[w] > self.startup_cost * 2}
if s.unrunnable or busy:
if any(get_session().jobStatus(jid) == 'queued_active' for
jid in self.cluster.workers): # TODO: is this slow?
return
if len(s.workers) < len(self.cluster.workers):
# TODO: this depends on reliable cleanup of closed workers
return
if s.unrunnable:
key = first(s.unrunnable)
memory = s.resource_restrictions[key]['memory']

# We need a worker with more resources. See if one has already been requested.
for worker, resources in self.cluster.workers.items():
if (resources.get("memory", 0) >= memory * 2 and
get_session().jobStatus(worker) in ('running', 'queued_active')):
# A valid worker with the necessary resources to run this task has
# already been requested. If the worker has any other status (like
# DONE, HOLD, etc.), request another worker.
break
duration = 0
memory = []
for key in s.unrunnable:
duration += s.task_duration.get(key, 0.1)
if key in s.resource_restrictions:
m = s.resource_restrictions[key].get('memory')
if m:
memory.append(m)

if memory:
workers = self.cluster.start_workers(1, memory=max(memory) * 2)
else:
logger.info("Starting worker for unrunnable {}".format(key))
self.cluster.start_workers(1, memory=memory * 2)
workers = self.cluster.start_workers(1)
logger.info("Starting workers due to resource constraints: %s", workers)

if busy and not s.idle:
workers = self.cluster.start_workers(len(busy))
logger.info("Starting workers due to over-saturation: %s", workers)

yield self._retire_workers()
finally:
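
For reference, the aggregation logic added in _adapt above can be read as a small standalone helper. This is an illustrative sketch only; estimate_demand and its return convention are hypothetical, while unrunnable, resource_restrictions, and task_duration mirror the scheduler state the diff reads:

def estimate_demand(unrunnable, resource_restrictions, task_duration,
                    default_duration=0.1):
    # Total expected runtime of tasks that cannot run yet, and the largest
    # explicit memory restriction among them, aggregated as in _adapt above.
    duration = 0
    memory = []
    for key in unrunnable:
        duration += task_duration.get(key, default_duration)
        m = resource_restrictions.get(key, {}).get('memory')
        if m:
            memory.append(m)
    # In the diff only memory feeds the request: one worker is started, with
    # twice the largest restriction as headroom, via
    # start_workers(1, memory=max(memory) * 2).
    return duration, max(memory) * 2 if memory else None

# Hypothetical scheduler state:
print(estimate_demand(unrunnable={'x-1', 'x-2'},
                      resource_restrictions={'x-1': {'memory': 4e9}},
                      task_duration={'x-1': 2.0}))
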
78 changes: 76 additions & 2 deletions dask_drmaa/tests/test_adaptive.py
@@ -1,11 +1,12 @@
from time import sleep, time

import pytest
from toolz import first

from dask_drmaa import SGECluster
from dask_drmaa.adaptive import Adaptive
from distributed import Client
from distributed.utils_test import loop, inc
from distributed.utils_test import loop, inc, slowinc


def test_adaptive_memory(loop):
@@ -25,9 +26,82 @@ def test_adaptive_memory(loop):
sleep(0.3)
assert time() < start + 10

""" # TODO: jobs aren't shutting down when process ends
""" # TODO: jobs aren't shutting down when process endst
start = time()
while cluster.workers:
sleep(0.1)
assert time() < start + 60
"""


def test_adaptive_normal_tasks(loop):
with SGECluster(scheduler_port=0) as cluster:
adapt = Adaptive(cluster=cluster)
with Client(cluster, loop=loop) as client:
future = client.submit(inc, 1)
assert future.result() == 2


@pytest.mark.parametrize('interval', [50, 1000])
def test_dont_over_request(loop, interval):
with SGECluster(scheduler_port=0) as cluster:
adapt = Adaptive(cluster=cluster)
with Client(cluster, loop=loop) as client:
future = client.submit(inc, 1)
assert future.result() == 2
assert len(cluster.scheduler.workers) == 1

for i in range(5):
sleep(0.2)
assert len(cluster.scheduler.workers) == 1


def test_request_more_than_one(loop):
with SGECluster(scheduler_port=0) as cluster:
adapt = Adaptive(cluster=cluster)
with Client(cluster, loop=loop) as client:
futures = client.map(slowinc, range(1000), delay=0.2)
while len(cluster.scheduler.workers) < 3:
sleep(0.1)


def test_dont_request_if_idle(loop):
with SGECluster(scheduler_port=0) as cluster:
cluster.start_workers(1)
with Client(cluster, loop=loop) as client:
while not cluster.scheduler.workers:
sleep(0.1)
futures = client.map(slowinc, range(1000), delay=0.2,
workers=first(cluster.scheduler.workers))
adapt = Adaptive(cluster=cluster, interval=2000)

for i in range(60):
sleep(0.1)
assert len(cluster.workers) < 5


def test_dont_request_if_not_enough_tasks(loop):
with SGECluster(scheduler_port=0) as cluster:
adapt = Adaptive(cluster=cluster)
with Client(cluster, loop=loop) as client:
cluster.scheduler.task_duration['slowinc'] = 1000
future = client.submit(slowinc, 1, delay=1000)

for i in range(50):
sleep(0.1)
assert len(cluster.workers) < 2


def test_dont_request_on_many_short_tasks(loop):
with SGECluster(scheduler_port=0) as cluster:
adapt = Adaptive(cluster=cluster, interval=50, startup_cost=10)
with Client(cluster, loop=loop) as client:
cluster.scheduler.task_duration['slowinc'] = 0.001
futures = client.map(slowinc, range(1000), delay=0.001)

while not cluster.scheduler.workers:
sleep(0.01)

for i in range(20):
sleep(0.1)
assert len(cluster.workers) < 2
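
The new tests above also document intended usage. A minimal end-to-end sketch, assuming a working SGE/DRMAA environment and mirroring the constructor arguments the tests exercise; the square function is a placeholder, and the resources= keyword on submit is standard distributed resource-restriction usage rather than anything added by this PR:

from distributed import Client

from dask_drmaa import SGECluster
from dask_drmaa.adaptive import Adaptive


def square(x):
    return x ** 2


if __name__ == '__main__':
    with SGECluster(scheduler_port=0) as cluster:
        # Poll every 2 seconds and treat workers as expensive to start, so
        # that many short tasks do not trigger extra workers.
        adapt = Adaptive(cluster=cluster, interval=2000, startup_cost=10)
        with Client(cluster) as client:
            # A memory restriction keeps the task unrunnable until a large
            # enough worker exists, which is what makes _adapt request one.
            future = client.submit(square, 10, resources={'memory': 1e9})
            print(future.result())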