Skip to content

Commit

Permalink
Move processing of stop cycle point to config.py (#4827)
Browse files Browse the repository at this point in the history
Move processing of stop cycle point to config.py
* Testing: add dump_format arg to set_cycling_type fixture
* Fix point_parse cache issue
* Add test for empty DB when `--stopcp` is bad
* Improve docs
  * remove potentially misleading line from cylc-play usage
  * clear up versionchanged notice for runahead limit/max active cycle points setting
  • Loading branch information
MetRonnie authored May 3, 2022
1 parent 532d2ae commit 36dfa25
Show file tree
Hide file tree
Showing 16 changed files with 410 additions and 253 deletions.
32 changes: 16 additions & 16 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,12 +559,13 @@ def get_script_common_text(this: str, example: Optional[str] = None):
host (adjusted to UTC if workflow is in UTC mode but the host is
not) to minute resolution. Minutes (or hours, etc.) may be
ignored depending on the value of
:cylc:conf:`flow.cylc[scheduler]cycle point format`.
For more information on setting the initial cycle point relative
to the current time see :ref:`setting-the-icp-relative-to-now`.
''')
# NOTE: final cycle point is not a V_CYCLE_POINT to allow expressions
# such as '+P1Y' (relative to initial cycle point)
Conf('final cycle point', VDR.V_STRING, desc='''
The (optional) last cycle point at which tasks are run.
Expand Down Expand Up @@ -626,30 +627,29 @@ def get_script_common_text(this: str, example: Optional[str] = None):
Conf('hold after cycle point', VDR.V_CYCLE_POINT, desc=f'''
Hold all tasks that pass this cycle point.
.. versionchanged:: 8.0.0
{REPLACES}``[scheduling]hold after point``.
Unlike the final
cycle point, the workflow does not shut down once all tasks have
passed this point. If this item is set you can override it on the
command line using ``--hold-after``.
.. versionchanged:: 8.0.0
{REPLACES}``[scheduling]hold after point``.
''')
Conf('stop after cycle point', VDR.V_CYCLE_POINT, desc='''
Shut down workflow after all tasks **pass** this cycle point.
.. versionadded:: 8.0.0
The stop cycle point can be overridden on the command line using
``cylc play --stop-cycle-point=POINT``
.. note:
Not to be confused with :cylc:conf:`[..]final cycle point`:
There can be more graph beyond this point, but you are
choosing not to run that part of the graph. You can play
the workflow and continue.
Not to be confused with :cylc:conf:`[..]final cycle point`:
There can be more graph beyond this point, but you are
choosing not to run that part of the graph. You can play
the workflow and continue.
.. versionadded:: 8.0.0
''')
Conf('cycling mode', VDR.V_STRING, Calendar.MODE_GREGORIAN,
options=list(Calendar.MODES) + ['integer'], desc='''
Expand All @@ -665,11 +665,6 @@ def get_script_common_text(this: str, example: Optional[str] = None):
Conf('runahead limit', VDR.V_STRING, 'P5', desc='''
How many cycles ahead of the slowest tasks the fastest may run.
.. versionchanged:: 8.0.0
The deprecated ``[scheduling]max active cycle points`` setting
was merged into this one.
Runahead limiting prevents the fastest tasks in a workflow from
getting too far ahead of the slowest ones, as documented in
:ref:`RunaheadLimit`.
Expand Down Expand Up @@ -697,6 +692,11 @@ def get_script_common_text(this: str, example: Optional[str] = None):
The runahead limit may be automatically raised if this is
necessary to allow a future task to be triggered, preventing
the workflow from stalling.
.. versionchanged:: 8.0.0
The integer (``Pn``) type limit was introduced to replace the
deprecated ``[scheduling]max active cycle points = n`` setting.
''')

with Conf('queues', desc='''
Expand Down
37 changes: 33 additions & 4 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ def __init__(

self.initial_point: 'PointBase'
self.start_point: 'PointBase'
self.stop_point: Optional['PointBase'] = None
self.final_point: Optional['PointBase'] = None
self.sequences: List['SequenceBase'] = []
self.actual_first_point: Optional['PointBase'] = None
Expand Down Expand Up @@ -384,6 +385,7 @@ def __init__(
self.process_initial_cycle_point()
self.process_start_cycle_point()
self.process_final_cycle_point()
self.process_stop_cycle_point()

# Parse special task cycle point offsets, and replace family names.
LOG.debug("Parsing [special tasks]")
Expand Down Expand Up @@ -693,10 +695,9 @@ def process_final_cycle_point(self) -> None:
Raises:
WorkflowConfigError - if it fails to validate
"""
if (
self.cfg['scheduling']['final cycle point'] is not None and
not self.cfg['scheduling']['final cycle point'].strip()
):
if self.cfg['scheduling']['final cycle point'] == '':
# (Unlike other cycle point settings in config, fcp is treated as
# a string by parsec to allow for expressions like '+P1Y')
self.cfg['scheduling']['final cycle point'] = None
fcp_str = getattr(self.options, 'fcp', None)
if fcp_str == 'reload':
Expand Down Expand Up @@ -743,6 +744,34 @@ def process_final_cycle_point(self) -> None:
f"Final cycle point {self.final_point} does not "
f"meet the constraints {constraints}")

def process_stop_cycle_point(self) -> None:
"""Set the stop after cycle point.

In decreasing priority, it is set:

* From the ``stopcp`` option (``--stopcp=XYZ`` on the command line, or
  the database).
* From the flow.cylc file (``[scheduling]stop after cycle point``).

However, if ``--stopcp=reload`` on the command line during restart,
the ``stopcp`` option is cleared and the
``[scheduling]stop after cycle point`` value is used.

The chosen value is converted to a standardised point and stored in
``self.stop_point``. If that point is later than the final cycle point
it can never be reached, so a warning is logged and ``self.stop_point``
is reset to None. The ``[scheduling]stop after cycle point`` config item
is then overwritten with the string form of the resulting stop point
(or None).
"""
# The options object may not define 'stopcp' at all, hence getattr.
stopcp_str: Optional[str] = getattr(self.options, 'stopcp', None)
if stopcp_str == 'reload':
# 'reload' is a sentinel, not a cycle point: clear it on the options
# object too so that it is not seen as a point later on.
stopcp_str = self.options.stopcp = None
if stopcp_str is None:
# No usable option value: fall back to the workflow config.
stopcp_str = self.cfg['scheduling']['stop after cycle point']

if stopcp_str is not None:
self.stop_point = get_point(stopcp_str).standardise()
# A stop point beyond the final point would never take effect.
if self.final_point and (self.stop_point > self.final_point):
LOG.warning(
f"Stop cycle point '{self.stop_point}' will have no "
"effect as it is after the final cycle "
f"point '{self.final_point}'."
)
self.stop_point = None
# Keep the config in sync with the (standardised or discarded) stop point.
stopcp_str = str(self.stop_point) if self.stop_point else None
self.cfg['scheduling']['stop after cycle point'] = stopcp_str

def _check_implicit_tasks(self) -> None:
"""Raise WorkflowConfigError if implicit tasks are found in graph or
queue config, unless allowed by config."""
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/cycling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def _cmp(self, other) -> int:
"""Compare self to other point, returning a 'cmp'-like result."""
pass

def standardise(self):
def standardise(self) -> 'PointBase':
"""Format self.value into a standard representation and check it."""
return self

Expand Down
8 changes: 4 additions & 4 deletions cylc/flow/cycling/iso8601.py
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ def get_point_relative(offset_string, base_point):
except IsodatetimeError:
return ISO8601Point(str(
WorkflowSpecifics.abbrev_util.parse_timepoint(
offset_string, context_point=_point_parse(base_point.value))
offset_string, context_point=point_parse(base_point.value))
))
else:
return base_point + interval
Expand Down Expand Up @@ -897,13 +897,13 @@ def _interval_parse(interval_string):
return WorkflowSpecifics.interval_parser.parse(interval_string)


def point_parse(point_string):
def point_parse(point_string: str) -> 'TimePoint':
"""Parse a point_string into a proper TimePoint object."""
return _point_parse(point_string)
return _point_parse(point_string, WorkflowSpecifics.DUMP_FORMAT)


@lru_cache(10000)
def _point_parse(point_string):
def _point_parse(point_string, _dump_fmt):
"""Parse a point_string into a proper TimePoint object."""
if "%" in WorkflowSpecifics.DUMP_FORMAT:
# May be a custom not-quite ISO 8601 dump format.
Expand Down
51 changes: 7 additions & 44 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
from cylc.flow.broadcast_mgr import BroadcastMgr
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.config import WorkflowConfig
from cylc.flow.cycling.loader import get_point
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.id import Tokens
from cylc.flow.flow_mgr import FLOW_NONE, FlowMgr, FLOW_NEW
Expand Down Expand Up @@ -493,8 +492,6 @@ async def configure(self):
self._load_pool_from_tasks()
else:
self._load_pool_from_point()

self.process_stop_cycle_point()
self.profiler.log_memory("scheduler.py: after load_tasks")

self.workflow_db_mgr.put_workflow_params(self)
Expand Down Expand Up @@ -601,8 +598,10 @@ async def log_start(self):
'Initial point: %s', self.config.initial_point, extra=log_extra)
if self.config.start_point != self.config.initial_point:
LOG.info(
'Start point: %s', self.config.start_point, extra=log_extra)
LOG.info('Final point: %s', self.config.final_point, extra=log_extra)
f'Start point: {self.config.start_point}', extra=log_extra)
LOG.info(f'Final point: {self.config.final_point}', extra=log_extra)
if self.config.stop_point:
LOG.info(f'Stop point: {self.config.stop_point}', extra=log_extra)

if is_quiet:
LOG.info("Quiet mode on")
Expand Down Expand Up @@ -1973,34 +1972,6 @@ def _check_startup_opts(self) -> None:
f"option --{opt}=reload is only valid for restart"
)

def process_stop_cycle_point(self) -> None:
"""Set stop after cycle point.

In decreasing priority, stop cycle point (``stopcp``) is set:

* From the command line (``cylc play --stopcp=XYZ``).
* From the database.
* From the flow.cylc file (``[scheduling]stop after cycle point``).

However, if ``--stopcp=reload`` on the command line during restart,
the ``[scheduling]stop after cycle point`` value is used.

When a stop point is found, its string form is written back to
``self.options.stopcp`` and the corresponding point is passed to the
task pool via ``set_stop_point``. This method also calls
``validate_finalcp`` and ``update_data_store``.
"""
# Start from the config value; it is None if the item is unset.
stoppoint = self.config.cfg['scheduling'].get('stop after cycle point')
if self.options.stopcp != 'reload':
if self.options.stopcp:
stoppoint = self.options.stopcp
# Tests whether pool has stopcp from database on restart.
# (A pool stop point equal to the final point is not treated as a
# stop point in its own right.)
elif (
self.pool.stop_point and
self.pool.stop_point != self.config.final_point
):
stoppoint = self.pool.stop_point

if stoppoint is not None:
# Record the effective value on the options object as a string.
self.options.stopcp = str(stoppoint)
self.pool.set_stop_point(get_point(self.options.stopcp))
self.validate_finalcp()
self.update_data_store()

async def handle_exception(self, exc: Exception) -> NoReturn:
"""Gracefully shut down the scheduler given a caught exception.
Expand All @@ -2012,22 +1983,14 @@ async def handle_exception(self, exc: Exception) -> NoReturn:
await self.shutdown(exc)
raise exc from None

def validate_finalcp(self) -> None:
"""Warn if the stop cycle point is after the final cycle point.

Nothing is logged if there is no final cycle point, or if the stop
cycle point is on or before it. The stop cycle point is read from
``self.options.stopcp``.
"""
if self.config.final_point is None:
# No final point to compare against.
return
# Strictly-after comparison: a stop point equal to the final point
# does not trigger the warning.
if get_point(self.options.stopcp) > self.config.final_point:
LOG.warning(
f"Stop cycle point '{self.options.stopcp}' will have no "
"effect as it is after the final cycle "
f"point '{self.config.final_point}'.")

def update_data_store(self):
"""Sets the update flag on the data store.
Call this method whenever the Scheduler's state has changed in a way
that requires a data store update.
See cylc.flow.workflow_status.get_workflow_status() for a
(non-exhaustive?) list of properties that if changed will require
this update.
This call should often be associated with a database update.
Expand Down
2 changes: 0 additions & 2 deletions cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ def get_option_parser(add_std_opts: bool = False) -> COP:
"Set the final cycle point. "
"This command line option overrides the workflow "
"config option '[scheduling]final cycle point'. "
"Use a value of 'reload' to reload from flow.cylc in a restart."
),
metavar="CYCLE_POINT", action="store", dest="fcp")

Expand All @@ -159,7 +158,6 @@ def get_option_parser(add_std_opts: bool = False) -> COP:
"(Not to be confused with the final cycle point.) "
"This command line option overrides the workflow "
"config option '[scheduling]stop after cycle point'. "
"Use a value of 'reload' to reload from flow.cylc in a restart."
),
metavar="CYCLE_POINT", action="store", dest="stopcp")

Expand Down
39 changes: 19 additions & 20 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def __init__(
) -> None:

self.config: 'WorkflowConfig' = config
self.stop_point = config.final_point
self.stop_point = config.stop_point or config.final_point
self.workflow_db_mgr: 'WorkflowDatabaseManager' = workflow_db_mgr
self.task_events_mgr: 'TaskEventsManager' = task_events_mgr
# TODO this is ugly:
Expand Down Expand Up @@ -787,13 +787,10 @@ def set_max_future_offset(self):
max_offset = itask.tdef.max_future_prereq_offset
self.max_future_offset = max_offset

def set_do_reload(self, config):
def set_do_reload(self, config: 'WorkflowConfig') -> None:
"""Set the task pool to reload mode."""
self.config = config
if config.options.stopcp:
self.stop_point = get_point(config.options.stopcp)
else:
self.stop_point = config.final_point
self.stop_point = config.stop_point or config.final_point
self.do_reload = True

self.custom_runahead_limit = self.config.get_custom_runahead_limit()
Expand Down Expand Up @@ -882,29 +879,31 @@ def reload_taskdefs(self) -> None:

self.do_reload = False

def set_stop_point(self, stop_point):
def set_stop_point(
self, stop_point: Optional['PointBase']
) -> Optional['PointBase']:
"""Set the global workflow stop point."""
if self.stop_point == stop_point:
return
return None
LOG.info("Setting stop cycle point: %s", stop_point)
self.stop_point = stop_point
for itask in self.get_tasks():
# check cycle stop or hold conditions
if (
self.stop_point
and itask.point > self.stop_point
if self.stop_point:
for itask in self.get_tasks():
# check cycle stop or hold conditions
if (
itask.point > self.stop_point
and itask.state(
TASK_STATUS_WAITING,
is_queued=True,
is_held=False
)
):
LOG.warning(
f"[{itask}] not running (beyond workflow stop cycle) "
f"{self.stop_point}"
)
if itask.state_reset(is_held=True):
self.data_store_mgr.delta_task_held(itask)
):
LOG.warning(
f"[{itask}] not running (beyond workflow stop cycle) "
f"{self.stop_point}"
)
if itask.state_reset(is_held=True):
self.data_store_mgr.delta_task_held(itask)
return self.stop_point

def can_stop(self, stop_mode):
Expand Down
Loading

0 comments on commit 36dfa25

Please sign in to comment.