Skip to content

Commit

Permalink
Merge pull request #4494 from jasonb5/fix_workflow_dependencies
Browse files Browse the repository at this point in the history
Fixes workflow job dependencies
  • Loading branch information
jedwards4b authored Oct 9, 2023
2 parents b8d6d2b + ce7b423 commit c20dec5
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 13 deletions.
52 changes: 40 additions & 12 deletions CIME/XML/env_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,20 +795,10 @@ def submit_jobs(
batch_job_id = None
for _ in range(num_submit):
for job, dependency in jobs:
if dependency is not None:
deps = dependency.split()
else:
deps = []
dep_jobs = []
if user_prereq is not None:
dep_jobs.append(user_prereq)
for dep in deps:
if dep in depid.keys() and depid[dep] is not None:
dep_jobs.append(str(depid[dep]))
if prev_job is not None:
dep_jobs.append(prev_job)
dep_jobs = get_job_deps(dependency, depid, prev_job, user_prereq)

logger.debug("job {} depends on {}".format(job, dep_jobs))

result = self._submit_single_job(
case,
job,
Expand Down Expand Up @@ -1399,3 +1389,41 @@ def make_all_batch_files(self, case):
input_batch_script, job
)
)


def get_job_deps(dependency, depid, prev_job=None, user_prereq=None):
"""
Gather list of job batch ids that a job depends on.
Parameters
----------
dependency : str
List of dependent job names.
depid : dict
Lookup where keys are job names and values are the batch id.
user_prereq : str
User requested dependency.
Returns
-------
list
List of batch ids that job depends on.
"""
deps = []
dep_jobs = []

if user_prereq is not None:
dep_jobs.append(user_prereq)

if dependency is not None:
# Match all words, excluding "and" and "or"
deps = re.findall(r"\b(?!and\b|or\b)\w+(?:\.\w+)?\b", dependency)

for dep in deps:
if dep in depid and depid[dep] is not None:
dep_jobs.append(str(depid[dep]))

if prev_job is not None:
dep_jobs.append(prev_job)

return dep_jobs
179 changes: 178 additions & 1 deletion CIME/tests/test_unit_xml_env_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,189 @@
import tempfile
from unittest import mock

from CIME.XML.env_batch import EnvBatch
from CIME.utils import CIMEError
from CIME.XML.env_batch import EnvBatch, get_job_deps

# pylint: disable=unused-argument


class TestXMLEnvBatch(unittest.TestCase):
@mock.patch("CIME.XML.env_batch.EnvBatch._submit_single_job")
def test_submit_jobs(self, _submit_single_job):
case = mock.MagicMock()

case.get_value.side_effect = [
False,
]

env_batch = EnvBatch()

with self.assertRaises(CIMEError):
env_batch.submit_jobs(case)

@mock.patch("CIME.XML.env_batch.os.path.isfile")
@mock.patch("CIME.XML.env_batch.get_batch_script_for_job")
@mock.patch("CIME.XML.env_batch.EnvBatch._submit_single_job")
def test_submit_jobs_dependency(
self, _submit_single_job, get_batch_script_for_job, isfile
):
case = mock.MagicMock()

case.get_env.return_value.get_jobs.return_value = [
"case.build",
"case.run",
]

case.get_env.return_value.get_value.side_effect = [
None,
"",
None,
"case.build",
]

case.get_value.side_effect = [
False,
]

_submit_single_job.side_effect = ["0", "1"]

isfile.return_value = True

get_batch_script_for_job.side_effect = [".case.build", ".case.run"]

env_batch = EnvBatch()

depid = env_batch.submit_jobs(case)

_submit_single_job.assert_any_call(
case,
"case.build",
skip_pnl=False,
resubmit_immediate=False,
dep_jobs=[],
allow_fail=False,
no_batch=False,
mail_user=None,
mail_type=None,
batch_args=None,
dry_run=False,
workflow=True,
)
_submit_single_job.assert_any_call(
case,
"case.run",
skip_pnl=False,
resubmit_immediate=False,
dep_jobs=[
"0",
],
allow_fail=False,
no_batch=False,
mail_user=None,
mail_type=None,
batch_args=None,
dry_run=False,
workflow=True,
)
assert depid == {"case.build": "0", "case.run": "1"}

@mock.patch("CIME.XML.env_batch.os.path.isfile")
@mock.patch("CIME.XML.env_batch.get_batch_script_for_job")
@mock.patch("CIME.XML.env_batch.EnvBatch._submit_single_job")
def test_submit_jobs_single(
self, _submit_single_job, get_batch_script_for_job, isfile
):
case = mock.MagicMock()

case.get_env.return_value.get_jobs.return_value = [
"case.run",
]

case.get_env.return_value.get_value.return_value = None

case.get_value.side_effect = [
False,
]

_submit_single_job.return_value = "0"

isfile.return_value = True

get_batch_script_for_job.side_effect = [
".case.run",
]

env_batch = EnvBatch()

depid = env_batch.submit_jobs(case)

_submit_single_job.assert_any_call(
case,
"case.run",
skip_pnl=False,
resubmit_immediate=False,
dep_jobs=[],
allow_fail=False,
no_batch=False,
mail_user=None,
mail_type=None,
batch_args=None,
dry_run=False,
workflow=True,
)
assert depid == {"case.run": "0"}

def test_get_job_deps(self):
# no jobs
job_deps = get_job_deps("", {})

assert job_deps == []

# dependency doesn't exist
job_deps = get_job_deps("case.run", {})

assert job_deps == []

job_deps = get_job_deps("case.run", {"case.run": 0})

assert job_deps == [
"0",
]

job_deps = get_job_deps(
"case.run case.post_run_io", {"case.run": 0, "case.post_run_io": 1}
)

assert job_deps == ["0", "1"]

# old syntax
job_deps = get_job_deps("case.run and case.post_run_io", {"case.run": 0})

assert job_deps == [
"0",
]

# old syntax
job_deps = get_job_deps(
"(case.run and case.post_run_io) or case.test", {"case.run": 0}
)

assert job_deps == [
"0",
]

job_deps = get_job_deps("", {}, user_prereq="2")

assert job_deps == [
"2",
]

job_deps = get_job_deps("", {}, prev_job="1")

assert job_deps == [
"1",
]

def test_get_submit_args_job_queue(self):
with tempfile.NamedTemporaryFile() as tfile:
tfile.write(
Expand Down

0 comments on commit c20dec5

Please sign in to comment.