
Add lsf #78

Merged
merged 23 commits on Aug 1, 2018
1 change: 1 addition & 0 deletions dask_jobqueue/__init__.py
@@ -5,6 +5,7 @@
from .pbs import PBSCluster
from .slurm import SLURMCluster
from .sge import SGECluster
from .lsf import LSFCluster

from ._version import get_versions
__version__ = get_versions()['version']
14 changes: 9 additions & 5 deletions dask_jobqueue/core.py
@@ -280,13 +280,16 @@ def job_file(self):
                f.write(self.job_script())
            yield fn

    def _submit_job(self, script_filename):
        return self._call(shlex.split(self.submit_command) + [script_filename])

    def start_workers(self, n=1):
        """ Start workers and point them to our local scheduler """
        logger.debug('starting %s workers' % n)
        num_jobs = math.ceil(n / self.worker_processes)
        for _ in range(num_jobs):
            with self.job_file() as fn:
                out = self._call(shlex.split(self.submit_command) + [fn])
                out = self._submit_job(fn)
                job = self._job_id_from_submit_output(out.decode())
                logger.debug("started job: %s" % job)
                self.pending_jobs[job] = {}
@@ -296,7 +299,7 @@ def scheduler(self):
""" The scheduler of this cluster """
return self.local_cluster.scheduler

    def _calls(self, cmds):
    def _calls(self, cmds, **kwargs):
        """ Call a command using subprocess.communicate

        This centralizes calls out to the command line, providing consistent
@@ -323,7 +326,7 @@ def _calls(self, cmds):
            logger.debug(' '.join(cmd))
            procs.append(subprocess.Popen(cmd,
                                          stdout=subprocess.PIPE,
                                          stderr=subprocess.PIPE))
                                          stderr=subprocess.PIPE,
                                          **kwargs))

        result = []
        for proc in procs:
@@ -333,9 +337,9 @@
            result.append(out)
        return result

    def _call(self, cmd):
    def _call(self, cmd, **kwargs):
        """ Singular version of _calls """
        return self._calls([cmd])[0]
        return self._calls([cmd], **kwargs)[0]

    def stop_workers(self, workers):
        """ Stop a list of workers"""
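Taken together, these hunks route submission through an overridable _submit_job hook and let _call/_calls forward extra keyword arguments to subprocess.Popen. Below is a minimal sketch, not part of the diff and using a hypothetical script name, of why that matters for LSF: bsub reads the job script on stdin, so the < redirect has to be interpreted by a shell.

import subprocess

# Sketch only: run the submission through a shell so the `<` redirect works.
# With shell=True the whole command line travels as a single string (here a
# one-element list), which is what the **kwargs plumbing above enables.
cmd = ['bsub < dask_worker_script.sh 2> /dev/null']  # hypothetical file name
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE, shell=True)
out, err = proc.communicate()
print(out.decode())  # e.g. "Job <1234> is submitted to queue <general>."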
22 changes: 22 additions & 0 deletions dask_jobqueue/jobqueue.yaml
@@ -83,3 +83,25 @@ jobqueue:
    env-extra: []
    resource-spec: null
    job-extra: []

  lsf:
    name: dask-worker

    # Dask worker options
    cores: null                 # Total number of cores per job
    memory: null                # Total amount of memory per job
    processes: 1                # Number of Python processes per job

    interface: null             # Network interface to use like eth0 or ib0
    death-timeout: 60           # Number of seconds to wait if a worker can not find a scheduler
    local-directory: null       # Location of fast local storage like /scratch or $TMPDIR

    # LSF resource manager options
    queue: null
    project: null
    walltime: '00:30'
    extra: ""
    env-extra: []
    ncpus: null
    mem: null
    job-extra: []
Member:

Just checking in, does LSF need/use all of these options? I'm slightly concerned that as we copy configs from different systems we may accrue more than is necessary.

@raybellwaves (Member, Author), Jul 27, 2018:

The only ones I haven't used are extra and env-extra. extra is for additional arguments to pass to dask-worker, so that is probably worth keeping. env-extra is for other commands to add to the script before launching the worker. I can't see myself using it, but core.py checks for it:

env_extra : list

A future PR could move that out of core.py and have users specify it in their individual classes. I'll let you decide.
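To make the mapping concrete, here is a minimal usage sketch, not part of the diff: each key in the lsf block above becomes the default for the matching LSFCluster argument, looked up as jobqueue.lsf.<key> at construction time. The queue name, walltime, project string, and resource sizes are hypothetical; cores and memory default to null in the yaml, so the sketch passes them explicitly.

import dask
from dask_jobqueue import LSFCluster

# Override a couple of the yaml defaults in code (hypothetical values)
dask.config.set({'jobqueue.lsf.queue': 'general',
                 'jobqueue.lsf.walltime': '01:00'})

# Anything not passed explicitly falls back to the jobqueue.lsf.* config
cluster = LSFCluster(cores=8, memory='16GB', project='DaskOnLSF')
print(cluster.job_script())  # inspect the generated #BSUB header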

131 changes: 131 additions & 0 deletions dask_jobqueue/lsf.py
@@ -0,0 +1,131 @@
from __future__ import absolute_import, division, print_function

import logging
import math

import dask

from .core import JobQueueCluster, docstrings

logger = logging.getLogger(__name__)


class LSFCluster(JobQueueCluster):
    __doc__ = docstrings.with_indents(""" Launch Dask on an LSF cluster

    Parameters
    ----------
    queue : str
        Destination queue for each worker job. Passed to `#BSUB -q` option.
    project : str
        Accounting string associated with each worker job. Passed to
        `#BSUB -P` option.
    ncpus : int
        Number of cpus. Passed to `#BSUB -n` option.
    mem : int
        Request memory in bytes. Passed to `#BSUB -M` option.
    walltime : str
        Walltime for each worker job in HH:MM. Passed to `#BSUB -W` option.
    job_extra : list
        List of other LSF options, for example -u. Each option will be
        prepended with the #BSUB prefix.
    %(JobQueueCluster.parameters)s

    Examples
    --------
    >>> from dask_jobqueue import LSFCluster
    >>> cluster = LSFCluster(queue='general', project='DaskonLSF',
    ...                      cores=15, memory='25GB')
    >>> cluster.start_workers(10)  # this may take a few seconds to launch

    >>> from dask.distributed import Client
    >>> client = Client(cluster)

    This also works with adaptive clusters. This automatically launches and
    kills workers based on load.

    >>> cluster.adapt()
    """, 4)

    # Override class variables
    submit_command = 'bsub <'
    cancel_command = 'bkill'
    scheduler_name = 'lsf'

    def __init__(self, queue=None, project=None, ncpus=None, mem=None,
                 walltime=None, job_extra=None, **kwargs):
        if queue is None:
            queue = dask.config.get('jobqueue.%s.queue' % self.scheduler_name)
        if project is None:
            project = dask.config.get('jobqueue.%s.project' % self.scheduler_name)
        if ncpus is None:
            ncpus = dask.config.get('jobqueue.%s.ncpus' % self.scheduler_name)
        if mem is None:
            mem = dask.config.get('jobqueue.%s.mem' % self.scheduler_name)
        if walltime is None:
            walltime = dask.config.get('jobqueue.%s.walltime' % self.scheduler_name)
        if job_extra is None:
            job_extra = dask.config.get('jobqueue.%s.job-extra' % self.scheduler_name)

        # Instantiate args and parameters from parent abstract class
        super(LSFCluster, self).__init__(**kwargs)

        header_lines = []
        # LSF header build
        if self.name is not None:
            header_lines.append('#BSUB -J %s' % self.name)
            header_lines.append('#BSUB -e %s.err' % self.name)
            header_lines.append('#BSUB -o %s.out' % self.name)
        if queue is not None:
            header_lines.append('#BSUB -q %s' % queue)
        if project is not None:
            header_lines.append('#BSUB -P %s' % project)
        if ncpus is None:
            # Compute default cores specifications
            ncpus = self.worker_cores
            logger.info("ncpus specification for LSF not set, "
                        "initializing it to %s" % ncpus)
        if ncpus is not None:
            header_lines.append('#BSUB -n %s' % ncpus)
        if mem is None:
            # Compute default memory specifications
            mem = self.worker_memory
            logger.info("mem specification for LSF not set, "
                        "initializing it to %s" % mem)
        if mem is not None:
            memory_string = lsf_format_bytes_ceil(mem)
            header_lines.append('#BSUB -M %s' % memory_string)
        if walltime is not None:
            header_lines.append('#BSUB -W %s' % walltime)
        header_lines.extend(['#BSUB %s' % arg for arg in job_extra])
        header_lines.append('JOB_ID=${LSB_JOBID%.*}')

        # Declare class attribute that shall be overridden
        self.job_header = '\n'.join(header_lines)

        logger.debug("Job script: \n %s" % self.job_script())

    def _job_id_from_submit_output(self, out):
        return out.split('<')[1].split('>')[0].strip()

    def _submit_job(self, script_filename):
        piped_cmd = [self.submit_command + ' ' + script_filename + ' 2> /dev/null']
        return self._call(piped_cmd, shell=True)


def lsf_format_bytes_ceil(n):
    """ Format bytes as text

    Convert bytes to megabytes which LSF requires.

    Parameters
    ----------
    n: int
        Bytes

    Examples
    --------
    >>> lsf_format_bytes_ceil(1234567890)
    '1235'
    """
    return '%d' % math.ceil(n / (1000**2))
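For illustration, a minimal sketch, not part of the diff, of the job-id parsing done by _job_id_from_submit_output. The reply text below is a hypothetical bsub acknowledgement; the exact wording can vary between LSF versions and sites, but the id sits in the first pair of angle brackets.

# Hypothetical bsub reply; only the first angle-bracketed field is used.
out = "Job <1234567> is submitted to queue <general>."
job_id = out.split('<')[1].split('>')[0].strip()
print(job_id)  # 1234567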