Skip to content

Commit

Permalink
Merge pull request #6040 from cylc/8.2.x-sync
Browse files Browse the repository at this point in the history
🤖 Merge 8.2.x-sync into master
  • Loading branch information
wxtim authored Mar 27, 2024
2 parents 19aa44e + 0831c79 commit 9eabebb
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 74 deletions.
24 changes: 7 additions & 17 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
WorkflowFiles,
check_deprecation,
)
from cylc.flow.workflow_status import RunMode
from cylc.flow.xtrigger_mgr import XtriggerManager

if TYPE_CHECKING:
Expand Down Expand Up @@ -521,9 +522,9 @@ def __init__(

self.process_runahead_limit()

if self.run_mode('simulation', 'dummy'):
configure_sim_modes(
self.taskdefs.values(), self.run_mode())
run_mode = self.run_mode()
if run_mode in {RunMode.SIMULATION, RunMode.DUMMY}:
configure_sim_modes(self.taskdefs.values(), run_mode)

self.configure_workflow_state_polling_tasks()

Expand Down Expand Up @@ -1494,20 +1495,9 @@ def process_config_env(self):
os.environ['PATH'] = os.pathsep.join([
os.path.join(self.fdir, 'bin'), os.environ['PATH']])

def run_mode(self, *reqmodes):
"""Return the run mode.
Combine command line option with configuration setting.
If "reqmodes" is specified, return the boolean (mode in reqmodes).
Otherwise, return the mode as a str.
"""
mode = getattr(self.options, 'run_mode', None)
if not mode:
mode = 'live'
if reqmodes:
return mode in reqmodes
else:
return mode
def run_mode(self) -> str:
"""Return the run mode."""
return RunMode.get(self.options)

def _check_task_event_handlers(self):
"""Check custom event handler templates can be expanded.
Expand Down
13 changes: 0 additions & 13 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,19 +617,6 @@ def select_workflow_params_restart_count(self):
result = self.connect().execute(stmt).fetchone()
return int(result[0]) if result else 0

def select_workflow_params_run_mode(self):
"""Return original run_mode for workflow_params."""
stmt = rf"""
SELECT
value
FROM
{self.TABLE_WORKFLOW_PARAMS}
WHERE
key == 'run_mode'
""" # nosec (table name is code constant)
result = self.connect().execute(stmt).fetchone()
return result[0] if result else None

def select_workflow_template_vars(self, callback):
"""Select from workflow_template_vars.
Expand Down
53 changes: 22 additions & 31 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@
from cylc.flow.templatevars import eval_var
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
from cylc.flow.workflow_events import WorkflowEventHandler
from cylc.flow.workflow_status import StopMode, AutoRestartMode
from cylc.flow.workflow_status import RunMode, StopMode, AutoRestartMode
from cylc.flow import workflow_files
from cylc.flow.taskdef import TaskDef
from cylc.flow.task_events_mgr import TaskEventsManager
Expand Down Expand Up @@ -425,7 +425,14 @@ async def configure(self, params):
self._check_startup_opts()

if self.is_restart:
run_mode = self.get_run_mode()
self._set_workflow_params(params)
# Prevent changing run mode on restart:
og_run_mode = self.get_run_mode()
if run_mode != og_run_mode:
raise InputError(
f'This workflow was originally run in {og_run_mode} mode:'
f' Will not restart in {run_mode} mode.')

self.profiler.log_memory("scheduler.py: before load_flow_file")
try:
Expand All @@ -435,18 +442,6 @@ async def configure(self, params):
# Mark this exc as expected (see docstring for .schd_expected):
exc.schd_expected = True
raise exc

# Prevent changing mode on restart.
if self.is_restart:
# check run mode against db
og_run_mode = self.workflow_db_mgr.get_pri_dao(
).select_workflow_params_run_mode() or 'live'
run_mode = self.config.run_mode()
if run_mode != og_run_mode:
raise InputError(
f'This workflow was originally run in {og_run_mode} mode:'
f' Will not restart in {run_mode} mode.')

self.profiler.log_memory("scheduler.py: after load_flow_file")

self.workflow_db_mgr.on_workflow_start(self.is_restart)
Expand Down Expand Up @@ -605,7 +600,7 @@ def log_start(self) -> None:
# Note that the following lines must be present at the top of
# the workflow log file for use in reference test runs.
LOG.info(
f'Run mode: {self.config.run_mode()}',
f'Run mode: {self.get_run_mode()}',
extra=RotatingLogFileHandler.header_extra
)
LOG.info(
Expand Down Expand Up @@ -1053,7 +1048,7 @@ def command_resume(self) -> None:

def command_poll_tasks(self, tasks: Iterable[str]) -> int:
"""Poll pollable tasks or a task or family if options are provided."""
if self.config.run_mode('simulation'):
if self.get_run_mode() == RunMode.SIMULATION:
return 0
itasks, _, bad_items = self.pool.filter_task_proxies(tasks)
self.task_job_mgr.poll_task_jobs(self.workflow, itasks)
Expand All @@ -1062,7 +1057,7 @@ def command_poll_tasks(self, tasks: Iterable[str]) -> int:
def command_kill_tasks(self, tasks: Iterable[str]) -> int:
"""Kill all tasks or a task/family if options are provided."""
itasks, _, bad_items = self.pool.filter_task_proxies(tasks)
if self.config.run_mode('simulation'):
if self.get_run_mode() == RunMode.SIMULATION:
for itask in itasks:
if itask.state(*TASK_STATUSES_ACTIVE):
itask.state_reset(TASK_STATUS_FAILED)
Expand Down Expand Up @@ -1360,6 +1355,9 @@ def _set_workflow_params(
"""
LOG.info('LOADING workflow parameters')
for key, value in params:
if key == self.workflow_db_mgr.KEY_RUN_MODE:
self.options.run_mode = value or RunMode.LIVE
LOG.info(f"+ run mode = {value}")
if value is None:
continue
if key in self.workflow_db_mgr.KEY_INITIAL_CYCLE_POINT_COMPATS:
Expand All @@ -1380,12 +1378,6 @@ def _set_workflow_params(
elif self.options.stopcp is None:
self.options.stopcp = value
LOG.info(f"+ stop point = {value}")
elif (
key == self.workflow_db_mgr.KEY_RUN_MODE
and self.options.run_mode is None
):
self.options.run_mode = value
LOG.info(f"+ run mode = {value}")
elif key == self.workflow_db_mgr.KEY_UUID_STR:
self.uuid_str = value
LOG.info(f"+ workflow UUID = {value}")
Expand Down Expand Up @@ -1431,12 +1423,8 @@ def run_event_handlers(self, event, reason=""):
Run workflow events in simulation and dummy mode ONLY if enabled.
"""
conf = self.config
with suppress(KeyError):
if (
conf.run_mode('simulation', 'dummy')
):
return
if self.get_run_mode() in {RunMode.SIMULATION, RunMode.DUMMY}:
return
self.workflow_event_handler.handle(self, event, str(reason))

def release_queued_tasks(self) -> bool:
Expand Down Expand Up @@ -1509,7 +1497,7 @@ def release_queued_tasks(self) -> bool:
pre_prep_tasks,
self.server.curve_auth,
self.server.client_pub_key_dir,
is_simulation=self.config.run_mode('simulation')
is_simulation=(self.get_run_mode() == RunMode.SIMULATION)
):
if itask.flow_nums:
flow = ','.join(str(i) for i in itask.flow_nums)
Expand Down Expand Up @@ -1560,7 +1548,7 @@ def timeout_check(self):
"""Check workflow and task timers."""
self.check_workflow_timers()
# check submission and execution timeout and polling timers
if not self.config.run_mode('simulation'):
if self.get_run_mode() != RunMode.SIMULATION:
self.task_job_mgr.check_task_jobs(self.workflow, self.pool)

async def workflow_shutdown(self):
Expand Down Expand Up @@ -1759,7 +1747,7 @@ async def _main_loop(self) -> None:
self.release_queued_tasks()

if (
self.pool.config.run_mode('simulation')
self.get_run_mode() == RunMode.SIMULATION
and sim_time_check(
self.task_events_mgr,
self.pool.get_tasks(),
Expand Down Expand Up @@ -2252,6 +2240,9 @@ def _check_startup_opts(self) -> None:
f"option --{opt}=reload is only valid for restart"
)

def get_run_mode(self) -> str:
return RunMode.get(self.options)

async def handle_exception(self, exc: BaseException) -> NoReturn:
"""Gracefully shut down the scheduler given a caught exception.
Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
is_terminal,
prompt,
)
from cylc.flow.workflow_status import RunMode

if TYPE_CHECKING:
from optparse import Values
Expand Down Expand Up @@ -130,7 +131,7 @@
["-m", "--mode"],
help="Run mode: live, dummy, simulation (default live).",
metavar="STRING", action='store', dest="run_mode",
choices=['live', 'dummy', 'simulation'],
choices=[RunMode.LIVE, RunMode.DUMMY, RunMode.SIMULATION],
)

PLAY_RUN_MODE = deepcopy(RUN_MODE)
Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/scripts/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
from cylc.flow.templatevars import get_template_vars
from cylc.flow.terminal import cli_function
from cylc.flow.scheduler_cli import RUN_MODE
from cylc.flow.workflow_status import RunMode

if TYPE_CHECKING:
from cylc.flow.option_parsers import Values
Expand Down Expand Up @@ -126,7 +127,7 @@ def get_option_parser():
{
'check_circular': False,
'profile_mode': False,
'run_mode': 'live'
'run_mode': RunMode.LIVE
}
)

Expand Down
6 changes: 4 additions & 2 deletions cylc/flow/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from time import time

from metomi.isodatetime.parsers import DurationParser

from cylc.flow import LOG
from cylc.flow.cycling.loader import get_point
from cylc.flow.exceptions import PointParsingError
Expand All @@ -30,8 +32,8 @@
TASK_STATUS_SUCCEEDED,
)
from cylc.flow.wallclock import get_unix_time_from_time_string
from cylc.flow.workflow_status import RunMode

from metomi.isodatetime.parsers import DurationParser

if TYPE_CHECKING:
from cylc.flow.task_events_mgr import TaskEventsManager
Expand Down Expand Up @@ -134,7 +136,7 @@ def configure_sim_modes(taskdefs, sim_mode):
"""Adjust task defs for simulation and dummy mode.
"""
dummy_mode = bool(sim_mode == 'dummy')
dummy_mode = (sim_mode == RunMode.DUMMY)

for tdef in taskdefs:
# Compute simulated run time by scaling the execution limit.
Expand Down
14 changes: 8 additions & 6 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
get_template_variables as get_workflow_template_variables,
process_mail_footer,
)
from cylc.flow.workflow_status import RunMode


if TYPE_CHECKING:
Expand Down Expand Up @@ -767,7 +768,7 @@ def process_message(

# ... but either way update the job ID in the job proxy (it only
# comes in via the submission message).
if itask.tdef.run_mode != 'simulation':
if itask.tdef.run_mode != RunMode.SIMULATION:
job_tokens = itask.tokens.duplicate(
job=str(itask.submit_num)
)
Expand Down Expand Up @@ -885,7 +886,8 @@ def _process_message_check(

if (
itask.state(TASK_STATUS_WAITING)
and itask.tdef.run_mode == 'live' # Polling in live mode only.
# Polling in live mode only:
and itask.tdef.run_mode == RunMode.LIVE
and (
(
# task has a submit-retry lined up
Expand Down Expand Up @@ -930,7 +932,7 @@ def _process_message_check(

def setup_event_handlers(self, itask, event, message):
"""Set up handlers for a task event."""
if itask.tdef.run_mode != 'live':
if itask.tdef.run_mode != RunMode.LIVE:
return
msg = ""
if message != f"job {event}":
Expand Down Expand Up @@ -1455,7 +1457,7 @@ def _process_message_submitted(
)

itask.set_summary_time('submitted', event_time)
if itask.tdef.run_mode == 'simulation':
if itask.tdef.run_mode == RunMode.SIMULATION:
# Simulate job started as well.
itask.set_summary_time('started', event_time)
if itask.state_reset(TASK_STATUS_RUNNING, forced=forced):
Expand Down Expand Up @@ -1492,7 +1494,7 @@ def _process_message_submitted(
'submitted',
event_time,
)
if itask.tdef.run_mode == 'simulation':
if itask.tdef.run_mode == RunMode.SIMULATION:
# Simulate job started as well.
self.data_store_mgr.delta_job_time(
job_tokens,
Expand Down Expand Up @@ -1525,7 +1527,7 @@ def _insert_task_job(
# not see previous submissions (so can't use itask.jobs[submit_num-1]).
# And transient tasks, used for setting outputs and spawning children,
# do not submit jobs.
if itask.tdef.run_mode == "simulation" or forced:
if (itask.tdef.run_mode == RunMode.SIMULATION) or forced:
job_conf = {"submit_num": 0}
else:
job_conf = itask.jobs[-1]
Expand Down
19 changes: 19 additions & 0 deletions cylc/flow/workflow_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from cylc.flow.wallclock import get_time_string_from_unix_time as time2str

if TYPE_CHECKING:
from optparse import Values
from cylc.flow.scheduler import Scheduler

# Keys for identify API call
Expand Down Expand Up @@ -198,3 +199,21 @@ def get_workflow_status(schd: 'Scheduler') -> Tuple[str, str]:
status_msg = 'running'

return (status.value, status_msg)


class RunMode:
"""The possible run modes of a workflow."""

LIVE = 'live'
"""Workflow will run normally."""

SIMULATION = 'simulation'
"""Workflow will run in simulation mode."""

DUMMY = 'dummy'
"""Workflow will run in dummy mode."""

@staticmethod
def get(options: 'Values') -> str:
"""Return the run mode from the options."""
return getattr(options, 'run_mode', None) or RunMode.LIVE
Loading

0 comments on commit 9eabebb

Please sign in to comment.