From 80572d0e453c1d7be1436042878258a9614e9a05 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Fri, 11 Dec 2020 11:19:51 +0000
Subject: [PATCH 01/21] cylc clean: Ensure run dir is removed if broken symlink
---
cylc/flow/suite_files.py | 2 +-
tests/unit/test_suite_files.py | 17 +++++++++++++++++
2 files changed, 18 insertions(+), 1 deletion(-)
diff --git a/cylc/flow/suite_files.py b/cylc/flow/suite_files.py
index ec469b1e0d5..ee8fcab8b96 100644
--- a/cylc/flow/suite_files.py
+++ b/cylc/flow/suite_files.py
@@ -571,7 +571,7 @@ def clean(reg):
'Workflow name cannot be a path that points to the cylc-run '
'directory or above')
run_dir = Path(get_suite_run_dir(reg))
- if not run_dir.is_dir():
+ if not run_dir.is_dir() and not run_dir.is_symlink():
LOG.info(f'No workflow directory to clean at {run_dir}')
return
try:
diff --git a/tests/unit/test_suite_files.py b/tests/unit/test_suite_files.py
index a33dffbf445..018c82c44fc 100644
--- a/tests/unit/test_suite_files.py
+++ b/tests/unit/test_suite_files.py
@@ -473,6 +473,23 @@ def mocked_detect_old_contact_file(reg):
assert d.is_symlink() is False
+def test_clean_broken_symlink_run_dir(monkeypatch, tmp_path):
+ """Test removing a run dir that is a broken symlink."""
+ reg = 'foo/bar'
+ run_dir = tmp_path.joinpath('cylc-run', reg)
+ run_dir.parent.mkdir(parents=True)
+ target = tmp_path.joinpath('rabbow/cylc-run', reg)
+ target.mkdir(parents=True)
+ run_dir.symlink_to(target)
+ target.rmdir()
+
+ monkeypatch.setattr('cylc.flow.suite_files.get_suite_run_dir',
+ lambda x: tmp_path.joinpath('cylc-run', x))
+
+ suite_files.clean(reg)
+ assert run_dir.parent.is_dir() is False
+
+
def test_remove_empty_reg_parents(tmp_path):
"""Test that _remove_empty_parents() doesn't remove parents containing a
sibling."""
From 550340e4f704174a5fc567de7cb863b984fc216a Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Fri, 11 Dec 2020 12:52:18 +0000
Subject: [PATCH 02/21] Tidy
---
cylc/flow/cfgspec/globalcfg.py | 16 ++++++++--------
cylc/flow/pathutil.py | 6 +++---
cylc/flow/task_remote_mgr.py | 3 +--
3 files changed, 12 insertions(+), 13 deletions(-)
diff --git a/cylc/flow/cfgspec/globalcfg.py b/cylc/flow/cfgspec/globalcfg.py
index dbd029e311a..7a0c8975733 100644
--- a/cylc/flow/cfgspec/globalcfg.py
+++ b/cylc/flow/cfgspec/globalcfg.py
@@ -536,16 +536,16 @@
accepts up to 236 characters.
''')
Conf('install target', VDR.V_STRING, desc='''
- This defaults to the platform name. This will be used as the
- target for remote file installation.
- For example, to indicate to Cylc that Platform_A shares a file
- system with localhost, we would configure as follows:
+ This defaults to the platform name. This will be used as the
+ target for remote file installation.
+ For example, to indicate to Cylc that Platform_A shares a file
+ system with localhost, we would configure as follows:
- .. code-block:: cylc
+ .. code-block:: cylc
- [platforms]
- [[Platform_A]]
- install target = localhost
+ [platforms]
+ [[Platform_A]]
+ install target = localhost
''')
with Conf('localhost', meta=Platform):
diff --git a/cylc/flow/pathutil.py b/cylc/flow/pathutil.py
index 91bba9304f8..0079754a5fc 100644
--- a/cylc/flow/pathutil.py
+++ b/cylc/flow/pathutil.py
@@ -116,7 +116,7 @@ def make_suite_run_tree(suite):
dir_ = os.path.expandvars(get_suite_run_dir(suite))
for i in range(archlen, -1, -1): # archlen...0
if i > 0:
- dpath = dir_ + '.' + str(i)
+ dpath = f'{dir_}.{i}'
else:
dpath = dir_
if os.path.exists(dpath):
@@ -125,7 +125,7 @@ def make_suite_run_tree(suite):
rmtree(dpath)
else:
# roll others over
- os.rename(dpath, dir_ + '.' + str(i + 1))
+ os.rename(dpath, f'{dir_}.{i + 1}')
# Create
for dir_ in (
get_suite_run_dir(suite),
@@ -138,7 +138,7 @@ def make_suite_run_tree(suite):
dir_ = os.path.expandvars(dir_)
if dir_:
os.makedirs(dir_, exist_ok=True)
- LOG.debug('%s: directory created', dir_)
+ LOG.debug(f'{dir_}: directory created')
def make_localhost_symlinks(suite):
diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py
index 61081969f23..7e26ecf2216 100644
--- a/cylc/flow/task_remote_mgr.py
+++ b/cylc/flow/task_remote_mgr.py
@@ -151,8 +151,7 @@ def subshell_eval_reset(self):
if value is not None:
del self.remote_command_map[key]
- def remote_init(self, platform, curve_auth,
- client_pub_key_dir):
+ def remote_init(self, platform, curve_auth, client_pub_key_dir):
"""Initialise a remote [owner@]host if necessary.
Call "cylc remote-init" to install suite items to remote:
From 3972780cc9e81cb39d61ddfd472a28bb388806f0 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Wed, 16 Dec 2020 12:50:56 +0000
Subject: [PATCH 03/21] cylc clean: remote platforms
---
cylc/flow/rundb.py | 5 ++
cylc/flow/scripts/clean.py | 13 ++--
cylc/flow/scripts/remote_clean.py | 53 +++++++++++++
cylc/flow/suite_files.py | 124 ++++++++++++++++++++++++++----
setup.cfg | 1 +
5 files changed, 175 insertions(+), 21 deletions(-)
create mode 100644 cylc/flow/scripts/remote_clean.py
diff --git a/cylc/flow/rundb.py b/cylc/flow/rundb.py
index dda590936d6..6ef8eb3f107 100644
--- a/cylc/flow/rundb.py
+++ b/cylc/flow/rundb.py
@@ -609,6 +609,11 @@ def select_task_job_run_times(self, callback):
for row_idx, row in enumerate(self.connect().execute(stmt)):
callback(row_idx, list(row))
+ def select_task_job_platforms(self):
+ """Return the set of platform names from task_jobs table."""
+ stmt = f"SELECT platform_name FROM {self.TABLE_TASK_JOBS}"
+ return set(i[0] for i in self.connect().execute(stmt))
+
def select_submit_nums(self, name, point):
"""Select submit_num and flow_label from task_states table.
diff --git a/cylc/flow/scripts/clean.py b/cylc/flow/scripts/clean.py
index 6b36cfbb2c3..c916d6f78ae 100644
--- a/cylc/flow/scripts/clean.py
+++ b/cylc/flow/scripts/clean.py
@@ -18,26 +18,25 @@
"""cylc clean [OPTIONS] ARGS
-Remove a stopped workflow from the local scheduler filesystem.
+Remove a stopped workflow from the local scheduler filesystem and remote hosts.
NOTE: this command is intended for workflows installed with `cylc install`. If
this is run for a workflow that was instead written directly in ~/cylc-run and
not backed up elsewhere, it will be lost.
-It will also remove an symlink directory targets. For now, it will fail to
-remove workflow files/directories on a remote host.
+It will also remove an symlink directory targets.
Suite names can be hierarchical, corresponding to the path under ~/cylc-run.
Examples:
- # Remove the workflow at ~/cylc-run/foo
- $ cylc clean foo
+ # Remove the workflow at ~/cylc-run/foo/bar
+ $ cylc clean foo/bar
"""
from cylc.flow.option_parsers import CylcOptionParser as COP
from cylc.flow.terminal import cli_function
-from cylc.flow.suite_files import clean
+from cylc.flow.suite_files import init_clean
def get_option_parser():
@@ -50,7 +49,7 @@ def get_option_parser():
@cli_function(get_option_parser)
def main(parser, opts, reg):
- clean(reg)
+ init_clean(reg)
if __name__ == "__main__":
diff --git a/cylc/flow/scripts/remote_clean.py b/cylc/flow/scripts/remote_clean.py
new file mode 100644
index 00000000000..2a869c81e60
--- /dev/null
+++ b/cylc/flow/scripts/remote_clean.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python3
+
+# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""cylc remote-clean [OPTIONS] ARGS
+
+(This command is for internal use.)
+
+Remove a stopped workflow from the remote host. This is called on any remote
+hosts when "cylc clean" is called on localhost.
+
+"""
+
+from cylc.flow.option_parsers import CylcOptionParser as COP
+from cylc.flow.terminal import cli_function
+from cylc.flow.suite_files import clean
+
+
+INTERNAL = True
+
+
+def get_option_parser():
+ parser = COP(
+ __doc__,
+ argdoc=[
+ ("REG", "Suite name"),
+ ("[RUND]", "The run directory of the suite")
+ ]
+ )
+ return parser
+
+
+@cli_function(get_option_parser)
+def main(parser, opts, reg, rund=None):
+ clean(reg, rund)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/cylc/flow/suite_files.py b/cylc/flow/suite_files.py
index ee8fcab8b96..564af221bff 100644
--- a/cylc/flow/suite_files.py
+++ b/cylc/flow/suite_files.py
@@ -21,22 +21,29 @@
import os
from pathlib import Path
+from random import shuffle
import re
import shutil
+from subprocess import Popen, PIPE, DEVNULL
import zmq.auth
import aiofiles
from cylc.flow import LOG
-from cylc.flow.exceptions import SuiteServiceFileError, WorkflowFilesError
+from cylc.flow.exceptions import (
+ PlatformLookupError, SuiteServiceFileError, TaskRemoteMgmtError,
+ WorkflowFilesError)
from cylc.flow.pathutil import (
- get_suite_run_dir, make_localhost_symlinks, remove_dir)
-from cylc.flow.platforms import get_platform
+ get_remote_suite_run_dir, get_suite_run_dir, make_localhost_symlinks,
+ remove_dir)
+from cylc.flow.platforms import get_platform, get_install_target_from_platform
from cylc.flow.hostuserutil import (
get_user,
is_remote_host,
is_remote_user
)
+from cylc.flow.remote import construct_ssh_cmd
+from cylc.flow.suite_db_mgr import SuiteDatabaseManager
from cylc.flow.unicode_rules import SuiteNameValidator
from enum import Enum
@@ -284,9 +291,8 @@ def detect_old_contact_file(reg, check_host_port=None):
import shlex
ssh_str = get_platform()["ssh command"]
cmd = shlex.split(ssh_str) + ["-n", old_host] + cmd
- from subprocess import Popen, PIPE, DEVNULL # nosec
from time import sleep, time
- proc = Popen(cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE) # nosec
+ proc = Popen(cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE)
# Terminate command after 10 seconds to prevent hanging SSH, etc.
timeout = time() + 10.0
while proc.poll() is None:
@@ -554,12 +560,9 @@ def register(reg=None, source=None, redirect=False):
return reg
-def clean(reg):
- """Remove stopped workflows on the local scheduler filesystem.
-
- Deletes the run dir in ~/cylc-run and any symlink dirs. Note: if the
- run dir has been manually deleted, it will not be possible to clean the
- symlink dirs.
+def init_clean(reg):
+ """Remove a stopped workflow from the local scheduler filesystem and remote
+ hosts.
Args:
reg (str): workflow name.
@@ -570,7 +573,84 @@ def clean(reg):
raise WorkflowFilesError(
'Workflow name cannot be a path that points to the cylc-run '
'directory or above')
- run_dir = Path(get_suite_run_dir(reg))
+ local_run_dir = Path(get_suite_run_dir(reg))
+ if not local_run_dir.is_dir() and not local_run_dir.is_symlink():
+ LOG.info(f'No workflow directory to clean at {local_run_dir}')
+ return
+ # Get platform names from DB:
+ suite_db_mgr = SuiteDatabaseManager(
+ local_run_dir.joinpath(SuiteFiles.Service.DIRNAME))
+ if Path(suite_db_mgr.pri_path).is_file():
+ pri_dao = suite_db_mgr.get_pri_dao()
+ platform_names = pri_dao.select_task_job_platforms()
+ pri_dao.close()
+ else:
+ LOG.warning(
+ 'The workflow database is missing - will not be able to clean on '
+ 'any remote platforms')
+ platform_names = None
+
+ if platform_names and platform_names != {'localhost'}:
+ # Get mapping of platforms to install targets:
+ try:
+ platforms = [get_platform(p_name) for p_name in platform_names]
+ except PlatformLookupError as exc:
+ raise PlatformLookupError(
+ 'Cannot clean on remote platforms as the workflow database is '
+ f'out of date/inconsistent with the global config - {exc}')
+ platform_names_map = {
+ p_name: get_install_target_from_platform(get_platform(p_name))
+ for p_name in platform_names
+ }
+ install_targets = set(get_install_target_from_platform(platform)
+ for platform in platforms)
+ if 'localhost' in install_targets:
+ install_targets.remove('localhost')
+ # Now get the inverse - the mapping of install targets to platforms:
+ install_targets_map = {
+ target: [platform for platform in platforms
+ if platform_names_map[platform['name']] == target]
+ for target in install_targets
+ }
+ # Clean on remote platforms:
+ procs = []
+ for target, platforms in install_targets_map.items():
+ shuffle(platforms)
+ # Issue ssh command:
+ procs.append(
+ (remote_clean(reg, platforms[0]), platforms[0])
+ )
+ for proc, platform in procs:
+ ret_code = proc.wait()
+ out, err = (f.decode() for f in proc.communicate())
+ if out:
+ LOG.info(out)
+ if err:
+ LOG.warning(err)
+ if ret_code:
+ raise TaskRemoteMgmtError(
+ TaskRemoteMgmtError.MSG_TIDY, platform['name'],
+ " ".join(proc.args), ret_code, out, err)
+
+ # Finally clean on local filesystem:
+ clean(reg)
+
+
+def clean(reg, run_dir=None):
+ """Remove a stopped workflow from the local filesystem.
+
+ Deletes the workflow run directory and any symlink dirs. Note: if the
+ run dir has already been manually deleted, it will not be possible to
+ clean the symlink dirs.
+
+ Args:
+ reg (str): Workflow name.
+ run_dir (str): Path to the run dir on the filesystem.
+ """
+ if run_dir:
+ run_dir = Path(run_dir)
+ else:
+ run_dir = Path(get_suite_run_dir(reg))
if not run_dir.is_dir() and not run_dir.is_symlink():
LOG.info(f'No workflow directory to clean at {run_dir}')
return
@@ -580,8 +660,6 @@ def clean(reg):
raise SuiteServiceFileError(
f'Cannot remove running workflow.\n\n{exc}')
- # TODO: check task_jobs table in database to see what platforms are used
-
possible_symlinks = [(Path(name), Path(run_dir, name)) for name in [
'log', 'share/cycle', 'share', 'work', '']]
# Note: 'share/cycle' must come before 'share', and '' must come last
@@ -611,6 +689,24 @@ def clean(reg):
_remove_empty_reg_parents(reg, run_dir)
+def remote_clean(reg, platform):
+ """Remove a stopped workflow on a remote host.
+
+ Call "cylc remote-clean" over ssh and return the subprocess.
+
+ Args:
+ reg (str): Workflow name.
+ platform (dict): Config for the platform on which to remove the
+ workflow.
+ """
+ LOG.info(
+ f'Cleaning on install target: {platform["install target"]}, '
+ f'platform: {platform["name"]}')
+ cmd = ['remote-clean', reg, get_remote_suite_run_dir(platform, reg)]
+ cmd = construct_ssh_cmd(cmd, platform, timeout='10s')
+ return Popen(cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE)
+
+
def _remove_empty_reg_parents(reg, path):
"""If reg is nested e.g. a/b/c, work our way up the tree, removing empty
parents only.
diff --git a/setup.cfg b/setup.cfg
index ed594936aea..f7a775ab20b 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -98,6 +98,7 @@ cylc.command =
register = cylc.flow.scripts.register:main
release = cylc.flow.scripts.release:main
reload = cylc.flow.scripts.reload:main
+ remote-clean = cylc.flow.scripts.remote_clean:main
remote-init = cylc.flow.scripts.remote_init:main
remote-tidy = cylc.flow.scripts.remote_tidy:main
remove = cylc.flow.scripts.remove:main
From 41d1920d9f694f6d6f9b6dfd395369d33b293e20 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Wed, 16 Dec 2020 16:39:31 +0000
Subject: [PATCH 04/21] cylc clean: keep retrying with next platform in event
of errors
until all platforms for that install target have been exhausted
---
cylc/flow/suite_files.py | 21 ++++++++++++++++-----
1 file changed, 16 insertions(+), 5 deletions(-)
diff --git a/cylc/flow/suite_files.py b/cylc/flow/suite_files.py
index 564af221bff..b79562cacd1 100644
--- a/cylc/flow/suite_files.py
+++ b/cylc/flow/suite_files.py
@@ -31,7 +31,7 @@
from cylc.flow import LOG
from cylc.flow.exceptions import (
- PlatformLookupError, SuiteServiceFileError, TaskRemoteMgmtError,
+ CylcError, PlatformLookupError, SuiteServiceFileError, TaskRemoteMgmtError,
WorkflowFilesError)
from cylc.flow.pathutil import (
get_remote_suite_run_dir, get_suite_run_dir, make_localhost_symlinks,
@@ -618,9 +618,9 @@ def init_clean(reg):
shuffle(platforms)
# Issue ssh command:
procs.append(
- (remote_clean(reg, platforms[0]), platforms[0])
+ (remote_clean(reg, platforms[0]), target, platforms)
)
- for proc, platform in procs:
+ for proc, target, platforms in procs:
ret_code = proc.wait()
out, err = (f.decode() for f in proc.communicate())
if out:
@@ -628,9 +628,19 @@ def init_clean(reg):
if err:
LOG.warning(err)
if ret_code:
- raise TaskRemoteMgmtError(
- TaskRemoteMgmtError.MSG_TIDY, platform['name'],
+ # Try again on the next platform for this install target:
+ this_platform = platforms.pop(0)
+ exc = TaskRemoteMgmtError(
+ TaskRemoteMgmtError.MSG_TIDY, this_platform['name'],
" ".join(proc.args), ret_code, out, err)
+ LOG.error(exc)
+ if platforms:
+ procs.append(
+ (remote_clean(reg, platforms[0]), target, platforms)
+ )
+ else: # Exhausted list of platforms
+ raise CylcError(
+ f'Could not clean on install target: {target}')
# Finally clean on local filesystem:
clean(reg)
@@ -704,6 +714,7 @@ def remote_clean(reg, platform):
f'platform: {platform["name"]}')
cmd = ['remote-clean', reg, get_remote_suite_run_dir(platform, reg)]
cmd = construct_ssh_cmd(cmd, platform, timeout='10s')
+ LOG.debug(" ".join(cmd))
return Popen(cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE)
From 4cd1e540a589a9190cde89cdceeccbfac88c3488 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Thu, 17 Dec 2020 10:59:41 +0000
Subject: [PATCH 05/21] cylc clean: slight refactor & unit test for getting
install targets map
---
cylc/flow/platforms.py | 27 +++++++++++++
cylc/flow/suite_files.py | 33 ++++++----------
tests/unit/test_platforms.py | 77 +++++++++++++++++++++++++++++++++++-
3 files changed, 114 insertions(+), 23 deletions(-)
diff --git a/cylc/flow/platforms.py b/cylc/flow/platforms.py
index dcc87686ef1..4f13b40673d 100644
--- a/cylc/flow/platforms.py
+++ b/cylc/flow/platforms.py
@@ -421,6 +421,33 @@ def get_install_target_from_platform(platform):
return platform.get('install target')
+def get_install_target_to_platforms_map(platform_names):
+ """Get a dictionary of unique install targets and the platforms which use
+ them.
+
+ Return {install_target_1: [platform_1_dict, platform_2_dict, ...], ...}
+
+ Args:
+ platform_names (list): List of platform names to look up in the
+ global config.
+ """
+ platform_names = set(platform_names)
+ platforms = [get_platform(p_name) for p_name in platform_names]
+ # First get mapping of platforms to install targets:
+ platform_names_map = {
+ p_name: get_install_target_from_platform(get_platform(p_name))
+ for p_name in platform_names
+ }
+ install_targets = set(get_install_target_from_platform(platform)
+ for platform in platforms)
+ # Now get the inverse - the mapping of install targets to platforms:
+ return {
+ target: [platform for platform in platforms
+ if platform_names_map[platform['name']] == target]
+ for target in install_targets
+ }
+
+
def is_platform_with_target_in_list(
install_target, distinct_platforms_list):
"""Determines whether install target is in the list of platforms"""
diff --git a/cylc/flow/suite_files.py b/cylc/flow/suite_files.py
index b79562cacd1..58d35be28e2 100644
--- a/cylc/flow/suite_files.py
+++ b/cylc/flow/suite_files.py
@@ -36,7 +36,8 @@
from cylc.flow.pathutil import (
get_remote_suite_run_dir, get_suite_run_dir, make_localhost_symlinks,
remove_dir)
-from cylc.flow.platforms import get_platform, get_install_target_from_platform
+from cylc.flow.platforms import (
+ get_platform, get_install_target_to_platforms_map)
from cylc.flow.hostuserutil import (
get_user,
is_remote_host,
@@ -591,36 +592,24 @@ def init_clean(reg):
platform_names = None
if platform_names and platform_names != {'localhost'}:
- # Get mapping of platforms to install targets:
+ # Clean on remote platforms
try:
- platforms = [get_platform(p_name) for p_name in platform_names]
+ install_targets_map = (
+ get_install_target_to_platforms_map(platform_names))
except PlatformLookupError as exc:
raise PlatformLookupError(
'Cannot clean on remote platforms as the workflow database is '
f'out of date/inconsistent with the global config - {exc}')
- platform_names_map = {
- p_name: get_install_target_from_platform(get_platform(p_name))
- for p_name in platform_names
- }
- install_targets = set(get_install_target_from_platform(platform)
- for platform in platforms)
- if 'localhost' in install_targets:
- install_targets.remove('localhost')
- # Now get the inverse - the mapping of install targets to platforms:
- install_targets_map = {
- target: [platform for platform in platforms
- if platform_names_map[platform['name']] == target]
- for target in install_targets
- }
- # Clean on remote platforms:
- procs = []
+ queue = []
for target, platforms in install_targets_map.items():
+ if target == 'localhost':
+ continue
shuffle(platforms)
# Issue ssh command:
- procs.append(
+ queue.append(
(remote_clean(reg, platforms[0]), target, platforms)
)
- for proc, target, platforms in procs:
+ for proc, target, platforms in queue:
ret_code = proc.wait()
out, err = (f.decode() for f in proc.communicate())
if out:
@@ -635,7 +624,7 @@ def init_clean(reg):
" ".join(proc.args), ret_code, out, err)
LOG.error(exc)
if platforms:
- procs.append(
+ queue.append(
(remote_clean(reg, platforms[0]), target, platforms)
)
else: # Exhausted list of platforms
diff --git a/tests/unit/test_platforms.py b/tests/unit/test_platforms.py
index 56da3a7e50b..7f2cc1efd67 100644
--- a/tests/unit/test_platforms.py
+++ b/tests/unit/test_platforms.py
@@ -17,7 +17,9 @@
# Tests for the platform lookup.
import pytest
-from cylc.flow.platforms import platform_from_name, platform_from_job_info
+from cylc.flow.platforms import (
+ platform_from_name, platform_from_job_info,
+ get_install_target_to_platforms_map)
from cylc.flow.exceptions import PlatformLookupError
PLATFORMS = {
@@ -72,6 +74,24 @@
}
}
+PLATFORMS_TREK = {
+ 'enterprise': {
+ 'hosts': ['kirk', 'picard'],
+ 'install target': 'picard',
+ 'name': 'enterprise'
+ },
+ 'voyager': {
+ 'hosts': ['janeway'],
+ 'install target': 'janeway',
+ 'name': 'voyager'
+ },
+ 'stargazer': {
+ 'hosts': ['picard'],
+ 'install target': 'picard',
+ 'name': 'stargazer'
+ }
+}
+
# ----------------------------------------------------------------------------
# Tests of platform_from_name
@@ -311,3 +331,58 @@ def test_platform_from_job_info_similar_platforms(
},
}
assert platform_from_job_info(platforms, job, remote) == returns
+
+
+# -----------------------------------------------------------------------------
+# Tests for getting install target info
+
+@pytest.mark.parametrize(
+ 'platform_names, expected_map, expected_err',
+ [
+ (
+ ['enterprise', 'stargazer'],
+ {
+ 'picard': [
+ PLATFORMS_TREK['enterprise'],
+ PLATFORMS_TREK['stargazer']
+ ]
+ },
+ None
+ ),
+ (
+ ['enterprise', 'voyager', 'enterprise'],
+ {
+ 'picard': [
+ PLATFORMS_TREK['enterprise']
+ ],
+ 'janeway': [
+ PLATFORMS_TREK['voyager']
+ ]
+ },
+ None
+ ),
+ (
+ ['enterprise', 'discovery'],
+ None,
+ PlatformLookupError
+ )
+ ]
+)
+def test_get_install_target_to_platforms_map(
+ platform_names, expected_map, expected_err, monkeypatch):
+ """Test that get_install_target_to_platforms_map works as expected."""
+
+ monkeypatch.setattr('cylc.flow.platforms.get_platform',
+ lambda x: platform_from_name(x, PLATFORMS_TREK))
+
+ if expected_err:
+ with pytest.raises(expected_err):
+ get_install_target_to_platforms_map(platform_names)
+ else:
+ result = get_install_target_to_platforms_map(platform_names)
+ # Sort the maps:
+ for _map in (result, expected_map):
+ for install_target in _map:
+ _map[install_target] = sorted(_map[install_target],
+ key=lambda k: k['name'])
+ assert result == expected_map
From 4e679e71e54ab5b9b51b412b5d178e44248cbf3b Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Thu, 24 Dec 2020 10:39:18 +0000
Subject: [PATCH 06/21] Tidy
---
cylc/flow/platforms.py | 8 +-------
1 file changed, 1 insertion(+), 7 deletions(-)
diff --git a/cylc/flow/platforms.py b/cylc/flow/platforms.py
index 4f13b40673d..3d7122cd538 100644
--- a/cylc/flow/platforms.py
+++ b/cylc/flow/platforms.py
@@ -433,17 +433,11 @@ def get_install_target_to_platforms_map(platform_names):
"""
platform_names = set(platform_names)
platforms = [get_platform(p_name) for p_name in platform_names]
- # First get mapping of platforms to install targets:
- platform_names_map = {
- p_name: get_install_target_from_platform(get_platform(p_name))
- for p_name in platform_names
- }
install_targets = set(get_install_target_from_platform(platform)
for platform in platforms)
- # Now get the inverse - the mapping of install targets to platforms:
return {
target: [platform for platform in platforms
- if platform_names_map[platform['name']] == target]
+ if get_install_target_from_platform(platform) == target]
for target in install_targets
}
From 6b3c72b33e8d7d7d9302304125570f321dea3f46 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Thu, 17 Dec 2020 11:50:52 +0000
Subject: [PATCH 07/21] Platforms: add unit test for
get_install_target_from_platform()
---
tests/unit/test_platforms.py | 14 +++++++++++++-
1 file changed, 13 insertions(+), 1 deletion(-)
diff --git a/tests/unit/test_platforms.py b/tests/unit/test_platforms.py
index 7f2cc1efd67..aa34c94c76b 100644
--- a/tests/unit/test_platforms.py
+++ b/tests/unit/test_platforms.py
@@ -19,7 +19,7 @@
import pytest
from cylc.flow.platforms import (
platform_from_name, platform_from_job_info,
- get_install_target_to_platforms_map)
+ get_install_target_from_platform, get_install_target_to_platforms_map)
from cylc.flow.exceptions import PlatformLookupError
PLATFORMS = {
@@ -336,6 +336,18 @@ def test_platform_from_job_info_similar_platforms(
# -----------------------------------------------------------------------------
# Tests for getting install target info
+@pytest.mark.parametrize(
+ 'platform, expected',
+ [
+ ({'name': 'rick', 'install target': 'desktop'}, 'desktop'),
+ ({'name': 'morty', 'install target': ''}, 'morty')
+ ]
+)
+def test_get_install_target_from_platform(platform, expected):
+ """Test that get_install_target_from_platform works as expected."""
+ assert get_install_target_from_platform(platform) == expected
+
+
@pytest.mark.parametrize(
'platform_names, expected_map, expected_err',
[
From 0751972a4ddbd3a907002886da2dc60690f37170 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Mon, 21 Dec 2020 17:46:43 +0000
Subject: [PATCH 08/21] Suite files: Add test case to test_validate_reg()
---
cylc/flow/platforms.py | 3 +--
tests/unit/test_suite_files.py | 12 +++++++-----
2 files changed, 8 insertions(+), 7 deletions(-)
diff --git a/cylc/flow/platforms.py b/cylc/flow/platforms.py
index 3d7122cd538..894ae139f72 100644
--- a/cylc/flow/platforms.py
+++ b/cylc/flow/platforms.py
@@ -442,8 +442,7 @@ def get_install_target_to_platforms_map(platform_names):
}
-def is_platform_with_target_in_list(
- install_target, distinct_platforms_list):
+def is_platform_with_target_in_list(install_target, distinct_platforms_list):
"""Determines whether install target is in the list of platforms"""
for distinct_platform in distinct_platforms_list:
return install_target == distinct_platform['install target']
diff --git a/tests/unit/test_suite_files.py b/tests/unit/test_suite_files.py
index 018c82c44fc..8bad9a09257 100644
--- a/tests/unit/test_suite_files.py
+++ b/tests/unit/test_suite_files.py
@@ -328,15 +328,17 @@ def mock_scandir(path):
@pytest.mark.parametrize(
- 'reg, expected_err',
- [('foo/bar/', None),
- ('/foo/bar', SuiteServiceFileError)]
+ 'reg, expected_err, expected_msg',
+ [('foo/bar/', None, None),
+ ('/foo/bar', SuiteServiceFileError, "cannot be an absolute path"),
+ ('$HOME/alone', SuiteServiceFileError, "invalid suite name")]
)
-def test_validate_reg(reg, expected_err):
+def test_validate_reg(reg, expected_err, expected_msg):
if expected_err:
with pytest.raises(expected_err) as exc:
suite_files._validate_reg(reg)
- assert 'cannot be an absolute path' in str(exc.value)
+ if expected_msg:
+ assert expected_msg in str(exc.value)
else:
suite_files._validate_reg(reg)
From 2763e648f65de51bd4c675740bd1049fd706a9b7 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Mon, 21 Dec 2020 18:25:23 +0000
Subject: [PATCH 09/21] SuiteDatabaseManager improvements
- Ensure DB is closed using a try...finally statement in a couple of
places where it was lacking
- Refactor checking of the DB compatibility slightly
---
cylc/flow/suite_db_mgr.py | 34 ++++++++++++-------
tests/functional/restart/54-incompatible-db.t | 4 +--
2 files changed, 23 insertions(+), 15 deletions(-)
diff --git a/cylc/flow/suite_db_mgr.py b/cylc/flow/suite_db_mgr.py
index 58583af1111..8b697392773 100644
--- a/cylc/flow/suite_db_mgr.py
+++ b/cylc/flow/suite_db_mgr.py
@@ -557,29 +557,38 @@ def recover_pub_from_pri(self):
def on_restart(self):
"""Check & vacuum the runtime DB on restart."""
- if not os.path.isfile(self.pri_path):
+ try:
+ self.check_suite_db_compatibility()
+ except FileNotFoundError:
+ raise SuiteServiceFileError(
+ "Cannot restart as the workflow database was not found")
+ except SuiteServiceFileError as exc:
raise SuiteServiceFileError(
- 'Cannot restart as suite database not found')
- self.check_suite_db_compatibility()
+ f"Cannot restart - {exc}")
pri_dao = self.get_pri_dao()
- pri_dao.vacuum()
- self.n_restart = pri_dao.select_suite_params_restart_count() + 1
- self.put_suite_params_1(self.KEY_RESTART_COUNT, self.n_restart)
- pri_dao.close()
+ try:
+ pri_dao.vacuum()
+ self.n_restart = pri_dao.select_suite_params_restart_count() + 1
+ self.put_suite_params_1(self.KEY_RESTART_COUNT, self.n_restart)
+ finally:
+ pri_dao.close()
def check_suite_db_compatibility(self):
"""Raises SuiteServiceFileError if the existing suite database is
incompatible with the current version of Cylc."""
+ if not os.path.isfile(self.pri_path):
+ raise FileNotFoundError(self.pri_path)
+ incompat_msg = (
+ f"Workflow database is incompatible with Cylc {CYLC_VERSION}")
pri_dao = self.get_pri_dao()
try:
last_run_ver = pri_dao.connect().execute(
f'SELECT value FROM {self.TABLE_SUITE_PARAMS} '
f'WHERE key == "{self.KEY_CYLC_VERSION}"').fetchone()[0]
except TypeError:
- raise SuiteServiceFileError(
- 'Cannot restart suite as the suite database is incompatible '
- f'with Cylc {CYLC_VERSION}')
- pri_dao.close()
+ raise SuiteServiceFileError(incompat_msg)
+ finally:
+ pri_dao.close()
try:
last_run_ver = packaging.version.Version(last_run_ver)
except packaging.version.InvalidVersion:
@@ -588,5 +597,4 @@ def check_suite_db_compatibility(self):
CylcSuiteDAO.RESTART_INCOMPAT_VERSION)
if last_run_ver <= restart_incompat_ver:
raise SuiteServiceFileError(
- f'Cannot restart suite last run with Cylc {last_run_ver} as '
- f'the suite database is incompatible with Cylc {CYLC_VERSION}')
+ f"{incompat_msg} (workflow last run with Cylc {last_run_ver})")
diff --git a/tests/functional/restart/54-incompatible-db.t b/tests/functional/restart/54-incompatible-db.t
index c565aa8236a..57ec44b47b2 100644
--- a/tests/functional/restart/54-incompatible-db.t
+++ b/tests/functional/restart/54-incompatible-db.t
@@ -30,7 +30,7 @@ run_ok "${TEST_NAME_BASE}-validate" cylc validate "${SUITE_NAME}"
TEST_NAME="${TEST_NAME_BASE}-restart-fail"
suite_run_fail "$TEST_NAME" cylc restart "${SUITE_NAME}"
-grep_ok 'suite database is incompatible' "${TEST_NAME}.stderr"
+grep_ok 'Workflow database is incompatible' "${TEST_NAME}.stderr"
purge
@@ -42,7 +42,7 @@ install_suite
TEST_NAME="54-no-db-restart-fail"
suite_run_fail "$TEST_NAME" cylc restart "${SUITE_NAME}"
-grep_ok 'Cannot restart as suite database not found' "${TEST_NAME}.stderr"
+grep_ok 'the workflow database was not found' "${TEST_NAME}.stderr"
purge
exit
From 6a7aab74a8fd5a83f2fd4ce496ef1833853d9ef3 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Mon, 21 Dec 2020 18:25:29 +0000
Subject: [PATCH 10/21] cylc clean: refactor and add/update unit tests
---
cylc/flow/suite_files.py | 126 +++++++++++++++++++------------
tests/unit/test_suite_files.py | 134 ++++++++++++++++++++++++++++-----
2 files changed, 192 insertions(+), 68 deletions(-)
diff --git a/cylc/flow/suite_files.py b/cylc/flow/suite_files.py
index 58d35be28e2..02bdbd3605f 100644
--- a/cylc/flow/suite_files.py
+++ b/cylc/flow/suite_files.py
@@ -578,60 +578,19 @@ def init_clean(reg):
if not local_run_dir.is_dir() and not local_run_dir.is_symlink():
LOG.info(f'No workflow directory to clean at {local_run_dir}')
return
- # Get platform names from DB:
- suite_db_mgr = SuiteDatabaseManager(
- local_run_dir.joinpath(SuiteFiles.Service.DIRNAME))
- if Path(suite_db_mgr.pri_path).is_file():
- pri_dao = suite_db_mgr.get_pri_dao()
- platform_names = pri_dao.select_task_job_platforms()
- pri_dao.close()
- else:
+ platform_names = None
+ try:
+ platform_names = get_platforms_from_db(local_run_dir)
+ except FileNotFoundError:
LOG.warning(
'The workflow database is missing - will not be able to clean on '
'any remote platforms')
- platform_names = None
+ except SuiteServiceFileError as exc:
+ raise SuiteServiceFileError(f"Cannot clean - {exc}")
if platform_names and platform_names != {'localhost'}:
- # Clean on remote platforms
- try:
- install_targets_map = (
- get_install_target_to_platforms_map(platform_names))
- except PlatformLookupError as exc:
- raise PlatformLookupError(
- 'Cannot clean on remote platforms as the workflow database is '
- f'out of date/inconsistent with the global config - {exc}')
- queue = []
- for target, platforms in install_targets_map.items():
- if target == 'localhost':
- continue
- shuffle(platforms)
- # Issue ssh command:
- queue.append(
- (remote_clean(reg, platforms[0]), target, platforms)
- )
- for proc, target, platforms in queue:
- ret_code = proc.wait()
- out, err = (f.decode() for f in proc.communicate())
- if out:
- LOG.info(out)
- if err:
- LOG.warning(err)
- if ret_code:
- # Try again on the next platform for this install target:
- this_platform = platforms.pop(0)
- exc = TaskRemoteMgmtError(
- TaskRemoteMgmtError.MSG_TIDY, this_platform['name'],
- " ".join(proc.args), ret_code, out, err)
- LOG.error(exc)
- if platforms:
- queue.append(
- (remote_clean(reg, platforms[0]), target, platforms)
- )
- else: # Exhausted list of platforms
- raise CylcError(
- f'Could not clean on install target: {target}')
-
- # Finally clean on local filesystem:
+ remote_clean(reg, platform_names)
+ # Lastly, clean on local filesystem:
clean(reg)
@@ -688,7 +647,56 @@ def clean(reg, run_dir=None):
_remove_empty_reg_parents(reg, run_dir)
-def remote_clean(reg, platform):
+def remote_clean(reg, platform_names):
+ """Clean on remote install targets (not localhost), given a set of
+ platform names to look up.
+
+ Args:
+ reg (str): Workflow name.
+ platform_names (list): List of platform names to look up in the global
+ config, in order to determine the install targets to clean on.
+ """
+ try:
+ install_targets_map = (
+ get_install_target_to_platforms_map(platform_names))
+ except PlatformLookupError as exc:
+ raise PlatformLookupError(
+ "Cannot clean on remote platforms as the workflow database is "
+ f"out of date/inconsistent with the global config - {exc}")
+
+ queue = []
+ for target, platforms in install_targets_map.items():
+ if target == 'localhost':
+ continue
+ shuffle(platforms)
+ # Issue ssh command:
+ queue.append(
+ (_remote_clean_cmd(reg, platforms[0]), target, platforms)
+ )
+ for proc, target, platforms in queue:
+ ret_code = proc.wait()
+ out, err = (f.decode() for f in proc.communicate())
+ if out:
+ LOG.info(out)
+ if err:
+ LOG.warning(err)
+ if ret_code:
+ # Try again on the next platform for this install target:
+ this_platform = platforms.pop(0)
+ exc = TaskRemoteMgmtError(
+ TaskRemoteMgmtError.MSG_TIDY, this_platform['name'],
+ " ".join(proc.args), ret_code, out, err)
+ LOG.error(exc)
+ if platforms:
+ queue.append(
+ (_remote_clean_cmd(reg, platforms[0]), target, platforms)
+ )
+ else: # Exhausted list of platforms
+ raise CylcError(
+ f"Could not clean on install target: {target}")
+
+
+def _remote_clean_cmd(reg, platform):
"""Remove a stopped workflow on a remote host.
Call "cylc remote-clean" over ssh and return the subprocess.
@@ -805,6 +813,24 @@ def _load_local_item(item, path):
return None
+def get_platforms_from_db(run_dir):
+ """Load the set of names of platforms (that jobs ran on) from the
+ workflow database.
+
+ Args:
+ run_dir (str): The workflow run directory.
+ """
+ suite_db_mgr = SuiteDatabaseManager(
+ os.path.join(run_dir, SuiteFiles.Service.DIRNAME))
+ suite_db_mgr.check_suite_db_compatibility()
+ try:
+ pri_dao = suite_db_mgr.get_pri_dao()
+ platform_names = pri_dao.select_task_job_platforms()
+ return platform_names
+ finally:
+ pri_dao.close()
+
+
def _validate_reg(reg):
"""Check suite name is valid.
diff --git a/tests/unit/test_suite_files.py b/tests/unit/test_suite_files.py
index 8bad9a09257..e2f022ca1c9 100644
--- a/tests/unit/test_suite_files.py
+++ b/tests/unit/test_suite_files.py
@@ -14,14 +14,16 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-from cylc.flow.suite_files import check_nested_run_dirs
+import logging
+import os.path
+from pathlib import Path
import pytest
from unittest import mock
-import os.path
-from pathlib import Path
+from cylc.flow import CYLC_LOG
from cylc.flow import suite_files
from cylc.flow.exceptions import SuiteServiceFileError, WorkflowFilesError
+from cylc.flow.suite_files import check_nested_run_dirs
def get_register_test_cases():
@@ -344,22 +346,118 @@ def test_validate_reg(reg, expected_err, expected_msg):
@pytest.mark.parametrize(
- 'reg, props',
+ 'reg, props, clean_called, remote_clean_called',
[
- ('foo/bar/', {}),
- ('foo', {'no dir': True}),
- ('foo/..', {
- 'no dir': True,
- 'err': WorkflowFilesError,
- 'err msg': ('cannot be a path that points to the cylc-run '
- 'directory or above')
- }),
- ('foo/../..', {
+ ('foo/bar', {
'no dir': True,
- 'err': WorkflowFilesError,
- 'err msg': ('cannot be a path that points to the cylc-run '
- 'directory or above')
- }),
+ 'log': (logging.INFO, "No workflow directory to clean")
+ }, False, False),
+ ('foo/bar', {
+ 'no db': True,
+ 'log': (logging.WARNING, "The workflow database is missing")
+ }, True, False),
+ ('foo/bar', {
+ 'db platforms': ['localhost', 'localhost']
+ }, True, False),
+ ('foo/bar', {
+ 'db platforms': ['horse']
+ }, True, True)
+ ]
+)
+def test_init_clean_ok(
+ reg, props, clean_called, remote_clean_called,
+ monkeypatch, tmp_path, caplog):
+ """Test the init_clean() function logic.
+
+ Params:
+ reg (str): Workflow name.
+ props (dict): Possible values are (all optional):
+ 'no dir' (bool): If True, do not create run dir for this test case.
+ 'log' (tuple): Of form (severity, msg):
+ severity (logging level): Expected level e.g. logging.INFO.
+ msg (str): Message that is expected to be logged.
+ 'db platforms' (list): Platform names that would be loaded from
+ the database.
+ 'no db' (bool): If True, workflow database doesn't exist.
+ clean_called (bool): If a local clean is expected to go ahead.
+ remote_clean_called (bool): If a remote clean is expected to go ahead.
+ """
+ # --- Setup ---
+ expected_log = props.get('log')
+ if expected_log:
+ level, msg = expected_log
+ caplog.set_level(level, CYLC_LOG)
+
+ tmp_path.joinpath('cylc-run').mkdir()
+ run_dir = tmp_path.joinpath('cylc-run', reg)
+ if not props.get('no dir'):
+ run_dir.mkdir(parents=True)
+
+ mocked_clean = mock.Mock()
+ monkeypatch.setattr('cylc.flow.suite_files.clean', mocked_clean)
+ mocked_remote_clean = mock.Mock()
+ monkeypatch.setattr('cylc.flow.suite_files.remote_clean',
+ mocked_remote_clean)
+ monkeypatch.setattr('cylc.flow.suite_files.get_suite_run_dir',
+ lambda x: tmp_path.joinpath('cylc-run', x))
+
+ _get_platforms_from_db = suite_files.get_platforms_from_db
+
+ def mocked_get_platforms_from_db(run_dir):
+ if props.get('no dir') or props.get('no db'):
+ return _get_platforms_from_db(run_dir) # Handle as normal
+ return set(props.get('db platforms'))
+
+ monkeypatch.setattr('cylc.flow.suite_files.get_platforms_from_db',
+ mocked_get_platforms_from_db)
+
+ # --- The actual test ---
+ suite_files.init_clean(reg)
+ if expected_log:
+ assert msg in caplog.text
+ if clean_called:
+ assert mocked_clean.called is True
+ else:
+ assert mocked_clean.called is False
+ if remote_clean_called:
+ assert mocked_remote_clean.called is True
+ else:
+ assert mocked_remote_clean.called is False
+
+
+@pytest.mark.parametrize(
+ 'reg, err, err_msg',
+ [('foo/..', WorkflowFilesError,
+ "cannot be a path that points to the cylc-run directory or above"),
+ ('foo/../..', WorkflowFilesError,
+ "cannot be a path that points to the cylc-run directory or above")]
+)
+def test_init_clean_bad(reg, err, err_msg, monkeypatch):
+ """Test the init_clean() function fails appropriately.
+
+ Params:
+ reg (str): Workflow name.
+ err (Exception): Expected error.
+ err_msg (str): Message that is expected to be in the exception.
+ """
+ # We don't want to accidentally delete any files during this test
+ nerfed_os_rmdir = mock.Mock()
+ monkeypatch.setattr('os.rmdir', nerfed_os_rmdir)
+ nerfed_os_remove = mock.Mock()
+ monkeypatch.setattr('os.remove', nerfed_os_remove)
+
+ with pytest.raises(err) as exc:
+ suite_files.init_clean(reg)
+ assert err_msg in str(exc.value)
+ assert nerfed_os_rmdir.called is False
+ assert nerfed_os_remove.called is False
+
+
+@pytest.mark.parametrize(
+ 'reg, props',
+ [
+ ('foo/bar/', {}), # Works ok
+ ('foo', {'no dir': True}), # Nothing to clean
('foo', {
'not stopped': True,
'err': SuiteServiceFileError,
@@ -476,7 +574,7 @@ def mocked_detect_old_contact_file(reg):
def test_clean_broken_symlink_run_dir(monkeypatch, tmp_path):
- """Test removing a run dir that is a broken symlink."""
+ """Test clean() for removing a run dir that is a broken symlink."""
reg = 'foo/bar'
run_dir = tmp_path.joinpath('cylc-run', reg)
run_dir.parent.mkdir(parents=True)
From 6d38cbc297df5bc3dc388ef5a56c9b1667a1ea51 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Wed, 23 Dec 2020 18:36:56 +0000
Subject: [PATCH 11/21] cylc clean: fix order of removing symlink dirs
Fixes a bug where if log and share symlink dirs were the same, but
share/cycle was different, then share/cycle's target wouldn't get
removed
---
cylc/flow/suite_files.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/cylc/flow/suite_files.py b/cylc/flow/suite_files.py
index 02bdbd3605f..7cee2efd3da 100644
--- a/cylc/flow/suite_files.py
+++ b/cylc/flow/suite_files.py
@@ -619,8 +619,8 @@ def clean(reg, run_dir=None):
f'Cannot remove running workflow.\n\n{exc}')
possible_symlinks = [(Path(name), Path(run_dir, name)) for name in [
- 'log', 'share/cycle', 'share', 'work', '']]
- # Note: 'share/cycle' must come before 'share', and '' must come last
+ 'share/cycle', 'share', 'log', 'work', '']]
+ # Note: 'share/cycle' must come first, and '' must come last
for name, path in possible_symlinks:
if path.is_symlink():
# Ensure symlink is pointing to expected directory. If not,
From a41c66c9a39bc9bc4991ce82600a1f67b417cdc3 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Thu, 24 Dec 2020 10:04:11 +0000
Subject: [PATCH 12/21] Tidy
Plus: in CylcSuiteDAO, make db_file_name arg non-optional, as it will
raise an exception anyway if not given
---
cylc/flow/rundb.py | 9 +++++----
cylc/flow/suite_files.py | 18 +++++++++---------
tests/functional/cylc-clean/00-basic.t | 26 ++++++--------------------
3 files changed, 20 insertions(+), 33 deletions(-)
diff --git a/cylc/flow/rundb.py b/cylc/flow/rundb.py
index 6ef8eb3f107..e671d86366b 100644
--- a/cylc/flow/rundb.py
+++ b/cylc/flow/rundb.py
@@ -297,11 +297,12 @@ class CylcSuiteDAO:
],
}
- def __init__(self, db_file_name=None, is_public=False):
- """Initialise object.
+ def __init__(self, db_file_name, is_public=False):
+ """Initialise database access object.
- db_file_name - Path to the database file
- is_public - If True, allow retries, etc
+ Args:
+ db_file_name (str): Path to the database file.
+ is_public (bool): If True, allow retries, etc.
"""
self.db_file_name = expandvars(db_file_name)
diff --git a/cylc/flow/suite_files.py b/cylc/flow/suite_files.py
index 7cee2efd3da..922e3ed4459 100644
--- a/cylc/flow/suite_files.py
+++ b/cylc/flow/suite_files.py
@@ -595,7 +595,7 @@ def init_clean(reg):
def clean(reg, run_dir=None):
- """Remove a stopped workflow from the local filesystem.
+ """Remove a stopped workflow from the local filesystem only.
Deletes the workflow run directory and any symlink dirs. Note: if the
run dir has already been manually deleted, it will not be possible to
@@ -603,7 +603,7 @@ def clean(reg, run_dir=None):
Args:
reg (str): Workflow name.
- run_dir (str): Path to the run dir on the filesystem.
+ run_dir (str): Path to the workflow run dir on the filesystem.
"""
if run_dir:
run_dir = Path(run_dir)
@@ -618,10 +618,10 @@ def clean(reg, run_dir=None):
raise SuiteServiceFileError(
f'Cannot remove running workflow.\n\n{exc}')
- possible_symlinks = [(Path(name), Path(run_dir, name)) for name in [
- 'share/cycle', 'share', 'log', 'work', '']]
# Note: 'share/cycle' must come first, and '' must come last
- for name, path in possible_symlinks:
+ for possible_symlink in ('share/cycle', 'share', 'log', 'work', ''):
+ name = Path(possible_symlink)
+ path = Path(run_dir, possible_symlink)
if path.is_symlink():
# Ensure symlink is pointing to expected directory. If not,
# something is wrong and we should abort
@@ -648,8 +648,8 @@ def clean(reg, run_dir=None):
def remote_clean(reg, platform_names):
- """Clean on remote install targets (not localhost), given a set of
- platform names to look up.
+ """Run subprocesses to clean workflows on remote install targets
+ (skip localhost), given a set of platform names to look up.
Args:
reg (str): Workflow name.
@@ -707,8 +707,8 @@ def _remote_clean_cmd(reg, platform):
workflow.
"""
LOG.info(
- f'Cleaning on install target: {platform["install target"]}, '
- f'platform: {platform["name"]}')
+ f'Cleaning on install target: {platform["install target"]} '
+ f'(platform: {platform["name"]})')
cmd = ['remote-clean', reg, get_remote_suite_run_dir(platform, reg)]
cmd = construct_ssh_cmd(cmd, platform, timeout='10s')
LOG.debug(" ".join(cmd))
diff --git a/tests/functional/cylc-clean/00-basic.t b/tests/functional/cylc-clean/00-basic.t
index 5c09be7635d..0f0336ccb98 100644
--- a/tests/functional/cylc-clean/00-basic.t
+++ b/tests/functional/cylc-clean/00-basic.t
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
@@ -49,23 +49,13 @@ mkdir "${TEST_DIR}/${SYM_NAME}-log/cylc-run/cylctb-${CYLC_TEST_TIME_INIT}/leave-
FUNCTIONAL_DIR="${TEST_SOURCE_DIR_BASE%/*}"
# -----------------------------------------------------------------------------
-TEST_NAME="run-dir-tree-pre-clean"
-tree --charset=ascii "$SUITE_RUN_DIR" > "${TEST_NAME}.stdout"
-# Remove last line of output:
-sed -i '$d' "${TEST_NAME}.stdout"
+TEST_NAME="run-dir-readlink-pre-clean"
+readlink "$SUITE_RUN_DIR" > "${TEST_NAME}.stdout"
-cmp_ok "${TEST_NAME}.stdout" << __TREE__
-${HOME}/cylc-run/${SUITE_NAME}
-|-- log -> ${TEST_DIR}/${SYM_NAME}-log/cylc-run/${SUITE_NAME}/log
-|-- share -> ${TEST_DIR}/${SYM_NAME}-share/cylc-run/${SUITE_NAME}/share
-\`-- work -> ${TEST_DIR}/${SYM_NAME}-work/cylc-run/${SUITE_NAME}/work
-
-__TREE__
+cmp_ok "${TEST_NAME}.stdout" <<< "${TEST_DIR}/${SYM_NAME}-run/cylc-run/${SUITE_NAME}"
TEST_NAME="test-dir-tree-pre-clean"
-tree --charset=ascii "${TEST_DIR}/${SYM_NAME}-"* > "${TEST_NAME}.stdout"
-# Remove last line of output:
-sed -i '$d' "${TEST_NAME}.stdout"
+tree --noreport --charset=ascii "${TEST_DIR}/${SYM_NAME}-"* > "${TEST_NAME}.stdout"
# Note: backticks need to be escaped in the heredoc
cmp_ok "${TEST_NAME}.stdout" << __TREE__
${TEST_DIR}/${SYM_NAME}-cycle
@@ -108,7 +98,6 @@ ${TEST_DIR}/${SYM_NAME}-work
\`-- cylc-clean
\`-- ${TEST_NAME_BASE}
\`-- work
-
__TREE__
# -----------------------------------------------------------------------------
run_ok "cylc-clean" cylc clean "$SUITE_NAME"
@@ -117,9 +106,7 @@ TEST_NAME="run-dir-not-exist-post-clean"
exists_fail "$SUITE_RUN_DIR"
TEST_NAME="test-dir-tree-post-clean"
-tree --charset=ascii "${TEST_DIR}/${SYM_NAME}-"* > "${TEST_NAME}.stdout"
-# Remove last line of output:
-sed -i '$d' "${TEST_NAME}.stdout"
+tree --noreport --charset=ascii "${TEST_DIR}/${SYM_NAME}-"* > "${TEST_NAME}.stdout"
cmp_ok "${TEST_NAME}.stdout" << __TREE__
${TEST_DIR}/${SYM_NAME}-cycle
@@ -134,7 +121,6 @@ ${TEST_DIR}/${SYM_NAME}-share
\`-- cylc-run
${TEST_DIR}/${SYM_NAME}-work
\`-- cylc-run
-
__TREE__
# -----------------------------------------------------------------------------
purge
From 8fb87630553b889b1c5c660ebd9b7034fa591023 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Thu, 24 Dec 2020 10:08:05 +0000
Subject: [PATCH 13/21] cylc clean: refactor
Move initial checking of whether a workflow can be cleaned to its own
function
---
cylc/flow/suite_files.py | 51 +++++++++++++++++++----------
tests/unit/test_suite_files.py | 60 ++++++++++++++++++----------------
2 files changed, 65 insertions(+), 46 deletions(-)
diff --git a/cylc/flow/suite_files.py b/cylc/flow/suite_files.py
index 922e3ed4459..2036792aa01 100644
--- a/cylc/flow/suite_files.py
+++ b/cylc/flow/suite_files.py
@@ -561,30 +561,50 @@ def register(reg=None, source=None, redirect=False):
return reg
-def init_clean(reg):
- """Remove a stopped workflow from the local scheduler filesystem and remote
- hosts.
+def _clean_check(reg, run_dir):
+ """Check whether a workflow can be cleaned.
Args:
- reg (str): workflow name.
+ reg (str): Workflow name.
+ run_dir (str): Path to the workflow run dir on the filesystem.
"""
_validate_reg(reg)
reg = os.path.normpath(reg)
if reg.startswith('.'):
raise WorkflowFilesError(
- 'Workflow name cannot be a path that points to the cylc-run '
- 'directory or above')
+ "Workflow name cannot be a path that points to the cylc-run "
+ "directory or above")
+ if not run_dir.is_dir() and not run_dir.is_symlink():
+ msg = f"No directory to clean at {run_dir}"
+ raise FileNotFoundError(msg)
+ try:
+ detect_old_contact_file(reg)
+ except SuiteServiceFileError as exc:
+ raise SuiteServiceFileError(
+ f"Cannot remove running workflow.\n\n{exc}")
+
+
+def init_clean(reg):
+ """Initiate the process of removing a stopped workflow from the local
+ scheduler filesystem and remote hosts.
+
+ Args:
+ reg (str): Workflow name.
+ """
local_run_dir = Path(get_suite_run_dir(reg))
- if not local_run_dir.is_dir() and not local_run_dir.is_symlink():
- LOG.info(f'No workflow directory to clean at {local_run_dir}')
+ try:
+ _clean_check(reg, local_run_dir)
+ except FileNotFoundError as exc:
+ LOG.info(str(exc))
return
+
platform_names = None
try:
platform_names = get_platforms_from_db(local_run_dir)
except FileNotFoundError:
LOG.warning(
- 'The workflow database is missing - will not be able to clean on '
- 'any remote platforms')
+ "The workflow database is missing - will not be able to clean on "
+ "any remote platforms")
except SuiteServiceFileError as exc:
raise SuiteServiceFileError(f"Cannot clean - {exc}")
@@ -609,14 +629,11 @@ def clean(reg, run_dir=None):
run_dir = Path(run_dir)
else:
run_dir = Path(get_suite_run_dir(reg))
- if not run_dir.is_dir() and not run_dir.is_symlink():
- LOG.info(f'No workflow directory to clean at {run_dir}')
- return
try:
- detect_old_contact_file(reg)
- except SuiteServiceFileError as exc:
- raise SuiteServiceFileError(
- f'Cannot remove running workflow.\n\n{exc}')
+ _clean_check(reg, run_dir)
+ except FileNotFoundError as exc:
+ LOG.info(str(exc))
+ return
# Note: 'share/cycle' must come first, and '' must come last
for possible_symlink in ('share/cycle', 'share', 'log', 'work', ''):
diff --git a/tests/unit/test_suite_files.py b/tests/unit/test_suite_files.py
index e2f022ca1c9..91f19e9e3a9 100644
--- a/tests/unit/test_suite_files.py
+++ b/tests/unit/test_suite_files.py
@@ -345,12 +345,42 @@ def test_validate_reg(reg, expected_err, expected_msg):
suite_files._validate_reg(reg)
+@pytest.mark.parametrize(
+ 'reg, not_stopped, err, err_msg',
+ [('foo/..', False, WorkflowFilesError,
+ "cannot be a path that points to the cylc-run directory or above"),
+ ('foo/../..', False, WorkflowFilesError,
+ "cannot be a path that points to the cylc-run directory or above"),
+ ('foo', True, SuiteServiceFileError, "Cannot remove running workflow")]
+)
+def test_clean_check(reg, not_stopped, err, err_msg, monkeypatch):
+ """Test that _clean_check() fails appropriately.
+
+ Params:
+ reg (str): Workflow name.
+ err (Exception): Expected error.
+ err_msg (str): Message that is expected to be in the exception.
+ """
+ run_dir = mock.Mock()
+
+ def mocked_detect_old_contact_file(reg):
+ if not_stopped:
+ raise SuiteServiceFileError('Mocked error')
+
+ monkeypatch.setattr('cylc.flow.suite_files.detect_old_contact_file',
+ mocked_detect_old_contact_file)
+
+ with pytest.raises(err) as exc:
+ suite_files._clean_check(reg, run_dir)
+ assert err_msg in str(exc.value)
+
+
@pytest.mark.parametrize(
'reg, props, clean_called, remote_clean_called',
[
('foo/bar', {
'no dir': True,
- 'log': (logging.INFO, "No workflow directory to clean")
+ 'log': (logging.INFO, "No directory to clean")
}, False, False),
('foo/bar', {
'no db': True,
@@ -425,34 +455,6 @@ def mocked_get_platforms_from_db(run_dir):
assert mocked_remote_clean.called is False
-@pytest.mark.parametrize(
- 'reg, err, err_msg',
- [('foo/..', WorkflowFilesError,
- "cannot be a path that points to the cylc-run directory or above"),
- ('foo/../..', WorkflowFilesError,
- "cannot be a path that points to the cylc-run directory or above")]
-)
-def test_init_clean_bad(reg, err, err_msg, monkeypatch):
- """Test the init_clean() function fails appropriately.
-
- Params:
- reg (str): Workflow name.
- err (Exception): Expected error.
- err_msg (str): Message that is expected to be in the exception.
- """
- # We don't want to accidentally delete any files during this test
- nerfed_os_rmdir = mock.Mock()
- monkeypatch.setattr('os.rmdir', nerfed_os_rmdir)
- nerfed_os_remove = mock.Mock()
- monkeypatch.setattr('os.remove', nerfed_os_remove)
-
- with pytest.raises(err) as exc:
- suite_files.init_clean(reg)
- assert err_msg in str(exc.value)
- assert nerfed_os_rmdir.called is False
- assert nerfed_os_remove.called is False
-
-
@pytest.mark.parametrize(
'reg, props',
[
From 7741ac0c0ae72b817dfed698958d65e5a6adb0a0 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Thu, 24 Dec 2020 11:11:48 +0000
Subject: [PATCH 14/21] cylc clean: functional test for remote clean
---
dockerfiles/cylc-dev/Dockerfile | 2 +-
tests/functional/cylc-clean/00-basic.t | 4 +-
tests/functional/cylc-clean/01-remote.t | 141 ++++++++++++++++++++++++
tests/functional/lib/bash/test_header | 17 ++-
4 files changed, 156 insertions(+), 8 deletions(-)
create mode 100644 tests/functional/cylc-clean/01-remote.t
diff --git a/dockerfiles/cylc-dev/Dockerfile b/dockerfiles/cylc-dev/Dockerfile
index 257dd9e7078..dee4f4914d4 100644
--- a/dockerfiles/cylc-dev/Dockerfile
+++ b/dockerfiles/cylc-dev/Dockerfile
@@ -33,7 +33,7 @@ COPY "$CYLC_FLOW_DIR" "cylc"
RUN apt-get update && \
# build-deps: build-essential
# run deps: procps, rsync
- apt-get -qq -y install build-essential procps rsync && \
+ apt-get -qq -y install build-essential procps rsync tree && \
# install conda stuff
conda init bash && \
. ./usr/local/etc/profile.d/conda.sh && \
diff --git a/tests/functional/cylc-clean/00-basic.t b/tests/functional/cylc-clean/00-basic.t
index 0f0336ccb98..a5c755e85f2 100644
--- a/tests/functional/cylc-clean/00-basic.t
+++ b/tests/functional/cylc-clean/00-basic.t
@@ -100,7 +100,9 @@ ${TEST_DIR}/${SYM_NAME}-work
\`-- work
__TREE__
# -----------------------------------------------------------------------------
-run_ok "cylc-clean" cylc clean "$SUITE_NAME"
+TEST_NAME="cylc-clean"
+run_ok "$TEST_NAME" cylc clean "$SUITE_NAME"
+dump_std "$TEST_NAME"
# -----------------------------------------------------------------------------
TEST_NAME="run-dir-not-exist-post-clean"
exists_fail "$SUITE_RUN_DIR"
diff --git a/tests/functional/cylc-clean/01-remote.t b/tests/functional/cylc-clean/01-remote.t
new file mode 100644
index 00000000000..650e4706dfb
--- /dev/null
+++ b/tests/functional/cylc-clean/01-remote.t
@@ -0,0 +1,141 @@
+#!/usr/bin/env bash
+# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+# -----------------------------------------------------------------------------
+# Test that cylc clean succesfully removes the workflow on remote host
+
+export REQUIRE_PLATFORM='loc:remote fs:indep'
+. "$(dirname "$0")/test_header"
+
+SSH_CMD="$(cylc get-global-config -i "[platforms][${CYLC_TEST_PLATFORM}]ssh command") ${CYLC_TEST_HOST}"
+
+if ! $SSH_CMD command -v 'tree' > '/dev/null'; then
+ skip_all "'tree' command not available on remote host ${CYLC_TEST_HOST}"
+fi
+set_test_number 8
+
+# Generate random name for symlink dirs to avoid any clashes with other tests
+SYM_NAME="$(mktemp -u)"
+SYM_NAME="${SYM_NAME##*tmp.}"
+
+create_test_global_config "" "
+[symlink dirs]
+ [[${CYLC_TEST_INSTALL_TARGET}]]
+ run = ${TEST_DIR}/${SYM_NAME}-run
+ log = ${TEST_DIR}/${SYM_NAME}-other
+ share = ${TEST_DIR}/${SYM_NAME}-other
+ share/cycle = ${TEST_DIR}/${SYM_NAME}-cycle
+ work = ${TEST_DIR}/${SYM_NAME}-other
+"
+init_suite "${TEST_NAME_BASE}" << __FLOW__
+[scheduling]
+ [[graph]]
+ R1 = santa
+[runtime]
+ [[root]]
+ platform = ${CYLC_TEST_PLATFORM}
+__FLOW__
+
+FUNCTIONAL_DIR="${TEST_SOURCE_DIR_BASE%/*}"
+
+run_ok "${TEST_NAME_BASE}-validate" cylc validate "$SUITE_NAME"
+
+suite_run_ok "${TEST_NAME_BASE}-run" cylc run "$SUITE_NAME"
+poll_suite_stopped
+
+# Create a fake sibling workflow dir:
+$SSH_CMD mkdir "${TEST_DIR}/${SYM_NAME}-cycle/cylc-run/cylctb-${CYLC_TEST_TIME_INIT}/leave-me-alone"
+
+# -----------------------------------------------------------------------------
+
+TEST_NAME="run-dir-readlink-pre-clean.remote"
+$SSH_CMD readlink "\$HOME/cylc-run/${SUITE_NAME}" > "${TEST_NAME}.stdout"
+cmp_ok "${TEST_NAME}.stdout" <<< "${TEST_DIR}/${SYM_NAME}-run/cylc-run/${SUITE_NAME}"
+
+TEST_NAME="test-dir-tree-pre-clean.remote"
+$SSH_CMD tree -L 8 --noreport --charset=ascii "${TEST_DIR}/${SYM_NAME}-"'*' > "${TEST_NAME}.stdout"
+# Note: backticks need to be escaped in the heredoc
+cmp_ok "${TEST_NAME}.stdout" << __TREE__
+${TEST_DIR}/${SYM_NAME}-cycle
+\`-- cylc-run
+ \`-- cylctb-${CYLC_TEST_TIME_INIT}
+ |-- ${FUNCTIONAL_DIR}
+ | \`-- cylc-clean
+ | \`-- ${TEST_NAME_BASE}
+ | \`-- share
+ | \`-- cycle
+ \`-- leave-me-alone
+${TEST_DIR}/${SYM_NAME}-other
+\`-- cylc-run
+ \`-- cylctb-${CYLC_TEST_TIME_INIT}
+ \`-- ${FUNCTIONAL_DIR}
+ \`-- cylc-clean
+ \`-- ${TEST_NAME_BASE}
+ |-- log
+ | \`-- job
+ | \`-- 1
+ |-- share
+ | \`-- cycle -> ${TEST_DIR}/${SYM_NAME}-cycle/cylc-run/${SUITE_NAME}/share/cycle
+ \`-- work
+ \`-- 1
+${TEST_DIR}/${SYM_NAME}-run
+\`-- cylc-run
+ \`-- cylctb-${CYLC_TEST_TIME_INIT}
+ \`-- ${FUNCTIONAL_DIR}
+ \`-- cylc-clean
+ \`-- ${TEST_NAME_BASE}
+ |-- log -> ${TEST_DIR}/${SYM_NAME}-other/cylc-run/${SUITE_NAME}/log
+ |-- share -> ${TEST_DIR}/${SYM_NAME}-other/cylc-run/${SUITE_NAME}/share
+ \`-- work -> ${TEST_DIR}/${SYM_NAME}-other/cylc-run/${SUITE_NAME}/work
+__TREE__
+
+# -----------------------------------------------------------------------------
+
+TEST_NAME="cylc-clean"
+run_ok "$TEST_NAME" cylc clean "$SUITE_NAME"
+dump_std "$TEST_NAME"
+
+TEST_NAME="run-dir-not-exist-post-clean.local"
+# (Could use the function `exists_ok` here instead, but this keeps it consistent with the remote test below)
+if [[ ! -a "$SUITE_RUN_DIR" ]]; then
+ ok "$TEST_NAME"
+else
+ fail "$TEST_NAME"
+fi
+
+TEST_NAME="run-dir-not-exist-post-clean.remote"
+if $SSH_CMD [[ ! -a "\$HOME/cylc-run/${SUITE_NAME}" ]]; then
+ ok "$TEST_NAME"
+else
+ fail "$TEST_NAME"
+fi
+
+TEST_NAME="test-dir-tree-post-clean.remote"
+$SSH_CMD tree --noreport --charset=ascii "${TEST_DIR}/${SYM_NAME}-"'*' > "${TEST_NAME}.stdout"
+# Note: backticks need to be escaped in the heredoc
+cmp_ok "${TEST_NAME}.stdout" << __TREE__
+${TEST_DIR}/${SYM_NAME}-cycle
+\`-- cylc-run
+ \`-- cylctb-${CYLC_TEST_TIME_INIT}
+ \`-- leave-me-alone
+${TEST_DIR}/${SYM_NAME}-other
+\`-- cylc-run
+${TEST_DIR}/${SYM_NAME}-run
+\`-- cylc-run
+__TREE__
+
+purge
+exit
diff --git a/tests/functional/lib/bash/test_header b/tests/functional/lib/bash/test_header
index 7525edd499e..3cb87cdb865 100644
--- a/tests/functional/lib/bash/test_header
+++ b/tests/functional/lib/bash/test_header
@@ -127,6 +127,8 @@
# Get the FQDN of the current host using the same mechanism Cylc uses.
# get_fqdn [TARGET]
# SSH to TARGET and return `hostname -f`.
+# dump_std TEST_NAME
+# Dump stdout and stderr of TEST_NAME to the test log dir.
#-------------------------------------------------------------------------------
set -eu
@@ -190,14 +192,19 @@ fail() {
fi
}
+dump_std() {
+ local TEST_NAME="$1"
+ mkdir -p "${TEST_LOG_DIR}" # directory may not exist if run fails very early
+ cp -p "${TEST_NAME}.stdout" "${TEST_LOG_DIR}/${TEST_NAME}.stdout"
+ cp -p "${TEST_NAME}.stderr" "${TEST_LOG_DIR}/${TEST_NAME}.stderr"
+}
+
run_ok() {
local TEST_NAME="$1"
shift 1
if ! "$@" 1>"${TEST_NAME}.stdout" 2>"${TEST_NAME}.stderr"; then
fail "${TEST_NAME}"
- mkdir -p "${TEST_LOG_DIR}"
- cp -p "${TEST_NAME}.stdout" "${TEST_LOG_DIR}/${TEST_NAME}.stdout"
- cp -p "${TEST_NAME}.stderr" "${TEST_LOG_DIR}/${TEST_NAME}.stderr"
+ dump_std "${TEST_NAME}"
return
fi
ok "${TEST_NAME}"
@@ -208,9 +215,7 @@ run_fail() {
shift 1
if "$@" 1>"${TEST_NAME}.stdout" 2>"${TEST_NAME}.stderr"; then
fail "${TEST_NAME}"
- mkdir -p "${TEST_LOG_DIR}"
- cp -p "${TEST_NAME}.stdout" "${TEST_LOG_DIR}/${TEST_NAME}.stdout"
- cp -p "${TEST_NAME}.stderr" "${TEST_LOG_DIR}/${TEST_NAME}.stderr"
+ dump_std "${TEST_NAME}"
return
fi
ok "${TEST_NAME}"
From b36ddebf5429f0565b5dba2b30a342b369f3653f Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Tue, 5 Jan 2021 13:38:26 +0000
Subject: [PATCH 15/21] cylc clean: replace remote-clean cmd with an opt for
normal clean
plus 1 or 2 other code review suggestions
---
cylc/flow/rundb.py | 2 +-
cylc/flow/scripts/clean.py | 18 ++++++++---
cylc/flow/scripts/remote_clean.py | 53 -------------------------------
cylc/flow/suite_files.py | 18 +++--------
setup.cfg | 1 -
5 files changed, 20 insertions(+), 72 deletions(-)
delete mode 100644 cylc/flow/scripts/remote_clean.py
diff --git a/cylc/flow/rundb.py b/cylc/flow/rundb.py
index e671d86366b..00fe05328f8 100644
--- a/cylc/flow/rundb.py
+++ b/cylc/flow/rundb.py
@@ -612,7 +612,7 @@ def select_task_job_run_times(self, callback):
def select_task_job_platforms(self):
"""Return the set of platform names from task_jobs table."""
- stmt = f"SELECT platform_name FROM {self.TABLE_TASK_JOBS}"
+ stmt = f"SELECT DISTINCT platform_name FROM {self.TABLE_TASK_JOBS}"
return set(i[0] for i in self.connect().execute(stmt))
def select_submit_nums(self, name, point):
diff --git a/cylc/flow/scripts/clean.py b/cylc/flow/scripts/clean.py
index c916d6f78ae..e793724ca0d 100644
--- a/cylc/flow/scripts/clean.py
+++ b/cylc/flow/scripts/clean.py
@@ -24,7 +24,7 @@
this is run for a workflow that was instead written directly in ~/cylc-run and
not backed up elsewhere, it will be lost.
-It will also remove an symlink directory targets.
+It will also remove any symlink directory targets.
Suite names can be hierarchical, corresponding to the path under ~/cylc-run.
@@ -36,20 +36,30 @@
from cylc.flow.option_parsers import CylcOptionParser as COP
from cylc.flow.terminal import cli_function
-from cylc.flow.suite_files import init_clean
+from cylc.flow.suite_files import clean, init_clean
def get_option_parser():
parser = COP(
__doc__,
- argdoc=[("REG", "Suite name")]
+ argdoc=[('REG', "Workflow name")]
)
+
+ parser.add_option(
+ '--local-only', '--local',
+ help="Only clean on the local filesystem (not remote hosts).",
+ action='store_true', dest='local_only'
+ )
+
return parser
@cli_function(get_option_parser)
def main(parser, opts, reg):
- init_clean(reg)
+ if opts.local_only:
+ clean(reg)
+ else:
+ init_clean(reg)
if __name__ == "__main__":
diff --git a/cylc/flow/scripts/remote_clean.py b/cylc/flow/scripts/remote_clean.py
deleted file mode 100644
index 2a869c81e60..00000000000
--- a/cylc/flow/scripts/remote_clean.py
+++ /dev/null
@@ -1,53 +0,0 @@
-#!/usr/bin/env python3
-
-# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
-# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see .
-
-"""cylc remote-clean [OPTIONS] ARGS
-
-(This command is for internal use.)
-
-Remove a stopped workflow from the remote host. This is called on any remote
-hosts when "cylc clean" is called on localhost.
-
-"""
-
-from cylc.flow.option_parsers import CylcOptionParser as COP
-from cylc.flow.terminal import cli_function
-from cylc.flow.suite_files import clean
-
-
-INTERNAL = True
-
-
-def get_option_parser():
- parser = COP(
- __doc__,
- argdoc=[
- ("REG", "Suite name"),
- ("[RUND]", "The run directory of the suite")
- ]
- )
- return parser
-
-
-@cli_function(get_option_parser)
-def main(parser, opts, reg, rund=None):
- clean(reg, rund)
-
-
-if __name__ == "__main__":
- main()
diff --git a/cylc/flow/suite_files.py b/cylc/flow/suite_files.py
index 2036792aa01..9929e5473b2 100644
--- a/cylc/flow/suite_files.py
+++ b/cylc/flow/suite_files.py
@@ -16,9 +16,6 @@
"""Suite service files management."""
-# Note: Some modules are NOT imported in the header. Expensive modules are only
-# imported on demand.
-
import os
from pathlib import Path
from random import shuffle
@@ -34,8 +31,7 @@
CylcError, PlatformLookupError, SuiteServiceFileError, TaskRemoteMgmtError,
WorkflowFilesError)
from cylc.flow.pathutil import (
- get_remote_suite_run_dir, get_suite_run_dir, make_localhost_symlinks,
- remove_dir)
+ get_suite_run_dir, make_localhost_symlinks, remove_dir)
from cylc.flow.platforms import (
get_platform, get_install_target_to_platforms_map)
from cylc.flow.hostuserutil import (
@@ -614,7 +610,7 @@ def init_clean(reg):
clean(reg)
-def clean(reg, run_dir=None):
+def clean(reg):
"""Remove a stopped workflow from the local filesystem only.
Deletes the workflow run directory and any symlink dirs. Note: if the
@@ -623,12 +619,8 @@ def clean(reg, run_dir=None):
Args:
reg (str): Workflow name.
- run_dir (str): Path to the workflow run dir on the filesystem.
"""
- if run_dir:
- run_dir = Path(run_dir)
- else:
- run_dir = Path(get_suite_run_dir(reg))
+ run_dir = Path(get_suite_run_dir(reg))
try:
_clean_check(reg, run_dir)
except FileNotFoundError as exc:
@@ -716,7 +708,7 @@ def remote_clean(reg, platform_names):
def _remote_clean_cmd(reg, platform):
"""Remove a stopped workflow on a remote host.
- Call "cylc remote-clean" over ssh and return the subprocess.
+ Call "cylc clean --local-only" over ssh and return the subprocess.
Args:
reg (str): Workflow name.
@@ -726,7 +718,7 @@ def _remote_clean_cmd(reg, platform):
LOG.info(
f'Cleaning on install target: {platform["install target"]} '
f'(platform: {platform["name"]})')
- cmd = ['remote-clean', reg, get_remote_suite_run_dir(platform, reg)]
+ cmd = ['clean', '--local-only', reg]
cmd = construct_ssh_cmd(cmd, platform, timeout='10s')
LOG.debug(" ".join(cmd))
return Popen(cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE)
diff --git a/setup.cfg b/setup.cfg
index f7a775ab20b..ed594936aea 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -98,7 +98,6 @@ cylc.command =
register = cylc.flow.scripts.register:main
release = cylc.flow.scripts.release:main
reload = cylc.flow.scripts.reload:main
- remote-clean = cylc.flow.scripts.remote_clean:main
remote-init = cylc.flow.scripts.remote_init:main
remote-tidy = cylc.flow.scripts.remote_tidy:main
remove = cylc.flow.scripts.remove:main
From 2ceabe7181944aaf1c53ab21081a7bf42fa01113 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Tue, 5 Jan 2021 18:10:15 +0000
Subject: [PATCH 16/21] Update changelog
---
CHANGES.md | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index f1037fc938e..2c7242605f6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -133,8 +133,10 @@ hierarchy and ability to set site config directory.
[#3883](https://github.com/cylc/cylc-flow/pull/3883) - Added a new workflow
config option `[scheduling]stop after cycle point`.
-[#3961](https://github.com/cylc/cylc-flow/pull/3961) - Added a new command:
-`cylc clean`.
+[#3961](https://github.com/cylc/cylc-flow/pull/3961),
+[#4017](https://github.com/cylc/cylc-flow/pull/4017) - Added a new command:
+`cylc clean`, for removing stopped workflows on the local and any remote
+filesystems.
### Fixes
From 380aa1944e1a6d2edba796a87e5cd6a1725b1513 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Thu, 7 Jan 2021 17:27:44 +0000
Subject: [PATCH 17/21] remote clean: if it fails on an install target, defer
raise to loop end
---
cylc/flow/suite_files.py | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git a/cylc/flow/suite_files.py b/cylc/flow/suite_files.py
index 9929e5473b2..25d2877ba8f 100644
--- a/cylc/flow/suite_files.py
+++ b/cylc/flow/suite_files.py
@@ -682,6 +682,7 @@ def remote_clean(reg, platform_names):
queue.append(
(_remote_clean_cmd(reg, platforms[0]), target, platforms)
)
+ failed_targets = []
for proc, target, platforms in queue:
ret_code = proc.wait()
out, err = (f.decode() for f in proc.communicate())
@@ -701,8 +702,10 @@ def remote_clean(reg, platform_names):
(_remote_clean_cmd(reg, platforms[0]), target, platforms)
)
else: # Exhausted list of platforms
- raise CylcError(
- f"Could not clean on install target: {target}")
+ failed_targets.append(target)
+ if failed_targets:
+ raise CylcError(
+ f"Could not clean on install targets: {', '.join(failed_targets)}")
def _remote_clean_cmd(reg, platform):
From 06cf5f8ec1d8eafb5fdabe171a98e610cab8430d Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Fri, 8 Jan 2021 13:36:27 +0000
Subject: [PATCH 18/21] Write unit test for remote_clean
---
cylc/flow/exceptions.py | 12 +--
tests/unit/test_suite_files.py | 137 ++++++++++++++++++++++++++++++++-
2 files changed, 142 insertions(+), 7 deletions(-)
diff --git a/cylc/flow/exceptions.py b/cylc/flow/exceptions.py
index 5be6b5ac1b5..862e296a00b 100644
--- a/cylc/flow/exceptions.py
+++ b/cylc/flow/exceptions.py
@@ -84,18 +84,18 @@ class WorkflowFilesError(CylcError):
class TaskRemoteMgmtError(CylcError):
"""Exceptions initialising suite run directory of remote job host."""
- MSG_INIT = '%s: initialisation did not complete:\n' # %s owner_at_host
- MSG_SELECT = '%s: host selection failed:\n' # %s host
- MSG_TIDY = '%s: clean up did not complete:\n' # %s owner_at_host
+ MSG_INIT = "initialisation did not complete"
+ MSG_SELECT = "host selection failed"
+ MSG_TIDY = "clean up did not complete"
def __str__(self):
msg, platform_n, cmd_str, ret_code, out, err = self.args
- ret = (msg + 'COMMAND FAILED (%d): %s\n') % (
- platform_n, ret_code, cmd_str)
+ ret = (f"{platform_n}: {msg}:\n"
+ f"COMMAND FAILED ({ret_code}): {cmd_str}\n")
for label, item in ('STDOUT', out), ('STDERR', err):
if item:
for line in item.splitlines(True): # keep newline chars
- ret += 'COMMAND %s: %s' % (label, line)
+ ret += f"COMMAND {label}: {line}"
return ret
diff --git a/tests/unit/test_suite_files.py b/tests/unit/test_suite_files.py
index 91f19e9e3a9..fc0dcfe1ae3 100644
--- a/tests/unit/test_suite_files.py
+++ b/tests/unit/test_suite_files.py
@@ -22,7 +22,8 @@
from cylc.flow import CYLC_LOG
from cylc.flow import suite_files
-from cylc.flow.exceptions import SuiteServiceFileError, WorkflowFilesError
+from cylc.flow.exceptions import (
+ CylcError, SuiteServiceFileError, TaskRemoteMgmtError, WorkflowFilesError)
from cylc.flow.suite_files import check_nested_run_dirs
@@ -592,6 +593,140 @@ def test_clean_broken_symlink_run_dir(monkeypatch, tmp_path):
assert run_dir.parent.is_dir() is False
+PLATFORMS = {
+ 'enterprise': {
+ 'hosts': ['kirk', 'picard'],
+ 'install target': 'picard',
+ 'name': 'enterprise'
+ },
+ 'voyager': {
+ 'hosts': ['janeway'],
+ 'install target': 'janeway',
+ 'name': 'voyager'
+ },
+ 'stargazer': {
+ 'hosts': ['picard'],
+ 'install target': 'picard',
+ 'name': 'stargazer'
+ },
+ 'exeter': {
+ 'hosts': ['localhost'],
+ 'install target': 'localhost',
+ 'name': 'exeter'
+ }
+}
+
+
+@pytest.mark.parametrize(
+ 'install_targets_map, failed_platforms, expected_platforms, expected_err',
+ [
+ (
+ {'localhost': [PLATFORMS['exeter']]}, None, None, None
+ ),
+ (
+ {
+ 'localhost': [PLATFORMS['exeter']],
+ 'picard': [PLATFORMS['enterprise']]
+ },
+ None,
+ ['enterprise'],
+ None
+ ),
+ (
+ {
+ 'picard': [PLATFORMS['enterprise'], PLATFORMS['stargazer']],
+ 'janeway': [PLATFORMS['voyager']]
+ },
+ None,
+ ['enterprise', 'voyager'],
+ None
+ ),
+ (
+ {
+ 'picard': [PLATFORMS['enterprise'], PLATFORMS['stargazer']],
+ 'janeway': [PLATFORMS['voyager']]
+ },
+ ['enterprise'],
+ ['enterprise', 'stargazer', 'voyager'],
+ None
+ ),
+ (
+ {
+ 'picard': [PLATFORMS['enterprise'], PLATFORMS['stargazer']],
+ 'janeway': [PLATFORMS['voyager']]
+ },
+ ['enterprise', 'stargazer'],
+ ['enterprise', 'stargazer', 'voyager'],
+ (CylcError, "Could not clean on install targets: picard")
+ ),
+ (
+ {
+ 'picard': [PLATFORMS['enterprise']],
+ 'janeway': [PLATFORMS['voyager']]
+ },
+ ['enterprise', 'voyager'],
+ ['enterprise', 'voyager'],
+ (CylcError, "Could not clean on install targets: picard, janeway")
+ )
+ ]
+)
+def test_remote_clean(install_targets_map, failed_platforms,
+ expected_platforms, expected_err, monkeypatch, caplog):
+ """Test remote_clean() logic.
+
+ Params:
+ install_targets_map (dict): The map that would be returned by
+ platforms.get_install_target_to_platforms_map()
+ failed_platforms (list): If specified, any platforms that clean will
+ artificially fail on in this test case.
+ expected_platforms (list): If specified, all the platforms that the
+ remote clean cmd is expected to run on.
+ expected_err (tuple): If specified, a tuple of the form
+ (Exception, str) giving an exception that is expected to be raised.
+ """
+ # ----- Setup -----
+ caplog.set_level(logging.ERROR, CYLC_LOG)
+ monkeypatch.setattr(
+ 'cylc.flow.suite_files.get_install_target_to_platforms_map',
+ lambda x: install_targets_map)
+ # Remove randomness:
+ mocked_shuffle = mock.Mock()
+ monkeypatch.setattr('cylc.flow.suite_files.shuffle', mocked_shuffle)
+
+ def mocked_remote_clean_cmd_side_effect(reg, platform):
+ proc_ret_code = 0
+ if failed_platforms and platform['name'] in failed_platforms:
+ proc_ret_code = 1
+ return mock.Mock(
+ wait=lambda: proc_ret_code,
+ communicate=lambda: (b"", b""),
+ args=[])
+
+ mocked_remote_clean_cmd = mock.Mock(
+ side_effect=mocked_remote_clean_cmd_side_effect)
+ monkeypatch.setattr(
+ 'cylc.flow.suite_files._remote_clean_cmd', mocked_remote_clean_cmd)
+ # ----- Test -----
+ reg = 'foo'
+ platform_names = (
+ "This arg bypassed as we provide the install targets map in the test")
+ if expected_err:
+ err, msg = expected_err
+ with pytest.raises(err) as exc:
+ suite_files.remote_clean(reg, platform_names)
+ assert msg in str(exc.value)
+ else:
+ suite_files.remote_clean(reg, platform_names)
+ if expected_platforms:
+ for p_name in expected_platforms:
+ mocked_remote_clean_cmd.assert_any_call(reg, PLATFORMS[p_name])
+ else:
+ mocked_remote_clean_cmd.assert_not_called()
+ if failed_platforms:
+ for p_name in failed_platforms:
+ assert f"{p_name}: {TaskRemoteMgmtError.MSG_TIDY}" in caplog.text
+
+
def test_remove_empty_reg_parents(tmp_path):
"""Test that _remove_empty_parents() doesn't remove parents containing a
sibling."""
From ecbc0f4f2d2c91cc155f93c2aa0466ee2f7bb2db Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Wed, 13 Jan 2021 16:10:56 +0000
Subject: [PATCH 19/21] cylc clean: make remote clean subprocs more concurrent
---
cylc/flow/suite_files.py | 52 ++++++++++++++++++++--------------
tests/unit/test_suite_files.py | 2 +-
2 files changed, 31 insertions(+), 23 deletions(-)
diff --git a/cylc/flow/suite_files.py b/cylc/flow/suite_files.py
index 08d21807950..3c5b9519a6f 100644
--- a/cylc/flow/suite_files.py
+++ b/cylc/flow/suite_files.py
@@ -22,6 +22,7 @@
import re
import shutil
from subprocess import Popen, PIPE, DEVNULL
+import time
import zmq.auth
import aiofiles
@@ -664,36 +665,43 @@ def remote_clean(reg, platform_names):
"Cannot clean on remote platforms as the workflow database is "
f"out of date/inconsistent with the global config - {exc}")
- queue = []
+ pool = []
for target, platforms in install_targets_map.items():
if target == 'localhost':
continue
shuffle(platforms)
# Issue ssh command:
- queue.append(
+ pool.append(
(_remote_clean_cmd(reg, platforms[0]), target, platforms)
)
failed_targets = []
- for proc, target, platforms in queue:
- ret_code = proc.wait()
- out, err = (f.decode() for f in proc.communicate())
- if out:
- LOG.info(out)
- if err:
- LOG.warning(err)
- if ret_code:
- # Try again on the next platform for this install target:
- this_platform = platforms.pop(0)
- exc = TaskRemoteMgmtError(
- TaskRemoteMgmtError.MSG_TIDY, this_platform['name'],
- " ".join(proc.args), ret_code, out, err)
- LOG.error(exc)
- if platforms:
- queue.append(
- (_remote_clean_cmd(reg, platforms[0]), target, platforms)
- )
- else: # Exhausted list of platforms
- failed_targets.append(target)
+ # Handle subproc pool results almost concurrently:
+ while pool:
+ for proc, target, platforms in pool:
+ ret_code = proc.poll()
+ if ret_code is None: # proc still running
+ continue
+ pool.remove((proc, target, platforms))
+ out, err = (f.decode() for f in proc.communicate())
+ if out:
+ LOG.info(out)
+ if err:
+ LOG.warning(err)
+ if ret_code:
+ # Try again using the next platform for this install target:
+ this_platform = platforms.pop(0)
+ exc = TaskRemoteMgmtError(
+ TaskRemoteMgmtError.MSG_TIDY, this_platform['name'],
+ " ".join(proc.args), ret_code, out, err)
+ LOG.error(exc)
+ if platforms:
+ pool.append(
+ (_remote_clean_cmd(reg, platforms[0]),
+ target, platforms)
+ )
+ else: # Exhausted list of platforms
+ failed_targets.append(target)
+ time.sleep(0.2)
if failed_targets:
raise CylcError(
f"Could not clean on install targets: {', '.join(failed_targets)}")
diff --git a/tests/unit/test_suite_files.py b/tests/unit/test_suite_files.py
index fc0dcfe1ae3..00fc275fe45 100644
--- a/tests/unit/test_suite_files.py
+++ b/tests/unit/test_suite_files.py
@@ -698,7 +698,7 @@ def mocked_remote_clean_cmd_side_effect(reg, platform):
if failed_platforms and platform['name'] in failed_platforms:
proc_ret_code = 1
return mock.Mock(
- wait=lambda: proc_ret_code,
+ poll=lambda: proc_ret_code,
communicate=lambda: (b"", b""),
args=[])
From add6e8b25a9f25994fcd6261adeaa941b5cce399 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Wed, 13 Jan 2021 12:11:40 +0000
Subject: [PATCH 20/21] cylc clean: Add timeout option for remote clean
Increase default timeout to 2 mins
---
cylc/flow/scripts/clean.py | 9 ++++++++-
cylc/flow/suite_files.py | 17 ++++++++++-------
tests/unit/test_suite_files.py | 11 ++++++-----
3 files changed, 24 insertions(+), 13 deletions(-)
diff --git a/cylc/flow/scripts/clean.py b/cylc/flow/scripts/clean.py
index e793724ca0d..34bda020545 100644
--- a/cylc/flow/scripts/clean.py
+++ b/cylc/flow/scripts/clean.py
@@ -51,6 +51,13 @@ def get_option_parser():
action='store_true', dest='local_only'
)
+ parser.add_option(
+ '--timeout',
+ help="The number of seconds to wait for cleaning to take place on "
+ "remote hosts before cancelling.",
+ action='store', default='120', dest='remote_timeout'
+ )
+
return parser
@@ -59,7 +66,7 @@ def main(parser, opts, reg):
if opts.local_only:
clean(reg)
else:
- init_clean(reg)
+ init_clean(reg, opts)
if __name__ == "__main__":
diff --git a/cylc/flow/suite_files.py b/cylc/flow/suite_files.py
index 3c5b9519a6f..01428f6b031 100644
--- a/cylc/flow/suite_files.py
+++ b/cylc/flow/suite_files.py
@@ -572,12 +572,13 @@ def _clean_check(reg, run_dir):
f"Cannot remove running workflow.\n\n{exc}")
-def init_clean(reg):
+def init_clean(reg, opts):
"""Initiate the process of removing a stopped workflow from the local
scheduler filesystem and remote hosts.
Args:
reg (str): Workflow name.
+ opts (optparse.Values): CLI options object for cylc clean.
"""
local_run_dir = Path(get_suite_run_dir(reg))
try:
@@ -597,7 +598,7 @@ def init_clean(reg):
raise SuiteServiceFileError(f"Cannot clean - {exc}")
if platform_names and platform_names != {'localhost'}:
- remote_clean(reg, platform_names)
+ remote_clean(reg, platform_names, opts.remote_timeout)
# Lastly, clean on local filesystem:
clean(reg)
@@ -648,7 +649,7 @@ def clean(reg):
_remove_empty_reg_parents(reg, run_dir)
-def remote_clean(reg, platform_names):
+def remote_clean(reg, platform_names, timeout):
"""Run subprocesses to clean workflows on remote install targets
(skip localhost), given a set of platform names to look up.
@@ -656,6 +657,7 @@ def remote_clean(reg, platform_names):
reg (str): Workflow name.
platform_names (list): List of platform names to look up in the global
config, in order to determine the install targets to clean on.
+ timeout (str): Number of seconds to wait before cancelling.
"""
try:
install_targets_map = (
@@ -672,7 +674,7 @@ def remote_clean(reg, platform_names):
shuffle(platforms)
# Issue ssh command:
pool.append(
- (_remote_clean_cmd(reg, platforms[0]), target, platforms)
+ (_remote_clean_cmd(reg, platforms[0], timeout), target, platforms)
)
failed_targets = []
# Handle subproc pool results almost concurrently:
@@ -696,7 +698,7 @@ def remote_clean(reg, platform_names):
LOG.error(exc)
if platforms:
pool.append(
- (_remote_clean_cmd(reg, platforms[0]),
+ (_remote_clean_cmd(reg, platforms[0], timeout),
target, platforms)
)
else: # Exhausted list of platforms
@@ -707,7 +709,7 @@ def remote_clean(reg, platform_names):
f"Could not clean on install targets: {', '.join(failed_targets)}")
-def _remote_clean_cmd(reg, platform):
+def _remote_clean_cmd(reg, platform, timeout):
"""Remove a stopped workflow on a remote host.
Call "cylc clean --local-only" over ssh and return the subprocess.
@@ -716,12 +718,13 @@ def _remote_clean_cmd(reg, platform):
reg (str): Workflow name.
platform (dict): Config for the platform on which to remove the
workflow.
+ timeout (str): Number of seconds to wait before cancelling the command.
"""
LOG.info(
f'Cleaning on install target: {platform["install target"]} '
f'(platform: {platform["name"]})')
cmd = ['clean', '--local-only', reg]
- cmd = construct_ssh_cmd(cmd, platform, timeout='10s')
+ cmd = construct_ssh_cmd(cmd, platform, timeout=timeout)
LOG.debug(" ".join(cmd))
return Popen(cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE)
diff --git a/tests/unit/test_suite_files.py b/tests/unit/test_suite_files.py
index 00fc275fe45..809166f6e49 100644
--- a/tests/unit/test_suite_files.py
+++ b/tests/unit/test_suite_files.py
@@ -443,7 +443,7 @@ def mocked_get_platforms_from_db(run_dir):
mocked_get_platforms_from_db)
# --- The actual test ---
- suite_files.init_clean(reg)
+ suite_files.init_clean(reg, opts=mock.Mock())
if expected_log:
assert msg in caplog.text
if clean_called:
@@ -693,7 +693,7 @@ def test_remote_clean(install_targets_map, failed_platforms,
mocked_shuffle = mock.Mock()
monkeypatch.setattr('cylc.flow.suite_files.shuffle', mocked_shuffle)
- def mocked_remote_clean_cmd_side_effect(reg, platform):
+ def mocked_remote_clean_cmd_side_effect(reg, platform, timeout):
proc_ret_code = 0
if failed_platforms and platform['name'] in failed_platforms:
proc_ret_code = 1
@@ -713,13 +713,14 @@ def mocked_remote_clean_cmd_side_effect(reg, platform):
if expected_err:
err, msg = expected_err
with pytest.raises(err) as exc:
- suite_files.remote_clean(reg, platform_names)
+ suite_files.remote_clean(reg, platform_names, timeout='irrelevant')
assert msg in str(exc.value)
else:
- suite_files.remote_clean(reg, platform_names)
+ suite_files.remote_clean(reg, platform_names, timeout='irrelevant')
if expected_platforms:
for p_name in expected_platforms:
- mocked_remote_clean_cmd.assert_any_call(reg, PLATFORMS[p_name])
+ mocked_remote_clean_cmd.assert_any_call(
+ reg, PLATFORMS[p_name], 'irrelevant')
else:
mocked_remote_clean_cmd.assert_not_called()
if failed_platforms:
From c24244b35333ded9228a9fae5ba598e62cc08ca0 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Thu, 14 Jan 2021 12:32:39 +0000
Subject: [PATCH 21/21] Reduce verbosity of cylc clean logging
---
cylc/flow/pathutil.py | 8 ++++----
cylc/flow/scripts/clean.py | 9 +++++++++
cylc/flow/suite_files.py | 25 ++++++++++++++-----------
tests/unit/test_suite_files.py | 5 +++--
4 files changed, 30 insertions(+), 17 deletions(-)
diff --git a/cylc/flow/pathutil.py b/cylc/flow/pathutil.py
index 0079754a5fc..25281a6ae04 100644
--- a/cylc/flow/pathutil.py
+++ b/cylc/flow/pathutil.py
@@ -219,15 +219,15 @@ def remove_dir(path):
if os.path.islink(path):
if os.path.exists(path):
target = os.path.realpath(path)
- LOG.info(
+ LOG.debug(
f'Removing symlink target directory: ({path} ->) {target}')
rmtree(target)
- LOG.info(f'Removing symlink: {path}')
+ LOG.debug(f'Removing symlink: {path}')
else:
- LOG.info(f'Removing broken symlink: {path}')
+ LOG.debug(f'Removing broken symlink: {path}')
os.remove(path)
elif not os.path.exists(path):
raise FileNotFoundError(path)
else:
- LOG.info(f'Removing directory: {path}')
+ LOG.debug(f'Removing directory: {path}')
rmtree(path)
diff --git a/cylc/flow/scripts/clean.py b/cylc/flow/scripts/clean.py
index 34bda020545..513b5f62a3e 100644
--- a/cylc/flow/scripts/clean.py
+++ b/cylc/flow/scripts/clean.py
@@ -34,6 +34,9 @@
"""
+import cylc.flow.flags
+from cylc.flow import LOG
+from cylc.flow.loggingutil import CylcLogFormatter
from cylc.flow.option_parsers import CylcOptionParser as COP
from cylc.flow.terminal import cli_function
from cylc.flow.suite_files import clean, init_clean
@@ -63,6 +66,12 @@ def get_option_parser():
@cli_function(get_option_parser)
def main(parser, opts, reg):
+ if not cylc.flow.flags.debug:
+ # for readability omit timestamps from logging unless in debug mode
+ for handler in LOG.handlers:
+ if isinstance(handler.formatter, CylcLogFormatter):
+ handler.formatter.configure(timestamp=False)
+
if opts.local_only:
clean(reg)
else:
diff --git a/cylc/flow/suite_files.py b/cylc/flow/suite_files.py
index 01428f6b031..9e7310a3e0d 100644
--- a/cylc/flow/suite_files.py
+++ b/cylc/flow/suite_files.py
@@ -31,6 +31,7 @@
from cylc.flow.exceptions import (
CylcError, PlatformLookupError, SuiteServiceFileError, TaskRemoteMgmtError,
WorkflowFilesError)
+import cylc.flow.flags
from cylc.flow.pathutil import (
get_suite_run_dir, make_localhost_symlinks, remove_dir)
from cylc.flow.platforms import (
@@ -591,15 +592,13 @@ def init_clean(reg, opts):
try:
platform_names = get_platforms_from_db(local_run_dir)
except FileNotFoundError:
- LOG.warning(
- "The workflow database is missing - will not be able to clean on "
- "any remote platforms")
+ LOG.info("No workflow database - will only clean locally")
except SuiteServiceFileError as exc:
raise SuiteServiceFileError(f"Cannot clean - {exc}")
if platform_names and platform_names != {'localhost'}:
remote_clean(reg, platform_names, opts.remote_timeout)
- # Lastly, clean on local filesystem:
+ LOG.info("Cleaning on local filesystem")
clean(reg)
@@ -672,6 +671,8 @@ def remote_clean(reg, platform_names, timeout):
if target == 'localhost':
continue
shuffle(platforms)
+ LOG.info(
+ f"Cleaning on install target: {platforms[0]['install target']}")
# Issue ssh command:
pool.append(
(_remote_clean_cmd(reg, platforms[0], timeout), target, platforms)
@@ -686,16 +687,14 @@ def remote_clean(reg, platform_names, timeout):
pool.remove((proc, target, platforms))
out, err = (f.decode() for f in proc.communicate())
if out:
- LOG.info(out)
- if err:
- LOG.warning(err)
+ LOG.debug(out)
if ret_code:
# Try again using the next platform for this install target:
this_platform = platforms.pop(0)
exc = TaskRemoteMgmtError(
TaskRemoteMgmtError.MSG_TIDY, this_platform['name'],
" ".join(proc.args), ret_code, out, err)
- LOG.error(exc)
+ LOG.debug(exc)
if platforms:
pool.append(
(_remote_clean_cmd(reg, platforms[0], timeout),
@@ -703,6 +702,8 @@ def remote_clean(reg, platform_names, timeout):
)
else: # Exhausted list of platforms
failed_targets.append(target)
+ elif err:
+ LOG.debug(err)
time.sleep(0.2)
if failed_targets:
raise CylcError(
@@ -720,10 +721,12 @@ def _remote_clean_cmd(reg, platform, timeout):
workflow.
timeout (str): Number of seconds to wait before cancelling the command.
"""
- LOG.info(
+ LOG.debug(
f'Cleaning on install target: {platform["install target"]} '
- f'(platform: {platform["name"]})')
+ f'(using platform: {platform["name"]})')
cmd = ['clean', '--local-only', reg]
+ if cylc.flow.flags.debug:
+ cmd.append('--debug')
cmd = construct_ssh_cmd(cmd, platform, timeout=timeout)
LOG.debug(" ".join(cmd))
return Popen(cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE)
@@ -753,7 +756,7 @@ def _remove_empty_reg_parents(reg, path):
continue
try:
parent.rmdir()
- LOG.info(f'Removing directory: {parent}')
+ LOG.debug(f'Removing directory: {parent}')
except OSError:
break
diff --git a/tests/unit/test_suite_files.py b/tests/unit/test_suite_files.py
index 809166f6e49..66b213a4017 100644
--- a/tests/unit/test_suite_files.py
+++ b/tests/unit/test_suite_files.py
@@ -385,7 +385,8 @@ def mocked_detect_old_contact_file(reg):
}, False, False),
('foo/bar', {
'no db': True,
- 'log': (logging.WARNING, "The workflow database is missing")
+ 'log': (logging.INFO,
+ "No workflow database - will only clean locally")
}, True, False),
('foo/bar', {
'db platforms': ['localhost', 'localhost']
@@ -685,7 +686,7 @@ def test_remote_clean(install_targets_map, failed_platforms,
(Exception, str) giving an exception that is expected to be raised.
"""
# ----- Setup -----
- caplog.set_level(logging.ERROR, CYLC_LOG)
+ caplog.set_level(logging.DEBUG, CYLC_LOG)
monkeypatch.setattr(
'cylc.flow.suite_files.get_install_target_to_platforms_map',
lambda x: install_targets_map)