Commit
rebase
raybellwaves committed Jul 11, 2018
1 parent fc21ea0 commit caf89e2
Showing 5 changed files with 313 additions and 5 deletions.
1 change: 1 addition & 0 deletions dask_jobqueue/__init__.py
@@ -5,6 +5,7 @@
from .pbs import PBSCluster
from .slurm import SLURMCluster
from .sge import SGECluster
from .lsf import LSFCluster

from ._version import get_versions
__version__ = get_versions()['version']
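With this export, LSFCluster becomes importable from the package top level alongside the other cluster classes. A minimal usage sketch (the queue and project values are illustrative; cores and memory are required arguments, as the tests in this commit enforce):

    from dask_jobqueue import LSFCluster

    # Hypothetical queue/project values; cores and memory must be given
    cluster = LSFCluster(queue='general', project='DaskOnLSF',
                         cores=8, memory='8GB')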
25 changes: 20 additions & 5 deletions dask_jobqueue/core.py
Expand Up @@ -66,6 +66,9 @@ class JobQueueCluster(Cluster):
cancel_command: str
Abstract attribute for job scheduler cancel command,
        should be overridden
scheduler_name: str
Abstract attribute for job scheduler name,
        should be overridden
See Also
--------
@@ -100,6 +103,7 @@ def __init__(self,
env_extra=None,
walltime=None,
threads=None,
shell=False,
**kwargs
):
""" """
@@ -153,9 +157,9 @@ def __init__(self,
else:
host = socket.gethostname()

-        self.local_cluster = LocalCluster(n_workers=0, ip=host, **kwargs)
+        self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs)

-        # Keep information on processes, cores, and memory, for use in subclasses
+        # Keep information on processes, threads, and memory, for use in subclasses
self.worker_memory = parse_bytes(memory)

self.worker_processes = processes
@@ -210,12 +214,21 @@ def job_file(self):
f.write(self.job_script())
yield fn

    def submit_job(self, script_filename):
        """ Submit the job script, handling LSF's stdin-based submission """
        if self.scheduler_name == 'lsf':
            piped_cmd = [self.submit_command + '<"' + script_filename + '"']
            self.shell = True
            return self._call(piped_cmd)
        else:
            return self._call(shlex.split(self.submit_command) + [script_filename])

def start_workers(self, n=1):
""" Start workers and point them to our local scheduler """
workers = []
for _ in range(n):
with self.job_file() as fn:
-                out = self._call(shlex.split(self.submit_command) + [fn])
+                out = self.submit_job(fn)
job = self._job_id_from_submit_output(out.decode())
self.jobs[self.n] = job
workers.append(self.n)
@@ -224,7 +237,7 @@
@property
def scheduler(self):
""" The scheduler of this cluster """
-        return self.local_cluster.scheduler
+        return self.cluster.scheduler

def _calls(self, cmds):
""" Call a command using subprocess.communicate
@@ -251,6 +264,7 @@ def _calls(self, cmds):
for cmd in cmds:
logger.debug(' '.join(cmd))
procs = [subprocess.Popen(cmd,
shell=self.shell,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
for cmd in cmds]
@@ -273,6 +287,7 @@ def stop_workers(self, workers):
return
workers = list(map(int, workers))
jobs = [self.jobs[w] for w in workers]
self.shell = False
self._call([self.cancel_command] + list(jobs))
for w in workers:
with ignoring(KeyError):
@@ -294,7 +309,7 @@ def __enter__(self):

def __exit__(self, type, value, traceback):
self.stop_workers(self.jobs)
-        self.local_cluster.__exit__(type, value, traceback)
+        self.cluster.__exit__(type, value, traceback)

def _job_id_from_submit_output(self, out):
raise NotImplementedError('_job_id_from_submit_output must be '
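The submit_job indirection added above exists because LSF's bsub reads the job script from stdin rather than taking a filename argument. A minimal sketch of the two submission styles (the sbatch line is illustrative of the other schedulers; only the bsub form comes from this commit):

    import shlex
    import subprocess

    script = 'job.sh'  # hypothetical job script path

    # PBS/SLURM/SGE-style: pass the script as a plain argument
    subprocess.Popen(shlex.split('sbatch') + [script],
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # LSF-style: build a single shell string and run it with shell=True,
    # so the shell performs the stdin redirection, as submit_job does
    subprocess.Popen(['bsub <"%s"' % script], shell=True,
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)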
22 changes: 22 additions & 0 deletions dask_jobqueue/jobqueue.yaml
@@ -83,3 +83,25 @@ jobqueue:
env-extra: []
resource-spec: null
job-extra: []

lsf:
name: dask-worker

# Dask worker options
cores: null # Total number of cores per job
memory: null # Total amount of memory per job
processes: 1 # Number of Python processes per job

interface: null # Network interface to use like eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
local-directory: null # Location of fast local storage like /scratch or $TMPDIR

# LSF resource manager options
queue: null
project: null
walltime: '00:30'
extra: ""
env-extra: []
ncpus: null
mem: null
    job-extra: []
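These keys supply the defaults that LSFCluster's constructor falls back to when an argument is None. A short sketch of the lookup via dask.config, using the key names above:

    import dask

    walltime = dask.config.get('jobqueue.lsf.walltime')    # '00:30' by default
    queue = dask.config.get('jobqueue.lsf.queue')          # None unless configured
    job_extra = dask.config.get('jobqueue.lsf.job-extra')  # [] by default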
117 changes: 117 additions & 0 deletions dask_jobqueue/lsf.py
@@ -0,0 +1,117 @@
from __future__ import absolute_import, division, print_function

import logging
import math
import os

import dask

from .core import JobQueueCluster, docstrings

logger = logging.getLogger(__name__)


class LSFCluster(JobQueueCluster):
    __doc__ = docstrings.with_indents(""" Launch Dask on an LSF cluster
Parameters
----------
queue : str
Destination queue for each worker job. Passed to `#BSUB -q` option.
project : str
Accounting string associated with each worker job. Passed to
`#BSUB -P` option.
ncpus : int
Number of cpus. Passed to `#BSUB -n` option.
mem : int
Request memory in bytes. Passed to `#BSUB -M` option.
walltime : str
Walltime for each worker job in HH:MM. Passed to `#BSUB -W` option.
job_extra : list
        List of other LSF options, for example -u. Each option will be
        prepended with the #BSUB prefix.
%(JobQueueCluster.parameters)s
Examples
--------
>>> from dask_jobqueue import LSFCluster
    >>> cluster = LSFCluster(queue='general', project='DaskonLSF')
>>> cluster.scale(10) # this may take a few seconds to launch
>>> from dask.distributed import Client
>>> client = Client(cluster)
    This also works with adaptive clusters. This automatically launches and
    kills workers based on load.
>>> cluster.adapt()
""", 4)

# Override class variables
submit_command = 'bsub'
cancel_command = 'bkill'
scheduler_name = 'lsf'

def __init__(self, queue=None, project=None, ncpus=None, mem=None,
walltime=None, job_extra=None, **kwargs):
if queue is None:
queue = dask.config.get('jobqueue.%s.queue' % self.scheduler_name)
if project is None:
project = dask.config.get('jobqueue.%s.project' % self.scheduler_name)
        if ncpus is None:
            ncpus = dask.config.get('jobqueue.%s.ncpus' % self.scheduler_name)
        if mem is None:
            mem = dask.config.get('jobqueue.%s.mem' % self.scheduler_name)
if walltime is None:
walltime = dask.config.get('jobqueue.%s.walltime' % self.scheduler_name)
        if job_extra is None:
            job_extra = dask.config.get('jobqueue.%s.job-extra' % self.scheduler_name)

# Instantiate args and parameters from parent abstract class
super(LSFCluster, self).__init__(**kwargs)

header_lines = []
# LSF header build
if self.name is not None:
header_lines.append('#BSUB -J %s' % self.name)
header_lines.append('#BSUB -e %s.err' % self.name)
header_lines.append('#BSUB -o %s.out' % self.name)
if queue is not None:
header_lines.append('#BSUB -q %s' % queue)
if project is not None:
header_lines.append('#BSUB -P %s' % project)
if ncpus is None:
# Compute default cores specifications
ncpus = self.worker_cores
logger.info("ncpus specification for LSF not set, "
"initializing it to %s" % ncpus)
if ncpus is not None:
header_lines.append('#BSUB -n %s' % ncpus)
if mem is None:
# Compute default memory specifications
mem = self.worker_memory
logger.info("mem specification for LSF not set, "
"initializing it to %s" % mem)
if mem is not None:
memory_string = lsf_format_bytes_ceil(mem)
header_lines.append('#BSUB -M %s' % memory_string)
if walltime is not None:
header_lines.append('#BSUB -W %s' % walltime)
header_lines.extend(['#BSUB %s' % arg for arg in job_extra])

        # Declare class attribute that shall be overridden
self.job_header = '\n'.join(header_lines)

logger.debug("Job script: \n %s" % self.job_script())

def _job_id_from_submit_output(self, out):
return out.split('<')[1].split('>')[0].strip()


def lsf_format_bytes_ceil(n):
""" Format bytes as text
LSF expects megabytes
>>> lsf_format_bytes_ceil(1234567890)
'1178'
"""
return '%d' % math.ceil(n / (1024**2))
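For reference, a successful bsub typically prints a line like the one below, which is what _job_id_from_submit_output parses; the exact wording can vary by site, so this output is illustrative:

    out = 'Job <1234567> is submitted to default queue <general>.'
    job_id = out.split('<')[1].split('>')[0].strip()
    assert job_id == '1234567'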
153 changes: 153 additions & 0 deletions dask_jobqueue/tests/test_lsf.py
@@ -0,0 +1,153 @@
import sys
from time import sleep, time

import dask
from dask.distributed import Client
from distributed.utils_test import loop # noqa: F401
import pytest

from dask_jobqueue import LSFCluster


def test_header():
with LSFCluster(walltime='00:02', processes=4, cores=8, memory='8GB') as cluster:

assert '#BSUB' in cluster.job_header
assert '#BSUB -J dask-worker' in cluster.job_header
assert '#BSUB -n 8' in cluster.job_header
assert '#BSUB -M 7630' in cluster.job_header
assert '#BSUB -W 00:02' in cluster.job_header
assert '#BSUB -q' not in cluster.job_header
assert '#BSUB -P' not in cluster.job_header

with LSFCluster(queue='general', project='DaskOnLSF', processes=4, cores=8,
memory='28GB', ncpus=24, mem=100000000000) as cluster:

assert '#BSUB -q general' in cluster.job_header
assert '#BSUB -J dask-worker' in cluster.job_header
assert '#BSUB -n 24' in cluster.job_header
assert '#BSUB -n 8' not in cluster.job_header
assert '#BSUB -M 95368' in cluster.job_header
assert '#BSUB -M 26703' not in cluster.job_header
assert '#BSUB -W' in cluster.job_header
assert '#BSUB -P DaskOnLSF' in cluster.job_header

with LSFCluster(cores=4, memory='8GB') as cluster:

assert '#BSUB -n' in cluster.job_header
assert '#BSUB -W' in cluster.job_header
assert '#BSUB -M' in cluster.job_header
assert '#BSUB -q' not in cluster.job_header
assert '#BSUB -P' not in cluster.job_header

with LSFCluster(cores=4, memory='8GB',
job_extra=['-u email@domain.com']) as cluster:

assert '#BSUB -u email@domain.com' in cluster.job_header
assert '#BSUB -n' in cluster.job_header
assert '#BSUB -W' in cluster.job_header
assert '#BSUB -M' in cluster.job_header
assert '#BSUB -q' not in cluster.job_header
assert '#BSUB -P' not in cluster.job_header


def test_job_script():
with LSFCluster(walltime='00:02', processes=4, cores=8,
memory='8GB') as cluster:

job_script = cluster.job_script()
assert '#BSUB' in job_script
assert '#BSUB -J dask-worker' in job_script
assert '#BSUB -n 8' in job_script
        assert '#BSUB -M 7630' in job_script
assert '#BSUB -W 00:02' in job_script
assert '#BSUB -q' not in cluster.job_header
assert '#BSUB -P' not in cluster.job_header

assert '{} -m distributed.cli.dask_worker tcp://'.format(sys.executable) in job_script
assert '--nthreads 2 --nprocs 4 --memory-limit 2.00GB' in job_script

with LSFCluster(queue='general', project='DaskOnLSF', processes=4, cores=8,
                    memory='28GB', ncpus=24, mem=100000000000) as cluster:

job_script = cluster.job_script()
assert '#BSUB -q general' in cluster.job_header
assert '#BSUB -J dask-worker' in cluster.job_header
assert '#BSUB -n 24' in cluster.job_header
assert '#BSUB -n 8' not in cluster.job_header
assert '#BSUB -M 95368' in cluster.job_header
assert '#BSUB -M 26703' not in cluster.job_header
assert '#BSUB -W' in cluster.job_header
assert '#BSUB -P DaskOnLSF' in cluster.job_header

assert '{} -m distributed.cli.dask_worker tcp://'.format(sys.executable) in job_script
assert '--nthreads 2 --nprocs 4 --memory-limit 7.00GB' in job_script


@pytest.mark.env("lsf") # noqa: F811
def test_basic(loop):
with LSFCluster(walltime='00:02', processes=1, cores=2, memory='2GB',
local_directory='/tmp', loop=loop) as cluster:

with Client(cluster) as client:
workers = cluster.start_workers(2)
future = client.submit(lambda x: x + 1, 10)
assert future.result(60) == 11
assert cluster.jobs

info = client.scheduler_info()
w = list(info['workers'].values())[0]
assert w['memory_limit'] == 2e9
assert w['ncores'] == 2

cluster.stop_workers(workers)

start = time()
while len(client.scheduler_info()['workers']) > 0:
sleep(0.100)
assert time() < start + 10

assert not cluster.jobs


@pytest.mark.env("lsf") # noqa: F811
def test_adaptive(loop):
with LSFCluster(walltime='00:02', processes=1, cores=2, memory='2GB',
local_directory='/tmp', loop=loop) as cluster:
cluster.adapt()
with Client(cluster) as client:
future = client.submit(lambda x: x + 1, 10)
assert future.result(60) == 11

assert cluster.jobs

start = time()
processes = cluster.worker_processes
while len(client.scheduler_info()['workers']) != processes:
sleep(0.1)
assert time() < start + 10

del future

start = time()
while len(client.scheduler_info()['workers']) > 0:
sleep(0.100)
assert time() < start + 10


def test_config(loop): # noqa: F811
with dask.config.set({'jobqueue.lsf.walltime': '00:02',
'jobqueue.lsf.local-directory': '/foo'}):
with LSFCluster(loop=loop) as cluster:
assert '00:02' in cluster.job_script()
assert '--local-directory /foo' in cluster.job_script()


def test_informative_errors():
with pytest.raises(ValueError) as info:
LSFCluster(memory=None, cores=4)
assert 'memory' in str(info.value)

with pytest.raises(ValueError) as info:
LSFCluster(memory='1GB', cores=None)
assert 'cores' in str(info.value)
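The -M values asserted above follow from lsf_format_bytes_ceil: dask's parse_bytes treats '8GB' as 8e9 bytes and '28GB' as 28e9, which are then converted to mebibytes and rounded up. A quick check of the arithmetic:

    import math

    assert math.ceil(8e9 / 1024**2) == 7630     # memory='8GB'
    assert math.ceil(28e9 / 1024**2) == 26703   # memory='28GB'
    assert math.ceil(100e9 / 1024**2) == 95368  # mem=100000000000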
