-
-
Notifications
You must be signed in to change notification settings - Fork 144
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixes for Adaptive #63
Changes from 13 commits
7c56b1d
6b35688
507be82
99d0f1f
78a22ff
62e050c
d121180
2329bfe
92eaf4e
9084a35
4776892
ef62f59
a4e007a
c19e4da
5d5fd85
115b0c1
cde3ca4
75f2c6a
66db52d
ea7d56d
a6d31d2
25965c0
ab4363a
604a563
1e0455e
ace37ad
56e2990
914244c
359be59
1441634
0bf53d1
cc2628f
90dd730
9fe2178
18dfe31
d98b141
b303275
13e5dc3
b4877ad
667369e
bf99d29
292b595
627f873
f2b2a92
1f0dc71
a1b102d
c988f1e
619047f
8db65eb
ef16298
3803918
aad58d4
fa1b717
aeea2e5
b93a7c4
0a2e304
1a1fe75
8c32872
9c43f43
a02abc8
ee89f20
4abcea1
0c7425a
5d4552d
8a150c9
ca0c727
ce007df
c23ce7c
7618467
d5e42b3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,20 +1,70 @@ | ||
from __future__ import absolute_import, division, print_function | ||
|
||
import logging | ||
import math | ||
import shlex | ||
import socket | ||
import subprocess | ||
import sys | ||
from collections import OrderedDict | ||
from contextlib import contextmanager | ||
|
||
import dask | ||
import docrep | ||
from distributed import LocalCluster | ||
from distributed.deploy import Cluster | ||
from distributed.utils import get_ip_interface, ignoring, parse_bytes, tmpfile | ||
from distributed.utils import get_ip_interface, parse_bytes, tmpfile | ||
from distributed.diagnostics.plugin import SchedulerPlugin | ||
|
||
logger = logging.getLogger(__name__) | ||
docstrings = docrep.DocstringProcessor() | ||
|
||
|
||
def _job_id_from_worker_name(name): | ||
''' utility to parse the job ID from the worker name | ||
|
||
template: 'prefix-jobid[-proc]' | ||
''' | ||
pieces = name.split('-') | ||
print(name, pieces) | ||
if len(pieces) == 2: | ||
return pieces[-1] | ||
else: | ||
return pieces[-2] | ||
|
||
|
||
class JobQueuePlugin(SchedulerPlugin): | ||
def __init__(self): | ||
self.pending_jobs = OrderedDict() | ||
self.running_jobs = OrderedDict() | ||
self.finished_jobs = OrderedDict() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I find |
||
self.all_workers = {} | ||
|
||
def add_worker(self, scheduler, worker=None, name=None, **kwargs): | ||
''' Run when a new worker enters the cluster''' | ||
w = scheduler.workers[worker] | ||
job_id = _job_id_from_worker_name(w.name) | ||
self.all_workers[worker] = (w.name, job_id) | ||
|
||
# if this is the first worker for this job, move job to running | ||
if job_id not in self.running_jobs: | ||
self.running_jobs[job_id] = self.pending_jobs.pop(job_id) | ||
|
||
# add worker to dict of workers in this job | ||
self.running_jobs[job_id][w.name] = w | ||
|
||
def remove_worker(self, scheduler=None, worker=None, **kwargs): | ||
''' Run when a worker leaves the cluster''' | ||
name, job_id = self.all_workers[worker] | ||
|
||
# remove worker from this job | ||
del self.running_jobs[job_id][name] | ||
|
||
# once there are no more workers, move this job to finished_jobs | ||
if not self.running_jobs[job_id]: | ||
self.finished_jobs[job_id] = self.running_jobs.pop(job_id) | ||
|
||
|
||
@docstrings.get_sectionsf('JobQueueCluster') | ||
class JobQueueCluster(Cluster): | ||
""" Base class to launch Dask Clusters for Job queues | ||
|
@@ -75,6 +125,8 @@ class JobQueueCluster(Cluster): | |
submit_command = None | ||
cancel_command = None | ||
scheduler_name = '' | ||
_adaptive_options = { | ||
'worker_key': lambda ws: _job_id_from_worker_name(ws.name)} | ||
|
||
def __init__(self, | ||
name=None, | ||
|
@@ -117,6 +169,10 @@ def __init__(self, | |
if env_extra is None: | ||
env_extra = dask.config.get('jobqueue.%s.env-extra' % self.scheduler_name) | ||
|
||
if '-' in name: | ||
raise ValueError( | ||
'name (%s) can not include the `-` character' % name) | ||
|
||
#This attribute should be overriden | ||
self.job_header = None | ||
|
||
|
@@ -128,15 +184,17 @@ def __init__(self, | |
|
||
self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs) | ||
|
||
# plugin for tracking job status | ||
self._scheduler_plugin = JobQueuePlugin() | ||
self.cluster.scheduler.add_plugin(self._scheduler_plugin) | ||
|
||
# Keep information on process, threads and memory, for use in | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You probably want to revert the change in this comment |
||
# subclasses | ||
self.worker_memory = parse_bytes(memory) if memory is not None else None | ||
self.worker_processes = processes | ||
self.worker_threads = threads | ||
self.name = name | ||
|
||
self.jobs = dict() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe we can get rid of I'm in favour of keeping a dict mapping job_id --> job-status, e.g as @mrocklin proposed in #11: 'pending', 'running', 'finished' or equivalent. This way, in the scale_up method, we can take that into account. A maybe simpler solution is to only keep track of the number of workers that are pending or running, and at least to use this number in scale_up: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @guillaumeeb - I think we're in agreement here.
This would be nice but it might be somewhat difficult to do. We have three/four states that a job might be in:
Generally, I think any use of
So we probably don't want to do this very often. Do others have ideas as to how one would track the status of these jobs in a tractable way? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could add a plugin to the scheduler that watched for when workers started and compared them against a set of known pending workers: http://distributed.readthedocs.io/en/latest/plugins.html Something like the following: from distributed.diagnostics.plugin import SchedulerPlugin
class JobQueuePlugin(SchedulerPlugin):
def add_worker(self, scheduler, worker=None, name=None, **kwargs):
job_id = parse(name)
pending_jobs.remove(job_id)
scheduler.add_plugin(JobQueuePlugin()) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This sounds like what we need. I can implement this as part of this PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good! We don't seem to need a watcher on stopped worker this way, we know started workers through scheduler. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See 92eaf4e for an initial (untested) implementation using the scheduler plugin approach. |
||
self.n = 0 | ||
self._adaptive = None | ||
|
||
self._env_header = '\n'.join(env_extra) | ||
|
@@ -152,22 +210,36 @@ def __init__(self, | |
if memory is not None: | ||
self._command_template += " --memory-limit %s" % memory | ||
if name is not None: | ||
self._command_template += " --name %s" % name | ||
self._command_template += "-%(n)d" # Keep %(n) to be replaced later | ||
# worker names follow this template: {NAME}-{JOB_ID} | ||
self._command_template += " --name %s-${JOB_ID}" % name | ||
if death_timeout is not None: | ||
self._command_template += " --death-timeout %s" % death_timeout | ||
if local_directory is not None: | ||
self._command_template += " --local-directory %s" % local_directory | ||
if extra is not None: | ||
self._command_template += extra | ||
|
||
@property | ||
def pending_jobs(self): | ||
""" Jobs pending in the queue """ | ||
return self._scheduler_plugin.pending_jobs | ||
|
||
@property | ||
def running_jobs(self): | ||
""" Jobs with currenly active workers """ | ||
return self._scheduler_plugin.running_jobs | ||
|
||
@property | ||
def finished_jobs(self): | ||
""" Jobs that have finished """ | ||
return self._scheduler_plugin.finished_jobs | ||
|
||
def job_script(self): | ||
""" Construct a job submission script """ | ||
self.n += 1 | ||
template = self._command_template % {'n': self.n} | ||
return self._script_template % {'job_header': self.job_header, | ||
'env_header': self._env_header, | ||
'worker_command': template} | ||
pieces = {'job_header': self.job_header, | ||
'env_header': self._env_header, | ||
'worker_command': self._command_template} | ||
return self._script_template % pieces | ||
|
||
@contextmanager | ||
def job_file(self): | ||
|
@@ -179,14 +251,12 @@ def job_file(self): | |
|
||
def start_workers(self, n=1): | ||
""" Start workers and point them to our local scheduler """ | ||
workers = [] | ||
for _ in range(n): | ||
num_jobs = math.ceil(n / self.worker_processes) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably need |
||
for _ in range(num_jobs): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a breaking change I want to make sure everyone is aware of. The current behavior for a hypothetical setup that includes 10 workers per job would be: cluster.start_workers(1) ...and get 1 job and 10 workers. I'd like to change this so that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Historically start_workers was a semi-convention between a few projects. This has decayed, so I have no strong thoughts here. I do think that we need to be consistent on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this really help adaptive? Would'nt there still be a problem with starting the worker in a grouped manner? With your example, calling But this may be well handled by adaptive, I don't know. In this case, this may not be needed to do this breaking change? |
||
with self.job_file() as fn: | ||
out = self._call(shlex.split(self.submit_command) + [fn]) | ||
job = self._job_id_from_submit_output(out.decode()) | ||
self.jobs[self.n] = job | ||
workers.append(self.n) | ||
return workers | ||
self._scheduler_plugin.pending_jobs[job] = {} | ||
|
||
@property | ||
def scheduler(self): | ||
|
@@ -215,12 +285,12 @@ def _calls(self, cmds): | |
Also logs any stderr information | ||
""" | ||
logger.debug("Submitting the following calls to command line") | ||
procs = [] | ||
for cmd in cmds: | ||
logger.debug(' '.join(cmd)) | ||
procs = [subprocess.Popen(cmd, | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.PIPE) | ||
for cmd in cmds] | ||
procs.append(subprocess.Popen(cmd, | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.PIPE)) | ||
|
||
result = [] | ||
for proc in procs: | ||
|
@@ -238,29 +308,43 @@ def stop_workers(self, workers): | |
""" Stop a list of workers""" | ||
if not workers: | ||
return | ||
workers = list(map(int, workers)) | ||
jobs = [self.jobs[w] for w in workers] | ||
self._call([self.cancel_command] + list(jobs)) | ||
jobs = [] | ||
for w in workers: | ||
with ignoring(KeyError): | ||
del self.jobs[w] | ||
if isinstance(w, dict): | ||
jobs.append(_job_id_from_worker_name(w['name'])) | ||
else: | ||
jobs.append(_job_id_from_worker_name(w.name)) | ||
self.stop_jobs(jobs) | ||
|
||
def stop_jobs(self, jobs): | ||
""" Stop a list of jobs""" | ||
if jobs: | ||
self._call([self.cancel_command] + list(jobs)) | ||
|
||
def scale_up(self, n, **kwargs): | ||
""" Brings total worker count up to ``n`` """ | ||
return self.start_workers(n - len(self.jobs)) | ||
active_and_pending = sum([len(j.workers) for j in | ||
self.running_jobs.values()]) | ||
active_and_pending += self.worker_processes * len(self.pending_jobs) | ||
self.start_workers(n - active_and_pending) | ||
|
||
def scale_down(self, workers): | ||
''' Close the workers with the given addresses ''' | ||
if isinstance(workers, dict): | ||
names = {v['name'] for v in workers.values()} | ||
job_ids = {name.split('-')[-2] for name in names} | ||
self.stop_workers(job_ids) | ||
worker_states = [] | ||
for w in workers: | ||
try: | ||
# Get the actual WorkerState | ||
worker_states.append(self.scheduler.workers[w]) | ||
except KeyError: | ||
logger.debug('worker %s is already gone' % w) | ||
self.stop_workers(worker_states) | ||
|
||
def __enter__(self): | ||
return self | ||
|
||
def __exit__(self, type, value, traceback): | ||
self.stop_workers(self.jobs) | ||
jobs = list(self.pending_jobs.keys()) + list(self.running_jobs.keys()) | ||
self.stop_jobs(jobs) | ||
self.cluster.__exit__(type, value, traceback) | ||
|
||
def _job_id_from_submit_output(self, out): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
from __future__ import absolute_import, division, print_function | ||
|
||
import logging | ||
|
||
import dask | ||
|
@@ -56,8 +58,7 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None, | |
|
||
super(SGECluster, self).__init__(**kwargs) | ||
|
||
header_lines = ['#!/bin/bash'] | ||
|
||
header_lines = ['#!/usr/bin/env bash'] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So you don't have a solution for propagating JOB_ID var in sge script? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good catch. I thought I did but I'll have to sort it out. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I take that back. SGE uses |
||
if self.name is not None: | ||
header_lines.append('#$ -N %(name)s') | ||
if queue is not None: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
|
||
QUEUE_WAIT = 60 # seconds | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's great to have a constant that is used consistently in the test! Is there a good reason to leave this to 60s? If not a smaller number like 15s (I think that was the number before) would be good. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree from a previous comment from @mrocklin about the
-
character to get the jobid. I thing we should use something else around it, maybe double it--jobid--
or use a different character.The
if len(pieces)
here looks kind of non deterministic, and preventing user to use dash in job name feels not right...