Skip to content

Commit

Permalink
Merge pull request #70 from mrocklin/separate-config
Browse files Browse the repository at this point in the history
Separate configuration per resource manager
  • Loading branch information
Joe Hamman authored Jun 26, 2018
2 parents 13016ae + 9203840 commit 4cd24dc
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 67 deletions.
41 changes: 31 additions & 10 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,28 +74,49 @@ class JobQueueCluster(Cluster):
# Following class attributes should be overridden by extending classes.
submit_command = None
cancel_command = None
scheduler_name = ''

def __init__(self,
name=dask.config.get('jobqueue.name'),
threads=dask.config.get('jobqueue.threads'),
processes=dask.config.get('jobqueue.processes'),
memory=dask.config.get('jobqueue.memory'),
interface=dask.config.get('jobqueue.interface'),
death_timeout=dask.config.get('jobqueue.death-timeout'),
local_directory=dask.config.get('jobqueue.local-directory'),
extra=dask.config.get('jobqueue.extra'),
env_extra=dask.config.get('jobqueue.env-extra'),
name=None,
threads=None,
processes=None,
memory=None,
interface=None,
death_timeout=None,
local_directory=None,
extra=None,
env_extra=None,
walltime=None,
**kwargs
):
""" """
# """
# This initializer should be considered as Abstract, and never used
# directly.
# """
if not self.cancel_command or not self.submit_command:
if not self.scheduler_name:
raise NotImplementedError('JobQueueCluster is an abstract class '
'that should not be instanciated.')

if name is None:
name = dask.config.get('jobqueue.%s.name' % self.scheduler_name)
if threads is None:
threads = dask.config.get('jobqueue.%s.threads' % self.scheduler_name)
if processes is None:
processes = dask.config.get('jobqueue.%s.processes' % self.scheduler_name)
if memory is None:
memory = dask.config.get('jobqueue.%s.memory' % self.scheduler_name)
if interface is None:
interface = dask.config.get('jobqueue.%s.interface' % self.scheduler_name)
if death_timeout is None:
death_timeout = dask.config.get('jobqueue.%s.death-timeout' % self.scheduler_name)
if local_directory is None:
local_directory = dask.config.get('jobqueue.%s.local-directory' % self.scheduler_name)
if extra is None:
extra = dask.config.get('jobqueue.%s.extra' % self.scheduler_name)
if env_extra is None:
env_extra = dask.config.get('jobqueue.%s.env-extra' % self.scheduler_name)

# This attribute should be overridden
self.job_header = None

Expand Down
83 changes: 69 additions & 14 deletions dask_jobqueue/jobqueue.yaml
Original file line number Diff line number Diff line change
@@ -1,26 +1,81 @@
jobqueue:
name: dask-worker
threads: 2
processes: 4
memory: 8GB
interface: null
death-timeout: 60
local-directory: null
extra: ""
env-extra: []

queue: null
project: null
walltime: '00:30:00'

pbs:
name: dask-worker

# Dask worker options
threads: 2
processes: 4
memory: 8GB
interface: null
death-timeout: 60
local-directory: null

# PBS resource manager options
queue: null
project: null
walltime: '00:30:00'
extra: ""
env-extra: []
resource-spec: null
job-extra: []

sge:
name: dask-worker

# Dask worker options
threads: 2
processes: 4
memory: 8GB
interface: null
death-timeout: 60
local-directory: null

# SGE resource manager options
queue: null
project: null
walltime: '00:30:00'
extra: ""
env-extra: []

resource-spec: null

slurm:
name: dask-worker

# Dask worker options
threads: 2
processes: 4
memory: 8GB
interface: null
death-timeout: 60
local-directory: null

# SLURM resource manager options
queue: null
project: null
walltime: '00:30:00'
extra: ""
env-extra: []
job-cpu: null
job-mem: null
job-extra: {}

moab:
name: dask-worker

# Dask worker options
threads: 2
processes: 4
memory: 8GB
interface: null
death-timeout: 60
local-directory: null

