Skip to content

Commit

Permalink
Merge pull request #3823 from hjoliver/hide-waiting-tasks
Browse files Browse the repository at this point in the history
Hide waiting tasks from n=0.
  • Loading branch information
hjoliver authored Nov 2, 2020
2 parents c40d657 + 131ec15 commit 30b0b52
Show file tree
Hide file tree
Showing 22 changed files with 184 additions and 260 deletions.
38 changes: 13 additions & 25 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@
from cylc.flow.task_state import (
TASK_STATUSES_ACTIVE,
TASK_STATUSES_NEVER_ACTIVE,
TASK_STATUSES_SUCCESS,
TASK_STATUS_FAILED)
from cylc.flow.templatevars import load_template_vars
from cylc.flow import __version__ as CYLC_VERSION
Expand Down Expand Up @@ -1524,7 +1523,7 @@ async def update_data_structure(self):
updated_nodes = set(updated_tasks).union(
self.pool.get_pool_change_tasks())
if (
has_updated or
updated_nodes or
self.data_store_mgr.updates_pending or
self.job_pool.updates_pending
):
Expand Down Expand Up @@ -1586,10 +1585,8 @@ def check_suite_stalled(self):
return
self.is_stalled = self.pool.is_stalled()
if self.is_stalled:
message = 'suite stalled'
LOG.warning(message)
self.run_event_handlers(self.EVENT_STALLED, message)
self.pool.report_stalled_task_deps()
self.run_event_handlers(self.EVENT_STALLED, 'suite stalled')
self.pool.report_unmet_deps()
if self._get_events_conf('abort on stalled'):
raise SchedulerError('Abort on suite stalled is set')
# Start suite timeout timer
Expand Down Expand Up @@ -1671,6 +1668,9 @@ async def shutdown(self, reason):
self.proc_pool.process()

if self.pool is not None:
if not self.is_stalled:
# (else already reported)
self.pool.report_unmet_deps()
self.pool.warn_stop_orphans()
try:
self.suite_db_mgr.put_task_event_timers(self.task_events_mgr)
Expand Down Expand Up @@ -1746,30 +1746,18 @@ def stop_clock_done(self):
return False

def check_auto_shutdown(self):
"""Check if we should do a normal automatic shutdown."""
"""Check if we should do an automatic shutdown: main pool empty."""
if not self.can_auto_stop:
return False
can_shutdown = True
for itask in self.pool.get_all_tasks():
if self.pool.stop_point is None:
# Don't if any unsucceeded task exists.
if not itask.state(*TASK_STATUSES_SUCCESS):
can_shutdown = False
break
elif (
itask.point <= self.pool.stop_point
and not itask.state(*TASK_STATUSES_SUCCESS)
):
# Don't if any unsucceeded task exists < stop point...
if itask.identity not in self.pool.stuck_future_tasks:
# ...unless it has a future trigger extending > stop point.
can_shutdown = False
break
if can_shutdown and self.pool.stop_point:
self.pool.release_runahead_tasks()
if self.pool.get_tasks():
return False
# can shut down
if self.pool.stop_point:
self.options.stopcp = None
self.pool.stop_point = None
self.suite_db_mgr.delete_suite_stop_cycle_point()
return can_shutdown
return True

