Skip to content

Commit

Permalink
Merge pull request #212 from jmchilton/job_script_extensions
Browse files Browse the repository at this point in the history
Add extension point allowing writing up a job script file for tool commands.
  • Loading branch information
jmchilton authored Oct 20, 2016
2 parents 2352d16 + 879c254 commit 5c8ee81
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 36 deletions.
2 changes: 2 additions & 0 deletions cwltool/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def __init__(self): # type: () -> None
self.fs_access = None # type: StdFsAccess
self.job = None # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
self.requirements = None # type: List[Dict[Text, Any]]
self.hints = None # type: List[Dict[Text, Any]]
self.outdir = None # type: Text
self.tmpdir = None # type: Text
self.resources = None # type: Dict[Text, Union[int, Text]]
Expand All @@ -34,6 +35,7 @@ def __init__(self): # type: () -> None
self.pathmapper = None # type: PathMapper
self.stagedir = None # type: Text
self.make_fs_access = None # type: Type[StdFsAccess]
self.build_job_script = None # type: Callable[[List[str]], Text]

def bind_input(self, schema, datum, lead_pos=[], tail_pos=[]):
# type: (Dict[Text, Any], Any, Union[int, List[int]], List[int]) -> List[Dict[Text, Any]]
Expand Down
214 changes: 178 additions & 36 deletions cwltool/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import stat
import re
import shellescape
import string
from .docker_uid import docker_vm_uid
from .builder import Builder
from typing import (Any, Callable, Union, Iterable, Mapping, MutableMapping,
Expand All @@ -25,6 +26,58 @@

needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")

FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1"

SHELL_COMMAND_TEMPLATE = """#!/bin/bash
python "run_job.py" "job.json"
"""

PYTHON_RUN_SCRIPT = """
import json
import sys
import subprocess
with open(sys.argv[1], "r") as f:
popen_description = json.load(f)
commands = popen_description["commands"]
cwd = popen_description["cwd"]
env = popen_description["env"]
stdin_path = popen_description["stdin_path"]
stdout_path = popen_description["stdout_path"]
stderr_path = popen_description["stderr_path"]
if stdin_path is not None:
stdin = open(stdin_path, "rb")
else:
stdin = subprocess.PIPE
if stdout_path is not None:
stdout = open(stdout_path, "wb")
else:
stdout = sys.stderr
if stderr_path is not None:
stderr = open(stderr_path, "wb")
else:
stderr = sys.stderr
sp = subprocess.Popen(commands,
shell=False,
close_fds=True,
stdin=stdin,
stdout=stdout,
stderr=stderr,
env=env,
cwd=cwd)
if sp.stdin:
sp.stdin.close()
rcode = sp.wait()
if isinstance(stdin, file):
stdin.close()
if stdout is not sys.stderr:
stdout.close()
if stderr is not sys.stderr:
stderr.close()
sys.exit(rcode)
"""


def deref_links(outputs): # type: (Any) -> None
if isinstance(outputs, dict):
if outputs.get("class") == "File":
Expand Down Expand Up @@ -151,10 +204,6 @@ def run(self, dry_run=False, pull_image=True, rm_container=True,

stageFiles(self.pathmapper, os.symlink)

stdin = None # type: Union[IO[Any], int]
stderr = None # type: IO[Any]
stdout = None # type: IO[Any]

scr, _ = get_feature(self, "ShellCommandRequirement")

if scr:
Expand Down Expand Up @@ -191,51 +240,36 @@ def linkoutdir(src, tgt):
break
stageFiles(generatemapper, linkoutdir)

stdin_path = None
if self.stdin:
stdin = open(self.pathmapper.reversemap(self.stdin)[1], "rb")
else:
stdin = subprocess.PIPE
stdin_path = self.pathmapper.reversemap(self.stdin)[1]

stderr_path = None
if self.stderr:
abserr = os.path.join(self.outdir, self.stderr)
dnerr = os.path.dirname(abserr)
if dnerr and not os.path.exists(dnerr):
os.makedirs(dnerr)
stderr = open(abserr, "wb")
else:
stderr = sys.stderr
stderr_path = abserr

stdout_path = None
if self.stdout:
absout = os.path.join(self.outdir, self.stdout)
dn = os.path.dirname(absout)
if dn and not os.path.exists(dn):
os.makedirs(dn)
stdout = open(absout, "wb")
else:
stdout = sys.stderr

sp = subprocess.Popen([Text(x).encode('utf-8') for x in runtime + self.command_line],
shell=False,
close_fds=True,
stdin=stdin,
stderr=stderr,
stdout=stdout,
env=env,
cwd=self.outdir)

if sp.stdin:
sp.stdin.close()

rcode = sp.wait()

if isinstance(stdin, file):
stdin.close()

if stderr is not sys.stderr:
stderr.close()

if stdout is not sys.stderr:
stdout.close()
stdout_path = absout

build_job_script = self.builder.build_job_script # type: Callable[[List[str]], Text]
rcode = _job_popen(
[Text(x).encode('utf-8') for x in runtime + self.command_line],
stdin_path=stdin_path,
stdout_path=stdout_path,
stderr_path=stderr_path,
env=env,
cwd=self.outdir,
build_job_script=build_job_script,
)

if self.successCodes and rcode in self.successCodes:
processStatus = "success"
Expand Down Expand Up @@ -294,3 +328,111 @@ def linkoutdir(src, tgt):
if move_outputs == "move" and empty_subtree(self.outdir):
_logger.debug(u"[job %s] Removing empty output directory %s", self.name, self.outdir)
shutil.rmtree(self.outdir, True)


def _job_popen(
commands, # type: List[str]
stdin_path, # type: Text
stdout_path, # type: Text
stderr_path, # type: Text
env, # type: Union[MutableMapping[Text, Text], MutableMapping[str, str]]
cwd, # type: Text
job_dir=None, # type: Text
build_job_script=None, # type: Callable[[List[str]], Text]
):
# type: (...) -> int

job_script_contents = None # type: Text
if build_job_script:
job_script_contents = build_job_script(commands)

if not job_script_contents and not FORCE_SHELLED_POPEN:

stdin = None # type: Union[IO[Any], int]
stderr = None # type: IO[Any]
stdout = None # type: IO[Any]

if stdin_path is not None:
stdin = open(stdin_path, "rb")
else:
stdin = subprocess.PIPE

if stdout_path is not None:
stdout = open(stdout_path, "wb")
else:
stdout = sys.stderr

if stderr_path is not None:
stderr = open(stderr_path, "wb")
else:
stderr = sys.stderr

sp = subprocess.Popen(commands,
shell=False,
close_fds=True,
stdin=stdin,
stdout=stdout,
stderr=stderr,
env=env,
cwd=cwd)

if sp.stdin:
sp.stdin.close()

rcode = sp.wait()

if isinstance(stdin, file):
stdin.close()

if stdout is not sys.stderr:
stdout.close()

if stderr is not sys.stderr:
stderr.close()

return rcode
else:
if job_dir is None:
job_dir = tempfile.mkdtemp(prefix="cwltooljob")

if not job_script_contents:
job_script_contents = SHELL_COMMAND_TEMPLATE

env_copy = {}
for key in env:
key = key.encode("utf-8")
env_copy[key] = env[key]

job_description = dict(
commands=commands,
cwd=cwd,
env=env_copy,
stdout_path=stdout_path,
stderr_path=stderr_path,
stdin_path=stdin_path,
)
with open(os.path.join(job_dir, "job.json"), "w") as f:
json.dump(job_description, f)
try:
job_script = os.path.join(job_dir, "run_job.bash")
with open(job_script, "w") as f:
f.write(job_script_contents)
job_run = os.path.join(job_dir, "run_job.py")
with open(job_run, "w") as f:
f.write(PYTHON_RUN_SCRIPT)
sp = subprocess.Popen(
["bash", job_script.encode("utf-8")],
shell=False,
cwd=job_dir,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE,
)
if sp.stdin:
sp.stdin.close()

rcode = sp.wait()

return rcode
finally:
shutil.rmtree(job_dir)
1 change: 1 addition & 0 deletions cwltool/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ def _init_job(self, joborder, **kwargs):
builder.schemaDefs = self.schemaDefs
builder.names = self.names
builder.requirements = self.requirements
builder.hints = self.hints
builder.resources = {}
builder.timeout = kwargs.get("eval_timeout")

Expand Down

0 comments on commit 5c8ee81

Please sign in to comment.