Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cylc clean 2: remote clean #4017

Merged
merged 22 commits into from
Jan 20, 2021
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
80572d0
cylc clean: Ensure run dir is removed if broken symlink
MetRonnie Dec 11, 2020
550340e
Tidy
MetRonnie Dec 11, 2020
3972780
cylc clean: remote platforms
MetRonnie Dec 16, 2020
41d1920
cylc clean: keep retrying with next platform in event of errors
MetRonnie Dec 16, 2020
4cd1e54
cylc clean: slight refactor & unit test for getting install targets map
MetRonnie Dec 17, 2020
4e679e7
Tidy
MetRonnie Dec 24, 2020
6b3c72b
Platforms: add unit test for get_install_target_from_platform()
MetRonnie Dec 17, 2020
0751972
Suite files: Add test case to test_validate_reg()
MetRonnie Dec 21, 2020
2763e64
SuiteDatabaseManager improvements
MetRonnie Dec 21, 2020
6a7aab7
cylc clean: refactor and add/update unit tests
MetRonnie Dec 21, 2020
6d38cbc
cylc clean: fix order of removing symlink dirs
MetRonnie Dec 23, 2020
a41c66c
Tidy
MetRonnie Dec 24, 2020
8fb8763
cylc clean: refactor
MetRonnie Dec 24, 2020
7741ac0
cylc clean: functional test for remote clean
MetRonnie Dec 24, 2020
b36ddeb
cylc clean: replace remote-clean cmd with an opt for normal clean
MetRonnie Jan 5, 2021
2ceabe7
Update changelog
MetRonnie Jan 5, 2021
380aa19
remote clean: if it fails on an install target, defer raise to loop end
MetRonnie Jan 7, 2021
06cf5f8
Write unit test for remote_clean
MetRonnie Jan 8, 2021
bb4463b
Merge branch 'master' into cylc-clean
wxtim Jan 11, 2021
ecbc0f4
cylc clean: make remote clean subprocs more concurrent
MetRonnie Jan 13, 2021
add6e8b
cylc clean: Add timeout option for remote clean
MetRonnie Jan 13, 2021
c24244b
Reduce verbosity of cylc clean logging
MetRonnie Jan 14, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,10 @@ hierarchy and ability to set site config directory.
[#3883](https://github.com/cylc/cylc-flow/pull/3883) - Added a new workflow
config option `[scheduling]stop after cycle point`.

[#3961](https://github.com/cylc/cylc-flow/pull/3961) - Added a new command:
`cylc clean`.
[#3961](https://github.com/cylc/cylc-flow/pull/3961),
[#4017](https://github.com/cylc/cylc-flow/pull/4017) - Added a new command:
`cylc clean`, for removing stopped workflows on the local and any remote
filesystems.

[#3913](https://github.com/cylc/cylc-flow/pull/3913) - Added the ability to
use plugins to parse suite templating variables and additional files to
Expand Down
16 changes: 8 additions & 8 deletions cylc/flow/cfgspec/globalcfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,16 +548,16 @@
accepts up to 236 characters.
''')
Conf('install target', VDR.V_STRING, desc='''
This defaults to the platform name. This will be used as the
target for remote file installation.
For example, to indicate to Cylc that Platform_A shares a file
system with localhost, we would configure as follows:
This defaults to the platform name. This will be used as the
target for remote file installation.
For example, to indicate to Cylc that Platform_A shares a file
system with localhost, we would configure as follows:

.. code-block:: cylc
.. code-block:: cylc

[platforms]
[[Platform_A]]
install target = localhost
[platforms]
[[Platform_A]]
install target = localhost
''')

Conf('clean job submission environment', VDR.V_BOOLEAN, False,
Expand Down
12 changes: 6 additions & 6 deletions cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,18 @@ class WorkflowFilesError(CylcError):
class TaskRemoteMgmtError(CylcError):
"""Exceptions initialising suite run directory of remote job host."""

MSG_INIT = '%s: initialisation did not complete:\n' # %s owner_at_host
MSG_SELECT = '%s: host selection failed:\n' # %s host
MSG_TIDY = '%s: clean up did not complete:\n' # %s owner_at_host
MSG_INIT = "initialisation did not complete"
MSG_SELECT = "host selection failed"
MSG_TIDY = "clean up did not complete"

def __str__(self):
msg, platform_n, cmd_str, ret_code, out, err = self.args
ret = (msg + 'COMMAND FAILED (%d): %s\n') % (
platform_n, ret_code, cmd_str)
ret = (f"{platform_n}: {msg}:\n"
f"COMMAND FAILED ({ret_code}): {cmd_str}\n")
for label, item in ('STDOUT', out), ('STDERR', err):
if item:
for line in item.splitlines(True): # keep newline chars
ret += 'COMMAND %s: %s' % (label, line)
ret += f"COMMAND {label}: {line}"
return ret


Expand Down
6 changes: 3 additions & 3 deletions cylc/flow/pathutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def make_suite_run_tree(suite):
dir_ = os.path.expandvars(get_suite_run_dir(suite))
for i in range(archlen, -1, -1): # archlen...0
if i > 0:
dpath = dir_ + '.' + str(i)
dpath = f'{dir_}.{i}'
else:
dpath = dir_
if os.path.exists(dpath):
Expand All @@ -125,7 +125,7 @@ def make_suite_run_tree(suite):
rmtree(dpath)
else:
# roll others over
os.rename(dpath, dir_ + '.' + str(i + 1))
os.rename(dpath, f'{dir_}.{i + 1}')
# Create
for dir_ in (
get_suite_run_dir(suite),
Expand All @@ -138,7 +138,7 @@ def make_suite_run_tree(suite):
dir_ = os.path.expandvars(dir_)
if dir_:
os.makedirs(dir_, exist_ok=True)
LOG.debug('%s: directory created', dir_)
LOG.debug(f'{dir_}: directory created')


def make_localhost_symlinks(suite):
Expand Down
24 changes: 22 additions & 2 deletions cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,28 @@ def get_install_target_from_platform(platform):
return platform.get('install target')


def is_platform_with_target_in_list(
install_target, distinct_platforms_list):
def get_install_target_to_platforms_map(platform_names):
    """Get a dictionary of unique install targets and the platforms which use
    them.

    Return {install_target_1: [platform_1_dict, platform_2_dict, ...], ...}

    Args:
        platform_names (list): List of platform names to look up via
            get_platform(). Duplicate names are only looked up once.
    """
    target_map = {}
    # De-duplicate the names so each platform is fetched, and listed, once.
    for p_name in set(platform_names):
        platform = get_platform(p_name)
        # Group in a single pass: one install-target lookup per platform,
        # rather than re-scanning every platform for every target.
        target = get_install_target_from_platform(platform)
        target_map.setdefault(target, []).append(platform)
    return target_map


def is_platform_with_target_in_list(install_target, distinct_platforms_list):
    """Determine whether any platform in the list uses the install target.

    Args:
        install_target (str): The install target to look for.
        distinct_platforms_list (list): Platform dicts, each of which is
            indexed by the 'install target' key.

    Returns:
        bool: True if at least one platform's 'install target' equals
        install_target, otherwise False (including for an empty list).
    """
    # Check every platform: returning from inside the loop body would only
    # ever compare against the first entry in the list.
    return any(
        install_target == platform['install target']
        for platform in distinct_platforms_list
    )
14 changes: 10 additions & 4 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,12 @@ class CylcSuiteDAO:
],
}

def __init__(self, db_file_name=None, is_public=False):
"""Initialise object.
def __init__(self, db_file_name, is_public=False):
"""Initialise database access object.

db_file_name - Path to the database file
is_public - If True, allow retries, etc
Args:
db_file_name (str): Path to the database file.
is_public (bool): If True, allow retries, etc.

"""
self.db_file_name = expandvars(db_file_name)
Expand Down Expand Up @@ -609,6 +610,11 @@ def select_task_job_run_times(self, callback):
for row_idx, row in enumerate(self.connect().execute(stmt)):
callback(row_idx, list(row))

def select_task_job_platforms(self):
    """Return the distinct platform names recorded in the task_jobs table.

    Returns:
        set: The first column (platform_name) of each row produced by the
        SELECT DISTINCT query.
    """
    query = f"SELECT DISTINCT platform_name FROM {self.TABLE_TASK_JOBS}"
    rows = self.connect().execute(query)
    return {row[0] for row in rows}

def select_submit_nums(self, name, point):
"""Select submit_num and flow_label from task_states table.

Expand Down
25 changes: 17 additions & 8 deletions cylc/flow/scripts/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,48 @@

"""cylc clean [OPTIONS] ARGS

Remove a stopped workflow from the local scheduler filesystem.
Remove a stopped workflow from the local scheduler filesystem and remote hosts.

NOTE: this command is intended for workflows installed with `cylc install`. If
this is run for a workflow that was instead written directly in ~/cylc-run and
not backed up elsewhere, it will be lost.

It will also remove an symlink directory targets. For now, it will fail to
remove workflow files/directories on a remote host.
It will also remove any symlink directory targets.

Suite names can be hierarchical, corresponding to the path under ~/cylc-run.

Examples:
# Remove the workflow at ~/cylc-run/foo
$ cylc clean foo
# Remove the workflow at ~/cylc-run/foo/bar
$ cylc clean foo/bar

"""

from cylc.flow.option_parsers import CylcOptionParser as COP
from cylc.flow.terminal import cli_function
from cylc.flow.suite_files import clean
from cylc.flow.suite_files import clean, init_clean


def get_option_parser():
parser = COP(
__doc__,
argdoc=[("REG", "Suite name")]
argdoc=[('REG', "Workflow name")]
)

parser.add_option(
'--local-only', '--local',
help="Only clean on the local filesystem (not remote hosts).",
action='store_true', dest='local_only'
)

return parser


@cli_function(get_option_parser)
def main(parser, opts, reg):
clean(reg)
if opts.local_only:
clean(reg)
else:
init_clean(reg)


if __name__ == "__main__":
Expand Down
34 changes: 21 additions & 13 deletions cylc/flow/suite_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,29 +557,38 @@ def recover_pub_from_pri(self):

def on_restart(self):
"""Check & vacuum the runtime DB on restart."""
if not os.path.isfile(self.pri_path):
try:
self.check_suite_db_compatibility()
except FileNotFoundError:
raise SuiteServiceFileError(
"Cannot restart as the workflow database was not found")
except SuiteServiceFileError as exc:
raise SuiteServiceFileError(
'Cannot restart as suite database not found')
self.check_suite_db_compatibility()
f"Cannot restart - {exc}")
pri_dao = self.get_pri_dao()
pri_dao.vacuum()
self.n_restart = pri_dao.select_suite_params_restart_count() + 1
self.put_suite_params_1(self.KEY_RESTART_COUNT, self.n_restart)
pri_dao.close()
try:
pri_dao.vacuum()
self.n_restart = pri_dao.select_suite_params_restart_count() + 1
self.put_suite_params_1(self.KEY_RESTART_COUNT, self.n_restart)
finally:
pri_dao.close()

def check_suite_db_compatibility(self):
"""Raises SuiteServiceFileError if the existing suite database is
incompatible with the current version of Cylc."""
if not os.path.isfile(self.pri_path):
raise FileNotFoundError(self.pri_path)
incompat_msg = (
f"Workflow database is incompatible with Cylc {CYLC_VERSION}")
pri_dao = self.get_pri_dao()
try:
last_run_ver = pri_dao.connect().execute(
f'SELECT value FROM {self.TABLE_SUITE_PARAMS} '
f'WHERE key == "{self.KEY_CYLC_VERSION}"').fetchone()[0]
except TypeError:
raise SuiteServiceFileError(
'Cannot restart suite as the suite database is incompatible '
f'with Cylc {CYLC_VERSION}')
pri_dao.close()
raise SuiteServiceFileError(incompat_msg)
finally:
pri_dao.close()
try:
last_run_ver = packaging.version.Version(last_run_ver)
except packaging.version.InvalidVersion:
Expand All @@ -588,5 +597,4 @@ def check_suite_db_compatibility(self):
CylcSuiteDAO.RESTART_INCOMPAT_VERSION)
if last_run_ver <= restart_incompat_ver:
raise SuiteServiceFileError(
f'Cannot restart suite last run with Cylc {last_run_ver} as '
f'the suite database is incompatible with Cylc {CYLC_VERSION}')
f"{incompat_msg} (workflow last run with Cylc {last_run_ver})")
Loading