Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support for environment #18

Merged
merged 8 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 32 additions & 5 deletions jobspec/transformer/flux/steps.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import copy
import json
import os
import re
import shlex
import uuid

import jobspec.utils as utils
Expand Down Expand Up @@ -75,6 +77,21 @@ def cleanup(self, filename):
if os.path.exists(filename):
os.remove(filename)

def write_job_script(self, command):
"""
Given a bash, shell, or python command, write
into a script.
"""
command = command.strip()
match = re.match("#!/bin/(?P<executable>bash|sh|python)", command)
terms = match.groupdict()
tmpfile = utils.get_tmpfile(prefix="jobscript-", suffix=".sh")

# Clean self up (commented out because makes me nervous)
command += f"\n# rm -rf {tmpfile}"
utils.write_file(command, tmpfile)
return [terms["executable"], tmpfile]

def prepare(self, command=None, waitable=False):
"""
Return the command, without flux submit|batch
Expand All @@ -83,10 +100,8 @@ def prepare(self, command=None, waitable=False):

# We can get the resources from options
resources = self.options.get("resources")

# These aren't used yet - they need to go into flux
attributes = self.options.get("attributes") or {}
task = self.options.get("task") or {}
attributes = task.get("attributes") or {}

# This flattens to be what we ask flux for
slot = resources.flatten_slot()
Expand All @@ -100,6 +115,10 @@ def prepare(self, command=None, waitable=False):
cwd = attributes.get("cwd")
watch = attributes.get("watch")

# Environment
for key, value in attributes.get("environment", {}).items():
cmd += [f"--env={key}={value}"]

# Note that you need to install our frobnicator plugin
# for this to work. See the examples/depends_on directory
for depends_on in task.get("depends_on") or []:
Expand Down Expand Up @@ -135,8 +154,15 @@ def prepare(self, command=None, waitable=False):
# Right now assume command is required
if not command:
command = task["command"]

# Case 1: we are given a script to write
if isinstance(command, str) and re.search("#!/bin/(bash|sh|python)", command):
command = self.write_job_script(command)

# String that should be a list
if isinstance(command, str):
command = [command]
command = shlex.split(command)

cmd += command
return cmd

Expand Down Expand Up @@ -266,7 +292,8 @@ def run(self, *args, **kwargs):
cmd = self.generate_command()

# Are we watching?
attributes = self.options.get("attributes") or {}
task = self.options.get("task") or {}
attributes = task.get("attributes") or {}
watch = attributes.get("watch")
res = utils.run_command(cmd, check_output=True, stream=watch)

Expand Down
8 changes: 4 additions & 4 deletions jobspec/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def recursive_find(base, pattern="[.]py"):
yield filepath


def get_tmpfile(tmpdir=None, prefix=""):
def get_tmpfile(tmpdir=None, prefix="", suffix=None):
"""
Get a temporary file with an optional prefix.
"""
Expand All @@ -51,7 +51,7 @@ def get_tmpfile(tmpdir=None, prefix=""):
if tmpdir:
prefix = os.path.join(tmpdir, os.path.basename(prefix))

fd, tmp_file = tempfile.mkstemp(prefix=prefix)
fd, tmp_file = tempfile.mkstemp(prefix=prefix, suffix=suffix)
os.close(fd)

return tmp_file
Expand All @@ -62,7 +62,7 @@ def get_tmpdir(tmpdir=None, prefix="", create=True):
Get a temporary directory for an operation.
"""
tmpdir = tmpdir or tempfile.gettempdir()
prefix = prefix or "jobspec-"
prefix = prefix or "jobspec"
prefix = "%s.%s" % (prefix, next(tempfile._get_candidate_names()))
tmpdir = os.path.join(tmpdir, prefix)

Expand Down Expand Up @@ -120,7 +120,7 @@ def run_command(cmd, stream=False, check_output=False, return_code=0):
If check_output is True, check against an expected return code.
"""
stdout = subprocess.PIPE if not stream else None
output = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=stdout)
output = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=stdout, env=os.environ.copy())
t = output.communicate()[0], output.returncode
output = {"message": t[0], "return_code": t[1]}

Expand Down
2 changes: 1 addition & 1 deletion jobspec/version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.1.14"
__version__ = "0.1.15"
AUTHOR = "Vanessa Sochat"
AUTHOR_EMAIL = "vsoch@users.noreply.github.com"
NAME = "jobspec"
Expand Down