diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py
index a6d16bbe76d..f521614c73a 100644
--- a/cylc/flow/cfgspec/workflow.py
+++ b/cylc/flow/cfgspec/workflow.py
@@ -1260,10 +1260,17 @@ def get_script_common_text(this: str, example: Optional[str] = None):
- ``all`` - all instance of the task will fail
- ``2017-08-12T06, 2017-08-12T18`` - these instances of
the task will fail
+
+ If you set :cylc:conf:`[..][..]execution retry delays`
+ the second attempt will succeed unless you set
+ :cylc:conf:`[..]fail try 1 only = False`.
''')
Conf('fail try 1 only', VDR.V_BOOLEAN, True, desc='''
If ``True`` only the first run of the task
instance will fail, otherwise retries will fail too.
+
+ Task instances must be set to fail by
+ :cylc:conf:`[..]fail cycle points`.
''')
Conf('disable task event handlers', VDR.V_BOOLEAN, True,
desc='''
diff --git a/cylc/flow/config.py b/cylc/flow/config.py
index 0096d924c8b..d80456266bf 100644
--- a/cylc/flow/config.py
+++ b/cylc/flow/config.py
@@ -79,8 +79,8 @@
get_cylc_run_dir,
is_relative_to,
)
-from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM
from cylc.flow.print_tree import print_tree
+from cylc.flow.simulation import configure_sim_modes
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.task_events_mgr import (
EventData,
@@ -521,7 +521,8 @@ def __init__(
self.process_runahead_limit()
if self.run_mode('simulation', 'dummy'):
- self.configure_sim_modes()
+ configure_sim_modes(
+ self.taskdefs.values(), self.run_mode())
self.configure_workflow_state_polling_tasks()
@@ -1340,68 +1341,6 @@ def configure_workflow_state_polling_tasks(self):
script = "echo " + comstr + "\n" + comstr
rtc['script'] = script
- def configure_sim_modes(self):
- """Adjust task defs for simulation and dummy mode."""
- for tdef in self.taskdefs.values():
- # Compute simulated run time by scaling the execution limit.
- rtc = tdef.rtconfig
- limit = rtc['execution time limit']
- speedup = rtc['simulation']['speedup factor']
- if limit and speedup:
- sleep_sec = (DurationParser().parse(
- str(limit)).get_seconds() / speedup)
- else:
- sleep_sec = DurationParser().parse(
- str(rtc['simulation']['default run length'])
- ).get_seconds()
- rtc['execution time limit'] = (
- sleep_sec + DurationParser().parse(str(
- rtc['simulation']['time limit buffer'])).get_seconds()
- )
- rtc['job']['simulated run length'] = sleep_sec
-
- # Generate dummy scripting.
- rtc['init-script'] = ""
- rtc['env-script'] = ""
- rtc['pre-script'] = ""
- rtc['post-script'] = ""
- scr = "sleep %d" % sleep_sec
- # Dummy message outputs.
- for msg in rtc['outputs'].values():
- scr += "\ncylc message '%s'" % msg
- if rtc['simulation']['fail try 1 only']:
- arg1 = "true"
- else:
- arg1 = "false"
- arg2 = " ".join(rtc['simulation']['fail cycle points'])
- scr += "\ncylc__job__dummy_result %s %s || exit 1" % (arg1, arg2)
- rtc['script'] = scr
-
- # Dummy mode jobs should run on platform localhost
- # All Cylc 7 config items which conflict with platform are removed.
- for section, keys in FORBIDDEN_WITH_PLATFORM.items():
- if section in rtc:
- for key in keys:
- if key in rtc[section]:
- rtc[section][key] = None
-
- rtc['platform'] = 'localhost'
-
- # Disable environment, in case it depends on env-script.
- rtc['environment'] = {}
-
- # Simulation mode tasks should fail in which cycle points?
- f_pts = []
- f_pts_orig = rtc['simulation']['fail cycle points']
- if 'all' in f_pts_orig:
- # None for "fail all points".
- f_pts = None
- else:
- # (And [] for "fail no points".)
- for point_str in f_pts_orig:
- f_pts.append(get_point(point_str).standardise())
- rtc['simulation']['fail cycle points'] = f_pts
-
def get_parent_lists(self):
return self.runtime['parents']
diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py
index e5c3fc7c01a..3f77cc2393a 100644
--- a/cylc/flow/scheduler.py
+++ b/cylc/flow/scheduler.py
@@ -113,6 +113,7 @@
)
from cylc.flow.profiler import Profiler
from cylc.flow.resources import get_resources
+from cylc.flow.simulation import sim_time_check
from cylc.flow.subprocpool import SubProcPool
from cylc.flow.templatevars import eval_var
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
@@ -1740,7 +1741,11 @@ async def main_loop(self) -> None:
self.pool.set_expired_tasks()
self.release_queued_tasks()
- if self.pool.sim_time_check(self.message_queue):
+ if (
+ self.pool.config.run_mode('simulation')
+ and sim_time_check(
+ self.message_queue, self.pool.get_tasks())
+ ):
# A simulated task state change occurred.
self.reset_inactivity_timer()
diff --git a/cylc/flow/simulation.py b/cylc/flow/simulation.py
new file mode 100644
index 00000000000..15314f8e3e7
--- /dev/null
+++ b/cylc/flow/simulation.py
@@ -0,0 +1,219 @@
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+"""Utilities supporting simulation and skip modes
+"""
+
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+from time import time
+
+from cylc.flow.cycling.loader import get_point
+from cylc.flow.network.resolvers import TaskMsg
+from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM
+from cylc.flow.task_state import (
+ TASK_STATUS_RUNNING,
+ TASK_STATUS_FAILED,
+ TASK_STATUS_SUCCEEDED,
+)
+from cylc.flow.wallclock import get_current_time_string
+
+from metomi.isodatetime.parsers import DurationParser
+
+if TYPE_CHECKING:
+ from queue import Queue
+ from cylc.flow.cycling import PointBase
+ from cylc.flow.task_proxy import TaskProxy
+
+
+def configure_sim_modes(taskdefs, sim_mode):
+ """Adjust task defs for simulation and dummy mode.
+
+ """
+ dummy_mode = bool(sim_mode == 'dummy')
+
+ for tdef in taskdefs:
+ # Compute simulated run time by scaling the execution limit.
+ rtc = tdef.rtconfig
+ sleep_sec = get_simulated_run_len(rtc)
+
+ rtc['execution time limit'] = (
+ sleep_sec + DurationParser().parse(str(
+ rtc['simulation']['time limit buffer'])).get_seconds()
+ )
+
+ rtc['simulation']['simulated run length'] = sleep_sec
+ rtc['submission retry delays'] = [1]
+
+ # Generate dummy scripting.
+ rtc['init-script'] = ""
+ rtc['env-script'] = ""
+ rtc['pre-script'] = ""
+ rtc['post-script'] = ""
+ rtc['script'] = build_dummy_script(
+ rtc, sleep_sec) if dummy_mode else ""
+
+ disable_platforms(rtc)
+
+ # Disable environment, in case it depends on env-script.
+ rtc['environment'] = {}
+
+ rtc["simulation"][
+ "fail cycle points"
+ ] = parse_fail_cycle_points(
+ rtc["simulation"]["fail cycle points"]
+ )
+
+
+def get_simulated_run_len(rtc: Dict[str, Any]) -> int:
+ """Get simulated run time.
+
+ rtc = run time config
+ """
+ limit = rtc['execution time limit']
+ speedup = rtc['simulation']['speedup factor']
+ if limit and speedup:
+ sleep_sec = (DurationParser().parse(
+ str(limit)).get_seconds() / speedup)
+ else:
+ sleep_sec = DurationParser().parse(
+ str(rtc['simulation']['default run length'])
+ ).get_seconds()
+
+ return sleep_sec
+
+
+def build_dummy_script(rtc: Dict[str, Any], sleep_sec: int) -> str:
+ """Create fake scripting for dummy mode.
+
+ This is for Dummy mode only.
+ """
+ script = "sleep %d" % sleep_sec
+ # Dummy message outputs.
+ for msg in rtc['outputs'].values():
+ script += "\ncylc message '%s'" % msg
+ if rtc['simulation']['fail try 1 only']:
+ arg1 = "true"
+ else:
+ arg1 = "false"
+ arg2 = " ".join(rtc['simulation']['fail cycle points'])
+ script += "\ncylc__job__dummy_result %s %s || exit 1" % (arg1, arg2)
+ return script
+
+
+def disable_platforms(
+ rtc: Dict[str, Any]
+) -> None:
+ """Force platform = localhost
+
+ Remove legacy sections [job] and [remote], which would conflict
+ with setting platforms.
+
+ This can be simplified when support for the FORBIDDEN_WITH_PLATFORM
+ configurations is dropped.
+ """
+ for section, keys in FORBIDDEN_WITH_PLATFORM.items():
+ if section in rtc:
+ for key in keys:
+ if key in rtc[section]:
+ rtc[section][key] = None
+ rtc['platform'] = 'localhost'
+
+
+def parse_fail_cycle_points(
+ f_pts_orig: List[str]
+) -> 'Union[None, List[PointBase]]':
+ """Parse `[simulation][fail cycle points]`.
+
+ - None for "fail all points".
+ - Else a list of cycle point objects.
+
+ Examples:
+ >>> this = parse_fail_cycle_points
+ >>> this(['all']) is None
+ True
+ >>> this([])
+ []
+ """
+ f_pts: 'Optional[List[PointBase]]'
+ if 'all' in f_pts_orig:
+ f_pts = None
+ else:
+ f_pts = []
+ for point_str in f_pts_orig:
+ f_pts.append(get_point(point_str).standardise())
+ return f_pts
+
+
+def sim_time_check(
+ message_queue: 'Queue[TaskMsg]', itasks: 'List[TaskProxy]'
+) -> bool:
+ """Check if sim tasks have been "running" for as long as required.
+
+ If they have change the task state.
+
+ Returns:
+ True if _any_ simulated task state has changed.
+ """
+ sim_task_state_changed = False
+ now = time()
+ for itask in itasks:
+ if itask.state.status != TASK_STATUS_RUNNING:
+ continue
+ # Started time is not set on restart
+ if itask.summary['started_time'] is None:
+ itask.summary['started_time'] = now
+ timeout = (
+ itask.summary['started_time'] +
+ itask.tdef.rtconfig['simulation']['simulated run length']
+ )
+ if now > timeout:
+ job_d = itask.tokens.duplicate(job=str(itask.submit_num))
+ now_str = get_current_time_string()
+ if sim_task_failed(
+ itask.tdef.rtconfig['simulation'],
+ itask.point,
+ itask.get_try_num()
+ ):
+ message_queue.put(
+ TaskMsg(job_d, now_str, 'CRITICAL', TASK_STATUS_FAILED)
+ )
+ else:
+ # Simulate message outputs.
+ for msg in itask.tdef.rtconfig['outputs'].values():
+ message_queue.put(
+ TaskMsg(job_d, now_str, 'DEBUG', msg)
+ )
+ message_queue.put(
+ TaskMsg(job_d, now_str, 'DEBUG', TASK_STATUS_SUCCEEDED)
+ )
+ sim_task_state_changed = True
+ return sim_task_state_changed
+
+
+def sim_task_failed(
+ sim_conf: Dict[str, Any],
+ point: 'PointBase',
+ try_num: int,
+) -> bool:
+ """Encapsulate logic for deciding whether a sim task has failed.
+
+ Allows Unit testing.
+ """
+ return (
+ sim_conf['fail cycle points'] is None # i.e. "all"
+ or point in sim_conf['fail cycle points']
+ ) and (
+ try_num == 1 or not sim_conf['fail try 1 only']
+ )
diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py
index d4d9130ee27..ec5e467b8aa 100644
--- a/cylc/flow/task_events_mgr.py
+++ b/cylc/flow/task_events_mgr.py
@@ -575,6 +575,7 @@ def process_message(
True: if polling is required to confirm a reversal of status.
"""
+
# Log messages
if event_time is None:
event_time = get_current_time_string()
@@ -617,7 +618,6 @@ def process_message(
self.setup_event_handlers(
itask, self.EVENT_STARTED, f'job {self.EVENT_STARTED}')
self.spawn_func(itask, TASK_OUTPUT_STARTED)
-
if message == self.EVENT_STARTED:
if (
flag == self.FLAG_RECEIVED
@@ -777,24 +777,23 @@ def _process_message_check(
return False
if (
- itask.state(TASK_STATUS_WAITING)
- and
+ itask.state(TASK_STATUS_WAITING)
+ and itask.tdef.run_mode == 'live' # Polling in live mode only.
+ and (
(
- (
- # task has a submit-retry lined up
- TimerFlags.SUBMISSION_RETRY in itask.try_timers
- and itask.try_timers[
- TimerFlags.SUBMISSION_RETRY].num > 0
- )
- or
- (
- # task has an execution-retry lined up
- TimerFlags.EXECUTION_RETRY in itask.try_timers
- and itask.try_timers[
- TimerFlags.EXECUTION_RETRY].num > 0
- )
+ # task has a submit-retry lined up
+ TimerFlags.SUBMISSION_RETRY in itask.try_timers
+ and itask.try_timers[
+ TimerFlags.SUBMISSION_RETRY].num > 0
)
-
+ or
+ (
+ # task has an execution-retry lined up
+ TimerFlags.EXECUTION_RETRY in itask.try_timers
+ and itask.try_timers[
+ TimerFlags.EXECUTION_RETRY].num > 0
+ )
+ )
):
# Ignore messages if task has a retry lined up
# (caused by polling overlapping with task failure)
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index 412ba23b97a..d6bc59fb255 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -260,12 +260,13 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
Return (list): list of tasks that attempted submission.
"""
-
if is_simulation:
return self._simulation_submit_task_jobs(itasks, workflow)
+
# Prepare tasks for job submission
prepared_tasks, bad_tasks = self.prep_submit_task_jobs(
workflow, itasks)
+
# Reset consumed host selection results
self.task_remote_mgr.subshell_eval_reset()
@@ -999,16 +1000,17 @@ def _simulation_submit_task_jobs(self, itasks, workflow):
itask.waiting_on_job_prep = False
itask.submit_num += 1
self._set_retry_timers(itask)
+
itask.platform = {'name': 'SIMULATION'}
itask.summary['job_runner_name'] = 'SIMULATION'
itask.summary[self.KEY_EXECUTE_TIME_LIMIT] = (
- itask.tdef.rtconfig['job']['simulated run length']
+ itask.tdef.rtconfig['simulation']['simulated run length']
)
itask.jobs.append(
self.get_simulation_job_conf(itask, workflow)
)
self.task_events_mgr.process_message(
- itask, INFO, TASK_OUTPUT_SUBMITTED
+ itask, INFO, TASK_OUTPUT_SUBMITTED,
)
return itasks
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py
index 697bc72a155..01cbfa2f205 100644
--- a/cylc/flow/task_pool.py
+++ b/cylc/flow/task_pool.py
@@ -40,7 +40,6 @@
from cylc.flow.id import Tokens, detokenise
from cylc.flow.id_cli import contains_fnmatch
from cylc.flow.id_match import filter_ids
-from cylc.flow.network.resolvers import TaskMsg
from cylc.flow.workflow_status import StopMode
from cylc.flow.task_action_timer import TaskActionTimer, TimerFlags
from cylc.flow.task_events_mgr import (
@@ -74,7 +73,6 @@
from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NONE, FLOW_NEW
if TYPE_CHECKING:
- from queue import Queue
from cylc.flow.config import WorkflowConfig
from cylc.flow.cycling import IntervalBase, PointBase
from cylc.flow.data_store_mgr import DataStoreMgr
@@ -1756,45 +1754,6 @@ def force_trigger_tasks(
return len(unmatched)
- def sim_time_check(self, message_queue: 'Queue[TaskMsg]') -> bool:
- """Simulation mode: simulate task run times and set states."""
- if not self.config.run_mode('simulation'):
- return False
- sim_task_state_changed = False
- now = time()
- for itask in self.get_tasks():
- if itask.state.status != TASK_STATUS_RUNNING:
- continue
- # Started time is not set on restart
- if itask.summary['started_time'] is None:
- itask.summary['started_time'] = now
- timeout = (itask.summary['started_time'] +
- itask.tdef.rtconfig['job']['simulated run length'])
- if now > timeout:
- conf = itask.tdef.rtconfig['simulation']
- job_d = itask.tokens.duplicate(job=str(itask.submit_num))
- now_str = get_current_time_string()
- if (
- conf['fail cycle points'] is None # i.e. "all"
- or itask.point in conf['fail cycle points']
- ) and (
- itask.get_try_num() == 1 or not conf['fail try 1 only']
- ):
- message_queue.put(
- TaskMsg(job_d, now_str, 'CRITICAL', TASK_STATUS_FAILED)
- )
- else:
- # Simulate message outputs.
- for msg in itask.tdef.rtconfig['outputs'].values():
- message_queue.put(
- TaskMsg(job_d, now_str, 'DEBUG', msg)
- )
- message_queue.put(
- TaskMsg(job_d, now_str, 'DEBUG', TASK_STATUS_SUCCEEDED)
- )
- sim_task_state_changed = True
- return sim_task_state_changed
-
def set_expired_tasks(self):
res = False
for itask in self.get_tasks():
diff --git a/tests/functional/modes/03-simulation.t b/tests/functional/modes/03-simulation.t
new file mode 100644
index 00000000000..87a7ca37a9b
--- /dev/null
+++ b/tests/functional/modes/03-simulation.t
@@ -0,0 +1,32 @@
+#!/usr/bin/env bash
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+# Test that simulation mode runs, and reruns a failed task successfully
+# when execution retry delays is configured.
+
+. "$(dirname "$0")/test_header"
+set_test_number 2
+
+install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
+run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
+workflow_run_ok "${TEST_NAME_BASE}-run" \
+ cylc play \
+ --no-detach \
+ --mode=simulation \
+ --reference-test "${WORKFLOW_NAME}"
+purge
+exit
diff --git a/tests/functional/modes/03-simulation/flow.cylc b/tests/functional/modes/03-simulation/flow.cylc
new file mode 100644
index 00000000000..49300212d39
--- /dev/null
+++ b/tests/functional/modes/03-simulation/flow.cylc
@@ -0,0 +1,16 @@
+[scheduler]
+ [[events]]
+ workflow timeout = PT30S
+
+[scheduling]
+ initial cycle point = 2359
+ [[graph]]
+ R1 = get_observations
+
+[runtime]
+ [[get_observations]]
+ execution retry delays = PT2S
+ [[[simulation]]]
+ fail cycle points = all
+ fail try 1 only = True
+
diff --git a/tests/functional/modes/03-simulation/reference.log b/tests/functional/modes/03-simulation/reference.log
new file mode 100644
index 00000000000..2d14bc201fb
--- /dev/null
+++ b/tests/functional/modes/03-simulation/reference.log
@@ -0,0 +1,2 @@
+23590101T0000Z/get_observations -triggered off [] in flow 1
+23590101T0000Z/get_observations -triggered off [] in flow 1
diff --git a/tests/integration/test_simulation.py b/tests/integration/test_simulation.py
new file mode 100644
index 00000000000..e0ada2c3e49
--- /dev/null
+++ b/tests/integration/test_simulation.py
@@ -0,0 +1,125 @@
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+import pytest
+from queue import Queue
+from types import SimpleNamespace
+
+from cylc.flow.cycling.iso8601 import ISO8601Point
+from cylc.flow.simulation import sim_time_check
+
+
+def get_msg_queue_item(queue, id_):
+ for item in queue.queue:
+ if id_ in str(item.job_id):
+ return item
+
+
+@pytest.fixture(scope='module')
+async def sim_time_check_setup(
+ mod_flow, mod_scheduler, mod_start, mod_one_conf
+):
+ schd = mod_scheduler(mod_flow({
+ 'scheduler': {'cycle point format': '%Y'},
+ 'scheduling': {
+ 'initial cycle point': '1066',
+ 'graph': {
+ 'R1': 'one & fail_all & fast_forward'
+ }
+ },
+ 'runtime': {
+ 'one': {},
+ 'fail_all': {
+ 'simulation': {'fail cycle points': 'all'},
+ 'outputs': {'foo': 'bar'}
+ },
+ # This task ought not be finished quickly, but for the speed up
+ 'fast_forward': {
+ 'execution time limit': 'PT1M',
+ 'simulation': {'speedup factor': 2}
+ }
+ }
+ }))
+ msg_q = Queue()
+ async with mod_start(schd):
+ itasks = schd.pool.get_tasks()
+ for i in itasks:
+ i.try_timers = {'execution-retry': SimpleNamespace(num=0)}
+ yield schd, itasks, msg_q
+
+
+def test_false_if_not_running(sim_time_check_setup, monkeypatch):
+ schd, itasks, msg_q = sim_time_check_setup
+
+ # False if task status not running:
+ assert sim_time_check(msg_q, itasks) is False
+
+
+def test_sim_time_check_sets_started_time(sim_time_check_setup):
+ """But sim_time_check still returns False"""
+ schd, _, msg_q = sim_time_check_setup
+ one_1066 = schd.pool.get_task(ISO8601Point('1066'), 'one')
+ one_1066.state.status = 'running'
+ assert one_1066.summary['started_time'] is None
+ assert sim_time_check(msg_q, [one_1066]) is False
+ assert one_1066.summary['started_time'] is not None
+
+
+def test_task_finishes(sim_time_check_setup, monkeypatch):
+ """...and an appropriate message sent.
+
+ Checks all possible outcomes in sim_time_check where elapsed time is
+ greater than the simulation time.
+
+ Does NOT check every possible cause on an outcome - this is done
+ in unit tests.
+ """
+ schd, _, msg_q = sim_time_check_setup
+ monkeypatch.setattr('cylc.flow.simulation.time', lambda: 0)
+
+ # Setup a task to fail
+ fail_all_1066 = schd.pool.get_task(ISO8601Point('1066'), 'fail_all')
+ fail_all_1066.state.status = 'running'
+ fail_all_1066.try_timers = {'execution-retry': SimpleNamespace(num=0)}
+
+ # Before simulation time is up:
+ assert sim_time_check(msg_q, [fail_all_1066]) is False
+
+ # After simulation time is up:
+ monkeypatch.setattr('cylc.flow.simulation.time', lambda: 12)
+ assert sim_time_check(msg_q, [fail_all_1066]) is True
+ assert get_msg_queue_item(msg_q, '1066/fail_all').message == 'failed'
+
+ # Succeeds and records messages for all outputs:
+ fail_all_1066.try_timers = {'execution-retry': SimpleNamespace(num=1)}
+ msg_q = Queue()
+ assert sim_time_check(msg_q, [fail_all_1066]) is True
+ assert sorted(i.message for i in msg_q.queue) == ['bar', 'succeeded']
+
+
+def test_task_sped_up(sim_time_check_setup, monkeypatch):
+ """Task will speed up by a factor set in config."""
+ schd, _, msg_q = sim_time_check_setup
+ fast_forward_1066 = schd.pool.get_task(
+ ISO8601Point('1066'), 'fast_forward')
+ fast_forward_1066.state.status = 'running'
+
+ monkeypatch.setattr('cylc.flow.simulation.time', lambda: 0)
+ assert sim_time_check(msg_q, [fast_forward_1066]) is False
+ monkeypatch.setattr('cylc.flow.simulation.time', lambda: 29)
+ assert sim_time_check(msg_q, [fast_forward_1066]) is False
+ monkeypatch.setattr('cylc.flow.simulation.time', lambda: 31)
+ assert sim_time_check(msg_q, [fast_forward_1066]) is True
diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py
index 34c596a770c..bb55cbf295e 100644
--- a/tests/unit/test_config.py
+++ b/tests/unit/test_config.py
@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
+from copy import deepcopy
import os
import sys
from optparse import Values
@@ -38,12 +39,13 @@
from cylc.flow.parsec.exceptions import Jinja2Error, EmPyError
from cylc.flow.scheduler_cli import RunOptions
from cylc.flow.scripts.validate import ValidateOptions
+from cylc.flow.simulation import configure_sim_modes
from cylc.flow.workflow_files import WorkflowFiles
from cylc.flow.wallclock import get_utc_mode, set_utc_mode
from cylc.flow.xtrigger_mgr import XtriggerManager
from cylc.flow.task_outputs import (
TASK_OUTPUT_SUBMITTED,
- TASK_OUTPUT_SUCCEEDED
+ TASK_OUTPUT_SUCCEEDED,
)
from cylc.flow.cycling.iso8601 import ISO8601Point
@@ -1741,3 +1743,31 @@ def test_cylc_env_at_parsing(
assert var in cylc_env
else:
assert var not in cylc_env
+
+
+def test_configure_sim_mode(caplog):
+ job_section = {}
+ sim_section = {
+ 'speedup factor': '',
+ 'default run length': 'PT10S',
+ 'time limit buffer': 'PT0S',
+ 'fail try 1 only': False,
+ 'fail cycle points': '',
+ }
+ rtconfig_1 = {
+ 'execution time limit': '',
+ 'simulation': sim_section,
+ 'job': job_section,
+ 'outputs': {},
+ }
+ rtconfig_2 = deepcopy(rtconfig_1)
+ rtconfig_2['simulation']['default run length'] = 'PT2S'
+
+ taskdefs = [
+ SimpleNamespace(rtconfig=rtconfig_1),
+ SimpleNamespace(rtconfig=rtconfig_2),
+ ]
+ configure_sim_modes(taskdefs, 'simulation')
+ results = [
+ i.rtconfig['simulation']['simulated run length'] for i in taskdefs]
+ assert results == [10.0, 2.0]
diff --git a/tests/unit/test_simulation.py b/tests/unit/test_simulation.py
new file mode 100644
index 00000000000..1c490f35c16
--- /dev/null
+++ b/tests/unit/test_simulation.py
@@ -0,0 +1,166 @@
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+"""Tests for utilities supporting simulation and skip modes
+"""
+import pytest
+from pytest import param
+
+from cylc.flow.cycling.integer import IntegerPoint
+from cylc.flow.cycling.iso8601 import ISO8601Point
+from cylc.flow.simulation import (
+ parse_fail_cycle_points,
+ build_dummy_script,
+ disable_platforms,
+ get_simulated_run_len,
+ sim_task_failed,
+)
+
+
+@pytest.mark.parametrize(
+ 'execution_time_limit, speedup_factor, default_run_length',
+ (
+ param(None, None, 'PT1H', id='default-run-length'),
+ param(None, 10, 'PT1H', id='speedup-factor-alone'),
+ param('PT1H', None, 'PT1H', id='execution-time-limit-alone'),
+ param('P1D', 24, 'PT1M', id='speed-up-and-execution-tl'),
+ )
+)
+def test_get_simulated_run_len(
+ execution_time_limit, speedup_factor, default_run_length
+):
+ """Test the logic of the presence or absence of config items.
+
+ Avoid testing the correct workign of DurationParser.
+ """
+ rtc = {
+ 'execution time limit': execution_time_limit,
+ 'simulation': {
+ 'speedup factor': speedup_factor,
+ 'default run length': default_run_length
+ }
+ }
+ assert get_simulated_run_len(rtc) == 3600
+
+
+@pytest.mark.parametrize(
+ 'fail_one_time_only', (True, False)
+)
+def test_set_simulation_script(fail_one_time_only):
+ rtc = {
+ 'outputs': {'foo': '1', 'bar': '2'},
+ 'simulation': {
+ 'fail try 1 only': fail_one_time_only,
+ 'fail cycle points': '1',
+ }
+ }
+ result = build_dummy_script(rtc, 60)
+ assert result.split('\n') == [
+ 'sleep 60',
+ "cylc message '1'",
+ "cylc message '2'",
+ f"cylc__job__dummy_result {str(fail_one_time_only).lower()}"
+ " 1 || exit 1"
+ ]
+
+
+@pytest.mark.parametrize(
+ 'rtc, expect', (
+ ({'platform': 'skarloey'}, 'localhost'),
+ ({'remote': {'host': 'rheneas'}}, 'localhost'),
+ ({'job': {'batch system': 'loaf'}}, 'localhost'),
+ )
+)
+def test_disable_platforms(rtc, expect):
+ """A sampling of items FORBIDDEN_WITH_PLATFORMS are removed from a
+ config passed to this method.
+ """
+ disable_platforms(rtc)
+ assert rtc['platform'] == expect
+ subdicts = [v for v in rtc.values() if isinstance(v, dict)]
+ for subdict in subdicts:
+ for k, val in subdict.items():
+ if k != 'platform':
+ assert val is None
+
+
+def test_parse_fail_cycle_points(set_cycling_type):
+ before = ['2', '4']
+ set_cycling_type()
+ assert parse_fail_cycle_points(before) == [
+ IntegerPoint(i) for i in before
+ ]
+
+
+@pytest.mark.parametrize(
+ 'conf, point, try_, expect',
+ (
+ param(
+ {'fail cycle points': [], 'fail try 1 only': True},
+ ISO8601Point('1'),
+ 1,
+ False,
+ id='defaults'
+ ),
+ param(
+ {'fail cycle points': None, 'fail try 1 only': True},
+ ISO8601Point('1066'),
+ 1,
+ True,
+ id='fail-all'
+ ),
+ param(
+ {
+ 'fail cycle points': [
+ ISO8601Point('1066'), ISO8601Point('1067')],
+ 'fail try 1 only': True
+ },
+ ISO8601Point('1067'),
+ 1,
+ True,
+ id='point-in-failCP'
+ ),
+ param(
+ {
+ 'fail cycle points': [
+ ISO8601Point('1066'), ISO8601Point('1067')],
+ 'fail try 1 only': True
+ },
+ ISO8601Point('1000'),
+ 1,
+ False,
+ id='point-notin-failCP'
+ ),
+ param(
+ {'fail cycle points': None, 'fail try 1 only': True},
+ ISO8601Point('1066'),
+ 2,
+ False,
+ id='succeed-attempt2'
+ ),
+ param(
+ {'fail cycle points': None, 'fail try 1 only': False},
+ ISO8601Point('1066'),
+ 7,
+ True,
+ id='fail-attempt7'
+ ),
+ )
+)
+def test_sim_task_failed(
+ conf, point, try_, expect, set_cycling_type
+):
+ set_cycling_type('iso8601')
+ assert sim_task_failed(conf, point, try_) == expect