Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make --stopcp=reload use [scheduling]stop after cycle point instead of the final cycle point #4543

Merged
merged 7 commits into from
Dec 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ of workflows inside other installed workflows.

### Fixes

[#4543](https://github.com/cylc/cylc-flow/pull/4543) -
`cylc play --stopcp=reload` now takes its value from
`[scheduling]stop after cycle point` instead of using the final cycle point.

-------------------------------------------------------------------------------
## __cylc-8.0b3 (<span actions:bind='release-date'>Released 2021-11-10</span>)__

Expand Down
72 changes: 27 additions & 45 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ class Scheduler:
ext_trigger_queue: Queue

# configuration
config: WorkflowConfig # flow config
options: Values
cylc_config: DictTree # [scheduler] config

Expand All @@ -195,7 +196,6 @@ class Scheduler:
bad_hosts: Optional[Set[str]] = None

# configuration
config: Optional[WorkflowConfig] = None # flow config
flow_file: Optional[str] = None
flow_file_update_time: Optional[float] = None

Expand Down Expand Up @@ -480,7 +480,7 @@ async def configure(self):
else:
self._load_pool_from_point()

self.process_cylc_stop_point()
self.process_stop_cycle_point()
self.profiler.log_memory("scheduler.py: after load_tasks")

self.workflow_db_mgr.put_workflow_params(self)
Expand Down Expand Up @@ -678,17 +678,14 @@ def _load_pool_from_point(self):
released from runhead.)

"""
if self.config.start_point is not None:
start_type = "Warm" if self.options.startcp else "Cold"
LOG.info(f"{start_type} start from {self.config.start_point}")
LOG.info(
f"{'warm' if self.options.startcp else 'cold'} start from"
f"{self.config.start_point}.")

flow_num = self.flow_mgr.get_new_flow(
f"original flow from {self.config.start_point}"
)
for name in self.config.get_task_name_list():
if self.config.start_point is None:
# No start cycle point at which to load cycling tasks.
continue
Comment on lines -689 to -691
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty sure this condition is no longer possible. And L691 showing as never hit in code coverage: https://app.codecov.io/gh/cylc/cylc-flow/blob/master/cylc/flow/scheduler.py

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checked in practice. Confirmed I can't get it to be true.

tdef = self.config.get_taskdef(name)
try:
point = sorted([
Expand All @@ -708,9 +705,6 @@ def _load_pool_from_point(self):

def _load_pool_from_db(self):
"""Load task pool from DB, for a restart."""
if self.options.startcp:
self.config.start_point = TaskID.get_standardised_point(
self.options.startcp)
Comment on lines -711 to -713
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be unnecessary as duplication of

def process_start_cycle_point(self) -> None:
"""Set the start cycle point from options.
Sets:
self.options.startcp
self.start_point
"""
startcp = getattr(self.options, 'startcp', None)
starttask = getattr(self.options, 'starttask', None)
if startcp is not None and starttask is not None:
raise UserInputError(
"--start-cycle-point and --start-task are mutually exclusive"
)
if startcp:
# Start from a point later than initial point.
if self.options.startcp == 'now':
self.options.startcp = get_current_time_string()
self.start_point = get_point(self.options.startcp).standardise()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirm looks reasonable.

self.workflow_db_mgr.pri_dao.select_broadcast_states(
self.broadcast_mgr.load_db_broadcast_states)
self.workflow_db_mgr.pri_dao.select_task_job_run_times(
Expand Down Expand Up @@ -1125,10 +1119,6 @@ def load_flow_file(self, is_reload=False):
handle.write(b"# cylc-version: %s\n" % CYLC_VERSION.encode())
printcfg(self.config.cfg, none_str=None, handle=handle)

if not self.config.initial_point and not self.is_restart:
LOG.warning('No initial cycle point provided - no cycling tasks '
'will be loaded.')

Comment on lines -1128 to -1131
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again pretty sure this condition is no longer possible (config.initial_point should be set by this point)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Legit.

# Pass static cylc and workflow variables to job script generation code
self.task_job_mgr.job_file_writer.set_workflow_env({
**verbosity_to_env(cylc.flow.flags.verbosity),
Expand Down Expand Up @@ -1159,23 +1149,17 @@ def _load_workflow_params(self, row_idx, row):
LOG.info('LOADING workflow parameters')
key, value = row
if key in self.workflow_db_mgr.KEY_INITIAL_CYCLE_POINT_COMPATS:
if self.is_restart and self.options.icp == 'reload':
LOG.debug(f"- initial point = {value} (ignored)")
elif self.options.icp is None:
self.options.icp = value
LOG.info(f"+ initial point = {value}")
self.options.icp = value
LOG.info(f"+ initial point = {value}")
elif key in self.workflow_db_mgr.KEY_START_CYCLE_POINT_COMPATS:
Comment on lines -1162 to 1154
Copy link
Member Author

@MetRonnie MetRonnie Dec 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since #4313, initial and start cycle points cannot change on restart. But still need to set self.options.icp/startcp from DB as they are used later to set self.config.initial_point/start_point.

if self.is_restart and self.options.startcp == 'reload':
LOG.debug(f"- start point = {value} (ignored)")
elif self.options.startcp is None:
self.options.startcp = value
LOG.info(f"+ start point = {value}")
self.options.startcp = value
LOG.info(f"+ start point = {value}")
elif key in self.workflow_db_mgr.KEY_FINAL_CYCLE_POINT_COMPATS:
if self.is_restart and self.options.fcp == 'reload':
LOG.debug(f"- override final point = {value} (ignored)")
LOG.debug(f"- final point = {value} (ignored)")
wxtim marked this conversation as resolved.
Show resolved Hide resolved
elif self.options.fcp is None:
self.options.fcp = value
LOG.info(f"+ override final point = {value}")
LOG.info(f"+ final point = {value}")
elif key == self.workflow_db_mgr.KEY_STOP_CYCLE_POINT:
if self.is_restart and self.options.stopcp == 'reload':
LOG.debug(f"- stop point = {value} (ignored)")
Expand Down Expand Up @@ -1929,29 +1913,27 @@ def _check_startup_opts(self) -> None:
f"option --{opt}=reload is only valid for restart"
)

def process_cylc_stop_point(self):
"""
Set stop point.
def process_stop_cycle_point(self) -> None:
"""Set stop after cycle point.

In decreasing priority, stop cycle point (``stopcp``) is set:
* From the final point for ``cylc play --stopcp=reload``.
* From the command line (``cylc play --stopcp=XYZ``).
* From the database.
* From the flow.cylc file (``[scheduling]stop after cycle point``).

However, if ``--stopcp=reload`` on the command line during restart,
the ``[scheduling]stop after cycle point`` value is used.
"""
stoppoint = None
if self.is_restart and self.options.stopcp == 'reload':
stoppoint = self.config.final_point
elif self.options.stopcp:
stoppoint = self.options.stopcp
# Tests whether pool has stopcp from database on restart.
elif (
self.pool.stop_point and
self.pool.stop_point != self.config.final_point
):
stoppoint = self.pool.stop_point
elif 'stop after cycle point' in self.config.cfg['scheduling']:
stoppoint = self.config.cfg['scheduling']['stop after cycle point']
stoppoint = self.config.cfg['scheduling'].get('stop after cycle point')
if self.options.stopcp != 'reload':
if self.options.stopcp:
stoppoint = self.options.stopcp
# Tests whether pool has stopcp from database on restart.
elif (
self.pool.stop_point and
self.pool.stop_point != self.config.final_point
):
stoppoint = self.pool.stop_point

