Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File installation via rsync and install target installation added #3796

Merged
merged 31 commits into from
Oct 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
1d12c1d
Add rsync file installation
datamel Aug 13, 2020
d2aed91
install target added
datamel Aug 15, 2020
8a8c74f
add sharedfs test
datamel Aug 19, 2020
bbe2c9d
file installation
datamel Aug 20, 2020
a37ce0d
add rsync log file
datamel Aug 25, 2020
bd78bc6
remote tidy needs fixing
datamel Aug 26, 2020
26c5f68
added test for rsync logging and fixed remote tidy
datamel Aug 27, 2020
cc4b16c
restart not working
datamel Aug 27, 2020
c2c6337
fix restart
datamel Aug 31, 2020
bf6aaba
install target and file installation
datamel Aug 31, 2020
d4682cf
tidy up commit
datamel Aug 31, 2020
5b3c357
Fix naming
datamel Aug 31, 2020
708cde4
Install target for restart
datamel Aug 31, 2020
1e91d32
Fix docstrings for install_target
datamel Sep 7, 2020
a363623
Fix functional tests with install target
datamel Sep 8, 2020
e75d882
Replace platform[name] with host for comparison. Fix host-to-platform…
datamel Sep 9, 2020
554e9dc
cylc validate changed to include includes validation
datamel Sep 10, 2020
64a2a6a
update logging for rsync
datamel Sep 14, 2020
2463592
RST fixes
datamel Sep 14, 2020
7ee2065
pycodestyle fix
datamel Sep 14, 2020
1ac041e
rsync timeout
datamel Sep 14, 2020
0c28286
fix rst and verbose mode of rsync
datamel Sep 16, 2020
9c0d9c0
rysnc error timeout
datamel Sep 21, 2020
6914120
fix import
datamel Sep 21, 2020
e5a3751
change logging of file installation and respond to OS review feedback
datamel Sep 22, 2020
ff2a272
Remove global shh log-level
datamel Sep 23, 2020
6ba1eea
Fix job-submission functional test
datamel Sep 23, 2020
5a78266
Fix functional test authentication/01
datamel Sep 23, 2020
13ea1b4
Respond to feedback from HO
datamel Sep 29, 2020
551e402
fix localhost for file install
datamel Sep 29, 2020
71d47ea
fix broken import
datamel Oct 1, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ compatibility, the `cylc run` command will automatically symlink an existing
[#3816](https://github.com/cylc/cylc-flow/pull/3816) - change `cylc spawn`
command name to `cylc set-outputs` to better reflect its role in Cylc 8.

[#3796](https://github.com/cylc/cylc-flow/pull/3796) - Remote installation is
now on a per install target rather than a per platform basis. app/ bin/ etc/ lib/ directories are now installed on the target, configurable in flow.cylc.

[#3724](https://github.com/cylc/cylc-flow/pull/3724) - Re-implemented
the `cylc scan` command line interface and added a Python API for accessing
workflow scanning functionality.
Expand Down
4 changes: 4 additions & 0 deletions cylc/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@


CYLC_LOG = 'cylc'
FILE_INSTALL_LOG = 'cylc-rsync'

LOG = logging.getLogger(CYLC_LOG)
LOG.addHandler(logging.NullHandler()) # Start with a null handler
RSYNC_LOG = logging.getLogger(FILE_INSTALL_LOG)
RSYNC_LOG.addHandler(logging.NullHandler())

LOG_LEVELS = {
"INFO": logging.INFO,
Expand Down
15 changes: 14 additions & 1 deletion cylc/flow/cfgspec/globalcfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,8 @@
similar interface to ``scp``.
''')
Conf('ssh command',
VDR.V_STRING, 'ssh -oBatchMode=yes -oConnectTimeout=10',
VDR.V_STRING,
'ssh -oBatchMode=yes -oConnectTimeout=10',
desc='''
A string for the command used to invoke commands on this host.
This is not used on the suite host unless you run local tasks
Expand Down Expand Up @@ -477,6 +478,18 @@
accepts up to 236 characters.
''')
Conf('owner', VDR.V_STRING)
Conf('install target', VDR.V_STRING, desc='''
This defaults to the platform name. This will be used as the
target for remote file installation.
For example, to indicate to Cylc that Platform_A shares a file
system with localhost, we would configure as follows:

.. code-block:: cylc

[platforms]
[[Platform_A]]
install target = localhost
''')
with Conf('localhost', meta=Platform):
Conf('hosts', VDR.V_STRING_LIST, ['localhost'])

Expand Down
36 changes: 36 additions & 0 deletions cylc/flow/cfgspec/suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,42 @@
"suite-priority".
''')

with Conf('scheduler'):
Conf('install', VDR.V_STRING_LIST, desc='''
Configure the directories and files to be included in the remote
file installation.

.. note::
These, as standard, include the following directories:

* app
* bin
* etc
* lib

And include the server.key file (from the .service
directory), this is required for authentication.

These should be located in the top level of your Cylc workflow,
i.e. the directory that contains your flow.cylc file.

Directories must have a trailing slash.
For example, to add the following items to your file installation:

.. code-block:: none

~/cylc-run/workflow_x
|__dir1/
|__dir2/
|__file1
|__file2

.. code-block:: cylc

[scheduler]
install = dir/, dir2/, file1, file2
''')

with Conf('cylc'):
Conf('UTC mode', VDR.V_BOOLEAN)
Conf('cycle point format', VDR.V_CYCLE_POINT_FORMAT, desc='''
Expand Down
17 changes: 17 additions & 0 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ def __init__(
self.cfg = self.pcfg.get(sparse=True)
self.mem_log("config.py: after get(sparse=True)")

if 'scheduler' in self.cfg and 'install' in self.cfg['scheduler']:
self.get_validated_rsync_includes()

# First check for the essential scheduling section.
if 'scheduling' not in self.cfg:
raise SuiteConfigError("missing [scheduling] section.")
Expand Down Expand Up @@ -2335,3 +2338,17 @@ def get_expected_failed_tasks(self):
return []
else:
return None

def get_validated_rsync_includes(self):
"""Validate and return items to be included in the file installation"""
includes = self.cfg['scheduler']['install']
illegal_includes = []
for include in includes:
if include.count("/") > 1:
illegal_includes.append(f"{include}")
if len(illegal_includes) > 0:
raise SuiteConfigError(
"Error in [scheduler] install. "
"Directories can only be from the top level, please "
"reconfigure:" + str(illegal_includes)[1:-1])
return includes
5 changes: 2 additions & 3 deletions cylc/flow/loggingutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from cylc.flow.wallclock import (get_current_time_string,
get_time_string_from_unix_time)
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.pathutil import get_suite_run_log_name


class CylcLogFormatter(logging.Formatter):
Expand Down Expand Up @@ -118,8 +117,8 @@ class TimestampRotatingFileHandler(logging.FileHandler):
GLBL_KEY = 'suite logging'
MIN_BYTES = 1024

def __init__(self, suite, no_detach=False, timestamp=True):
logging.FileHandler.__init__(self, get_suite_run_log_name(suite))
def __init__(self, log_file_path, no_detach=False, timestamp=True):
logging.FileHandler.__init__(self, log_file_path)
self.no_detach = no_detach
self.stamp = None
self.formatter = CylcLogFormatter(timestamp=timestamp)
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/network/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def key_housekeeping(reg, platform=None, create=True):
"client_public_key": KeyInfo(
KeyType.PUBLIC,
KeyOwner.CLIENT,
suite_srv_dir=suite_srv_dir, platform=platform),
suite_srv_dir=suite_srv_dir, install_target=platform),
"client_private_key": KeyInfo(
KeyType.PRIVATE,
KeyOwner.CLIENT,
Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/option_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from ansimarkup import parse as cparse

from cylc.flow import LOG
from cylc.flow import LOG, RSYNC_LOG
import cylc.flow.flags
from cylc.flow.loggingutil import CylcLogFormatter

Expand Down Expand Up @@ -255,6 +255,7 @@ def parse_args(self, remove_opts=None):
else:
LOG.setLevel(logging.INFO)
# Remove NullHandler before add the StreamHandler
RSYNC_LOG.setLevel(logging.INFO)
while LOG.handlers:
LOG.handlers[0].close()
LOG.removeHandler(LOG.handlers[0])
Expand Down
6 changes: 6 additions & 0 deletions cylc/flow/pathutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ def get_suite_run_log_name(suite):
return expandvars(path)


def get_suite_file_install_log_name(suite):
"""Return suite file install log file path."""
path = get_suite_run_dir(suite, 'log', 'suite', 'file-installation-log')
return expandvars(path)


def get_suite_run_config_log_dir(suite, *args):
"""Return suite run flow.cylc log directory, join any extra args."""
return expandvars(get_suite_run_dir(suite, 'log', 'flow-config', *args))
Expand Down
25 changes: 24 additions & 1 deletion cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from cylc.flow.exceptions import PlatformLookupError
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.hostuserutil import is_remote_host


FORBIDDEN_WITH_PLATFORM = (
Expand Down Expand Up @@ -315,7 +316,7 @@ def platform_from_job_info(platforms, job, remote):
# We have some special logic to identify whether task host and task
# batch system match the platform in question.
if (
task_host == 'localhost' and
not is_remote_host(task_host) and
task_batch_system == 'background'
):
return 'localhost'
Expand Down Expand Up @@ -402,3 +403,25 @@ def fail_if_platform_and_host_conflict(task_conf, task_name, warn_only=False):
f"\"{task_name}\" has the following settings which "
f"are not compatible:\n{fail_items}"
)


def get_install_target_from_platform(platform):
"""Sets install target to configured or default platform name.

Args:
platform (dict):
A dict representing a platform.

Returns install target."""

if not platform['install target']:
platform['install target'] = platform['name']

return platform.get('install target')


def is_platform_with_target_in_list(
install_target, distinct_platforms_list):
"""Determines whether install target is in the list of platforms"""
for distinct_platform in distinct_platforms_list:
return install_target == distinct_platform['install target']
68 changes: 68 additions & 0 deletions cylc/flow/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import cylc.flow.flags
from cylc.flow import __version__ as CYLC_VERSION
from cylc.flow import LOG
from cylc.flow.platforms import get_platform, get_host_from_platform


Expand Down Expand Up @@ -154,6 +155,73 @@ def construct_platform_ssh_cmd(raw_cmd, platform, **kwargs):
return ret


def get_includes_to_rsync(rsync_includes=None):
"""Returns list of configured dirs/files for remote file installation."""

configured_includes = []

if rsync_includes is not None:
for include in rsync_includes:
if include.endswith("/"): # item is a directory
configured_includes.append("/" + include + "***")
else: # item is a file
configured_includes.append("/" + include)

return configured_includes


def construct_rsync_over_ssh_cmd(
src_path, dst_path, platform, rsync_includes=None):
"""Constructs the rsync command used for remote file installation.

Includes as standard the directories: app, bin, etc, lib; and the server
key, used for ZMQ authentication.

Args:
src_path(string): source path
dst_path(string): path of target
platform(dict)): contains info relating to platform
logfile(str): the path to the file logging the rsync
rsync_includes(list): files and directories to be included in the rsync

"""
dst_host = get_host_from_platform(platform)
rsync_cmd = ["rsync"]
ssh_cmd = platform['ssh command']
rsync_options = [
"-v",
"--perms",
"--recursive",
"--links",
"--checksum",
"--delete",
"--rsh=" + ssh_cmd,
"--include=/.service/",
"--include=/.service/server.key"
]
rsync_cmd.extend(rsync_options)
# Note to future devs - be wary of changing the order of the following
# rsync options, rsync is very particular about order of in/ex-cludes.
datamel marked this conversation as resolved.
Show resolved Hide resolved

for exclude in ['log', 'share', 'work']:
rsync_cmd.append(f"--exclude={exclude}")
default_includes = [
'/app/***',
datamel marked this conversation as resolved.
Show resolved Hide resolved
'/bin/***',
'/etc/***',
'/lib/***']
for include in default_includes:
rsync_cmd.append(f"--include={include}")
for include in get_includes_to_rsync(rsync_includes):
rsync_cmd.append(f"--include={include}")
# The following excludes are required in case these are added to the
rsync_cmd.append("--exclude=*") # exclude everything else
rsync_cmd.append(f"{src_path}/")
rsync_cmd.append(f"{dst_host}:{dst_path}/")
LOG.debug(f"rsync cmd use for file install: {' '.join(rsync_cmd)}")
return rsync_cmd


def construct_ssh_cmd(
raw_cmd, user=None, host=None, forward_x11=False, stdin=False,
ssh_cmd=None, ssh_login_shell=None, ssh_cylc=None, set_UTC=False,
Expand Down
46 changes: 29 additions & 17 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@
get_suite_test_log_name,
make_suite_run_tree,
)
from cylc.flow.platforms import (
get_install_target_from_platform,
get_platform,
is_platform_with_target_in_list)
from cylc.flow.profiler import Profiler
from cylc.flow.resources import extract_resources
from cylc.flow.subprocpool import SubProcPool
Expand Down Expand Up @@ -101,7 +105,6 @@
get_time_string_from_unix_time as time2str,
get_utc_mode)
from cylc.flow.xtrigger_mgr import XtriggerManager
from cylc.flow.platforms import get_platform


class SchedulerStop(CylcError):
Expand Down Expand Up @@ -719,26 +722,33 @@ def restart_remote_init(self):
Note: tasks should all be in the runahead pool at this point.

"""
auths = set()

distinct_install_target_platforms = []

for itask in self.pool.get_rh_tasks():
itask.platform['install target'] = (
get_install_target_from_platform(itask.platform))
if itask.state(*TASK_STATUSES_ACTIVE):
auths.add(itask.platform['name'])
while auths:
for platform_name in auths.copy():
if (
self.task_job_mgr.task_remote_mgr.remote_init(
platform_name, self.curve_auth,
self.client_pub_key_dir
if not (
is_platform_with_target_in_list(
itask.platform['install target'],
distinct_install_target_platforms
)
is not None
):
auths.remove(
platform_name
)
if auths:
sleep(1.0)
# Remote init is done via process pool
self.proc_pool.process()
distinct_install_target_platforms.append(itask.platform)

incomplete_init = False
for platform in distinct_install_target_platforms:
if (self.task_job_mgr.task_remote_mgr.remote_init(
platform, self.curve_auth,
self.client_pub_key_dir) is None):
incomplete_init = True
datamel marked this conversation as resolved.
Show resolved Hide resolved
break
if incomplete_init:
# TODO: Review whether this sleep is needed.
sleep(1.0)
datamel marked this conversation as resolved.
Show resolved Hide resolved
# Remote init is done via process pool
self.proc_pool.process()
self.command_poll_tasks()

def _load_task_run_times(self, row_idx, row):
Expand Down Expand Up @@ -1232,6 +1242,8 @@ def process_task_pool(self):
itasks = self.pool.get_ready_tasks()
if itasks:
self.is_updated = True
self.task_job_mgr.task_remote_mgr.rsync_includes = (
self.config.get_validated_rsync_includes())
for itask in self.task_job_mgr.submit_task_jobs(
self.suite,
itasks,
Expand Down
Loading