Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distinguish never-spawned from never-submitted. #6067

Merged
merged 13 commits into from
Apr 29, 2024
1 change: 1 addition & 0 deletions changes.d/6067.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed very quick respawn after task removal.
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
75 changes: 54 additions & 21 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class TaskPool:

ERR_TMPL_NO_TASKID_MATCH = "No matching tasks found: {0}"
ERR_PREFIX_TASK_NOT_ON_SEQUENCE = "Invalid cycle point for task: {0}, {1}"
SUICIDE_MSG = "suicide"
SUICIDE_MSG = "suicide trigger"

def __init__(
self,
Expand Down Expand Up @@ -807,10 +807,11 @@ def remove(self, itask, reason=None):
itask.flow_nums
)

msg = "removed from active task pool"
if reason is None:
msg = "task completed"
msg += ": completed"
else:
msg = f"removed ({reason})"
msg += f": {reason}"

if itask.is_xtrigger_sequential:
self.xtrigger_mgr.sequential_spawn_next.discard(itask.identity)
Expand All @@ -837,7 +838,17 @@ def remove(self, itask, reason=None):
# Event-driven final update of task_states table.
# TODO: same for datastore (still updated by scheduler loop)
self.workflow_db_mgr.put_update_task_state(itask)
LOG.info(f"[{itask}] {msg}")

level = logging.INFO
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
if itask.state(
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
):
level = logging.WARNING
msg += " - active job orphaned"

LOG.log(level, f"[{itask}] {msg}")
del itask

def get_tasks(self) -> List[TaskProxy]:
Expand Down Expand Up @@ -1392,14 +1403,12 @@ def spawn_on_output(self, itask, output, forced=False):
suicide.append(t)

for c_task in suicide:
msg = self.__class__.SUICIDE_MSG
if c_task.state(
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
is_held=False):
msg += " suiciding while active"
self.remove(c_task, msg)
self.remove(c_task, self.__class__.SUICIDE_MSG)

if suicide:
# Update DB now in case of very quick respawn attempt.
# See https://github.com/cylc/cylc-flow/issues/6066
self.workflow_db_mgr.process_queued_ops()

self.remove_if_complete(itask, output)

Expand Down Expand Up @@ -1555,17 +1564,33 @@ def can_be_spawned(self, name: str, point: 'PointBase') -> bool:

def _get_task_history(
self, name: str, point: 'PointBase', flow_nums: Set[int]
) -> Tuple[int, str, bool]:
"""Get history of previous submits for this task."""
) -> Tuple[bool, int, str, bool]:
"""Get history of previous submits for this task.

Args:
name: task name
point: task cycle point
flow_nums: task flow numbers

Returns:
never_spawned: if task never spawned before
submit_num: submit number of previous submit
prev_status: task status of previous submit
prev_flow_wait: if previous submit was a flow-wait task

"""
info = self.workflow_db_mgr.pri_dao.select_prev_instances(
name, str(point)
)
try:
submit_num: int = max(s[0] for s in info)
except ValueError:
# never spawned before in any flow
# never spawned in any flow
submit_num = 0
never_spawned = True
else:
never_spawned = False
# (submit_num could still be zero, if removed before submit)

