diff --git a/changes.d/6067.fix.md b/changes.d/6067.fix.md
new file mode 100644
index 00000000000..bea01066dae
--- /dev/null
+++ b/changes.d/6067.fix.md
@@ -0,0 +1 @@
+Fixed a bug that sometimes allowed suicide-triggered or manually removed tasks to be added back later.
diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py
index 14f376dfb74..e3275d76aed 100644
--- a/cylc/flow/task_events_mgr.py
+++ b/cylc/flow/task_events_mgr.py
@@ -1788,6 +1788,7 @@ def _reset_job_timers(self, itask):
itask.timeout = None
itask.poll_timer = None
return
+
ctx = (itask.submit_num, itask.state.status)
if itask.poll_timer and itask.poll_timer.ctx == ctx:
return
@@ -1844,7 +1845,7 @@ def _reset_job_timers(self, itask):
message += '%d*' % (num + 1)
message += '%s,' % intvl_as_str(item)
message += '...'
- LOG.info(f"[{itask}] {message}")
+ LOG.debug(f"[{itask}] {message}")
# Set next poll time
self.check_poll_time(itask)
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py
index 2b214d70943..2f49de1e158 100644
--- a/cylc/flow/task_pool.py
+++ b/cylc/flow/task_pool.py
@@ -103,7 +103,7 @@ class TaskPool:
ERR_TMPL_NO_TASKID_MATCH = "No matching tasks found: {0}"
ERR_PREFIX_TASK_NOT_ON_SEQUENCE = "Invalid cycle point for task: {0}, {1}"
- SUICIDE_MSG = "suicide"
+ SUICIDE_MSG = "suicide trigger"
def __init__(
self,
@@ -221,7 +221,7 @@ def add_to_pool(self, itask) -> None:
self.active_tasks.setdefault(itask.point, {})
self.active_tasks[itask.point][itask.identity] = itask
self.active_tasks_changed = True
- LOG.info(f"[{itask}] added to active task pool")
+ LOG.debug(f"[{itask}] added to active task pool")
self.create_data_store_elements(itask)
@@ -807,10 +807,11 @@ def remove(self, itask, reason=None):
itask.flow_nums
)
+ msg = "removed from active task pool"
if reason is None:
- msg = "task completed"
+ msg += ": completed"
else:
- msg = f"removed ({reason})"
+ msg += f": {reason}"
if itask.is_xtrigger_sequential:
self.xtrigger_mgr.sequential_spawn_next.discard(itask.identity)
@@ -837,7 +838,17 @@ def remove(self, itask, reason=None):
# Event-driven final update of task_states table.
# TODO: same for datastore (still updated by scheduler loop)
self.workflow_db_mgr.put_update_task_state(itask)
- LOG.info(f"[{itask}] {msg}")
+
+ level = logging.DEBUG
+ if itask.state(
+ TASK_STATUS_PREPARING,
+ TASK_STATUS_SUBMITTED,
+ TASK_STATUS_RUNNING,
+ ):
+ level = logging.WARNING
+ msg += " - active job orphaned"
+
+ LOG.log(level, f"[{itask}] {msg}")
del itask
def get_tasks(self) -> List[TaskProxy]:
@@ -1392,14 +1403,12 @@ def spawn_on_output(self, itask, output, forced=False):
suicide.append(t)
for c_task in suicide:
- msg = self.__class__.SUICIDE_MSG
- if c_task.state(
- TASK_STATUS_PREPARING,
- TASK_STATUS_SUBMITTED,
- TASK_STATUS_RUNNING,
- is_held=False):
- msg += " suiciding while active"
- self.remove(c_task, msg)
+ self.remove(c_task, self.__class__.SUICIDE_MSG)
+
+ if suicide:
+ # Update DB now in case of very quick respawn attempt.
+ # See https://github.com/cylc/cylc-flow/issues/6066
+ self.workflow_db_mgr.process_queued_ops()
self.remove_if_complete(itask, output)
@@ -1555,17 +1564,33 @@ def can_be_spawned(self, name: str, point: 'PointBase') -> bool:
def _get_task_history(
self, name: str, point: 'PointBase', flow_nums: Set[int]
- ) -> Tuple[int, str, bool]:
- """Get history of previous submits for this task."""
+ ) -> Tuple[bool, int, str, bool]:
+ """Get history of previous submits for this task.
+
+ Args:
+ name: task name
+ point: task cycle point
+ flow_nums: task flow numbers
+
+ Returns:
+ never_spawned: if task never spawned before
+ submit_num: submit number of previous submit
+ prev_status: task status of previous sumbit
+ prev_flow_wait: if previous submit was a flow-wait task
+ """
info = self.workflow_db_mgr.pri_dao.select_prev_instances(
name, str(point)
)
try:
submit_num: int = max(s[0] for s in info)
except ValueError:
- # never spawned before in any flow
+ # never spawned in any flow
submit_num = 0
+ never_spawned = True
+ else:
+ never_spawned = False
+ # (submit_num could still be zero, if removed before submit)
prev_status: str = TASK_STATUS_WAITING
prev_flow_wait = False
@@ -1582,7 +1607,7 @@ def _get_task_history(
# overlap due to merges (they'll have have same snum and
# f_wait); keep going to find the finished one, if any.
- return submit_num, prev_status, prev_flow_wait
+ return never_spawned, submit_num, prev_status, prev_flow_wait
def _load_historical_outputs(self, itask):
"""Load a task's historical outputs from the DB."""
@@ -1619,10 +1644,19 @@ def spawn_task(
if not self.can_be_spawned(name, point):
return None
- submit_num, prev_status, prev_flow_wait = (
+ never_spawned, submit_num, prev_status, prev_flow_wait = (
self._get_task_history(name, point, flow_nums)
)
+ if (
+ not never_spawned and
+ not prev_flow_wait and
+ submit_num == 0
+ ):
+ # Previous instance removed before completing any outputs.
+ LOG.debug(f"Not spawning {point}/{name} - task removed")
+ return None
+
itask = self._get_task_proxy_db_outputs(
point,
self.config.get_taskdef(name),
@@ -1653,8 +1687,6 @@ def spawn_task(
if itask.transient and not force:
return None
- # (else not previously finishedr, so run it)
-
if not itask.transient:
if (name, point) in self.tasks_to_hold:
LOG.info(f"[{itask}] holding (as requested earlier)")
@@ -2117,8 +2149,9 @@ def force_trigger_tasks(
if not self.can_be_spawned(name, point):
continue
- submit_num, _prev_status, prev_fwait = self._get_task_history(
- name, point, flow_nums)
+ _, submit_num, _prev_status, prev_fwait = (
+ self._get_task_history(name, point, flow_nums)
+ )
itask = TaskProxy(
self.tokens,
diff --git a/tests/functional/cylc-set/02-off-flow-out.t b/tests/functional/cylc-set/02-off-flow-out.t
index a18d3e61fbf..cdb21fbb078 100644
--- a/tests/functional/cylc-set/02-off-flow-out.t
+++ b/tests/functional/cylc-set/02-off-flow-out.t
@@ -31,14 +31,14 @@ reftest_run
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/a_cold.* setting implied output: submitted'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/a_cold.* setting implied output: started'
-grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a3" '1/a_cold.* task completed'
+grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a3" '1/a_cold.* completed'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/b_cold.* setting implied output: submitted'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/b_cold.* setting implied output: started'
-grep_workflow_log_ok "${TEST_NAME_BASE}-grep-b3" '1/b_cold.* task completed'
+grep_workflow_log_ok "${TEST_NAME_BASE}-grep-b3" '1/b_cold.* completed'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/c_cold.* setting implied output: submitted'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/c_cold.* setting implied output: started'
-grep_workflow_log_ok "${TEST_NAME_BASE}-grep-c3" '1/c_cold.* task completed'
+grep_workflow_log_ok "${TEST_NAME_BASE}-grep-c3" '1/c_cold.* completed'
purge
diff --git a/tests/functional/execution-time-limit/04-polling-intervals.t b/tests/functional/execution-time-limit/04-polling-intervals.t
index e1df403f155..f15e4e8a74d 100644
--- a/tests/functional/execution-time-limit/04-polling-intervals.t
+++ b/tests/functional/execution-time-limit/04-polling-intervals.t
@@ -41,7 +41,7 @@ __FLOW__
run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
-cylc play "${WORKFLOW_NAME}"
+cylc play --debug "${WORKFLOW_NAME}"
poll_grep_workflow_log "INFO - DONE"
diff --git a/tests/functional/hold-release/05-release.t b/tests/functional/hold-release/05-release.t
index 26f4e22d414..ea4cc3d95a5 100755
--- a/tests/functional/hold-release/05-release.t
+++ b/tests/functional/hold-release/05-release.t
@@ -65,7 +65,7 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
inherit = STOP
script = """
cylc__job__poll_grep_workflow_log -E \
- '1/dog1/01:succeeded.* task completed'
+ '1/dog1/01:succeeded.* completed'
cylc stop "${CYLC_WORKFLOW_ID}"
"""
__FLOW_CONFIG__
diff --git a/tests/functional/spawn-on-demand/09-set-outputs/flow.cylc b/tests/functional/spawn-on-demand/09-set-outputs/flow.cylc
index a40e6e9be33..85138715d1b 100644
--- a/tests/functional/spawn-on-demand/09-set-outputs/flow.cylc
+++ b/tests/functional/spawn-on-demand/09-set-outputs/flow.cylc
@@ -44,7 +44,7 @@
cylc set --flow=2 --output=out1 --output=out2 "${CYLC_WORKFLOW_ID}//1/foo"
# Set bar outputs after it is gone from the pool.
- cylc__job__poll_grep_workflow_log -E "1/bar.* task completed"
+ cylc__job__poll_grep_workflow_log -E "1/bar.* completed"
cylc set --flow=2 --output=out1 --output=out2 "${CYLC_WORKFLOW_ID}//1/bar"
"""
[[qux, quw, fux, fuw]]
diff --git a/tests/functional/triggering/15-suicide.t b/tests/functional/triggering/15-suicide.t
deleted file mode 100644
index 3027e742a76..00000000000
--- a/tests/functional/triggering/15-suicide.t
+++ /dev/null
@@ -1,24 +0,0 @@
-#!/usr/bin/env bash
-# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
-# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see .
-#-------------------------------------------------------------------------------
-# Test suicide triggering
-# (this is currently just a copy of the tutorial suicide example, but I
-# anticipate making it more fiendish at some point).
-. "$(dirname "$0")/test_header"
-set_test_number 2
-reftest
-exit
diff --git a/tests/functional/triggering/15-suicide/flow.cylc b/tests/functional/triggering/15-suicide/flow.cylc
deleted file mode 100644
index eed1d934b8c..00000000000
--- a/tests/functional/triggering/15-suicide/flow.cylc
+++ /dev/null
@@ -1,23 +0,0 @@
-[meta]
- title = "Hello, Goodbye, Suicide"
-[scheduler]
- allow implicit tasks = True
- [[events]]
- expected task failures = 1/goodbye
-
-[scheduling]
- [[graph]]
- R1 = """
- hello => goodbye?
- goodbye:fail? => really_goodbye
- goodbye? => !really_goodbye
- really_goodbye => !goodbye
- """
-[runtime]
- [[hello]]
- script = echo Hello World!
- [[goodbye]]
- script = """
- echo Goodbye ... oops!
- false
- """
diff --git a/tests/functional/triggering/15-suicide/reference.log b/tests/functional/triggering/15-suicide/reference.log
deleted file mode 100644
index c58830189ab..00000000000
--- a/tests/functional/triggering/15-suicide/reference.log
+++ /dev/null
@@ -1,5 +0,0 @@
-Initial point: 1
-Final point: 1
-1/hello -triggered off []
-1/goodbye -triggered off ['1/hello']
-1/really_goodbye -triggered off ['1/goodbye']
diff --git a/tests/functional/triggering/18-suicide-active.t b/tests/functional/triggering/18-suicide-active.t
deleted file mode 100644
index d4df07b1275..00000000000
--- a/tests/functional/triggering/18-suicide-active.t
+++ /dev/null
@@ -1,34 +0,0 @@
-#!/usr/bin/env bash
-# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
-# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see .
-#-------------------------------------------------------------------------------
-
-# Test warning for "suiciding while active"
-
-. "$(dirname "$0")/test_header"
-
-set_test_number 3
-
-install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
-
-run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
-
-workflow_run_ok "${TEST_NAME_BASE}-run" \
- cylc play --debug --no-detach "${WORKFLOW_NAME}"
-
-grep_workflow_log_ok "${TEST_NAME_BASE}-grep" "suiciding while active"
-
-purge
diff --git a/tests/functional/triggering/18-suicide-active/flow.cylc b/tests/functional/triggering/18-suicide-active/flow.cylc
deleted file mode 100644
index a76b82a15e6..00000000000
--- a/tests/functional/triggering/18-suicide-active/flow.cylc
+++ /dev/null
@@ -1,11 +0,0 @@
-# test "suiciding while active" warning
-[scheduler]
- [[events]]
- inactivity timeout = PT20S
- abort on inactivity timeout = True
-[scheduling]
- [[graph]]
- R1 = "foo:start => !foo"
-[runtime]
- [[foo]]
- script = sleep 10
diff --git a/tests/integration/test_task_events_mgr.py b/tests/integration/test_task_events_mgr.py
index 7f3fa488162..62994487624 100644
--- a/tests/integration/test_task_events_mgr.py
+++ b/tests/integration/test_task_events_mgr.py
@@ -17,6 +17,7 @@
from cylc.flow.task_events_mgr import TaskJobLogsRetrieveContext
from cylc.flow.scheduler import Scheduler
+import logging
from typing import Any as Fixture
@@ -51,7 +52,7 @@ async def test__reset_job_timers(
process_execution_polling_intervals.
"""
schd = scheduler(flow(one_conf))
- async with start(schd):
+ async with start(schd, level=logging.DEBUG):
itask = schd.pool.get_tasks()[0]
itask.state.status = 'running'
itask.platform['execution polling intervals'] = [25]
diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py
index 3d75074cf15..43d6d50b520 100644
--- a/tests/integration/test_task_pool.py
+++ b/tests/integration/test_task_pool.py
@@ -35,7 +35,8 @@
from cylc.flow.data_store_mgr import TASK_PROXIES
from cylc.flow.task_events_mgr import TaskEventsManager
from cylc.flow.task_outputs import (
- TASK_OUTPUT_SUCCEEDED
+ TASK_OUTPUT_SUCCEEDED,
+ TASK_OUTPUT_FAILED
)
from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NONE
@@ -147,7 +148,7 @@ async def mod_example_flow(
"""
id_ = mod_flow(EXAMPLE_FLOW_CFG)
schd: 'Scheduler' = mod_scheduler(id_, paused_start=True)
- async with mod_run(schd):
+ async with mod_run(schd, level=logging.DEBUG):
yield schd
@@ -168,7 +169,7 @@ async def example_flow(
caplog.set_level(logging.INFO, CYLC_LOG)
id_ = flow(EXAMPLE_FLOW_CFG)
schd: 'Scheduler' = scheduler(id_)
- async with start(schd):
+ async with start(schd, level=logging.DEBUG):
yield schd
@@ -1197,7 +1198,7 @@ async def test_detect_incomplete_tasks(
}
})
schd = scheduler(id_)
- async with start(schd) as log:
+ async with start(schd, level=logging.DEBUG) as log:
itasks = schd.pool.get_tasks()
for itask in itasks:
itask.state_reset(is_queued=False)
@@ -1212,7 +1213,7 @@ async def test_detect_incomplete_tasks(
if itask.tdef.name == TASK_STATUS_EXPIRED:
assert log_filter(
log,
- contains=f"[{itask}] removed (expired)"
+ contains=f"[{itask}] removed from active task pool: expired"
)
# the task should have been removed
assert itask not in schd.pool.get_tasks()
@@ -1278,7 +1279,7 @@ async def test_set_failed_complete(
"""Test manual completion of an incomplete failed task."""
id_ = flow(one_conf)
schd = scheduler(id_)
- async with start(schd) as log:
+ async with start(schd, level=logging.DEBUG) as log:
one = schd.pool.get_tasks()[0]
one.state_reset(is_queued=False)
@@ -1294,7 +1295,7 @@ async def test_set_failed_complete(
schd.pool.set_prereqs_and_outputs([one.identity], None, None, ['all'])
assert log_filter(
- log, contains=f'[{one}] task completed')
+ log, contains=f'[{one}] removed from active task pool: completed')
db_outputs = db_select(
schd, True, 'task_outputs', 'outputs',
@@ -1874,3 +1875,159 @@ def max_cycle(tasks):
mod_blah.pool.compute_runahead()
after = mod_blah.pool.runahead_limit_point
assert bool(before != after) == expected
+
+
+async def test_fast_respawn(
+ example_flow: 'Scheduler',
+ caplog: pytest.LogCaptureFixture,
+) -> None:
+ """Immediate re-spawn of removed tasks is not allowed.
+
+ An immediate DB update is required to stop the respawn.
+ https://github.com/cylc/cylc-flow/pull/6067
+
+ """
+ task_pool = example_flow.pool
+
+ # find task 1/foo in the pool
+ foo = task_pool.get_task(IntegerPoint("1"), "foo")
+
+ # remove it from the pool
+ task_pool.remove(foo)
+ assert foo not in task_pool.get_tasks()
+
+ # attempt to spawn it again
+ itask = task_pool.spawn_task("foo", IntegerPoint("1"), {1})
+ assert itask is None
+ assert "Not spawning 1/foo - task removed" in caplog.text
+
+
+async def test_remove_active_task(
+ example_flow: 'Scheduler',
+ caplog: pytest.LogCaptureFixture,
+ log_filter: Callable,
+) -> None:
+ """Test warning on removing an active task."""
+
+ task_pool = example_flow.pool
+
+ # find task 1/foo in the pool
+ foo = task_pool.get_task(IntegerPoint("1"), "foo")
+
+ foo.state_reset(TASK_STATUS_RUNNING)
+ task_pool.remove(foo, "request")
+ assert foo not in task_pool.get_tasks()
+
+ assert log_filter(
+ caplog,
+ regex=(
+ "1/foo.*removed from active task pool:"
+ " request - active job orphaned"
+ ),
+ level=logging.WARNING
+ )
+
+
+async def test_remove_by_suicide(
+ flow,
+ scheduler,
+ start,
+ log_filter
+):
+ """Test task removal by suicide trigger.
+
+ * Suicide triggers should remove tasks from the pool.
+ * It should be possible to bring them back by manually triggering them.
+ * Removing a task manually (cylc remove) should work the same.
+ """
+ id_ = flow({
+ 'scheduler': {'allow implicit tasks': 'True'},
+ 'scheduling': {
+ 'graph': {
+ 'R1': '''
+ a? & b
+ a:failed? => !b
+ '''
+ },
+ }
+ })
+ schd: 'Scheduler' = scheduler(id_)
+ async with start(schd, level=logging.DEBUG) as log:
+ # it should start up with 1/a and 1/b
+ assert pool_get_task_ids(schd.pool) == ["1/a", "1/b"]
+ a = schd.pool.get_task(IntegerPoint("1"), "a")
+
+ # mark 1/a as failed and ensure 1/b is removed by suicide trigger
+ schd.pool.spawn_on_output(a, TASK_OUTPUT_FAILED)
+ assert log_filter(
+ log,
+ regex="1/b.*removed from active task pool: suicide trigger"
+ )
+ assert pool_get_task_ids(schd.pool) == ["1/a"]
+
+ # ensure that we are able to bring 1/b back by triggering it
+ log.clear()
+ schd.pool.force_trigger_tasks(['1/b'], ['1'])
+ assert log_filter(
+ log,
+ regex='1/b.*added to active task pool',
+ )
+
+ # remove 1/b by request (cylc remove)
+ schd.command_remove_tasks(['1/b'])
+ assert log_filter(
+ log,
+ regex='1/b.*removed from active task pool: request',
+ )
+
+ # ensure that we are able to bring 1/b back by triggering it
+ log.clear()
+ schd.pool.force_trigger_tasks(['1/b'], ['1'])
+ assert log_filter(
+ log,
+ regex='1/b.*added to active task pool',
+ )
+
+
+async def test_remove_no_respawn(flow, scheduler, start, log_filter):
+ """Ensure that removed tasks stay removed.
+
+ If a task is removed by suicide trigger or "cylc remove", then it should
+ not be automatically spawned at a later time.
+ """
+ id_ = flow({
+ 'scheduling': {
+ 'graph': {
+ 'R1': 'a & b => z',
+ },
+ },
+ })
+ schd: 'Scheduler' = scheduler(id_)
+ async with start(schd, level=logging.DEBUG) as log:
+ a1 = schd.pool.get_task(IntegerPoint("1"), "a")
+ b1 = schd.pool.get_task(IntegerPoint("1"), "b")
+ assert a1, '1/a should have been spawned on startup'
+ assert b1, '1/b should have been spawned on startup'
+
+ # mark one of the upstream tasks as succeeded, 1/z should spawn
+ schd.pool.spawn_on_output(a1, TASK_OUTPUT_SUCCEEDED)
+ schd.workflow_db_mgr.process_queued_ops()
+ z1 = schd.pool.get_task(IntegerPoint("1"), "z")
+ assert z1, '1/z should have been spawned after 1/a succeeded'
+
+ # manually remove 1/z, it should be removed from the pool
+ schd.command_remove_tasks(['1/z'])
+ schd.workflow_db_mgr.process_queued_ops()
+ z1 = schd.pool.get_task(IntegerPoint("1"), "z")
+ assert z1 is None, '1/z should have been removed (by request)'
+
+ # mark the other upstream task as succeeded, 1/z should not be
+ # respawned as a result
+ schd.pool.spawn_on_output(b1, TASK_OUTPUT_SUCCEEDED)
+ assert log_filter(
+ log, contains='Not spawning 1/z - task removed'
+ )
+ z1 = schd.pool.get_task(IntegerPoint("1"), "z")
+ assert (
+ z1 is None
+ ), '1/z should have stayed removed (but has been added back into the pool'