From 9717930438c0dcff3a61d2efa2c96e8ca95794cc Mon Sep 17 00:00:00 2001
From: Oliver Sanders
Date: Mon, 8 Aug 2022 15:41:15 +0100
Subject: [PATCH 1/4] reload: preserve xtriggers on reload (#5045)

* 8.0.x version of #5040
* Closes https://github.com/cylc/cylc-flow/issues/4866
* Reload causes new instances of each task to be created.
* The attributes of the old instances are copied onto the new ones.
* Xtriggers were not copied so were lost.
* This means that all xtriggers were effectively deleted on reload.
---
 cylc/flow/task_proxy.py                |  7 +++
 tests/functional/reload/25-xtriggers.t | 72 ++++++++++++++++++++++++++
 2 files changed, 79 insertions(+)
 create mode 100644 tests/functional/reload/25-xtriggers.t

diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py
index 966b17ad74b..b5511ffb188 100644
--- a/cylc/flow/task_proxy.py
+++ b/cylc/flow/task_proxy.py
@@ -284,6 +284,13 @@ def copy_to_reload_successor(self, reload_successor):
         reload_successor.state.is_runahead = self.state.is_runahead
         reload_successor.state.is_updated = self.state.is_updated
         reload_successor.state.prerequisites = self.state.prerequisites
+        reload_successor.state.xtriggers.update({
+            # copy across any special "_cylc" xtriggers which were added
+            # dynamically at runtime (i.e. execution retry xtriggers)
+            key: value
+            for key, value in self.state.xtriggers.items()
+            if key.startswith('_cylc')
+        })
         reload_successor.jobs = self.jobs

     @staticmethod

diff --git a/tests/functional/reload/25-xtriggers.t b/tests/functional/reload/25-xtriggers.t
new file mode 100644
index 00000000000..0eecc13590d
--- /dev/null
+++ b/tests/functional/reload/25-xtriggers.t
@@ -0,0 +1,72 @@
+#!/usr/bin/env bash
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#-------------------------------------------------------------------------------

+# Ensure that xtriggers are preserved after reloads
+# See https://github.com/cylc/cylc-flow/issues/4866
+
+. "$(dirname "$0")/test_header"
+
+set_test_number 6
+
+init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
+[scheduling]
+    [[graph]]
+        R1 = """
+            broken
+            reload
+        """
+
+[runtime]
+    [[broken]]
+        script = false
+        # should be long enough for the reload to complete
+        # (annoyingly we can't make this event driven)
+        execution retry delays = PT1M
+        # NOTE: "execution retry delays" is implemented as an xtrigger
+
+    [[reload]]
+        script = """
+            # wait for "broken" to fail
+            cylc__job__poll_grep_workflow_log \
+                '1/broken .* (received)failed/ERR'
+            # fix "broken" to allow it to pass
+            sed -i 's/false/true/' "${CYLC_WORKFLOW_RUN_DIR}/flow.cylc"
+            # reload the workflow
+            cylc reload "${CYLC_WORKFLOW_ID}"
+        """
+__FLOW_CONFIG__
+
+run_ok "${TEST_NAME_BASE}-val" cylc validate "${WORKFLOW_NAME}"
+workflow_run_ok "${TEST_NAME_BASE}-run" cylc play "${WORKFLOW_NAME}" --no-detach
+
+# ensure the following order of events
+# 1. 
"1/broken" fails +# 2. workflow is reloaded (by "1/reload") +# 3. the retry xtrigger for "1/broken" becomes satisfied (after the reload) +# (thus proving that the xtrigger survived the reload) +# 4. "1/broken" succeeds +log_scan "${TEST_NAME_BASE}-scan" \ + "$(cylc cat-log -m p "${WORKFLOW_NAME}")" \ + 1 1 \ + '1/broken .* (received)failed/ERR' \ + 'Command succeeded: reload_workflow()' \ + 'xtrigger satisfied: _cylc_retry_1/broken' \ + '1/broken .* (received)succeeded' + +purge +exit From 2f2f0e77bcdfddc0fc8fbf1f9382b8906a0e9bdb Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Fri, 12 Aug 2022 10:12:35 +0100 Subject: [PATCH 2/4] fix flake8 (#5060) --- cylc/flow/network/__init__.py | 4 ++-- cylc/flow/parsec/fileparse.py | 2 +- cylc/flow/tui/app.py | 2 +- cylc/flow/workflow_files.py | 3 ++- cylc/flow/xtriggers/echo.py | 3 ++- 5 files changed, 8 insertions(+), 6 deletions(-) diff --git a/cylc/flow/network/__init__.py b/cylc/flow/network/__init__.py index e456de3fb12..e12b1d5ef61 100644 --- a/cylc/flow/network/__init__.py +++ b/cylc/flow/network/__init__.py @@ -179,13 +179,13 @@ def _socket_bind(self, min_port, max_port, srv_prv_key_loc=None): try: server_public_key, server_private_key = zmq.auth.load_certificate( srv_prv_key_info.full_key_path) - except (ValueError): + except ValueError: raise ServiceFileError( f"Failed to find server's public " f"key in " f"{srv_prv_key_info.full_key_path}." ) - except(OSError): + except OSError: raise ServiceFileError( f"IO error opening server's private " f"key from " diff --git a/cylc/flow/parsec/fileparse.py b/cylc/flow/parsec/fileparse.py index 326aed43a62..f3e93311878 100644 --- a/cylc/flow/parsec/fileparse.py +++ b/cylc/flow/parsec/fileparse.py @@ -292,7 +292,7 @@ def process_plugins(fpath, opts): f"{extra_vars['templating_detected']} and " f"{plugin_result['templating_detected']}" ) - elif( + elif ( 'templating_detected' in plugin_result and plugin_result['templating_detected'] is not None ): diff --git a/cylc/flow/tui/app.py b/cylc/flow/tui/app.py index 484fd3fc610..fd05a613443 100644 --- a/cylc/flow/tui/app.py +++ b/cylc/flow/tui/app.py @@ -316,7 +316,7 @@ def get_snapshot(self): # Distinguish stopped flow from non-existent flow. self.client = None full_path = Path(get_workflow_run_dir(self.reg)) - if( + if ( (full_path / WorkflowFiles.SUITE_RC).is_file() or (full_path / WorkflowFiles.FLOW_FILE).is_file() ): diff --git a/cylc/flow/workflow_files.py b/cylc/flow/workflow_files.py index 344da8b5130..d0b30f826db 100644 --- a/cylc/flow/workflow_files.py +++ b/cylc/flow/workflow_files.py @@ -284,7 +284,8 @@ class ContactFileFields: """The process ID of the running workflow on ``CYLC_WORKFLOW_HOST``.""" COMMAND = 'CYLC_WORKFLOW_COMMAND' - """The command that was used to run the workflow on ``CYLC_WORKFLOW_HOST```. + """The command that was used to run the workflow on + ``CYLC_WORKFLOW_HOST```. Note that this command may be affected by: diff --git a/cylc/flow/xtriggers/echo.py b/cylc/flow/xtriggers/echo.py index eb60bb2be4d..e3c5880cab0 100644 --- a/cylc/flow/xtriggers/echo.py +++ b/cylc/flow/xtriggers/echo.py @@ -20,7 +20,8 @@ def echo(*args, **kwargs): - """Prints args to stdout and return success only if kwargs['succeed'] is True. + """Prints args to stdout and return success only if kwargs['succeed'] + is True. This may be a useful aid to understanding how xtriggers work. 
From 2e7a3bb7d7ce45dc9be5b3754a512a9af8c05234 Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Fri, 12 Aug 2022 12:40:11 +0100 Subject: [PATCH 3/4] Fix auto restart bugs (#5049) * Fix lack of abort for main loop auto restart in no-detach mode * Tidy & add type annotations * Fix flaky spurious log rollover on auto restart * Remember verbosity on auto restart * Prevent exceptions during auto restart from being swallowed * Update changelog * De-flake test --- CHANGES.md | 3 + cylc/flow/loggingutil.py | 139 ++++++++---------- cylc/flow/main_loop/auto_restart.py | 3 +- cylc/flow/scheduler.py | 91 +++++++----- cylc/flow/task_remote_mgr.py | 7 +- cylc/flow/workflow_files.py | 7 +- tests/conftest.py | 3 +- tests/functional/lib/bash/test_header | 26 +++- .../restart/34-auto-restart-basic.t | 19 ++- .../restart/41-auto-restart-local-jobs.t | 13 +- tests/integration/main_loop/__init__.py | 0 .../main_loop/test_auto_restart.py | 50 +++++++ tests/integration/test_scheduler.py | 30 +++- tests/unit/main_loop/test_auto_restart.py | 12 +- 14 files changed, 254 insertions(+), 149 deletions(-) create mode 100644 tests/integration/main_loop/__init__.py create mode 100644 tests/integration/main_loop/test_auto_restart.py diff --git a/CHANGES.md b/CHANGES.md index 544f13d3264..d4ed7b70469 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -49,6 +49,9 @@ cycle point validation in the UI. [#5037](https://github.com/cylc/cylc-flow/pull/5037) - Fix bug where the workflow restart number would get wiped on reload. +[#5049](https://github.com/cylc/cylc-flow/pull/5049) - Fix several small +bugs related to auto restart. + ------------------------------------------------------------------------------- ## __cylc-8.0.0 (Released 2022-07-28)__ diff --git a/cylc/flow/loggingutil.py b/cylc/flow/loggingutil.py index 26c136e95ab..2d99c225713 100644 --- a/cylc/flow/loggingutil.py +++ b/cylc/flow/loggingutil.py @@ -126,21 +126,19 @@ class RotatingLogFileHandler(logging.FileHandler): for names. Argument: - log_file_path: path to the log file + log_file_path: path to the log file (symlink to latest log file) no_detach: non-detach mode? restart_num: restart number for the run timestamp: Add timestamp to log formatting? """ FILE_HEADER_FLAG = 'cylc_log_file_header' - FILE_NUM = 'cylc_log_num' + ROLLOVER_NUM = 'cylc_log_num' MIN_BYTES = 1024 - extra = {FILE_HEADER_FLAG: True} - extra_num = { - FILE_HEADER_FLAG: True, - FILE_NUM: 1 - } + header_extra = {FILE_HEADER_FLAG: True} + """Use to indicate the log msg is a header that should be logged on + every rollover""" def __init__( self, @@ -152,9 +150,11 @@ def __init__( logging.FileHandler.__init__(self, log_file_path) self.no_detach = no_detach self.formatter = CylcLogFormatter(timestamp=timestamp) + # Header records get appended to when calling + # LOG.info(extra=RotatingLogFileHandler.[rollover_]header_extra) self.header_records: List[logging.LogRecord] = [] self.restart_num = restart_num - self.log_num: Optional[int] = None + self.log_num: Optional[int] = None # null value until log file created def emit(self, record): """Emit a record, rollover log if necessary.""" @@ -169,16 +169,14 @@ def emit(self, record): except Exception: self.handleError(record) - def load_type_change(self): + def load_type_change(self) -> bool: """Has there been a load-type change, e.g. restart?""" - current_load_type = self.get_load_type() existing_load_type = self.existing_log_load_type() - # Rollover if the load type has changed. 
- if existing_load_type and current_load_type != existing_load_type: + if existing_load_type and self.load_type != existing_load_type: return True return False - def existing_log_load_type(self): + def existing_log_load_type(self) -> Optional[str]: """Return a log load type, if one currently exists""" try: existing_log_name = os.readlink(self.baseFilename) @@ -188,12 +186,11 @@ def existing_log_load_type(self): for load_type in [RESTART_LOAD_TYPE, START_LOAD_TYPE]: if existing_log_name.find(load_type) > 0: return load_type + return None - def should_rollover(self, record): + def should_rollover(self, record: logging.LogRecord) -> bool: """Should rollover?""" - if (self.stream is None or - self.load_type_change() or - self.log_num is None): + if self.log_num is None or self.stream is None: return True max_bytes = glbl_cfg().get( ['scheduler', 'logging', 'maximum size in bytes']) @@ -208,26 +205,18 @@ def should_rollover(self, record): raise SystemExit(exc) return self.stream.tell() + len(msg.encode('utf8')) >= max_bytes - def get_load_type(self): + @property + def load_type(self) -> str: """Establish current load type, as perceived by scheduler.""" if self.restart_num > 0: return RESTART_LOAD_TYPE return START_LOAD_TYPE - def do_rollover(self): + def do_rollover(self) -> None: """Create and rollover log file if necessary.""" - # Generate new file name - filename = self.get_new_log_filename() - os.makedirs(os.path.dirname(filename), exist_ok=True) - # Touch file - with open(filename, 'w+'): - os.utime(filename, None) - # Update symlink - if (os.path.exists(self.baseFilename) or - os.path.lexists(self.baseFilename)): - os.unlink(self.baseFilename) - os.symlink(os.path.basename(filename), self.baseFilename) - # Housekeep log files + # Create new log file + self.new_log_file() + # Housekeep old log files arch_len = glbl_cfg().get( ['scheduler', 'logging', 'rolling archive length']) if arch_len: @@ -235,7 +224,6 @@ def do_rollover(self): # Reopen stream, redirect STDOUT and STDERR to log if self.stream: self.stream.close() - self.stream = None self.stream = self._open() # Dup STDOUT and STDERR in detach mode if not self.no_detach: @@ -243,15 +231,15 @@ def do_rollover(self): os.dup2(self.stream.fileno(), sys.stderr.fileno()) # Emit header records (should only do this for subsequent log files) for header_record in self.header_records: - if self.FILE_NUM in header_record.__dict__: - # Increment log file number - header_record.__dict__[self.FILE_NUM] += 1 - # strip the hard coded log number (1) from the log message - # replace with the log number for that start. - # Note this is different from the log number in the file name - # which is cumulative over the workflow. - header_record.args = header_record.args[0:-1] + ( - header_record.__dict__[self.FILE_NUM],) + if self.ROLLOVER_NUM in header_record.__dict__: + # A hack to increment the rollover number that gets logged in + # the log file. (Rollover number only applies to a particular + # workflow run; note this is different from the log count + # number in the log filename.) 
+ header_record.__dict__[self.ROLLOVER_NUM] += 1 + header_record.args = ( + header_record.__dict__[self.ROLLOVER_NUM], + ) logging.FileHandler.emit(self, header_record) def update_log_archive(self, arch_len): @@ -265,30 +253,32 @@ def update_log_archive(self, arch_len): while len(log_files) > arch_len: os.unlink(log_files.pop(0)) - def get_new_log_filename(self): - """Build filename for log""" - base_dir = Path(self.baseFilename).parent - load_type = self.get_load_type() - if load_type == START_LOAD_TYPE: - run_num = 1 - elif load_type == RESTART_LOAD_TYPE: - run_num = self.restart_num + 1 - self.set_log_num() - filename = base_dir.joinpath( - f'{self.log_num:02d}-{load_type}-{run_num:02d}{LOG_FILE_EXTENSION}' + def new_log_file(self) -> Path: + """Set self.log_num and create new log file.""" + try: + log_file = os.readlink(self.baseFilename) + except OSError: + # "log" symlink not yet created, this is the first log + self.log_num = 1 + else: + self.log_num = get_next_log_number(log_file) + log_dir = Path(self.baseFilename).parent + # User-facing restart num is 1 higher than backend value + restart_num = self.restart_num + 1 + filename = log_dir.joinpath( + f'{self.log_num:02d}-{self.load_type}-{restart_num:02d}' + f'{LOG_FILE_EXTENSION}' ) + os.makedirs(filename.parent, exist_ok=True) + # Touch file + with open(filename, 'w+'): + os.utime(filename, None) + # Update symlink + if os.path.lexists(self.baseFilename): + os.unlink(self.baseFilename) + os.symlink(os.path.basename(filename), self.baseFilename) return filename - def set_log_num(self): - if not self.log_num: - try: - current_log = os.readlink(self.baseFilename) - self.log_num = int(get_next_log_number(current_log)) - except OSError: - self.log_num = 1 - else: - self.log_num = int(self.log_num) + 1 - class ReferenceLogFileHandler(logging.FileHandler): """A handler class which writes filtered reference logging records @@ -378,29 +368,28 @@ def close_log(logger: logging.Logger) -> None: handler.close() -def get_next_log_number(log: str) -> str: - """Returns the next log number for the log specified. +def get_next_log_number(log_filepath: Union[str, Path]) -> int: + """Returns the next log number for the given log file path/name. Log name formats are of the form : -- - When given the latest log it returns the next log number, with padded 0s. + When given the latest log it returns the next log number. 
Examples: - >>> get_next_log_number('01-restart-02.log') - '02' - >>> get_next_log_number('/some/path/to/19-start-20.cylc') - '20' + >>> get_next_log_number('03-restart-02.log') + 4 + >>> get_next_log_number('/some/path/to/19-start-01.cylc') + 20 >>> get_next_log_number('199-start-08.log') - '200' + 200 >>> get_next_log_number('blah') - '01' + 1 """ try: - stripped_log = os.path.basename(log) - next_log_num = int(stripped_log.partition("-")[0]) + 1 + stripped_log = os.path.basename(log_filepath) + return int(stripped_log.partition("-")[0]) + 1 except ValueError: - next_log_num = 1 - return f'{next_log_num:02d}' + return 1 def get_sorted_logs_by_time( diff --git a/cylc/flow/main_loop/auto_restart.py b/cylc/flow/main_loop/auto_restart.py index cb1c43ae619..0cefb76e1f4 100644 --- a/cylc/flow/main_loop/auto_restart.py +++ b/cylc/flow/main_loop/auto_restart.py @@ -96,6 +96,7 @@ from cylc.flow.hostuserutil import get_fqdn_by_host from cylc.flow.main_loop import periodic from cylc.flow.parsec.exceptions import ParsecError +from cylc.flow.scheduler import SchedulerError from cylc.flow.workflow_status import AutoRestartMode from cylc.flow.wallclock import ( get_time_string_from_unix_time as time2str @@ -224,7 +225,7 @@ def _set_auto_restart( # This should raise an "abort" event and return a non-zero code to the # caller still attached to the workflow process. if scheduler.options.no_detach: - raise RuntimeError('Workflow host condemned in no detach mode') + raise SchedulerError('Workflow host condemned in no detach mode') # Check workflow is able to be safely restarted. if not _can_auto_restart(): diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 2329c434a21..9639a704a7b 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -74,7 +74,7 @@ from cylc.flow.network.authentication import key_housekeeping from cylc.flow.network.schema import WorkflowStopMode from cylc.flow.network.server import WorkflowRuntimeServer -from cylc.flow.option_parsers import verbosity_to_env +from cylc.flow.option_parsers import verbosity_to_env, verbosity_to_opts from cylc.flow.parsec.exceptions import ParsecError from cylc.flow.parsec.OrderedDict import DictTree from cylc.flow.parsec.validate import DurationFloat @@ -409,28 +409,29 @@ async def configure(self): self.config.cfg['scheduler']['cycle point time zone']) # Note that the following lines must be present at the top of - # the workflow log file for use in reference test runs: + # the workflow log file for use in reference test runs. 
+ # (These are also headers that get logged on each rollover) LOG.info( f'Run mode: {self.config.run_mode()}', - extra=RotatingLogFileHandler.extra + extra=RotatingLogFileHandler.header_extra ) LOG.info( f'Initial point: {self.config.initial_point}', - extra=RotatingLogFileHandler.extra + extra=RotatingLogFileHandler.header_extra ) if self.config.start_point != self.config.initial_point: LOG.info( f'Start point: {self.config.start_point}', - extra=RotatingLogFileHandler.extra + extra=RotatingLogFileHandler.header_extra ) LOG.info( f'Final point: {self.config.final_point}', - extra=RotatingLogFileHandler.extra + extra=RotatingLogFileHandler.header_extra ) if self.config.stop_point: LOG.info( f'Stop point: {self.config.stop_point}', - extra=RotatingLogFileHandler.extra + extra=RotatingLogFileHandler.header_extra ) self.broadcast_mgr.linearized_ancestors.update( @@ -543,32 +544,38 @@ async def log_start(self) -> None: LOG.setLevel(logging.INFO) # Print workflow name to disambiguate in case of inferred run number + # while in no-detach mode LOG.info(f"Workflow: {self.workflow}") + # Headers that also get logged on each rollover: LOG.info( self.START_MESSAGE_TMPL % { 'comms_method': 'tcp', 'host': self.host, 'port': self.server.port, 'pid': os.getpid()}, - extra=RotatingLogFileHandler.extra, + extra=RotatingLogFileHandler.header_extra, ) LOG.info( self.START_PUB_MESSAGE_TMPL % { 'comms_method': 'tcp', 'host': self.host, 'port': self.server.pub_port}, - extra=RotatingLogFileHandler.extra, + extra=RotatingLogFileHandler.header_extra, ) restart_num = self.get_restart_num() + 1 LOG.info( - 'Run: (re)start number=%d, log rollover=%d', - restart_num, - 1, # hard code 1 which is updated later if required - extra=RotatingLogFileHandler.extra_num + f'Run: (re)start number={restart_num}, log rollover=%d', + # Hard code 1 in args, gets updated on log rollover (NOTE: this + # must be the only positional arg): + 1, + extra={ + **RotatingLogFileHandler.header_extra, + RotatingLogFileHandler.ROLLOVER_NUM: 1 + } ) LOG.info( f'Cylc version: {CYLC_VERSION}', - extra=RotatingLogFileHandler.extra + extra=RotatingLogFileHandler.header_extra ) if is_quiet: @@ -599,16 +606,22 @@ async def run_scheduler(self): except SchedulerStop as exc: # deliberate stop await self.shutdown(exc) - if self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL: - self.workflow_auto_restart() - # run shutdown coros - await asyncio.gather( - *main_loop.get_runners( - self.main_loop_plugins, - main_loop.CoroTypes.ShutDown, - self + try: + if self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL: + self.workflow_auto_restart() + # run shutdown coros + await asyncio.gather( + *main_loop.get_runners( + self.main_loop_plugins, + main_loop.CoroTypes.ShutDown, + self + ) ) - ) + except Exception as exc: + # Need to log traceback manually because otherwise this + # exception gets swallowed + LOG.exception(exc) + raise except (KeyboardInterrupt, asyncio.CancelledError, Exception) as exc: # Includes SchedulerError @@ -1108,10 +1121,7 @@ def load_flow_file(self, is_reload=False): config_dir = get_workflow_run_config_log_dir( self.workflow) config_logs = get_sorted_logs_by_time(config_dir, "*[0-9].cylc") - if config_logs: - log_num = get_next_log_number(config_logs[-1]) - else: - log_num = '01' + log_num = get_next_log_number(config_logs[-1]) if config_logs else 1 if is_reload: load_type = "reload" load_type_num = get_reload_start_number(config_logs) @@ -1123,7 +1133,7 @@ def load_flow_file(self, is_reload=False): load_type = "start" load_type_num 
= '01' file_name = get_workflow_run_config_log_dir( - self.workflow, f"{log_num}-{load_type}-{load_type_num}.cylc") + self.workflow, f"{log_num:02d}-{load_type}-{load_type_num}.cylc") with open(file_name, "w") as handle: handle.write("# cylc-version: %s\n" % CYLC_VERSION) self.config.pcfg.idump(sparse=True, handle=handle) @@ -1405,9 +1415,12 @@ async def workflow_shutdown(self): raise SchedulerError( 'Invalid auto_restart_mode=%s' % self.auto_restart_mode) - def workflow_auto_restart(self, max_retries=3): + def workflow_auto_restart(self, max_retries: int = 3) -> bool: """Attempt to restart the workflow assuming it has already stopped.""" - cmd = ['cylc', 'play', quote(self.workflow)] + cmd = [ + 'cylc', 'play', quote(self.workflow), + *verbosity_to_opts(cylc.flow.flags.verbosity) + ] if self.options.abort_if_any_task_fails: cmd.append('--abort-if-any-task-fails') for attempt_no in range(max_retries): @@ -1419,6 +1432,7 @@ def workflow_auto_restart(self, max_retries=3): stdin=DEVNULL, stdout=PIPE, stderr=PIPE, + text=True ) # * new_host comes from internal interface which can only return # host names @@ -1429,7 +1443,8 @@ def workflow_auto_restart(self, max_retries=3): f' will retry in {self.INTERVAL_AUTO_RESTART_ERROR}s') LOG.critical( f"{msg}. Restart error:\n", - f"{proc.communicate()[1].decode()}") + f"{proc.communicate()[1]}" + ) sleep(self.INTERVAL_AUTO_RESTART_ERROR) else: LOG.info(f'Workflow now running on "{new_host}".') @@ -1681,28 +1696,26 @@ async def shutdown(self, reason: Exception) -> None: async def _shutdown(self, reason: Exception) -> None: """Shutdown the workflow.""" + shutdown_msg = "Workflow shutting down" if isinstance(reason, SchedulerStop): - LOG.info(f'Workflow shutting down - {reason.args[0]}') + LOG.info(f'{shutdown_msg} - {reason.args[0]}') # Unset the "paused" status of the workflow if not auto-restarting if self.auto_restart_mode != AutoRestartMode.RESTART_NORMAL: self.resume_workflow(quiet=True) elif isinstance(reason, SchedulerError): - LOG.error(f"Workflow shutting down - {reason}") + LOG.error(f"{shutdown_msg} - {reason}") elif isinstance(reason, CylcError) or ( isinstance(reason, ParsecError) and reason.schd_expected ): - LOG.error( - f"Workflow shutting down - {type(reason).__name__}: {reason}" - ) + LOG.error(f"{shutdown_msg} - {type(reason).__name__}: {reason}") if cylc.flow.flags.verbosity > 1: # Print traceback LOG.exception(reason) else: LOG.exception(reason) if str(reason): - LOG.critical(f'Workflow shutting down - {reason}') - else: - LOG.critical('Workflow shutting down') + shutdown_msg += f" - {reason}" + LOG.critical(shutdown_msg) if hasattr(self, 'proc_pool'): self.proc_pool.close() diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py index 5f10f6bfd60..b728e8d85f4 100644 --- a/cylc/flow/task_remote_mgr.py +++ b/cylc/flow/task_remote_mgr.py @@ -568,10 +568,7 @@ def get_log_file_name( install_log_dir ): log_files = get_sorted_logs_by_time(install_log_dir, '*.log') - if log_files: - log_num = get_next_log_number(log_files[-1]) - else: - log_num = '01' + log_num = get_next_log_number(log_files[-1]) if log_files else 1 load_type = "start" if self.is_reload: load_type = "reload" @@ -579,7 +576,7 @@ def get_log_file_name( elif self.is_restart: load_type = "restart" self.is_restart = False # reset marker - file_name = f"{log_num}-{load_type}-{install_target}.log" + file_name = f"{log_num:02d}-{load_type}-{install_target}.log" return file_name def _remote_init_items(self, comms_meth: CommsMeth): diff --git 
a/cylc/flow/workflow_files.py b/cylc/flow/workflow_files.py index d0b30f826db..7b0a27164c1 100644 --- a/cylc/flow/workflow_files.py +++ b/cylc/flow/workflow_files.py @@ -1427,11 +1427,8 @@ def _open_install_log(rund, logger): WorkflowFiles.LOG_DIR, 'install') log_files = get_sorted_logs_by_time(log_dir, '*.log') - if log_files: - log_num = get_next_log_number(log_files[-1]) - else: - log_num = '01' - log_path = Path(log_dir, f"{log_num}-{log_type}.log") + log_num = get_next_log_number(log_files[-1]) if log_files else 1 + log_path = Path(log_dir, f"{log_num:02d}-{log_type}.log") log_parent_dir = log_path.parent log_parent_dir.mkdir(exist_ok=True, parents=True) handler = logging.FileHandler(log_path) diff --git a/tests/conftest.py b/tests/conftest.py index 29b4d5fb3b0..fb7a7408f31 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -24,6 +24,7 @@ from cylc.flow.cfgspec.glbl_cfg import glbl_cfg from cylc.flow.cfgspec.globalcfg import SPEC from cylc.flow.parsec.config import ParsecConfig +from cylc.flow.parsec.validate import cylc_config_validate @pytest.fixture @@ -62,7 +63,7 @@ def _mock_glbl_cfg(pypath: str, global_config: str) -> None: nonlocal tmp_path, monkeypatch global_config_path = tmp_path / 'global.cylc' global_config_path.write_text(global_config) - glbl_cfg = ParsecConfig(SPEC) + glbl_cfg = ParsecConfig(SPEC, validator=cylc_config_validate) glbl_cfg.loadcfg(global_config_path) def _inner(cached=False): diff --git a/tests/functional/lib/bash/test_header b/tests/functional/lib/bash/test_header index 8cfa897c2a2..263545aae46 100644 --- a/tests/functional/lib/bash/test_header +++ b/tests/functional/lib/bash/test_header @@ -65,7 +65,7 @@ # grep_ok PATTERN FILE [$OPTS] # Run "grep [$OPTS] -q -e PATTERN FILE". # grep_workflow_log_ok TEST_NAME PATTERN [$OPTS] -# Run "grep [$OPTS] -q -e PATTERN ". +# Run "grep [$OPTS] -s -e PATTERN ". # named_grep_ok NAME PATTERN FILE [$OPTS] # Run grep_ok with a custom test name. # OPTS: put grep options like '-E' (extended regex) at end of line. @@ -429,12 +429,26 @@ grep_ok() { grep_workflow_log_ok() { local TEST_NAME="$1" - shift - if grep -s "$@" "${WORKFLOW_RUN_DIR}/log/scheduler/log"; then - ok "${TEST_NAME}" - else - fail "${TEST_NAME}" + local PATTERN="$2" + shift 2 + local LOG_FILE="${WORKFLOW_RUN_DIR}/log/scheduler/log" + if grep "$@" -s -e "$PATTERN" "$LOG_FILE"; then + ok "${TEST_NAME}" + return fi + mkdir -p "${TEST_LOG_DIR}" + { + cat <<__ERR__ +Can't find: +=========== +${PATTERN} +=========== +in: +=========== +__ERR__ + cat "$LOG_FILE" + } >"${TEST_LOG_DIR}/${TEST_NAME}.stderr" + fail "${TEST_NAME}" } named_grep_ok() { diff --git a/tests/functional/restart/34-auto-restart-basic.t b/tests/functional/restart/34-auto-restart-basic.t index 5d1ccbe5f48..62b6677fc7e 100644 --- a/tests/functional/restart/34-auto-restart-basic.t +++ b/tests/functional/restart/34-auto-restart-basic.t @@ -18,7 +18,7 @@ export REQUIRE_PLATFORM='loc:remote fs:shared runner:background' . 
"$(dirname "$0")/test_header" #------------------------------------------------------------------------------- -set_test_number 9 +set_test_number 10 if ${CYLC_TEST_DEBUG:-false}; then ERR=2; else ERR=1; fi #------------------------------------------------------------------------------- # run through the shutdown - restart procedure @@ -65,7 +65,7 @@ ${BASE_GLOBAL_CONFIG} " # test shutdown procedure - scan the first log file FILE=$(cylc cat-log "${WORKFLOW_NAME}" -m p |xargs readlink -f) -log_scan "${TEST_NAME}-shutdown" "${FILE}" 20 1 \ +log_scan "${TEST_NAME}-shutdown-log-scan" "${FILE}" 20 1 \ 'The Cylc workflow host will soon become un-available' \ 'Workflow shutting down - REQUEST(NOW-NOW)' \ "Attempting to restart on \"${CYLC_TEST_HOST}\"" \ @@ -76,7 +76,7 @@ LATEST_TASK=$(cylc workflow-state "${WORKFLOW_NAME}" -S succeeded \ # test restart procedure - scan the second log file created on restart poll_workflow_restart FILE=$(cylc cat-log "${WORKFLOW_NAME}" -m p |xargs readlink -f) -log_scan "${TEST_NAME}-restart" "${FILE}" 20 1 \ +log_scan "${TEST_NAME}-restart-log-scan" "${FILE}" 20 1 \ "Scheduler: url=tcp://$(get_fqdn "${CYLC_TEST_HOST}")" run_ok "${TEST_NAME}-restart-success" cylc workflow-state "${WORKFLOW_NAME}" \ --task="$(printf 'task_foo%02d' $(( LATEST_TASK + 3 )))" \ @@ -84,11 +84,18 @@ run_ok "${TEST_NAME}-restart-success" cylc workflow-state "${WORKFLOW_NAME}" \ # check the command the workflow has been restarted with run_ok "${TEST_NAME}-contact" cylc get-contact "${WORKFLOW_NAME}" -grep_ok "cylc play ${WORKFLOW_NAME} --host=${CYLC_TEST_HOST} --host=localhost" \ +grep_ok "cylc play ${WORKFLOW_NAME} -v --host=${CYLC_TEST_HOST} --host=localhost" \ "${TEST_NAME}-contact.stdout" # stop workflow cylc stop "${WORKFLOW_NAME}" --kill --max-polls=10 --interval=2 2>'/dev/null' -purge -exit +# Check correct number of logs +ls "${WORKFLOW_RUN_DIR}/log/scheduler/" > ls_logs.out +cmp_ok ls_logs.out << __EOF__ +01-start-01.log +02-restart-02.log +log +__EOF__ + +purge diff --git a/tests/functional/restart/41-auto-restart-local-jobs.t b/tests/functional/restart/41-auto-restart-local-jobs.t index 163b6dd91a3..a1d86d3ef95 100644 --- a/tests/functional/restart/41-auto-restart-local-jobs.t +++ b/tests/functional/restart/41-auto-restart-local-jobs.t @@ -73,8 +73,8 @@ ${BASE_GLOBAL_CONFIG} condemned = ${CYLC_TEST_HOST1} " -FILE="$(cylc cat-log "${WORKFLOW_NAME}" -m p |xargs readlink -f)" -log_scan "${TEST_NAME}-stop" "${FILE}" 40 1 \ +LOG_FILE="$(cylc cat-log "${WORKFLOW_NAME}" -m p |xargs readlink -f)" +log_scan "${TEST_NAME}-stop-log-scan" "${LOG_FILE}" 40 1 \ 'The Cylc workflow host will soon become un-available' \ 'Waiting for jobs running on localhost to complete' \ 'Waiting for jobs running on localhost to complete' \ @@ -82,10 +82,11 @@ log_scan "${TEST_NAME}-stop" "${FILE}" 40 1 \ "Attempting to restart on \"${CYLC_TEST_HOST2}\"" # we shouldn't have any orphaned tasks because we should # have waited for them to complete -grep_fail 'orphaned task' "${FILE}" +grep_fail 'orphaned task' "$LOG_FILE" poll_workflow_restart -grep_workflow_log_ok "restart-log-grep" "Workflow now running on \"${CYLC_TEST_HOST2}\"" +log_scan "${TEST_NAME}-restart-log-scan" "$LOG_FILE" 20 1 \ + "Workflow now running on \"${CYLC_TEST_HOST2}\"" #------------------------------------------------------------------------------- # auto stop-restart - force mode: # ensure the workflow DOESN'T WAIT for local jobs to complete before stopping @@ -103,8 +104,8 @@ ${BASE_GLOBAL_CONFIG} condemned = ${CYLC_TEST_HOST2}! 
" -FILE="$(cylc cat-log "${WORKFLOW_NAME}" -m p |xargs readlink -f)" -log_scan "${TEST_NAME}-stop" "${FILE}" 40 1 \ +LOG_FILE="$(cylc cat-log "${WORKFLOW_NAME}" -m p |xargs readlink -f)" +log_scan "${TEST_NAME}-stop-log-scan" "${LOG_FILE}" 40 1 \ 'The Cylc workflow host will soon become un-available' \ 'This workflow will be shutdown as the workflow host is unable to continue' \ 'Workflow shutting down - REQUEST(NOW)' \ diff --git a/tests/integration/main_loop/__init__.py b/tests/integration/main_loop/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/main_loop/test_auto_restart.py b/tests/integration/main_loop/test_auto_restart.py new file mode 100644 index 00000000000..a83fb7c7932 --- /dev/null +++ b/tests/integration/main_loop/test_auto_restart.py @@ -0,0 +1,50 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import asyncio +from unittest.mock import Mock + +import pytest + +from cylc.flow.main_loop import MainLoopPluginException +from cylc.flow.scheduler import Scheduler +from cylc.flow.workflow_status import AutoRestartMode + + +async def test_no_detach( + one_conf, flow, scheduler, run, mock_glbl_cfg, log_filter, + monkeypatch: pytest.MonkeyPatch +): + """Test that the Scheduler aborts when auto restart tries to happen + while in no-detach mode.""" + mock_glbl_cfg( + 'cylc.flow.scheduler.glbl_cfg', ''' + [scheduler] + [[main loop]] + plugins = auto restart + [[[auto restart]]] + interval = PT1S + ''') + monkeypatch.setattr( + 'cylc.flow.main_loop.auto_restart._should_auto_restart', + Mock(return_value=AutoRestartMode.RESTART_NORMAL) + ) + reg: str = flow(one_conf) + schd: Scheduler = scheduler(reg, paused_start=True, no_detach=True) + with pytest.raises(MainLoopPluginException) as exc: + async with run(schd) as log: + await asyncio.sleep(2) + assert log_filter(log, contains=f"Workflow shutting down - {exc.value}") diff --git a/tests/integration/test_scheduler.py b/tests/integration/test_scheduler.py index 6c367390d0f..c4578ef6a70 100644 --- a/tests/integration/test_scheduler.py +++ b/tests/integration/test_scheduler.py @@ -32,6 +32,8 @@ TASK_STATUS_FAILED ) +from cylc.flow.workflow_status import AutoRestartMode + from .utils.flow_tools import _make_flow @@ -137,7 +139,7 @@ async def mock_shutdown(*a, **k): assert last_record.message == "Error on shutdown" assert last_record.levelno == logging.ERROR assert last_record.exc_text is not None - assert last_record.exc_text.startswith("Traceback (most recent call last)") + assert last_record.exc_text.startswith(TRACEBACK_MSG) assert ("During handling of the above exception, " "another exception occurred") not in last_record.exc_text @@ -343,3 +345,29 @@ def raise_ParsecError(*a, **k): exact_match="Workflow shutting down - Mock error" ) assert TRACEBACK_MSG in log.text + + +async def test_error_during_auto_restart( + 
one: Scheduler, + run: Callable, + log_filter: Callable, + monkeypatch: pytest.MonkeyPatch, +): + """Test that an error during auto-restart does not get swallowed""" + log: pytest.LogCaptureFixture + err_msg = "Mock error: sugar in water" + + def mock_auto_restart(*a, **k): + raise RuntimeError(err_msg) + + monkeypatch.setattr(one, 'workflow_auto_restart', mock_auto_restart) + monkeypatch.setattr( + one, 'auto_restart_mode', AutoRestartMode.RESTART_NORMAL + ) + + with pytest.raises(RuntimeError, match=err_msg): + async with run(one) as log: + pass + + assert log_filter(log, level=logging.ERROR, contains=err_msg) + assert TRACEBACK_MSG in log.text diff --git a/tests/unit/main_loop/test_auto_restart.py b/tests/unit/main_loop/test_auto_restart.py index 8b8e33cf40b..bab949ae79f 100644 --- a/tests/unit/main_loop/test_auto_restart.py +++ b/tests/unit/main_loop/test_auto_restart.py @@ -20,7 +20,9 @@ import pytest from cylc.flow import CYLC_LOG -from cylc.flow.exceptions import CylcConfigError, HostSelectException +from cylc.flow.exceptions import ( + CylcConfigError, CylcError, HostSelectException +) from cylc.flow.main_loop.auto_restart import ( _can_auto_restart, _set_auto_restart, @@ -28,6 +30,7 @@ auto_restart, ) from cylc.flow.parsec.exceptions import ParsecError +from cylc.flow.scheduler import Scheduler from cylc.flow.workflow_status import ( AutoRestartMode, StopMode @@ -185,15 +188,16 @@ def test_set_auto_restart_already_restarting(caplog): assert caplog.record_tuples == [] -def test_set_auto_restart_no_detach(caplog): - """Ensure raises RuntimeError if running in no-detach mode.""" +def test_set_auto_restart_no_detach(caplog: pytest.LogCaptureFixture): + """Ensure raises a CylcError (or subclass) if running in no-detach mode.""" scheduler = Mock( + spec=Scheduler, stop_mode=None, auto_restart_time=None, options=Mock(no_detach=True) ) with caplog.at_level(level=logging.DEBUG, logger=CYLC_LOG): - with pytest.raises(RuntimeError): + with pytest.raises(CylcError): _set_auto_restart(scheduler) assert caplog.record_tuples == [] From b338a91d5db0e1e60ca7ca16c3be28c86c630693 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Fri, 12 Aug 2022 14:30:16 +0100 Subject: [PATCH 4/4] scan: update workflow_params to use public db (#4998) * The workflow_params function was using the private DB to list workflow params. * Really it should be using the public DB as only the scheduler process/thread should be accessing the private DB to avoid the potential for DB locking. 
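
The two databases at issue live in different parts of the run directory:
the private one under the service directory (flow['path'] / SERVICE / 'db'
in the old code) and the public one under the log directory
(get_workflow_run_dir(flow['name'], 'log', 'db') in the new code, as shown
in the diff below). A short sketch of the resulting paths, assuming the
standard cylc-run layout and a hypothetical workflow name:

    from pathlib import Path

    run_dir = Path.home() / 'cylc-run' / 'my_workflow'  # hypothetical name

    # Private DB: only the scheduler process/thread should open this one,
    # to avoid the potential for DB locking.
    private_db = run_dir / '.service' / 'db'

    # Public DB: maintained by the scheduler for external readers
    # such as "cylc scan".
    public_db = run_dir / 'log' / 'db'
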
--- cylc/flow/network/scan.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/cylc/flow/network/scan.py b/cylc/flow/network/scan.py index 12e03ef3f45..201ee4f14fc 100644 --- a/cylc/flow/network/scan.py +++ b/cylc/flow/network/scan.py @@ -67,7 +67,10 @@ ClientTimeout, WorkflowRuntimeClient, ) -from cylc.flow.pathutil import get_cylc_run_dir +from cylc.flow.pathutil import ( + get_cylc_run_dir, + get_workflow_run_dir, +) from cylc.flow.rundb import CylcWorkflowDAO from cylc.flow.workflow_files import ( ContactFileFields, @@ -528,11 +531,16 @@ def _callback(_, entry): key, value = entry params[key] = value - db_file = flow['path'] / SERVICE / 'db' + # NOTE: use the public DB for reading + # (only the scheduler process/thread should access the private database) + db_file = Path(get_workflow_run_dir(flow['name'], 'log', 'db')) if db_file.exists(): dao = CylcWorkflowDAO(db_file, is_public=False) - dao.connect() - dao.select_workflow_params(_callback) - flow['workflow_params'] = params + try: + dao.connect() + dao.select_workflow_params(_callback) + flow['workflow_params'] = params + finally: + dao.close() return flow
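
The log-numbering scheme reworked in PATCH 3/4 can be summarised in a few
lines. This is a standalone sketch mirroring the doctests of
get_next_log_number in the diff above, not the real cylc.flow.loggingutil
module:

    import os

    def next_log_number(log_filepath: str) -> int:
        """Return 1 + the leading number of an 'NN-loadtype-MM' style log
        file name, or 1 if the name has no leading number."""
        try:
            return int(os.path.basename(log_filepath).partition('-')[0]) + 1
        except ValueError:
            return 1

    # Behaviour matches the docstring examples in the diff:
    assert next_log_number('03-restart-02.log') == 4
    assert next_log_number('/some/path/to/19-start-01.cylc') == 20
    assert next_log_number('blah') == 1

    # New files are then named f'{log_num:02d}-{load_type}-{restart_num:02d}',
    # so a first start writes '01-start-01.log' and the first auto restart
    # writes '02-restart-02.log', as checked by the updated functional test.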