Skip to content

Commit

Permalink
Merge pull request #5064 from cylc/8.0.x
Browse files Browse the repository at this point in the history
8.0.x: bugfixes
  • Loading branch information
MetRonnie authored Aug 12, 2022
2 parents 4ddb3ee + b338a91 commit 138ec26
Show file tree
Hide file tree
Showing 15 changed files with 267 additions and 154 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ cycle point validation in the UI.
[#5037](https://github.com/cylc/cylc-flow/pull/5037) - Fix bug where the
workflow restart number would get wiped on reload.

[#5049](https://github.com/cylc/cylc-flow/pull/5049) - Fix several small
bugs related to auto restart.

-------------------------------------------------------------------------------
## __cylc-8.0.0 (<span actions:bind='release-date'>Released 2022-07-28</span>)__

Expand Down
139 changes: 64 additions & 75 deletions cylc/flow/loggingutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,21 +126,19 @@ class RotatingLogFileHandler(logging.FileHandler):
for names.
Argument:
log_file_path: path to the log file
log_file_path: path to the log file (symlink to latest log file)
no_detach: non-detach mode?
restart_num: restart number for the run
timestamp: Add timestamp to log formatting?
"""

FILE_HEADER_FLAG = 'cylc_log_file_header'
FILE_NUM = 'cylc_log_num'
ROLLOVER_NUM = 'cylc_log_num'
MIN_BYTES = 1024

extra = {FILE_HEADER_FLAG: True}
extra_num = {
FILE_HEADER_FLAG: True,
FILE_NUM: 1
}
header_extra = {FILE_HEADER_FLAG: True}
"""Use to indicate the log msg is a header that should be logged on
every rollover"""

def __init__(
self,
Expand All @@ -152,9 +150,11 @@ def __init__(
logging.FileHandler.__init__(self, log_file_path)
self.no_detach = no_detach
self.formatter = CylcLogFormatter(timestamp=timestamp)
# Header records get appended to when calling
# LOG.info(extra=RotatingLogFileHandler.[rollover_]header_extra)
self.header_records: List[logging.LogRecord] = []
self.restart_num = restart_num
self.log_num: Optional[int] = None
self.log_num: Optional[int] = None # null value until log file created

def emit(self, record):
"""Emit a record, rollover log if necessary."""
Expand All @@ -169,16 +169,14 @@ def emit(self, record):
except Exception:
self.handleError(record)

def load_type_change(self):
def load_type_change(self) -> bool:
"""Has there been a load-type change, e.g. restart?"""
current_load_type = self.get_load_type()
existing_load_type = self.existing_log_load_type()
# Rollover if the load type has changed.
if existing_load_type and current_load_type != existing_load_type:
if existing_load_type and self.load_type != existing_load_type:
return True
return False

def existing_log_load_type(self):
def existing_log_load_type(self) -> Optional[str]:
"""Return a log load type, if one currently exists"""
try:
existing_log_name = os.readlink(self.baseFilename)
Expand All @@ -188,12 +186,11 @@ def existing_log_load_type(self):
for load_type in [RESTART_LOAD_TYPE, START_LOAD_TYPE]:
if existing_log_name.find(load_type) > 0:
return load_type
return None

def should_rollover(self, record):
def should_rollover(self, record: logging.LogRecord) -> bool:
"""Should rollover?"""
if (self.stream is None or
self.load_type_change() or
self.log_num is None):
if self.log_num is None or self.stream is None:
return True
max_bytes = glbl_cfg().get(
['scheduler', 'logging', 'maximum size in bytes'])
Expand All @@ -208,50 +205,41 @@ def should_rollover(self, record):
raise SystemExit(exc)
return self.stream.tell() + len(msg.encode('utf8')) >= max_bytes

def get_load_type(self):
@property
def load_type(self) -> str:
"""Establish current load type, as perceived by scheduler."""
if self.restart_num > 0:
return RESTART_LOAD_TYPE
return START_LOAD_TYPE

def do_rollover(self):
def do_rollover(self) -> None:
"""Create and rollover log file if necessary."""
# Generate new file name
filename = self.get_new_log_filename()
os.makedirs(os.path.dirname(filename), exist_ok=True)
# Touch file
with open(filename, 'w+'):
os.utime(filename, None)
# Update symlink
if (os.path.exists(self.baseFilename) or
os.path.lexists(self.baseFilename)):
os.unlink(self.baseFilename)
os.symlink(os.path.basename(filename), self.baseFilename)
# Housekeep log files
# Create new log file
self.new_log_file()
# Housekeep old log files
arch_len = glbl_cfg().get(
['scheduler', 'logging', 'rolling archive length'])
if arch_len:
self.update_log_archive(arch_len)
# Reopen stream, redirect STDOUT and STDERR to log
if self.stream:
self.stream.close()
self.stream = None
self.stream = self._open()
# Dup STDOUT and STDERR in detach mode
if not self.no_detach:
os.dup2(self.stream.fileno(), sys.stdout.fileno())
os.dup2(self.stream.fileno(), sys.stderr.fileno())
# Emit header records (should only do this for subsequent log files)
for header_record in self.header_records:
if self.FILE_NUM in header_record.__dict__:
# Increment log file number
header_record.__dict__[self.FILE_NUM] += 1
# strip the hard coded log number (1) from the log message
# replace with the log number for that start.
# Note this is different from the log number in the file name
# which is cumulative over the workflow.
header_record.args = header_record.args[0:-1] + (
header_record.__dict__[self.FILE_NUM],)
if self.ROLLOVER_NUM in header_record.__dict__:
# A hack to increment the rollover number that gets logged in
# the log file. (Rollover number only applies to a particular
# workflow run; note this is different from the log count
# number in the log filename.)
header_record.__dict__[self.ROLLOVER_NUM] += 1
header_record.args = (
header_record.__dict__[self.ROLLOVER_NUM],
)
logging.FileHandler.emit(self, header_record)

def update_log_archive(self, arch_len):
Expand All @@ -265,30 +253,32 @@ def update_log_archive(self, arch_len):
while len(log_files) > arch_len:
os.unlink(log_files.pop(0))

def get_new_log_filename(self):
"""Build filename for log"""
base_dir = Path(self.baseFilename).parent
load_type = self.get_load_type()
if load_type == START_LOAD_TYPE:
run_num = 1
elif load_type == RESTART_LOAD_TYPE:
run_num = self.restart_num + 1
self.set_log_num()
filename = base_dir.joinpath(
f'{self.log_num:02d}-{load_type}-{run_num:02d}{LOG_FILE_EXTENSION}'
def new_log_file(self) -> Path:
    """Set self.log_num and create a new log file.

    Determines the next log number from the existing "log" symlink
    (1 if the symlink does not exist yet, i.e. this is the first log),
    creates the new numbered log file and repoints the symlink at it.

    Returns:
        Path of the newly created log file.
    """
    try:
        log_file = os.readlink(self.baseFilename)
    except OSError:
        # "log" symlink not yet created, this is the first log
        self.log_num = 1
    else:
        self.log_num = get_next_log_number(log_file)
    log_dir = Path(self.baseFilename).parent
    # User-facing restart num is 1 higher than backend value
    restart_num = self.restart_num + 1
    filename = log_dir.joinpath(
        f'{self.log_num:02d}-{self.load_type}-{restart_num:02d}'
        f'{LOG_FILE_EXTENSION}'
    )
    os.makedirs(filename.parent, exist_ok=True)
    # Touch file
    with open(filename, 'w+'):
        os.utime(filename, None)
    # Update symlink (remove any stale link/file first)
    if os.path.lexists(self.baseFilename):
        os.unlink(self.baseFilename)
    os.symlink(os.path.basename(filename), self.baseFilename)
    return filename

def set_log_num(self):
if not self.log_num:
try:
current_log = os.readlink(self.baseFilename)
self.log_num = int(get_next_log_number(current_log))
except OSError:
self.log_num = 1
else:
self.log_num = int(self.log_num) + 1


class ReferenceLogFileHandler(logging.FileHandler):
"""A handler class which writes filtered reference logging records
Expand Down Expand Up @@ -378,29 +368,28 @@ def close_log(logger: logging.Logger) -> None:
handler.close()


def get_next_log_number(log_filepath: Union[str, Path]) -> int:
    """Return the next log number for the given log file path/name.

    Log names are of the form:
        <log number>-<load type>-<start number>

    When given the latest log it returns the next log number. Anything
    without a leading integer component falls back to 1.

    Examples:
        >>> get_next_log_number('03-restart-02.log')
        4
        >>> get_next_log_number('/some/path/to/19-start-01.cylc')
        20
        >>> get_next_log_number('199-start-08.log')
        200
        >>> get_next_log_number('blah')
        1
    """
    stripped_log = os.path.basename(log_filepath)
    try:
        # Leading "<log number>-" component determines the next number.
        return int(stripped_log.partition("-")[0]) + 1
    except ValueError:
        # No parseable leading number: start from 1.
        return 1


def get_sorted_logs_by_time(
Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/main_loop/auto_restart.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
from cylc.flow.hostuserutil import get_fqdn_by_host
from cylc.flow.main_loop import periodic
from cylc.flow.parsec.exceptions import ParsecError
from cylc.flow.scheduler import SchedulerError
from cylc.flow.workflow_status import AutoRestartMode
from cylc.flow.wallclock import (
get_time_string_from_unix_time as time2str
Expand Down Expand Up @@ -224,7 +225,7 @@ def _set_auto_restart(
# This should raise an "abort" event and return a non-zero code to the
# caller still attached to the workflow process.
if scheduler.options.no_detach:
raise RuntimeError('Workflow host condemned in no detach mode')
raise SchedulerError('Workflow host condemned in no detach mode')

# Check workflow is able to be safely restarted.
if not _can_auto_restart():
Expand Down
18 changes: 13 additions & 5 deletions cylc/flow/network/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@
ClientTimeout,
WorkflowRuntimeClient,
)
from cylc.flow.pathutil import get_cylc_run_dir
from cylc.flow.pathutil import (
get_cylc_run_dir,
get_workflow_run_dir,
)
from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow.workflow_files import (
ContactFileFields,
Expand Down Expand Up @@ -528,11 +531,16 @@ def _callback(_, entry):
key, value = entry
params[key] = value

db_file = flow['path'] / SERVICE / 'db'
# NOTE: use the public DB for reading
# (only the scheduler process/thread should access the private database)
db_file = Path(get_workflow_run_dir(flow['name'], 'log', 'db'))
if db_file.exists():
dao = CylcWorkflowDAO(db_file, is_public=False)
dao.connect()
dao.select_workflow_params(_callback)
flow['workflow_params'] = params
try:
dao.connect()
dao.select_workflow_params(_callback)
flow['workflow_params'] = params
finally:
dao.close()

return flow
Loading

0 comments on commit 138ec26

Please sign in to comment.