# PBS resource manager options
queue: null
project: null
walltime: '00:30:00'
extra: ""
env-extra: []
resource-spec: null
job-extra: []
1 change: 1 addition & 0 deletions dask_jobqueue/moab.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class MoabCluster(PBSCluster):
""", 4)
submit_command = 'msub'
cancel_command = 'canceljob'
scheduler_name = 'moab'

def _job_id_from_submit_output(self, out):
return out.strip()
28 changes: 15 additions & 13 deletions dask_jobqueue/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,29 +50,31 @@ class PBSCluster(JobQueueCluster):
and threads asked:
>>> cluster = PBSCluster(queue='regular', project='DaskOnPBS',
local_directory=os.getenv('TMPDIR', '/tmp'),
threads=4, processes=6, memory='16GB',
resource_spec='select=1:ncpus=24:mem=100GB')
... local_directory=os.getenv('TMPDIR', '/tmp'),
... threads=4, processes=6, memory='16GB',
... resource_spec='select=1:ncpus=24:mem=100GB')
""", 4)

# Override class variables
submit_command = 'qsub'
cancel_command = 'qdel'
scheduler_name = 'pbs'

def __init__(self,
queue=dask.config.get('jobqueue.queue'),
project=dask.config.get('jobqueue.project'),
resource_spec=dask.config.get('jobqueue.pbs.resource-spec'),
walltime=dask.config.get('jobqueue.walltime'),
job_extra=dask.config.get('jobqueue.pbs.job-extra'),
**kwargs):
def __init__(self, queue=None, project=None, resource_spec=None, walltime=None, job_extra=None, **kwargs):
if queue is None:
queue = dask.config.get('jobqueue.%s.queue' % self.scheduler_name)
if resource_spec is None:
resource_spec = dask.config.get('jobqueue.%s.resource-spec' % self.scheduler_name)
if walltime is None:
walltime = dask.config.get('jobqueue.%s.walltime' % self.scheduler_name)
if job_extra is None:
job_extra = dask.config.get('jobqueue.%s.job-extra' % self.scheduler_name)
if project is None:
project = dask.config.get('jobqueue.%s.project' % self.scheduler_name) or os.environ.get('PBS_ACCOUNT')

# Instantiate args and parameters from parent abstract class
super(PBSCluster, self).__init__(**kwargs)

# Try to find a project name from environment variable
project = project or os.environ.get('PBS_ACCOUNT')

header_lines = []
# PBS header build
if self.name is not None:
Expand Down
18 changes: 11 additions & 7 deletions dask_jobqueue/sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,17 @@ class SGECluster(JobQueueCluster):
# Override class variables
submit_command = 'qsub -terse'
cancel_command = 'qdel'

def __init__(self,
queue=dask.config.get('jobqueue.queue'),
project=dask.config.get('jobqueue.project'),
resource_spec=dask.config.get('jobqueue.sge.resource-spec'),
walltime=dask.config.get('jobqueue.walltime'),
**kwargs):
scheduler_name = 'sge'

def __init__(self, queue=None, project=None, resource_spec=None, walltime=None, **kwargs):
if queue is None:
queue = dask.config.get('jobqueue.%s.queue' % self.scheduler_name)
if project is None:
project = dask.config.get('jobqueue.%s.project' % self.scheduler_name)
if resource_spec is None:
resource_spec = dask.config.get('jobqueue.%s.resource-spec' % self.scheduler_name)
if walltime is None:
walltime = dask.config.get('jobqueue.%s.walltime' % self.scheduler_name)

super(SGECluster, self).__init__(**kwargs)

Expand Down
25 changes: 16 additions & 9 deletions dask_jobqueue/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,22 @@ class SLURMCluster(JobQueueCluster):
#Override class variables
submit_command = 'sbatch --parsable'
cancel_command = 'scancel'

def __init__(self,
queue=dask.config.get('jobqueue.queue'),
project=dask.config.get('jobqueue.project'),
walltime=dask.config.get('jobqueue.walltime'),
job_cpu=dask.config.get('jobqueue.slurm.job-cpu'),
job_mem=dask.config.get('jobqueue.slurm.job-mem'),
job_extra=dask.config.get('jobqueue.slurm.job-extra'),
**kwargs):
scheduler_name = 'slurm'

def __init__(self, queue=None, project=None, walltime=None,
job_cpu=None, job_mem=None, job_extra=None, **kwargs):
if queue is None:
queue = dask.config.get('jobqueue.slurm.queue')
if project is None:
project = dask.config.get('jobqueue.slurm.project')
if walltime is None:
walltime = dask.config.get('jobqueue.slurm.walltime')
if job_cpu is None:
job_cpu = dask.config.get('jobqueue.slurm.job-cpu')
if job_mem is None:
job_mem = dask.config.get('jobqueue.slurm.job-mem')
if job_extra is None:
job_extra = dask.config.get('jobqueue.slurm.job-extra')

