Skip to content

Commit

Permalink
Simplify logging of prerequisites and orphans.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Oct 28, 2020
1 parent f9d5e79 commit 00b9f93
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 61 deletions.
11 changes: 5 additions & 6 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1586,9 +1586,7 @@ def check_suite_stalled(self):
return
self.is_stalled = self.pool.is_stalled()
if self.is_stalled:
message = 'suite stalled'
LOG.warning(message)
self.run_event_handlers(self.EVENT_STALLED, message)
self.run_event_handlers(self.EVENT_STALLED, 'suite stalled')
self.pool.report_unmet_deps()
if self._get_events_conf('abort on stalled'):
raise SchedulerError('Abort on suite stalled is set')
Expand Down Expand Up @@ -1671,7 +1669,9 @@ async def shutdown(self, reason):
self.proc_pool.process()

if self.pool is not None:
self.pool.report_unmet_deps()
if not self.is_stalled:
# (else already reported)
self.pool.report_unmet_deps()
self.pool.warn_stop_orphans()
try:
self.suite_db_mgr.put_task_event_timers(self.task_events_mgr)
Expand Down Expand Up @@ -1747,8 +1747,7 @@ def stop_clock_done(self):
return False

def check_auto_shutdown(self):
"""Check if we should do an automatic shutdown: main pool empty.
"""
"""Check if we should do an automatic shutdown: main pool empty."""
if not self.can_auto_stop:
return False
self.pool.release_runahead_tasks()
Expand Down
83 changes: 47 additions & 36 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ def __init__(self, config, suite_db_mgr, task_events_mgr, job_pool):

self.is_held = False
self.hold_point = None
self.stuck_future_tasks = []
self.abs_outputs_done = set()

self.stop_task_id = None
Expand Down Expand Up @@ -734,15 +733,6 @@ def get_ready_tasks(self):

return ready_tasks

def task_has_future_trigger_overrun(self, itask):
"""Check for future triggers extending beyond the final cycle."""
if not self.stop_point:
return False
for pct in itask.state.prerequisites_get_target_points():
if pct > self.stop_point:
return True
return False

def get_min_point(self):
"""Return the minimum cycle point currently in the pool."""
cycles = list(self.pool)
Expand Down Expand Up @@ -920,16 +910,23 @@ def can_stop(self, stop_mode):

def warn_stop_orphans(self):
"""Log (warning) orphaned tasks on suite stop."""
orphans = []
orphans_kill_failed = []
for itask in self.get_tasks():
if (
itask.state(*TASK_STATUSES_ACTIVE)
and itask.state.kill_failed
):
LOG.warning("%s: orphaned task (%s, kill failed)" % (
itask.identity, itask.state.status))
elif itask.state(*TASK_STATUSES_ACTIVE):
LOG.warning("%s: orphaned task (%s)" % (
itask.identity, itask.state.status))
if itask.state(*TASK_STATUSES_ACTIVE):
if itask.state.kill_failed:
orphans_kill_failed.append(itask)
else:
orphans.append(itask)
if orphans_kill_failed:
LOG.warning("Orphaned task jobs (kill failed):")
for itask in orphans_kill_failed:
LOG.warning(f"* {itask.identity} ({itask.state.status})")
if orphans:
LOG.warning("Orphaned task jobs:")
for itask in orphans:
LOG.warning(f"* {itask.identity} ({itask.state.status})")