if stoppoint is not None:
self.options.stopcp = str(stoppoint)
Expand All @@ -1970,7 +1952,7 @@ async def handle_exception(self, exc: Exception) -> NoReturn:
await self.shutdown(exc)
raise exc from None

def validate_finalcp(self):
def validate_finalcp(self) -> None:
"""Warn if Stop Cycle point is on or after the final cycle point
"""
if self.config.final_point is None:
Expand Down
6 changes: 4 additions & 2 deletions cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ def get_option_parser(add_std_opts=False):
help=(
"Set the final cycle point. "
"This command line option overrides the workflow "
"config option '[scheduling]final cycle point'."
"config option '[scheduling]final cycle point'. "
"Use a value of 'reload' to reload from flow.cylc in a restart."
),
metavar="CYCLE_POINT", action="store", dest="fcp")

Expand All @@ -154,7 +155,8 @@ def get_option_parser(add_std_opts=False):
"Shut down after all tasks have PASSED this cycle point. "
"(Not to be confused with the final cycle point.) "
"This command line option overrides the workflow "
"config option '[scheduling]stop after cycle point'."
"config option '[scheduling]stop after cycle point'. "
"Use a value of 'reload' to reload from flow.cylc in a restart."
),
metavar="CYCLE_POINT", action="store", dest="stopcp")

Expand Down
38 changes: 2 additions & 36 deletions tests/functional/cylc-play/06-warnif-scp-after-fcp.t
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@

. "$(dirname "$0")/test_header"

set_test_number 14

# integer cycling
set_test_number 7

init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
[scheduler]
Expand All @@ -31,7 +29,7 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
initial cycle point = 1
final cycle point = 2
cycling mode = integer
[[dependencies]]
[[graph]]
P1 = foo
__FLOW_CONFIG__

Expand All @@ -53,35 +51,3 @@ for SCP in 1 2 3; do
done

purge

# Gregorian Cycling
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to check this for different cycling types; it should be the responsibility of the tests/unit/cycling and tests/functional/cyclers tests to ensure cycle point comparison works


init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
[scheduler]
allow implicit tasks = True
[scheduling]
initial cycle point = 1348
final cycle point = 1349
[[dependencies]]
P1Y = foo
__FLOW_CONFIG__

TEST_NAME="${TEST_NAME_BASE}-validate"
run_ok "${TEST_NAME}" cylc validate "${WORKFLOW_NAME}"


