From caf89e25bec99513879f92051eeb3b7bc8634459 Mon Sep 17 00:00:00 2001 From: raybellwaves Date: Tue, 10 Jul 2018 23:53:41 -0400 Subject: [PATCH 01/22] rebase --- dask_jobqueue/__init__.py | 1 + dask_jobqueue/core.py | 25 ++++-- dask_jobqueue/jobqueue.yaml | 22 +++++ dask_jobqueue/lsf.py | 117 ++++++++++++++++++++++++ dask_jobqueue/tests/test_lsf.py | 153 ++++++++++++++++++++++++++++++++ 5 files changed, 313 insertions(+), 5 deletions(-) create mode 100644 dask_jobqueue/lsf.py create mode 100644 dask_jobqueue/tests/test_lsf.py diff --git a/dask_jobqueue/__init__.py b/dask_jobqueue/__init__.py index 6c580fe7..33dbe3d3 100644 --- a/dask_jobqueue/__init__.py +++ b/dask_jobqueue/__init__.py @@ -5,6 +5,7 @@ from .pbs import PBSCluster from .slurm import SLURMCluster from .sge import SGECluster +from .lsf import LSFCluster from ._version import get_versions __version__ = get_versions()['version'] diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index ab4b6024..42fad4a3 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -66,6 +66,9 @@ class JobQueueCluster(Cluster): cancel_command: str Abstract attribute for job scheduler cancel command, should be overriden + scheduler_name: str + Abstract attribute for job scheduler name, + should be overriden See Also -------- @@ -100,6 +103,7 @@ def __init__(self, env_extra=None, walltime=None, threads=None, + shell=False, **kwargs ): """ """ @@ -153,9 +157,9 @@ def __init__(self, else: host = socket.gethostname() - self.local_cluster = LocalCluster(n_workers=0, ip=host, **kwargs) + self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs) - # Keep information on process, cores, and memory, for use in subclasses + # Keep information on process, threads and memory, for use in subclasses self.worker_memory = parse_bytes(memory) self.worker_processes = processes @@ -210,12 +214,21 @@ def job_file(self): f.write(self.job_script()) yield fn + def submit_job(self, script_filename): + """ Sumbits job and handles lsf exception """ + if self.scheduler_name == 'lsf': + piped_cmd = [self.submit_command + '<\"' + script_filename + '\"'] + self.shell = True + return self._call(piped_cmd) + else: + return self._call(shlex.split(self.submit_command) + [script_filename]) + def start_workers(self, n=1): """ Start workers and point them to our local scheduler """ workers = [] for _ in range(n): with self.job_file() as fn: - out = self._call(shlex.split(self.submit_command) + [fn]) + out = self.submit_job(fn) job = self._job_id_from_submit_output(out.decode()) self.jobs[self.n] = job workers.append(self.n) @@ -224,7 +237,7 @@ def start_workers(self, n=1): @property def scheduler(self): """ The scheduler of this cluster """ - return self.local_cluster.scheduler + return self.cluster.scheduler def _calls(self, cmds): """ Call a command using subprocess.communicate @@ -251,6 +264,7 @@ def _calls(self, cmds): for cmd in cmds: logger.debug(' '.join(cmd)) procs = [subprocess.Popen(cmd, + shell=self.shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE) for cmd in cmds] @@ -273,6 +287,7 @@ def stop_workers(self, workers): return workers = list(map(int, workers)) jobs = [self.jobs[w] for w in workers] + self.shell = False self._call([self.cancel_command] + list(jobs)) for w in workers: with ignoring(KeyError): @@ -294,7 +309,7 @@ def __enter__(self): def __exit__(self, type, value, traceback): self.stop_workers(self.jobs) - self.local_cluster.__exit__(type, value, traceback) + self.cluster.__exit__(type, value, traceback) def _job_id_from_submit_output(self, 
out): raise NotImplementedError('_job_id_from_submit_output must be ' diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml index b495f132..1b21daba 100644 --- a/dask_jobqueue/jobqueue.yaml +++ b/dask_jobqueue/jobqueue.yaml @@ -83,3 +83,25 @@ jobqueue: env-extra: [] resource-spec: null job-extra: [] + + lsf: + name: dask-worker + + # Dask worker options + cores: null # Total number of cores per job + memory: null # Total amount of memory per job + processes: 1 # Number of Python processes per job + + interface: null # Network interface to use like eth0 or ib0 + death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler + local-directory: null # Location of fast local storage like /scratch or $TMPDIR + + # LSF resource manager options + queue: null + project: null + walltime: '00:30' + extra: "" + env-extra: [] + ncpus: null + mem: null + job-extra: {} diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py new file mode 100644 index 00000000..523928b3 --- /dev/null +++ b/dask_jobqueue/lsf.py @@ -0,0 +1,117 @@ +from __future__ import absolute_import, division, print_function + +import logging +import math +import os + +import dask + +from .core import JobQueueCluster, docstrings + +logger = logging.getLogger(__name__) + + +class LSFCluster(JobQueueCluster): + __doc__ = docstrings.with_indents(""" Launch Dask on a LSF cluster + + Parameters + ---------- + queue : str + Destination queue for each worker job. Passed to `#BSUB -q` option. + project : str + Accounting string associated with each worker job. Passed to + `#BSUB -P` option. + ncpus : int + Number of cpus. Passed to `#BSUB -n` option. + mem : int + Request memory in bytes. Passed to `#BSUB -M` option. + walltime : str + Walltime for each worker job in HH:MM. Passed to `#BSUB -W` option. + job_extra : list + List of other LSF options, for example -u. Each option witll be + prepended with the #LSF prefix. + %(JobQueueCluster.parameters)s + + Examples + -------- + >>> from dask_jobqueue import LSFCluster + >>> cluster = LSFcluster(queue='general', project='DaskonLSF') + >>> cluster.scale(10) # this may take a few seconds to launch + + >>> from dask.distributed import Client + >>> client = Client(cluster) + + This also works with adaptive clusters. This automatically launches and + kill workers based on load. 
+ + >>> cluster.adapt() + """, 4) + + # Override class variables + submit_command = 'bsub' + cancel_command = 'bkill' + scheduler_name = 'lsf' + + def __init__(self, queue=None, project=None, ncpus=None, mem=None, + walltime=None, job_extra=None, **kwargs): + if queue is None: + queue = dask.config.get('jobqueue.%s.queue' % self.scheduler_name) + if project is None: + project = dask.config.get('jobqueue.%s.project' % self.scheduler_name) + if ncpus is None: + queue = dask.config.get('jobqueue.%s.ncpus' % self.scheduler_name) + if mem is None: + memory = dask.config.get('jobqueue.%s.mem' % self.scheduler_name) + if walltime is None: + walltime = dask.config.get('jobqueue.%s.walltime' % self.scheduler_name) + if job_extra is None: + job_extra = dask.config.get('jobqueue.lsf.job-extra') + + # Instantiate args and parameters from parent abstract class + super(LSFCluster, self).__init__(**kwargs) + + header_lines = [] + # LSF header build + if self.name is not None: + header_lines.append('#BSUB -J %s' % self.name) + header_lines.append('#BSUB -e %s.err' % self.name) + header_lines.append('#BSUB -o %s.out' % self.name) + if queue is not None: + header_lines.append('#BSUB -q %s' % queue) + if project is not None: + header_lines.append('#BSUB -P %s' % project) + if ncpus is None: + # Compute default cores specifications + ncpus = self.worker_cores + logger.info("ncpus specification for LSF not set, " + "initializing it to %s" % ncpus) + if ncpus is not None: + header_lines.append('#BSUB -n %s' % ncpus) + if mem is None: + # Compute default memory specifications + mem = self.worker_memory + logger.info("mem specification for LSF not set, " + "initializing it to %s" % mem) + if mem is not None: + memory_string = lsf_format_bytes_ceil(mem) + header_lines.append('#BSUB -M %s' % memory_string) + if walltime is not None: + header_lines.append('#BSUB -W %s' % walltime) + header_lines.extend(['#BSUB %s' % arg for arg in job_extra]) + + # Declare class attribute that shall be overriden + self.job_header = '\n'.join(header_lines) + + logger.debug("Job script: \n %s" % self.job_script()) + + def _job_id_from_submit_output(self, out): + return out.split('<')[1].split('>')[0].strip() + + +def lsf_format_bytes_ceil(n): + """ Format bytes as text + LSF expects megabytes + >>> lsf_format_bytes_ceil(1234567890) + '1178' + """ + return '%d' % math.ceil(n / (1024**2)) diff --git a/dask_jobqueue/tests/test_lsf.py b/dask_jobqueue/tests/test_lsf.py new file mode 100644 index 00000000..7f72819b --- /dev/null +++ b/dask_jobqueue/tests/test_lsf.py @@ -0,0 +1,153 @@ +import sys +from time import sleep, time + +import dask +from dask.distributed import Client +from distributed.utils_test import loop # noqa: F401 +import pytest + +from dask_jobqueue import LSFCluster + + +def test_header(): + with LSFCluster(walltime='00:02', processes=4, cores=8, memory='8GB') as cluster: + + assert '#BSUB' in cluster.job_header + assert '#BSUB -J dask-worker' in cluster.job_header + assert '#BSUB -n 8' in cluster.job_header + assert '#BSUB -M 7630' in cluster.job_header + assert '#BSUB -W 00:02' in cluster.job_header + assert '#BSUB -q' not in cluster.job_header + assert '#BSUB -P' not in cluster.job_header + + with LSFCluster(queue='general', project='DaskOnLSF', processes=4, cores=8, + memory='28GB', ncpus=24, mem=100000000000) as cluster: + + assert '#BSUB -q general' in cluster.job_header + assert '#BSUB -J dask-worker' in cluster.job_header + assert '#BSUB -n 24' in cluster.job_header + assert '#BSUB -n 8' not in cluster.job_header + 
assert '#BSUB -M 95368' in cluster.job_header + assert '#BSUB -M 26703' not in cluster.job_header + assert '#BSUB -W' in cluster.job_header + assert '#BSUB -P DaskOnLSF' in cluster.job_header + + with LSFCluster(cores=4, memory='8GB') as cluster: + + assert '#BSUB -n' in cluster.job_header + assert '#BSUB -W' in cluster.job_header + assert '#BSUB -M' in cluster.job_header + assert '#BSUB -q' not in cluster.job_header + assert '#BSUB -P' not in cluster.job_header + + with LSFCluster(cores=4, memory='8GB', + job_extra=['-u email@domain.com']) as cluster: + + assert '#BSUB -u email@domain.com' in cluster.job_header + assert '#BSUB -n' in cluster.job_header + assert '#BSUB -W' in cluster.job_header + assert '#BSUB -M' in cluster.job_header + assert '#BSUB -q' not in cluster.job_header + assert '#BSUB -P' not in cluster.job_header + + +def test_job_script(): + with LSFCluster(walltime='00:02', processes=4, cores=8, + memory='8GB') as cluster: + + job_script = cluster.job_script() + assert '#BSUB' in job_script + assert '#BSUB -J dask-worker' in job_script + assert '#BSUB -n 8' in job_script + assert '#BSUB -M 26703' in job_script + assert '#BSUB -W 00:02' in job_script + assert '#BSUB -q' not in cluster.job_header + assert '#BSUB -P' not in cluster.job_header + + assert '{} -m distributed.cli.dask_worker tcp://'.format(sys.executable) in job_script + assert '--nthreads 2 --nprocs 4 --memory-limit 2.00GB' in job_script + + with LSFCluster(queue='general', project='DaskOnLSF', processes=4, cores=8, + memory='28GB', ncpu=24, mem=100000000000) as cluster: + + job_script = cluster.job_script() + assert '#BSUB -q general' in cluster.job_header + assert '#BSUB -J dask-worker' in cluster.job_header + assert '#BSUB -n 24' in cluster.job_header + assert '#BSUB -n 8' not in cluster.job_header + assert '#BSUB -M 95368' in cluster.job_header + assert '#BSUB -M 26703' not in cluster.job_header + assert '#BSUB -W' in cluster.job_header + assert '#BSUB -P DaskOnLSF' in cluster.job_header + + assert '{} -m distributed.cli.dask_worker tcp://'.format(sys.executable) in job_script + assert '--nthreads 2 --nprocs 4 --memory-limit 7.00GB' in job_script + + +@pytest.mark.env("lsf") # noqa: F811 +def test_basic(loop): + with LSFCluster(walltime='00:02', processes=1, cores=2, memory='2GB', + local_directory='/tmp', loop=loop) as cluster: + + with Client(cluster) as client: + workers = cluster.start_workers(2) + future = client.submit(lambda x: x + 1, 10) + assert future.result(60) == 11 + assert cluster.jobs + + info = client.scheduler_info() + w = list(info['workers'].values())[0] + assert w['memory_limit'] == 2e9 + assert w['ncores'] == 2 + + cluster.stop_workers(workers) + + start = time() + while len(client.scheduler_info()['workers']) > 0: + sleep(0.100) + assert time() < start + 10 + + assert not cluster.jobs + + +@pytest.mark.env("lsf") # noqa: F811 +def test_adaptive(loop): + with LSFCluster(walltime='00:02', processes=1, cores=2, memory='2GB', + local_directory='/tmp', loop=loop) as cluster: + cluster.adapt() + with Client(cluster) as client: + future = client.submit(lambda x: x + 1, 10) + assert future.result(60) == 11 + + assert cluster.jobs + + start = time() + processes = cluster.worker_processes + while len(client.scheduler_info()['workers']) != processes: + sleep(0.1) + assert time() < start + 10 + + del future + + start = time() + while len(client.scheduler_info()['workers']) > 0: + sleep(0.100) + assert time() < start + 10 + + +def test_config(loop): # noqa: F811 + with 
dask.config.set({'jobqueue.lsf.walltime': '00:02', + 'jobqueue.lsf.local-directory': '/foo'}): + with LSFCluster(loop=loop) as cluster: + assert '00:02' in cluster.job_script() + assert '--local-directory /foo' in cluster.job_script() + + +def test_informative_errors(): + with pytest.raises(ValueError) as info: + LSFCluster(memory=None, cores=4) + assert 'memory' in str(info.value) + + with pytest.raises(ValueError) as info: + LSFCluster(memory='1GB', cores=None) + assert 'cores' in str(info.value) From 9a5e5c855a79e3447977dc7d6e6080548adc0615 Mon Sep 17 00:00:00 2001 From: raybellwaves Date: Wed, 11 Jul 2018 00:04:03 -0400 Subject: [PATCH 02/22] update local_cluster --- dask_jobqueue/core.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 42fad4a3..8fcabd61 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -157,7 +157,7 @@ def __init__(self, else: host = socket.gethostname() - self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs) + self.local_cluster = LocalCluster(n_workers=0, ip=host, **kwargs) # Keep information on process, threads and memory, for use in subclasses self.worker_memory = parse_bytes(memory) @@ -237,7 +237,7 @@ def start_workers(self, n=1): @property def scheduler(self): """ The scheduler of this cluster """ - return self.cluster.scheduler + return self.local_cluster.scheduler def _calls(self, cmds): """ Call a command using subprocess.communicate @@ -309,7 +309,7 @@ def __enter__(self): def __exit__(self, type, value, traceback): self.stop_workers(self.jobs) - self.cluster.__exit__(type, value, traceback) + self.local_cluster.__exit__(type, value, traceback) def _job_id_from_submit_output(self, out): raise NotImplementedError('_job_id_from_submit_output must be ' From 1e47f4d961103e100bc530e75529757c0c5598c4 Mon Sep 17 00:00:00 2001 From: raybellwaves Date: Wed, 11 Jul 2018 00:06:33 -0400 Subject: [PATCH 03/22] update thread to cores --- dask_jobqueue/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 8fcabd61..1b1c649f 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -159,7 +159,7 @@ def __init__(self, self.local_cluster = LocalCluster(n_workers=0, ip=host, **kwargs) - # Keep information on process, threads and memory, for use in subclasses + # Keep information on process, cores and memory, for use in subclasses self.worker_memory = parse_bytes(memory) self.worker_processes = processes From b6d248abec189dc215df84cb2a565062c68e4e3d Mon Sep 17 00:00:00 2001 From: raybellwaves Date: Wed, 11 Jul 2018 00:08:03 -0400 Subject: [PATCH 04/22] add comma --- dask_jobqueue/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 1b1c649f..5dbdb9c4 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -159,7 +159,7 @@ def __init__(self, self.local_cluster = LocalCluster(n_workers=0, ip=host, **kwargs) - # Keep information on process, cores and memory, for use in subclasses + # Keep information on process, cores, and memory, for use in subclasses self.worker_memory = parse_bytes(memory) self.worker_processes = processes From 7fbd76a24c03471681da65f54c25393987b67995 Mon Sep 17 00:00:00 2001 From: raybellwaves Date: Wed, 11 Jul 2018 17:44:30 -0400 Subject: [PATCH 05/22] typos --- dask_jobqueue/lsf.py | 2 +- dask_jobqueue/tests/test_lsf.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git 
a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index 523928b3..a15fa123 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -59,7 +59,7 @@ def __init__(self, queue=None, project=None, ncpus=None, mem=None, if project is None: project = dask.config.get('jobqueue.%s.project' % self.scheduler_name) if ncpus is None: - queue = dask.config.get('jobqueue.%s.ncpus' % self.scheduler_name) + ncpus = dask.config.get('jobqueue.%s.ncpus' % self.scheduler_name) if mem is None: memory = dask.config.get('jobqueue.%s.mem' % self.scheduler_name) if walltime is None: diff --git a/dask_jobqueue/tests/test_lsf.py b/dask_jobqueue/tests/test_lsf.py index 7f72819b..e3511cca 100644 --- a/dask_jobqueue/tests/test_lsf.py +++ b/dask_jobqueue/tests/test_lsf.py @@ -59,7 +59,7 @@ def test_job_script(): assert '#BSUB' in job_script assert '#BSUB -J dask-worker' in job_script assert '#BSUB -n 8' in job_script - assert '#BSUB -M 26703' in job_script + assert '#BSUB -M 7630' in job_script assert '#BSUB -W 00:02' in job_script assert '#BSUB -q' not in cluster.job_header assert '#BSUB -P' not in cluster.job_header From 72f195cd94d6a6260f90784802b419c70fc6994a Mon Sep 17 00:00:00 2001 From: raybellwaves Date: Wed, 11 Jul 2018 17:49:50 -0400 Subject: [PATCH 06/22] one more typo --- dask_jobqueue/tests/test_lsf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_jobqueue/tests/test_lsf.py b/dask_jobqueue/tests/test_lsf.py index e3511cca..44efb878 100644 --- a/dask_jobqueue/tests/test_lsf.py +++ b/dask_jobqueue/tests/test_lsf.py @@ -68,7 +68,7 @@ def test_job_script(): assert '--nthreads 2 --nprocs 4 --memory-limit 2.00GB' in job_script with LSFCluster(queue='general', project='DaskOnLSF', processes=4, cores=8, - memory='28GB', ncpu=24, mem=100000000000) as cluster: + memory='28GB', ncpus=24, mem=100000000000) as cluster: job_script = cluster.job_script() assert '#BSUB -q general' in cluster.job_header From f9970b3f9805ea10c44bc34c8a46401db3ca8fb9 Mon Sep 17 00:00:00 2001 From: raybellwaves Date: Wed, 11 Jul 2018 17:52:27 -0400 Subject: [PATCH 07/22] add cores and memory parameters to config test --- dask_jobqueue/tests/test_lsf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_jobqueue/tests/test_lsf.py b/dask_jobqueue/tests/test_lsf.py index 44efb878..22456fa0 100644 --- a/dask_jobqueue/tests/test_lsf.py +++ b/dask_jobqueue/tests/test_lsf.py @@ -138,7 +138,7 @@ def test_adaptive(loop): def test_config(loop): # noqa: F811 with dask.config.set({'jobqueue.lsf.walltime': '00:02', 'jobqueue.lsf.local-directory': '/foo'}): - with LSFCluster(loop=loop) as cluster: + with LSFCluster(loop=loop, cores=1, memory='2GB') as cluster: assert '00:02' in cluster.job_script() assert '--local-directory /foo' in cluster.job_script() From 702b1ec76b06f75b05fb373db85591a5096e2400 Mon Sep 17 00:00:00 2001 From: raybellwaves Date: Wed, 11 Jul 2018 20:02:01 -0400 Subject: [PATCH 08/22] move submit_job to lsf.py --- dask_jobqueue/core.py | 9 ++------- dask_jobqueue/jobqueue.yaml | 2 +- dask_jobqueue/lsf.py | 8 +++++++- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 5dbdb9c4..4c6130b8 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -215,13 +215,8 @@ def job_file(self): yield fn def submit_job(self, script_filename): - """ Sumbits job and handles lsf exception """ - if self.scheduler_name == 'lsf': - piped_cmd = [self.submit_command + '<\"' + script_filename + '\"'] - self.shell = True - return 
self._call(piped_cmd) - else: - return self._call(shlex.split(self.submit_command) + [script_filename]) + """ Sumbits job """ + return self._call(shlex.split(self.submit_command) + [script_filename]) def start_workers(self, n=1): """ Start workers and point them to our local scheduler """ diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml index 1b21daba..121ec633 100644 --- a/dask_jobqueue/jobqueue.yaml +++ b/dask_jobqueue/jobqueue.yaml @@ -104,4 +104,4 @@ jobqueue: env-extra: [] ncpus: null mem: null - job-extra: {} + job-extra: [] diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index a15fa123..194746d8 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -65,7 +65,7 @@ def __init__(self, queue=None, project=None, ncpus=None, mem=None, if walltime is None: walltime = dask.config.get('jobqueue.%s.walltime' % self.scheduler_name) if job_extra is None: - job_extra = dask.config.get('jobqueue.lsf.job-extra') + job_extra = dask.config.get('jobqueue.%s.job-extra' % self.scheduler_name) # Instantiate args and parameters from parent abstract class super(LSFCluster, self).__init__(**kwargs) @@ -107,6 +107,12 @@ def __init__(self, queue=None, project=None, ncpus=None, mem=None, def _job_id_from_submit_output(self, out): return out.split('<')[1].split('>')[0].strip() + def submit_job(self, script_filename): + """ Sumbits job and handles lsf exception """ + piped_cmd = [self.submit_command + '<\"' + script_filename + '\"'] + self.shell = True + return self._call(piped_cmd) + def lsf_format_bytes_ceil(n): """ Format bytes as text From d115ff4fbcf93345a0c18c5cb3e4f2df81dee0f4 Mon Sep 17 00:00:00 2001 From: raybellwaves Date: Thu, 12 Jul 2018 11:48:20 -0400 Subject: [PATCH 09/22] add self.shell in base classes --- dask_jobqueue/core.py | 1 - dask_jobqueue/lsf.py | 4 +++- dask_jobqueue/pbs.py | 3 +++ dask_jobqueue/sge.py | 5 ++++- dask_jobqueue/slurm.py | 3 +++ 5 files changed, 13 insertions(+), 3 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 4c6130b8..4047af87 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -103,7 +103,6 @@ def __init__(self, env_extra=None, walltime=None, threads=None, - shell=False, **kwargs ): """ """ diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index 194746d8..9ada3722 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -70,6 +70,9 @@ def __init__(self, queue=None, project=None, ncpus=None, mem=None, # Instantiate args and parameters from parent abstract class super(LSFCluster, self).__init__(**kwargs) + # Required for submitting jobs with LSF + self.shell = True + header_lines = [] # LSF header build if self.name is not None: @@ -110,7 +113,6 @@ def _job_id_from_submit_output(self, out): def submit_job(self, script_filename): """ Sumbits job and handles lsf exception """ piped_cmd = [self.submit_command + '<\"' + script_filename + '\"'] - self.shell = True return self._call(piped_cmd) diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index 6545b8dc..de29c592 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -75,6 +75,9 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None, # Instantiate args and parameters from parent abstract class super(PBSCluster, self).__init__(**kwargs) + # Required for submitting jobs with LSF + self.shell = False + header_lines = [] # PBS header build if self.name is not None: diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index a7492f3f..12f26cce 100644 --- a/dask_jobqueue/sge.py +++ 
b/dask_jobqueue/sge.py @@ -56,8 +56,11 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None, super(SGECluster, self).__init__(**kwargs) - header_lines = ['#!/bin/bash'] + # Required for submitting jobs with LSF + self.shell = False + header_lines = ['#!/bin/bash'] + # SGE header build if self.name is not None: header_lines.append('#$ -N %(name)s') if queue is not None: diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 00291b0b..012c711d 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -73,6 +73,9 @@ def __init__(self, queue=None, project=None, walltime=None, super(SLURMCluster, self).__init__(**kwargs) + # Required for submitting jobs with LSF + self.shell = False + # Always ask for only one task header_lines = [] # SLURM header build From d5f691beb9ac705c706941d183710993806db831 Mon Sep 17 00:00:00 2001 From: raybellwaves Date: Fri, 13 Jul 2018 17:29:53 -0400 Subject: [PATCH 10/22] move shell to class variable --- dask_jobqueue/core.py | 2 ++ dask_jobqueue/lsf.py | 5 ++--- dask_jobqueue/pbs.py | 3 --- dask_jobqueue/sge.py | 3 --- dask_jobqueue/slurm.py | 3 --- 5 files changed, 4 insertions(+), 12 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 4047af87..3d879062 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -90,6 +90,8 @@ class JobQueueCluster(Cluster): submit_command = None cancel_command = None scheduler_name = '' + + shell = False # Required for submitting jobs with LSF def __init__(self, name=None, diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index 9ada3722..4fe576cc 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -51,6 +51,8 @@ class LSFCluster(JobQueueCluster): submit_command = 'bsub' cancel_command = 'bkill' scheduler_name = 'lsf' + + shell = True # Required for submitting jobs with LSF def __init__(self, queue=None, project=None, ncpus=None, mem=None, walltime=None, job_extra=None, **kwargs): @@ -70,9 +72,6 @@ def __init__(self, queue=None, project=None, ncpus=None, mem=None, # Instantiate args and parameters from parent abstract class super(LSFCluster, self).__init__(**kwargs) - # Required for submitting jobs with LSF - self.shell = True - header_lines = [] # LSF header build if self.name is not None: diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index de29c592..6545b8dc 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -75,9 +75,6 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None, # Instantiate args and parameters from parent abstract class super(PBSCluster, self).__init__(**kwargs) - # Required for submitting jobs with LSF - self.shell = False - header_lines = [] # PBS header build if self.name is not None: diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index 12f26cce..f0e98480 100644 --- a/dask_jobqueue/sge.py +++ b/dask_jobqueue/sge.py @@ -56,9 +56,6 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None, super(SGECluster, self).__init__(**kwargs) - # Required for submitting jobs with LSF - self.shell = False - header_lines = ['#!/bin/bash'] # SGE header build if self.name is not None: diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 012c711d..00291b0b 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -73,9 +73,6 @@ def __init__(self, queue=None, project=None, walltime=None, super(SLURMCluster, self).__init__(**kwargs) - # Required for submitting jobs with LSF - self.shell = False - # Always ask for only one 
task header_lines = [] # SLURM header build From 20616b66a0775a82bd95ed7e3be16d2960fdff58 Mon Sep 17 00:00:00 2001 From: raybellwaves Date: Fri, 13 Jul 2018 21:27:35 -0400 Subject: [PATCH 11/22] update comment --- dask_jobqueue/core.py | 7 +++++-- dask_jobqueue/lsf.py | 7 +++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 3d879062..78889add 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -90,8 +90,11 @@ class JobQueueCluster(Cluster): submit_command = None cancel_command = None scheduler_name = '' - - shell = False # Required for submitting jobs with LSF + + # Required for excuting commands through the shell in the subprocess module. + # It handles the shell input redirection e.g. bsub < script_filename.sh + # and does not consider it a less than operator. + shell = False def __init__(self, name=None, diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index 4fe576cc..fc885550 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -51,8 +51,11 @@ class LSFCluster(JobQueueCluster): submit_command = 'bsub' cancel_command = 'bkill' scheduler_name = 'lsf' - - shell = True # Required for submitting jobs with LSF + + # Required for excuting commands through the shell in the subprocess module. + # It handles the shell input redirection e.g. bsub < script_filename.sh + # and does not consider it a less than operator. + shell = True def __init__(self, queue=None, project=None, ncpus=None, mem=None, walltime=None, job_extra=None, **kwargs): From c87f02e4cafcf639a9cd2d89fde4cbf0f6bcde3c Mon Sep 17 00:00:00 2001 From: raybellwaves Date: Fri, 13 Jul 2018 21:32:10 -0400 Subject: [PATCH 12/22] slighter better explanation --- dask_jobqueue/core.py | 2 +- dask_jobqueue/lsf.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 78889add..8b024cc0 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -93,7 +93,7 @@ class JobQueueCluster(Cluster): # Required for excuting commands through the shell in the subprocess module. # It handles the shell input redirection e.g. bsub < script_filename.sh - # and does not consider it a less than operator. + # and does not consider '<' as a 'less than' operator. shell = False def __init__(self, diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index fc885550..2905124b 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -54,7 +54,7 @@ class LSFCluster(JobQueueCluster): # Required for excuting commands through the shell in the subprocess module. # It handles the shell input redirection e.g. bsub < script_filename.sh - # and does not consider it a less than operator. + # and does not consider '<' as a 'less than' operator. shell = True def __init__(self, queue=None, project=None, ncpus=None, mem=None, From 5f370d51409ba1b0f0ef285011a12f1444f48bba Mon Sep 17 00:00:00 2001 From: raybellwaves Date: Fri, 13 Jul 2018 23:24:18 -0400 Subject: [PATCH 13/22] update comment --- dask_jobqueue/core.py | 2 +- dask_jobqueue/lsf.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 8b024cc0..8c0a6d0a 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -93,7 +93,7 @@ class JobQueueCluster(Cluster): # Required for excuting commands through the shell in the subprocess module. # It handles the shell input redirection e.g. bsub < script_filename.sh - # and does not consider '<' as a 'less than' operator. 
+ # and does not consider '<' as a command, file or directory. shell = False def __init__(self, diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index 2905124b..ddbdcc31 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -54,7 +54,7 @@ class LSFCluster(JobQueueCluster): # Required for excuting commands through the shell in the subprocess module. # It handles the shell input redirection e.g. bsub < script_filename.sh - # and does not consider '<' as a 'less than' operator. + # and does not consider '<' as a command, file or directory. shell = True def __init__(self, queue=None, project=None, ncpus=None, mem=None, From 462110a529895f986feda62863ae132a6cc57f8e Mon Sep 17 00:00:00 2001 From: raybellwaves Date: Sun, 15 Jul 2018 22:26:51 -0400 Subject: [PATCH 14/22] clean up. add kill_jobs --- dask_jobqueue/core.py | 14 ++++++-------- dask_jobqueue/lsf.py | 12 ++++++++---- dask_jobqueue/tests/test_lsf.py | 12 ++++++------ 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 8c0a6d0a..4aa67e90 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -91,10 +91,7 @@ class JobQueueCluster(Cluster): cancel_command = None scheduler_name = '' - # Required for excuting commands through the shell in the subprocess module. - # It handles the shell input redirection e.g. bsub < script_filename.sh - # and does not consider '<' as a command, file or directory. - shell = False + popen_shell = False def __init__(self, name=None, @@ -219,7 +216,6 @@ def job_file(self): yield fn def submit_job(self, script_filename): - """ Sumbits job """ return self._call(shlex.split(self.submit_command) + [script_filename]) def start_workers(self, n=1): @@ -263,7 +259,7 @@ def _calls(self, cmds): for cmd in cmds: logger.debug(' '.join(cmd)) procs = [subprocess.Popen(cmd, - shell=self.shell, + shell=self.popen_shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE) for cmd in cmds] @@ -280,14 +276,16 @@ def _call(self, cmd): """ Singular version of _calls """ return self._calls([cmd])[0] + def kill_jobs(self, jobs): + return self._call([self.cancel_command] + list(jobs)) + def stop_workers(self, workers): """ Stop a list of workers""" if not workers: return workers = list(map(int, workers)) jobs = [self.jobs[w] for w in workers] - self.shell = False - self._call([self.cancel_command] + list(jobs)) + self.kill_jobs(jobs) for w in workers: with ignoring(KeyError): del self.jobs[w] diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index ddbdcc31..d83cc740 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -55,7 +55,7 @@ class LSFCluster(JobQueueCluster): # Required for excuting commands through the shell in the subprocess module. # It handles the shell input redirection e.g. bsub < script_filename.sh # and does not consider '<' as a command, file or directory. 
- shell = True + popen_shell = True def __init__(self, queue=None, project=None, ncpus=None, mem=None, walltime=None, job_extra=None, **kwargs): @@ -113,15 +113,19 @@ def _job_id_from_submit_output(self, out): return out.split('<')[1].split('>')[0].strip() def submit_job(self, script_filename): - """ Sumbits job and handles lsf exception """ + self.popen_shell = True piped_cmd = [self.submit_command + '<\"' + script_filename + '\"'] return self._call(piped_cmd) + def kill_jobs(self, jobs): + self.popen_shell = False + return self._call([self.cancel_command] + list(jobs)) + def lsf_format_bytes_ceil(n): """ Format bytes as text LSF expects megabytes >>> lsf_format_bytes_ceil(1234567890) - '1178' + '1235' """ - return '%d' % math.ceil(n / (1024**2)) + return '%d' % math.ceil(n / (1000**2)) diff --git a/dask_jobqueue/tests/test_lsf.py b/dask_jobqueue/tests/test_lsf.py index 22456fa0..b5d99516 100644 --- a/dask_jobqueue/tests/test_lsf.py +++ b/dask_jobqueue/tests/test_lsf.py @@ -15,7 +15,7 @@ def test_header(): assert '#BSUB' in cluster.job_header assert '#BSUB -J dask-worker' in cluster.job_header assert '#BSUB -n 8' in cluster.job_header - assert '#BSUB -M 7630' in cluster.job_header + assert '#BSUB -M 8000' in cluster.job_header assert '#BSUB -W 00:02' in cluster.job_header assert '#BSUB -q' not in cluster.job_header assert '#BSUB -P' not in cluster.job_header @@ -27,8 +27,8 @@ def test_header(): assert '#BSUB -J dask-worker' in cluster.job_header assert '#BSUB -n 24' in cluster.job_header assert '#BSUB -n 8' not in cluster.job_header - assert '#BSUB -M 95368' in cluster.job_header - assert '#BSUB -M 26703' not in cluster.job_header + assert '#BSUB -M 100000' in cluster.job_header + assert '#BSUB -M 28000' not in cluster.job_header assert '#BSUB -W' in cluster.job_header assert '#BSUB -P DaskOnLSF' in cluster.job_header @@ -59,7 +59,7 @@ def test_job_script(): assert '#BSUB' in job_script assert '#BSUB -J dask-worker' in job_script assert '#BSUB -n 8' in job_script - assert '#BSUB -M 7630' in job_script + assert '#BSUB -M 8000' in job_script assert '#BSUB -W 00:02' in job_script assert '#BSUB -q' not in cluster.job_header assert '#BSUB -P' not in cluster.job_header @@ -75,8 +75,8 @@ def test_job_script(): assert '#BSUB -J dask-worker' in cluster.job_header assert '#BSUB -n 24' in cluster.job_header assert '#BSUB -n 8' not in cluster.job_header - assert '#BSUB -M 95368' in cluster.job_header - assert '#BSUB -M 26703' not in cluster.job_header + assert '#BSUB -M 100000' in cluster.job_header + assert '#BSUB -M 28000' not in cluster.job_header assert '#BSUB -W' in cluster.job_header assert '#BSUB -P DaskOnLSF' in cluster.job_header From 5740d451c54886f13606eb668963ab5dd05cf28a Mon Sep 17 00:00:00 2001 From: raybellwaves Date: Tue, 17 Jul 2018 14:48:57 -0400 Subject: [PATCH 15/22] rebase --- dask_jobqueue/core.py | 4 +- dask_jobqueue/lsf.py | 11 +++-- dask_jobqueue/sge.py | 3 +- dask_jobqueue/tests/test_lsf.py | 73 +++++++++++++++++++++++++-------- 4 files changed, 66 insertions(+), 25 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 6e868d5f..82f91629 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -54,6 +54,7 @@ def add_worker(self, scheduler, worker=None, name=None, **kwargs): logger.debug("adding worker %s" % worker) w = scheduler.workers[worker] job_id = _job_id_from_worker_name(w.name) + logger.debug("job id for new worker: %s" % job_id) self.all_workers[worker] = (w.name, job_id) @@ -346,9 +347,6 @@ def _call(self, cmd): """ Singular version 
of _calls """ return self._calls([cmd])[0] - def kill_jobs(self, jobs): - return self._call([self.cancel_command] + list(jobs)) - def stop_workers(self, workers): """ Stop a list of workers""" logger.debug("Stopping workers: %s" % workers) diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index d83cc740..073782cf 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -28,7 +28,7 @@ class LSFCluster(JobQueueCluster): walltime : str Walltime for each worker job in HH:MM. Passed to `#BSUB -W` option. job_extra : list - List of other LSF options, for example -u. Each option witll be + List of other LSF options, for example -u. Each option will be prepended with the #LSF prefix. %(JobQueueCluster.parameters)s @@ -103,6 +103,7 @@ def __init__(self, queue=None, project=None, ncpus=None, mem=None, if walltime is not None: header_lines.append('#BSUB -W %s' % walltime) header_lines.extend(['#BSUB %s' % arg for arg in job_extra]) + header_lines.append('JOB_ID=${LSF_JOBID%.*}') # Declare class attribute that shall be overriden self.job_header = '\n'.join(header_lines) @@ -117,9 +118,13 @@ def submit_job(self, script_filename): piped_cmd = [self.submit_command + '<\"' + script_filename + '\"'] return self._call(piped_cmd) - def kill_jobs(self, jobs): + def stop_jobs(self, jobs): + """ Stop a list of jobs""" self.popen_shell = False - return self._call([self.cancel_command] + list(jobs)) + logger.debug("Stopping jobs: %s" % jobs) + if jobs: + jobs = list(jobs) + self._call([self.cancel_command] + list(set(jobs))) def lsf_format_bytes_ceil(n): diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index 17985857..1c49dfe7 100644 --- a/dask_jobqueue/sge.py +++ b/dask_jobqueue/sge.py @@ -58,8 +58,7 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None, super(SGECluster, self).__init__(**kwargs) - header_lines = ['#!/bin/bash'] - # SGE header build + header_lines = ['#!/usr/bin/env bash'] if self.name is not None: header_lines.append('#$ -N %(name)s') if queue is not None: diff --git a/dask_jobqueue/tests/test_lsf.py b/dask_jobqueue/tests/test_lsf.py index b5d99516..f8c5afb8 100644 --- a/dask_jobqueue/tests/test_lsf.py +++ b/dask_jobqueue/tests/test_lsf.py @@ -2,12 +2,14 @@ from time import sleep, time import dask +import pytest from dask.distributed import Client from distributed.utils_test import loop # noqa: F401 -import pytest from dask_jobqueue import LSFCluster +from . 
import QUEUE_WAIT + def test_header(): with LSFCluster(walltime='00:02', processes=4, cores=8, memory='8GB') as cluster: @@ -19,6 +21,7 @@ def test_header(): assert '#BSUB -W 00:02' in cluster.job_header assert '#BSUB -q' not in cluster.job_header assert '#BSUB -P' not in cluster.job_header + assert '--name dask-worker--${JOB_ID}--' in cluster.job_script() with LSFCluster(queue='general', project='DaskOnLSF', processes=4, cores=8, memory='28GB', ncpus=24, mem=100000000000) as cluster: @@ -53,19 +56,19 @@ def test_header(): def test_job_script(): with LSFCluster(walltime='00:02', processes=4, cores=8, - memory='8GB') as cluster: + memory='28GB') as cluster: job_script = cluster.job_script() assert '#BSUB' in job_script assert '#BSUB -J dask-worker' in job_script assert '#BSUB -n 8' in job_script - assert '#BSUB -M 8000' in job_script + assert '#BSUB -M 28000' in job_script assert '#BSUB -W 00:02' in job_script assert '#BSUB -q' not in cluster.job_header assert '#BSUB -P' not in cluster.job_header assert '{} -m distributed.cli.dask_worker tcp://'.format(sys.executable) in job_script - assert '--nthreads 2 --nprocs 4 --memory-limit 2.00GB' in job_script + assert '--nthreads 2 --nprocs 4 --memory-limit 7.00GB' in job_script with LSFCluster(queue='general', project='DaskOnLSF', processes=4, cores=8, memory='28GB', ncpus=24, mem=100000000000) as cluster: @@ -88,26 +91,26 @@ def test_job_script(): def test_basic(loop): with LSFCluster(walltime='00:02', processes=1, cores=2, memory='2GB', local_directory='/tmp', loop=loop) as cluster: - with Client(cluster) as client: - workers = cluster.start_workers(2) + cluster.start_workers(2) + assert cluster.pending_jobs or cluster.running_jobs future = client.submit(lambda x: x + 1, 10) - assert future.result(60) == 11 - assert cluster.jobs + assert future.result(QUEUE_WAIT) == 11 + assert cluster.running_jobs - info = client.scheduler_info() - w = list(info['workers'].values())[0] + workers = list(client.scheduler_info()['workers'].values()) + w = workers[0] assert w['memory_limit'] == 2e9 assert w['ncores'] == 2 cluster.stop_workers(workers) start = time() - while len(client.scheduler_info()['workers']) > 0: + while client.scheduler_info()['workers']: sleep(0.100) - assert time() < start + 10 + assert time() < start + QUEUE_WAIT - assert not cluster.jobs + assert not cluster.running_jobs @pytest.mark.env("lsf") # noqa: F811 @@ -117,22 +120,58 @@ def test_adaptive(loop): cluster.adapt() with Client(cluster) as client: future = client.submit(lambda x: x + 1, 10) - assert future.result(60) == 11 - assert cluster.jobs + start = time() + while not (cluster.pending_jobs or cluster.running_jobs): + sleep(0.100) + assert time() < start + QUEUE_WAIT + + assert future.result(QUEUE_WAIT) == 11 start = time() processes = cluster.worker_processes while len(client.scheduler_info()['workers']) != processes: sleep(0.1) - assert time() < start + 10 + assert time() < start + QUEUE_WAIT del future start = time() while len(client.scheduler_info()['workers']) > 0: sleep(0.100) - assert time() < start + 10 + assert time() < start + QUEUE_WAIT + + start = time() + while cluster.pending_jobs or cluster.running_jobs: + sleep(0.100) + assert time() < start + QUEUE_WAIT + assert cluster.finished_jobs + + +@pytest.mark.env("lsf") # noqa: F811 +def test_adaptive_grouped(loop): + with LSFCluster(walltime='00:02', processes=1, cores=2, memory='2GB', + local_directory='/tmp', loop=loop) as cluster: + cluster.adapt(minimum=1) # at least 1 worker + with Client(cluster) as client: + start 
= time()
+            while not (cluster.pending_jobs or cluster.running_jobs):
+                sleep(0.100)
+            assert time() < start + QUEUE_WAIT
+
+            future = client.submit(lambda x: x + 1, 10)
+            assert future.result(QUEUE_WAIT) == 11
+
+            start = time()
+            while not cluster.running_jobs:
+                sleep(0.100)
+            assert time() < start + QUEUE_WAIT
+
+            start = time()
+            processes = cluster.worker_processes
+            while len(client.scheduler_info()['workers']) != processes:
+                sleep(0.1)
+            assert time() < start + QUEUE_WAIT


 def test_config(loop):  # noqa: F811
     with

From a56c616f7edb08f38dd0b2042c1b91a359ec62f2 Mon Sep 17 00:00:00 2001
From: raybellwaves
Date: Tue, 17 Jul 2018 15:02:01 -0400
Subject: [PATCH 16/22] remove empty lines

---
 dask_jobqueue/core.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py
index 82f91629..032929dd 100644
--- a/dask_jobqueue/core.py
+++ b/dask_jobqueue/core.py
@@ -54,7 +54,6 @@ def add_worker(self, scheduler, worker=None, name=None, **kwargs):
         logger.debug("adding worker %s" % worker)
         w = scheduler.workers[worker]
         job_id = _job_id_from_worker_name(w.name)
-        logger.debug("job id for new worker: %s" % job_id)
         self.all_workers[worker] = (w.name, job_id)
@@ -120,9 +119,6 @@ class JobQueueCluster(Cluster):
     cancel_command: str
         Abstract attribute for job scheduler cancel command,
         should be overriden
-    scheduler_name: str
-        Abstract attribute for job scheduler name,
-        should be overriden

     See Also
     --------

From 2a61e9ee42278aad12c0372627490319a9dc3374 Mon Sep 17 00:00:00 2001
From: raybellwaves
Date: Tue, 17 Jul 2018 15:21:38 -0400
Subject: [PATCH 17/22] rename JOB_ID var to LSB_JOBID

---
 dask_jobqueue/lsf.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py
index 073782cf..eb221fee 100644
--- a/dask_jobqueue/lsf.py
+++ b/dask_jobqueue/lsf.py
@@ -103,7 +103,7 @@ def __init__(self, queue=None, project=None, ncpus=None, mem=None,
         if walltime is not None:
             header_lines.append('#BSUB -W %s' % walltime)
         header_lines.extend(['#BSUB %s' % arg for arg in job_extra])
-        header_lines.append('JOB_ID=${LSF_JOBID%.*}')
+        header_lines.append('JOB_ID=${LSB_JOBID%.*}')

         # Declare class attribute that shall be overriden
         self.job_header = '\n'.join(header_lines)

From c1f85fd27b76d59687bc42f971cc3c5509d97833 Mon Sep 17 00:00:00 2001
From: raybellwaves
Date: Tue, 17 Jul 2018 22:14:04 -0400
Subject: [PATCH 18/22] suppress Job is submitted to project.

--- dask_jobqueue/core.py | 7 +++++-- dask_jobqueue/lsf.py | 10 +++++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 032929dd..c8a912ea 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -301,6 +301,10 @@ def scheduler(self): """ The scheduler of this cluster """ return self.local_cluster.scheduler + def _error_capture(self, err): + if err: + logger.error(err.decode()) + def _calls(self, cmds): """ Call a command using subprocess.communicate @@ -334,8 +338,7 @@ def _calls(self, cmds): result = [] for proc in procs: out, err = proc.communicate() - if err: - logger.error(err.decode()) + self._error_capture(err) result.append(out) return result diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index eb221fee..1a98a69b 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -114,18 +114,26 @@ def _job_id_from_submit_output(self, out): return out.split('<')[1].split('>')[0].strip() def submit_job(self, script_filename): + """ set `self.popen_shell = True` and redirect the `script_filename` to bsub """ self.popen_shell = True piped_cmd = [self.submit_command + '<\"' + script_filename + '\"'] return self._call(piped_cmd) def stop_jobs(self, jobs): - """ Stop a list of jobs""" + """ set `self.popen_shell = False` """ self.popen_shell = False logger.debug("Stopping jobs: %s" % jobs) if jobs: jobs = list(jobs) self._call([self.cancel_command] + list(set(jobs))) + def _error_capture(self, err): + """ supress `Job is submitted to project.` """ + if err.decode("utf-8").split()[0:4] == ['Job', 'is', 'submitted', 'to']: + err = False + if err: + logger.error(err.decode()) + def lsf_format_bytes_ceil(n): """ Format bytes as text From 8b872b34854e330762773babf0b75b45e81701bd Mon Sep 17 00:00:00 2001 From: raybellwaves Date: Thu, 19 Jul 2018 22:46:50 -0400 Subject: [PATCH 19/22] suppress Job is submitted to project in bsub command --- dask_jobqueue/core.py | 7 ++----- dask_jobqueue/lsf.py | 16 +++++++--------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index c8a912ea..032929dd 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -301,10 +301,6 @@ def scheduler(self): """ The scheduler of this cluster """ return self.local_cluster.scheduler - def _error_capture(self, err): - if err: - logger.error(err.decode()) - def _calls(self, cmds): """ Call a command using subprocess.communicate @@ -338,7 +334,8 @@ def _calls(self, cmds): result = [] for proc in procs: out, err = proc.communicate() - self._error_capture(err) + if err: + logger.error(err.decode()) result.append(out) return result diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index 1a98a69b..ef7e6e84 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -114,9 +114,14 @@ def _job_id_from_submit_output(self, out): return out.split('<')[1].split('>')[0].strip() def submit_job(self, script_filename): - """ set `self.popen_shell = True` and redirect the `script_filename` to bsub """ + """ set `self.popen_shell = True` and redirect the `script_filename` to bsub. + Output of bsub normally looks like: + `Job is submitted to project.` which is the stderr and + `Job is submitted to (default) queue .` which is the stdout. + Supress the stderr by redirecting it to nowhere. 
+        The `piped_cmd` looks like ['bsub < tmp.sh 2> /dev/null'] """
         self.popen_shell = True
-        piped_cmd = [self.submit_command + '<\"' + script_filename + '\"']
+        piped_cmd = [self.submit_command+' < '+script_filename+' 2> /dev/null']
         return self._call(piped_cmd)

     def stop_jobs(self, jobs):
@@ -127,13 +132,6 @@ def stop_jobs(self, jobs):
             jobs = list(jobs)
             self._call([self.cancel_command] + list(set(jobs)))

-    def _error_capture(self, err):
-        """ supress `Job is submitted to project.` """
-        if err.decode("utf-8").split()[0:4] == ['Job', 'is', 'submitted', 'to']:
-            err = False
-        if err:
-            logger.error(err.decode())
-

 def lsf_format_bytes_ceil(n):
     """ Format bytes as text

From fc29249976586ad45d375f1c6b10881d3cbbcd5d Mon Sep 17 00:00:00 2001
From: raybellwaves
Date: Fri, 27 Jul 2018 15:48:42 -0400
Subject: [PATCH 20/22] Clean up. Use **kwargs in _call

---
 dask_jobqueue/core.py | 16 +++++++---------
 dask_jobqueue/lsf.py  | 39 +++++++++++++++++-----------------------
 2 files changed, 24 insertions(+), 31 deletions(-)

diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py
index 032929dd..9cf3fada 100644
--- a/dask_jobqueue/core.py
+++ b/dask_jobqueue/core.py
@@ -143,8 +143,6 @@ class JobQueueCluster(Cluster):
     _adaptive_options = {
         'worker_key': lambda ws: _job_id_from_worker_name(ws.name)}

-    popen_shell = False
-
     def __init__(self,
                  name=None,
                  cores=None,
@@ -282,7 +280,7 @@ def job_file(self):
             f.write(self.job_script())
             yield fn

-    def submit_job(self, script_filename):
+    def _submit_job(self, script_filename):
         return self._call(shlex.split(self.submit_command) + [script_filename])

     def start_workers(self, n=1):
@@ -291,7 +289,7 @@ def start_workers(self, n=1):
         num_jobs = math.ceil(n / self.worker_processes)
         for _ in range(num_jobs):
             with self.job_file() as fn:
-                out = self.submit_job(fn)
+                out = self._submit_job(fn)
                 job = self._job_id_from_submit_output(out.decode())
                 logger.debug("started job: %s" % job)
                 self.pending_jobs[job] = {}
@@ -301,7 +299,7 @@ def scheduler(self):
         """ The scheduler of this cluster """
         return self.local_cluster.scheduler

-    def _calls(self, cmds):
+    def _calls(self, cmds, **kwargs):
         """ Call a command using subprocess.communicate

         This centralzies calls out to the command line, providing consistent
@@ -327,9 +325,9 @@
         for cmd in cmds:
             logger.debug(' '.join(cmd))
             procs.append(subprocess.Popen(cmd,
-                                          shell=self.popen_shell,
                                           stdout=subprocess.PIPE,
-                                          stderr=subprocess.PIPE))
+                                          stderr=subprocess.PIPE,
+                                          **kwargs))

         result = []
         for proc in procs:
@@ -339,9 +337,9 @@
             result.append(out)
         return result

-    def _call(self, cmd):
+    def _call(self, cmd, **kwargs):
         """ Singular version of _calls """
-        return self._calls([cmd])[0]
+        return self._calls([cmd], **kwargs)[0]

     def stop_workers(self, workers):
         """ Stop a list of workers"""
diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py
index ef7e6e84..07272c27 100644
--- a/dask_jobqueue/lsf.py
+++ b/dask_jobqueue/lsf.py
@@ -2,7 +2,6 @@

 import logging
 import math
-import os

 import dask

@@ -48,15 +47,10 @@ class LSFCluster(JobQueueCluster):
     """, 4)

     # Override class variables
-    submit_command = 'bsub'
+    submit_command = 'bsub <'
     cancel_command = 'bkill'
     scheduler_name = 'lsf'

-    # Required for excuting commands through the shell in the subprocess module.
-    # It handles the shell input redirection e.g. bsub < script_filename.sh
-    # and does not consider '<' as a command, file or directory.
- popen_shell = True - def __init__(self, queue=None, project=None, ncpus=None, mem=None, walltime=None, job_extra=None, **kwargs): if queue is None: @@ -66,7 +60,7 @@ def __init__(self, queue=None, project=None, ncpus=None, mem=None, if ncpus is None: ncpus = dask.config.get('jobqueue.%s.ncpus' % self.scheduler_name) if mem is None: - memory = dask.config.get('jobqueue.%s.mem' % self.scheduler_name) + mem = dask.config.get('jobqueue.%s.mem' % self.scheduler_name) if walltime is None: walltime = dask.config.get('jobqueue.%s.walltime' % self.scheduler_name) if job_extra is None: @@ -113,29 +107,30 @@ def __init__(self, queue=None, project=None, ncpus=None, mem=None, def _job_id_from_submit_output(self, out): return out.split('<')[1].split('>')[0].strip() - def submit_job(self, script_filename): - """ set `self.popen_shell = True` and redirect the `script_filename` to bsub. - Output of bsub normally looks like: - `Job is submitted to project.` which is the stderr and - `Job is submitted to (default) queue .` which is the stdout. - Supress the stderr by redirecting it to nowhere. - The `piped_cmd` looks like ['bsub < tmp.sh 2> /dev/null'] """ - self.popen_shell = True - piped_cmd = [self.submit_command+' < '+script_filename+' 2> /dev/null'] - return self._call(piped_cmd) + def _submit_job(self, script_filename): + piped_cmd = [self.submit_command + ' ' + script_filename + ' 2> /dev/null'] + return self._call(piped_cmd, shell=True) def stop_jobs(self, jobs): - """ set `self.popen_shell = False` """ - self.popen_shell = False + """ Stop a list of jobs""" logger.debug("Stopping jobs: %s" % jobs) if jobs: jobs = list(jobs) - self._call([self.cancel_command] + list(set(jobs))) + self._call([self.cancel_command] + list(set(jobs)), shell=False) def lsf_format_bytes_ceil(n): """ Format bytes as text - LSF expects megabytes + + Convert bytes to megabytes which LSF requires. 
+
+    Parameters
+    ----------
+    n: int
+        Bytes
+
+    Examples
+    --------
     >>> lsf_format_bytes_ceil(1234567890)
     '1235'
     """
     return '%d' % math.ceil(n / (1000**2))

From ccc61cb0a03e89ce02039113172562383c722702 Mon Sep 17 00:00:00 2001
From: raybellwaves
Date: Tue, 31 Jul 2018 13:27:47 -0400
Subject: [PATCH 21/22] rm stop_jobs for lsf.py

---
 dask_jobqueue/lsf.py | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py
index 07272c27..7bc5a319 100644
--- a/dask_jobqueue/lsf.py
+++ b/dask_jobqueue/lsf.py
@@ -111,13 +111,6 @@ def _submit_job(self, script_filename):
         piped_cmd = [self.submit_command + ' ' + script_filename + ' 2> /dev/null']
         return self._call(piped_cmd, shell=True)

-    def stop_jobs(self, jobs):
-        """ Stop a list of jobs"""
-        logger.debug("Stopping jobs: %s" % jobs)
-        if jobs:
-            jobs = list(jobs)
-            self._call([self.cancel_command] + list(set(jobs)), shell=False)
-

 def lsf_format_bytes_ceil(n):
     """ Format bytes as text

From 22cbb6027f6c9358c847aea120fe7a21b5e70e38 Mon Sep 17 00:00:00 2001
From: raybellwaves
Date: Tue, 31 Jul 2018 23:12:00 -0400
Subject: [PATCH 22/22] updates docs with lsf

---
 dask_jobqueue/lsf.py |  5 +++--
 docs/api.rst         |  3 ++-
 docs/examples.rst    | 13 +++++++++++++
 docs/index.rst       |  2 +-
 4 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py
index 7bc5a319..c83eb4aa 100644
--- a/dask_jobqueue/lsf.py
+++ b/dask_jobqueue/lsf.py
@@ -34,8 +34,9 @@ class LSFCluster(JobQueueCluster):
     Examples
     --------
     >>> from dask_jobqueue import LSFCluster
-    >>> cluster = LSFcluster(queue='general', project='DaskonLSF')
-    >>> cluster.scale(10)  # this may take a few seconds to launch
+    >>> cluster = LSFCluster(queue='general', project='DaskonLSF',
+    ...                      cores=15, memory='25GB')
+    >>> cluster.start_workers(10)  # this may take a few seconds to launch

     >>> from dask.distributed import Client
     >>> client = Client(cluster)
diff --git a/docs/api.rst b/docs/api.rst
index 572f83b6..a1df4b1e 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -9,7 +9,8 @@ API
 .. autosummary::
    :toctree: generated/

+   LSFCluster
    MoabCluster
    PBSCluster
-   SLURMCluster
    SGECluster
+   SLURMCluster
diff --git a/docs/examples.rst b/docs/examples.rst
index eea59b8c..d450ca98 100644
--- a/docs/examples.rst
+++ b/docs/examples.rst
@@ -63,6 +63,19 @@ On systems which use SGE as the scheduler, ```SGECluster`` can be used:
                        processes=10,
                        memory='20GB')

+LSF Deployments
+---------------
+
+.. code-block:: python
+
+   from dask_jobqueue import LSFCluster
+
+   cluster = LSFCluster(queue='general',
+                        project='cpp',
+                        walltime='00:30',
+                        cores=15,
+                        memory='25GB')
+
 SLURM Deployments
 -----------------
diff --git a/docs/index.rst b/docs/index.rst
index 47497b0e..ca3cf414 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -1,7 +1,7 @@
 Dask-Jobqueue
 =============

-*Easily deploy Dask on job queuing systems like PBS, Slurm, MOAB, and SGE.*
+*Easily deploy Dask on job queuing systems like PBS, Slurm, MOAB, SGE, and LSF.*

 The Dask-jobqueue project makes it easy to deploy Dask on common job queuing
 systems typically found in high performance supercomputers, academic research
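
Taken together, the final patches settle on an LSF submission path in which the generated job script is piped into `bsub` through the shell (so `<` is treated as input redirection rather than as an ordinary argument), the informational stderr line is silenced, and the job id is parsed out of stdout. The sketch below walks through that flow outside the cluster classes; it is an illustration only, the script path is hypothetical, and it assumes `bsub` is on PATH.

    import subprocess

    # Hypothetical job script path; JobQueueCluster.job_file() writes a real one.
    script_filename = '/tmp/dask-worker.sh'

    # Mirrors LSFCluster._submit_job: shell=True so the shell performs the '<'
    # input redirection, and stderr is routed to /dev/null to drop the
    # informational "Job <id> is submitted to project <name>." message.
    cmd = 'bsub < ' + script_filename + ' 2> /dev/null'
    proc = subprocess.Popen(cmd, shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    out, err = proc.communicate()

    # Mirrors LSFCluster._job_id_from_submit_output: on success stdout looks
    # like "Job <1234567> is submitted to queue <general>."
    job_id = out.decode().split('<')[1].split('>')[0].strip()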