prev_status: str = TASK_STATUS_WAITING
prev_flow_wait = False
Expand All @@ -1582,7 +1607,7 @@ def _get_task_history(
# overlap due to merges (they'll have the same snum and
# f_wait); keep going to find the finished one, if any.

return submit_num, prev_status, prev_flow_wait
return never_spawned, submit_num, prev_status, prev_flow_wait

def _load_historical_outputs(self, itask):
"""Load a task's historical outputs from the DB."""
Expand Down Expand Up @@ -1619,10 +1644,19 @@ def spawn_task(
if not self.can_be_spawned(name, point):
return None

submit_num, prev_status, prev_flow_wait = (
never_spawned, submit_num, prev_status, prev_flow_wait = (
self._get_task_history(name, point, flow_nums)
)

if (
not never_spawned and
not prev_flow_wait and
submit_num == 0
):
# Previous instance removed before completing any outputs.
LOG.info(f"Not spawning {point}/{name}: already used in this flow")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure this message is easy to understand

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, but how to describe what's happening concisely?

Basically, the flow (on this branch of the graph) came to a halt at this point because the task was removed earlier by suicide trigger or cylc remove. That definitely warrants some kind of log message.

I'll change it to this (subject to further litigation, of course):

Suggested change
LOG.info(f"Not spawning {point}/{name}: already used in this flow")
LOG.info(f"Flow stopping at {point}/{name} - task previously removed")

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, "flow blocked at" might be better.

Copy link
Member

@MetRonnie MetRonnie Apr 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like "flow blocked" I'm afraid! How about:

Skipping {point}/{name} {stringify_flow_nums(flow_nums)} - task previously removed

Or just

Not spawning {point}/{name} {stringify_flow_nums(flow_nums)} - task previously removed

as it was the "already used in this flow" that I think doesn't make much sense

Copy link
Member Author

@hjoliver hjoliver Apr 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think "flow blocked" is better than your two suggestions - it correctly indicates that the flow will not continue here because the task was removed.

"Skipping" suggests the flow will skip over the removed task, which is not the case.

"Spawning" (as you'll have seen in the chat) is I think being expunged from user terminology.

Copy link
Member Author

@hjoliver hjoliver Apr 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I think options are as follows (or some close variant thereof):

  • flow blocked here, the task was removed earlier
    • (note addition of "here" to avoid implicating the entire flow)
  • not spawning task, it was removed earlier

Personally I think in this instance "flow blocked" is more user-friendly, in that it is more or less self-explanatory, compared to "not spawning".

But given that @oliver-sanders agrees "not spawning" is correct, and (as I've just shown) spawning probably has to remain a user-facing term, perhaps we could just stick with that.

It needs to be logged at INFO level though, to explain why the flow does not continue here, at this time.

Copy link
Member

@oliver-sanders oliver-sanders Apr 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note, the term "spawn" ... was never a user facing term in the first place.

Ah, really?!

That's Cylc 7!

I'm very confused by these comments. It was your own argument that users should not need to understand the management (insertion or removal) of task proxies or task pool logic in general which I got on board with. It was a primary goal of SoD to remove the need for us to communicate this internal implementation detail, and a necessary side effect as SoD would make it harder for users to monitor the task pool itself. That's why spawning was retired right, s/spawn/set-outputs, users shouldn't need to understand spawning, because SoD will ensure the task proxy is always there when needed right? E.G. #2143. I agreed with you on this at the time, it was a good idea!

To quote your own proposal:

Users should not have to know about the "task pool" and the concepts of task "insert" and "remove" anymore.

I appreciate this is going back a few years now, maybe reality crept in and the thinking has changed. Perhaps it was a bit ambitious to presume we could hide the internal logic. Maybe we should write up the SoD model and communicate it to users as essential reading? We could do with writing this up in any case.

It needs to be logged at INFO level though, to explain why the flow does not continue here, at this time.

I'm not convinced it does. We are trying to communicate to users the graph model, not the internal SoD implementation (or at least I thought we were).

Take this graph for example:

a => b => c => d
a => x
b => x
c => x
d => x

If the user runs cylc remove //x, they will get a message along the lines of "x removed".

They don't need to see four "not re-spawning x" messages in a row as the graph proceeds — there is no new information in these messages. Moreover, it isn't communicating anything to them about the graph. Of course x isn't being re-spawned, I removed it! On purpose!

Copy link
Member

@oliver-sanders oliver-sanders Apr 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My approval still stands of course, I'll leave you two to find something you can agree on.

I was trying to avert a terminology argument, not to stoke one; I accepted the original phrasing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just think "flow blocked" sounds too negative and some users will see it and think something has gone wrong

Copy link
Member Author

@hjoliver hjoliver Apr 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK @MetRonnie, let's try to agree. I don't see "flow blocked" as negative, but perhaps we could add more information to explain the reason for it.

We've established that "skipping" isn't right. Are you happy with either of these, or do you have another suggestion?:

  • "flow blocked by previous removal of [task]"
  • "not spawning [task] (previously removed)"

I like "flow blocked" because it automatically conveys the fact that the flow will not continue from this point. But having argued that "spawn" is still needed to explain how flows evolve, I don't mind if we use that here too.

return None

itask = self._get_task_proxy_db_outputs(
point,
self.config.get_taskdef(name),
Expand Down Expand Up @@ -1653,8 +1687,6 @@ def spawn_task(
if itask.transient and not force:
return None

# (else not previously finished, so run it)

if not itask.transient:
if (name, point) in self.tasks_to_hold:
LOG.info(f"[{itask}] holding (as requested earlier)")
Expand Down Expand Up @@ -2117,8 +2149,9 @@ def force_trigger_tasks(
if not self.can_be_spawned(name, point):
continue

submit_num, _prev_status, prev_fwait = self._get_task_history(
name, point, flow_nums)
_, submit_num, _prev_status, prev_fwait = (
self._get_task_history(name, point, flow_nums)
)

itask = TaskProxy(
self.tokens,
Expand Down
6 changes: 3 additions & 3 deletions tests/functional/cylc-set/02-off-flow-out.t
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ reftest_run

grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/a_cold.* setting implied output: submitted'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/a_cold.* setting implied output: started'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a3" '1/a_cold.* task completed'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a3" '1/a_cold.* completed'

grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/b_cold.* setting implied output: submitted'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/b_cold.* setting implied output: started'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-b3" '1/b_cold.* task completed'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-b3" '1/b_cold.* completed'

grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/c_cold.* setting implied output: submitted'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/c_cold.* setting implied output: started'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-c3" '1/c_cold.* task completed'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-c3" '1/c_cold.* completed'

purge
2 changes: 1 addition & 1 deletion tests/functional/hold-release/05-release.t
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
inherit = STOP
script = """
cylc__job__poll_grep_workflow_log -E \
'1/dog1/01:succeeded.* task completed'
'1/dog1/01:succeeded.* completed'
cylc stop "${CYLC_WORKFLOW_ID}"
"""
__FLOW_CONFIG__
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/spawn-on-demand/09-set-outputs/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
cylc set --flow=2 --output=out1 --output=out2 "${CYLC_WORKFLOW_ID}//1/foo"

# Set bar outputs after it is gone from the pool.
cylc__job__poll_grep_workflow_log -E "1/bar.* task completed"
cylc__job__poll_grep_workflow_log -E "1/bar.* completed"
cylc set --flow=2 --output=out1 --output=out2 "${CYLC_WORKFLOW_ID}//1/bar"
"""
[[qux, quw, fux, fuw]]
Expand Down
24 changes: 0 additions & 24 deletions tests/functional/triggering/15-suicide.t

This file was deleted.

23 changes: 0 additions & 23 deletions tests/functional/triggering/15-suicide/flow.cylc

This file was deleted.

5 changes: 0 additions & 5 deletions tests/functional/triggering/15-suicide/reference.log

This file was deleted.

34 changes: 0 additions & 34 deletions tests/functional/triggering/18-suicide-active.t

This file was deleted.

11 changes: 0 additions & 11 deletions tests/functional/triggering/18-suicide-active/flow.cylc

This file was deleted.

Loading
Loading