Skip to content

Commit

Permalink
Distinguish never-spawned from never-submitted.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Apr 15, 2024
1 parent 4b70574 commit da9ac3a
Showing 1 changed file with 35 additions and 9 deletions.
44 changes: 35 additions & 9 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1401,6 +1401,12 @@ def spawn_on_output(self, itask, output, forced=False):
msg += " suiciding while active"
self.remove(c_task, msg)

if suicide:
# Update the DB immediately to ensure a record exists in case of
# very quick removal and respawn, due to suicide triggers.
# See https://github.com/cylc/cylc-flow/issues/6066
self.workflow_db_mgr.process_queued_ops()

Check warning on line 1408 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L1408

Added line #L1408 was not covered by tests

self.remove_if_complete(itask, output)

def remove_if_complete(
Expand Down Expand Up @@ -1555,17 +1561,33 @@ def can_be_spawned(self, name: str, point: 'PointBase') -> bool:

def _get_task_history(
self, name: str, point: 'PointBase', flow_nums: Set[int]
) -> Tuple[int, str, bool]:
"""Get history of previous submits for this task."""
) -> Tuple[bool, int, str, bool]:
"""Get history of previous submits for this task.
Args:
name: task name
point: task cycle point
flow_nums: task flow numbers
Returns:
never_spawned: if task never spawned before
submit_num: submit number of previous submit
prev_status: task status of previous sumbit
prev_flow_wait: if previous submit was a flow-wait task
"""
info = self.workflow_db_mgr.pri_dao.select_prev_instances(
name, str(point)
)
try:
submit_num: int = max(s[0] for s in info)
except ValueError:
# never spawned before in any flow
# never spawned in any flow
submit_num = 0
never_spawned = True
else:
never_spawned = False
# (submit_num could still be zero, if removed before submit)

prev_status: str = TASK_STATUS_WAITING
prev_flow_wait = False
Expand All @@ -1582,7 +1604,7 @@ def _get_task_history(
# overlap due to merges (they'll have have same snum and
# f_wait); keep going to find the finished one, if any.

return submit_num, prev_status, prev_flow_wait
return never_spawned, submit_num, prev_status, prev_flow_wait

def _load_historical_outputs(self, itask):
"""Load a task's historical outputs from the DB."""
Expand Down Expand Up @@ -1619,10 +1641,15 @@ def spawn_task(
if not self.can_be_spawned(name, point):
return None

submit_num, prev_status, prev_flow_wait = (
never_spawned, submit_num, prev_status, prev_flow_wait = (
self._get_task_history(name, point, flow_nums)
)

if not never_spawned and submit_num == 0:
# Previous spawn suicided before completing any outputs.
LOG.debug(f"{point}/{name} already spawned in this flow")
return None

Check warning on line 1651 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L1650-L1651

Added lines #L1650 - L1651 were not covered by tests

itask = self._get_task_proxy_db_outputs(
point,
self.config.get_taskdef(name),
Expand Down Expand Up @@ -1653,8 +1680,6 @@ def spawn_task(
if itask.transient and not force:
return None

# (else not previously finishedr, so run it)

if not itask.transient:
if (name, point) in self.tasks_to_hold:
LOG.info(f"[{itask}] holding (as requested earlier)")
Expand Down Expand Up @@ -2117,8 +2142,9 @@ def force_trigger_tasks(
if not self.can_be_spawned(name, point):
continue

submit_num, _prev_status, prev_fwait = self._get_task_history(
name, point, flow_nums)
_, submit_num, _prev_status, prev_fwait = (
self._get_task_history(name, point, flow_nums)
)

itask = TaskProxy(
self.tokens,
Expand Down

0 comments on commit da9ac3a

Please sign in to comment.