Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a nested folder for storing jobqueue log file, use separate files… #145

Merged
merged 4 commits into from
Oct 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
import math
import os
import re
import shlex
import subprocess
Expand Down Expand Up @@ -151,6 +152,7 @@ def __init__(self,
local_directory=None,
extra=None,
env_extra=None,
log_directory=None,
walltime=None,
threads=None,
**kwargs
Expand Down Expand Up @@ -183,6 +185,8 @@ def __init__(self,
extra = dask.config.get('jobqueue.%s.extra' % self.scheduler_name)
if env_extra is None:
env_extra = dask.config.get('jobqueue.%s.env-extra' % self.scheduler_name)
if log_directory is None:
log_directory = dask.config.get('jobqueue.%s.log-directory' % self.scheduler_name)

if dask.config.get('jobqueue.%s.threads', None):
warnings.warn(threads_deprecation_message)
Expand Down Expand Up @@ -241,6 +245,11 @@ def __init__(self,
if extra is not None:
self._command_template += extra

self.log_directory = log_directory
if self.log_directory is not None:
if not os.path.exists(self.log_directory):
os.makedirs(self.log_directory)

def __repr__(self):
running_workers = sum(len(value) for value in self.running_jobs.values())
running_cores = running_workers * self.worker_threads
Expand Down
6 changes: 6 additions & 0 deletions dask_jobqueue/jobqueue.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ jobqueue:
env-extra: []
resource-spec: null
job-extra: []
log-directory: null

pbs:
name: dask-worker
Expand All @@ -40,6 +41,7 @@ jobqueue:
env-extra: []
resource-spec: null
job-extra: []
log-directory: null

sge:
name: dask-worker
Expand All @@ -59,6 +61,7 @@ jobqueue:
walltime: '00:30:00'
extra: ""
env-extra: []
log-directory: null

resource-spec: null

Expand All @@ -83,6 +86,7 @@ jobqueue:
job-cpu: null
job-mem: null
job-extra: {}
log-directory: null

moab:
name: dask-worker
Expand All @@ -104,6 +108,7 @@ jobqueue:
env-extra: []
resource-spec: null
job-extra: []
log-directory: null

lsf:
name: dask-worker
Expand All @@ -126,3 +131,4 @@ jobqueue:
ncpus: null
mem: null
job-extra: []
log-directory: null
7 changes: 5 additions & 2 deletions dask_jobqueue/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,11 @@ def __init__(self, queue=None, project=None, ncpus=None, mem=None, walltime=None
# LSF header build
if self.name is not None:
header_lines.append('#BSUB -J %s' % self.name)
header_lines.append('#BSUB -e %s.err' % self.name)
header_lines.append('#BSUB -o %s.out' % self.name)
if self.log_directory is not None:
header_lines.append('#BSUB -e %s/%s-%%J.err' %
(self.log_directory, self.name or 'worker'))
header_lines.append('#BSUB -o %s/%s-%%J.out' %
(self.log_directory, self.name or 'worker'))
if queue is not None:
header_lines.append('#BSUB -q %s' % queue)
if project is not None:
Expand Down
3 changes: 3 additions & 0 deletions dask_jobqueue/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None,
header_lines.append('#PBS -l %s' % resource_spec)
if walltime is not None:
header_lines.append('#PBS -l walltime=%s' % walltime)
if self.log_directory is not None:
header_lines.append('#PBS -e %s/' % self.log_directory)
header_lines.append('#PBS -o %s/' % self.log_directory)
header_lines.extend(['#PBS %s' % arg for arg in job_extra])
header_lines.append('JOB_ID=${PBS_JOBID%.*}')

Expand Down
6 changes: 5 additions & 1 deletion dask_jobqueue/sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None,
header_lines.append('#$ -l %(resource_spec)s')
if walltime is not None:
header_lines.append('#$ -l h_rt=%(walltime)s')
if self.log_directory is not None:
header_lines.append('#$ -e %(log_directory)s/')
header_lines.append('#$ -o %(log_directory)s/')
header_lines.extend(['#$ -cwd', '#$ -j y'])
header_template = '\n'.join(header_lines)

Expand All @@ -74,7 +77,8 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None,
'project': project,
'processes': self.worker_processes,
'walltime': walltime,
'resource_spec': resource_spec}
'resource_spec': resource_spec,
'log_directory': self.log_directory}
self.job_header = header_template % config

logger.debug("Job script: \n %s" % self.job_script())
7 changes: 5 additions & 2 deletions dask_jobqueue/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,11 @@ def __init__(self, queue=None, project=None, walltime=None, job_cpu=None, job_me
# SLURM header build
if self.name is not None:
header_lines.append('#SBATCH -J %s' % self.name)
header_lines.append('#SBATCH -e %s.err' % self.name)
header_lines.append('#SBATCH -o %s.out' % self.name)
if self.log_directory is not None:
header_lines.append('#SBATCH -e %s/%s-%%J.err' %
(self.log_directory, self.name or 'worker'))
header_lines.append('#SBATCH -o %s/%s-%%J.out' %
(self.log_directory, self.name or 'worker'))
if queue is not None:
header_lines.append('#SBATCH -p %s' % queue)
if project is not None:
Expand Down
12 changes: 12 additions & 0 deletions dask_jobqueue/tests/test_jobqueue_core.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import absolute_import, division, print_function

import os
import pytest
import shutil
import socket

from dask_jobqueue import (JobQueueCluster, PBSCluster, MoabCluster,
Expand Down Expand Up @@ -79,3 +81,13 @@ def test_job_id_error_handling(Cluster):
return_string = 'Job <12345> submited to <normal>.'
cluster.job_id_regexp = r'(\d+)'
cluster._job_id_from_submit_output(return_string)


def test_log_directory(tmpdir):
shutil.rmtree(tmpdir.strpath, ignore_errors=True)
with PBSCluster(cores=1, memory='1GB'):
assert not os.path.exists(tmpdir.strpath)

with PBSCluster(cores=1, memory='1GB',
log_directory=tmpdir.strpath):
assert os.path.exists(tmpdir.strpath)