Fixes for Adaptive #63

Merged Jul 16, 2018 · 70 commits

Commits
7c56b1d
add job ids to dask workers
May 18, 2018
6b35688
pad job id
May 18, 2018
507be82
parse PBS/slurm job ids
May 18, 2018
99d0f1f
track workers individually (sort of)
May 21, 2018
78a22ff
add _adaptive_options
May 21, 2018
62e050c
generalize the parsing of the job id
May 22, 2018
d121180
fix typo
May 22, 2018
2329bfe
changes for review
May 22, 2018
92eaf4e
add pluggin (untested)
May 24, 2018
9084a35
a few fixes + tests
May 24, 2018
4776892
respond to first round of comments
May 29, 2018
ef62f59
fix list addition
May 29, 2018
a4e007a
mark test modules (again)
May 29, 2018
c19e4da
fixes while testing on pbs
May 29, 2018
5d5fd85
remove extra if block
May 29, 2018
115b0c1
use for/else
May 29, 2018
cde3ca4
fix two failing tests
May 30, 2018
75f2c6a
Merge branch 'jobids' of github.com:jhamman/dask-jobqueue into jobids
May 30, 2018
66db52d
respond to review comments
Jun 3, 2018
ea7d56d
fix bug in scale down
Jun 4, 2018
a6d31d2
fix marks
Jun 4, 2018
25965c0
Merge branch 'master' of github.com:dask/dask-jobqueue into jobids
Jun 15, 2018
ab4363a
more docs
Jun 21, 2018
604a563
debugging ci
Jun 21, 2018
1e0455e
more ci
Jun 21, 2018
ace37ad
only stop jobs if there are jobs to stop
Jun 21, 2018
56e2990
refactor remove_worker method
Jun 21, 2018
914244c
debug
Jun 21, 2018
359be59
print debug info
Jun 21, 2018
1441634
longer waits in tests
Jun 21, 2018
0bf53d1
refactor use of jobids, add scheduler plugin
May 18, 2018
cc2628f
debug stop workers
Jun 26, 2018
90dd730
fix tests of stop_workers
Jun 26, 2018
9fe2178
Merge branch 'master' into jobids
mrocklin Jun 26, 2018
18dfe31
cleanup
Jun 26, 2018
d98b141
Merge branch 'master' of github.com:dask/dask-jobqueue into jobids
Jun 26, 2018
b303275
more flexible stop workers
Jun 26, 2018
13e5dc3
Merge branch 'jobids' of github.com:jhamman/dask-jobqueue into jobids
Jun 26, 2018
b4877ad
Merge remote-tracking branch 'origin/jobids' into jobids
Jun 26, 2018
667369e
remove Job class
Jun 27, 2018
bf99d29
Merge branch 'master' of github.com:dask/dask-jobqueue into jobids
Jun 27, 2018
292b595
fix for worker name (again)
Jun 27, 2018
627f873
debug
Jun 27, 2018
f2b2a92
perform a backflip
Jun 27, 2018
1f0dc71
Merge branch 'jobids' of github.com:jhamman/dask-jobqueue into jobids
Jun 27, 2018
a1b102d
Merge branch 'master' of github.com:dask/dask-jobqueue into jobids
Jul 3, 2018
c988f1e
isort after merge conflict resolution
Jul 3, 2018
619047f
more docs stuff
Jul 3, 2018
8db65eb
update worker name template to use bracket delims
Jul 3, 2018
ef16298
add a few more comments
Jul 3, 2018
3803918
roll back changes in tests
Jul 3, 2018
aad58d4
fix slurm tests and missing scheduler plugin after merge conflict res…
Jul 3, 2018
fa1b717
fix threads in test
Jul 3, 2018
aeea2e5
debug on travis
Jul 6, 2018
b93a7c4
simplify tests
Jul 6, 2018
0a2e304
more debugging and more robust sleeps
Jul 7, 2018
1a1fe75
unify basic test
Jul 7, 2018
8c32872
unify adaptive tests
Jul 7, 2018
9c43f43
Merge remote-tracking branch 'origin/jobids' into jobids
Jul 9, 2018
a02abc8
Merge branch 'master' of github.com:dask/dask-jobqueue into jobids
Jul 9, 2018
ee89f20
debug statements and some nice fixups
Jul 9, 2018
4abcea1
future div
Jul 13, 2018
0c7425a
add logging stuff
Jul 13, 2018
5d4552d
-s for pytest
Jul 13, 2018
8a150c9
use --job_id-- for name
mrocklin Jul 13, 2018
ca0c727
fix memory in sge tests
mrocklin Jul 13, 2018
ce007df
remove pending jobs when scaling down
Jul 14, 2018
c23ce7c
remove pending jobs
Jul 14, 2018
7618467
cleanup after lots of debugging
Jul 15, 2018
d5e42b3
additional cleanup
Jul 16, 2018
2 changes: 0 additions & 2 deletions ci/none.sh
@@ -1,7 +1,5 @@
#!/usr/bin/env bash

set -x

function jobqueue_before_install {
# Install miniconda
./ci/conda_setup.sh
2 changes: 0 additions & 2 deletions ci/pbs.sh
@@ -1,7 +1,5 @@
#!/usr/bin/env bash

set -x

function jobqueue_before_install {
docker version
docker-compose version
2 changes: 0 additions & 2 deletions ci/sge.sh
@@ -1,7 +1,5 @@
#!/usr/bin/env bash

set -x

function jobqueue_before_install {
docker version
docker-compose version
2 changes: 0 additions & 2 deletions ci/slurm.sh
@@ -1,7 +1,5 @@
#!/usr/bin/env bash

set -x

function jobqueue_before_install {
docker version
docker-compose version
3 changes: 1 addition & 2 deletions dask_jobqueue/config.py
@@ -1,11 +1,10 @@
from __future__ import print_function, division, absolute_import
from __future__ import absolute_import, division, print_function

import os

import dask
import yaml


fn = os.path.join(os.path.dirname(__file__), 'jobqueue.yaml')
dask.config.ensure_file(source=fn)

150 changes: 115 additions & 35 deletions dask_jobqueue/core.py
@@ -1,19 +1,23 @@
from contextlib import contextmanager
import logging
import math
import shlex
import socket
import subprocess
import sys
import warnings
from collections import OrderedDict
from contextlib import contextmanager

import dask
import docrep
from distributed import LocalCluster
from distributed.deploy import Cluster
from distributed.utils import (get_ip_interface, ignoring, parse_bytes, tmpfile,
format_bytes)
from distributed.diagnostics.plugin import SchedulerPlugin
from distributed.utils import (
format_bytes, get_ip_interface, parse_bytes, tmpfile)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
docstrings = docrep.DocstringProcessor()


@@ -28,6 +32,46 @@
""".strip()


def _job_id_from_worker_name(name):
''' utility to parse the job ID from the worker name

template: 'prefix[jobid]suffix'
'''
return name.split('[', 1)[1].split(']')[0]
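
A minimal sketch of the round trip this helper expects, assuming a worker name rendered from the {NAME}[{JOB_ID}] template introduced further down (the job id value is made up):

name = 'dask-worker[1234567]'                      # e.g. from "--name dask-worker[${JOB_ID}]"
assert _job_id_from_worker_name(name) == '1234567'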


class JobQueuePlugin(SchedulerPlugin):
def __init__(self):
self.pending_jobs = OrderedDict()
self.running_jobs = OrderedDict()
self.finished_jobs = OrderedDict()
Member:
I find finished_jobs is not such a great name because those are jobs that have been qdeled. In my mind finished_jobs means the job has finished normally (i.e. was not qdeled). I don't have a very good suggestion for a better name though, maybe stopped_jobs or canceled_jobs.

self.all_workers = {}

def add_worker(self, scheduler, worker=None, name=None, **kwargs):
''' Run when a new worker enters the cluster'''
w = scheduler.workers[worker]
job_id = _job_id_from_worker_name(w.name)
self.all_workers[worker] = (w.name, job_id)

# if this is the first worker for this job, move job to running
if job_id not in self.running_jobs:
self.running_jobs[job_id] = self.pending_jobs.pop(job_id)

# add worker to dict of workers in this job
self.running_jobs[job_id][w.name] = w

def remove_worker(self, scheduler=None, worker=None, **kwargs):
''' Run when a worker leaves the cluster'''
name, job_id = self.all_workers[worker]

# remove worker from this job
del self.running_jobs[job_id][name]

# once there are no more workers, move this job to finished_jobs
if not self.running_jobs[job_id]:
self.finished_jobs[job_id] = self.running_jobs.pop(job_id)


@docstrings.get_sectionsf('JobQueueCluster')
class JobQueueCluster(Cluster):
""" Base class to launch Dask Clusters for Job queues
@@ -87,6 +131,8 @@ class JobQueueCluster(Cluster):
submit_command = None
cancel_command = None
scheduler_name = ''
_adaptive_options = {
'worker_key': lambda ws: _job_id_from_worker_name(ws.name)}
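
A hypothetical illustration of what this worker_key presumably buys Adaptive: workers whose names carry the same job id collapse to a single key, so scale-down decisions operate on whole jobs rather than individual workers (the WorkerState stand-in and the names are made up):

class _FakeWorkerState(object):         # stand-in for distributed's WorkerState
    def __init__(self, name):
        self.name = name

worker_key = lambda ws: _job_id_from_worker_name(ws.name)
workers = [_FakeWorkerState('dask-worker[7]'),
           _FakeWorkerState('dask-worker[7]'),
           _FakeWorkerState('dask-worker[8]')]
groups = {}
for ws in workers:
    groups.setdefault(worker_key(ws), []).append(ws)
assert sorted(groups) == ['7', '8']     # one group per job, not per worker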

def __init__(self,
name=None,
@@ -155,15 +201,17 @@ def __init__(self,

self.local_cluster = LocalCluster(n_workers=0, ip=host, **kwargs)

# Keep information on process, cores, and memory, for use in subclasses
self.worker_memory = parse_bytes(memory)

# Keep information on process, threads and memory, for use in
Member:
You probably want to revert the change in this comment

# subclasses
self.worker_memory = parse_bytes(memory) if memory is not None else None
self.worker_processes = processes
self.worker_cores = cores
self.name = name

self.jobs = dict()
Member Author:
self.jobs was a mapping from n-->job_id. However, we were not really using it and it often was not cleaned up when a job ended (so I've removed it).

Member:
I believe we can get rid of self.n and thus self.jobs as it is right now. However, for adaptive to work correctly, I believe we should keep track of all submitted jobs and their statuses. If not, don't we risk continuously submitting new jobs that just sit in the scheduler's queue?

I'm in favour of keeping a dict mapping job_id --> job-status, e.g. as @mrocklin proposed in #11: 'pending', 'running', 'finished' or equivalent. This way, the scale_up method can take that into account.

A perhaps simpler solution is to keep track only of the number of workers that are pending or running, and use that number in scale_up:
return self.start_workers(n - number_of_pending_or_running_worker)
But it seems difficult to deal with finished, running, and pending workers this way.

Member Author:
@guillaumeeb - I think we're in agreement here.

we should keep track of all submitted jobs and their statuses.

This would be nice but it might be somewhat difficult to do. We have three/four states that a job might be in:

  • Pending - we may be able to combine some form of qstat job_identifier with a dictionary of submitted jobs self.jobs above
  • Running - it is straight forward to determine which workers are attached to the scheduler
  • Finished - jobs can exit normally or be killed (e.g. exceeded wall time). When JobQueue culls a worker, it's easy to remove that worker from the list of jobs. However, when the queuing system kills a worker, we would need a way to remove that job from the list of running jobs.

Generally, I think any use of qstat is going to be a bit ugly just because repeated queries of the queueing system tend to be quite expensive. For example:

$ time qstat 9595055
Job id            Name             User              Time Use S Queue
----------------  ---------------- ----------------  -------- - -----
9595055.chadmin1  dask             jhamman           00:07:22 R premium

real	0m1.030s
user	0m0.012s
sys	0m0.000s

So we probably don't want to do this very often. Do others have ideas as to how one would track the status of these jobs in a tractable way?

Member:
We could add a plugin to the scheduler that watched for when workers started and compared them against a set of known pending workers: http://distributed.readthedocs.io/en/latest/plugins.html

Something like the following:

from distributed.diagnostics.plugin import SchedulerPlugin

class JobQueuePlugin(SchedulerPlugin):
    def add_worker(self, scheduler, worker=None, name=None, **kwargs):
        job_id = parse(name)
        pending_jobs.remove(job_id)

scheduler.add_plugin(JobQueuePlugin())

Member Author:
This sounds like what we need. I can implement this as part of this PR.

Member:
Sounds good! We don't seem to need a watcher on stopped workers this way; we know about started workers through the scheduler.

Member Author:
See 92eaf4e for an initial (untested) implementation using the scheduler plugin approach.

self.n = 0
# plugin for tracking job status
self._scheduler_plugin = JobQueuePlugin()
self.local_cluster.scheduler.add_plugin(self._scheduler_plugin)

self._adaptive = None

self._env_header = '\n'.join(env_extra)
@@ -181,26 +229,42 @@ def __init__(self,
self._command_template += " --memory-limit %s" % mem

if name is not None:
self._command_template += " --name %s" % name
self._command_template += "-%(n)d" # Keep %(n) to be replaced later
# worker names follow this template: {NAME}[{JOB_ID}]
# {JOB_ID} is an environment variable defined by the individual
# job scripts/schedulers
self._command_template += " --name %s[${JOB_ID}]" % name
if death_timeout is not None:
self._command_template += " --death-timeout %s" % death_timeout
if local_directory is not None:
self._command_template += " --local-directory %s" % local_directory
if extra is not None:
self._command_template += extra

@property
def pending_jobs(self):
""" Jobs pending in the queue """
return self._scheduler_plugin.pending_jobs

@property
def running_jobs(self):
""" Jobs with currenly active workers """
return self._scheduler_plugin.running_jobs

@property
def finished_jobs(self):
""" Jobs that have finished """
return self._scheduler_plugin.finished_jobs
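
A hypothetical usage sketch of these status properties from the user's side (cluster arguments, job ids and counts are illustrative):

cluster = PBSCluster(cores=36, memory='100GB', processes=9)
cluster.start_workers(18)     # ceil(18 / 9) == 2 jobs submitted
cluster.pending_jobs          # e.g. OrderedDict([('1234567', {}), ('1234568', {})])
# ...once the queue runs the jobs and their workers connect...
cluster.running_jobs          # job id -> {worker name: WorkerState}
cluster.finished_jobs         # jobs whose workers have all gone away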

@property
def worker_threads(self):
return int(self.worker_cores / self.worker_processes)

def job_script(self):
""" Construct a job submission script """
self.n += 1
template = self._command_template % {'n': self.n}
return self._script_template % {'job_header': self.job_header,
'env_header': self._env_header,
'worker_command': template}
pieces = {'job_header': self.job_header,
'env_header': self._env_header,
'worker_command': self._command_template}
return self._script_template % pieces

@contextmanager
def job_file(self):
@@ -212,14 +276,12 @@ def job_file(self):

def start_workers(self, n=1):
""" Start workers and point them to our local scheduler """
workers = []
for _ in range(n):
num_jobs = math.ceil(n / self.worker_processes)
Member:
Probably need from __future__ import division at the top of this file.

for _ in range(num_jobs):
Member Author:
This is a breaking change I want to make sure everyone is aware of. The current behavior for a hypothetical setup that includes 10 workers per job would be:

cluster.start_workers(1)

...and get 1 job and 10 workers.

I'd like to change this so that start_workers(n) gives us n workers and as many jobs as needed to make that happen.

Member:
Historically start_workers was a semi-convention between a few projects. This has decayed, so I have no strong thoughts here. I do think that we need to be consistent on scale though, which seems a bit more standard today.

Member:
Will this really help adaptive? Wouldn't there still be a problem with starting workers in a grouped manner?

With your example, calling cluster.start_workers(1) will still lead to 1 job and 10 workers!

But this may be handled well by adaptive, I don't know. In that case, maybe this breaking change isn't needed?
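
For concreteness, a worked example of the proposed semantics (the numbers are made up): with 10 worker processes per job, the old behaviour turned start_workers(25) into 25 jobs (250 workers), whereas the new behaviour submits only as many jobs as are needed to cover 25 workers:

import math

worker_processes = 10
num_jobs = math.ceil(25 / worker_processes)   # 3 jobs, i.e. up to 30 workers
                                              # (relies on true division, hence the
                                              # __future__ division note above)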

with self.job_file() as fn:
out = self._call(shlex.split(self.submit_command) + [fn])
job = self._job_id_from_submit_output(out.decode())
self.jobs[self.n] = job
workers.append(self.n)
return workers
self._scheduler_plugin.pending_jobs[job] = {}

@property
def scheduler(self):
@@ -248,12 +310,12 @@ def _calls(self, cmds):
Also logs any stderr information
"""
logger.debug("Submitting the following calls to command line")
procs = []
for cmd in cmds:
logger.debug(' '.join(cmd))
procs = [subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
for cmd in cmds]
procs.append(subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE))

result = []
for proc in procs:
@@ -269,31 +331,49 @@ def _call(self, cmd):

def stop_workers(self, workers):
""" Stop a list of workers"""
logger.debug("Stopping workers: %s" % workers)
if not workers:
return
workers = list(map(int, workers))
jobs = [self.jobs[w] for w in workers]
self._call([self.cancel_command] + list(jobs))
jobs = []
for w in workers:
with ignoring(KeyError):
del self.jobs[w]
if isinstance(w, dict):
jobs.append(_job_id_from_worker_name(w['name']))
else:
jobs.append(_job_id_from_worker_name(w.name))
self.stop_jobs(set(jobs))

def stop_jobs(self, jobs):
""" Stop a list of jobs"""
logger.debug("Stopping jobs: %s" % jobs)
if jobs:
self._call([self.cancel_command] + list(set(jobs)))
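
A hypothetical sketch of how stop_workers collapses a worker list into a single cancel call (the worker names are made up, and qdel is just PBS's cancel_command used as an example):

workers = [{'name': 'dask-worker[100]'},      # dict form; WorkerState objects also work
           {'name': 'dask-worker[100]'},
           {'name': 'dask-worker[101]'}]
jobs = [_job_id_from_worker_name(w['name']) for w in workers]
assert set(jobs) == {'100', '101'}
# -> one call along the lines of: qdel 100 101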

def scale_up(self, n, **kwargs):
""" Brings total worker count up to ``n`` """
return self.start_workers(n - len(self.jobs))
logger.debug("Scaling up to %d workers." % n)
active_and_pending = sum([len(j) for j in self.running_jobs.values()])
active_and_pending += self.worker_processes * len(self.pending_jobs)
logger.debug("Found %d active/pending workers." % active_and_pending)
self.start_workers(n - active_and_pending)
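
An illustrative trace of the arithmetic above (job ids, worker counts and processes-per-job are made up):

running_jobs = {'100': {'w1': None, 'w2': None, 'w3': None, 'w4': None},
                '101': {'w5': None, 'w6': None, 'w7': None, 'w8': None}}
pending_jobs = {'102': {}}
worker_processes = 4

active_and_pending = sum(len(j) for j in running_jobs.values())   # 8 connected workers
active_and_pending += worker_processes * len(pending_jobs)        # plus 4 expected = 12
# scale_up(16) would therefore call start_workers(16 - 12), i.e. ask for 4 more workers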

def scale_down(self, workers):
''' Close the workers with the given addresses '''
if isinstance(workers, dict):
names = {v['name'] for v in workers.values()}
job_ids = {name.split('-')[-2] for name in names}
self.stop_workers(job_ids)
logger.debug("Scaling down. Workers: %s" % workers)
worker_states = []
for w in workers:
try:
# Get the actual WorkerState
worker_states.append(self.scheduler.workers[w])
except KeyError:
logger.debug('worker %s is already gone' % w)
self.stop_workers(worker_states)

def __enter__(self):
return self

def __exit__(self, type, value, traceback):
self.stop_workers(self.jobs)
jobs = list(self.pending_jobs.keys()) + list(self.running_jobs.keys())
self.stop_jobs(set(jobs))
self.local_cluster.__exit__(type, value, traceback)
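
A hypothetical usage note: used as a context manager, the cluster now cancels everything it submitted on exit (the arguments are illustrative):

with PBSCluster(cores=36, memory='100GB') as cluster:
    cluster.start_workers(8)
    ...  # attach a Client and do work
# on exit: stop_jobs(pending + running), then the backing LocalCluster is shut down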

def _job_id_from_submit_output(self, out):
2 changes: 1 addition & 1 deletion dask_jobqueue/moab.py
@@ -30,7 +30,7 @@ class MoabCluster(PBSCluster):
memory='16G', resource_spec='96G',
job_extra=['-d /home/First.Last', '-M none'],
local_directory=os.getenv('TMPDIR', '/tmp'))
>>> cluster.start_workers(10) # this may take a few seconds to launch
>>> cluster.start_workers(10) # submit enough jobs to deploy 10 workers

>>> from dask.distributed import Client
>>> client = Client(cluster)
6 changes: 5 additions & 1 deletion dask_jobqueue/pbs.py
@@ -75,7 +75,10 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None,
# Instantiate args and parameters from parent abstract class
super(PBSCluster, self).__init__(**kwargs)

header_lines = []
# Try to find a project name from environment variable
project = project or os.environ.get('PBS_ACCOUNT')

header_lines = ['#!/usr/bin/env bash']
# PBS header build
if self.name is not None:
header_lines.append('#PBS -N %s' % self.name)
@@ -95,6 +98,7 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None,
if walltime is not None:
header_lines.append('#PBS -l walltime=%s' % walltime)
header_lines.extend(['#PBS %s' % arg for arg in job_extra])
header_lines.append('JOB_ID=${PBS_JOBID%.*}')

# Declare class attribute that shall be overriden
self.job_header = '\n'.join(header_lines)
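
The new JOB_ID line relies on shell suffix stripping; a rough Python equivalent of what ${PBS_JOBID%.*} does (the job id value is illustrative):

pbs_jobid = '1234567.chadmin1'          # what PBS exports
job_id = pbs_jobid.rsplit('.', 1)[0]    # '1234567', matching ${PBS_JOBID%.*}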
5 changes: 3 additions & 2 deletions dask_jobqueue/sge.py
@@ -1,3 +1,5 @@
from __future__ import absolute_import, division, print_function

import logging

import dask
@@ -56,8 +58,7 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None,

super(SGECluster, self).__init__(**kwargs)

header_lines = ['#!/bin/bash']

header_lines = ['#!/usr/bin/env bash']
Member:
So you don't have a solution for propagating the JOB_ID variable in the SGE script?

Member Author:
Good catch. I thought I did, but I'll have to sort it out.

Member Author:
I take that back. SGE uses JOB_ID for its environment variable, so it's already present.

if self.name is not None:
header_lines.append('#$ -N %(name)s')
if queue is not None:
6 changes: 4 additions & 2 deletions dask_jobqueue/slurm.py
@@ -51,7 +51,7 @@ class SLURMCluster(JobQueueCluster):
>>> cluster.adapt()
""", 4)

#Override class variables
# Override class variables
submit_command = 'sbatch --parsable'
cancel_command = 'scancel'
scheduler_name = 'slurm'
@@ -74,7 +74,7 @@ def __init__(self, queue=None, project=None, walltime=None,
super(SLURMCluster, self).__init__(**kwargs)

# Always ask for only one task
header_lines = []
header_lines = ['#!/usr/bin/env bash']
# SLURM header build
if self.name is not None:
header_lines.append('#SBATCH -J %s' % self.name)
@@ -100,6 +100,8 @@ def __init__(self, queue=None, project=None, walltime=None,
header_lines.append('#SBATCH -t %s' % walltime)
header_lines.extend(['#SBATCH %s' % arg for arg in job_extra])

header_lines.append('JOB_ID=${SLURM_JOB_ID%;*}')

# Declare class attribute that shall be overriden
self.job_header = '\n'.join(header_lines)

2 changes: 2 additions & 0 deletions dask_jobqueue/tests/__init__.py
@@ -0,0 +1,2 @@

QUEUE_WAIT = 60 # seconds
Member:
It's great to have a constant that is used consistently in the tests!

Is there a good reason to leave this at 60s? If not, a smaller number like 15s (I think that was the number before) would be good.

5 changes: 0 additions & 5 deletions dask_jobqueue/tests/test_jobqueue_core.py
@@ -3,11 +3,6 @@
from dask_jobqueue import JobQueueCluster


def test_jq_core_placeholder():
# to test that CI is working
pass


def test_errors():
with pytest.raises(NotImplementedError) as info:
JobQueueCluster(cores=4)