Skip to content

Commit

Permalink
Merge pull request #4017 from MetRonnie/cylc-clean
Browse files Browse the repository at this point in the history
cylc clean 2: remote clean
  • Loading branch information
hjoliver authored Jan 20, 2021
2 parents af770d4 + c24244b commit 8aba48f
Show file tree
Hide file tree
Showing 17 changed files with 819 additions and 133 deletions.
6 changes: 4 additions & 2 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,10 @@ hierarchy and ability to set site config directory.
[#3883](https://github.com/cylc/cylc-flow/pull/3883) - Added a new workflow
config option `[scheduling]stop after cycle point`.

[#3961](https://github.com/cylc/cylc-flow/pull/3961) - Added a new command:
`cylc clean`.
[#3961](https://github.com/cylc/cylc-flow/pull/3961),
[#4017](https://github.com/cylc/cylc-flow/pull/4017) - Added a new command:
`cylc clean`, for removing stopped workflows on the local and any remote
filesystems.

[#3913](https://github.com/cylc/cylc-flow/pull/3913) - Added the ability to
use plugins to parse suite templating variables and additional files to
Expand Down
16 changes: 8 additions & 8 deletions cylc/flow/cfgspec/globalcfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,16 +548,16 @@
accepts up to 236 characters.
''')
Conf('install target', VDR.V_STRING, desc='''
This defaults to the platform name. This will be used as the
target for remote file installation.
For example, to indicate to Cylc that Platform_A shares a file
system with localhost, we would configure as follows:
This defaults to the platform name. This will be used as the
target for remote file installation.
For example, to indicate to Cylc that Platform_A shares a file
system with localhost, we would configure as follows:
.. code-block:: cylc
.. code-block:: cylc
[platforms]
[[Platform_A]]
install target = localhost
[platforms]
[[Platform_A]]
install target = localhost
''')

Conf('clean job submission environment', VDR.V_BOOLEAN, False,
Expand Down
12 changes: 6 additions & 6 deletions cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,18 @@ class WorkflowFilesError(CylcError):
class TaskRemoteMgmtError(CylcError):
"""Exceptions initialising suite run directory of remote job host."""

MSG_INIT = '%s: initialisation did not complete:\n' # %s owner_at_host
MSG_SELECT = '%s: host selection failed:\n' # %s host
MSG_TIDY = '%s: clean up did not complete:\n' # %s owner_at_host
MSG_INIT = "initialisation did not complete"
MSG_SELECT = "host selection failed"
MSG_TIDY = "clean up did not complete"

def __str__(self):
msg, platform_n, cmd_str, ret_code, out, err = self.args
ret = (msg + 'COMMAND FAILED (%d): %s\n') % (
platform_n, ret_code, cmd_str)
ret = (f"{platform_n}: {msg}:\n"
f"COMMAND FAILED ({ret_code}): {cmd_str}\n")
for label, item in ('STDOUT', out), ('STDERR', err):
if item:
for line in item.splitlines(True): # keep newline chars
ret += 'COMMAND %s: %s' % (label, line)
ret += f"COMMAND {label}: {line}"
return ret


Expand Down
14 changes: 7 additions & 7 deletions cylc/flow/pathutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def make_suite_run_tree(suite):
dir_ = os.path.expandvars(get_suite_run_dir(suite))
for i in range(archlen, -1, -1): # archlen...0
if i > 0:
dpath = dir_ + '.' + str(i)
dpath = f'{dir_}.{i}'
else:
dpath = dir_
if os.path.exists(dpath):
Expand All @@ -125,7 +125,7 @@ def make_suite_run_tree(suite):
rmtree(dpath)
else:
# roll others over
os.rename(dpath, dir_ + '.' + str(i + 1))
os.rename(dpath, f'{dir_}.{i + 1}')
# Create
for dir_ in (
get_suite_run_dir(suite),
Expand All @@ -138,7 +138,7 @@ def make_suite_run_tree(suite):
dir_ = os.path.expandvars(dir_)
if dir_:
os.makedirs(dir_, exist_ok=True)
LOG.debug('%s: directory created', dir_)
LOG.debug(f'{dir_}: directory created')


def make_localhost_symlinks(suite):
Expand Down Expand Up @@ -219,15 +219,15 @@ def remove_dir(path):
if os.path.islink(path):
if os.path.exists(path):
target = os.path.realpath(path)
LOG.info(
LOG.debug(
f'Removing symlink target directory: ({path} ->) {target}')
rmtree(target)
LOG.info(f'Removing symlink: {path}')
LOG.debug(f'Removing symlink: {path}')
else:
LOG.info(f'Removing broken symlink: {path}')
LOG.debug(f'Removing broken symlink: {path}')
os.remove(path)
elif not os.path.exists(path):
raise FileNotFoundError(path)
else:
LOG.info(f'Removing directory: {path}')
LOG.debug(f'Removing directory: {path}')
rmtree(path)
24 changes: 22 additions & 2 deletions cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,28 @@ def get_install_target_from_platform(platform):
return platform.get('install target')


def is_platform_with_target_in_list(
install_target, distinct_platforms_list):
def get_install_target_to_platforms_map(platform_names):
    """Get a dictionary of unique install targets and the platforms which use
    them.

    Each distinct platform name is looked up once with ``get_platform`` and
    the resulting platform is grouped under the install target reported by
    ``get_install_target_from_platform``.

    Args:
        platform_names (list): List of platform names to look up in the
            global config. Duplicate names are only looked up once.

    Returns:
        dict: {install_target_1: [platform_1_dict, platform_2_dict, ...], ...}
    """
    target_map = {}
    # De-duplicate the names so each platform is looked up and listed once.
    for p_name in set(platform_names):
        platform = get_platform(p_name)
        # Single pass: file each platform under its target as we go, rather
        # than re-scanning every platform once per distinct target.
        target = get_install_target_from_platform(platform)
        target_map.setdefault(target, []).append(platform)
    return target_map


def is_platform_with_target_in_list(install_target, distinct_platforms_list):
    """Determine whether any platform in the list uses the install target.

    Args:
        install_target: Install target to look for.
        distinct_platforms_list (list): Platform dicts, each providing an
            'install target' key.

    Returns:
        bool: True if at least one platform's 'install target' equals
        ``install_target``; False otherwise (including for an empty list).
    """
    # Check every platform: returning from inside the loop body would only
    # ever compare against the first entry in the list.
    return any(
        install_target == distinct_platform['install target']
        for distinct_platform in distinct_platforms_list
    )
14 changes: 10 additions & 4 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,12 @@ class CylcSuiteDAO:
],
}

def __init__(self, db_file_name=None, is_public=False):
"""Initialise object.
def __init__(self, db_file_name, is_public=False):
"""Initialise database access object.
db_file_name - Path to the database file
is_public - If True, allow retries, etc
Args:
db_file_name (str): Path to the database file.
is_public (bool): If True, allow retries, etc.
"""
self.db_file_name = expandvars(db_file_name)
Expand Down Expand Up @@ -609,6 +610,11 @@ def select_task_job_run_times(self, callback):
for row_idx, row in enumerate(self.connect().execute(stmt)):
callback(row_idx, list(row))

def select_task_job_platforms(self):
    """Return the set of platform names from task_jobs table."""
    stmt = f"SELECT DISTINCT platform_name FROM {self.TABLE_TASK_JOBS}"
    rows = self.connect().execute(stmt)
    # Only one column is selected, so the name is the first item of each row.
    return {row[0] for row in rows}

def select_submit_nums(self, name, point):
"""Select submit_num and flow_label from task_states table.
Expand Down
41 changes: 33 additions & 8 deletions cylc/flow/scripts/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,64 @@

"""cylc clean [OPTIONS] ARGS
Remove a stopped workflow from the local scheduler filesystem.
Remove a stopped workflow from the local scheduler filesystem and remote hosts.
NOTE: this command is intended for workflows installed with `cylc install`. If
this is run for a workflow that was instead written directly in ~/cylc-run and
not backed up elsewhere, it will be lost.
It will also remove an symlink directory targets. For now, it will fail to
remove workflow files/directories on a remote host.
It will also remove any symlink directory targets.
Suite names can be hierarchical, corresponding to the path under ~/cylc-run.
Examples:
# Remove the workflow at ~/cylc-run/foo
$ cylc clean foo
# Remove the workflow at ~/cylc-run/foo/bar
$ cylc clean foo/bar
"""

import cylc.flow.flags
from cylc.flow import LOG
from cylc.flow.loggingutil import CylcLogFormatter
from cylc.flow.option_parsers import CylcOptionParser as COP
from cylc.flow.terminal import cli_function
from cylc.flow.suite_files import clean
from cylc.flow.suite_files import clean, init_clean


def get_option_parser():
parser = COP(
__doc__,
argdoc=[("REG", "Suite name")]
argdoc=[('REG', "Workflow name")]
)

parser.add_option(
'--local-only', '--local',
help="Only clean on the local filesystem (not remote hosts).",
action='store_true', dest='local_only'
)

parser.add_option(
'--timeout',
help="The number of seconds to wait for cleaning to take place on "
"remote hosts before cancelling.",
action='store', default='120', dest='remote_timeout'
)

return parser


@cli_function(get_option_parser)
def main(parser, opts, reg):
clean(reg)
if not cylc.flow.flags.debug:
# for readability omit timestamps from logging unless in debug mode
for handler in LOG.handlers:
if isinstance(handler.formatter, CylcLogFormatter):
handler.formatter.configure(timestamp=False)

if opts.local_only:
clean(reg)
else:
init_clean(reg, opts)


if __name__ == "__main__":
Expand Down
34 changes: 21 additions & 13 deletions cylc/flow/suite_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,29 +557,38 @@ def recover_pub_from_pri(self):

def on_restart(self):
"""Check & vacuum the runtime DB on restart."""
if not os.path.isfile(self.pri_path):
try:
self.check_suite_db_compatibility()
except FileNotFoundError:
raise SuiteServiceFileError(
"Cannot restart as the workflow database was not found")
except SuiteServiceFileError as exc:
raise SuiteServiceFileError(
'Cannot restart as suite database not found')
self.check_suite_db_compatibility()
f"Cannot restart - {exc}")
pri_dao = self.get_pri_dao()
pri_dao.vacuum()
self.n_restart = pri_dao.select_suite_params_restart_count() + 1
self.put_suite_params_1(self.KEY_RESTART_COUNT, self.n_restart)
pri_dao.close()
try:
pri_dao.vacuum()
self.n_restart = pri_dao.select_suite_params_restart_count() + 1
self.put_suite_params_1(self.KEY_RESTART_COUNT, self.n_restart)
finally:
pri_dao.close()

def check_suite_db_compatibility(self):
"""Raises SuiteServiceFileError if the existing suite database is
incompatible with the current version of Cylc."""
if not os.path.isfile(self.pri_path):
raise FileNotFoundError(self.pri_path)
incompat_msg = (
f"Workflow database is incompatible with Cylc {CYLC_VERSION}")
pri_dao = self.get_pri_dao()
try:
last_run_ver = pri_dao.connect().execute(
f'SELECT value FROM {self.TABLE_SUITE_PARAMS} '
f'WHERE key == "{self.KEY_CYLC_VERSION}"').fetchone()[0]
except TypeError:
raise SuiteServiceFileError(
'Cannot restart suite as the suite database is incompatible '
f'with Cylc {CYLC_VERSION}')
pri_dao.close()
raise SuiteServiceFileError(incompat_msg)
finally:
pri_dao.close()
try:
last_run_ver = packaging.version.Version(last_run_ver)
except packaging.version.InvalidVersion:
Expand All @@ -588,5 +597,4 @@ def check_suite_db_compatibility(self):
CylcSuiteDAO.RESTART_INCOMPAT_VERSION)
if last_run_ver <= restart_incompat_ver:
raise SuiteServiceFileError(
f'Cannot restart suite last run with Cylc {last_run_ver} as '
f'the suite database is incompatible with Cylc {CYLC_VERSION}')
f"{incompat_msg} (workflow last run with Cylc {last_run_ver})")
Loading

0 comments on commit 8aba48f

Please sign in to comment.