Skip to content

Commit

Permalink
Suggestions from review.
Browse files Browse the repository at this point in the history
Co-authored-by: Oliver Sanders <oliver.sanders@metoffice.gov.uk>
  • Loading branch information
wxtim and oliver-sanders committed Sep 24, 2024
1 parent ea7fd7e commit de921ee
Show file tree
Hide file tree
Showing 35 changed files with 433 additions and 330 deletions.
7 changes: 4 additions & 3 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@
from cylc.flow.platforms import (
fail_if_platform_and_host_conflict, get_platform_deprecated_settings,
is_platform_definition_subshell)
from cylc.flow.run_modes import RunMode
from cylc.flow.task_events_mgr import EventData
from cylc.flow.task_state import RunMode
from cylc.flow.run_modes import TASK_CONFIG_RUN_MODES


# Regex to check whether a string is a command
Expand Down Expand Up @@ -1338,8 +1339,8 @@ def get_script_common_text(this: str, example: Optional[str] = None):
)
Conf(
'run mode', VDR.V_STRING,
options=list(RunMode.OVERRIDING_MODES.value) + [''],
default='',
options=list(TASK_CONFIG_RUN_MODES),
default=RunMode.LIVE.value,
desc=f'''
For a workflow run in live mode run this task in skip
mode.
Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@
from cylc.flow.log_level import log_level_to_verbosity
from cylc.flow.network.schema import WorkflowStopMode
from cylc.flow.parsec.exceptions import ParsecError
from cylc.flow.run_modes import RunMode
from cylc.flow.task_id import TaskID
from cylc.flow.task_state import (
TASK_STATUSES_ACTIVE, TASK_STATUS_FAILED, RunMode)
TASK_STATUSES_ACTIVE, TASK_STATUS_FAILED)
from cylc.flow.workflow_status import StopMode

from metomi.isodatetime.parsers import TimePointParser
Expand Down
11 changes: 5 additions & 6 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
get_trigger_completion_variable_maps,
trigger_to_completion_variable,
)
from cylc.flow.task_state import RunMode
from cylc.flow.run_modes import RunMode
from cylc.flow.task_trigger import TaskTrigger, Dependency
from cylc.flow.taskdef import TaskDef
from cylc.flow.unicode_rules import (
Expand Down Expand Up @@ -1738,10 +1738,6 @@ def process_config_env(self):
]
)

def run_mode(self) -> str:
"""Return the run mode."""
return RunMode.get(self.options)

def _check_task_event_handlers(self):
"""Check custom event handler templates can be expanded.
Expand Down Expand Up @@ -2493,7 +2489,10 @@ def _get_taskdef(self, name: str) -> TaskDef:

# Get the taskdef object for generating the task proxy class
taskd = TaskDef(
name, rtcfg, self.run_mode(), self.start_point,
name,
rtcfg,
RunMode.get(self.options),
self.start_point,
self.initial_point)

# TODO - put all taskd.foo items in a single config dict
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
pdeepcopy,
poverride
)
from cylc.flow.run_modes import RunMode
from cylc.flow.workflow_status import (
get_workflow_status,
get_workflow_status_msg,
Expand Down Expand Up @@ -699,8 +700,7 @@ def generate_definition_elements(self):
time_zone_info = TIME_ZONE_LOCAL_INFO
for key, val in time_zone_info.items():
setbuff(workflow.time_zone_info, key, val)

workflow.run_mode = config.run_mode()
workflow.run_mode = RunMode.get(config.options)
workflow.cycling_mode = config.cfg['scheduling']['cycling mode']
workflow.workflow_log_dir = self.schd.workflow_log_dir
workflow.job_log_names.extend(list(JOB_LOG_OPTS.values()))
Expand Down
16 changes: 8 additions & 8 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@
)
from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE
from cylc.flow.id import Tokens
from cylc.flow.run_modes import (
TASK_CONFIG_RUN_MODES, WORKFLOW_RUN_MODES, RunMode)
from cylc.flow.task_outputs import SORT_ORDERS
from cylc.flow.task_state import (
RunMode,
TASK_STATUSES_ORDERED,
TASK_STATUS_DESC,
TASK_STATUS_WAITING,
Expand Down Expand Up @@ -605,20 +606,19 @@ def describe_run_mode(run_mode: Optional['Enum']) -> str:
return getattr(RunMode, run_mode.value.upper()).__doc__


# The run mode for the workflow.
WorkflowRunMode = graphene.Enum(
'WorkflowRunMode',
[(m.capitalize(), m) for m in RunMode.WORKFLOW_MODES.value],
description=describe_run_mode,
[(m.capitalize(), m) for m in WORKFLOW_RUN_MODES],
description=lambda x: RunMode(x.value).describe() if x else None,
)
"""The run mode for the workflow."""


# The run mode for the task.
TaskRunMode = graphene.Enum(
'TaskRunMode',
[(m.capitalize(), m) for m in RunMode.WORKFLOW_MODES.value],
description=describe_run_mode,
[(m.capitalize(), m) for m in TASK_CONFIG_RUN_MODES],
description=lambda x: RunMode(x.value).describe() if x else None,
)
"""The run mode for tasks."""


class Workflow(ObjectType):
Expand Down
10 changes: 5 additions & 5 deletions cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
PlatformLookupError, CylcError, NoHostsError, NoPlatformsError)
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.hostuserutil import is_remote_host
from cylc.flow.task_state import RunMode
from cylc.flow.run_modes import JOBLESS_MODES

if TYPE_CHECKING:
from cylc.flow.parsec.OrderedDict import OrderedDictWithDefaults
Expand Down Expand Up @@ -267,7 +267,7 @@ def platform_from_name(
return platform_data

# If platform name in run mode and not otherwise defined:
if platform_name in RunMode.JOBLESS_MODES.value:
if platform_name in JOBLESS_MODES:
return platforms['localhost']

raise PlatformLookupError(
Expand Down Expand Up @@ -652,7 +652,7 @@ def get_install_target_to_platforms_map(
Return {install_target_1: [platform_1_dict, platform_2_dict, ...], ...}
"""
ret: Dict[str, List[Dict[str, Any]]] = {}
for p_name in set(platform_names) - set(RunMode.JOBLESS_MODES.value):
for p_name in set(platform_names) - set(JOBLESS_MODES):
try:
platform = platform_from_name(p_name)
except PlatformLookupError as exc:
Expand All @@ -665,10 +665,10 @@ def get_install_target_to_platforms_map(
# Map jobless modes to localhost.
if 'localhost' in ret:
ret['localhost'] += [
{'name': mode} for mode in RunMode.JOBLESS_MODES.value]
{'name': mode} for mode in JOBLESS_MODES]
else:
ret['localhost'] = [
{'name': mode} for mode in RunMode.JOBLESS_MODES.value]
{'name': mode} for mode in JOBLESS_MODES]
return ret


Expand Down
1 change: 1 addition & 0 deletions cylc/flow/prerequisite.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ def satisfy_me(
"""
satisfied_message: SatisfiedState

if mode != 'live':
satisfied_message = self.DEP_STATE_SATISFIED_BY.format(
mode) # type: ignore
Expand Down
141 changes: 141 additions & 0 deletions cylc/flow/run_modes/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from enum import Enum
from typing import TYPE_CHECKING, Callable, Optional, Tuple

if TYPE_CHECKING:
from optparse import Values
from cylc.flow.task_job_mgr import TaskJobManager
from cylc.flow.task_proxy import TaskProxy

# The interface for submitting jobs
SubmissionInterface = Callable[
[ # Args:
# the task job manager instance
'TaskJobManager',
# the task to submit
'TaskProxy',
# the task's runtime config (with broadcasts applied)
dict,
# the workflow ID
str,
# the current time as (float_unix_time, str_ISO8601)
Tuple[float, str]
],
# Return False if the job requires live-mode submission
# (dummy mode does this), else return True.
bool
]


class RunMode(Enum):
"""The possible run modes of a task/workflow."""

LIVE = 'live'
"""Task will run normally."""

SIMULATION = 'simulation'
"""Simulates job submission with configurable exection time
and succeeded/failed outcomes(but does not submit real jobs)."""

DUMMY = 'dummy'
"""Submits real jobs with empty scripts."""

SKIP = 'skip'
"""Skips job submission; sets required outputs (by default) or
configured outputs."""

def describe(self):
"""Return user friendly description of run mode.
For use by configuration spec documenter.
"""
if self == self.LIVE:
return "Task will run normally."
if self == self.SKIP:
return (
"Skips job submission; sets required outputs"
" (by default) or configured outputs.")
if self == self.DUMMY:
return "Submits real jobs with empty scripts."
if self == self.SIMULATION:
return (
"Simulates job submission with configurable"
" exection time and succeeded/failed outcomes"
"(but does not submit real jobs).")
raise KeyError(f'No description for {self}.')

@staticmethod
def get(options: 'Values') -> str:
"""Return the workflow run mode from the options."""
if hasattr(options, 'run_mode') and options.run_mode:
return options.run_mode
else:
return RunMode.LIVE.value

def get_submit_method(self) -> 'Optional[SubmissionInterface]':
"""Return the job submission method for this run mode.
This returns None for live-mode jobs as these use a
different code pathway for job submission.
"""
if self == RunMode.DUMMY:
from cylc.flow.run_modes.dummy import (
submit_task_job as dummy_submit_task_job)
return dummy_submit_task_job
elif self == RunMode.SIMULATION:
from cylc.flow.run_modes.simulation import (
submit_task_job as simulation_submit_task_job)
return simulation_submit_task_job
elif self == RunMode.SKIP:
from cylc.flow.run_modes.skip import (
submit_task_job as skip_submit_task_job)
return skip_submit_task_job
return None


def disable_task_event_handlers(itask: 'TaskProxy'):
"""Should we disable event handlers for this task?
No event handlers in simulation mode, or in skip mode
if we don't deliberately enable them:
"""
mode = itask.run_mode
return (
mode == RunMode.SIMULATION.value
or (
mode == RunMode.SKIP.value
and itask.platform.get(
'disable task event handlers', False)
)
)


# Modes available for running a whole workflow:
WORKFLOW_RUN_MODES = frozenset(i.value for i in {
RunMode.LIVE, RunMode.DUMMY, RunMode.SIMULATION})

# Modes which can be set in task config:
TASK_CONFIG_RUN_MODES = frozenset(
i.value for i in (RunMode.LIVE, RunMode.SKIP))
# And those only available to the workflow:
WORKFLOW_ONLY_MODES = frozenset(
i.value for i in RunMode) - TASK_CONFIG_RUN_MODES

# Modes which completely ignore the standard submission path:
JOBLESS_MODES = frozenset(i.value for i in {
RunMode.SKIP, RunMode.SIMULATION})
2 changes: 1 addition & 1 deletion cylc/flow/run_modes/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
get_simulated_run_len,
parse_fail_cycle_points
)
from cylc.flow.task_state import RunMode
from cylc.flow.run_modes import RunMode
from cylc.flow.platforms import get_platform


Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/run_modes/nonlive.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from cylc.flow import LOG
from cylc.flow.run_modes.skip import check_task_skip_config
from cylc.flow.task_state import RunMode
from cylc.flow.run_modes import RunMode

if TYPE_CHECKING:
from cylc.flow.taskdef import TaskDef
Expand Down
15 changes: 11 additions & 4 deletions cylc/flow/run_modes/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
TASK_STATUS_SUCCEEDED,
)
from cylc.flow.wallclock import get_unix_time_from_time_string
from cylc.flow.task_state import RunMode
from cylc.flow.run_modes import RunMode


if TYPE_CHECKING:
Expand Down Expand Up @@ -73,8 +73,12 @@ def submit_task_job(
itask.submit_num += 1

itask.platform = {
'name': RunMode.SIMULATION.value, 'install target': 'localhost'}
itask.platform['name'] = RunMode.SIMULATION.value
'name': RunMode.SIMULATION.value,
'install target': 'localhost',
'hosts': ['localhost'],
'disable task event handlers':
rtconfig['simulation']['disable task event handlers'],
}
itask.summary['job_runner_name'] = RunMode.SIMULATION.value
itask.summary[task_job_mgr.KEY_EXECUTE_TIME_LIMIT] = (
itask.mode_settings.simulated_run_length
Expand Down Expand Up @@ -311,7 +315,10 @@ def sim_time_check(
for itask in itasks:
if (
itask.state.status != TASK_STATUS_RUNNING
or itask.run_mode and itask.run_mode != RunMode.SIMULATION.value
or (
itask.run_mode
and itask.run_mode != RunMode.SIMULATION.value
)
):
continue

Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/run_modes/skip.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
TASK_OUTPUT_FAILED,
TASK_OUTPUT_STARTED
)
from cylc.flow.task_state import RunMode
from cylc.flow.run_modes import RunMode

if TYPE_CHECKING:
from cylc.flow.taskdef import TaskDef
Expand All @@ -39,6 +39,7 @@ def submit_task_job(
task_job_mgr: 'TaskJobManager',
itask: 'TaskProxy',
rtconfig: Dict,
_workflow: str,
now: Tuple[float, str]
) -> 'Literal[True]':
"""Submit a task in skip mode.
Expand All @@ -63,7 +64,6 @@ def submit_task_job(
rtconfig['skip']['disable task event handlers'],
'execution polling intervals': []
}
itask.platform['name'] = RunMode.SKIP.value
itask.summary['job_runner_name'] = RunMode.SKIP.value
itask.run_mode = RunMode.SKIP.value
task_job_mgr.workflow_db_mgr.put_insert_task_jobs(
Expand Down
Loading

0 comments on commit de921ee

Please sign in to comment.