for SCP in 1348 1349 1350; do
TEST_NAME="${TEST_NAME_BASE}-play-gregorian-scp=${SCP}"
workflow_run_ok "${TEST_NAME}" cylc play "${WORKFLOW_NAME}" \
--no-detach --stopcp="${SCP}"

if [[ "${SCP}" -lt 1350 ]]; then
grep_ok "Stop cycle point '.*'.*after.*final cycle point '.*'" \
"${RUN_DIR}/${WORKFLOW_NAME}/log/workflow/log" "-v"
else
grep_ok "Stop cycle point '.*'.*after.*final cycle point '.*'" \
"${RUN_DIR}/${WORKFLOW_NAME}/log/workflow/log"
fi
done

purge
56 changes: 31 additions & 25 deletions tests/functional/restart/08-stop-after-cycle-point.t
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ dumpdbtables() {
'SELECT cycle, name, status FROM task_pool ORDER BY cycle, name;' > taskpool.out
}

set_test_number 13
set_test_number 17
install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"

run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"

# Check that the config stop point gets stored in DB
workflow_run_ok "${TEST_NAME_BASE}-1-run-no-cli-opts" \
workflow_run_ok "${TEST_NAME_BASE}-run" \
cylc play --no-detach "${WORKFLOW_NAME}" \
-s 'MANUAL_SHUTDOWN="1970"'
dumpdbtables
Expand All @@ -43,41 +43,47 @@ cmp_ok taskpool.out << '__OUT__'
1971|hello|waiting
__OUT__

# Check that the config stop point works (even after restart)
workflow_run_ok "${TEST_NAME_BASE}-1-restart" \
# Check that --stopcp=reload takes value from flow.cylc on restart
workflow_run_ok "${TEST_NAME_BASE}-restart-cli-stopcp-reload" \
cylc play --no-detach --stopcp=reload "${WORKFLOW_NAME}" \
-s 'MANUAL_SHUTDOWN="1971"' -s 'STOPCP="1973"'
dumpdbtables
cmp_ok stopcp.out <<< '1973'
cmp_ok taskpool.out << '__OUT__'
1972|hello|waiting
__OUT__

# Check that the stop point stored in DB works on restart
workflow_run_ok "${TEST_NAME_BASE}-restart-db-stopcp" \
cylc play --no-detach "${WORKFLOW_NAME}"
dumpdbtables
# Task hello.1973 (after stop point) should be spawned but not submitted
# Stop point should be removed from DB once reached
cmp_ok stopcp.out < /dev/null
# Task hello.1974 (after stop point) should be spawned but not submitted
cmp_ok taskpool.out <<'__OUT__'
1973|hello|waiting
1974|hello|waiting
__OUT__

delete_db

# Check that the command line stop point gets stored in DB.
workflow_run_ok "${TEST_NAME_BASE}-2-run-cli-stop" \
cylc play --no-detach --stopcp=1971 "${WORKFLOW_NAME}" \
-s 'MANUAL_SHUTDOWN="1970"'
workflow_run_ok "${TEST_NAME_BASE}-restart-cli-stopcp" \
cylc play --no-detach --stopcp=1975 "${WORKFLOW_NAME}" \
-s 'MANUAL_SHUTDOWN="1974"'
dumpdbtables
cmp_ok stopcp.out <<< '1971'
cmp_ok stopcp.out <<< '1975'
# Note we have manually stopped before the stop point
cmp_ok taskpool.out << '__OUT__'
1971|hello|waiting
1975|hello|waiting
__OUT__
# Check that the command line stop point works (even after restart)...
workflow_run_ok "${TEST_NAME_BASE}-2-restart" \
cylc play --no-detach "${WORKFLOW_NAME}"

# Check that workflow stops immediately if restarted past stopcp
workflow_run_ok "${TEST_NAME_BASE}-restart-final" \
cylc play --no-detach --stopcp=reload "${WORKFLOW_NAME}"
# Note the config value remains as 1973 (not 1972) because Jinja2 variables persist over restart
grep_workflow_log_ok "${TEST_NAME_BASE}-log-grep" "Setting stop cycle point: 1973"
dumpdbtables
cmp_ok stopcp.out < /dev/null
cmp_ok taskpool.out << '__OUT__'
1972|hello|waiting
1975|hello|waiting
__OUT__

# ... unless we reload stop point - takes value from final cycle point
# Note: we might want to rethink that - https://github.com/cylc/cylc-flow/issues/4062
workflow_run_ok "${TEST_NAME_BASE}-2-restart-cli-reload" \
cylc play --no-detach --stopcp=reload "${WORKFLOW_NAME}" \
-s 'MANUAL_SHUTDOWN="1973"'
dumpdbtables
cmp_ok stopcp.out <<< '1974'

purge
4 changes: 2 additions & 2 deletions tests/functional/restart/08-stop-after-cycle-point/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ description = """
[scheduling]
runahead limit = P1
initial cycle point = 1970
final cycle point = 1974
stop after cycle point = 1972
final cycle point = 1976
stop after cycle point = {{ STOPCP if STOPCP is defined else 1972 }}
[[graph]]
P1Y = hello

Expand Down
Loading