Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rsync to subprocpool #3953

Merged
merged 11 commits into from
Dec 10, 2020
2 changes: 0 additions & 2 deletions cylc/flow/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import cylc.flow.flags
from cylc.flow import __version__ as CYLC_VERSION
from cylc.flow import LOG
from cylc.flow.platforms import get_platform, get_host_from_platform


Expand Down Expand Up @@ -199,7 +198,6 @@ def construct_rsync_over_ssh_cmd(
rsync_cmd.append("--exclude=*") # exclude everything else
rsync_cmd.append(f"{src_path}/")
rsync_cmd.append(f"{dst_host}:{dst_path}/")
LOG.debug(f"rsync cmd use for file install: {' '.join(rsync_cmd)}")
return rsync_cmd


Expand Down
17 changes: 13 additions & 4 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@
from cylc.flow.task_job_mgr import TaskJobManager
from cylc.flow.task_pool import TaskPool
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.task_remote_mgr import (
REMOTE_FILE_INSTALL_IN_PROGRESS, REMOTE_INIT_DONE,
REMOTE_INIT_IN_PROGRESS)
from cylc.flow.task_state import (
TASK_STATUSES_ACTIVE,
TASK_STATUSES_NEVER_ACTIVE,
Expand Down Expand Up @@ -727,15 +730,21 @@ def restart_remote_init(self):

incomplete_init = False
for platform in distinct_install_target_platforms:
if (self.task_job_mgr.task_remote_mgr.remote_init(
platform, self.curve_auth,
self.client_pub_key_dir) is None):
self.task_job_mgr.task_remote_mgr.remote_init(
platform, self.curve_auth,
self.client_pub_key_dir)
if (self.task_job_mgr.task_remote_mgr.remote_init_map[platform[
'install target']] in [REMOTE_INIT_IN_PROGRESS,
REMOTE_FILE_INSTALL_IN_PROGRESS]):
incomplete_init = True
break
if self.task_job_mgr.task_remote_mgr.remote_init_map[
platform['install target']] == REMOTE_INIT_DONE:
self.task_job_mgr.task_remote_mgr.file_install(platform)
if incomplete_init:
# TODO: Review whether this sleep is needed.
sleep(1.0)
# Remote init is done via process pool
# Remote init/file-install is done via process pool
self.proc_pool.process()
self.command_poll_tasks()

Expand Down
125 changes: 92 additions & 33 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,51 +25,86 @@
"""

import json
from logging import DEBUG, CRITICAL, INFO, WARNING
import os
from copy import deepcopy
from logging import (
CRITICAL,
DEBUG,
INFO,
WARNING
)
from shutil import rmtree
from time import time
from copy import deepcopy

from cylc.flow.parsec.util import pdeepcopy, poverride

from cylc.flow import LOG
from cylc.flow.batch_sys_manager import JobPollContext
from cylc.flow.exceptions import (
PlatformLookupError,
SuiteConfigError,
TaskRemoteMgmtError
)
from cylc.flow.hostuserutil import (
get_host, is_remote_platform
get_host,
is_remote_host,
is_remote_platform
)
from cylc.flow.job_file import JobFileWriter
from cylc.flow.parsec.util import (
pdeepcopy,
poverride
)
from cylc.flow.pathutil import get_remote_suite_run_job_dir
from cylc.flow.platforms import (
get_platform, get_host_from_platform, get_install_target_from_platform,
HOST_REC_COMMAND, PLATFORM_REC_COMMAND
HOST_REC_COMMAND,
PLATFORM_REC_COMMAND,
get_host_from_platform,
get_install_target_from_platform,
get_platform
)
from cylc.flow.subprocpool import SubProcPool
from cylc.flow.remote import construct_ssh_cmd
from cylc.flow.subprocctx import SubProcContext
from cylc.flow.task_action_timer import TaskActionTimer
from cylc.flow.task_events_mgr import TaskEventsManager, log_task_job_activity
from cylc.flow.task_message import FAIL_MESSAGE_PREFIX
from cylc.flow.subprocpool import SubProcPool
from cylc.flow.task_action_timer import (
TaskActionTimer,
TimerFlags
)
from cylc.flow.task_events_mgr import (
TaskEventsManager,
log_task_job_activity
)
from cylc.flow.task_job_logs import (
JOB_LOG_JOB, get_task_job_log, get_task_job_job_log,
get_task_job_activity_log, get_task_job_id, NN)
JOB_LOG_JOB,
NN,
get_task_job_activity_log,
get_task_job_id,
get_task_job_job_log,
get_task_job_log
)
from cylc.flow.task_message import FAIL_MESSAGE_PREFIX
from cylc.flow.task_outputs import (
TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED, TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_FAILED)
TASK_OUTPUT_FAILED,
TASK_OUTPUT_STARTED,
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_SUCCEEDED
)
from cylc.flow.task_remote_mgr import (
REMOTE_INIT_FAILED, TaskRemoteMgr)
REMOTE_FILE_INSTALL_DONE,
REMOTE_FILE_INSTALL_FAILED,
REMOTE_FILE_INSTALL_IN_PROGRESS,
REMOTE_INIT_DONE, REMOTE_INIT_FAILED,
TaskRemoteMgr
)
from cylc.flow.task_state import (
TASK_STATUSES_ACTIVE,
TASK_STATUS_FAILED,
TASK_STATUS_READY,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_FAILED
TASK_STATUSES_ACTIVE
)
from cylc.flow.task_action_timer import TimerFlags
from cylc.flow.wallclock import get_current_time_string, get_utc_mode
from cylc.flow.remote import construct_ssh_cmd
from cylc.flow.exceptions import (
PlatformLookupError, SuiteConfigError, TaskRemoteMgmtError
from cylc.flow.wallclock import (
get_current_time_string,
get_utc_mode
)


Expand All @@ -91,6 +126,7 @@ class TaskJobManager:
POLL_FAIL = 'poll failed'
REMOTE_SELECT_MSG = 'waiting for remote host selection'
REMOTE_INIT_MSG = 'remote host initialising'
REMOTE_FILE_INSTALL_MSG = 'file installation in progress'
KEY_EXECUTE_TIME_LIMIT = TaskEventsManager.KEY_EXECUTE_TIME_LIMIT

def __init__(self, suite, proc_pool, suite_db_mgr,
Expand Down Expand Up @@ -217,27 +253,46 @@ def submit_task_jobs(self, suite, itasks, curve_auth,

# Group task jobs by (install target)
auth_itasks = {} # {install target: [itask, ...], ...}

for itask in prepared_tasks:
install_target = get_install_target_from_platform(itask.platform)
auth_itasks.setdefault(install_target, [])
auth_itasks[install_target].append(itask)
# Submit task jobs for each platform
done_tasks = bad_tasks
for install_target, itasks in sorted(auth_itasks.items()):
ri_map = self.task_remote_mgr.remote_init_map

# Re-fetch a copy of platform
platform = itasks[0].platform
is_init = self.task_remote_mgr.remote_init(
platform, curve_auth, client_pub_key_dir)
if is_init is None:
# Remote is waiting to be initialised
# Skip both remote init and remote file install for localhost
if (install_target == 'localhost' or
not is_remote_host(get_host_from_platform(platform))):
LOG.debug(f"REMOTE INIT NOT REQUIRED for {install_target}")
ri_map[install_target] = (REMOTE_FILE_INSTALL_DONE)
# Remote init not in progress, start it
if install_target not in ri_map.keys():
self.task_remote_mgr.remote_init(
platform, curve_auth, client_pub_key_dir)
for itask in itasks:
itask.set_summary_message(self.REMOTE_INIT_MSG)
self.job_pool.add_job_msg(
get_task_job_id(
itask.point, itask.tdef.name, itask.submit_num),
self.REMOTE_INIT_MSG)
continue
# Remote init already done, so move on to file install
elif (ri_map[install_target] == REMOTE_INIT_DONE):
self.task_remote_mgr.file_install(platform)
# Already doing file install
elif (ri_map[install_target] == REMOTE_FILE_INSTALL_IN_PROGRESS):
for itask in itasks:
itask.set_summary_message(self.REMOTE_FILE_INSTALL_MSG)
self.job_pool.add_job_msg(
get_task_job_id(
itask.point, itask.tdef.name, itask.submit_num),
self.REMOTE_FILE_INSTALL_MSG)
continue

# Ensure that localhost background/at jobs are recorded as running
# on the host name of the current suite host, rather than just
# "localhost". On suite restart on a different suite host, this
Expand Down Expand Up @@ -268,16 +323,20 @@ def submit_task_jobs(self, suite, itasks, curve_auth,
'batch_sys_name': itask.summary['batch_sys_name'],
})
itask.is_manual_submit = False
if is_init == REMOTE_INIT_FAILED:
# Remote has failed to initialise
# Set submit-failed for all affected tasks

if (ri_map[install_target] in [REMOTE_INIT_FAILED,
REMOTE_FILE_INSTALL_FAILED]):
init_error = (ri_map[install_target])
# Remote has failed to initialise, remove target from remote
# init map and set submit-failed for all affected tasks
del ri_map[install_target]
for itask in itasks:
itask.local_job_file_path = None # reset for retry
log_task_job_activity(
SubProcContext(
self.JOBS_SUBMIT,
'(init %s)' % host,
err=REMOTE_INIT_FAILED,
err=init_error,
ret_code=1),
suite, itask.point, itask.tdef.name)
self.task_events_mgr.process_message(
Expand Down
9 changes: 4 additions & 5 deletions cylc/flow/task_remote_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@
)
from cylc.flow.pathutil import make_symlink
from cylc.flow.resources import extract_resources


REMOTE_INIT_DONE = 'REMOTE INIT DONE'
REMOTE_INIT_NOT_REQUIRED = 'REMOTE INIT NOT REQUIRED'
REMOTE_INIT_FAILED = 'REMOTE INIT FAILED'
from cylc.flow.task_remote_mgr import (
REMOTE_INIT_DONE,
REMOTE_INIT_FAILED
)


def remove_keys_on_client(srvd, install_target, full_clean=False):
Expand Down
Loading