From b753d1389bc667ac56bd798e27f5735de182aad6 Mon Sep 17 00:00:00 2001
From: Guillaume EB
Date: Thu, 1 Sep 2022 12:21:01 +0400
Subject: [PATCH 1/2] Do not skip job_extra_directives with header_skip.
 Rename header_skip to job_directives_skip.

---
 dask_jobqueue/core.py                    | 37 ++++++----
 dask_jobqueue/htcondor.py                |  5 ++
 dask_jobqueue/jobqueue.yaml              |  8 +++
 dask_jobqueue/lsf.py                     | 10 +++
 dask_jobqueue/oar.py                     | 10 +++
 dask_jobqueue/pbs.py                     | 10 +++
 dask_jobqueue/sge.py                     | 37 +++++-----
 dask_jobqueue/slurm.py                   | 10 +++
 dask_jobqueue/tests/test_job.py          | 92 +++++++++++++++++++++++-
 docs/source/advanced-tips-and-tricks.rst | 19 ++---
 10 files changed, 198 insertions(+), 40 deletions(-)

diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py
index 4f97c076..5d085999 100644
--- a/dask_jobqueue/core.py
+++ b/dask_jobqueue/core.py
@@ -55,8 +55,11 @@
     job_script_prologue : list
         Other commands to add to script before launching worker.
     header_skip : list
-        Lines to skip in the header.
-        Header lines matching this text will be removed
+        Deprecated: use ``job_directives_skip`` instead. This parameter will be removed in a future version.
+    job_directives_skip : list
+        Directives to skip in the generated job script header.
+        Directives lines matching this text will be removed.
+        Directives added trhough job_extra_directives won't be affected.
     log_directory : str
         Directory to use for job scheduler logs.
     shebang : str
@@ -169,6 +172,7 @@ def __init__(
         env_extra=None,
         job_script_prologue=None,
         header_skip=None,
+        job_directives_skip=None,
         log_directory=None,
         shebang=None,
         python=sys.executable,
@@ -272,8 +276,25 @@ def __init__(
             job_script_prologue = env_extra
         if header_skip is None:
             header_skip = dask.config.get(
-                "jobqueue.%s.header-skip" % self.config_name, ()
+                "jobqueue.%s.header-skip" % self.config_name, None
             )
+        if job_directives_skip is None:
+            job_directives_skip = dask.config.get(
+                "jobqueue.%s.job-directives-skip" % self.config_name, ()
+            )
+        if header_skip is not None:
+            warn = (
+                "header_skip has been renamed to job_directives_skip. "
+                "You are still using it (even if it is only set to (); please also check your config files). "
+                "If you have not set job_directives_skip yet, header_skip will be respected for now, "
+                "but it will be removed in a future release. "
+                "If you already set job_directives_skip, header_skip is ignored and you can remove it."
+            )
+            warnings.warn(warn, FutureWarning)
+            if not job_directives_skip:
+                job_directives_skip = header_skip
+
+        self.job_directives_skip = set(job_directives_skip)
         if log_directory is None:
             log_directory = dask.config.get(
                 "jobqueue.%s.log-directory" % self.config_name
@@ -309,7 +330,6 @@ def __init__(
         self.shebang = shebang
 
         self._job_script_prologue = job_script_prologue
-        self.header_skip = set(header_skip)
 
         # dask-worker command line build
         dask_worker_command = "%(python)s -m distributed.cli.dask_worker" % dict(
@@ -352,16 +372,9 @@ def default_config_name(cls):
 
     def job_script(self):
         """Construct a job submission script"""
-        header = "\n".join(
-            [
-                line
-                for line in self.job_header.split("\n")
-                if not any(skip in line for skip in self.header_skip)
-            ]
-        )
         pieces = {
             "shebang": self.shebang,
-            "job_header": header,
+            "job_header": self.job_header,
             "job_script_prologue": "\n".join(filter(None, self._job_script_prologue)),
             "worker_command": self._command_template,
         }
diff --git a/dask_jobqueue/htcondor.py b/dask_jobqueue/htcondor.py
index b7a7b235..4ab5c07a 100644
--- a/dask_jobqueue/htcondor.py
+++ b/dask_jobqueue/htcondor.py
@@ -87,6 +87,11 @@ def __init__(
                 "Stream_Error": True,
             }
         )
+
+        if self.job_directives_skip:
+            for skip in self.job_directives_skip:
+                self.job_header_dict.pop(skip, None)
+
         if self.job_extra_directives:
             self.job_header_dict.update(self.job_extra_directives)
 
diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml
index 5deae272..a9f57b53 100644
--- a/dask_jobqueue/jobqueue.yaml
+++ b/dask_jobqueue/jobqueue.yaml
@@ -24,6 +24,7 @@ jobqueue:
     resource-spec: null
     job-extra: null
     job-extra-directives: []
+    job-directives-skip: []
     log-directory: null
 
     # Scheduler options
@@ -54,6 +55,7 @@ jobqueue:
     resource-spec: null
     job-extra: null
     job-extra-directives: []
+    job-directives-skip: []
     log-directory: null
 
     # Scheduler options
@@ -83,6 +85,7 @@ jobqueue:
     job-script-prologue: []
     job-extra: null
     job-extra-directives: []
+    job-directives-skip: []
     log-directory: null
 
     resource-spec: null
@@ -115,6 +118,7 @@ jobqueue:
     job-mem: null
     job-extra: null
    job-extra-directives: []
+    job-directives-skip: []
     log-directory: null
 
     # Scheduler options
@@ -145,6 +149,7 @@ jobqueue:
     resource-spec: null
     job-extra: null
     job-extra-directives: []
+    job-directives-skip: []
     log-directory: null
 
     # Scheduler options
@@ -176,6 +181,7 @@ jobqueue:
     mem: null
     job-extra: null
    job-extra-directives: []
+    job-directives-skip: []
     log-directory: null
     lsf-units: null
     use-stdin: True # (bool) How jobs are launched, i.e. 'bsub jobscript.sh' or 'bsub < jobscript.sh'
@@ -204,6 +210,7 @@ jobqueue:
     job-script-prologue: []
     job-extra: null # Extra submit attributes
     job-extra-directives: {} # Extra submit attributes
+    job-directives-skip: []
     submit-command-extra: [] # Extra condor_submit arguments
     cancel-command-extra: [] # Extra condor_rm arguments
     log-directory: null
@@ -230,6 +237,7 @@ jobqueue:
     job-script-prologue: []
     job-extra: null
     job-extra-directives: []
+    job-directives-skip: []
     log-directory: null
 
     # Scheduler options
diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py
index 0d6c7db0..b469a3b9 100644
--- a/dask_jobqueue/lsf.py
+++ b/dask_jobqueue/lsf.py
@@ -93,6 +93,16 @@ def __init__(
             header_lines.append("#BSUB -M %s" % memory_string)
         if walltime is not None:
             header_lines.append("#BSUB -W %s" % walltime)
+
+        # Skip requested header directives
+        header_lines = list(
+            filter(
+                lambda line: not any(skip in line for skip in self.job_directives_skip),
+                header_lines,
+            )
+        )
+
+        # Add extra header directives
         header_lines.extend(["#BSUB %s" % arg for arg in self.job_extra_directives])
 
         # Declare class attribute that shall be overridden
diff --git a/dask_jobqueue/oar.py b/dask_jobqueue/oar.py
index 0e8c0d87..9d693857 100644
--- a/dask_jobqueue/oar.py
+++ b/dask_jobqueue/oar.py
@@ -64,6 +64,16 @@ def __init__(
             full_resource_spec = ",".join(resource_spec_list)
             header_lines.append("#OAR -l %s" % full_resource_spec)
+
+        # Skip requested header directives
+        header_lines = list(
+            filter(
+                lambda line: not any(skip in line for skip in self.job_directives_skip),
+                header_lines,
+            )
+        )
+
+        # Add extra header directives
         header_lines.extend(["#OAR %s" % arg for arg in self.job_extra_directives])
 
         self.job_header = "\n".join(header_lines)
diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py
index 289d9e9f..e3569c24 100644
--- a/dask_jobqueue/pbs.py
+++ b/dask_jobqueue/pbs.py
@@ -95,6 +95,16 @@ def __init__(
         if self.log_directory is not None:
             header_lines.append("#PBS -e %s/" % self.log_directory)
             header_lines.append("#PBS -o %s/" % self.log_directory)
+
+        # Skip requested header directives
+        header_lines = list(
+            filter(
+                lambda line: not any(skip in line for skip in self.job_directives_skip),
+                header_lines,
+            )
+        )
+
+        # Add extra header directives
         header_lines.extend(["#PBS %s" % arg for arg in self.job_extra_directives])
 
         # Declare class attribute that shall be overridden
diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py
index 9cb113c6..26dfccca 100644
--- a/dask_jobqueue/sge.py
+++ b/dask_jobqueue/sge.py
@@ -40,32 +40,31 @@ def __init__(
 
         header_lines = []
         if self.job_name is not None:
-            header_lines.append("#$ -N %(job-name)s")
+            header_lines.append("#$ -N %s" % self.job_name)
         if queue is not None:
-            header_lines.append("#$ -q %(queue)s")
+            header_lines.append("#$ -q %s" % queue)
         if project is not None:
-            header_lines.append("#$ -P %(project)s")
+            header_lines.append("#$ -P %s" % project)
         if resource_spec is not None:
-            header_lines.append("#$ -l %(resource_spec)s")
+            header_lines.append("#$ -l %s" % resource_spec)
         if walltime is not None:
-            header_lines.append("#$ -l h_rt=%(walltime)s")
+            header_lines.append("#$ -l h_rt=%s" % walltime)
         if self.log_directory is not None:
-            header_lines.append("#$ -e %(log_directory)s/")
-            header_lines.append("#$ -o %(log_directory)s/")
+            header_lines.append("#$ -e %s/" % self.log_directory)
+            header_lines.append("#$ -o %s/" % self.log_directory)
         header_lines.extend(["#$ -cwd", "#$ -j y"])
+
+        # Skip requested header directives
+        header_lines = list(
+            filter(
+                lambda line: not any(skip in line for skip in self.job_directives_skip),
+                header_lines,
+            )
+        )
+
+        # Add extra header directives
         header_lines.extend(["#$ %s" % arg for arg in self.job_extra_directives])
-        header_template = "\n".join(header_lines)
-
-        config = {
-            "job-name": self.job_name,
-            "queue": queue,
-            "project": project,
-            "processes": self.worker_processes,
-            "walltime": walltime,
-            "resource_spec": resource_spec,
-            "log_directory": self.log_directory,
-        }
-        self.job_header = header_template % config
+        self.job_header = "\n".join(header_lines)
 
         logger.debug("Job script: \n %s" % self.job_script())
diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py
index 4cea7a5e..571b8d69 100644
--- a/dask_jobqueue/slurm.py
+++ b/dask_jobqueue/slurm.py
@@ -74,6 +74,16 @@ def __init__(
 
         if walltime is not None:
             header_lines.append("#SBATCH -t %s" % walltime)
+
+        # Skip requested header directives
+        header_lines = list(
+            filter(
+                lambda line: not any(skip in line for skip in self.job_directives_skip),
+                header_lines,
+            )
+        )
+
+        # Add extra header directives
         header_lines.extend(["#SBATCH %s" % arg for arg in self.job_extra_directives])
 
         # Declare class attribute that shall be overridden
diff --git a/dask_jobqueue/tests/test_job.py b/dask_jobqueue/tests/test_job.py
index d531317d..3325bbfd 100644
--- a/dask_jobqueue/tests/test_job.py
+++ b/dask_jobqueue/tests/test_job.py
@@ -1,6 +1,8 @@
 import asyncio
 from time import time
 
+
+from dask_jobqueue import PBSCluster, SLURMCluster, SGECluster, OARCluster
 from dask_jobqueue.local import LocalCluster
 from dask_jobqueue.pbs import PBSJob
 from dask_jobqueue.core import JobQueueCluster
@@ -109,10 +111,98 @@ def test_header_lines_skip():
     job = PBSJob(cores=1, memory="1GB", job_name="foobar")
     assert "foobar" in job.job_script()
 
-    job = PBSJob(cores=1, memory="1GB", job_name="foobar", header_skip=["-N"])
+    job = PBSJob(cores=1, memory="1GB", job_name="foobar", job_directives_skip=["-N"])
     assert "foobar" not in job.job_script()
 
 
+def test_header_lines_dont_skip_extra_directives():
+    job = PBSJob(
+        cores=1, memory="1GB", job_name="foobar", job_extra_directives=["-N 123"]
+    )
+    assert "foobar" in job.job_script()
+    assert "-N 123" in job.job_script()
+
+    job = PBSJob(
+        cores=1,
+        memory="1GB",
+        job_name="foobar",
+        job_directives_skip=["-N"],
+        job_extra_directives=["-N 123"],
+    )
+    assert "foobar" not in job.job_script()
+    assert "-N 123" in job.job_script()
+
+
+# Test header_skip deprecation only for cluster implementations that use job_name.
+
+@pytest.mark.parametrize("Cluster", [PBSCluster, SLURMCluster, SGECluster, OARCluster])
+def test_deprecation_header_skip(Cluster):
+    import warnings
+
+    # test issuing of warning
+    warnings.simplefilter("always")
+
+    job_cls = Cluster.job_cls
+    with warnings.catch_warnings(record=True) as w:
+        # should give a warning
+        job = job_cls(cores=1, memory="1 GB", header_skip=["old_param"])
+        assert len(w) == 1
+        assert issubclass(w[0].category, FutureWarning)
+        assert "header_skip has been renamed" in str(w[0].message)
+    with warnings.catch_warnings(record=True) as w:
+        # should give a warning
+        job = job_cls(
+            cores=1,
+            memory="1 GB",
+            header_skip=["old_param"],
+            job_directives_skip=["new_param"],
+        )
+        assert len(w) == 1
+        assert issubclass(w[0].category, FutureWarning)
+        assert "header_skip has been renamed" in str(w[0].message)
+    with warnings.catch_warnings(record=True) as w:
+        # should not give a warning
+        job = job_cls(
+            cores=1,
+            memory="1 GB",
+            job_directives_skip=["new_param"],
+        )
+        assert len(w) == 0
+
+    # the rest is not about the warning but about behaviour: header_skip is
+    # ignored when job_directives_skip is set, and still honoured otherwise
+    warnings.simplefilter("ignore")
+    job = job_cls(
+        cores=1,
+        memory="1 GB",
+        job_name="jobname",
+        header_skip=["jobname"],
+        job_directives_skip=["new_param"],
+    )
+    job_script = job.job_script()
+    assert "jobname" in job_script
+
+    job = job_cls(
+        cores=1,
+        memory="1 GB",
+        job_name="jobname",
+        header_skip=["jobname"],
+    )
+    job_script = job.job_script()
+    assert "jobname" not in job_script
+
+    job = job_cls(
+        cores=1,
+        memory="1 GB",
+        job_name="jobname",
+        header_skip=["jobname"],
+        job_directives_skip=(),
+    )
+    job_script = job.job_script()
+    assert "jobname" not in job_script
+
+
 @pytest.mark.asyncio
 async def test_nworkers_scale():
     async with LocalCluster(
diff --git a/docs/source/advanced-tips-and-tricks.rst b/docs/source/advanced-tips-and-tricks.rst
index bd084d1e..8100d7f2 100644
--- a/docs/source/advanced-tips-and-tricks.rst
+++ b/docs/source/advanced-tips-and-tricks.rst
@@ -10,25 +10,28 @@ clusters. This page is an attempt to document tips and tricks that are likely
 to be useful on some clusters (strictly more than one ideally although hard to
 be sure ...).
 
-Skipping unrecognised line in submission script with ``header_skip``
---------------------------------------------------------------------
+Skipping unrecognised lines in submission script with ``job_directives_skip``
+------------------------------------------------------------------------------
 
-On some clusters the submission script generated by Dask-Jobqueue (you can look
-at it with ``print(cluster.job_script())``) may not work on your HPC cluster
-because on some configuration quirk of your HPC cluster. Probably there are
+*Note: the parameter* ``job_directives_skip`` *was named* ``header_skip`` *until version 0.8.0.* ``header_skip`` *can still
+be used, but is considered deprecated and will be removed in a future version.*
+
+On some clusters, the submission script generated by Dask-Jobqueue (you can look
+at it with ``print(cluster.job_script())``) may not work
+because of some configuration quirk of your HPC cluster. Probably there are
 some reasons behind this configuration quirk of course.
 
-You'll get an error when doing ``cluster.scale`` (i.e. where you actually
-submit some jobs) that will tell you your job scheduler is not happy with your
+You'll get an error when calling ``cluster.scale`` (i.e. where you actually
+submit some jobs) that will tell you the job scheduler is not happy with your
 job submission script (see examples below). The main parameter you can use to
-work-around this is ``header_skip``:
+work around this is ``job_directives_skip``:
 
 .. code-block:: python
 
     # this will remove any line containing either '--mem' or
     # 'another-string' from the job submission script
     cluster = YourCluster(
-        header_skip=['--mem', 'another-string'],
+        job_directives_skip=['--mem', 'another-string'],
         **other_options_go_here)
 

From 42ff51161bfb291cbc1eaff248514004379fd228 Mon Sep 17 00:00:00 2001
From: Guillaume EB
Date: Sat, 3 Sep 2022 21:42:28 +0400
Subject: [PATCH 2/2] Update doc with suggestions

---
 dask_jobqueue/core.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py
index 5d085999..ff7ecd1c 100644
--- a/dask_jobqueue/core.py
+++ b/dask_jobqueue/core.py
@@ -58,8 +58,8 @@
         Deprecated: use ``job_directives_skip`` instead. This parameter will be removed in a future version.
     job_directives_skip : list
         Directives to skip in the generated job script header.
-        Directives lines matching this text will be removed.
-        Directives added trhough job_extra_directives won't be affected.
+        Directive lines containing the specified strings will be removed.
+        Directives added by job_extra_directives won't be affected.
     log_directory : str
         Directory to use for job scheduler logs.
     shebang : str
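
For readers skimming the patches, here is a minimal usage sketch of the behaviour
they implement (not part of the patches themselves; the PBS scheduler and the
directive values are illustrative): generated directives matching a
``job_directives_skip`` entry are filtered out of the job script header, while
directives passed via ``job_extra_directives`` are appended after the filter runs
and are therefore never skipped.

.. code-block:: python

    from dask_jobqueue import PBSCluster

    cluster = PBSCluster(
        cores=1,
        memory="1GB",
        job_name="foobar",
        # drop the generated "#PBS -N foobar" line from the header ...
        job_directives_skip=["-N"],
        # ... while an explicitly requested directive is kept, because
        # extras are added only after the skip filter has been applied
        job_extra_directives=["-N custom-name"],
    )

    # the generated "-N foobar" directive is gone, but
    # "#PBS -N custom-name" from job_extra_directives is present
    print(cluster.job_script())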
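
And a sketch of the deprecation path added to ``core.py``, mirroring the new
test: ``header_skip`` keeps working when ``job_directives_skip`` is not given,
but now emits a ``FutureWarning``; when both are given, ``header_skip`` is
ignored.

.. code-block:: python

    import warnings

    from dask_jobqueue.pbs import PBSJob

    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter("always")
        # the old name still takes effect here, since job_directives_skip
        # is unset ...
        job = PBSJob(cores=1, memory="1GB", job_name="foobar", header_skip=["-N"])
        # ... but it now raises a FutureWarning pointing at the new name
        assert any(issubclass(r.category, FutureWarning) for r in w)

    # the "#PBS -N foobar" directive was skipped via the deprecated parameter
    assert "foobar" not in job.job_script()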