diff --git a/CHANGES.md b/CHANGES.md
index 094bb215584..ab60946d252 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -59,6 +59,9 @@ cycle point validation in the UI.
 [#5037](https://github.com/cylc/cylc-flow/pull/5037) - Fix bug where the
 workflow restart number would get wiped on reload.
 
+[#5049](https://github.com/cylc/cylc-flow/pull/5049) - Fix several small
+bugs related to auto restart.
+
 -------------------------------------------------------------------------------
 
 ## __cylc-8.0.0 (Released 2022-07-28)__
 
diff --git a/cylc/flow/loggingutil.py b/cylc/flow/loggingutil.py
index 26c136e95ab..2d99c225713 100644
--- a/cylc/flow/loggingutil.py
+++ b/cylc/flow/loggingutil.py
@@ -126,21 +126,19 @@ class RotatingLogFileHandler(logging.FileHandler):
     for names.
 
     Argument:
-        log_file_path: path to the log file
+        log_file_path: path to the log file (symlink to latest log file)
         no_detach: non-detach mode?
         restart_num: restart number for the run
         timestamp: Add timestamp to log formatting?
     """
 
     FILE_HEADER_FLAG = 'cylc_log_file_header'
-    FILE_NUM = 'cylc_log_num'
+    ROLLOVER_NUM = 'cylc_log_num'
     MIN_BYTES = 1024
 
-    extra = {FILE_HEADER_FLAG: True}
-    extra_num = {
-        FILE_HEADER_FLAG: True,
-        FILE_NUM: 1
-    }
+    header_extra = {FILE_HEADER_FLAG: True}
+    """Use to indicate the log msg is a header that should be logged on
+    every rollover"""
 
     def __init__(
         self,
@@ -152,9 +150,11 @@ def __init__(
         logging.FileHandler.__init__(self, log_file_path)
         self.no_detach = no_detach
         self.formatter = CylcLogFormatter(timestamp=timestamp)
+        # Header records get appended to when calling
+        # LOG.info(..., extra=RotatingLogFileHandler.header_extra)
        self.header_records: List[logging.LogRecord] = []
        self.restart_num = restart_num
-        self.log_num: Optional[int] = None
+        self.log_num: Optional[int] = None  # null value until log file created
 
     def emit(self, record):
         """Emit a record, rollover log if necessary."""
@@ -169,16 +169,14 @@ def emit(self, record):
         except Exception:
             self.handleError(record)
 
-    def load_type_change(self):
+    def load_type_change(self) -> bool:
         """Has there been a load-type change, e.g. restart?"""
-        current_load_type = self.get_load_type()
         existing_load_type = self.existing_log_load_type()
-        # Rollover if the load type has changed.
-        if existing_load_type and current_load_type != existing_load_type:
+        if existing_load_type and self.load_type != existing_load_type:
             return True
         return False
 
-    def existing_log_load_type(self):
+    def existing_log_load_type(self) -> Optional[str]:
         """Return a log load type, if one currently exists"""
         try:
             existing_log_name = os.readlink(self.baseFilename)
@@ -188,12 +186,11 @@ def existing_log_load_type(self):
         for load_type in [RESTART_LOAD_TYPE, START_LOAD_TYPE]:
             if existing_log_name.find(load_type) > 0:
                 return load_type
+        return None
 
-    def should_rollover(self, record):
+    def should_rollover(self, record: logging.LogRecord) -> bool:
         """Should rollover?"""
-        if (self.stream is None or
-                self.load_type_change() or
-                self.log_num is None):
+        if self.log_num is None or self.stream is None:
             return True
         max_bytes = glbl_cfg().get(
             ['scheduler', 'logging', 'maximum size in bytes'])
@@ -208,26 +205,18 @@ def should_rollover(self, record):
             raise SystemExit(exc)
         return self.stream.tell() + len(msg.encode('utf8')) >= max_bytes
 
-    def get_load_type(self):
+    @property
+    def load_type(self) -> str:
         """Establish current load type, as perceived by scheduler."""
         if self.restart_num > 0:
             return RESTART_LOAD_TYPE
         return START_LOAD_TYPE
 
-    def do_rollover(self):
+    def do_rollover(self) -> None:
         """Create and rollover log file if necessary."""
-        # Generate new file name
-        filename = self.get_new_log_filename()
-        os.makedirs(os.path.dirname(filename), exist_ok=True)
-        # Touch file
-        with open(filename, 'w+'):
-            os.utime(filename, None)
-        # Update symlink
-        if (os.path.exists(self.baseFilename) or
-                os.path.lexists(self.baseFilename)):
-            os.unlink(self.baseFilename)
-        os.symlink(os.path.basename(filename), self.baseFilename)
-        # Housekeep log files
+        # Create new log file
+        self.new_log_file()
+        # Housekeep old log files
         arch_len = glbl_cfg().get(
             ['scheduler', 'logging', 'rolling archive length'])
         if arch_len:
@@ -235,7 +224,6 @@ def do_rollover(self):
         # Reopen stream, redirect STDOUT and STDERR to log
         if self.stream:
             self.stream.close()
-            self.stream = None
         self.stream = self._open()
         # Dup STDOUT and STDERR in detach mode
         if not self.no_detach:
@@ -243,15 +231,15 @@ def do_rollover(self):
             os.dup2(self.stream.fileno(), sys.stderr.fileno())
         # Emit header records (should only do this for subsequent log files)
         for header_record in self.header_records:
-            if self.FILE_NUM in header_record.__dict__:
-                # Increment log file number
-                header_record.__dict__[self.FILE_NUM] += 1
-                # strip the hard coded log number (1) from the log message
-                # replace with the log number for that start.
-                # Note this is different from the log number in the file name
-                # which is cumulative over the workflow.
-                header_record.args = header_record.args[0:-1] + (
-                    header_record.__dict__[self.FILE_NUM],)
+            if self.ROLLOVER_NUM in header_record.__dict__:
+                # A hack to increment the rollover number that gets logged in
+                # the log file. (Rollover number only applies to a particular
+                # workflow run; note this is different from the log count
+                # number in the log filename.)
+                header_record.__dict__[self.ROLLOVER_NUM] += 1
+                header_record.args = (
+                    header_record.__dict__[self.ROLLOVER_NUM],
+                )
             logging.FileHandler.emit(self, header_record)
 
     def update_log_archive(self, arch_len):
@@ -265,30 +253,32 @@ def update_log_archive(self, arch_len):
         while len(log_files) > arch_len:
             os.unlink(log_files.pop(0))
 
-    def get_new_log_filename(self):
-        """Build filename for log"""
-        base_dir = Path(self.baseFilename).parent
-        load_type = self.get_load_type()
-        if load_type == START_LOAD_TYPE:
-            run_num = 1
-        elif load_type == RESTART_LOAD_TYPE:
-            run_num = self.restart_num + 1
-        self.set_log_num()
-        filename = base_dir.joinpath(
-            f'{self.log_num:02d}-{load_type}-{run_num:02d}{LOG_FILE_EXTENSION}'
+    def new_log_file(self) -> Path:
+        """Set self.log_num and create new log file."""
+        try:
+            log_file = os.readlink(self.baseFilename)
+        except OSError:
+            # "log" symlink not yet created, this is the first log
+            self.log_num = 1
+        else:
+            self.log_num = get_next_log_number(log_file)
+        log_dir = Path(self.baseFilename).parent
+        # User-facing restart num is 1 higher than backend value
+        restart_num = self.restart_num + 1
+        filename = log_dir.joinpath(
+            f'{self.log_num:02d}-{self.load_type}-{restart_num:02d}'
+            f'{LOG_FILE_EXTENSION}'
         )
+        os.makedirs(filename.parent, exist_ok=True)
+        # Touch file
+        with open(filename, 'w+'):
+            os.utime(filename, None)
+        # Update symlink
+        if os.path.lexists(self.baseFilename):
+            os.unlink(self.baseFilename)
+        os.symlink(os.path.basename(filename), self.baseFilename)
         return filename
 
-    def set_log_num(self):
-        if not self.log_num:
-            try:
-                current_log = os.readlink(self.baseFilename)
-                self.log_num = int(get_next_log_number(current_log))
-            except OSError:
-                self.log_num = 1
-        else:
-            self.log_num = int(self.log_num) + 1
-
 
 class ReferenceLogFileHandler(logging.FileHandler):
     """A handler class which writes filtered reference logging records
@@ -378,29 +368,28 @@ def close_log(logger: logging.Logger) -> None:
         handler.close()
 
 
-def get_next_log_number(log: str) -> str:
-    """Returns the next log number for the log specified.
+def get_next_log_number(log_filepath: Union[str, Path]) -> int:
+    """Returns the next log number for the given log file path/name.
 
     Log name formats are of the form :
         <log number>-<load type>-<restart number>
-    When given the latest log it returns the next log number, with padded 0s.
+    When given the latest log it returns the next log number.
     Examples:
-        >>> get_next_log_number('01-restart-02.log')
-        '02'
-        >>> get_next_log_number('/some/path/to/19-start-20.cylc')
-        '20'
+        >>> get_next_log_number('03-restart-02.log')
+        4
+        >>> get_next_log_number('/some/path/to/19-start-01.cylc')
+        20
         >>> get_next_log_number('199-start-08.log')
-        '200'
+        200
         >>> get_next_log_number('blah')
-        '01'
+        1
     """
     try:
-        stripped_log = os.path.basename(log)
-        next_log_num = int(stripped_log.partition("-")[0]) + 1
+        stripped_log = os.path.basename(log_filepath)
+        return int(stripped_log.partition("-")[0]) + 1
     except ValueError:
-        next_log_num = 1
-    return f'{next_log_num:02d}'
+        return 1
 
 
 def get_sorted_logs_by_time(
diff --git a/cylc/flow/main_loop/auto_restart.py b/cylc/flow/main_loop/auto_restart.py
index cb1c43ae619..0cefb76e1f4 100644
--- a/cylc/flow/main_loop/auto_restart.py
+++ b/cylc/flow/main_loop/auto_restart.py
@@ -96,6 +96,7 @@
 from cylc.flow.hostuserutil import get_fqdn_by_host
 from cylc.flow.main_loop import periodic
 from cylc.flow.parsec.exceptions import ParsecError
+from cylc.flow.scheduler import SchedulerError
 from cylc.flow.workflow_status import AutoRestartMode
 from cylc.flow.wallclock import (
     get_time_string_from_unix_time as time2str
@@ -224,7 +225,7 @@ def _set_auto_restart(
     # This should raise an "abort" event and return a non-zero code to the
     # caller still attached to the workflow process.
     if scheduler.options.no_detach:
-        raise RuntimeError('Workflow host condemned in no detach mode')
+        raise SchedulerError('Workflow host condemned in no detach mode')
 
     # Check workflow is able to be safely restarted.
     if not _can_auto_restart():
diff --git a/cylc/flow/network/scan.py b/cylc/flow/network/scan.py
index 12e03ef3f45..201ee4f14fc 100644
--- a/cylc/flow/network/scan.py
+++ b/cylc/flow/network/scan.py
@@ -67,7 +67,10 @@
     ClientTimeout,
     WorkflowRuntimeClient,
 )
-from cylc.flow.pathutil import get_cylc_run_dir
+from cylc.flow.pathutil import (
+    get_cylc_run_dir,
+    get_workflow_run_dir,
+)
 from cylc.flow.rundb import CylcWorkflowDAO
 from cylc.flow.workflow_files import (
     ContactFileFields,
@@ -528,11 +531,16 @@ def _callback(_, entry):
             key, value = entry
             params[key] = value
 
-        db_file = flow['path'] / SERVICE / 'db'
+        # NOTE: use the public DB for reading
+        # (only the scheduler process/thread should access the private database)
+        db_file = Path(get_workflow_run_dir(flow['name'], 'log', 'db'))
         if db_file.exists():
             dao = CylcWorkflowDAO(db_file, is_public=False)
-            dao.connect()
-            dao.select_workflow_params(_callback)
-            flow['workflow_params'] = params
+            try:
+                dao.connect()
+                dao.select_workflow_params(_callback)
+                flow['workflow_params'] = params
+            finally:
+                dao.close()
 
     return flow
diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py
index 2329c434a21..9639a704a7b 100644
--- a/cylc/flow/scheduler.py
+++ b/cylc/flow/scheduler.py
@@ -74,7 +74,7 @@
 from cylc.flow.network.authentication import key_housekeeping
 from cylc.flow.network.schema import WorkflowStopMode
 from cylc.flow.network.server import WorkflowRuntimeServer
-from cylc.flow.option_parsers import verbosity_to_env
+from cylc.flow.option_parsers import verbosity_to_env, verbosity_to_opts
 from cylc.flow.parsec.exceptions import ParsecError
 from cylc.flow.parsec.OrderedDict import DictTree
 from cylc.flow.parsec.validate import DurationFloat
@@ -409,28 +409,29 @@ async def configure(self):
                 self.config.cfg['scheduler']['cycle point time zone'])
 
         # Note that the following lines must be present at the top of
-        # the workflow log file for use in reference test runs:
+        # the workflow log file for use in reference test runs.
+        # (These are also headers that get logged on each rollover)
         LOG.info(
             f'Run mode: {self.config.run_mode()}',
-            extra=RotatingLogFileHandler.extra
+            extra=RotatingLogFileHandler.header_extra
         )
         LOG.info(
             f'Initial point: {self.config.initial_point}',
-            extra=RotatingLogFileHandler.extra
+            extra=RotatingLogFileHandler.header_extra
         )
         if self.config.start_point != self.config.initial_point:
             LOG.info(
                 f'Start point: {self.config.start_point}',
-                extra=RotatingLogFileHandler.extra
+                extra=RotatingLogFileHandler.header_extra
             )
         LOG.info(
             f'Final point: {self.config.final_point}',
-            extra=RotatingLogFileHandler.extra
+            extra=RotatingLogFileHandler.header_extra
         )
         if self.config.stop_point:
             LOG.info(
                 f'Stop point: {self.config.stop_point}',
-                extra=RotatingLogFileHandler.extra
+                extra=RotatingLogFileHandler.header_extra
             )
 
         self.broadcast_mgr.linearized_ancestors.update(
@@ -543,32 +544,38 @@ async def log_start(self) -> None:
             LOG.setLevel(logging.INFO)
 
         # Print workflow name to disambiguate in case of inferred run number
+        # while in no-detach mode
         LOG.info(f"Workflow: {self.workflow}")
 
+        # Headers that also get logged on each rollover:
         LOG.info(
             self.START_MESSAGE_TMPL % {
                 'comms_method': 'tcp',
                 'host': self.host,
                 'port': self.server.port,
                 'pid': os.getpid()},
-            extra=RotatingLogFileHandler.extra,
+            extra=RotatingLogFileHandler.header_extra,
         )
         LOG.info(
             self.START_PUB_MESSAGE_TMPL % {
                 'comms_method': 'tcp',
                 'host': self.host,
                 'port': self.server.pub_port},
-            extra=RotatingLogFileHandler.extra,
+            extra=RotatingLogFileHandler.header_extra,
         )
         restart_num = self.get_restart_num() + 1
         LOG.info(
-            'Run: (re)start number=%d, log rollover=%d',
-            restart_num,
-            1,  # hard code 1 which is updated later if required
-            extra=RotatingLogFileHandler.extra_num
+            f'Run: (re)start number={restart_num}, log rollover=%d',
+            # Hard code 1 in args, gets updated on log rollover (NOTE: this
+            # must be the only positional arg):
+            1,
+            extra={
+                **RotatingLogFileHandler.header_extra,
+                RotatingLogFileHandler.ROLLOVER_NUM: 1
+            }
         )
         LOG.info(
             f'Cylc version: {CYLC_VERSION}',
-            extra=RotatingLogFileHandler.extra
+            extra=RotatingLogFileHandler.header_extra
         )
 
         if is_quiet:
@@ -599,16 +606,22 @@ async def run_scheduler(self):
         except SchedulerStop as exc:
             # deliberate stop
             await self.shutdown(exc)
-            if self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL:
-                self.workflow_auto_restart()
-            # run shutdown coros
-            await asyncio.gather(
-                *main_loop.get_runners(
-                    self.main_loop_plugins,
-                    main_loop.CoroTypes.ShutDown,
-                    self
+            try:
+                if self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL:
+                    self.workflow_auto_restart()
+                # run shutdown coros
+                await asyncio.gather(
+                    *main_loop.get_runners(
+                        self.main_loop_plugins,
+                        main_loop.CoroTypes.ShutDown,
+                        self
+                    )
                 )
-            )
+            except Exception as exc:
+                # Need to log traceback manually because otherwise this
+                # exception gets swallowed
+                LOG.exception(exc)
+                raise
 
         except (KeyboardInterrupt, asyncio.CancelledError, Exception) as exc:
             # Includes SchedulerError
@@ -1108,10 +1121,7 @@ def load_flow_file(self, is_reload=False):
         config_dir = get_workflow_run_config_log_dir(
             self.workflow)
         config_logs = get_sorted_logs_by_time(config_dir, "*[0-9].cylc")
-        if config_logs:
-            log_num = get_next_log_number(config_logs[-1])
-        else:
-            log_num = '01'
+        log_num = get_next_log_number(config_logs[-1]) if config_logs else 1
         if is_reload:
             load_type = "reload"
             load_type_num = get_reload_start_number(config_logs)
@@ -1123,7 +1133,7 @@ def load_flow_file(self, is_reload=False):
             load_type = "start"
             load_type_num = '01'
         file_name = get_workflow_run_config_log_dir(
-            self.workflow, f"{log_num}-{load_type}-{load_type_num}.cylc")
+            self.workflow, f"{log_num:02d}-{load_type}-{load_type_num}.cylc")
         with open(file_name, "w") as handle:
             handle.write("# cylc-version: %s\n" % CYLC_VERSION)
             self.config.pcfg.idump(sparse=True, handle=handle)
@@ -1405,9 +1415,12 @@ async def workflow_shutdown(self):
             raise SchedulerError(
                 'Invalid auto_restart_mode=%s' % self.auto_restart_mode)
 
-    def workflow_auto_restart(self, max_retries=3):
+    def workflow_auto_restart(self, max_retries: int = 3) -> bool:
         """Attempt to restart the workflow assuming it has already stopped."""
-        cmd = ['cylc', 'play', quote(self.workflow)]
+        cmd = [
+            'cylc', 'play', quote(self.workflow),
+            *verbosity_to_opts(cylc.flow.flags.verbosity)
+        ]
         if self.options.abort_if_any_task_fails:
             cmd.append('--abort-if-any-task-fails')
         for attempt_no in range(max_retries):
@@ -1419,6 +1432,7 @@ def workflow_auto_restart(self, max_retries=3):
                 stdin=DEVNULL,
                 stdout=PIPE,
                 stderr=PIPE,
+                text=True
             )
             # * new_host comes from internal interface which can only return
             #   host names
@@ -1429,7 +1443,8 @@ def workflow_auto_restart(self, max_retries=3):
                     f' will retry in {self.INTERVAL_AUTO_RESTART_ERROR}s')
                 LOG.critical(
-                    f"{msg}. Restart error:\n",
-                    f"{proc.communicate()[1].decode()}")
+                    f"{msg}. Restart error:\n"
+                    f"{proc.communicate()[1]}"
+                )
                 sleep(self.INTERVAL_AUTO_RESTART_ERROR)
             else:
                 LOG.info(f'Workflow now running on "{new_host}".')
@@ -1681,28 +1696,26 @@ async def shutdown(self, reason: Exception) -> None:
 
     async def _shutdown(self, reason: Exception) -> None:
         """Shutdown the workflow."""
+        shutdown_msg = "Workflow shutting down"
         if isinstance(reason, SchedulerStop):
-            LOG.info(f'Workflow shutting down - {reason.args[0]}')
+            LOG.info(f'{shutdown_msg} - {reason.args[0]}')
             # Unset the "paused" status of the workflow if not auto-restarting
             if self.auto_restart_mode != AutoRestartMode.RESTART_NORMAL:
                 self.resume_workflow(quiet=True)
         elif isinstance(reason, SchedulerError):
-            LOG.error(f"Workflow shutting down - {reason}")
+            LOG.error(f"{shutdown_msg} - {reason}")
         elif isinstance(reason, CylcError) or (
             isinstance(reason, ParsecError) and reason.schd_expected
         ):
-            LOG.error(
-                f"Workflow shutting down - {type(reason).__name__}: {reason}"
-            )
+            LOG.error(f"{shutdown_msg} - {type(reason).__name__}: {reason}")
             if cylc.flow.flags.verbosity > 1:
                 # Print traceback
                 LOG.exception(reason)
         else:
             LOG.exception(reason)
             if str(reason):
-                LOG.critical(f'Workflow shutting down - {reason}')
-            else:
-                LOG.critical('Workflow shutting down')
+                shutdown_msg += f" - {reason}"
+            LOG.critical(shutdown_msg)
 
         if hasattr(self, 'proc_pool'):
             self.proc_pool.close()
diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py
index 5f10f6bfd60..b728e8d85f4 100644
--- a/cylc/flow/task_remote_mgr.py
+++ b/cylc/flow/task_remote_mgr.py
@@ -568,10 +568,7 @@ def get_log_file_name(
         install_log_dir
     ):
         log_files = get_sorted_logs_by_time(install_log_dir, '*.log')
-        if log_files:
-            log_num = get_next_log_number(log_files[-1])
-        else:
-            log_num = '01'
+        log_num = get_next_log_number(log_files[-1]) if log_files else 1
         load_type = "start"
         if self.is_reload:
             load_type = "reload"
@@ -579,7 +576,7 @@ def get_log_file_name(
         elif self.is_restart:
             load_type = "restart"
             self.is_restart = False  # reset marker
-        file_name = f"{log_num}-{load_type}-{install_target}.log"
+        file_name = f"{log_num:02d}-{load_type}-{install_target}.log"
         return file_name
 
     def _remote_init_items(self, comms_meth: CommsMeth):
diff --git a/cylc/flow/workflow_files.py b/cylc/flow/workflow_files.py
index d0b30f826db..7b0a27164c1 100644
--- a/cylc/flow/workflow_files.py
+++ b/cylc/flow/workflow_files.py
@@ -1427,11 +1427,8 @@ def _open_install_log(rund, logger):
         WorkflowFiles.LOG_DIR,
         'install')
     log_files = get_sorted_logs_by_time(log_dir, '*.log')
-    if log_files:
-        log_num = get_next_log_number(log_files[-1])
-    else:
-        log_num = '01'
-    log_path = Path(log_dir, f"{log_num}-{log_type}.log")
+    log_num = get_next_log_number(log_files[-1]) if log_files else 1
+    log_path = Path(log_dir, f"{log_num:02d}-{log_type}.log")
     log_parent_dir = log_path.parent
     log_parent_dir.mkdir(exist_ok=True, parents=True)
    handler = logging.FileHandler(log_path)
diff --git a/tests/conftest.py b/tests/conftest.py
index 29b4d5fb3b0..fb7a7408f31 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -24,6 +24,7 @@
 from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
 from cylc.flow.cfgspec.globalcfg import SPEC
 from cylc.flow.parsec.config import ParsecConfig
+from cylc.flow.parsec.validate import cylc_config_validate
 
 
 @pytest.fixture
@@ -62,7 +63,7 @@ def _mock_glbl_cfg(pypath: str, global_config: str) -> None:
         nonlocal tmp_path, monkeypatch
         global_config_path = tmp_path / 'global.cylc'
         global_config_path.write_text(global_config)
-        glbl_cfg = ParsecConfig(SPEC)
+        glbl_cfg = ParsecConfig(SPEC, validator=cylc_config_validate)
         glbl_cfg.loadcfg(global_config_path)
 
         def _inner(cached=False):
diff --git a/tests/functional/lib/bash/test_header b/tests/functional/lib/bash/test_header
index 8cfa897c2a2..263545aae46 100644
--- a/tests/functional/lib/bash/test_header
+++ b/tests/functional/lib/bash/test_header
@@ -65,7 +65,7 @@
 # grep_ok PATTERN FILE [$OPTS]
 #     Run "grep [$OPTS] -q -e PATTERN FILE".
 # grep_workflow_log_ok TEST_NAME PATTERN [$OPTS]
-#     Run "grep [$OPTS] -q -e PATTERN <workflow-log-file>".
+#     Run "grep [$OPTS] -s -e PATTERN <workflow-log-file>".
 # named_grep_ok NAME PATTERN FILE [$OPTS]
 #     Run grep_ok with a custom test name.
 # OPTS: put grep options like '-E' (extended regex) at end of line.
@@ -429,12 +429,26 @@ grep_ok() {
 
 grep_workflow_log_ok() {
     local TEST_NAME="$1"
-    shift
-    if grep -s "$@" "${WORKFLOW_RUN_DIR}/log/scheduler/log"; then
-        ok "${TEST_NAME}"
-    else
-        fail "${TEST_NAME}"
+    local PATTERN="$2"
+    shift 2
+    local LOG_FILE="${WORKFLOW_RUN_DIR}/log/scheduler/log"
+    if grep "$@" -s -e "$PATTERN" "$LOG_FILE"; then
+        ok "${TEST_NAME}"
+        return
     fi
+    mkdir -p "${TEST_LOG_DIR}"
+    {
+        cat <<__ERR__
+Can't find:
+===========
+${PATTERN}
+===========
+in:
+===========
+__ERR__
+        cat "$LOG_FILE"
+    } >"${TEST_LOG_DIR}/${TEST_NAME}.stderr"
+    fail "${TEST_NAME}"
 }
 
 named_grep_ok() {
diff --git a/tests/functional/restart/34-auto-restart-basic.t b/tests/functional/restart/34-auto-restart-basic.t
index 5d1ccbe5f48..62b6677fc7e 100644
--- a/tests/functional/restart/34-auto-restart-basic.t
+++ b/tests/functional/restart/34-auto-restart-basic.t
@@ -18,7 +18,7 @@
 export REQUIRE_PLATFORM='loc:remote fs:shared runner:background'
 . "$(dirname "$0")/test_header"
 #-------------------------------------------------------------------------------
-set_test_number 9
+set_test_number 10
 if ${CYLC_TEST_DEBUG:-false}; then ERR=2; else ERR=1; fi
 #-------------------------------------------------------------------------------
 # run through the shutdown - restart procedure
@@ -65,7 +65,7 @@ ${BASE_GLOBAL_CONFIG}
 "
 # test shutdown procedure - scan the first log file
 FILE=$(cylc cat-log "${WORKFLOW_NAME}" -m p |xargs readlink -f)
-log_scan "${TEST_NAME}-shutdown" "${FILE}" 20 1 \
+log_scan "${TEST_NAME}-shutdown-log-scan" "${FILE}" 20 1 \
     'The Cylc workflow host will soon become un-available' \
     'Workflow shutting down - REQUEST(NOW-NOW)' \
     "Attempting to restart on \"${CYLC_TEST_HOST}\"" \
@@ -76,7 +76,7 @@ LATEST_TASK=$(cylc workflow-state "${WORKFLOW_NAME}" -S succeeded \
 # test restart procedure - scan the second log file created on restart
 poll_workflow_restart
 FILE=$(cylc cat-log "${WORKFLOW_NAME}" -m p |xargs readlink -f)
-log_scan "${TEST_NAME}-restart" "${FILE}" 20 1 \
+log_scan "${TEST_NAME}-restart-log-scan" "${FILE}" 20 1 \
     "Scheduler: url=tcp://$(get_fqdn "${CYLC_TEST_HOST}")"
 run_ok "${TEST_NAME}-restart-success" cylc workflow-state "${WORKFLOW_NAME}" \
     --task="$(printf 'task_foo%02d' $(( LATEST_TASK + 3 )))" \
@@ -84,11 +84,18 @@ run_ok "${TEST_NAME}-restart-success" cylc workflow-state "${WORKFLOW_NAME}" \
 
 # check the command the workflow has been restarted with
 run_ok "${TEST_NAME}-contact" cylc get-contact "${WORKFLOW_NAME}"
-grep_ok "cylc play ${WORKFLOW_NAME} --host=${CYLC_TEST_HOST} --host=localhost" \
+grep_ok "cylc play ${WORKFLOW_NAME} -v --host=${CYLC_TEST_HOST} --host=localhost" \
     "${TEST_NAME}-contact.stdout"
 
 # stop workflow
 cylc stop "${WORKFLOW_NAME}" --kill --max-polls=10 --interval=2 2>'/dev/null'
-purge
-exit
+# Check correct number of logs
+ls "${WORKFLOW_RUN_DIR}/log/scheduler/" > ls_logs.out
+cmp_ok ls_logs.out << __EOF__
+01-start-01.log
+02-restart-02.log
+log
+__EOF__
+
+purge
diff --git a/tests/functional/restart/41-auto-restart-local-jobs.t b/tests/functional/restart/41-auto-restart-local-jobs.t
index 163b6dd91a3..a1d86d3ef95 100644
--- a/tests/functional/restart/41-auto-restart-local-jobs.t
+++ b/tests/functional/restart/41-auto-restart-local-jobs.t
@@ -73,8 +73,8 @@ ${BASE_GLOBAL_CONFIG}
         condemned = ${CYLC_TEST_HOST1}
 "
 
-FILE="$(cylc cat-log "${WORKFLOW_NAME}" -m p |xargs readlink -f)"
-log_scan "${TEST_NAME}-stop" "${FILE}" 40 1 \
+LOG_FILE="$(cylc cat-log "${WORKFLOW_NAME}" -m p |xargs readlink -f)"
+log_scan "${TEST_NAME}-stop-log-scan" "${LOG_FILE}" 40 1 \
     'The Cylc workflow host will soon become un-available' \
     'Waiting for jobs running on localhost to complete' \
     'Waiting for jobs running on localhost to complete' \
@@ -82,10 +82,11 @@ log_scan "${TEST_NAME}-stop" "${FILE}" 40 1 \
     "Attempting to restart on \"${CYLC_TEST_HOST2}\""
 # we shouldn't have any orphaned tasks because we should
 # have waited for them to complete
-grep_fail 'orphaned task' "${FILE}"
+grep_fail 'orphaned task' "$LOG_FILE"
 poll_workflow_restart
 
-grep_workflow_log_ok "restart-log-grep" "Workflow now running on \"${CYLC_TEST_HOST2}\""
+log_scan "${TEST_NAME}-restart-log-scan" "$LOG_FILE" 20 1 \
+    "Workflow now running on \"${CYLC_TEST_HOST2}\""
 #-------------------------------------------------------------------------------
 # auto stop-restart - force mode:
 #     ensure the workflow DOESN'T WAIT for local jobs to complete before stopping
@@ -103,8 +104,8 @@ ${BASE_GLOBAL_CONFIG}
         condemned = ${CYLC_TEST_HOST2}!
 "
 
-FILE="$(cylc cat-log "${WORKFLOW_NAME}" -m p |xargs readlink -f)"
-log_scan "${TEST_NAME}-stop" "${FILE}" 40 1 \
+LOG_FILE="$(cylc cat-log "${WORKFLOW_NAME}" -m p |xargs readlink -f)"
+log_scan "${TEST_NAME}-stop-log-scan" "${LOG_FILE}" 40 1 \
     'The Cylc workflow host will soon become un-available' \
     'This workflow will be shutdown as the workflow host is unable to continue' \
     'Workflow shutting down - REQUEST(NOW)' \
diff --git a/tests/integration/main_loop/__init__.py b/tests/integration/main_loop/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/main_loop/test_auto_restart.py b/tests/integration/main_loop/test_auto_restart.py
new file mode 100644
index 00000000000..a83fb7c7932
--- /dev/null
+++ b/tests/integration/main_loop/test_auto_restart.py
@@ -0,0 +1,50 @@
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import asyncio
+from unittest.mock import Mock
+
+import pytest
+
+from cylc.flow.main_loop import MainLoopPluginException
+from cylc.flow.scheduler import Scheduler
+from cylc.flow.workflow_status import AutoRestartMode
+
+
+async def test_no_detach(
+    one_conf, flow, scheduler, run, mock_glbl_cfg, log_filter,
+    monkeypatch: pytest.MonkeyPatch
+):
+    """Test that the Scheduler aborts when auto restart tries to happen
+    while in no-detach mode."""
+    mock_glbl_cfg(
+        'cylc.flow.scheduler.glbl_cfg', '''
+        [scheduler]
+            [[main loop]]
+                plugins = auto restart
+                [[[auto restart]]]
+                    interval = PT1S
+        ''')
+    monkeypatch.setattr(
+        'cylc.flow.main_loop.auto_restart._should_auto_restart',
+        Mock(return_value=AutoRestartMode.RESTART_NORMAL)
+    )
+    reg: str = flow(one_conf)
+    schd: Scheduler = scheduler(reg, paused_start=True, no_detach=True)
+    with pytest.raises(MainLoopPluginException) as exc:
+        async with run(schd) as log:
+            await asyncio.sleep(2)
+    assert log_filter(log, contains=f"Workflow shutting down - {exc.value}")
diff --git a/tests/integration/test_scheduler.py b/tests/integration/test_scheduler.py
index 6c367390d0f..c4578ef6a70 100644
--- a/tests/integration/test_scheduler.py
+++ b/tests/integration/test_scheduler.py
@@ -32,6 +32,8 @@
     TASK_STATUS_FAILED
 )
 
+from cylc.flow.workflow_status import AutoRestartMode
+
 from .utils.flow_tools import _make_flow
 
 
@@ -137,7 +139,7 @@ async def mock_shutdown(*a, **k):
     assert last_record.message == "Error on shutdown"
     assert last_record.levelno == logging.ERROR
     assert last_record.exc_text is not None
-    assert last_record.exc_text.startswith("Traceback (most recent call last)")
+    assert last_record.exc_text.startswith(TRACEBACK_MSG)
     assert ("During handling of the above exception, "
             "another exception occurred") not in last_record.exc_text
 
@@ -343,3 +345,29 @@ def raise_ParsecError(*a, **k):
         exact_match="Workflow shutting down - Mock error"
     )
     assert TRACEBACK_MSG in log.text
+
+
+async def test_error_during_auto_restart(
+    one: Scheduler,
+    run: Callable,
+    log_filter: Callable,
+    monkeypatch: pytest.MonkeyPatch,
+):
+    """Test that an error during auto-restart does not get swallowed"""
+    log: pytest.LogCaptureFixture
+    err_msg = "Mock error: sugar in water"
+
+    def mock_auto_restart(*a, **k):
+        raise RuntimeError(err_msg)
+
+    monkeypatch.setattr(one, 'workflow_auto_restart', mock_auto_restart)
+    monkeypatch.setattr(
+        one, 'auto_restart_mode', AutoRestartMode.RESTART_NORMAL
+    )
+
+    with pytest.raises(RuntimeError, match=err_msg):
+        async with run(one) as log:
+            pass
+
+    assert log_filter(log, level=logging.ERROR, contains=err_msg)
+    assert TRACEBACK_MSG in log.text
diff --git a/tests/unit/main_loop/test_auto_restart.py b/tests/unit/main_loop/test_auto_restart.py
index 8b8e33cf40b..bab949ae79f 100644
--- a/tests/unit/main_loop/test_auto_restart.py
+++ b/tests/unit/main_loop/test_auto_restart.py
@@ -20,7 +20,9 @@
 import pytest
 
 from cylc.flow import CYLC_LOG
-from cylc.flow.exceptions import CylcConfigError, HostSelectException
+from cylc.flow.exceptions import (
+    CylcConfigError, CylcError, HostSelectException
+)
 from cylc.flow.main_loop.auto_restart import (
     _can_auto_restart,
     _set_auto_restart,
@@ -28,6 +30,7 @@
     auto_restart,
 )
 from cylc.flow.parsec.exceptions import ParsecError
+from cylc.flow.scheduler import Scheduler
 from cylc.flow.workflow_status import (
     AutoRestartMode,
     StopMode
@@ -185,15 +188,16 @@ def test_set_auto_restart_already_restarting(caplog):
     assert caplog.record_tuples == []
 
 
-def test_set_auto_restart_no_detach(caplog):
-    """Ensure raises RuntimeError if running in no-detach mode."""
+def test_set_auto_restart_no_detach(caplog: pytest.LogCaptureFixture):
+    """Ensure raises a CylcError (or subclass) if running in no-detach mode."""
     scheduler = Mock(
+        spec=Scheduler,
         stop_mode=None,
         auto_restart_time=None,
         options=Mock(no_detach=True)
     )
     with caplog.at_level(level=logging.DEBUG, logger=CYLC_LOG):
-        with pytest.raises(RuntimeError):
+        with pytest.raises(CylcError):
             _set_auto_restart(scheduler)
     assert caplog.record_tuples == []
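
The `header_extra`/`ROLLOVER_NUM` machinery in this diff relies on a standard `logging` feature: the `extra` mapping passed to a logging call is copied onto the `LogRecord` as attributes, so a handler can recognise flagged "header" records, stash them, and re-emit them at the top of every new log file, bumping the rollover number that was hard-coded as the record's only positional argument. The following is a minimal, self-contained sketch of that pattern (a plain `StreamHandler`, not cylc's `RotatingLogFileHandler`; the constant names are reused from the diff only for readability):

```python
import logging

FILE_HEADER_FLAG = 'cylc_log_file_header'
ROLLOVER_NUM = 'cylc_log_num'


class HeaderReplayHandler(logging.StreamHandler):
    """Stash flagged 'header' records; re-emit them after a rollover."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.header_records = []

    def emit(self, record):
        # Keys from the 'extra' dict appear as plain attributes on the record
        if getattr(record, FILE_HEADER_FLAG, False):
            self.header_records.append(record)
        super().emit(record)

    def replay_headers(self):
        # In the real handler this happens inside do_rollover()
        for record in self.header_records:
            if ROLLOVER_NUM in record.__dict__:
                # Bump the rollover count; it must be the record's only
                # positional arg so that the '%d' in the message picks it up
                record.__dict__[ROLLOVER_NUM] += 1
                record.args = (record.__dict__[ROLLOVER_NUM],)
            super().emit(record)


log = logging.getLogger('sketch')
handler = HeaderReplayHandler()
log.addHandler(handler)
log.setLevel(logging.INFO)

log.info(
    'Run: (re)start number=1, log rollover=%d',
    1,  # hard-coded 1, updated on each replay
    extra={FILE_HEADER_FLAG: True, ROLLOVER_NUM: 1},
)
handler.replay_headers()  # emits '... log rollover=2'
```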
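The log directory listing asserted by the updated functional test (`01-start-01.log`, `02-restart-02.log`, plus the `log` symlink) encodes three things: a log number that is cumulative over the whole workflow lifetime, the load type, and the user-facing restart number. A short sketch of that arithmetic, using the same parsing as `get_next_log_number` in the diff (`next_log_filename` is a hypothetical helper mirroring the f-string in `new_log_file`, not a cylc function):

```python
import os
from pathlib import Path
from typing import Union

LOG_FILE_EXTENSION = '.log'


def get_next_log_number(log_filepath: Union[str, Path]) -> int:
    """Parse the leading number of the latest log and add one."""
    try:
        return int(os.path.basename(log_filepath).partition('-')[0]) + 1
    except ValueError:
        return 1


def next_log_filename(log_num: int, load_type: str, restart_num: int) -> str:
    # Mirrors RotatingLogFileHandler.new_log_file() in the diff
    return f'{log_num:02d}-{load_type}-{restart_num:02d}{LOG_FILE_EXTENSION}'


# First start: no "log" symlink exists yet, so the log number is 1
assert next_log_filename(1, 'start', 1) == '01-start-01.log'
# First restart: the symlink now points at 01-start-01.log
assert get_next_log_number('01-start-01.log') == 2
assert next_log_filename(2, 'restart', 2) == '02-restart-02.log'
```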