Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clean: only attempt to clean platforms which have been installed to #4951

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ class CylcWorkflowDAO:
TABLE_TASKS_TO_HOLD = "tasks_to_hold"
TABLE_XTRIGGERS = "xtriggers"
TABLE_ABS_OUTPUTS = "absolute_outputs"
TABLE_REMOTE_INIT = "remote_init"

TABLES_ATTRS = {
TABLE_BROADCAST_EVENTS: [
Expand Down Expand Up @@ -311,6 +312,10 @@ class CylcWorkflowDAO:
["name"],
["cycle"],
],
TABLE_REMOTE_INIT: [
["install_target", {"is_primary_key": True}],
["status"],
],
}

def __init__(self, db_file_name, is_public=False):
Expand Down Expand Up @@ -700,6 +705,27 @@ def select_task_job_platforms(self):
''' # nosec (table name is code constant)
return {i[0] for i in self.connect().execute(stmt)}

def select_install_targets_to_clean(self):
"""Return install targets onto which a workflow has installed files.

Use this to determine which install targets require remote clean.
"""
from cylc.flow.task_remote_mgr import REMOTE_INIT_CHANGES_MADE
statuses = ", ".join(
f'"{status}"'
for status in REMOTE_INIT_CHANGES_MADE
)
print(statuses)
stmt = rf'''
SELECT
install_target
FROM
{self.TABLE_REMOTE_INIT}
WHERE
status in ( {statuses} )
''' # nosec (table name is code constant)
return {i[0] for i in self.connect().execute(stmt)}

def select_submit_nums(self, name, point):
"""Select submit_num and flow_nums from task_states table.

Expand Down
18 changes: 13 additions & 5 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,15 @@
TASK_OUTPUT_SUCCEEDED
)
from cylc.flow.task_remote_mgr import (
REMOTE_FILE_INSTALL_255,
REMOTE_FILE_INSTALL_DONE,
REMOTE_FILE_INSTALL_FAILED,
REMOTE_FILE_INSTALL_IN_PROGRESS,
REMOTE_INIT_IN_PROGRESS,
REMOTE_INIT_255,
REMOTE_FILE_INSTALL_255,
REMOTE_INIT_DONE, REMOTE_INIT_FAILED,
REMOTE_INIT_DONE,
REMOTE_INIT_FAILED,
REMOTE_INIT_IN_PROGRESS,
REMOTE_INIT_NO_HOSTS,
TaskRemoteMgr
)
from cylc.flow.task_state import (
Expand Down Expand Up @@ -152,7 +154,11 @@ def __init__(self, workflow, proc_pool, workflow_db_mgr,
self.bad_hosts = bad_hosts
self.bad_hosts_to_clear = set()
self.task_remote_mgr = TaskRemoteMgr(
workflow, proc_pool, self.bad_hosts)
workflow,
proc_pool,
self.bad_hosts,
workflow_db_mgr.put_remote_init_item,
)

def check_task_jobs(self, workflow, task_pool):
"""Check submission and execution timeout and polling timers.
Expand Down Expand Up @@ -453,7 +459,9 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
continue

if ri_map[install_target] in {
REMOTE_INIT_FAILED, REMOTE_FILE_INSTALL_FAILED
REMOTE_INIT_FAILED,
REMOTE_FILE_INSTALL_FAILED,
REMOTE_INIT_NO_HOSTS,
}:
# Remote init or install failed. Set submit-failed for all
# affected tasks and remove target from remote init map
Expand Down
43 changes: 40 additions & 3 deletions cylc/flow/task_remote_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,30 +69,67 @@
# Remote installation literals
REMOTE_INIT_DONE = 'REMOTE INIT DONE'
REMOTE_INIT_FAILED = 'REMOTE INIT FAILED'
REMOTE_INIT_NO_HOSTS = 'REMOTE INIT FAILED - NO AVAILABLE HOSTS'
REMOTE_INIT_IN_PROGRESS = 'REMOTE INIT IN PROGRESS'
REMOTE_FILE_INSTALL_DONE = 'REMOTE FILE INSTALL DONE'
REMOTE_FILE_INSTALL_IN_PROGRESS = 'REMOTE FILE INSTALL IN PROGRESS'
REMOTE_FILE_INSTALL_FAILED = 'REMOTE FILE INSTALL FAILED'
REMOTE_INIT_255 = 'REMOTE INIT 255'
REMOTE_FILE_INSTALL_255 = 'REMOTE FILE INSTALL 255'

# the set of remote init states for which files will have or may have been
# installed onto the install target
# (used to determine when remote clean should be attempted)
REMOTE_INIT_CHANGES_MADE = {
# remote init completed
REMOTE_INIT_DONE,
# remote init in progress - files may have been installed
REMOTE_INIT_IN_PROGRESS,
REMOTE_FILE_INSTALL_IN_PROGRESS,
# remote init failed - files may have been installed
REMOTE_FILE_INSTALL_FAILED,
REMOTE_INIT_FAILED,
# file install failed - the first step (i.e. the remote init) must have
# succeeded for the file install to have started
REMOTE_FILE_INSTALL_255,
}


class RemoteTidyQueueTuple(NamedTuple):
platform: Dict[str, Any]
host: str
proc: 'Popen[str]'


class RIMap(dict):
"""Remote-init map.

The mapping which stores the status of remote-init operations on install
targets consisting of install_target:status pairs.

This is a reactive dictionary, remote init statuses are automatically
queued for DB write when updated.
"""

def __init__(self, update, *args, **kwargs):
self._update = update
dict.__init__(self, *args, **kwargs)

def __setitem__(self, key, value):
dict.__setitem__(self, key, value)
self._update(key, value)


class TaskRemoteMgr:
"""Manage task job remote initialisation, tidy, selection."""

def __init__(self, workflow, proc_pool, bad_hosts):
def __init__(self, workflow, proc_pool, bad_hosts, put_remote_init_item):
self.workflow = workflow
self.proc_pool = proc_pool
# self.remote_command_map = {command: host|PlatformError|None}
self.remote_command_map = {}
# self.remote_init_map = {(install target): status, ...}
self.remote_init_map = {}
self.remote_init_map = RIMap(put_remote_init_item)
self.uuid_str = None
# This flag is turned on when a host init/select command completes
self.ready = False
Expand Down Expand Up @@ -237,7 +274,7 @@ def remote_init(
)
)
self.remote_init_map[
platform['install target']] = REMOTE_INIT_FAILED
platform['install target']] = REMOTE_INIT_NO_HOSTS
self.bad_hosts -= set(platform['hosts'])
self.ready = True
else:
Expand Down
14 changes: 14 additions & 0 deletions cylc/flow/workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ def delete_workflow_stop_task(self):
"""Delete workflow stop task from workflow_params table."""
self.delete_workflow_params(self.KEY_STOP_TASK)

def delete_remote_init_item(self, install_target):
self.db_deletes_map.setdefault(CylcWorkflowDAO.TABLE_REMOTE_INIT, [])
self.db_deletes_map[CylcWorkflowDAO.TABLE_REMOTE_INIT].append(
{'install_target': install_target}
)

def get_pri_dao(self):
"""Return the primary DAO."""
return CylcWorkflowDAO(self.pri_path)
Expand Down Expand Up @@ -627,6 +633,14 @@ def put_update_task_outputs(self, itask):
self.db_updates_map[self.TABLE_TASK_OUTPUTS].append(
(set_args, where_args))

def put_remote_init_item(self, install_target: str, status: str) -> None:
"""Insert or update a remote init / fileinstall status."""
self.db_inserts_map.setdefault(CylcWorkflowDAO.TABLE_REMOTE_INIT, [])
self.db_inserts_map[CylcWorkflowDAO.TABLE_REMOTE_INIT].append({
'install_target': install_target,
'status': status,
})

def _put_update_task_x(self, table_name, itask, set_args):
"""Put UPDATE statement for a task_* table."""
where_args = {
Expand Down
40 changes: 28 additions & 12 deletions cylc/flow/workflow_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
get_host_from_platform,
get_install_target_to_platforms_map,
get_localhost_install_target,
get_install_target_from_platform,
get_platform,
)
from cylc.flow.hostuserutil import (
get_user,
Expand Down Expand Up @@ -785,7 +787,7 @@ def init_clean(reg: str, opts: 'Values') -> None:
if not opts.local_only:
platform_names = None
try:
platform_names = get_platforms_from_db(local_run_dir)
install_target_map = get_platforms_from_db(local_run_dir)
except FileNotFoundError:
if opts.remote_only:
raise ServiceFileError(
Expand All @@ -797,10 +799,15 @@ def init_clean(reg: str, opts: 'Values') -> None:
)
except ServiceFileError as exc:
raise ServiceFileError(f"Cannot clean {reg} - {exc}")
except PlatformLookupError as exc:
raise PlatformLookupError(
f"Cannot clean {reg} on remote platforms as the workflow"
" database is out of date/inconsistent with the global config"
f" - {exc}")

if platform_names and platform_names != {'localhost'}:
if install_target_map and install_target_map != {'localhost'}:
remote_clean(
reg, platform_names, opts.rm_dirs, opts.remote_timeout
reg, install_target_map, opts.rm_dirs, opts.remote_timeout
)

if not opts.remote_only:
Expand Down Expand Up @@ -968,7 +975,7 @@ def _clean_using_glob(

def remote_clean(
reg: str,
platform_names: Iterable[str],
install_targets_map,
rm_dirs: Optional[List[str]] = None,
timeout: str = '120'
) -> None:
Expand All @@ -982,13 +989,6 @@ def remote_clean(
rm_dirs: Sub dirs to remove instead of the whole run dir.
timeout: Number of seconds to wait before cancelling.
"""
try:
install_targets_map = (
get_install_target_to_platforms_map(platform_names))
except PlatformLookupError as exc:
raise PlatformLookupError(
f"Cannot clean {reg} on remote platforms as the workflow database "
f"is out of date/inconsistent with the global config - {exc}")
queue: Deque[RemoteCleanQueueTuple] = deque()
remote_clean_cmd = partial(
_remote_clean_cmd, reg=reg, rm_dirs=rm_dirs, timeout=timeout
Expand Down Expand Up @@ -1167,6 +1167,9 @@ def get_platforms_from_db(run_dir):
"""Load the set of names of platforms (that jobs ran on) from the
workflow database.

Note this filters out any platforms on which remote-init had not made
any changes (e.g. total failure).

Args:
run_dir (str): The workflow run directory.
"""
Expand All @@ -1176,10 +1179,23 @@ def get_platforms_from_db(run_dir):
try:
pri_dao = workflow_db_mgr.get_pri_dao()
platform_names = pri_dao.select_task_job_platforms()
return platform_names
install_targets = pri_dao.select_install_targets_to_clean()
finally:
pri_dao.close()

install_target_map = (
get_install_target_to_platforms_map(platform_names)
)

for install_target in set(install_target_map) - set(install_targets):
LOG.debug(
f'skipping install target {install_target}'
' - remote init not attempted'
)
install_target_map.pop(install_target)

return install_target_map


def check_deprecation(path, warn=True):
"""Warn and turn on back-compat flag if Cylc 7 suite.rc detected.
Expand Down