Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hide waiting tasks from n=0. #3823

Merged
merged 6 commits into from
Nov 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 13 additions & 25 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@
from cylc.flow.task_state import (
TASK_STATUSES_ACTIVE,
TASK_STATUSES_NEVER_ACTIVE,
TASK_STATUSES_SUCCESS,
TASK_STATUS_FAILED)
from cylc.flow.templatevars import load_template_vars
from cylc.flow import __version__ as CYLC_VERSION
Expand Down Expand Up @@ -1525,7 +1524,7 @@ async def update_data_structure(self):
updated_nodes = set(updated_tasks).union(
self.pool.get_pool_change_tasks())
if (
has_updated or
updated_nodes or
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The datastore wasn't updating on some changes (noticed via tests that use cylc dump, which now goes via GraphQL to the datastore).

self.data_store_mgr.updates_pending or
self.job_pool.updates_pending
):
Expand Down Expand Up @@ -1587,10 +1586,8 @@ def check_suite_stalled(self):
return
self.is_stalled = self.pool.is_stalled()
if self.is_stalled:
message = 'suite stalled'
LOG.warning(message)
self.run_event_handlers(self.EVENT_STALLED, message)
self.pool.report_stalled_task_deps()
self.run_event_handlers(self.EVENT_STALLED, 'suite stalled')
self.pool.report_unmet_deps()
if self._get_events_conf('abort on stalled'):
raise SchedulerError('Abort on suite stalled is set')
# Start suite timeout timer
Expand Down Expand Up @@ -1672,6 +1669,9 @@ async def shutdown(self, reason):
self.proc_pool.process()

if self.pool is not None:
if not self.is_stalled:
# (else already reported)
self.pool.report_unmet_deps()
self.pool.warn_stop_orphans()
try:
self.suite_db_mgr.put_task_event_timers(self.task_events_mgr)
Expand Down Expand Up @@ -1747,30 +1747,18 @@ def stop_clock_done(self):
return False

def check_auto_shutdown(self):
"""Check if we should do a normal automatic shutdown."""
"""Check if we should do an automatic shutdown: main pool empty."""
if not self.can_auto_stop:
return False
can_shutdown = True
for itask in self.pool.get_all_tasks():
if self.pool.stop_point is None:
# Don't if any unsucceeded task exists.
if not itask.state(*TASK_STATUSES_SUCCESS):
can_shutdown = False
break
elif (
itask.point <= self.pool.stop_point
and not itask.state(*TASK_STATUSES_SUCCESS)
):
# Don't if any unsucceeded task exists < stop point...
if itask.identity not in self.pool.stuck_future_tasks:
# ...unless it has a future trigger extending > stop point.
can_shutdown = False
break
if can_shutdown and self.pool.stop_point:
self.pool.release_runahead_tasks()
if self.pool.get_tasks():
return False
# can shut down
if self.pool.stop_point:
self.options.stopcp = None
self.pool.stop_point = None
self.suite_db_mgr.delete_suite_stop_cycle_point()
return can_shutdown
return True

