Skip to content

Commit

Permalink
Extend to ext-triggers.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Jun 17, 2023
1 parent 038dbe9 commit 8066cca
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 14 deletions.
40 changes: 28 additions & 12 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1666,8 +1666,6 @@ def force_trigger_tasks(
Triggering an active task has no effect (it is already triggered).
TODO - extend to push ext-triggers.
"""
# Process provided flow options.
if set(flow).intersection({FLOW_ALL, FLOW_NEW, FLOW_NONE}):
Expand Down Expand Up @@ -1714,11 +1712,11 @@ def force_trigger_tasks(
continue
itask.is_manual_submit = True
msg = "Force-satisfying prerequisites."
if itask.state.xtriggers:
# xtriggers not satisfied as we just spawned the task
if itask.state.xtriggers or itask.state.external_triggers:
# these can't be satisfied as we just spawned the task
msg += (
"\nYou may need to trigger this task again"
" to force-satisfy xtriggers."
" to force-satisfy external triggers."
)
msg += (
"\nYou may need to trigger this task again"
Expand All @@ -1744,13 +1742,16 @@ def force_trigger_tasks(
self.data_store_mgr.delta_task_state(itask)
itask.reset_try_timers()

# Satisfy prerequisites, if any.
if itask.is_task_prereqs_not_done():
msg = "Force-satisfying prerequisites."
if not itask.state.xtriggers_all_satisfied():
# xtriggers not satisfied as we just spawned the task
if (
not itask.state.xtriggers_all_satisfied()
or not itask.state.external_triggers_all_satisfied()
):
msg += (
"\nYou may need to trigger this task again"
" to force-satisfy xtriggers."
" to force-satisfy external triggers."
)
msg += (
"\nYou may need to trigger this task again"
Expand All @@ -1761,20 +1762,35 @@ def force_trigger_tasks(
self.runahead_release(itask)
continue

# Satisfy xtriggers and ext-triggers, if any.
# (Let's log both kinds as "external triggers')
extrigs_forced = False
if (
itask.state.xtriggers and
not itask.state.xtriggers_all_satisfied()
):
itask.state.xtriggers_set_all_satisfied()
self.xtrigger_mgr.housekeep(self.get_tasks())
self.runahead_release(itask)
extrigs_forced = True

if (
itask.state.external_triggers and
not itask.state.external_triggers_all_satisfied()
):
itask.state.external_triggers_set_all_satisfied()
self.runahead_release(itask)
extrigs_forced = True

if extrigs_forced:
LOG.warning(
f"[{itask}] - Force-satisfying xtriggers."
f"[{itask}] - Force-satisfying external triggers."
"\nYou may need to trigger this task again"
" to force queue-release."
)
itask.state.xtriggers_set_all_satisfied()
self.xtrigger_mgr.housekeep(self.get_tasks())
self.runahead_release(itask)
continue

# Queue, or queue-release if already queued.
if itask.state.is_queued:
LOG.warning(f"[{itask}] - Forcing queue release.")
self.task_queue_mgr.force_release_task(itask)
Expand Down
7 changes: 6 additions & 1 deletion cylc/flow/task_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ def xtriggers_all_satisfied(self):
return all(self.xtriggers.values())

def xtriggers_set_all_satisfied(self):
"""Set all xtriggers to satisfied.
"""Force-satisfy all xtriggers.
WARNING: you must invoke xtrigger manager housekeeping after this.
"""
Expand All @@ -332,6 +332,11 @@ def external_triggers_all_satisfied(self):
"""Return True if all external triggers are satisfied."""
return all(self.external_triggers.values())

def external_triggers_set_all_satisfied(self):
"""Force-satisfy all external triggers."""
for key in self.external_triggers.keys():
self.external_triggers[key] = True

def prerequisites_all_satisfied(self):
"""Return True if (non-suicide) prerequisites are fully satisfied."""
if self._is_satisfied is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
limit = 1
[[xtriggers]]
never = xrandom("0","0")
[[special tasks]]
external-trigger = foo("Ain't never gonna happen.")
[[graph]]
R1 = """
triggerer
Expand All @@ -29,7 +31,7 @@
cylc trigger "${CYLC_WORKFLOW_ID}//1/foo"
cylc__job__poll_grep_workflow_log -E "1/foo .* Force-satisfying prerequisites"
cylc trigger "${CYLC_WORKFLOW_ID}//1/foo"
cylc__job__poll_grep_workflow_log -E "1/foo .* Force-satisfying xtriggers"
cylc__job__poll_grep_workflow_log -E "1/foo .* Force-satisfying external triggers"
cylc trigger "${CYLC_WORKFLOW_ID}//1/foo"
cylc__job__poll_grep_workflow_log -E "1/foo .* Forcing queue release"
cylc trigger "${CYLC_WORKFLOW_ID}//1/foo"
Expand Down

0 comments on commit 8066cca

Please sign in to comment.