for key1, point, name, submit_num in self.task_events_mgr.event_timers:
LOG.warning("%s/%s/%s: incomplete task event handler %s" % (
point, name, submit_num, key1))
Expand All @@ -943,17 +940,22 @@ def is_stalled(self):
"""
if self.is_held:
return False
unhandled_failed = []
for itask in self.get_tasks():
if itask.state(TASK_STATUS_FAILED):
return True
return False
unhandled_failed.append(itask)
if unhandled_failed:
LOG.warning("Suite stalled with unhandled failed tasks:")
for itask in unhandled_failed:
LOG.warning(f"* {itask.identity}")
return True
else:
return False

def report_unmet_deps(self):
"""Log unmet dependencies on stall or shutdown.
(The runahead pool contains only partially satisfied tasks.)
"""
"""Log unmet dependencies on stall or shutdown."""
prereqs_map = {}
# Partially satisfied tasks are hidden in the runahead pool.
for itask in self.get_rh_tasks():
prereqs_map[itask.identity] = []
for prereq_str, is_met in itask.state.prerequisites_dump():
Expand All @@ -962,6 +964,10 @@ def report_unmet_deps(self):

# prune tree to ignore items that are elsewhere in it
for id_, prereqs in list(prereqs_map.copy().items()):
if not prereqs:
# (tasks in runahead pool that are not unsatisfied)
del prereqs_map[id_]
continue
for prereq in prereqs:
prereq_strs = prereq.split()
if prereq_strs[0] == "LABEL:":
Expand All @@ -975,10 +981,12 @@ def report_unmet_deps(self):
del prereqs_map[id_]
break

for id_, prereqs in prereqs_map.items():
LOG.warning("Unmet prerequisites for %s:" % id_)
for prereq in prereqs:
LOG.warning(" * %s" % prereq)
if prereqs_map:
LOG.warning("Some partially satisfied prerequisites left over:")
for id_, prereqs in prereqs_map.items():
LOG.warning("%s is waiting on:" % id_)
for prereq in prereqs:
LOG.warning("* %s" % prereq)

def set_hold_point(self, point):
"""Set the point after which tasks must be held."""
Expand Down Expand Up @@ -1194,13 +1202,16 @@ def spawn_task(self, name, point, flow_label=None, reflow=True,
"[%s] -holding (beyond suite hold point) %s",
itask, self.hold_point)
itask.state.reset(is_held=True)
elif (self.stop_point and itask.point <= self.stop_point and
self.task_has_future_trigger_overrun(itask)):
# Record tasks waiting on a future trigger beyond the stop point.
# (We ignore these waiting tasks when considering shutdown).
LOG.info("[%s] -holding (future trigger beyond stop point)", itask)
self.stuck_future_tasks.append(itask.identity)
elif (self.is_held
if self.stop_point and itask.point <= self.stop_point:
future_trigger_overrun = False
for pct in itask.state.prerequisites_get_target_points():
if pct > self.stop_point:
future_trigger_overrun = True
break
if future_trigger_overrun:
LOG.warning("[%s] -won't run: depends on a "
"task beyond the stop point", itask)
if (self.is_held
and itask.state(TASK_STATUS_WAITING, is_held=False)):
# Hold newly-spawned tasks in a held suite (e.g. due to manual
# triggering of a held task).
Expand Down
23 changes: 16 additions & 7 deletions tests/functional/events/26-suite-stalled-dump-prereq.t
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,28 @@
#-------------------------------------------------------------------------------
# Test suite event handler, dump unmet prereqs on stall
. "$(dirname "$0")/test_header"
set_test_number 5
set_test_number 8

install_suite "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
run_ok "${TEST_NAME_BASE}-validate" \
cylc validate "${SUITE_NAME}"

suite_run_fail "${TEST_NAME_BASE}-run" \
cylc run --reference-test --debug --no-detach "${SUITE_NAME}"
grep_ok "Abort on suite stalled is set" \
"${TEST_NAME_BASE}-run.stderr"
grep_ok "WARNING - Unmet prerequisites for foo.20100101T0600Z:" \
"${TEST_NAME_BASE}-run.stderr"
grep_ok "WARNING - \\* bar.20100101T0000Z succeeded" \
"${TEST_NAME_BASE}-run.stderr"

grep_ok "Abort on suite stalled is set" "${TEST_NAME_BASE}-run.stderr"

grep_ok "WARNING - Suite stalled with unhandled failed tasks:" \
"${TEST_NAME_BASE}-run.stderr"
grep_ok "WARNING - \* bar.20100101T0000Z" \
"${TEST_NAME_BASE}-run.stderr"

grep_ok "WARNING - Some partially satisfied prerequisites left over:" \
"${TEST_NAME_BASE}-run.stderr"
grep_ok "WARNING - foo.20100101T0600Z is waiting on:" \
"${TEST_NAME_BASE}-run.stderr"
grep_ok "WARNING - \* bar.20100101T0000Z succeeded" \
"${TEST_NAME_BASE}-run.stderr"

purge_suite "${SUITE_NAME}"
exit
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
[scheduling]
initial cycle point = 20100101T0000Z
[[graph]]
# will abort on stalled with failed bar, waiting foo, at T00
# will abort on stalled with unhandled failed bar
T00, T06, T12, T18 = foo[-PT6H] & bar[-PT6H] => foo => bar => qux
T12 = qux[-PT6H] => baz
[runtime]
Expand Down
27 changes: 21 additions & 6 deletions tests/functional/events/27-suite-stalled-dump-prereq-fam.t
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,37 @@
#-------------------------------------------------------------------------------
# Test suite event handler, dump unmet prereqs on stall
. "$(dirname "$0")/test_header"
set_test_number 7
set_test_number 12

install_suite "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"

run_ok "${TEST_NAME_BASE}-validate" \
cylc validate "${SUITE_NAME}"

suite_run_fail "${TEST_NAME_BASE}-run" \
cylc run --reference-test --debug --no-detach "${SUITE_NAME}"
grep_ok "Abort on suite stalled is set" \

grep_ok "Abort on suite stalled is set" "${TEST_NAME_BASE}-run.stderr"

grep_ok "WARNING - Suite stalled with unhandled failed tasks:" \
"${TEST_NAME_BASE}-run.stderr"
grep_ok "WARNING - \* foo.1" \
"${TEST_NAME_BASE}-run.stderr"
grep_ok "WARNING - Unmet prerequisites for f_1.1:" \

grep_ok "WARNING - Some partially satisfied prerequisites left over:" \
"${TEST_NAME_BASE}-run.stderr"
grep_ok "WARNING - f_1.1 is waiting on:" \
"${TEST_NAME_BASE}-run.stderr"
grep_ok "WARNING - \* foo.1 succeeded" \
"${TEST_NAME_BASE}-run.stderr"
grep_ok "WARNING - Unmet prerequisites for f_3.1:" \
grep_ok "WARNING - f_2.1 is waiting on:" \
"${TEST_NAME_BASE}-run.stderr"
grep_ok "WARNING - Unmet prerequisites for f_2.1" \
grep_ok "WARNING - \* foo.1 succeeded" \
"${TEST_NAME_BASE}-run.stderr"
grep_ok "WARNING - \\* foo.1 succeeded" \
grep_ok "WARNING - f_3.1 is waiting on:" \
"${TEST_NAME_BASE}-run.stderr"
grep_ok "WARNING - \* foo.1 succeeded" \
"${TEST_NAME_BASE}-run.stderr"

purge_suite "${SUITE_NAME}"
exit
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
expected task failures = foo.1
[scheduling]
[[graph]]
# Goo added to spawn waiting FAM and thereby cause a stall with
# unsatisfied waiting tasks.
# will abort on stalled with unhandled failed foo
R1 = """foo & goo => FAM
FAM:succeed-any => bar"""
[runtime]
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/hold-release/02-hold-on-spawn.t
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ __FLOW_CONFIG__
suite_run_ok "${TEST_NAME_BASE}-run" cylc run --hold "${SUITE_NAME}"

cylc release "${SUITE_NAME}" foo.1
# foo.1 should run and spawn bar.1 as waiting
# foo.1 should run and spawn bar.1 as waiting and held

poll_grep_suite_log 'spawned bar\.1'

Expand Down
5 changes: 3 additions & 2 deletions tests/functional/shutdown/09-now2.t
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
# Test "cylc stop --now --now".
. "$(dirname "$0")/test_header"

set_test_number 8
set_test_number 9

install_suite "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
run_ok "${TEST_NAME_BASE}-validate" cylc validate "${SUITE_NAME}"
suite_run_ok "${TEST_NAME_BASE}-run" cylc run --no-detach "${SUITE_NAME}"
LOGD="$RUN_DIR/${SUITE_NAME}/log"
grep_ok 'INFO - Suite shutting down - REQUEST(NOW-NOW)' "${LOGD}/suite/log"
grep_ok 'WARNING - t1.1: orphaned task (running)' "${LOGD}/suite/log"
grep_ok 'WARNING - Orphaned task jobs' "${LOGD}/suite/log"
grep_ok 'WARNING - \* t1.1 (running)' "${LOGD}/suite/log"
JLOGD="${LOGD}/job/1/t1/01"
# Check that t1.1 event handler runs
run_fail "${TEST_NAME_BASE}-activity-log-succeeded" \
Expand Down

0 comments on commit 00b9f93

Please sign in to comment.