Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8.0.x.patch #5178

Merged
merged 12 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ Maintenance release.

### Fixes

[#5125](https://github.com/cylc/cylc-flow/pull/5125) - Allow rose-suite.conf
changes to be considered by ``cylc reinstall``.

[#5023](https://github.com/cylc/cylc-flow/pull/5023) - tasks force-triggered
after a shutdown was ordered should submit to run immediately on restart.

Expand Down
54 changes: 51 additions & 3 deletions cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"""Exceptions for "expected" errors."""

import errno
from textwrap import wrap
from typing import (
Callable,
Iterable,
Expand Down Expand Up @@ -372,11 +373,58 @@ def __init__(self, data):
def __str__(self):
ret = 'Could not select host from:'
for host, data in sorted(self.data.items()):
ret += f'\n {host}:'
for key, value in data.items():
ret += f'\n {key}: {value}'
if host != 'ranking':
ret += f'\n {host}:'
for key, value in data.items():
ret += f'\n {key}: {value}'
hint = self.get_hint()
if hint:
ret += f'\n\n{hint}'
return ret

def get_hint(self):
"""Return a hint to explain this error for certain cases."""
if all(
# all procs came back with special SSH error code 255
datum.get('returncode') == 255
for key, datum in self.data.items()
if key != 'ranking'
):
# likely SSH issues
return (
'Cylc could not establish SSH connection to the run hosts.'
'\nEnsure you can SSH to these hosts without having to'
' answer any prompts.'
)

if (
# a ranking expression was used
self.data.get('ranking')
# and all procs came back with special 'cylc psutil' error code 2
# (which is used for errors relating to the extraction of metrics)
and all(
datum.get('returncode') == 2
for key, datum in self.data.items()
if key != 'ranking'
)
):
# likely an issue with the ranking expression
ranking = "\n".join(
wrap(
self.data.get("ranking"),
initial_indent=' ',
subsequent_indent=' ',
)
)
return (
'This is likely an error in the ranking expression:'
f'\n{ranking}'
'\n\nConfigured by:'
'\n global.cylc[scheduler][run hosts]ranking'
)

return None


class NoHostsError(CylcError):
"""None of the hosts of a given platform were reachable."""
Expand Down
85 changes: 52 additions & 33 deletions cylc/flow/host_select.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@

from cylc.flow import LOG
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.exceptions import CylcError, HostSelectException
from cylc.flow.exceptions import GlobalConfigError, HostSelectException
from cylc.flow.hostuserutil import get_fqdn_by_host, is_remote_host
from cylc.flow.remote import run_cmd, cylc_server_cmd
from cylc.flow.terminal import parse_dirty_json


GLBL_CFG_STR = 'global.cylc[scheduler][run hosts]ranking'


def select_workflow_host(cached=True):
"""Return a host as specified in `[workflow hosts]`.

Expand Down Expand Up @@ -77,10 +80,10 @@ def select_workflow_host(cached=True):


def select_host(
hosts,
ranking_string=None,
blacklist=None,
blacklist_name=None
hosts,
ranking_string=None,
blacklist=None,
blacklist_name=None
):
"""Select a host from the provided list.

Expand Down Expand Up @@ -165,6 +168,7 @@ def select_host(
if ranking_string:
# parse rankings
rankings = list(_get_rankings(ranking_string))
data['ranking'] = ranking_string

if not rankings:
# no metrics or ranking required, pick host at random
Expand Down Expand Up @@ -202,10 +206,10 @@ def select_host(


def _filter_by_hostname(
hosts,
blacklist,
blacklist_name=None,
data=None
hosts,
blacklist,
blacklist_name=None,
data=None
):
"""Filter out any hosts present in `blacklist`.

Expand Down Expand Up @@ -292,7 +296,15 @@ def _filter_by_ranking(hosts, rankings, results, data=None):
host_rank = []
for key, expression in rankings:
item = _reformat_expr(key, expression)
result = _simple_eval(expression, RESULT=results[host][key])
try:
result = _simple_eval(expression, RESULT=results[host][key])
except Exception as exc:
raise GlobalConfigError(
'Invalid host ranking expression'
f'\n Expression: {item}'
f'\n Configuration: {GLBL_CFG_STR}'
f'\n Error: {exc}'
)
if isinstance(result, bool):
host_rankings[item] = result
data[host][item] = result
Expand Down Expand Up @@ -332,11 +344,11 @@ def visit(self, node):
# variables
ast.Name, ast.Load, ast.Attribute, ast.Subscript, ast.Index,
# opers
ast.BinOp, ast.operator,
ast.BinOp, ast.operator, ast.UnaryOp, ast.unaryop,
# types
ast.Num, ast.Str,
# comparisons
ast.Compare, ast.cmpop, ast.List, ast.Tuple
ast.Compare, ast.cmpop, ast.List, ast.Tuple,
)


Expand All @@ -356,6 +368,8 @@ def _simple_eval(expr, **variables):
Examples:
>>> _simple_eval('1 + 1')
2
>>> _simple_eval('1 * -1')
-1
>>> _simple_eval('1 < a', a=2)
True
>>> _simple_eval('1 in (1, 2, 3)')
Expand All @@ -367,25 +381,31 @@ def _simple_eval(expr, **variables):
If you try to get it to do something you're not allowed to:
>>> _simple_eval('open("foo")')
Traceback (most recent call last):
CylcError: Invalid expression: open("foo")
ValueError: <class '_ast.Call'>
>>> _simple_eval('import sys')
Traceback (most recent call last):
SyntaxError: ...

If you try to get hold of something you aren't supposed to:
>>> answer = 42 # only variables explicitly passed in should work
>>> _simple_eval('answer')
Traceback (most recent call last):
NameError: name 'a' is not defined

If you try to do something which doesn't make sense:
>>> _simple_eval('a.b.c') # no value "a.b.c"
Traceback (most recent call last):
CylcError: Invalid expression: a.b.c
NameError: name 'answer' is not defined

"""
try:
node = ast.parse(expr.strip(), mode='eval')
SimpleVisitor().visit(node)
# acceptable use of eval due to restricted language features
return eval( # nosec
compile(node, '<string>', 'eval'),
{'__builtins__': None},
variables
)
except Exception as exc:
raise CylcError(f'Invalid expression: {expr}\n{exc}')
node = ast.parse(expr.strip(), mode='eval')
SimpleVisitor().visit(node)
# acceptable use of eval due to restricted language features
return eval( # nosec
compile(node, '<string>', 'eval'),
{'__builtins__': {}},
variables
)


def _get_rankings(string):
Expand Down Expand Up @@ -512,9 +532,9 @@ def _get_metrics(hosts, metrics, data=None):
Used for logging success/fail outcomes of the form {host: {}}

Examples:
Command failure:
Command failure (no such attribute of psutil):
>>> _get_metrics(['localhost'], [['elephant']])
({}, {'localhost': {'get_metrics': 'Command failed (exit: 1)'}})
({}, {'localhost': {'returncode': 2}})

Returns:
dict - {host: {(function, arg1, arg2, ...): result}}
Expand Down Expand Up @@ -543,21 +563,20 @@ def _get_metrics(hosts, metrics, data=None):
if proc.poll() is None:
continue
del proc_map[host]
out, err = proc.communicate()
out, err = (stream.strip() for stream in proc.communicate())
if proc.wait():
# Command failed in verbose/debug mode
# Command failed
LOG.warning(
'Could not evaluate "%s" (return code %d)\n%s',
host, proc.returncode, err
'Error evaluating ranking expression on'
f' {host}: \n{err}'
)
data[host]['get_metrics'] = (
f'Command failed (exit: {proc.returncode})')
else:
host_stats[host] = dict(zip(
metrics,
# convert JSON dicts -> namedtuples
_deserialise(metrics, parse_dirty_json(out))
))
data[host]['returncode'] = proc.returncode
sleep(0.01)
return host_stats, data

Expand Down
16 changes: 11 additions & 5 deletions cylc/flow/scripts/psutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,17 @@ def get_option_parser():

def _psutil(metrics_json):
metrics = parse_dirty_json(metrics_json)
methods = [
getattr(psutil, key[0])
for key in metrics
]

try:
methods = [
getattr(psutil, key[0])
for key in metrics
]
except AttributeError as exc:
# error obtaining interfaces from psutil e.g:
# * requesting a method which does not exist
print(exc, file=sys.stderr)
sys.exit(2)

try:
ret = [
Expand All @@ -60,7 +67,6 @@ def _psutil(metrics_json):
]
except Exception as exc:
# error extracting metrics from psutil e.g:
# * requesting a method which does not exist
# * requesting information on a resource which does not exist
print(exc, file=sys.stderr)
sys.exit(2)
Expand Down
23 changes: 19 additions & 4 deletions cylc/flow/workflow_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -1464,18 +1464,26 @@ def get_rsync_rund_cmd(src, dst, reinstall=False, dry_run=False):
rsync_cmd.append("--dry-run")
if reinstall:
rsync_cmd.append('--delete')
for exclude in [

exclusions = [
'.git',
'.svn',
'.cylcignore',
'rose-suite.conf',
'opt/rose-suite-cylc-install.conf',
WorkflowFiles.LOG_DIR,
WorkflowFiles.WORK_DIR,
WorkflowFiles.SHARE_DIR,
WorkflowFiles.Install.DIRNAME,
WorkflowFiles.Service.DIRNAME
]:
]

# This is a hack to make sure that changes to rose-suite.conf
# are considered when re-installing.
# It should be removed after https://github.com/cylc/cylc-rose/issues/149
if not dry_run:
exclusions.append('rose-suite.conf')

for exclude in exclusions:
if (Path(src).joinpath(exclude).exists() or
Path(dst).joinpath(exclude).exists()):
rsync_cmd.append(f"--exclude={exclude}")
Expand Down Expand Up @@ -1527,12 +1535,19 @@ def reinstall_workflow(
reinstall=True,
dry_run=dry_run,
)

# Add '+++' to -out-format to mark lines passed through formatter.
rsync_cmd.append('--out-format=+++%o %n%L+++')

# Run rsync command:
reinstall_log.info(cli_format(rsync_cmd))
LOG.debug(cli_format(rsync_cmd))
proc = Popen(rsync_cmd, stdout=PIPE, stderr=PIPE, text=True) # nosec
# * command is constructed via internal interface
stdout, stderr = proc.communicate()
stdout = stdout.strip()

# Strip unwanted output.
stdout = ('\n'.join(re.findall(r'\+\+\+(.*)\+\+\+', stdout))).strip()
stderr = stderr.strip()

if proc.returncode != 0:
Expand Down
12 changes: 6 additions & 6 deletions tests/integration/test_reinstall.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def non_interactive(monkeypatch):
def one_src(tmp_path):
src_dir = tmp_path
(src_dir / 'flow.cylc').touch()
# (src_dir / 'rose-suite.conf').touch()
(src_dir / 'rose-suite.conf').touch()
return SimpleNamespace(path=src_dir)


Expand Down Expand Up @@ -242,15 +242,15 @@ def test_rose_warning(one_src, one_run, capsys, interactive, monkeypatch):
)
(one_src.path / 'a').touch() # give it something to install

# reinstall (no rose-suite.conf file)
reinstall_cli(opts=ReInstallOptions(), args=one_run.id)
assert rose_message not in capsys.readouterr().err

# reinstall (with rose-suite.conf file)
(one_src.path / 'rose-suite.conf').touch()
reinstall_cli(opts=ReInstallOptions(), args=one_run.id)
assert rose_message in capsys.readouterr().err

# reinstall (no rose-suite.conf file)
(one_src.path / 'rose-suite.conf').unlink()
reinstall_cli(opts=ReInstallOptions(), args=one_run.id)
assert rose_message not in capsys.readouterr().err


def test_keyboard_interrupt(
one_src,
Expand Down
Loading