From 966d8e171f4e531c9e003e80f45cf985f595b892 Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Thu, 30 Aug 2018 09:08:13 +0000 Subject: [PATCH 1/4] Add a nested folder for storing jobqueue log file, use separate files for each job with LSF and SLURM --- dask_jobqueue/core.py | 11 ++++++++++- dask_jobqueue/jobqueue.yaml | 6 ++++++ dask_jobqueue/lsf.py | 6 +++--- dask_jobqueue/slurm.py | 4 ++-- dask_jobqueue/tests/test_jobqueue_core.py | 9 +++++++++ 5 files changed, 30 insertions(+), 6 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index c5625327..729d437c 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -2,6 +2,7 @@ import logging import math +import os import re import shlex import subprocess @@ -151,6 +152,7 @@ def __init__(self, local_directory=None, extra=None, env_extra=None, + log_directory=None, walltime=None, threads=None, **kwargs @@ -183,6 +185,8 @@ def __init__(self, extra = dask.config.get('jobqueue.%s.extra' % self.scheduler_name) if env_extra is None: env_extra = dask.config.get('jobqueue.%s.env-extra' % self.scheduler_name) + if log_directory is None: + log_directory = dask.config.get('jobqueue.%s.log-directory' % self.scheduler_name) if dask.config.get('jobqueue.%s.threads', None): warnings.warn(threads_deprecation_message) @@ -240,6 +244,10 @@ def __init__(self, self._command_template += " --local-directory %s" % local_directory if extra is not None: self._command_template += extra + if log_directory is not None: + self.log_directory = log_directory + if not os.path.exists(self.log_directory): + os.mkdir(self.log_directory) def __repr__(self): running_workers = sum(len(value) for value in self.running_jobs.values()) @@ -290,7 +298,8 @@ def job_file(self): yield fn def _submit_job(self, script_filename): - return self._call(shlex.split(self.submit_command) + [script_filename]) + return self._call(shlex.split(self.submit_command) + [script_filename], + cwd=self.log_directory) def start_workers(self, n=1): """ Start workers and point them to our local scheduler """ diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml index fbb960c3..5d37f2fe 100644 --- a/dask_jobqueue/jobqueue.yaml +++ b/dask_jobqueue/jobqueue.yaml @@ -19,6 +19,7 @@ jobqueue: env-extra: [] resource-spec: null job-extra: [] + log-directory: 'dask-jobqueue-logs' pbs: name: dask-worker @@ -40,6 +41,7 @@ jobqueue: env-extra: [] resource-spec: null job-extra: [] + log-directory: 'dask-jobqueue-logs' sge: name: dask-worker @@ -59,6 +61,7 @@ jobqueue: walltime: '00:30:00' extra: "" env-extra: [] + log-directory: 'dask-jobqueue-logs' resource-spec: null @@ -83,6 +86,7 @@ jobqueue: job-cpu: null job-mem: null job-extra: {} + log-directory: 'dask-jobqueue-logs' moab: name: dask-worker @@ -104,6 +108,7 @@ jobqueue: env-extra: [] resource-spec: null job-extra: [] + log-directory: 'dask-jobqueue-logs' lsf: name: dask-worker @@ -126,3 +131,4 @@ jobqueue: ncpus: null mem: null job-extra: [] + log-directory: 'dask-jobqueue-logs' diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index 7e2cc210..f77968f0 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -73,8 +73,8 @@ def __init__(self, queue=None, project=None, ncpus=None, mem=None, walltime=None # LSF header build if self.name is not None: header_lines.append('#BSUB -J %s' % self.name) - header_lines.append('#BSUB -e %s.err' % self.name) - header_lines.append('#BSUB -o %s.out' % self.name) + header_lines.append('#BSUB -e %s-%%J.err' % self.name) + header_lines.append('#BSUB -o %s-%%J.out' % self.name) if queue is not None: header_lines.append('#BSUB -q %s' % queue) if project is not None: @@ -104,7 +104,7 @@ def __init__(self, queue=None, project=None, ncpus=None, mem=None, walltime=None def _submit_job(self, script_filename): piped_cmd = [self.submit_command + ' ' + script_filename + ' 2> /dev/null'] - return self._call(piped_cmd, shell=True) + return self._call(piped_cmd, shell=True, cwd=self.log_directory) def lsf_format_bytes_ceil(n): diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 29dc2352..30c1ee80 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -73,8 +73,8 @@ def __init__(self, queue=None, project=None, walltime=None, job_cpu=None, job_me # SLURM header build if self.name is not None: header_lines.append('#SBATCH -J %s' % self.name) - header_lines.append('#SBATCH -e %s.err' % self.name) - header_lines.append('#SBATCH -o %s.out' % self.name) + header_lines.append('#SBATCH -e %s-%%j.err' % self.name) + header_lines.append('#SBATCH -o %s-%%j.out' % self.name) if queue is not None: header_lines.append('#SBATCH -p %s' % queue) if project is not None: diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index bc0c9260..4d47a846 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -1,6 +1,8 @@ from __future__ import absolute_import, division, print_function +import os import pytest +import shutil import socket from dask_jobqueue import (JobQueueCluster, PBSCluster, MoabCluster, @@ -79,3 +81,10 @@ def test_job_id_error_handling(Cluster): return_string = 'Job <12345> submited to .' cluster.job_id_regexp = r'(\d+)' cluster._job_id_from_submit_output(return_string) + + +def test_log_directory(): + shutil.rmtree('log-folder', ignore_errors=True) + with PBSCluster(cores=1, memory='1GB', + log_directory='log-folder') as cluster: + assert os.path.exists('log-folder') From 5335ea389770f929ed0bfefc1af79e8fb5bce00d Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Wed, 5 Sep 2018 14:22:30 +0000 Subject: [PATCH 2/4] Use -e and -o for log folder instead of Popen(cwd) --- dask_jobqueue/core.py | 10 +++++----- dask_jobqueue/jobqueue.yaml | 12 ++++++------ dask_jobqueue/lsf.py | 7 +++++-- dask_jobqueue/pbs.py | 3 +++ dask_jobqueue/sge.py | 6 +++++- dask_jobqueue/slurm.py | 7 +++++-- dask_jobqueue/tests/test_jobqueue_core.py | 11 +++++++---- 7 files changed, 36 insertions(+), 20 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 729d437c..c2f2b1b2 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -244,10 +244,11 @@ def __init__(self, self._command_template += " --local-directory %s" % local_directory if extra is not None: self._command_template += extra - if log_directory is not None: - self.log_directory = log_directory + + self.log_directory = log_directory + if self.log_directory is not None: if not os.path.exists(self.log_directory): - os.mkdir(self.log_directory) + os.makedirs(self.log_directory) def __repr__(self): running_workers = sum(len(value) for value in self.running_jobs.values()) @@ -298,8 +299,7 @@ def job_file(self): yield fn def _submit_job(self, script_filename): - return self._call(shlex.split(self.submit_command) + [script_filename], - cwd=self.log_directory) + return self._call(shlex.split(self.submit_command) + [script_filename]) def start_workers(self, n=1): """ Start workers and point them to our local scheduler """ diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml index 5d37f2fe..0e93db88 100644 --- a/dask_jobqueue/jobqueue.yaml +++ b/dask_jobqueue/jobqueue.yaml @@ -19,7 +19,7 @@ jobqueue: env-extra: [] resource-spec: null job-extra: [] - log-directory: 'dask-jobqueue-logs' + log-directory: null pbs: name: dask-worker @@ -41,7 +41,7 @@ jobqueue: env-extra: [] resource-spec: null job-extra: [] - log-directory: 'dask-jobqueue-logs' + log-directory: null sge: name: dask-worker @@ -61,7 +61,7 @@ jobqueue: walltime: '00:30:00' extra: "" env-extra: [] - log-directory: 'dask-jobqueue-logs' + log-directory: null resource-spec: null @@ -86,7 +86,7 @@ jobqueue: job-cpu: null job-mem: null job-extra: {} - log-directory: 'dask-jobqueue-logs' + log-directory: null moab: name: dask-worker @@ -108,7 +108,7 @@ jobqueue: env-extra: [] resource-spec: null job-extra: [] - log-directory: 'dask-jobqueue-logs' + log-directory: null lsf: name: dask-worker @@ -131,4 +131,4 @@ jobqueue: ncpus: null mem: null job-extra: [] - log-directory: 'dask-jobqueue-logs' + log-directory: null diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index f77968f0..2425bf5a 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -73,8 +73,11 @@ def __init__(self, queue=None, project=None, ncpus=None, mem=None, walltime=None # LSF header build if self.name is not None: header_lines.append('#BSUB -J %s' % self.name) - header_lines.append('#BSUB -e %s-%%J.err' % self.name) - header_lines.append('#BSUB -o %s-%%J.out' % self.name) + if self.log_directory is not None: + header_lines.append('#BSUB -e %s/%s-%%J.err' % + (self.log_directory, self.name or 'worker')) + header_lines.append('#BSUB -o %s/%s-%%J.out' % + (self.log_directory, self.name or 'worker')) if queue is not None: header_lines.append('#BSUB -q %s' % queue) if project is not None: diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index 083b2c01..bc64702d 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -93,6 +93,9 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None, header_lines.append('#PBS -l %s' % resource_spec) if walltime is not None: header_lines.append('#PBS -l walltime=%s' % walltime) + if self.log_directory is not None: + header_lines.append('#PBS -e %s/' % self.log_directory) + header_lines.append('#PBS -o %s/' % self.log_directory) header_lines.extend(['#PBS %s' % arg for arg in job_extra]) header_lines.append('JOB_ID=${PBS_JOBID%.*}') diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index 1346b972..cc7164a4 100644 --- a/dask_jobqueue/sge.py +++ b/dask_jobqueue/sge.py @@ -66,6 +66,9 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None, header_lines.append('#$ -l %(resource_spec)s') if walltime is not None: header_lines.append('#$ -l h_rt=%(walltime)s') + if self.log_directory is not None: + header_lines.append('#$ -e %(log_directory)s/') + header_lines.append('#$ -o %(log_directory)s/') header_lines.extend(['#$ -cwd', '#$ -j y']) header_template = '\n'.join(header_lines) @@ -74,7 +77,8 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None, 'project': project, 'processes': self.worker_processes, 'walltime': walltime, - 'resource_spec': resource_spec} + 'resource_spec': resource_spec, + 'log_directory': self.log_directory} self.job_header = header_template % config logger.debug("Job script: \n %s" % self.job_script()) diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 30c1ee80..7e0dcb90 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -73,8 +73,11 @@ def __init__(self, queue=None, project=None, walltime=None, job_cpu=None, job_me # SLURM header build if self.name is not None: header_lines.append('#SBATCH -J %s' % self.name) - header_lines.append('#SBATCH -e %s-%%j.err' % self.name) - header_lines.append('#SBATCH -o %s-%%j.out' % self.name) + if self.log_directory is not None: + header_lines.append('#SBATCH -e %s/%s-%%J.err' % + (self.log_directory, self.name or 'worker')) + header_lines.append('#SBATCH -o %s/%s-%%J.out' % + (self.log_directory, self.name or 'worker')) if queue is not None: header_lines.append('#SBATCH -p %s' % queue) if project is not None: diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index 4d47a846..e76c5bfa 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -83,8 +83,11 @@ def test_job_id_error_handling(Cluster): cluster._job_id_from_submit_output(return_string) -def test_log_directory(): - shutil.rmtree('log-folder', ignore_errors=True) +def test_log_directory(tmpdir): + shutil.rmtree(tmpdir.strpath, ignore_errors=True) + with PBSCluster(cores=1, memory='1GB'): + assert not os.path.exists(tmpdir.strpath) + with PBSCluster(cores=1, memory='1GB', - log_directory='log-folder') as cluster: - assert os.path.exists('log-folder') + log_directory=tmpdir.strpath): + assert os.path.exists(tmpdir.strpath) From 3710d36be275c07164f1b18c06be1bfce96c9a4a Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Tue, 16 Oct 2018 10:02:51 +0200 Subject: [PATCH 3/4] small fix on remaining lsf cwd arg --- dask_jobqueue/lsf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index 2425bf5a..2e22325e 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -107,7 +107,7 @@ def __init__(self, queue=None, project=None, ncpus=None, mem=None, walltime=None def _submit_job(self, script_filename): piped_cmd = [self.submit_command + ' ' + script_filename + ' 2> /dev/null'] - return self._call(piped_cmd, shell=True, cwd=self.log_directory) + return self._call(piped_cmd, shell=True) def lsf_format_bytes_ceil(n): From a2760084ecb5c65bd159ea129e0913383e4c0c82 Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Tue, 16 Oct 2018 10:22:33 +0200 Subject: [PATCH 4/4] Trigger CI