Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cylc clean remote timeout improvements #5872

Merged
merged 4 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/5872.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improvements to `cylc clean` remote timeout handling.
27 changes: 20 additions & 7 deletions cylc/flow/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
Union,
)

from metomi.isodatetime.exceptions import ISO8601SyntaxError
from metomi.isodatetime.parsers import DurationParser

from cylc.flow import LOG
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.exceptions import (
Expand Down Expand Up @@ -187,7 +190,7 @@ def init_clean(id_: str, opts: 'Values') -> None:

if platform_names and platform_names != {'localhost'}:
remote_clean(
id_, platform_names, opts.rm_dirs, opts.remote_timeout
id_, platform_names, opts.remote_timeout, opts.rm_dirs
)

if not opts.remote_only:
Expand Down Expand Up @@ -338,8 +341,8 @@ def _clean_using_glob(
def remote_clean(
id_: str,
platform_names: Iterable[str],
timeout: str,
rm_dirs: Optional[List[str]] = None,
timeout: str = '120'
) -> None:
"""Run subprocesses to clean a workflow on its remote install targets
(skip localhost), given a set of platform names to look up.
Expand All @@ -348,8 +351,9 @@ def remote_clean(
id_: Workflow name.
platform_names: List of platform names to look up in the global
config, in order to determine the install targets to clean on.
timeout: ISO 8601 duration or number of seconds to wait before
cancelling.
rm_dirs: Sub dirs to remove instead of the whole run dir.
timeout: Number of seconds to wait before cancelling.
"""
try:
install_targets_map = (
Expand All @@ -358,6 +362,10 @@ def remote_clean(
raise PlatformLookupError(
f"Cannot clean {id_} on remote platforms as the workflow database "
f"is out of date/inconsistent with the global config - {exc}")

with suppress(ISO8601SyntaxError):
timeout = str(DurationParser().parse(timeout).get_seconds())
MetRonnie marked this conversation as resolved.
Show resolved Hide resolved

queue: Deque[RemoteCleanQueueTuple] = deque()
remote_clean_cmd = partial(
_remote_clean_cmd, id_=id_, rm_dirs=rm_dirs, timeout=timeout
Expand All @@ -376,7 +384,7 @@ def remote_clean(
remote_clean_cmd(platform=platforms[0]), target, platforms
)
)
failed_targets: Dict[str, PlatformError] = {}
failed_targets: Dict[str, Union[PlatformError, str]] = {}
# Handle subproc pool results almost concurrently:
while queue:
item = queue.popleft()
Expand All @@ -387,7 +395,12 @@ def remote_clean(
out, err = item.proc.communicate()
if out:
LOG.info(f"[{item.install_target}]\n{out}")
if ret_code:
if ret_code == 124:
failed_targets[item.install_target] = (
f"cylc clean timed out after {timeout}s. You can increase "
"this timeout using the --timeout option."
)
elif ret_code:
this_platform = item.platforms.pop(0)
excp = PlatformError(
PlatformError.MSG_TIDY,
Expand Down Expand Up @@ -415,9 +428,9 @@ def remote_clean(
LOG.debug(f"[{item.install_target}]\n{err}")
sleep(0.2)
if failed_targets:
for target, excp in failed_targets.items():
for target, info in failed_targets.items():
LOG.error(
f"Could not clean {id_} on install target: {target}\n{excp}"
f"Could not clean {id_} on install target: {target}\n{info}"
)
raise CylcError(f"Remote clean failed for {id_}")

Expand Down
8 changes: 5 additions & 3 deletions cylc/flow/scripts/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,11 @@ def get_option_parser():

parser.add_option(
'--timeout',
help=("The number of seconds to wait for cleaning to take place on "
r"remote hosts before cancelling. Default: %default."),
action='store', default='120', dest='remote_timeout'
help=(
"The length of time to wait for cleaning to take place on "
r"remote hosts before cancelling. Default: %default."
),
action='store', default='PT5M', dest='remote_timeout'
)

parser.add_option(
Expand Down
10 changes: 8 additions & 2 deletions tests/functional/cylc-clean/01-remote.t
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ SSH_CMD="$(cylc config -d -i "[platforms][${CYLC_TEST_PLATFORM}]ssh command") ${
if ! $SSH_CMD command -v 'tree' > '/dev/null'; then
skip_all "'tree' command not available on remote host ${CYLC_TEST_HOST}"
fi
set_test_number 10
set_test_number 12

# Generate random name for symlink dirs to avoid any clashes with other tests
SYM_NAME="$(mktemp -u)"
Expand Down Expand Up @@ -108,7 +108,13 @@ __TREE__

# -----------------------------------------------------------------------------

TEST_NAME="cylc-clean"
TEST_NAME="cylc-clean-timeout"
run_fail "$TEST_NAME" cylc clean --timeout 'PT0,1S' "$WORKFLOW_NAME"
dump_std "$TEST_NAME"
grep_ok 'cylc clean timed out after 0.1s' "${TEST_NAME}.stderr"


TEST_NAME="cylc-clean-ok"
run_ok "$TEST_NAME" cylc clean "$WORKFLOW_NAME"
dump_std "$TEST_NAME"

Expand Down
49 changes: 45 additions & 4 deletions tests/unit/test_clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import shutil
from glob import iglob
from pathlib import Path
from subprocess import Popen
from typing import (
Any,
Callable,
Expand Down Expand Up @@ -274,7 +275,8 @@ def test_init_clean__rm_dirs(
init_clean(id_, opts=opts)
mock_clean.assert_called_with(id_, run_dir, expected_clean)
mock_remote_clean.assert_called_with(
id_, platforms, expected_remote_clean, opts.remote_timeout)
id_, platforms, opts.remote_timeout, expected_remote_clean
)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -920,7 +922,7 @@ def test_remote_clean(
# Remove randomness:
monkeymock('cylc.flow.clean.shuffle')

def mocked_remote_clean_cmd_side_effect(id_, platform, rm_dirs, timeout):
def mocked_remote_clean_cmd_side_effect(id_, platform, timeout, rm_dirs):
proc_ret_code = 0
if failed_platforms and platform['name'] in failed_platforms:
proc_ret_code = failed_platforms[platform['name']]
Expand All @@ -942,11 +944,13 @@ def mocked_remote_clean_cmd_side_effect(id_, platform, rm_dirs, timeout):
if exc_expected:
with pytest.raises(CylcError) as exc:
cylc_clean.remote_clean(
id_, platform_names, rm_dirs, timeout='irrelevant')
id_, platform_names, timeout='irrelevant', rm_dirs=rm_dirs
)
assert "Remote clean failed" in str(exc.value)
else:
cylc_clean.remote_clean(
id_, platform_names, rm_dirs, timeout='irrelevant')
id_, platform_names, timeout='irrelevant', rm_dirs=rm_dirs
)
for msg in expected_err_msgs:
assert log_filter(caplog, level=logging.ERROR, contains=msg)
if expected_platforms:
Expand All @@ -960,6 +964,43 @@ def mocked_remote_clean_cmd_side_effect(id_, platform, rm_dirs, timeout):
assert f"{p_name} - {PlatformError.MSG_TIDY}" in caplog.text


@pytest.mark.parametrize(
'timeout, expected',
[('100', '100'),
('PT1M2S', '62.0')]
)
def test_remote_clean__timeout(
monkeymock: MonkeyMock,
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture,
timeout: str,
expected: str,
):
"""Test remote_clean() timeout.

- It should accept ISO 8601 format or number of seconds.
- It should give a sensible error message for return code 124.
"""
caplog.set_level(logging.ERROR, CYLC_LOG)
mock_remote_clean_cmd = monkeymock(
'cylc.flow.clean._remote_clean_cmd',
spec=_remote_clean_cmd,
return_value=mock.Mock(
spec=Popen, poll=lambda: 124, communicate=lambda: ('', '')
)
)
monkeypatch.setattr(
'cylc.flow.clean.get_install_target_to_platforms_map',
lambda *a, **k: {'picard': [PLATFORMS['stargazer']]}
)

with pytest.raises(CylcError):
cylc_clean.remote_clean('blah', 'blah', timeout)
_, kwargs = mock_remote_clean_cmd.call_args
assert kwargs['timeout'] == expected
assert f"cylc clean timed out after {expected}s" in caplog.text


@pytest.mark.parametrize(
'rm_dirs, expected_args',
[
Expand Down
Loading