Add lsf #78

Merged · 23 commits · Aug 1, 2018
Changes from 13 commits
1 change: 1 addition & 0 deletions dask_jobqueue/__init__.py
@@ -5,6 +5,7 @@
from .pbs import PBSCluster
from .slurm import SLURMCluster
from .sge import SGECluster
from .lsf import LSFCluster

from ._version import get_versions
__version__ = get_versions()['version']
16 changes: 15 additions & 1 deletion dask_jobqueue/core.py
@@ -66,6 +66,9 @@ class JobQueueCluster(Cluster):
    cancel_command: str
        Abstract attribute for job scheduler cancel command,
        should be overridden
    scheduler_name: str
        Abstract attribute for job scheduler name,
        should be overridden

    See Also
    --------
@@ -88,6 +91,11 @@ class JobQueueCluster(Cluster):
    cancel_command = None
    scheduler_name = ''

    # Required for executing commands through the shell in the subprocess
    # module. It handles shell input redirection, e.g. bsub < script_filename.sh,
    # and does not consider '<' as a command, file or directory.
    shell = False
Review comment (Member): Personally I would not use a comment in the base class. Also I would go for a more informative variable name, maybe popen_shell = True (better name more than welcome)?
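For context, a minimal sketch (illustrative, not part of the diff) of what the flag changes in the subprocess module; cat stands in for a submit command like bsub:

import subprocess

# Illustrative sketch only.
# shell=False (the default): each list element is a literal argv entry,
# so '<' would reach the program as an argument, not as a redirection:
# subprocess.check_output(['bsub', '<', 'script.sh'])
#
# shell=True: the command string is run through /bin/sh, which performs
# the '<' redirection and feeds the file to the command's stdin:
out = subprocess.check_output('cat < /etc/hostname', shell=True)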


    def __init__(self,
                 name=None,
                 cores=None,
@@ -210,12 +218,16 @@ def job_file(self):
                f.write(self.job_script())
            yield fn

    def submit_job(self, script_filename):
        """ Sumbits job """
Review comment (Member): Personally I am in favour of not having a docstring for this function since this is just a one-liner. More generally, I think if your docstring is just repeating the function name, remove it.

        return self._call(shlex.split(self.submit_command) + [script_filename])
Review comment (Member): This should probably be a private method so that users don't get the wrong idea that they should use it to submit jobs.


    def start_workers(self, n=1):
        """ Start workers and point them to our local scheduler """
        workers = []
        for _ in range(n):
            with self.job_file() as fn:
-               out = self._call(shlex.split(self.submit_command) + [fn])
+               out = self.submit_job(fn)
                job = self._job_id_from_submit_output(out.decode())
                self.jobs[self.n] = job
                workers.append(self.n)
@@ -251,6 +263,7 @@ def _calls(self, cmds):
        for cmd in cmds:
            logger.debug(' '.join(cmd))
        procs = [subprocess.Popen(cmd,
                                  shell=self.shell,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE)
                 for cmd in cmds]
@@ -273,6 +286,7 @@ def stop_workers(self, workers):
            return
        workers = list(map(int, workers))
        jobs = [self.jobs[w] for w in workers]
        self.shell = False
Review comment (Member): You probably don't need this.

        self._call([self.cancel_command] + list(jobs))
        for w in workers:
            with ignoring(KeyError):
22 changes: 22 additions & 0 deletions dask_jobqueue/jobqueue.yaml
@@ -83,3 +83,25 @@ jobqueue:
    env-extra: []
    resource-spec: null
    job-extra: []

  lsf:
    name: dask-worker

    # Dask worker options
    cores: null            # Total number of cores per job
    memory: null           # Total amount of memory per job
    processes: 1           # Number of Python processes per job

    interface: null        # Network interface to use like eth0 or ib0
    death-timeout: 60      # Number of seconds to wait if a worker can not find a scheduler
    local-directory: null  # Location of fast local storage like /scratch or $TMPDIR

    # LSF resource manager options
    queue: null
    project: null
    walltime: '00:30'
    extra: ""
    env-extra: []
    ncpus: null
    mem: null
    job-extra: []
Review comment (Member): Just checking in, does LSF need/use all of these options? I'm slightly concerned that as we copy configs from different systems we may accrue more than is necessary.

Review comment (@raybellwaves, Member Author, Jul 27, 2018): The only ones I haven't used are extra and env-extra. extra is for additional arguments to pass to dask-worker, so that is probably worth keeping. env-extra adds other commands to the script before launching the worker. I can't see myself using this, but core.py checks for it (its docstring lists env_extra : list). A future PR could be to move that out of core.py and have users specify it in their individual classes. I'll let you decide that.
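For concreteness (illustrative, not from this PR): an entry such as env-extra: ['module load python'] is written verbatim into the generated job script, between the scheduler header and the dask-worker launch line.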

127 changes: 127 additions & 0 deletions dask_jobqueue/lsf.py
@@ -0,0 +1,127 @@
from __future__ import absolute_import, division, print_function

import logging
import math
import os

import dask

from .core import JobQueueCluster, docstrings

logger = logging.getLogger(__name__)


class LSFCluster(JobQueueCluster):
    __doc__ = docstrings.with_indents(""" Launch Dask on an LSF cluster

    Parameters
    ----------
    queue : str
        Destination queue for each worker job. Passed to `#BSUB -q` option.
    project : str
        Accounting string associated with each worker job. Passed to
        `#BSUB -P` option.
    ncpus : int
        Number of cpus. Passed to `#BSUB -n` option.
    mem : int
        Request memory in bytes. Passed to `#BSUB -M` option.
    walltime : str
        Walltime for each worker job in HH:MM. Passed to `#BSUB -W` option.
    job_extra : list
        List of other LSF options, for example -u. Each option will be
        prepended with the #BSUB prefix.
    %(JobQueueCluster.parameters)s

    Examples
    --------
    >>> from dask_jobqueue import LSFCluster
    >>> cluster = LSFCluster(queue='general', project='DaskOnLSF')
    >>> cluster.scale(10)  # this may take a few seconds to launch

    >>> from dask.distributed import Client
    >>> client = Client(cluster)

    This also works with adaptive clusters. This automatically launches
    and kills workers based on load.

    >>> cluster.adapt()
    """, 4)

    # Override class variables
    submit_command = 'bsub'
Review comment (Member): From what I understand from your last comment, the submit_command should be bsub < instead of just bsub.

Review comment (Member): @raybellwaves any response to this comment?

    cancel_command = 'bkill'
    scheduler_name = 'lsf'

    # Required for executing commands through the shell in the subprocess
    # module. It handles shell input redirection, e.g. bsub < script_filename.sh,
    # and does not consider '<' as a command, file or directory.
    shell = True

    def __init__(self, queue=None, project=None, ncpus=None, mem=None,
                 walltime=None, job_extra=None, **kwargs):
        if queue is None:
            queue = dask.config.get('jobqueue.%s.queue' % self.scheduler_name)
        if project is None:
            project = dask.config.get('jobqueue.%s.project' % self.scheduler_name)
        if ncpus is None:
            ncpus = dask.config.get('jobqueue.%s.ncpus' % self.scheduler_name)
        if mem is None:
            mem = dask.config.get('jobqueue.%s.mem' % self.scheduler_name)
        if walltime is None:
            walltime = dask.config.get('jobqueue.%s.walltime' % self.scheduler_name)
        if job_extra is None:
            job_extra = dask.config.get('jobqueue.%s.job-extra' % self.scheduler_name)

        # Instantiate args and parameters from parent abstract class
        super(LSFCluster, self).__init__(**kwargs)

        header_lines = []
        # LSF header build
        if self.name is not None:
            header_lines.append('#BSUB -J %s' % self.name)
            header_lines.append('#BSUB -e %s.err' % self.name)
            header_lines.append('#BSUB -o %s.out' % self.name)
        if queue is not None:
            header_lines.append('#BSUB -q %s' % queue)
        if project is not None:
            header_lines.append('#BSUB -P %s' % project)
        if ncpus is None:
            # Compute default cores specifications
            ncpus = self.worker_cores
            logger.info("ncpus specification for LSF not set, "
                        "initializing it to %s" % ncpus)
        if ncpus is not None:
            header_lines.append('#BSUB -n %s' % ncpus)
        if mem is None:
            # Compute default memory specifications
            mem = self.worker_memory
            logger.info("mem specification for LSF not set, "
                        "initializing it to %s" % mem)
        if mem is not None:
            memory_string = lsf_format_bytes_ceil(mem)
            header_lines.append('#BSUB -M %s' % memory_string)
        if walltime is not None:
            header_lines.append('#BSUB -W %s' % walltime)
        header_lines.extend(['#BSUB %s' % arg for arg in job_extra])

        # Declare class attribute that shall be overridden
        self.job_header = '\n'.join(header_lines)

Review comment (Member): You'll need to add a line here to set the JOB_ID environment variable. I think this will come from the lsf variable: LSB_JOBID. The PBSCluster has a good example that you can follow.
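A hypothetical sketch of that suggestion (not part of this diff; it mirrors the PBSCluster pattern the reviewer mentions, exporting the scheduler's job id under a generic name):

        # Illustrative, following the reviewer's suggestion: expose LSF's
        # job id (LSB_JOBID) to the worker process as JOB_ID
        header_lines.append('JOB_ID=${LSB_JOBID}')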

logger.debug("Job script: \n %s" % self.job_script())

    def _job_id_from_submit_output(self, out):
        return out.split('<')[1].split('>')[0].strip()
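For context: bsub reports a successful submission with a line like Job <1234> is submitted to queue <general>, so splitting on the angle brackets recovers the job id. A minimal check of that parsing, assuming this output format (illustrative values):

# Illustrative check, not part of the diff
out = 'Job <1234> is submitted to queue <general>.'
assert out.split('<')[1].split('>')[0].strip() == '1234'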

    def submit_job(self, script_filename):
        """ Sumbits job and handles lsf exception """
Review comment (Member): Remove this docstring too, same typo interestingly :-)

        piped_cmd = [self.submit_command + '<\"' + script_filename + '\"']
        return self._call(piped_cmd)


def lsf_format_bytes_ceil(n):
    """ Format bytes as text
    LSF expects megabytes
Review comment (@lesteve, Member, Jul 14, 2018): I don't really know what LSF expects, but for MB you have to divide not by 1024**2 but by 1000**2.

Review comment (Member): @raybellwaves any response to this comment?

Review comment (Member): Also it would be good to format this docstring like http://dask.pydata.org/en/latest/develop.html#docstrings

    >>> lsf_format_bytes_ceil(1234567890)
    '1178'
    """
    return '%d' % math.ceil(n / (1024**2))
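To make the megabytes discussion concrete (illustrative arithmetic, not part of the diff):

import math

# Illustrative: the same byte count under the two conventions discussed
math.ceil(1234567890 / 1024**2)  # 1178, binary mebibytes, what the function returns
math.ceil(1234567890 / 1000**2)  # 1235, decimal megabytes, what the reviewer suggests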
2 changes: 1 addition & 1 deletion dask_jobqueue/sge.py
@@ -57,7 +57,7 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None,
        super(SGECluster, self).__init__(**kwargs)

        header_lines = ['#!/bin/bash']
Review comment (Member): Can you change this back to #!/usr/bin/env bash? I must have missed this in the merge conflict resolution.
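Background on the request: #!/usr/bin/env bash resolves bash through PATH rather than assuming it lives at /bin/bash, which is more portable across systems (including some HPC environments) where bash is installed elsewhere.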


        # SGE header build
Review comment (Member): General piece of advice: do your best to keep a PR focussed and avoid unrelated changes. It's fine to keep it like this for this PR.

        if self.name is not None:
            header_lines.append('#$ -N %(name)s')
        if queue is not None:
153 changes: 153 additions & 0 deletions dask_jobqueue/tests/test_lsf.py
@@ -0,0 +1,153 @@
import sys
from time import sleep, time

import dask
from dask.distributed import Client
from distributed.utils_test import loop # noqa: F401
import pytest

from dask_jobqueue import LSFCluster


def test_header():
    with LSFCluster(walltime='00:02', processes=4, cores=8, memory='8GB') as cluster:

        assert '#BSUB' in cluster.job_header
        assert '#BSUB -J dask-worker' in cluster.job_header
        assert '#BSUB -n 8' in cluster.job_header
        assert '#BSUB -M 7630' in cluster.job_header
        assert '#BSUB -W 00:02' in cluster.job_header
        assert '#BSUB -q' not in cluster.job_header
        assert '#BSUB -P' not in cluster.job_header

    with LSFCluster(queue='general', project='DaskOnLSF', processes=4, cores=8,
                    memory='28GB', ncpus=24, mem=100000000000) as cluster:

        assert '#BSUB -q general' in cluster.job_header
        assert '#BSUB -J dask-worker' in cluster.job_header
        assert '#BSUB -n 24' in cluster.job_header
        assert '#BSUB -n 8' not in cluster.job_header
        assert '#BSUB -M 95368' in cluster.job_header
        assert '#BSUB -M 26703' not in cluster.job_header
        assert '#BSUB -W' in cluster.job_header
        assert '#BSUB -P DaskOnLSF' in cluster.job_header

    with LSFCluster(cores=4, memory='8GB') as cluster:

        assert '#BSUB -n' in cluster.job_header
        assert '#BSUB -W' in cluster.job_header
        assert '#BSUB -M' in cluster.job_header
        assert '#BSUB -q' not in cluster.job_header
        assert '#BSUB -P' not in cluster.job_header

    with LSFCluster(cores=4, memory='8GB',
                    job_extra=['-u email@domain.com']) as cluster:

        assert '#BSUB -u email@domain.com' in cluster.job_header
        assert '#BSUB -n' in cluster.job_header
        assert '#BSUB -W' in cluster.job_header
        assert '#BSUB -M' in cluster.job_header
        assert '#BSUB -q' not in cluster.job_header
        assert '#BSUB -P' not in cluster.job_header


def test_job_script():
    with LSFCluster(walltime='00:02', processes=4, cores=8,
                    memory='8GB') as cluster:

        job_script = cluster.job_script()
        assert '#BSUB' in job_script
        assert '#BSUB -J dask-worker' in job_script
        assert '#BSUB -n 8' in job_script
        assert '#BSUB -M 7630' in job_script
        assert '#BSUB -W 00:02' in job_script
        assert '#BSUB -q' not in cluster.job_header
        assert '#BSUB -P' not in cluster.job_header

        assert '{} -m distributed.cli.dask_worker tcp://'.format(sys.executable) in job_script
        assert '--nthreads 2 --nprocs 4 --memory-limit 2.00GB' in job_script

    with LSFCluster(queue='general', project='DaskOnLSF', processes=4, cores=8,
                    memory='28GB', ncpus=24, mem=100000000000) as cluster:

        job_script = cluster.job_script()
        assert '#BSUB -q general' in cluster.job_header
        assert '#BSUB -J dask-worker' in cluster.job_header
        assert '#BSUB -n 24' in cluster.job_header
        assert '#BSUB -n 8' not in cluster.job_header
        assert '#BSUB -M 95368' in cluster.job_header
        assert '#BSUB -M 26703' not in cluster.job_header
        assert '#BSUB -W' in cluster.job_header
        assert '#BSUB -P DaskOnLSF' in cluster.job_header

        assert '{} -m distributed.cli.dask_worker tcp://'.format(sys.executable) in job_script
        assert '--nthreads 2 --nprocs 4 --memory-limit 7.00GB' in job_script


@pytest.mark.env("lsf") # noqa: F811
def test_basic(loop):
with LSFCluster(walltime='00:02', processes=1, cores=2, memory='2GB',
local_directory='/tmp', loop=loop) as cluster:

with Client(cluster) as client:
workers = cluster.start_workers(2)
future = client.submit(lambda x: x + 1, 10)
assert future.result(60) == 11
assert cluster.jobs

info = client.scheduler_info()
w = list(info['workers'].values())[0]
assert w['memory_limit'] == 2e9
assert w['ncores'] == 2

cluster.stop_workers(workers)

start = time()
while len(client.scheduler_info()['workers']) > 0:
sleep(0.100)
assert time() < start + 10

assert not cluster.jobs


@pytest.mark.env("lsf") # noqa: F811
def test_adaptive(loop):
with LSFCluster(walltime='00:02', processes=1, cores=2, memory='2GB',
local_directory='/tmp', loop=loop) as cluster:
cluster.adapt()
with Client(cluster) as client:
future = client.submit(lambda x: x + 1, 10)
assert future.result(60) == 11

assert cluster.jobs

start = time()
processes = cluster.worker_processes
while len(client.scheduler_info()['workers']) != processes:
sleep(0.1)
assert time() < start + 10

del future

start = time()
while len(client.scheduler_info()['workers']) > 0:
sleep(0.100)
assert time() < start + 10


def test_config(loop):  # noqa: F811
    with dask.config.set({'jobqueue.lsf.walltime': '00:02',
                          'jobqueue.lsf.local-directory': '/foo'}):
        with LSFCluster(loop=loop, cores=1, memory='2GB') as cluster:
            assert '00:02' in cluster.job_script()
            assert '--local-directory /foo' in cluster.job_script()


def test_informative_errors():
    with pytest.raises(ValueError) as info:
        LSFCluster(memory=None, cores=4)
    assert 'memory' in str(info.value)

    with pytest.raises(ValueError) as info:
        LSFCluster(memory='1GB', cores=None)
    assert 'cores' in str(info.value)