Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test the simulation mode code. #5712

Merged
merged 7 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1260,10 +1260,17 @@ def get_script_common_text(this: str, example: Optional[str] = None):
- ``all`` - all instance of the task will fail
- ``2017-08-12T06, 2017-08-12T18`` - these instances of
the task will fail

If you set :cylc:conf:`[..][..]execution retry delays`
the second attempt will succeed unless you set
:cylc:conf:`[..]fail try 1 only = False`.
''')
Conf('fail try 1 only', VDR.V_BOOLEAN, True, desc='''
If ``True`` only the first run of the task
instance will fail, otherwise retries will fail too.

Task instances must be set to fail by
:cylc:conf:`[..]fail cycle points`.
''')
Conf('disable task event handlers', VDR.V_BOOLEAN, True,
desc='''
Expand Down
67 changes: 3 additions & 64 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@
get_cylc_run_dir,
is_relative_to,
)
from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM
from cylc.flow.print_tree import print_tree
from cylc.flow.simulation import configure_sim_modes
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.task_events_mgr import (
EventData,
Expand Down Expand Up @@ -521,7 +521,8 @@ def __init__(
self.process_runahead_limit()

if self.run_mode('simulation', 'dummy'):
self.configure_sim_modes()
configure_sim_modes(
self.taskdefs.values(), self.run_mode())

self.configure_workflow_state_polling_tasks()

Expand Down Expand Up @@ -1340,68 +1341,6 @@ def configure_workflow_state_polling_tasks(self):
script = "echo " + comstr + "\n" + comstr
rtc['script'] = script

def configure_sim_modes(self):
"""Adjust task defs for simulation and dummy mode."""
for tdef in self.taskdefs.values():
# Compute simulated run time by scaling the execution limit.
rtc = tdef.rtconfig
limit = rtc['execution time limit']
speedup = rtc['simulation']['speedup factor']
if limit and speedup:
sleep_sec = (DurationParser().parse(
str(limit)).get_seconds() / speedup)
else:
sleep_sec = DurationParser().parse(
str(rtc['simulation']['default run length'])
).get_seconds()
rtc['execution time limit'] = (
sleep_sec + DurationParser().parse(str(
rtc['simulation']['time limit buffer'])).get_seconds()
)
rtc['job']['simulated run length'] = sleep_sec

# Generate dummy scripting.
rtc['init-script'] = ""
rtc['env-script'] = ""
rtc['pre-script'] = ""
rtc['post-script'] = ""
scr = "sleep %d" % sleep_sec
# Dummy message outputs.
for msg in rtc['outputs'].values():
scr += "\ncylc message '%s'" % msg
if rtc['simulation']['fail try 1 only']:
arg1 = "true"
else:
arg1 = "false"
arg2 = " ".join(rtc['simulation']['fail cycle points'])
scr += "\ncylc__job__dummy_result %s %s || exit 1" % (arg1, arg2)
rtc['script'] = scr

# Dummy mode jobs should run on platform localhost
# All Cylc 7 config items which conflict with platform are removed.
for section, keys in FORBIDDEN_WITH_PLATFORM.items():
if section in rtc:
for key in keys:
if key in rtc[section]:
rtc[section][key] = None

rtc['platform'] = 'localhost'

# Disable environment, in case it depends on env-script.
rtc['environment'] = {}

# Simulation mode tasks should fail in which cycle points?
f_pts = []
f_pts_orig = rtc['simulation']['fail cycle points']
if 'all' in f_pts_orig:
# None for "fail all points".
f_pts = None
else:
# (And [] for "fail no points".)
for point_str in f_pts_orig:
f_pts.append(get_point(point_str).standardise())
rtc['simulation']['fail cycle points'] = f_pts

def get_parent_lists(self):
return self.runtime['parents']

Expand Down
7 changes: 6 additions & 1 deletion cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
)
from cylc.flow.profiler import Profiler
from cylc.flow.resources import get_resources
from cylc.flow.simulation import sim_time_check
from cylc.flow.subprocpool import SubProcPool
from cylc.flow.templatevars import eval_var
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
Expand Down Expand Up @@ -1740,7 +1741,11 @@ async def main_loop(self) -> None:
self.pool.set_expired_tasks()
self.release_queued_tasks()

if self.pool.sim_time_check(self.message_queue):
if (
self.pool.config.run_mode('simulation')
and sim_time_check(
self.message_queue, self.pool.get_tasks())
):
# A simulated task state change occurred.
self.reset_inactivity_timer()

Expand Down
219 changes: 219 additions & 0 deletions cylc/flow/simulation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Utilities supporting simulation and skip modes
"""

from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from time import time

from cylc.flow.cycling.loader import get_point
from cylc.flow.network.resolvers import TaskMsg
from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM
from cylc.flow.task_state import (
TASK_STATUS_RUNNING,
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED,
)
from cylc.flow.wallclock import get_current_time_string

from metomi.isodatetime.parsers import DurationParser

if TYPE_CHECKING:
from queue import Queue
from cylc.flow.cycling import PointBase
from cylc.flow.task_proxy import TaskProxy


def configure_sim_modes(taskdefs, sim_mode):
"""Adjust task defs for simulation and dummy mode.

"""
dummy_mode = bool(sim_mode == 'dummy')

for tdef in taskdefs:
# Compute simulated run time by scaling the execution limit.
rtc = tdef.rtconfig
sleep_sec = get_simulated_run_len(rtc)

rtc['execution time limit'] = (
sleep_sec + DurationParser().parse(str(
rtc['simulation']['time limit buffer'])).get_seconds()
)

rtc['simulation']['simulated run length'] = sleep_sec
rtc['submission retry delays'] = [1]

# Generate dummy scripting.
rtc['init-script'] = ""
rtc['env-script'] = ""
rtc['pre-script'] = ""
rtc['post-script'] = ""
rtc['script'] = build_dummy_script(
rtc, sleep_sec) if dummy_mode else ""

disable_platforms(rtc)

# Disable environment, in case it depends on env-script.
rtc['environment'] = {}

rtc["simulation"][
"fail cycle points"
] = parse_fail_cycle_points(
rtc["simulation"]["fail cycle points"]
)


def get_simulated_run_len(rtc: Dict[str, Any]) -> int:
"""Get simulated run time.

rtc = run time config
"""
limit = rtc['execution time limit']
speedup = rtc['simulation']['speedup factor']
if limit and speedup:
sleep_sec = (DurationParser().parse(
str(limit)).get_seconds() / speedup)
else:
sleep_sec = DurationParser().parse(
str(rtc['simulation']['default run length'])
).get_seconds()

return sleep_sec


def build_dummy_script(rtc: Dict[str, Any], sleep_sec: int) -> str:
"""Create fake scripting for dummy mode.

This is for Dummy mode only.
"""
script = "sleep %d" % sleep_sec
# Dummy message outputs.
for msg in rtc['outputs'].values():
script += "\ncylc message '%s'" % msg
if rtc['simulation']['fail try 1 only']:
arg1 = "true"
else:
arg1 = "false"
arg2 = " ".join(rtc['simulation']['fail cycle points'])
script += "\ncylc__job__dummy_result %s %s || exit 1" % (arg1, arg2)
return script


def disable_platforms(
rtc: Dict[str, Any]
) -> None:
"""Force platform = localhost

Remove legacy sections [job] and [remote], which would conflict
with setting platforms.

This can be simplified when support for the FORBIDDEN_WITH_PLATFORM
configurations is dropped.
"""
for section, keys in FORBIDDEN_WITH_PLATFORM.items():
if section in rtc:
for key in keys:
if key in rtc[section]:
rtc[section][key] = None
rtc['platform'] = 'localhost'


def parse_fail_cycle_points(
f_pts_orig: List[str]
) -> 'Union[None, List[PointBase]]':
"""Parse `[simulation][fail cycle points]`.

- None for "fail all points".
- Else a list of cycle point objects.

Examples:
>>> this = parse_fail_cycle_points
>>> this(['all']) is None
True
>>> this([])
[]
"""
f_pts: 'Optional[List[PointBase]]'
if 'all' in f_pts_orig:
f_pts = None
else:
f_pts = []
for point_str in f_pts_orig:
f_pts.append(get_point(point_str).standardise())
return f_pts


def sim_time_check(
message_queue: 'Queue[TaskMsg]', itasks: 'List[TaskProxy]'
) -> bool:
"""Check if sim tasks have been "running" for as long as required.

If they have change the task state.

Returns:
True if _any_ simulated task state has changed.
"""
sim_task_state_changed = False
now = time()
for itask in itasks:
if itask.state.status != TASK_STATUS_RUNNING:
continue
# Started time is not set on restart
if itask.summary['started_time'] is None:
itask.summary['started_time'] = now
timeout = (
itask.summary['started_time'] +
itask.tdef.rtconfig['simulation']['simulated run length']
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a real config item, added to the rtconfig['simulation'] dictionary at line 54.

)
if now > timeout:
job_d = itask.tokens.duplicate(job=str(itask.submit_num))
now_str = get_current_time_string()
if sim_task_failed(
itask.tdef.rtconfig['simulation'],
itask.point,
itask.get_try_num()
):
message_queue.put(
TaskMsg(job_d, now_str, 'CRITICAL', TASK_STATUS_FAILED)
)
else:
# Simulate message outputs.
for msg in itask.tdef.rtconfig['outputs'].values():
message_queue.put(
TaskMsg(job_d, now_str, 'DEBUG', msg)
)
message_queue.put(
TaskMsg(job_d, now_str, 'DEBUG', TASK_STATUS_SUCCEEDED)
)
sim_task_state_changed = True
return sim_task_state_changed


def sim_task_failed(
sim_conf: Dict[str, Any],
point: 'PointBase',
try_num: int,
) -> bool:
"""Encapsulate logic for deciding whether a sim task has failed.

Allows Unit testing.
"""
return (
sim_conf['fail cycle points'] is None # i.e. "all"
or point in sim_conf['fail cycle points']
) and (
try_num == 1 or not sim_conf['fail try 1 only']
)
Loading
Loading