def hold_suite(self, point=None):
"""Hold all tasks in suite."""
Expand Down
158 changes: 82 additions & 76 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.task_state import (
TASK_STATUSES_ACTIVE,
TASK_STATUSES_NOT_STALLED,
TASK_STATUSES_FAILURE,
TASK_STATUS_WAITING,
TASK_STATUS_EXPIRED,
TASK_STATUS_QUEUED,
Expand Down Expand Up @@ -176,7 +176,6 @@ def __init__(self, config, suite_db_mgr, task_events_mgr, job_pool):

self.is_held = False
self.hold_point = None
self.stuck_future_tasks = []
self.abs_outputs_done = set()

self.stop_task_id = None
Expand Down Expand Up @@ -241,7 +240,11 @@ def add_to_runahead_pool(self, itask, is_new=True):
return itask

def release_runahead_tasks(self):
"""Restrict the number of active cycle points.
"""Release tasks from the runahead pool to the main pool.
This serves to:
- restrict the number of active cycle points
- keep partially-satisfied waiting tasks out of the n=0 active pool
Compute runahead limit, and release tasks to the main pool if they are
below that point (and <= the stop point, if there is a stop point).
Expand Down Expand Up @@ -347,6 +350,9 @@ def release_runahead_tasks(self):
for point, itask_id_map in self.runahead_pool.copy().items():
if point <= latest_allowed_point:
for itask in itask_id_map.copy().values():
if itask.is_task_prereqs_not_done():
# Only release if all prerequisites are satisfied.
continue
self.release_runahead_task(itask)
released = True
return released
Expand Down Expand Up @@ -728,15 +734,6 @@ def get_ready_tasks(self):

return ready_tasks

def task_has_future_trigger_overrun(self, itask):
"""Check for future triggers extending beyond the final cycle."""
if not self.stop_point:
return False
for pct in itask.state.prerequisites_get_target_points():
if pct > self.stop_point:
return True
return False

def get_min_point(self):
"""Return the minimum cycle point currently in the pool."""
cycles = list(self.pool)
Expand Down Expand Up @@ -914,77 +911,77 @@ def can_stop(self, stop_mode):

def warn_stop_orphans(self):
"""Log (warning) orphaned tasks on suite stop."""
orphans = []
orphans_kill_failed = []
for itask in self.get_tasks():
if (
itask.state(*TASK_STATUSES_ACTIVE)
and itask.state.kill_failed
):
LOG.warning("%s: orphaned task (%s, kill failed)" % (
itask.identity, itask.state.status))
elif itask.state(*TASK_STATUSES_ACTIVE):
LOG.warning("%s: orphaned task (%s)" % (
itask.identity, itask.state.status))
if itask.state(*TASK_STATUSES_ACTIVE):
if itask.state.kill_failed:
orphans_kill_failed.append(itask)
else:
orphans.append(itask)
if orphans_kill_failed:
LOG.warning(
"Orphaned task jobs (kill failed):\n"
+ "\n".join(
f"* {itask.identity} ({itask.state.status})"
for itask in orphans_kill_failed
)
)
if orphans:
LOG.warning(
"Orphaned task jobs:\n"
+ "\n".join(
f"* {itask.identity} ({itask.state.status})"
for itask in orphans
)
)

for key1, point, name, submit_num in self.task_events_mgr.event_timers:
LOG.warning("%s/%s/%s: incomplete task event handler %s" % (
point, name, submit_num, key1))

def is_stalled(self):
"""Return True if the suite is stalled.
A suite is stalled when:
* It is not held.
* It has no active tasks.
* It has waiting tasks with unmet prerequisites
(ignoring clock triggers).
A suite is stalled if it is not held and the active pool contains only
unhandled failed tasks.
"""
if self.is_held:
return False
can_be_stalled = False
unhandled_failed = []
for itask in self.get_tasks():
if (
self.stop_point
and itask.point > self.stop_point
or itask.state(
TASK_STATUS_SUCCEEDED,
TASK_STATUS_EXPIRED,
)
):
# Ignore: Task beyond stop point.
# Ignore: Succeeded and expired tasks.
continue
if itask.state(*TASK_STATUSES_NOT_STALLED):
# Pool contains active tasks (or held active tasks)
# Return "not stalled" immediately.
return False
if (
itask.state(TASK_STATUS_WAITING)
and itask.state.prerequisites_all_satisfied()
):
# Waiting tasks with all prerequisites satisfied,
# probably waiting for clock trigger only.
# This task can be considered active.
# Return "not stalled" immediately.
if itask.state(*TASK_STATUSES_FAILURE):
unhandled_failed.append(itask)
else:
return False
# We should be left with (submission) failed tasks and
# waiting tasks with unsatisfied prerequisites.
can_be_stalled = True
return can_be_stalled
if unhandled_failed:
LOG.warning(
"Suite stalled with unhandled failed tasks:\n"
+ "\n".join(
f"* {itask.identity} ({itask.state.status})"
for itask in unhandled_failed
)
)
return True
else:
return False

def report_stalled_task_deps(self):
"""Log unmet dependencies on stalled."""
def report_unmet_deps(self):
"""Log unmet dependencies on stall or shutdown."""
prereqs_map = {}
for itask in self.get_tasks():
if (
itask.state(TASK_STATUS_WAITING)
and itask.state.prerequisites_are_not_all_satisfied()
):
prereqs_map[itask.identity] = []
for prereq_str, is_met in itask.state.prerequisites_dump():
if not is_met:
prereqs_map[itask.identity].append(prereq_str)
# Partially satisfied tasks are hidden in the runahead pool.
for itask in self.get_rh_tasks():
prereqs_map[itask.identity] = []
for prereq_str, is_met in itask.state.prerequisites_dump():
if not is_met:
prereqs_map[itask.identity].append(prereq_str)

# prune tree to ignore items that are elsewhere in it
for id_, prereqs in list(prereqs_map.copy().items()):
if not prereqs:
# (tasks in runahead pool that are not unsatisfied)
del prereqs_map[id_]
continue
for prereq in prereqs:
prereq_strs = prereq.split()
if prereq_strs[0] == "LABEL:":
Expand All @@ -998,10 +995,16 @@ def report_stalled_task_deps(self):
del prereqs_map[id_]
break

for id_, prereqs in prereqs_map.items():
LOG.warning("Unmet prerequisites for %s:" % id_)
for prereq in prereqs:
LOG.warning(" * %s" % prereq)
if prereqs_map:
LOG.warning(
"Some partially satisfied prerequisites left over:\n"
+ "\n".join(
f"{id_} is waiting on:"
+ "\n".join(
f"\n* {prereq}" for prereq in prereqs
) for id_, prereqs in prereqs_map.items()
)
)

def set_hold_point(self, point):
"""Set the point after which tasks must be held."""
Expand Down Expand Up @@ -1217,13 +1220,16 @@ def spawn_task(self, name, point, flow_label=None, reflow=True,
"[%s] -holding (beyond suite hold point) %s",
itask, self.hold_point)
itask.state.reset(is_held=True)
elif (self.stop_point and itask.point <= self.stop_point and
self.task_has_future_trigger_overrun(itask)):
# Record tasks waiting on a future trigger beyond the stop point.
# (We ignore these waiting tasks when considering shutdown).
LOG.info("[%s] -holding (future trigger beyond stop point)", itask)
self.stuck_future_tasks.append(itask.identity)
elif (self.is_held
if self.stop_point and itask.point <= self.stop_point:
future_trigger_overrun = False
for pct in itask.state.prerequisites_get_target_points():
if pct > self.stop_point:
future_trigger_overrun = True
break
if future_trigger_overrun:
LOG.warning("[%s] -won't run: depends on a "
"task beyond the stop point", itask)
if (self.is_held
and itask.state(TASK_STATUS_WAITING, is_held=False)):
# Hold newly-spawned tasks in a held suite (e.g. due to manual
# triggering of a held task).
Expand Down
9 changes: 0 additions & 9 deletions cylc/flow/task_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,21 +147,12 @@
TASK_STATUS_READY,
])

# Task statuses that are to be externally active
TASK_STATUSES_TO_BE_ACTIVE = set([
TASK_STATUS_QUEUED,
TASK_STATUS_READY,
])

# Task statuses that are externally active
TASK_STATUSES_ACTIVE = set([
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
])

# Task statuses in which tasks cannot be considered stalled
TASK_STATUSES_NOT_STALLED = TASK_STATUSES_ACTIVE | TASK_STATUSES_TO_BE_ACTIVE

# Task statuses that can be manually triggered.
TASK_STATUSES_TRIGGERABLE = set([
TASK_STATUS_WAITING,
Expand Down
55 changes: 0 additions & 55 deletions tests/flakyfunctional/cylc-take-checkpoints/00-basic.t

This file was deleted.

Loading

0 comments on commit 30b0b52

Please sign in to comment.