Skip to content

Commit

Permalink
Merge pull request #5872 from MetRonnie/cylc-clean
Browse files Browse the repository at this point in the history
`cylc clean` remote timeout improvements
  • Loading branch information
wxtim authored Dec 13, 2023
2 parents ec8eec8 + 5518b48 commit fe43c7c
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 19 deletions.
1 change: 1 addition & 0 deletions changes.d/5872.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improvements to `cylc clean` remote timeout handling.
21 changes: 14 additions & 7 deletions cylc/flow/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def init_clean(id_: str, opts: 'Values') -> None:

if platform_names and platform_names != {'localhost'}:
remote_clean(
id_, platform_names, opts.rm_dirs, opts.remote_timeout
id_, platform_names, opts.remote_timeout, opts.rm_dirs
)

if not opts.remote_only:
Expand Down Expand Up @@ -338,8 +338,8 @@ def _clean_using_glob(
def remote_clean(
id_: str,
platform_names: Iterable[str],
timeout: str,
rm_dirs: Optional[List[str]] = None,
timeout: str = '120'
) -> None:
"""Run subprocesses to clean a workflow on its remote install targets
(skip localhost), given a set of platform names to look up.
Expand All @@ -348,8 +348,9 @@ def remote_clean(
id_: Workflow name.
platform_names: List of platform names to look up in the global
config, in order to determine the install targets to clean on.
timeout: ISO 8601 duration or number of seconds to wait before
cancelling.
rm_dirs: Sub dirs to remove instead of the whole run dir.
timeout: Number of seconds to wait before cancelling.
"""
try:
install_targets_map = (
Expand All @@ -358,6 +359,7 @@ def remote_clean(
raise PlatformLookupError(
f"Cannot clean {id_} on remote platforms as the workflow database "
f"is out of date/inconsistent with the global config - {exc}")

queue: Deque[RemoteCleanQueueTuple] = deque()
remote_clean_cmd = partial(
_remote_clean_cmd, id_=id_, rm_dirs=rm_dirs, timeout=timeout
Expand All @@ -376,7 +378,7 @@ def remote_clean(
remote_clean_cmd(platform=platforms[0]), target, platforms
)
)
failed_targets: Dict[str, PlatformError] = {}
failed_targets: Dict[str, Union[PlatformError, str]] = {}
# Handle subproc pool results almost concurrently:
while queue:
item = queue.popleft()
Expand All @@ -387,7 +389,12 @@ def remote_clean(
out, err = item.proc.communicate()
if out:
LOG.info(f"[{item.install_target}]\n{out}")
if ret_code:
if ret_code == 124:
failed_targets[item.install_target] = (
f"cylc clean timed out after {timeout}s. You can increase "
"this timeout using the --timeout option."
)
elif ret_code:
this_platform = item.platforms.pop(0)
excp = PlatformError(
PlatformError.MSG_TIDY,
Expand Down Expand Up @@ -415,9 +422,9 @@ def remote_clean(
LOG.debug(f"[{item.install_target}]\n{err}")
sleep(0.2)
if failed_targets:
for target, excp in failed_targets.items():
for target, info in failed_targets.items():
LOG.error(
f"Could not clean {id_} on install target: {target}\n{excp}"
f"Could not clean {id_} on install target: {target}\n{info}"
)
raise CylcError(f"Remote clean failed for {id_}")

Expand Down
41 changes: 37 additions & 4 deletions cylc/flow/scripts/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@
import sys
from typing import TYPE_CHECKING, Iterable, List, Tuple

from metomi.isodatetime.exceptions import ISO8601SyntaxError
from metomi.isodatetime.parsers import DurationParser

from cylc.flow import LOG
from cylc.flow.clean import init_clean, get_contained_workflows
from cylc.flow.exceptions import CylcError, InputError
Expand Down Expand Up @@ -120,9 +123,11 @@ def get_option_parser():

parser.add_option(
'--timeout',
help=("The number of seconds to wait for cleaning to take place on "
r"remote hosts before cancelling. Default: %default."),
action='store', default='120', dest='remote_timeout'
help=(
"The length of time to wait for cleaning to take place on "
r"remote hosts before cancelling. Default: %default."
),
action='store', default='PT5M', dest='remote_timeout'
)

parser.add_option(
Expand All @@ -138,6 +143,24 @@ def get_option_parser():
CleanOptions = Options(get_option_parser())


def parse_timeout(opts: 'Values') -> None:
"""Parse timeout as ISO 8601 duration or number of seconds."""
if opts.remote_timeout:
try:
timeout = int(
DurationParser().parse(opts.remote_timeout).get_seconds()
)
except ISO8601SyntaxError:
try:
timeout = int(opts.remote_timeout)
except ValueError:
raise InputError(
f"Invalid timeout: {opts.remote_timeout}. Must be "
"an ISO 8601 duration or number of seconds."
)
opts.remote_timeout = str(timeout)


def prompt(workflows: Iterable[str]) -> None:
"""Ask user if they want to clean the given set of workflows."""
print("Would clean the following workflows:")
Expand Down Expand Up @@ -218,7 +241,15 @@ async def run(*ids: str, opts: 'Values') -> None:


@cli_function(get_option_parser)
def main(_, opts: 'Values', *ids: str):
def main(_parser, opts: 'Values', *ids: str):
_main(opts, *ids)


def _main(opts: 'Values', *ids: str):
"""Run the clean command.
This is a separate function for ease of testing.
"""
if cylc.flow.flags.verbosity < 2:
set_timestamps(LOG, False)

Expand All @@ -227,4 +258,6 @@ def main(_, opts: 'Values', *ids: str):
"--local and --remote options are mutually exclusive"
)

parse_timeout(opts)

asyncio.run(run(*ids, opts=opts))
5 changes: 3 additions & 2 deletions tests/functional/cylc-clean/01-remote.t
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,9 @@ __TREE__

# -----------------------------------------------------------------------------

TEST_NAME="cylc-clean"
run_ok "$TEST_NAME" cylc clean "$WORKFLOW_NAME"
TEST_NAME="cylc-clean-ok"
run_ok "$TEST_NAME" cylc clean "$WORKFLOW_NAME" --timeout PT2M
# (timeout opt is covered by unit tests but no harm double-checking here)
dump_std "$TEST_NAME"

TEST_NAME="run-dir-not-exist-post-clean.local"
Expand Down
44 changes: 42 additions & 2 deletions tests/unit/scripts/test_clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import Callable, List
from typing import Callable, List, Type, Union

import pytest

from cylc.flow.scripts.clean import CleanOptions, scan, run
from cylc.flow.exceptions import InputError
from cylc.flow.scripts.clean import (
CleanOptions, _main, parse_timeout, scan, run
)


async def test_scan(tmp_run_dir):
Expand Down Expand Up @@ -88,3 +91,40 @@ async def test_multi(tmp_run_dir: Callable, mute: List[str]):
mute.clear()
await run('*', opts=opts)
assert mute == ['bar/pub/beer', 'baz/run1', 'foo']


@pytest.mark.parametrize(
'timeout, expected',
[('100', '100'),
('PT1M2S', '62'),
('', ''),
('oopsie', InputError),
(' ', InputError)]
)
def test_parse_timeout(
timeout: str,
expected: Union[str, Type[InputError]]
):
"""It should accept ISO 8601 format or number of seconds."""
opts = CleanOptions(remote_timeout=timeout)

if expected is InputError:
with pytest.raises(expected):
parse_timeout(opts)
else:
parse_timeout(opts)
assert opts.remote_timeout == expected


@pytest.mark.parametrize(
'opts, expected_msg',
[
({'local_only': True, 'remote_only': True}, "mutually exclusive"),
({'remote_timeout': 'oops'}, "Invalid timeout"),
]
)
def test_bad_user_input(opts: dict, expected_msg: str, mute):
"""It should raise an InputError for bad user input."""
with pytest.raises(InputError) as exc_info:
_main(CleanOptions(**opts), 'blah')
assert expected_msg in str(exc_info.value)
42 changes: 38 additions & 4 deletions tests/unit/test_clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import shutil
from glob import iglob
from pathlib import Path
from subprocess import Popen
from typing import (
Any,
Callable,
Expand Down Expand Up @@ -274,7 +275,8 @@ def test_init_clean__rm_dirs(
init_clean(id_, opts=opts)
mock_clean.assert_called_with(id_, run_dir, expected_clean)
mock_remote_clean.assert_called_with(
id_, platforms, expected_remote_clean, opts.remote_timeout)
id_, platforms, opts.remote_timeout, expected_remote_clean
)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -920,7 +922,7 @@ def test_remote_clean(
# Remove randomness:
monkeymock('cylc.flow.clean.shuffle')

def mocked_remote_clean_cmd_side_effect(id_, platform, rm_dirs, timeout):
def mocked_remote_clean_cmd_side_effect(id_, platform, timeout, rm_dirs):
proc_ret_code = 0
if failed_platforms and platform['name'] in failed_platforms:
proc_ret_code = failed_platforms[platform['name']]
Expand All @@ -942,11 +944,13 @@ def mocked_remote_clean_cmd_side_effect(id_, platform, rm_dirs, timeout):
if exc_expected:
with pytest.raises(CylcError) as exc:
cylc_clean.remote_clean(
id_, platform_names, rm_dirs, timeout='irrelevant')
id_, platform_names, timeout='irrelevant', rm_dirs=rm_dirs
)
assert "Remote clean failed" in str(exc.value)
else:
cylc_clean.remote_clean(
id_, platform_names, rm_dirs, timeout='irrelevant')
id_, platform_names, timeout='irrelevant', rm_dirs=rm_dirs
)
for msg in expected_err_msgs:
assert log_filter(caplog, level=logging.ERROR, contains=msg)
if expected_platforms:
Expand All @@ -960,6 +964,36 @@ def mocked_remote_clean_cmd_side_effect(id_, platform, rm_dirs, timeout):
assert f"{p_name} - {PlatformError.MSG_TIDY}" in caplog.text


def test_remote_clean__timeout(
monkeymock: MonkeyMock,
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture,
):
"""Test remote_clean() gives a sensible error message for return code 124.
"""
caplog.set_level(logging.ERROR, CYLC_LOG)
monkeymock(
'cylc.flow.clean._remote_clean_cmd',
spec=_remote_clean_cmd,
return_value=mock.Mock(
spec=Popen, poll=lambda: 124, communicate=lambda: ('', '')
)
)
monkeypatch.setattr(
'cylc.flow.clean.get_install_target_to_platforms_map',
lambda *a, **k: {'picard': [PLATFORMS['stargazer']]}
)

with pytest.raises(CylcError):
cylc_clean.remote_clean(
'blah', platform_names=['blah'], timeout='blah'
)
assert "cylc clean timed out" in caplog.text
# No need to log the remote clean cmd etc. for timeout
assert "ssh" not in caplog.text.lower()
assert "stderr" not in caplog.text.lower()


@pytest.mark.parametrize(
'rm_dirs, expected_args',
[
Expand Down

0 comments on commit fe43c7c

Please sign in to comment.