Do not skip job_extra_directives with header_skip. Rename header_skip. #584

Merged (2 commits, Sep 3, 2022)
37 changes: 25 additions & 12 deletions dask_jobqueue/core.py
@@ -55,8 +55,11 @@
job_script_prologue : list
Other commands to add to script before launching worker.
header_skip : list
Lines to skip in the header.
Header lines matching this text will be removed.
Deprecated: use ``job_directives_skip`` instead. This parameter will be removed in a future version.
job_directives_skip : list
Directives to skip in the generated job script header.
Directive lines containing the specified strings will be removed.
Directives added by job_extra_directives won't be affected.
log_directory : str
Directory to use for job scheduler logs.
shebang : str
@@ -169,6 +172,7 @@ def __init__(
env_extra=None,
job_script_prologue=None,
header_skip=None,
job_directives_skip=None,
log_directory=None,
shebang=None,
python=sys.executable,
@@ -272,8 +276,25 @@ def __init__(
job_script_prologue = env_extra
if header_skip is None:
header_skip = dask.config.get(
"jobqueue.%s.header-skip" % self.config_name, ()
"jobqueue.%s.header-skip" % self.config_name, None
)
if job_directives_skip is None:
job_directives_skip = dask.config.get(
"jobqueue.%s.job-directives-skip" % self.config_name, ()
)
if header_skip is not None:
warn = (
"header_skip has been renamed to job_directives_skip. "
"You are still using it (even if only set to (); please also check config files). "
"If you did not set job_directives_skip yet, header_skip will be respected for now, "
"but it will be removed in a future release. "
"If you already set job_directives_skip, header_skip is ignored and you can remove it."
)
warnings.warn(warn, FutureWarning)
if not job_directives_skip:
job_directives_skip = header_skip
self.job_directives_skip = set(job_directives_skip)

if log_directory is None:
log_directory = dask.config.get(
"jobqueue.%s.log-directory" % self.config_name
@@ -309,7 +330,6 @@ def __init__(
self.shebang = shebang

self._job_script_prologue = job_script_prologue
self.header_skip = set(header_skip)

# dask-worker command line build
dask_worker_command = "%(python)s -m distributed.cli.dask_worker" % dict(
@@ -352,16 +372,9 @@ def default_config_name(cls):

def job_script(self):
"""Construct a job submission script"""
header = "\n".join(
[
line
for line in self.job_header.split("\n")
if not any(skip in line for skip in self.header_skip)
]
)
pieces = {
"shebang": self.shebang,
"job_header": header,
"job_header": self.job_header,
"job_script_prologue": "\n".join(filter(None, self._job_script_prologue)),
"worker_command": self._command_template,
}
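Taken together, the core.py changes mean job_directives_skip is now the constructor argument and config key, while header_skip survives only as a deprecated alias that emits a FutureWarning. A minimal usage sketch, assuming a PBS system; the skipped "-q" fragment is illustrative, not taken from this PR:

# Sketch of the rename from the user's side (illustrative "-q" skip).
from dask_jobqueue import PBSCluster

# Deprecated spelling -- still honored for now, but emits a FutureWarning:
# cluster = PBSCluster(cores=1, memory="1GB", header_skip=["-q"])

# New spelling:
cluster = PBSCluster(cores=1, memory="1GB", job_directives_skip=["-q"])
print(cluster.job_script())  # any generated "#PBS -q ..." line is dropped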
5 changes: 5 additions & 0 deletions dask_jobqueue/htcondor.py
@@ -87,6 +87,11 @@ def __init__(
"Stream_Error": True,
}
)

if self.job_directives_skip:
for skip in self.job_directives_skip:
self.job_header_dict.pop(skip)

if self.job_extra_directives:
self.job_header_dict.update(self.job_extra_directives)

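HTCondor is the one backend that assembles its header as a dict rather than a list of lines, so skipping here is a key removal instead of a substring filter. A standalone sketch of that behavior, with illustrative key names; note that dict.pop() without a default raises KeyError for a key that is not present:

# Standalone sketch of the dict-based skip used by the HTCondor backend.
# Key names are illustrative, not taken verbatim from this PR.
job_header_dict = {"Stream_Output": True, "Stream_Error": True}
job_directives_skip = {"Stream_Error"}

for skip in job_directives_skip:
    # dict.pop(skip) raises KeyError if the key is absent;
    # pop(skip, None) would make the skip tolerant of missing keys.
    job_header_dict.pop(skip)

assert "Stream_Error" not in job_header_dict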
8 changes: 8 additions & 0 deletions dask_jobqueue/jobqueue.yaml
@@ -24,6 +24,7 @@ jobqueue:
resource-spec: null
job-extra: null
job-extra-directives: []
job-directives-skip: []
log-directory: null

# Scheduler options
@@ -54,6 +55,7 @@ jobqueue:
resource-spec: null
job-extra: null
job-extra-directives: []
job-directives-skip: []
log-directory: null

# Scheduler options
@@ -83,6 +85,7 @@ jobqueue:
job-script-prologue: []
job-extra: null
job-extra-directives: []
job-directives-skip: []
log-directory: null
resource-spec: null

@@ -115,6 +118,7 @@ jobqueue:
job-mem: null
job-extra: null
job-extra-directives: []
job-directives-skip: []
log-directory: null

# Scheduler options
@@ -145,6 +149,7 @@ jobqueue:
resource-spec: null
job-extra: null
job-extra-directives: []
job-directives-skip: []
log-directory: null

# Scheduler options
@@ -176,6 +181,7 @@ jobqueue:
mem: null
job-extra: null
job-extra-directives: []
job-directives-skip: []
log-directory: null
lsf-units: null
use-stdin: True # (bool) How jobs are launched, i.e. 'bsub jobscript.sh' or 'bsub < jobscript.sh'
@@ -204,6 +210,7 @@ jobqueue:
job-script-prologue: []
job-extra: null # Extra submit attributes
job-extra-directives: {} # Extra submit attributes
job-directives-skip: []
submit-command-extra: [] # Extra condor_submit arguments
cancel-command-extra: [] # Extra condor_rm arguments
log-directory: null
@@ -230,6 +237,7 @@ jobqueue:
job-script-prologue: []
job-extra: null
job-extra-directives: []
job-directives-skip: []
log-directory: null

# Scheduler options
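Because core.py reads the new key via dask.config.get("jobqueue.<name>.job-directives-skip"), the skip list can also be supplied through Dask's configuration instead of a constructor argument. A sketch assuming the PBS section; the "-q" and "-A" fragments are illustrative:

# Sketch: supplying the skip list via Dask config rather than the constructor.
# The config path mirrors the dask.config.get call in core.py above.
import dask
from dask_jobqueue import PBSCluster

dask.config.set({"jobqueue.pbs.job-directives-skip": ["-q", "-A"]})
cluster = PBSCluster(cores=1, memory="1GB")  # picks up the skip list from config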
10 changes: 10 additions & 0 deletions dask_jobqueue/lsf.py
@@ -93,6 +93,16 @@ def __init__(
header_lines.append("#BSUB -M %s" % memory_string)
if walltime is not None:
header_lines.append("#BSUB -W %s" % walltime)

# Skip requested header directives
header_lines = list(
filter(
lambda line: not any(skip in line for skip in self.job_directives_skip),
header_lines,
)
)

# Add extra header directives
header_lines.extend(["#BSUB %s" % arg for arg in self.job_extra_directives])

# Declare class attribute that shall be overridden
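The same two-step pattern recurs in each line-based backend below (OAR, PBS, SGE, Slurm): first drop any generated header line containing a skip string, then append the job_extra_directives lines, so user-supplied extras can never be filtered out. A standalone sketch with illustrative LSF-style values:

# Standalone sketch of the shared skip-then-extend pattern (values illustrative).
header_lines = ["#BSUB -J dask-worker", "#BSUB -q myqueue", "#BSUB -W 00:30"]
job_directives_skip = {"-q"}
job_extra_directives = ["-q otherqueue"]

# Step 1: drop generated lines that contain any skip string.
header_lines = [
    line
    for line in header_lines
    if not any(skip in line for skip in job_directives_skip)
]
# Step 2: append extras afterwards, so they are never skipped.
header_lines.extend("#BSUB %s" % arg for arg in job_extra_directives)

assert header_lines == [
    "#BSUB -J dask-worker",
    "#BSUB -W 00:30",
    "#BSUB -q otherqueue",
]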
10 changes: 10 additions & 0 deletions dask_jobqueue/oar.py
@@ -64,6 +64,16 @@ def __init__(

full_resource_spec = ",".join(resource_spec_list)
header_lines.append("#OAR -l %s" % full_resource_spec)

# Skip requested header directives
header_lines = list(
filter(
lambda line: not any(skip in line for skip in self.job_directives_skip),
header_lines,
)
)

# Add extra header directives
header_lines.extend(["#OAR %s" % arg for arg in self.job_extra_directives])

self.job_header = "\n".join(header_lines)
10 changes: 10 additions & 0 deletions dask_jobqueue/pbs.py
@@ -95,6 +95,16 @@ def __init__(
if self.log_directory is not None:
header_lines.append("#PBS -e %s/" % self.log_directory)
header_lines.append("#PBS -o %s/" % self.log_directory)

# Skip requested header directives
header_lines = list(
filter(
lambda line: not any(skip in line for skip in self.job_directives_skip),
header_lines,
)
)

# Add extra header directives
header_lines.extend(["#PBS %s" % arg for arg in self.job_extra_directives])

# Declare class attribute that shall be overridden
37 changes: 18 additions & 19 deletions dask_jobqueue/sge.py
@@ -40,32 +40,31 @@ def __init__(

header_lines = []
if self.job_name is not None:
header_lines.append("#$ -N %(job-name)s")
header_lines.append("#$ -N %s" % self.job_name)
if queue is not None:
header_lines.append("#$ -q %(queue)s")
header_lines.append("#$ -q %s" % queue)
if project is not None:
header_lines.append("#$ -P %(project)s")
header_lines.append("#$ -P %s" % project)
if resource_spec is not None:
header_lines.append("#$ -l %(resource_spec)s")
header_lines.append("#$ -l %s" % resource_spec)
if walltime is not None:
header_lines.append("#$ -l h_rt=%(walltime)s")
header_lines.append("#$ -l h_rt=%s" % walltime)
if self.log_directory is not None:
header_lines.append("#$ -e %(log_directory)s/")
header_lines.append("#$ -o %(log_directory)s/")
header_lines.append("#$ -e %s/" % self.log_directory)
header_lines.append("#$ -o %s/" % self.log_directory)
header_lines.extend(["#$ -cwd", "#$ -j y"])

# Skip requested header directives
header_lines = list(
filter(
lambda line: not any(skip in line for skip in self.job_directives_skip),
header_lines,
)
)

# Add extra header directives
header_lines.extend(["#$ %s" % arg for arg in self.job_extra_directives])
header_template = "\n".join(header_lines)

config = {
"job-name": self.job_name,
"queue": queue,
"project": project,
"processes": self.worker_processes,
"walltime": walltime,
"resource_spec": resource_spec,
"log_directory": self.log_directory,
}
self.job_header = header_template % config
self.job_header = "\n".join(header_lines)

logger.debug("Job script: \n %s" % self.job_script())

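A plausible reason SGE switched from a %-substituted header template to direct formatting: the skip filter now runs inside each backend's __init__ over the raw header lines, so the placeholders had to be substituted up front; otherwise a skip string matching a value such as the job name would never be found. A small sketch of the failure mode this avoids, with illustrative values (the deprecation test further below, which skips on the job name's value, exercises exactly this):

# Sketch: why the filter must see formatted lines, not templates (illustrative).
job_name = "myjob"
template_line = "#$ -N %(job-name)s"
formatted_line = "#$ -N %s" % job_name

skip = "myjob"
assert skip not in template_line   # filtering the template would keep the line
assert skip in formatted_line      # filtering the formatted line drops it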
10 changes: 10 additions & 0 deletions dask_jobqueue/slurm.py
@@ -74,6 +74,16 @@ def __init__(

if walltime is not None:
header_lines.append("#SBATCH -t %s" % walltime)

# Skip requested header directives
header_lines = list(
filter(
lambda line: not any(skip in line for skip in self.job_directives_skip),
header_lines,
)
)

# Add extra header directives
header_lines.extend(["#SBATCH %s" % arg for arg in self.job_extra_directives])

# Declare class attribute that shall be overridden
92 changes: 91 additions & 1 deletion dask_jobqueue/tests/test_job.py
@@ -1,6 +1,8 @@
import asyncio
from time import time


from dask_jobqueue import PBSCluster, SLURMCluster, SGECluster, OARCluster
from dask_jobqueue.local import LocalCluster
from dask_jobqueue.pbs import PBSJob
from dask_jobqueue.core import JobQueueCluster
@@ -109,10 +111,98 @@ def test_header_lines_skip():
job = PBSJob(cores=1, memory="1GB", job_name="foobar")
assert "foobar" in job.job_script()

job = PBSJob(cores=1, memory="1GB", job_name="foobar", header_skip=["-N"])
job = PBSJob(cores=1, memory="1GB", job_name="foobar", job_directives_skip=["-N"])
assert "foobar" not in job.job_script()


def test_header_lines_dont_skip_extra_directives():
job = PBSJob(
cores=1, memory="1GB", job_name="foobar", job_extra_directives=["-N 123"]
)
assert "foobar" in job.job_script()
assert "-N 123" in job.job_script()

job = PBSJob(
cores=1,
memory="1GB",
job_name="foobar",
job_directives_skip=["-N"],
job_extra_directives=["-N 123"],
)
assert "foobar" not in job.job_script()
assert "-N 123" in job.job_script()


# Test header_skip deprecation only for the cluster implementations that use job_name.


@pytest.mark.parametrize("Cluster", [PBSCluster, SLURMCluster, SGECluster, OARCluster])
def test_deprecation_header_skip(Cluster):
import warnings

# test issuing of warning
warnings.simplefilter("always")

job_cls = Cluster.job_cls
with warnings.catch_warnings(record=True) as w:
# should give a warning
job = job_cls(cores=1, memory="1 GB", header_skip=["old_param"])
assert len(w) == 1
assert issubclass(w[0].category, FutureWarning)
assert "header_skip has been renamed" in str(w[0].message)
with warnings.catch_warnings(record=True) as w:
# should give a warning
job = job_cls(
cores=1,
memory="1 GB",
header_skip=["old_param"],
job_directives_skip=["new_param"],
)
assert len(w) == 1
assert issubclass(w[0].category, FutureWarning)
assert "header_skip has been renamed" in str(w[0].message)
with warnings.catch_warnings(record=True) as w:
# should not give a warning
job = job_cls(
cores=1,
memory="1 GB",
job_directives_skip=["new_param"],
)
assert len(w) == 0

# the rest is not about the warning but about behaviour: if job_directives_skip is not
# set, header_skip should still be used if provided
warnings.simplefilter("ignore")
job = job_cls(
cores=1,
memory="1 GB",
job_name="jobname",
header_skip=["jobname"],
job_directives_skip=["new_param"],
)
job_script = job.job_script()
assert "jobname" in job_script

job = job_cls(
cores=1,
memory="1 GB",
job_name="jobname",
header_skip=["jobname"],
)
job_script = job.job_script()
assert "jobname" not in job_script

job = job_cls(
cores=1,
memory="1 GB",
job_name="jobname",
header_skip=["jobname"],
job_directives_skip=(),
)
job_script = job.job_script()
assert "jobname" not in job_script


@pytest.mark.asyncio
async def test_nworkers_scale():
async with LocalCluster(
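As an aside on the test style: the warnings.catch_warnings(record=True) blocks above could also be written with pytest.warns, which fails the test automatically when no matching warning is raised. A hedged alternative sketch; the test name is hypothetical and the parameters mirror the deprecation test above:

# Hedged alternative sketch using pytest.warns (test name is hypothetical).
import pytest
from dask_jobqueue.pbs import PBSJob

def test_header_skip_warns_alternative():
    with pytest.warns(FutureWarning, match="header_skip has been renamed"):
        PBSJob(cores=1, memory="1 GB", header_skip=["old_param"])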