From caf89e25bec99513879f92051eeb3b7bc8634459 Mon Sep 17 00:00:00 2001
From: raybellwaves
Date: Tue, 10 Jul 2018 23:53:41 -0400
Subject: [PATCH] rebase

---
 dask_jobqueue/__init__.py       |   1 +
 dask_jobqueue/core.py           |  25 ++++--
 dask_jobqueue/jobqueue.yaml     |  22 +++
 dask_jobqueue/lsf.py            | 117 ++++++++++++++++++++++++
 dask_jobqueue/tests/test_lsf.py | 153 ++++++++++++++++++++++++++++++++
 5 files changed, 313 insertions(+), 5 deletions(-)
 create mode 100644 dask_jobqueue/lsf.py
 create mode 100644 dask_jobqueue/tests/test_lsf.py

diff --git a/dask_jobqueue/__init__.py b/dask_jobqueue/__init__.py
index 6c580fe7..33dbe3d3 100644
--- a/dask_jobqueue/__init__.py
+++ b/dask_jobqueue/__init__.py
@@ -5,6 +5,7 @@
 from .pbs import PBSCluster
 from .slurm import SLURMCluster
 from .sge import SGECluster
+from .lsf import LSFCluster
 
 from ._version import get_versions
 __version__ = get_versions()['version']
diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py
index ab4b6024..42fad4a3 100644
--- a/dask_jobqueue/core.py
+++ b/dask_jobqueue/core.py
@@ -66,6 +66,9 @@ class JobQueueCluster(Cluster):
     cancel_command: str
         Abstract attribute for job scheduler cancel command,
         should be overriden
+    scheduler_name: str
+        Abstract attribute for job scheduler name,
+        should be overridden
 
     See Also
     --------
@@ -100,6 +103,7 @@ def __init__(self,
                  env_extra=None,
                  walltime=None,
                  threads=None,
+                 shell=False,
                  **kwargs
                  ):
         """ """
@@ -153,9 +157,9 @@ def __init__(self,
         else:
             host = socket.gethostname()
 
-        self.local_cluster = LocalCluster(n_workers=0, ip=host, **kwargs)
+        self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs)
 
-        # Keep information on process, cores, and memory, for use in subclasses
+        # Keep information on processes, threads and memory, for use in subclasses
         self.worker_memory = parse_bytes(memory)
         self.worker_processes = processes
@@ -210,12 +214,21 @@ def job_file(self):
             f.write(self.job_script())
             yield fn
 
+    def submit_job(self, script_filename):
+        """ Submit the job script, piping it to bsub on stdin for LSF """
+        if self.scheduler_name == 'lsf':
+            piped_cmd = [self.submit_command + '<\"' + script_filename + '\"']
+            self.shell = True
+            return self._call(piped_cmd)
+        else:
+            return self._call(shlex.split(self.submit_command) + [script_filename])
+
     def start_workers(self, n=1):
         """ Start workers and point them to our local scheduler """
         workers = []
         for _ in range(n):
             with self.job_file() as fn:
-                out = self._call(shlex.split(self.submit_command) + [fn])
+                out = self.submit_job(fn)
                 job = self._job_id_from_submit_output(out.decode())
                 self.jobs[self.n] = job
                 workers.append(self.n)
@@ -224,7 +237,7 @@ def start_workers(self, n=1):
     @property
     def scheduler(self):
         """ The scheduler of this cluster """
-        return self.local_cluster.scheduler
+        return self.cluster.scheduler
 
     def _calls(self, cmds):
         """ Call a command using subprocess.communicate
@@ -251,6 +264,7 @@ def _calls(self, cmds):
         for cmd in cmds:
             logger.debug(' '.join(cmd))
         procs = [subprocess.Popen(cmd,
+                                  shell=self.shell,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
                  for cmd in cmds]
@@ -273,6 +287,7 @@ def stop_workers(self, workers):
             return
         workers = list(map(int, workers))
         jobs = [self.jobs[w] for w in workers]
+        self.shell = False
         self._call([self.cancel_command] + list(jobs))
         for w in workers:
             with ignoring(KeyError):
@@ -294,7 +309,7 @@ def __enter__(self):
 
     def __exit__(self, type, value, traceback):
         self.stop_workers(self.jobs)
-        self.local_cluster.__exit__(type, value, traceback)
+        self.cluster.__exit__(type, value, traceback)
 
     def _job_id_from_submit_output(self, out):
         raise NotImplementedError('_job_id_from_submit_output must be '
diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml
index b495f132..1b21daba 100644
--- a/dask_jobqueue/jobqueue.yaml
+++ b/dask_jobqueue/jobqueue.yaml
@@ -83,3 +83,25 @@ jobqueue:
     env-extra: []
     resource-spec: null
     job-extra: []
+
+  lsf:
+    name: dask-worker
+
+    # Dask worker options
+    cores: null                 # Total number of cores per job
+    memory: null                # Total amount of memory per job
+    processes: 1                # Number of Python processes per job
+
+    interface: null             # Network interface to use like eth0 or ib0
+    death-timeout: 60           # Number of seconds to wait if a worker can not find a scheduler
+    local-directory: null       # Location of fast local storage like /scratch or $TMPDIR
+
+    # LSF resource manager options
+    queue: null
+    project: null
+    walltime: '00:30'
+    extra: ""
+    env-extra: []
+    ncpus: null
+    mem: null
+    job-extra: []
diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py
new file mode 100644
index 00000000..523928b3
--- /dev/null
+++ b/dask_jobqueue/lsf.py
@@ -0,0 +1,117 @@
+from __future__ import absolute_import, division, print_function
+
+import logging
+import math
+import os
+
+import dask
+
+from .core import JobQueueCluster, docstrings
+
+logger = logging.getLogger(__name__)
+
+
+class LSFCluster(JobQueueCluster):
+    __doc__ = docstrings.with_indents(""" Launch Dask on an LSF cluster
+
+    Parameters
+    ----------
+    queue : str
+        Destination queue for each worker job. Passed to `#BSUB -q` option.
+    project : str
+        Accounting string associated with each worker job. Passed to
+        `#BSUB -P` option.
+    ncpus : int
+        Number of CPUs. Passed to `#BSUB -n` option.
+    mem : int
+        Request memory in bytes. Passed to `#BSUB -M` option.
+    walltime : str
+        Walltime for each worker job in HH:MM. Passed to `#BSUB -W` option.
+    job_extra : list
+        List of other LSF options, for example -u. Each option will be
+        prepended with the #BSUB prefix.
+    %(JobQueueCluster.parameters)s
+
+    Examples
+    --------
+    >>> from dask_jobqueue import LSFCluster
+    >>> cluster = LSFCluster(queue='general', project='DaskOnLSF')
+    >>> cluster.scale(10)  # this may take a few seconds to launch
+
+    >>> from dask.distributed import Client
+    >>> client = Client(cluster)
+
+    This also works with adaptive clusters. This automatically launches and
+    kills workers based on load.
+
+    >>> cluster.adapt()
+    """, 4)
+
+    # Override class variables
+    submit_command = 'bsub'
+    cancel_command = 'bkill'
+    scheduler_name = 'lsf'
+
+    def __init__(self, queue=None, project=None, ncpus=None, mem=None,
+                 walltime=None, job_extra=None, **kwargs):
+        if queue is None:
+            queue = dask.config.get('jobqueue.%s.queue' % self.scheduler_name)
+        if project is None:
+            project = dask.config.get('jobqueue.%s.project' % self.scheduler_name)
+        if ncpus is None:
+            ncpus = dask.config.get('jobqueue.%s.ncpus' % self.scheduler_name)
+        if mem is None:
+            mem = dask.config.get('jobqueue.%s.mem' % self.scheduler_name)
+        if walltime is None:
+            walltime = dask.config.get('jobqueue.%s.walltime' % self.scheduler_name)
+        if job_extra is None:
+            job_extra = dask.config.get('jobqueue.lsf.job-extra')
+
+        # Instantiate args and parameters from parent abstract class
+        super(LSFCluster, self).__init__(**kwargs)
+
+        header_lines = []
+        # LSF header build
+        if self.name is not None:
+            header_lines.append('#BSUB -J %s' % self.name)
+            header_lines.append('#BSUB -e %s.err' % self.name)
+            header_lines.append('#BSUB -o %s.out' % self.name)
+        if queue is not None:
+            header_lines.append('#BSUB -q %s' % queue)
+        if project is not None:
+            header_lines.append('#BSUB -P %s' % project)
+        if ncpus is None:
+            # Compute default cores specifications
+            ncpus = self.worker_cores
+            logger.info("ncpus specification for LSF not set, "
+                        "initializing it to %s" % ncpus)
+        if ncpus is not None:
+            header_lines.append('#BSUB -n %s' % ncpus)
+        if mem is None:
+            # Compute default memory specifications
+            mem = self.worker_memory
+            logger.info("mem specification for LSF not set, "
+                        "initializing it to %s" % mem)
+        if mem is not None:
+            memory_string = lsf_format_bytes_ceil(mem)
+            header_lines.append('#BSUB -M %s' % memory_string)
+        if walltime is not None:
+            header_lines.append('#BSUB -W %s' % walltime)
+        header_lines.extend(['#BSUB %s' % arg for arg in job_extra])
+
+        # Declare class attribute that shall be overridden
+        self.job_header = '\n'.join(header_lines)
+
+        logger.debug("Job script: \n %s" % self.job_script())
+
+    def _job_id_from_submit_output(self, out):
+        return out.split('<')[1].split('>')[0].strip()
+
+
+def lsf_format_bytes_ceil(n):
+    """ Format bytes as text
+
+    LSF expects megabytes
+
+    >>> lsf_format_bytes_ceil(1234567890)
+    '1178'
+    """
+    return '%d' % math.ceil(n / (1024**2))
diff --git a/dask_jobqueue/tests/test_lsf.py b/dask_jobqueue/tests/test_lsf.py
new file mode 100644
index 00000000..7f72819b
--- /dev/null
+++ b/dask_jobqueue/tests/test_lsf.py
@@ -0,0 +1,153 @@
+import sys
+from time import sleep, time
+
+import dask
+from dask.distributed import Client
+from distributed.utils_test import loop  # noqa: F401
+import pytest
+
+from dask_jobqueue import LSFCluster
+
+
+def test_header():
+    with LSFCluster(walltime='00:02', processes=4, cores=8, memory='8GB') as cluster:
+
+        assert '#BSUB' in cluster.job_header
+        assert '#BSUB -J dask-worker' in cluster.job_header
+        assert '#BSUB -n 8' in cluster.job_header
+        assert '#BSUB -M 7630' in cluster.job_header
+        assert '#BSUB -W 00:02' in cluster.job_header
+        assert '#BSUB -q' not in cluster.job_header
+        assert '#BSUB -P' not in cluster.job_header
+
+    with LSFCluster(queue='general', project='DaskOnLSF', processes=4, cores=8,
+                    memory='28GB', ncpus=24, mem=100000000000) as cluster:
+
+        assert '#BSUB -q general' in cluster.job_header
+        assert '#BSUB -J dask-worker' in cluster.job_header
+        assert '#BSUB -n 24' in cluster.job_header
+        assert '#BSUB -n 8' not in cluster.job_header
+        assert '#BSUB -M 95368' in cluster.job_header
+        assert '#BSUB -M 26703' not in cluster.job_header
+        assert '#BSUB -W' in cluster.job_header
+        assert '#BSUB -P DaskOnLSF' in cluster.job_header
+
+    with LSFCluster(cores=4, memory='8GB') as cluster:
+
+        assert '#BSUB -n' in cluster.job_header
+        assert '#BSUB -W' in cluster.job_header
+        assert '#BSUB -M' in cluster.job_header
+        assert '#BSUB -q' not in cluster.job_header
+        assert '#BSUB -P' not in cluster.job_header
+
+    with LSFCluster(cores=4, memory='8GB',
+                    job_extra=['-u email@domain.com']) as cluster:
+
+        assert '#BSUB -u email@domain.com' in cluster.job_header
+        assert '#BSUB -n' in cluster.job_header
+        assert '#BSUB -W' in cluster.job_header
+        assert '#BSUB -M' in cluster.job_header
+        assert '#BSUB -q' not in cluster.job_header
+        assert '#BSUB -P' not in cluster.job_header
+
+
+def test_job_script():
+    with LSFCluster(walltime='00:02', processes=4, cores=8,
+                    memory='8GB') as cluster:
+
+        job_script = cluster.job_script()
+        assert '#BSUB' in job_script
+        assert '#BSUB -J dask-worker' in job_script
+        assert '#BSUB -n 8' in job_script
+        assert '#BSUB -M 7630' in job_script
+        assert '#BSUB -W 00:02' in job_script
+        assert '#BSUB -q' not in cluster.job_header
+        assert '#BSUB -P' not in cluster.job_header
+
+        assert '{} -m distributed.cli.dask_worker tcp://'.format(sys.executable) in job_script
+        assert '--nthreads 2 --nprocs 4 --memory-limit 2.00GB' in job_script
+
+    with LSFCluster(queue='general', project='DaskOnLSF', processes=4, cores=8,
+                    memory='28GB', ncpus=24, mem=100000000000) as cluster:
+
+        job_script = cluster.job_script()
+        assert '#BSUB -q general' in cluster.job_header
+        assert '#BSUB -J dask-worker' in cluster.job_header
+        assert '#BSUB -n 24' in cluster.job_header
+        assert '#BSUB -n 8' not in cluster.job_header
+        assert '#BSUB -M 95368' in cluster.job_header
+        assert '#BSUB -M 26703' not in cluster.job_header
+        assert '#BSUB -W' in cluster.job_header
+        assert '#BSUB -P DaskOnLSF' in cluster.job_header
+
+        assert '{} -m distributed.cli.dask_worker tcp://'.format(sys.executable) in job_script
+        assert '--nthreads 2 --nprocs 4 --memory-limit 7.00GB' in job_script
+
+
+@pytest.mark.env("lsf")  # noqa: F811
+def test_basic(loop):
+    with LSFCluster(walltime='00:02', processes=1, cores=2, memory='2GB',
+                    local_directory='/tmp', loop=loop) as cluster:
+
+        with Client(cluster) as client:
+            workers = cluster.start_workers(2)
+            future = client.submit(lambda x: x + 1, 10)
+            assert future.result(60) == 11
+            assert cluster.jobs
+
+            info = client.scheduler_info()
+            w = list(info['workers'].values())[0]
+            assert w['memory_limit'] == 2e9
+            assert w['ncores'] == 2
+
+            cluster.stop_workers(workers)
+
+            start = time()
+            while len(client.scheduler_info()['workers']) > 0:
+                sleep(0.100)
+                assert time() < start + 10
+
+            assert not cluster.jobs
+
+
+@pytest.mark.env("lsf")  # noqa: F811
+def test_adaptive(loop):
+    with LSFCluster(walltime='00:02', processes=1, cores=2, memory='2GB',
+                    local_directory='/tmp', loop=loop) as cluster:
+        cluster.adapt()
+        with Client(cluster) as client:
+            future = client.submit(lambda x: x + 1, 10)
+            assert future.result(60) == 11
+
+            assert cluster.jobs
+
+            start = time()
+            processes = cluster.worker_processes
+            while len(client.scheduler_info()['workers']) != processes:
+                sleep(0.1)
+                assert time() < start + 10
+
+            del future
+
+            start = time()
+            while len(client.scheduler_info()['workers']) > 0:
+                sleep(0.100)
+                assert time() < start + 10
+
+
+def test_config(loop):  # noqa: F811
+    with dask.config.set({'jobqueue.lsf.walltime': '00:02',
+                          'jobqueue.lsf.local-directory': '/foo'}):
+        with LSFCluster(loop=loop) as cluster:
+            assert '00:02' in cluster.job_script()
+            assert '--local-directory /foo' in cluster.job_script()
+
+
+def test_informative_errors():
+    with pytest.raises(ValueError) as info:
+        LSFCluster(memory=None, cores=4)
+    assert 'memory' in str(info.value)
+
+    with pytest.raises(ValueError) as info:
+        LSFCluster(memory='1GB', cores=None)
+    assert 'cores' in str(info.value)
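
Note (not part of the patch): the submit_job special case above exists because bsub, unlike qsub or sbatch, is normally given the job script on stdin ("bsub < script.sh") so that the embedded #BSUB directives are parsed, which is why the command is joined into one string and run with shell=True. The -M values asserted in the tests follow from lsf_format_bytes_ceil: ceil(8e9 / 2**20) = 7630, ceil(28e9 / 2**20) = 26703 and ceil(1e11 / 2**20) = 95368 megabytes. A rough standalone sketch of the two submission styles, with the helper name submit_script purely illustrative (it is not defined anywhere in this patch):

    import subprocess

    def submit_script(script_filename, scheduler='lsf'):
        """Illustrative sketch: contrast LSF stdin submission with PBS-style submission."""
        if scheduler == 'lsf':
            # LSF: run through a shell so the '<' redirection is interpreted;
            # bsub then reads the #BSUB directives from the piped script.
            cmd = 'bsub < "{}"'.format(script_filename)
            return subprocess.check_output(cmd, shell=True)
        # PBS/SLURM/SGE-style schedulers accept the script path as an argument.
        return subprocess.check_output(['qsub', script_filename])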