super(SLURMCluster, self).__init__(**kwargs)

Expand Down
39 changes: 25 additions & 14 deletions dask_jobqueue/tests/test_pbs.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import sys
from time import sleep, time

import pytest
import sys
from distributed import Client
import dask
from dask.distributed import Client
from distributed.utils_test import loop # noqa: F401
import pytest

from dask_jobqueue import PBSCluster
from dask_jobqueue import PBSCluster, MoabCluster


def test_header():
with PBSCluster(walltime='00:02:00', processes=4, threads=2, memory='7GB') as cluster:
@pytest.mark.parametrize('Cluster', [PBSCluster, MoabCluster])
def test_header(Cluster):
with Cluster(walltime='00:02:00', processes=4, threads=2, memory='7GB') as cluster:

assert '#PBS' in cluster.job_header
assert '#PBS -N dask-worker' in cluster.job_header
Expand All @@ -18,8 +20,8 @@ def test_header():
assert '#PBS -q' not in cluster.job_header
assert '#PBS -A' not in cluster.job_header

with PBSCluster(queue='regular', project='DaskOnPBS', processes=4, threads=2, memory='7GB',
resource_spec='select=1:ncpus=24:mem=100GB') as cluster:
with Cluster(queue='regular', project='DaskOnPBS', processes=4, threads=2, memory='7GB',
resource_spec='select=1:ncpus=24:mem=100GB') as cluster:

assert '#PBS -q regular' in cluster.job_header
assert '#PBS -N dask-worker' in cluster.job_header
Expand All @@ -28,7 +30,7 @@ def test_header():
assert '#PBS -l walltime=' in cluster.job_header
assert '#PBS -A DaskOnPBS' in cluster.job_header

with PBSCluster() as cluster:
with Cluster() as cluster:

assert '#PBS -j oe' not in cluster.job_header
assert '#PBS -N' in cluster.job_header
Expand All @@ -37,7 +39,7 @@ def test_header():
assert '#PBS -A' not in cluster.job_header
assert '#PBS -q' not in cluster.job_header

with PBSCluster(job_extra=['-j oe']) as cluster:
with Cluster(job_extra=['-j oe']) as cluster:

assert '#PBS -j oe' in cluster.job_header
assert '#PBS -N' in cluster.job_header
Expand All @@ -47,8 +49,9 @@ def test_header():
assert '#PBS -q' not in cluster.job_header


def test_job_script():
with PBSCluster(walltime='00:02:00', processes=4, threads=2, memory='7GB') as cluster:
@pytest.mark.parametrize('Cluster', [PBSCluster, MoabCluster])
def test_job_script(Cluster):
with Cluster(walltime='00:02:00', processes=4, threads=2, memory='7GB') as cluster:

job_script = cluster.job_script()
assert '#PBS' in job_script
Expand All @@ -61,8 +64,8 @@ def test_job_script():
assert '{} -m distributed.cli.dask_worker tcp://'.format(sys.executable) in job_script
assert '--nthreads 2 --nprocs 4 --memory-limit 7GB' in job_script

with PBSCluster(queue='regular', project='DaskOnPBS', processes=4, threads=2, memory='7GB',
resource_spec='select=1:ncpus=24:mem=100GB') as cluster:
with Cluster(queue='regular', project='DaskOnPBS', processes=4, threads=2, memory='7GB',
resource_spec='select=1:ncpus=24:mem=100GB') as cluster:

job_script = cluster.job_script()
assert '#PBS -q regular' in job_script
Expand Down Expand Up @@ -131,3 +134,11 @@ def test_adaptive(loop):
#while cluster.jobs:
# sleep(0.100)
# assert time() < start + 10


def test_config(loop):  # noqa: F811
    """Values set through dask.config should flow into the generated job script."""
    overrides = {'jobqueue.pbs.walltime': '00:02:00',
                 'jobqueue.pbs.local-directory': '/foo'}
    with dask.config.set(overrides), PBSCluster(loop=loop) as cluster:
        script = cluster.job_script()
        assert '00:02:00' in script
        assert '--local-directory /foo' in script

0 comments on commit 4cd24dc

Please sign in to comment.