From 0d105f728d7775656dc8a090c89eccd86b99948b Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Wed, 28 Oct 2020 12:32:27 +1300 Subject: [PATCH] Simplify logging of prerequisites and orphans. --- cylc/flow/scheduler.py | 11 ++- cylc/flow/task_pool.py | 83 +++++++++++-------- .../events/26-suite-stalled-dump-prereq.t | 23 +++-- .../26-suite-stalled-dump-prereq/flow.cylc | 2 +- .../events/27-suite-stalled-dump-prereq-fam.t | 27 ++++-- .../flow.cylc | 3 +- .../hold-release/02-hold-on-spawn.t | 2 +- 7 files changed, 92 insertions(+), 59 deletions(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index d68c6c20146..dfcdcdd15ba 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1586,9 +1586,7 @@ def check_suite_stalled(self): return self.is_stalled = self.pool.is_stalled() if self.is_stalled: - message = 'suite stalled' - LOG.warning(message) - self.run_event_handlers(self.EVENT_STALLED, message) + self.run_event_handlers(self.EVENT_STALLED, 'suite stalled') self.pool.report_unmet_deps() if self._get_events_conf('abort on stalled'): raise SchedulerError('Abort on suite stalled is set') @@ -1671,7 +1669,9 @@ async def shutdown(self, reason): self.proc_pool.process() if self.pool is not None: - self.pool.report_unmet_deps() + if not self.is_stalled: + # (else already reported) + self.pool.report_unmet_deps() self.pool.warn_stop_orphans() try: self.suite_db_mgr.put_task_event_timers(self.task_events_mgr) @@ -1747,8 +1747,7 @@ def stop_clock_done(self): return False def check_auto_shutdown(self): - """Check if we should do an automatic shutdown: main pool empty. - """ + """Check if we should do an automatic shutdown: main pool empty.""" if not self.can_auto_stop: return False self.pool.release_runahead_tasks() diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 10c2fbed115..a737f7ec2ab 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -175,7 +175,6 @@ def __init__(self, config, suite_db_mgr, task_events_mgr, job_pool): self.is_held = False self.hold_point = None - self.stuck_future_tasks = [] self.abs_outputs_done = set() self.stop_task_id = None @@ -734,15 +733,6 @@ def get_ready_tasks(self): return ready_tasks - def task_has_future_trigger_overrun(self, itask): - """Check for future triggers extending beyond the final cycle.""" - if not self.stop_point: - return False - for pct in itask.state.prerequisites_get_target_points(): - if pct > self.stop_point: - return True - return False - def get_min_point(self): """Return the minimum cycle point currently in the pool.""" cycles = list(self.pool) @@ -920,16 +910,23 @@ def can_stop(self, stop_mode): def warn_stop_orphans(self): """Log (warning) orphaned tasks on suite stop.""" + orphans = [] + orphans_kill_failed = [] for itask in self.get_tasks(): - if ( - itask.state(*TASK_STATUSES_ACTIVE) - and itask.state.kill_failed - ): - LOG.warning("%s: orphaned task (%s, kill failed)" % ( - itask.identity, itask.state.status)) - elif itask.state(*TASK_STATUSES_ACTIVE): - LOG.warning("%s: orphaned task (%s)" % ( - itask.identity, itask.state.status)) + if itask.state(*TASK_STATUSES_ACTIVE): + if itask.state.kill_failed: + orphans_kill_failed.append(itask) + else: + orphans.append(itask) + if orphans_kill_failed: + LOG.warning("Orphaned task jobs (kill failed):") + for itask in orphans_kill_failed: + LOG.warning(f"* {itask.identity} ({itask.state.status})") + if orphans: + LOG.warning("Orphaned task jobs:") + for itask in orphans: + LOG.warning(f"* {itask.identity} ({itask.state.status})") + for key1, point, name, submit_num in self.task_events_mgr.event_timers: LOG.warning("%s/%s/%s: incomplete task event handler %s" % ( point, name, submit_num, key1)) @@ -943,17 +940,22 @@ def is_stalled(self): """ if self.is_held: return False + unhandled_failed = [] for itask in self.get_tasks(): if itask.state(TASK_STATUS_FAILED): - return True - return False + unhandled_failed.append(itask) + if unhandled_failed: + LOG.warning("Suite stalled with unhandled failed tasks:") + for itask in unhandled_failed: + LOG.warning(f"* {itask.identity}") + return True + else: + return False def report_unmet_deps(self): - """Log unmet dependencies on stall or shutdown. - - (The runahead pool contains only partially satisfied tasks.) - """ + """Log unmet dependencies on stall or shutdown.""" prereqs_map = {} + # Partially satisfied tasks are hidden in the runahead pool. for itask in self.get_rh_tasks(): prereqs_map[itask.identity] = [] for prereq_str, is_met in itask.state.prerequisites_dump(): @@ -962,6 +964,10 @@ def report_unmet_deps(self): # prune tree to ignore items that are elsewhere in it for id_, prereqs in list(prereqs_map.copy().items()): + if not prereqs: + # (tasks in runahead pool that are not unsatisfied) + del prereqs_map[id_] + continue for prereq in prereqs: prereq_strs = prereq.split() if prereq_strs[0] == "LABEL:": @@ -975,10 +981,12 @@ def report_unmet_deps(self): del prereqs_map[id_] break - for id_, prereqs in prereqs_map.items(): - LOG.warning("Unmet prerequisites for %s:" % id_) - for prereq in prereqs: - LOG.warning(" * %s" % prereq) + if prereqs_map: + LOG.warning("Some partially satisfied prerequisites left over:") + for id_, prereqs in prereqs_map.items(): + LOG.warning("%s is waiting on:" % id_) + for prereq in prereqs: + LOG.warning("* %s" % prereq) def set_hold_point(self, point): """Set the point after which tasks must be held.""" @@ -1194,13 +1202,16 @@ def spawn_task(self, name, point, flow_label=None, reflow=True, "[%s] -holding (beyond suite hold point) %s", itask, self.hold_point) itask.state.reset(is_held=True) - elif (self.stop_point and itask.point <= self.stop_point and - self.task_has_future_trigger_overrun(itask)): - # Record tasks waiting on a future trigger beyond the stop point. - # (We ignore these waiting tasks when considering shutdown). - LOG.info("[%s] -holding (future trigger beyond stop point)", itask) - self.stuck_future_tasks.append(itask.identity) - elif (self.is_held + if self.stop_point and itask.point <= self.stop_point: + future_trigger_overrun = False + for pct in itask.state.prerequisites_get_target_points(): + if pct > self.stop_point: + future_trigger_overrun = True + break + if future_trigger_overrun: + LOG.warning("[%s] -won't run: depends on a " + "task beyond the stop point", itask) + if (self.is_held and itask.state(TASK_STATUS_WAITING, is_held=False)): # Hold newly-spawned tasks in a held suite (e.g. due to manual # triggering of a held task). diff --git a/tests/functional/events/26-suite-stalled-dump-prereq.t b/tests/functional/events/26-suite-stalled-dump-prereq.t index 21e8672ccc3..1f630c3b0bc 100755 --- a/tests/functional/events/26-suite-stalled-dump-prereq.t +++ b/tests/functional/events/26-suite-stalled-dump-prereq.t @@ -17,19 +17,28 @@ #------------------------------------------------------------------------------- # Test suite event handler, dump unmet prereqs on stall . "$(dirname "$0")/test_header" -set_test_number 5 +set_test_number 8 install_suite "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" run_ok "${TEST_NAME_BASE}-validate" \ cylc validate "${SUITE_NAME}" + suite_run_fail "${TEST_NAME_BASE}-run" \ cylc run --reference-test --debug --no-detach "${SUITE_NAME}" -grep_ok "Abort on suite stalled is set" \ - "${TEST_NAME_BASE}-run.stderr" -grep_ok "WARNING - Unmet prerequisites for foo.20100101T0600Z:" \ - "${TEST_NAME_BASE}-run.stderr" -grep_ok "WARNING - \\* bar.20100101T0000Z succeeded" \ - "${TEST_NAME_BASE}-run.stderr" + +grep_ok "Abort on suite stalled is set" "${TEST_NAME_BASE}-run.stderr" + +grep_ok "WARNING - Suite stalled with unhandled failed tasks:" \ + "${TEST_NAME_BASE}-run.stderr" +grep_ok "WARNING - \* bar.20100101T0000Z" \ + "${TEST_NAME_BASE}-run.stderr" + +grep_ok "WARNING - Some partially satisfied prerequisites left over:" \ + "${TEST_NAME_BASE}-run.stderr" +grep_ok "WARNING - foo.20100101T0600Z is waiting on:" \ + "${TEST_NAME_BASE}-run.stderr" +grep_ok "WARNING - \* bar.20100101T0000Z succeeded" \ + "${TEST_NAME_BASE}-run.stderr" purge_suite "${SUITE_NAME}" exit diff --git a/tests/functional/events/26-suite-stalled-dump-prereq/flow.cylc b/tests/functional/events/26-suite-stalled-dump-prereq/flow.cylc index 5e418725c94..3c8230591cf 100644 --- a/tests/functional/events/26-suite-stalled-dump-prereq/flow.cylc +++ b/tests/functional/events/26-suite-stalled-dump-prereq/flow.cylc @@ -7,7 +7,7 @@ [scheduling] initial cycle point = 20100101T0000Z [[graph]] - # will abort on stalled with failed bar, waiting foo, at T00 + # will abort on stalled with unhandled failed bar T00, T06, T12, T18 = foo[-PT6H] & bar[-PT6H] => foo => bar => qux T12 = qux[-PT6H] => baz [runtime] diff --git a/tests/functional/events/27-suite-stalled-dump-prereq-fam.t b/tests/functional/events/27-suite-stalled-dump-prereq-fam.t index 4563036cc62..636fbe8897b 100755 --- a/tests/functional/events/27-suite-stalled-dump-prereq-fam.t +++ b/tests/functional/events/27-suite-stalled-dump-prereq-fam.t @@ -17,22 +17,37 @@ #------------------------------------------------------------------------------- # Test suite event handler, dump unmet prereqs on stall . "$(dirname "$0")/test_header" -set_test_number 7 +set_test_number 12 install_suite "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" + run_ok "${TEST_NAME_BASE}-validate" \ cylc validate "${SUITE_NAME}" + suite_run_fail "${TEST_NAME_BASE}-run" \ cylc run --reference-test --debug --no-detach "${SUITE_NAME}" -grep_ok "Abort on suite stalled is set" \ + +grep_ok "Abort on suite stalled is set" "${TEST_NAME_BASE}-run.stderr" + +grep_ok "WARNING - Suite stalled with unhandled failed tasks:" \ + "${TEST_NAME_BASE}-run.stderr" +grep_ok "WARNING - \* foo.1" \ "${TEST_NAME_BASE}-run.stderr" -grep_ok "WARNING - Unmet prerequisites for f_1.1:" \ + +grep_ok "WARNING - Some partially satisfied prerequisites left over:" \ + "${TEST_NAME_BASE}-run.stderr" +grep_ok "WARNING - f_1.1 is waiting on:" \ + "${TEST_NAME_BASE}-run.stderr" +grep_ok "WARNING - \* foo.1 succeeded" \ "${TEST_NAME_BASE}-run.stderr" -grep_ok "WARNING - Unmet prerequisites for f_3.1:" \ +grep_ok "WARNING - f_2.1 is waiting on:" \ "${TEST_NAME_BASE}-run.stderr" -grep_ok "WARNING - Unmet prerequisites for f_2.1" \ +grep_ok "WARNING - \* foo.1 succeeded" \ "${TEST_NAME_BASE}-run.stderr" -grep_ok "WARNING - \\* foo.1 succeeded" \ +grep_ok "WARNING - f_3.1 is waiting on:" \ "${TEST_NAME_BASE}-run.stderr" +grep_ok "WARNING - \* foo.1 succeeded" \ + "${TEST_NAME_BASE}-run.stderr" + purge_suite "${SUITE_NAME}" exit diff --git a/tests/functional/events/27-suite-stalled-dump-prereq-fam/flow.cylc b/tests/functional/events/27-suite-stalled-dump-prereq-fam/flow.cylc index d23faa16e39..0626d58f4ed 100644 --- a/tests/functional/events/27-suite-stalled-dump-prereq-fam/flow.cylc +++ b/tests/functional/events/27-suite-stalled-dump-prereq-fam/flow.cylc @@ -6,8 +6,7 @@ expected task failures = foo.1 [scheduling] [[graph]] - # Goo added to spawn waiting FAM and thereby cause a stall with - # unsatisfied waiting tasks. + # will abort on stalled with unhandled failed foo R1 = """foo & goo => FAM FAM:succeed-any => bar""" [runtime] diff --git a/tests/functional/hold-release/02-hold-on-spawn.t b/tests/functional/hold-release/02-hold-on-spawn.t index 76903ec8a65..cbbc2e7c49f 100755 --- a/tests/functional/hold-release/02-hold-on-spawn.t +++ b/tests/functional/hold-release/02-hold-on-spawn.t @@ -30,7 +30,7 @@ __FLOW_CONFIG__ suite_run_ok "${TEST_NAME_BASE}-run" cylc run --hold "${SUITE_NAME}" cylc release "${SUITE_NAME}" foo.1 -# foo.1 should run and spawn bar.1 as waiting +# foo.1 should run and spawn bar.1 as waiting and held poll_grep_suite_log 'spawned bar\.1'