Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cylc clean remote timeout improvements #5872

Merged
merged 4 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/5872.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improvements to `cylc clean` remote timeout handling.
21 changes: 14 additions & 7 deletions cylc/flow/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@

if platform_names and platform_names != {'localhost'}:
remote_clean(
id_, platform_names, opts.rm_dirs, opts.remote_timeout
id_, platform_names, opts.remote_timeout, opts.rm_dirs
)

if not opts.remote_only:
Expand Down Expand Up @@ -338,8 +338,8 @@
def remote_clean(
id_: str,
platform_names: Iterable[str],
timeout: str,
rm_dirs: Optional[List[str]] = None,
timeout: str = '120'
) -> None:
"""Run subprocesses to clean a workflow on its remote install targets
(skip localhost), given a set of platform names to look up.
Expand All @@ -348,8 +348,9 @@
id_: Workflow name.
platform_names: List of platform names to look up in the global
config, in order to determine the install targets to clean on.
timeout: ISO 8601 duration or number of seconds to wait before
cancelling.
rm_dirs: Sub dirs to remove instead of the whole run dir.
timeout: Number of seconds to wait before cancelling.
"""
try:
install_targets_map = (
Expand All @@ -358,6 +359,7 @@
raise PlatformLookupError(
f"Cannot clean {id_} on remote platforms as the workflow database "
f"is out of date/inconsistent with the global config - {exc}")

queue: Deque[RemoteCleanQueueTuple] = deque()
remote_clean_cmd = partial(
_remote_clean_cmd, id_=id_, rm_dirs=rm_dirs, timeout=timeout
Expand All @@ -376,7 +378,7 @@
remote_clean_cmd(platform=platforms[0]), target, platforms
)
)
failed_targets: Dict[str, PlatformError] = {}
failed_targets: Dict[str, Union[PlatformError, str]] = {}
# Handle subproc pool results almost concurrently:
while queue:
item = queue.popleft()
Expand All @@ -387,7 +389,12 @@
out, err = item.proc.communicate()
if out:
LOG.info(f"[{item.install_target}]\n{out}")
if ret_code:
if ret_code == 124:
failed_targets[item.install_target] = (

Check warning on line 393 in cylc/flow/clean.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/clean.py#L393

Added line #L393 was not covered by tests
f"cylc clean timed out after {timeout}s. You can increase "
"this timeout using the --timeout option."
)
elif ret_code:
this_platform = item.platforms.pop(0)
excp = PlatformError(
PlatformError.MSG_TIDY,
Expand Down Expand Up @@ -415,9 +422,9 @@
LOG.debug(f"[{item.install_target}]\n{err}")
sleep(0.2)
if failed_targets:
for target, excp in failed_targets.items():
for target, info in failed_targets.items():
LOG.error(
f"Could not clean {id_} on install target: {target}\n{excp}"
f"Could not clean {id_} on install target: {target}\n{info}"
)
raise CylcError(f"Remote clean failed for {id_}")

Expand Down
41 changes: 37 additions & 4 deletions cylc/flow/scripts/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@
import sys
from typing import TYPE_CHECKING, Iterable, List, Tuple

from metomi.isodatetime.exceptions import ISO8601SyntaxError
from metomi.isodatetime.parsers import DurationParser

from cylc.flow import LOG
from cylc.flow.clean import init_clean, get_contained_workflows
from cylc.flow.exceptions import CylcError, InputError
Expand Down Expand Up @@ -120,9 +123,11 @@

parser.add_option(
'--timeout',
help=("The number of seconds to wait for cleaning to take place on "
r"remote hosts before cancelling. Default: %default."),
action='store', default='120', dest='remote_timeout'
help=(
"The length of time to wait for cleaning to take place on "
r"remote hosts before cancelling. Default: %default."
),
action='store', default='PT5M', dest='remote_timeout'
)

parser.add_option(
Expand All @@ -138,6 +143,24 @@
CleanOptions = Options(get_option_parser())


def parse_timeout(opts: 'Values') -> None:
"""Parse timeout as ISO 8601 duration or number of seconds."""
if opts.remote_timeout:
try:
timeout = int(
DurationParser().parse(opts.remote_timeout).get_seconds()
)
except ISO8601SyntaxError:
try:
timeout = int(opts.remote_timeout)
except ValueError:
raise InputError(

Check warning on line 157 in cylc/flow/scripts/clean.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scripts/clean.py#L153-L157

Added lines #L153 - L157 were not covered by tests
f"Invalid timeout: {opts.remote_timeout}. Must be "
"an ISO 8601 duration or number of seconds."
)
opts.remote_timeout = str(timeout)


def prompt(workflows: Iterable[str]) -> None:
"""Ask user if they want to clean the given set of workflows."""
print("Would clean the following workflows:")
Expand Down Expand Up @@ -218,7 +241,15 @@


@cli_function(get_option_parser)
def main(_, opts: 'Values', *ids: str):
def main(_parser, opts: 'Values', *ids: str):
_main(opts, *ids)


def _main(opts: 'Values', *ids: str):
"""Run the clean command.

This is a separate function for ease of testing.
"""
if cylc.flow.flags.verbosity < 2:
set_timestamps(LOG, False)

Expand All @@ -227,4 +258,6 @@
"--local and --remote options are mutually exclusive"
)

parse_timeout(opts)

asyncio.run(run(*ids, opts=opts))
5 changes: 3 additions & 2 deletions tests/functional/cylc-clean/01-remote.t
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,9 @@ __TREE__

# -----------------------------------------------------------------------------

TEST_NAME="cylc-clean"
run_ok "$TEST_NAME" cylc clean "$WORKFLOW_NAME"
TEST_NAME="cylc-clean-ok"
run_ok "$TEST_NAME" cylc clean "$WORKFLOW_NAME" --timeout PT2M
# (timeout opt is covered by unit tests but no harm double-checking here)
dump_std "$TEST_NAME"

TEST_NAME="run-dir-not-exist-post-clean.local"
Expand Down
44 changes: 42 additions & 2 deletions tests/unit/scripts/test_clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import Callable, List
from typing import Callable, List, Type, Union

import pytest

from cylc.flow.scripts.clean import CleanOptions, scan, run
from cylc.flow.exceptions import InputError
from cylc.flow.scripts.clean import (
CleanOptions, _main, parse_timeout, scan, run
)


async def test_scan(tmp_run_dir):
Expand Down Expand Up @@ -88,3 +91,40 @@ async def test_multi(tmp_run_dir: Callable, mute: List[str]):
mute.clear()
await run('*', opts=opts)
assert mute == ['bar/pub/beer', 'baz/run1', 'foo']


@pytest.mark.parametrize(
'timeout, expected',
[('100', '100'),
('PT1M2S', '62'),
('', ''),
('oopsie', InputError),
(' ', InputError)]
)
def test_parse_timeout(
timeout: str,
expected: Union[str, Type[InputError]]
):
"""It should accept ISO 8601 format or number of seconds."""
opts = CleanOptions(remote_timeout=timeout)

if expected is InputError:
with pytest.raises(expected):
parse_timeout(opts)
else:
parse_timeout(opts)
assert opts.remote_timeout == expected


@pytest.mark.parametrize(
'opts, expected_msg',
[
({'local_only': True, 'remote_only': True}, "mutually exclusive"),
({'remote_timeout': 'oops'}, "Invalid timeout"),
]
)
def test_bad_user_input(opts: dict, expected_msg: str, mute):
"""It should raise an InputError for bad user input."""
with pytest.raises(InputError) as exc_info:
_main(CleanOptions(**opts), 'blah')
assert expected_msg in str(exc_info.value)
42 changes: 38 additions & 4 deletions tests/unit/test_clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import shutil
from glob import iglob
from pathlib import Path
from subprocess import Popen
from typing import (
Any,
Callable,
Expand Down Expand Up @@ -274,7 +275,8 @@ def test_init_clean__rm_dirs(
init_clean(id_, opts=opts)
mock_clean.assert_called_with(id_, run_dir, expected_clean)
mock_remote_clean.assert_called_with(
id_, platforms, expected_remote_clean, opts.remote_timeout)
id_, platforms, opts.remote_timeout, expected_remote_clean
)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -920,7 +922,7 @@ def test_remote_clean(
# Remove randomness:
monkeymock('cylc.flow.clean.shuffle')

def mocked_remote_clean_cmd_side_effect(id_, platform, rm_dirs, timeout):
def mocked_remote_clean_cmd_side_effect(id_, platform, timeout, rm_dirs):
proc_ret_code = 0
if failed_platforms and platform['name'] in failed_platforms:
proc_ret_code = failed_platforms[platform['name']]
Expand All @@ -942,11 +944,13 @@ def mocked_remote_clean_cmd_side_effect(id_, platform, rm_dirs, timeout):
if exc_expected:
with pytest.raises(CylcError) as exc:
cylc_clean.remote_clean(
id_, platform_names, rm_dirs, timeout='irrelevant')
id_, platform_names, timeout='irrelevant', rm_dirs=rm_dirs
)
assert "Remote clean failed" in str(exc.value)
else:
cylc_clean.remote_clean(
id_, platform_names, rm_dirs, timeout='irrelevant')
id_, platform_names, timeout='irrelevant', rm_dirs=rm_dirs
)
for msg in expected_err_msgs:
assert log_filter(caplog, level=logging.ERROR, contains=msg)
if expected_platforms:
Expand All @@ -960,6 +964,36 @@ def mocked_remote_clean_cmd_side_effect(id_, platform, rm_dirs, timeout):
assert f"{p_name} - {PlatformError.MSG_TIDY}" in caplog.text


def test_remote_clean__timeout(
monkeymock: MonkeyMock,
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture,
):
"""Test remote_clean() gives a sensible error message for return code 124.
"""
caplog.set_level(logging.ERROR, CYLC_LOG)
monkeymock(
'cylc.flow.clean._remote_clean_cmd',
spec=_remote_clean_cmd,
return_value=mock.Mock(
spec=Popen, poll=lambda: 124, communicate=lambda: ('', '')
)
)
monkeypatch.setattr(
'cylc.flow.clean.get_install_target_to_platforms_map',
lambda *a, **k: {'picard': [PLATFORMS['stargazer']]}
)

with pytest.raises(CylcError):
cylc_clean.remote_clean(
'blah', platform_names=['blah'], timeout='blah'
)
assert "cylc clean timed out" in caplog.text
# No need to log the remote clean cmd etc. for timeout
assert "ssh" not in caplog.text.lower()
assert "stderr" not in caplog.text.lower()


@pytest.mark.parametrize(
'rm_dirs, expected_args',
[
Expand Down
Loading