def hold_suite(self, point=None):
"""Hold all tasks in suite."""
Expand Down
158 changes: 82 additions & 76 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.task_state import (
TASK_STATUSES_ACTIVE,
TASK_STATUSES_NOT_STALLED,
TASK_STATUSES_FAILURE,
TASK_STATUS_WAITING,
TASK_STATUS_EXPIRED,
TASK_STATUS_QUEUED,
Expand Down Expand Up @@ -176,7 +176,6 @@ def __init__(self, config, suite_db_mgr, task_events_mgr, job_pool):

self.is_held = False
self.hold_point = None
self.stuck_future_tasks = []
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stuck future tasks are another artifact of the old SoS algorithm: a waiting task gets spawned even though it depends on another task that is beyond the stop point. Under SoD they don't get spawned (or if they have other non-future prereqs as well, they will only be spawned as hidden partially-satisfied tasks).

self.abs_outputs_done = set()

self.stop_task_id = None
Expand Down Expand Up @@ -241,7 +240,11 @@ def add_to_runahead_pool(self, itask, is_new=True):
return itask

def release_runahead_tasks(self):
"""Restrict the number of active cycle points.
"""Release tasks from the runahead pool to the main pool.

This serves to:
- restrict the number of active cycle points
- keep partially-satisfied waiting tasks out of the n=0 active pool

Compute runahead limit, and release tasks to the main pool if they are
below that point (and <= the stop point, if there is a stop point).
Expand Down Expand Up @@ -347,6 +350,9 @@ def release_runahead_tasks(self):
for point, itask_id_map in self.runahead_pool.copy().items():
if point <= latest_allowed_point:
for itask in itask_id_map.copy().values():
if itask.is_task_prereqs_not_done():
# Only release if all prerequisites are satisfied.
continue
self.release_runahead_task(itask)
released = True
return released
Expand Down Expand Up @@ -728,15 +734,6 @@ def get_ready_tasks(self):

return ready_tasks

def task_has_future_trigger_overrun(self, itask):
"""Check for future triggers extending beyond the final cycle."""
if not self.stop_point:
return False
for pct in itask.state.prerequisites_get_target_points():
if pct > self.stop_point:
return True
return False

def get_min_point(self):
"""Return the minimum cycle point currently in the pool."""
cycles = list(self.pool)
Expand Down Expand Up @@ -914,77 +911,77 @@ def can_stop(self, stop_mode):

def warn_stop_orphans(self):
"""Log (warning) orphaned tasks on suite stop."""
orphans = []
orphans_kill_failed = []
for itask in self.get_tasks():
if (
itask.state(*TASK_STATUSES_ACTIVE)
and itask.state.kill_failed
):
LOG.warning("%s: orphaned task (%s, kill failed)" % (
itask.identity, itask.state.status))
elif itask.state(*TASK_STATUSES_ACTIVE):
LOG.warning("%s: orphaned task (%s)" % (
itask.identity, itask.state.status))
if itask.state(*TASK_STATUSES_ACTIVE):
if itask.state.kill_failed:
orphans_kill_failed.append(itask)
else:
orphans.append(itask)
if orphans_kill_failed:
LOG.warning(
"Orphaned task jobs (kill failed):\n"
+ "\n".join(
f"* {itask.identity} ({itask.state.status})"
for itask in orphans_kill_failed
)
)
if orphans:
LOG.warning(
"Orphaned task jobs:\n"
+ "\n".join(
f"* {itask.identity} ({itask.state.status})"
for itask in orphans
)
)

for key1, point, name, submit_num in self.task_events_mgr.event_timers:
LOG.warning("%s/%s/%s: incomplete task event handler %s" % (
point, name, submit_num, key1))

def is_stalled(self):
"""Return True if the suite is stalled.

A suite is stalled when:
* It is not held.
* It has no active tasks.
* It has waiting tasks with unmet prerequisites
(ignoring clock triggers).
A suite is stalled if it is not held and the active pool contains only
unhandled failed tasks.
"""
if self.is_held:
return False
can_be_stalled = False
unhandled_failed = []
for itask in self.get_tasks():
if (
self.stop_point
and itask.point > self.stop_point
or itask.state(
TASK_STATUS_SUCCEEDED,
TASK_STATUS_EXPIRED,
)
):
# Ignore: Task beyond stop point.
# Ignore: Succeeded and expired tasks.
continue
if itask.state(*TASK_STATUSES_NOT_STALLED):
# Pool contains active tasks (or held active tasks)
# Return "not stalled" immediately.
return False
if (
itask.state(TASK_STATUS_WAITING)
and itask.state.prerequisites_all_satisfied()
):
# Waiting tasks with all prerequisites satisfied,
# probably waiting for clock trigger only.
# This task can be considered active.
# Return "not stalled" immediately.
if itask.state(*TASK_STATUSES_FAILURE):
unhandled_failed.append(itask)
else:
return False
# We should be left with (submission) failed tasks and
# waiting tasks with unsatisfied prerequisites.
can_be_stalled = True
return can_be_stalled
if unhandled_failed:
LOG.warning(
"Suite stalled with unhandled failed tasks:\n"
+ "\n".join(
f"* {itask.identity} ({itask.state.status})"
for itask in unhandled_failed
)
)
return True
else:
return False

def report_stalled_task_deps(self):
"""Log unmet dependencies on stalled."""
def report_unmet_deps(self):
"""Log unmet dependencies on stall or shutdown."""
prereqs_map = {}
for itask in self.get_tasks():
if (
itask.state(TASK_STATUS_WAITING)
and itask.state.prerequisites_are_not_all_satisfied()
):
prereqs_map[itask.identity] = []
for prereq_str, is_met in itask.state.prerequisites_dump():
if not is_met:
prereqs_map[itask.identity].append(prereq_str)
# Partially satisfied tasks are hidden in the runahead pool.
for itask in self.get_rh_tasks():
prereqs_map[itask.identity] = []
for prereq_str, is_met in itask.state.prerequisites_dump():
if not is_met:
prereqs_map[itask.identity].append(prereq_str)

# prune tree to ignore items that are elsewhere in it
for id_, prereqs in list(prereqs_map.copy().items()):
if not prereqs:
# (tasks in runahead pool that are not unsatisfied)
del prereqs_map[id_]
continue
for prereq in prereqs:
prereq_strs = prereq.split()
if prereq_strs[0] == "LABEL:":
Expand All @@ -998,10 +995,16 @@ def report_stalled_task_deps(self):
del prereqs_map[id_]
break

for id_, prereqs in prereqs_map.items():
LOG.warning("Unmet prerequisites for %s:" % id_)
for prereq in prereqs:
LOG.warning(" * %s" % prereq)
if prereqs_map:
LOG.warning(
"Some partially satisfied prerequisites left over:\n"
+ "\n".join(
f"{id_} is waiting on:"
+ "\n".join(
f"\n* {prereq}" for prereq in prereqs
) for id_, prereqs in prereqs_map.items()
)
)

def set_hold_point(self, point):
"""Set the point after which tasks must be held."""
Expand Down Expand Up @@ -1217,13 +1220,16 @@ def spawn_task(self, name, point, flow_label=None, reflow=True,
"[%s] -holding (beyond suite hold point) %s",
itask, self.hold_point)
itask.state.reset(is_held=True)
elif (self.stop_point and itask.point <= self.stop_point and
self.task_has_future_trigger_overrun(itask)):
# Record tasks waiting on a future trigger beyond the stop point.
# (We ignore these waiting tasks when considering shutdown).
LOG.info("[%s] -holding (future trigger beyond stop point)", itask)
self.stuck_future_tasks.append(itask.identity)
elif (self.is_held
if self.stop_point and itask.point <= self.stop_point:
future_trigger_overrun = False
for pct in itask.state.prerequisites_get_target_points():
if pct > self.stop_point:
future_trigger_overrun = True
break
if future_trigger_overrun:
LOG.warning("[%s] -won't run: depends on a "
"task beyond the stop point", itask)
if (self.is_held
and itask.state(TASK_STATUS_WAITING, is_held=False)):
# Hold newly-spawned tasks in a held suite (e.g. due to manual
# triggering of a held task).
Expand Down
9 changes: 0 additions & 9 deletions cylc/flow/task_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,21 +147,12 @@
TASK_STATUS_READY,
])

# Task statuses that are to be externally active
TASK_STATUSES_TO_BE_ACTIVE = set([
TASK_STATUS_QUEUED,
TASK_STATUS_READY,
])

# Task statuses that are externally active
TASK_STATUSES_ACTIVE = set([
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
])

# Task statuses in which tasks cannot be considered stalled
TASK_STATUSES_NOT_STALLED = TASK_STATUSES_ACTIVE | TASK_STATUSES_TO_BE_ACTIVE

# Task statuses that can be manually triggered.
TASK_STATUSES_TRIGGERABLE = set([
TASK_STATUS_WAITING,
Expand Down
55 changes: 0 additions & 55 deletions tests/flakyfunctional/cylc-take-checkpoints/00-basic.t

This file was deleted.

Loading