Skip to content

Commit

Permalink
First hack at no-cycle start-up and shut-down graphs.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Aug 22, 2022
1 parent 38c932e commit b4e5429
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 21 deletions.
17 changes: 16 additions & 1 deletion cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,15 @@
get_sequence, get_sequence_cls, init_cyclers, get_dump_format,
INTEGER_CYCLING_TYPE, ISO8601_CYCLING_TYPE
)
from cylc.flow.cycling.nocycle import NocycleSequence
from cylc.flow.id import Tokens
from cylc.flow.cycling.integer import IntegerInterval
from cylc.flow.cycling.iso8601 import ingest_time, ISO8601Interval
from cylc.flow.exceptions import (
CylcError,
WorkflowConfigError,
IntervalParsingError,
SequenceParsingError,
TaskDefError,
ParamExpandError,
InputError
Expand Down Expand Up @@ -269,7 +271,9 @@ def __init__(
self.start_point: 'PointBase'
self.stop_point: Optional['PointBase'] = None
self.final_point: Optional['PointBase'] = None
self.sequence_initial: Optional['SequenceBase'] = None
self.sequences: List['SequenceBase'] = []
self.sequence_final: Optional['SequenceBase'] = None
self.actual_first_point: Optional['PointBase'] = None
self._start_point_for_actual_first_point: Optional['PointBase'] = None

Expand Down Expand Up @@ -2054,6 +2058,14 @@ def load_graph(self):
for section, graph in sections:
try:
seq = get_sequence(section, icp, fcp)
except SequenceParsingError:
seq = NocycleSequence(section)
if section == "startup":
self.sequence_initial = seq
elif section == "shutdown":
self.sequence_final = seq
else:
raise
except (AttributeError, TypeError, ValueError, CylcError) as exc:
if cylc.flow.flags.verbosity > 1:
traceback.print_exc()
Expand All @@ -2063,7 +2075,10 @@ def load_graph(self):
if isinstance(exc, CylcError):
msg += ' %s' % exc.args[0]
raise WorkflowConfigError(msg)
self.sequences.append(seq)

else:
self.sequences.append(seq)

parser = GraphParser(
family_map,
self.parameters,
Expand Down
6 changes: 5 additions & 1 deletion cylc/flow/cycling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,11 @@ def TYPE_SORT_KEY(self) -> int:
@classmethod
@abstractmethod # Note: stacked decorator not strictly enforced in Py2.x
def get_async_expr(cls, start_point=0):
"""Express a one-off sequence at the initial cycle point."""
"""Express a one-off sequence at the initial cycle point.
Note "async" has nothing to do with asyncio. It was a (bad)
name for one-off (non-cycling) graphs in early Cylc versions.
"""
pass

@abstractmethod
Expand Down
55 changes: 55 additions & 0 deletions cylc/flow/cycling/nocycle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
No-cycle logic for isolated start-up and shutdown graphs.
"""

ALLOWED_VALUES = ("startup", "shutdown")

class NocyclePoint:
"""A string-valued no-cycle point."""

def __init__(self, value: str) -> None:
if value not in ALLOWED_VALUES:
raise ValueError(f"Illegal Nocycle value {value}")
self.value = value

def __hash__(self):
return hash(self.value)

def __le__(self, other):
return str(other) == str(self.value)

def __lt__(self, other):
return False

def __str__(self):
return self.value


class NocycleSequence:
"""A no-cycle sequence is just a point."""

def __init__(self, dep_section, p_context_start=None, p_context_stop=None):
"""blah"""
self.point = NocyclePoint(dep_section)

def is_valid(self, point):
"""Is point on-sequence and in-bounds?"""
return True

def __str__(self):
return str(self.point)
37 changes: 25 additions & 12 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,17 +460,6 @@ async def configure(self):
self.is_reloaded = False
self.data_store_mgr.initiate_data_model()

self.profiler.log_memory("scheduler.py: before load_tasks")
if self.is_restart:
self._load_pool_from_db()
if self.restored_stop_task_id is not None:
self.pool.set_stop_task(self.restored_stop_task_id)
elif self.options.starttask:
self._load_pool_from_tasks()
else:
self._load_pool_from_point()
self.profiler.log_memory("scheduler.py: after load_tasks")

self.workflow_db_mgr.put_workflow_params(self)
self.workflow_db_mgr.put_workflow_template_vars(self.template_vars)
self.workflow_db_mgr.put_runtime_inheritance(self.config)
Expand Down Expand Up @@ -601,7 +590,29 @@ async def run_scheduler(self):
# Non-async sleep - yield to other threads rather than event loop
sleep(0)
self.profiler.start()
await self.main_loop()

# TODO: ALLOW RESTARTS...
from cylc.flow.cycling.nocycle import NocyclePoint

if self.config.sequence_initial is not None:
self.pool.load_nocycle_graph(self.config.sequence_initial)
await self.main_loop()

if self.config.sequences:
if self.is_restart:
self._load_pool_from_db()
if self.restored_stop_task_id is not None:
self.pool.set_stop_task(self.restored_stop_task_id)
elif self.options.starttask:
self._load_pool_from_tasks()
else:
self._load_pool_from_point()

await self.main_loop()

if self.config.sequence_final is not None:
self.pool.load_nocycle_graph(self.config.sequence_final)
await self.main_loop()

except SchedulerStop as exc:
# deliberate stop
Expand Down Expand Up @@ -1601,6 +1612,8 @@ async def main_loop(self):
# Shutdown workflow if timeouts have occurred
self.timeout_check()

if self.check_auto_shutdown():
break
# Does the workflow need to shutdown on task failure?
await self.workflow_shutdown()

Expand Down
21 changes: 21 additions & 0 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,22 @@ def _swap_out(self, itask):
self.main_pool[itask.point][itask.identity] = itask
self.main_pool_changed = True

def load_nocycle_graph(self, seq):
"""blah """
flow_num = self.flow_mgr.get_new_flow(
f"original flow from {seq}")
for name in self.config.get_task_name_list():
tdef = self.config.get_taskdef(name)
if str(seq) not in [str(s) for s in tdef.sequences]:
continue
if tdef.is_parentless(seq.point, seq):
ntask = self._get_spawned_or_merged_task(
seq.point, tdef.name, {flow_num}
)
if ntask is not None:
self.add_to_pool(ntask)
self.rh_release_and_queue(ntask)

def load_from_point(self):
"""Load the task pool for the workflow start point.
Expand Down Expand Up @@ -326,6 +342,11 @@ def compute_runahead(self, force=False) -> bool:
return False
base_point = min(points)

from cylc.flow.cycling.nocycle import NocyclePoint
if type(base_point) is NocyclePoint:
# TODO USE CYCLING_TYPE PROPERLY
return False

if self._prev_runahead_base_point is None:
self._prev_runahead_base_point = base_point

Expand Down
23 changes: 16 additions & 7 deletions cylc/flow/taskdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ def __init__(self, name, rtcfg, run_mode, start_point, initial_point):
self.initial_point = initial_point

self.sequences = []

self.used_in_offset_trigger = False

# some defaults
Expand Down Expand Up @@ -264,15 +265,19 @@ def check_for_explicit_cycling(self):
raise TaskDefError(
"No cycling sequences defined for %s" % self.name)

def get_parent_points(self, point):
def get_parent_points(self, point, seq=None):
"""Return the cycle points of my parents, at point."""
parent_points = set()
for seq in self.sequences:
if not seq.is_valid(point):
if seq:
sequences = [seq]
else:
sequences = self.sequences
for sequence in sequences:
if not sequence.is_valid(point):
continue
if seq in self.dependencies:
if sequence in self.dependencies:
# task has prereqs in this sequence
for dep in self.dependencies[seq]:
for dep in self.dependencies[sequence]:
if dep.suicide:
continue
for trig in dep.task_triggers:
Expand Down Expand Up @@ -309,9 +314,12 @@ def is_valid_point(self, point: 'PointBase') -> bool:

def first_point(self, icp):
"""Return the first point for this task."""
from cylc.flow.cycling.nocycle import NocycleSequence
point = None
adjusted = []
for seq in self.sequences:
if type(seq) is NocycleSequence:
continue
pt = seq.get_first_point(icp)
if pt:
# may be None if beyond the sequence bounds
Expand All @@ -333,7 +341,7 @@ def next_point(self, point):
p_next = min(adjusted)
return p_next

def is_parentless(self, point):
def is_parentless(self, point, seq=None):
"""Return True if task has no parents at point.
Tasks are considered parentless if they have:
Expand All @@ -352,7 +360,8 @@ def is_parentless(self, point):
if self.sequential:
# Implicit parents
return False
parent_points = self.get_parent_points(point)

parent_points = self.get_parent_points(point, seq)
return (
not parent_points
or all(x < self.start_point for x in parent_points)
Expand Down

0 comments on commit b4e5429

Please sign in to comment.