
WIP: Use regex for job-id lookup #131

Merged (25 commits), Aug 28, 2018

Commits:
- f0566c8: Use regex for job-id lookup (willirath, Aug 22, 2018)
- 9fb1ca0: Make jobid regex configurable (willirath, Aug 22, 2018)
- 06a9f02: Centralize jobid lookup and use same regex for all (willirath, Aug 23, 2018)
- aab9a6e: Fix string formatter (willirath, Aug 23, 2018)
- d67e68e: Promote jobid parsing to function (willirath, Aug 23, 2018)
- c1604c8: Adapt documentation to new location of jobid parsing (willirath, Aug 23, 2018)
- b11e2c9: Fix import for parsing function (willirath, Aug 23, 2018)
- c50fe05: Promote jobid parsing to function (willirath, Aug 23, 2018)
- e904f0a: Test jobid lookup against more example outputs (willirath, Aug 23, 2018)
- 4dcb2e1: Make parser a static method (willirath, Aug 23, 2018)
- 2602192: Re-word trouble shooting of job_id parser (willirath, Aug 24, 2018)
- f72a7f1: Use method with regexp in class var (willirath, Aug 24, 2018)
- 6fda4c9: Fix typo in test (willirath, Aug 24, 2018)
- 3f61dd9: Skip Cluster base class and properly init clusters (willirath, Aug 24, 2018)
- 9acf92d: Fix parens (willirath, Aug 24, 2018)
- f6240b7: Use full named group in regexp at class level (willirath, Aug 24, 2018)
- f39f116: Handle parsing errors (willirath, Aug 24, 2018)
- 4fc5318: Fix regexp to really include pattern (willirath, Aug 24, 2018)
- dcbe350: Test for error handling (willirath, Aug 24, 2018)
- 9c83c29: Don't assign job_id in test (willirath, Aug 24, 2018)
- 5463a1d: Use more spectific error messages (willirath, Aug 27, 2018)
- 23e54fc: Simplify test for error message (willirath, Aug 27, 2018)
- d5a9e57: Fix error message test (willirath, Aug 27, 2018)
- a71c293: Small tweaks (lesteve, Aug 28, 2018)
- cb13919: Use longer match for clarity (lesteve, Aug 28, 2018)
17 changes: 15 additions & 2 deletions dask_jobqueue/core.py
@@ -2,6 +2,7 @@

import logging
import math
import re
import shlex
import subprocess
import sys
@@ -138,6 +139,7 @@ class JobQueueCluster(Cluster):
cancel_command = None
scheduler_name = ''
_adaptive_options = {'worker_key': lambda ws: _job_id_from_worker_name(ws.name)}
job_id_regexp = r'(?P<job_id>\d+)'

def __init__(self,
name=None,
@@ -407,5 +409,16 @@ def _del_pending_jobs(self):
return jobs

def _job_id_from_submit_output(self, out):
raise NotImplementedError('_job_id_from_submit_output must be implemented when JobQueueCluster is '
'inherited. It should convert the stdout from submit_command to the job id')
msg = ('Could not parse job id from submission command output. Job id '
'regexp is {}, submission command output is: {}'.format(
self.job_id_regexp, out))

match = re.search(self.job_id_regexp, out)
if match is None:
raise ValueError(msg)

job_id = match.group('job_id')
Review comment from lesteve (Member), Aug 27, 2018:
Can you use `job_id = match.groupdict().get('job_id')` to get a better error, as I mentioned (probably folded away in an outdated diff)?

Thinking about this, it would be great to have a better error in the case where there is a match but no `job_id` named group, i.e. something like:

"You need to use a `job_id` named group in your regexp, e.g. '(?P<job_id>\d+)'. Your regexp was: {}".format(self.job_id_regexp) 

if job_id is None:
raise ValueError(msg)

return job_id
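The parser added above is easy to exercise standalone. The sketch below mirrors the logic from this diff, but already uses the `groupdict().get('job_id')` variant lesteve suggests; the function name here is illustrative, not dask-jobqueue's public API:

```python
import re

# Default pattern introduced by this PR: digits captured in a named group.
JOB_ID_REGEXP = r'(?P<job_id>\d+)'

def job_id_from_submit_output(out, job_id_regexp=JOB_ID_REGEXP):
    """Extract the job id from scheduler submit-command stdout."""
    msg = ('Could not parse job id from submission command output. '
           'Job id regexp is {}, submission command output is: {}'
           .format(job_id_regexp, out))
    match = re.search(job_id_regexp, out)
    if match is None:
        raise ValueError(msg)
    # groupdict().get returns None instead of raising IndexError when the
    # pattern has no 'job_id' named group, so both failure modes raise the
    # same ValueError.
    job_id = match.groupdict().get('job_id')
    if job_id is None:
        raise ValueError(msg)
    return job_id

print(job_id_from_submit_output('sbatch: Submitted batch job 654321'))  # 654321
```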
3 changes: 0 additions & 3 deletions dask_jobqueue/lsf.py
@@ -102,9 +102,6 @@ def __init__(self, queue=None, project=None, ncpus=None, mem=None, walltime=None

logger.debug("Job script: \n %s" % self.job_script())

def _job_id_from_submit_output(self, out):
return out.split('<')[1].split('>')[0].strip()

def _submit_job(self, script_filename):
piped_cmd = [self.submit_command + ' ' + script_filename + ' 2> /dev/null']
return self._call(piped_cmd, shell=True)
3 changes: 0 additions & 3 deletions dask_jobqueue/moab.py
@@ -42,6 +42,3 @@ class MoabCluster(PBSCluster):
submit_command = 'msub'
cancel_command = 'canceljob'
scheduler_name = 'moab'

def _job_id_from_submit_output(self, out):
return out.strip()
3 changes: 0 additions & 3 deletions dask_jobqueue/pbs.py
@@ -101,9 +101,6 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None,

logger.debug("Job script: \n %s" % self.job_script())

def _job_id_from_submit_output(self, out):
return out.split('.')[0].strip()


def pbs_format_bytes_ceil(n):
""" Format bytes as text.
3 changes: 0 additions & 3 deletions dask_jobqueue/sge.py
@@ -78,6 +78,3 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None,
self.job_header = header_template % config

logger.debug("Job script: \n %s" % self.job_script())

def _job_id_from_submit_output(self, out):
return out.strip()
3 changes: 0 additions & 3 deletions dask_jobqueue/slurm.py
@@ -102,9 +102,6 @@ def __init__(self, queue=None, project=None, walltime=None, job_cpu=None, job_me

logger.debug("Job script: \n %s" % self.job_script())

def _job_id_from_submit_output(self, out):
return out.split(';')[0].strip()


def slurm_format_bytes_ceil(n):
""" Format bytes as text.
47 changes: 47 additions & 0 deletions dask_jobqueue/tests/test_jobqueue_core.py
@@ -44,3 +44,50 @@ def test_forward_ip():
with PBSCluster(walltime='00:02:00', processes=4, cores=8, memory='28GB',
name='dask-worker') as cluster:
assert cluster.local_cluster.scheduler.ip == default_ip


@pytest.mark.parametrize('Cluster', [PBSCluster, MoabCluster, SLURMCluster,
SGECluster, LSFCluster])
@pytest.mark.parametrize(
'qsub_return_string',
['{job_id}.admin01',
'Request {job_id}.asdf was sumbitted to queue: standard.',
'sbatch: Submitted batch job {job_id}',
'{job_id};cluster',
'Job <{job_id}> is submitted to default queue <normal>.',
'{job_id}'])
def test_job_id_from_qsub(Cluster, qsub_return_string):
original_job_id = '654321'
qsub_return_string = qsub_return_string.format(job_id=original_job_id)
with Cluster(walltime='00:02:00', processes=4, cores=8, memory='28GB',
name='dask-worker') as cluster:
assert (original_job_id
== cluster._job_id_from_submit_output(qsub_return_string))


@pytest.mark.parametrize('Cluster', [PBSCluster, MoabCluster, SLURMCluster,
SGECluster, LSFCluster])
@pytest.mark.parametrize(
Review comment from lesteve (Member):
General remark, for the error cases I would remove this parametrize and just test with something like this:

  • an output without match: e.g. 'there is no number here'
  • a regexp that matches but that does not have a `job_id` named group, e.g. `cluster.job_id_regexp = r'\d+'` and an output 'Job <12345> is submitted to default queue <normal>.'

'qsub_return_string',
['Request {job_id}.asdf was sumbitted to queue: standard.',
'sbatch: Submitted batch job {job_id}',
'{job_id};cluster',
'Job <{job_id}> is submitted to default queue <normal>.',
'{job_id}',
pytest.param('{job_id}.admin01', marks=pytest.mark.xfail)])
Review comment from lesteve (Member):
I forgot you could do something like this with parametrize, nice! Can you explain why it is xfail though?

Reply from willirath (Collaborator, Author):

In this case {job_id} expands to XXXXX (see https://github.com/dask/dask-jobqueue/pull/131/files/9c83c296c33421c2ec496580b67a81feaa58f6b1#diff-6715b450c58d2e22119574769a68a1d6R92). Then, the 01 in admin01 would be interpreted as the job_id.
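willirath's point can be checked directly: once the job id placeholder is replaced by XXXXX, the default digit pattern latches onto the digits inside admin01 instead (a quick illustration, not code from the PR):

```python
import re

out = '{job_id}.admin01'.format(job_id='XXXXX')  # -> 'XXXXX.admin01'
match = re.search(r'(?P<job_id>\d+)', out)
# The hostname digits match, so no ValueError is raised and the
# error-handling test for this return string must be marked xfail.
print(match.group('job_id'))  # prints 01
```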

def test_job_id_error_handling(Cluster, qsub_return_string):

# test for broken regexp
with Cluster(walltime='00:02:00', processes=4, cores=8, memory='28GB',
name='dask-worker') as cluster:
with pytest.raises(ValueError, match="Could not parse job id"):
cluster.job_id_regexp = r'(?P<job_id>XXXX)'
return_string = qsub_return_string.format(job_id='654321')
cluster._job_id_from_submit_output(return_string)

# test for missing job_id (Will fail for return string w/ number.)
with Cluster(walltime='00:02:00', processes=4, cores=8, memory='28GB',
name='dask-worker') as cluster:
with pytest.raises(ValueError, match="Could not parse job id"):
return_string = qsub_return_string.format(job_id='XXXXX')
cluster._job_id_from_submit_output(return_string)
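lesteve's second suggested error case (a pattern that matches the output but lacks the required named group) can be sketched in isolation like this; the pattern and output string are taken from the review comment above:

```python
import re

# A user-supplied pattern without the 'job_id' named group.
regexp = r'\d+'
out = 'Job <12345> is submitted to default queue <normal>.'
match = re.search(regexp, out)
assert match is not None                 # the pattern does match the output
print(match.groupdict().get('job_id'))   # prints None: no named group
# match.group('job_id') would instead raise IndexError('no such group'),
# which is why groupdict().get gives the chance for a clearer error message.
```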
3 changes: 1 addition & 2 deletions docs/index.rst
@@ -285,5 +285,4 @@ problems are the following:
We use the submit command's stdout to parse the job_id corresponding to the
launched group of workers. If the parsing fails, then dask-jobqueue won't work
as expected and may throw exceptions. You can have a look at the parsing
function in every ``JobQueueCluster`` implementation, see
``_job_id_from_submit_output`` function.
function ``JobQueueCluster._job_id_from_submit_output``.
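With `job_id_regexp` now a class variable, the troubleshooting step the docs describe gets simpler: a scheduler with unusual submit output only needs a different pattern, not a reimplemented parser. A hypothetical sketch (`FancyCluster` and its output format are made up for illustration):

```python
import re

class FancyCluster:
    """Stand-in for a JobQueueCluster subclass; only the pattern changes."""
    # Hypothetical scheduler output: 'OK: queued as job=42 on partition x'
    job_id_regexp = r'job=(?P<job_id>\d+)'

    def _job_id_from_submit_output(self, out):
        # Same centralized logic as in core.py, reduced for the sketch.
        match = re.search(self.job_id_regexp, out)
        if match is None:
            raise ValueError('Could not parse job id from: {}'.format(out))
        return match.group('job_id')

print(FancyCluster()._job_id_from_submit_output(
    'OK: queued as job=42 on partition x'))  # prints 42
```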