Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cylc install #4000

Merged
merged 13 commits into from
Jan 31, 2021
7 changes: 5 additions & 2 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,17 @@ The filenames `suite.rc` and `global.rc` are now deprecated in favour of
compatibility, the `cylc run` command will automatically symlink an existing
`suite.rc` file to `flow.cylc`.

Remove cylc register's option `--run-dir=DIR`, which created a run directory
symlink to `DIR` (see #3884).
Remove `cylc register` command
([#4000](https://github.com/cylc/cylc-flow/pull/4000)).

### Enhancements

[#4014](https://github.com/cylc/cylc-flow/pull/4014) - Rename "ready" task
state to "preparing".

[#4000](https://github.com/cylc/cylc-flow/pull/4000) - Add the `cylc install`
command, which installs workflows into the cylc run directory from arbitrary
source locations.

[#3992](https://github.com/cylc/cylc-flow/pull/3992) - Rename
batch system to job runner.

Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
import cylc.flow.flags
from cylc.flow.graphnode import GraphNodeParser
from cylc.flow.pathutil import (
get_suite_run_dir,
get_workflow_run_dir,
get_suite_run_log_dir,
get_suite_run_share_dir,
get_suite_run_work_dir,
Expand Down Expand Up @@ -157,7 +157,7 @@ def __init__(
self.suite = suite # suite name
self.fpath = fpath # suite definition
self.fdir = os.path.dirname(fpath)
self.run_dir = run_dir or get_suite_run_dir(self.suite)
self.run_dir = run_dir or get_workflow_run_dir(self.suite)
self.log_dir = log_dir or get_suite_run_log_dir(self.suite)
self.share_dir = share_dir or get_suite_run_share_dir(self.suite)
self.work_dir = work_dir or get_suite_run_work_dir(self.suite)
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/etc/cylc-bash-completion
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ _cylc() {
cur="${COMP_WORDS[COMP_CWORD]}"
sec="${COMP_WORDS[1]}"
opts="$(cylc scan -t name 2>/dev/null)"
suite_cmds="broadcast|bcast|cat-log|check-versions|clean|compare|diff|dump|edit|ext-trigger|external-trigger|get-directory|get-suite-config|get-config|get-suite-version|get-cylc-version|graph|graph-diff|hold|insert|kill|list|log|ls|tui|ping|poll|print|register|release|unhold|reload|remove|report-timings|reset|restart|run|start|scan|search|grep|set-verbosity|show|set-outputs|stop|shutdown|single|suite-state|test-battery|trigger|validate|view|warranty"
suite_cmds="broadcast|bcast|cat-log|check-versions|clean|compare|diff|dump|edit|ext-trigger|external-trigger|get-suite-config|get-config|get-suite-version|get-cylc-version|graph|graph-diff|hold|insert|install|kill|list|log|ls|tui|ping|poll|print|release|unhold|reload|remove|report-timings|reset|restart|run|start|scan|search|grep|set-verbosity|show|set-outputs|stop|shutdown|single|suite-state|test-battery|trigger|validate|view|warranty"


if [[ ${COMP_CWORD} -eq 1 ]]; then
Expand Down
64 changes: 45 additions & 19 deletions cylc/flow/pathutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import os
from os.path import expandvars
import re
from shutil import rmtree

from cylc.flow import LOG
Expand Down Expand Up @@ -46,47 +47,47 @@ def get_remote_suite_work_dir(platform, suite, *args):
)


def get_suite_run_dir(suite, *args):
"""Return local suite run directory, join any extra args."""
def get_workflow_run_dir(flow_name, *args):
"""Return local workflow run directory, join any extra args."""
return expandvars(
os.path.join(
get_platform()['run directory'], suite, *args
get_platform()['run directory'], flow_name, *args
)
)


def get_suite_run_job_dir(suite, *args):
"""Return suite run job (log) directory, join any extra args."""
return expandvars(
get_suite_run_dir(suite, 'log', 'job', *args)
get_workflow_run_dir(suite, 'log', 'job', *args)
)


def get_suite_run_log_dir(suite, *args):
"""Return suite run log directory, join any extra args."""
return expandvars(get_suite_run_dir(suite, 'log', 'suite', *args))
return expandvars(get_workflow_run_dir(suite, 'log', 'suite', *args))


def get_suite_run_log_name(suite):
"""Return suite run log file path."""
path = get_suite_run_dir(suite, 'log', 'suite', 'log')
path = get_workflow_run_dir(suite, 'log', 'suite', 'log')
return expandvars(path)


def get_suite_file_install_log_name(suite):
"""Return suite file install log file path."""
path = get_suite_run_dir(suite, 'log', 'suite', 'file-installation-log')
path = get_workflow_run_dir(suite, 'log', 'suite', 'file-installation-log')
return expandvars(path)


def get_suite_run_config_log_dir(suite, *args):
"""Return suite run flow.cylc log directory, join any extra args."""
return expandvars(get_suite_run_dir(suite, 'log', 'flow-config', *args))
return expandvars(get_workflow_run_dir(suite, 'log', 'flow-config', *args))


def get_suite_run_pub_db_name(suite):
"""Return suite run public database file path."""
return expandvars(get_suite_run_dir(suite, 'log', 'db'))
return expandvars(get_workflow_run_dir(suite, 'log', 'db'))


def get_suite_run_share_dir(suite, *args):
Expand All @@ -105,15 +106,16 @@ def get_suite_run_work_dir(suite, *args):

def get_suite_test_log_name(suite):
"""Return suite run ref test log file path."""
return expandvars(get_suite_run_dir(suite, 'log', 'suite', 'reftest.log'))
return expandvars(
get_workflow_run_dir(suite, 'log', 'suite', 'reftest.log'))


def make_suite_run_tree(suite):
"""Create all top-level cylc-run output dirs on the suite host."""
cfg = glbl_cfg().get()
# Roll archive
archlen = cfg['scheduler']['run directory rolling archive length']
dir_ = os.path.expandvars(get_suite_run_dir(suite))
dir_ = os.path.expandvars(get_workflow_run_dir(suite))
for i in range(archlen, -1, -1): # archlen...0
if i > 0:
dpath = f'{dir_}.{i}'
Expand All @@ -128,7 +130,7 @@ def make_suite_run_tree(suite):
os.rename(dpath, f'{dir_}.{i + 1}')
# Create
for dir_ in (
get_suite_run_dir(suite),
get_workflow_run_dir(suite),
get_suite_run_log_dir(suite),
get_suite_run_job_dir(suite),
get_suite_run_config_log_dir(suite),
Expand All @@ -141,10 +143,19 @@ def make_suite_run_tree(suite):
LOG.debug(f'{dir_}: directory created')


def make_localhost_symlinks(suite):
"""Creates symlinks for any configured symlink dirs from glbl_cfg."""
dirs_to_symlink = get_dirs_to_symlink('localhost', suite)
rund = get_suite_run_dir(suite)
def make_localhost_symlinks(rund, named_sub_dir):
"""Creates symlinks for any configured symlink dirs from glbl_cfg.
Args:
rund: the entire run directory path
named_sub_dir: e.g flow_name/run1

Returns:
dict - A dictionary of Symlinks with sources as keys and
destinations as values: ``{source: destination}``

"""
dirs_to_symlink = get_dirs_to_symlink('localhost', named_sub_dir)
symlinks_created = {}
for key, value in dirs_to_symlink.items():
if key == 'run':
dst = rund
Expand All @@ -157,9 +168,13 @@ def make_localhost_symlinks(suite):
f' \'{value}\' contains an invalid environment variable.'
' Please check configuration.')
make_symlink(src, dst)
# symlink info returned for logging purposes, symlinks created
# before logs as this dir may be a symlink.
symlinks_created[src] = dst
return symlinks_created


def get_dirs_to_symlink(install_target, suite):
def get_dirs_to_symlink(install_target, flow_name):
"""Returns dictionary of directories to symlink from glbcfg."""
dirs_to_symlink = {}
symlink_conf = glbl_cfg().get(['symlink dirs'])
Expand All @@ -168,12 +183,12 @@ def get_dirs_to_symlink(install_target, suite):
return dirs_to_symlink
base_dir = symlink_conf[install_target]['run']
if base_dir is not None:
dirs_to_symlink['run'] = os.path.join(base_dir, 'cylc-run', suite)
dirs_to_symlink['run'] = os.path.join(base_dir, 'cylc-run', flow_name)
for dir_ in ['log', 'share', 'share/cycle', 'work']:
link = symlink_conf[install_target][dir_]
if link is None or link == base_dir:
continue
dirs_to_symlink[dir_] = os.path.join(link, 'cylc-run', suite, dir_)
dirs_to_symlink[dir_] = os.path.join(link, 'cylc-run', flow_name, dir_)
return dirs_to_symlink


Expand Down Expand Up @@ -231,3 +246,14 @@ def remove_dir(path):
else:
LOG.debug(f'Removing directory: {path}')
rmtree(path)


def get_next_rundir_number(run_path):
    """Return the number to use for the next numbered run directory.

    The number is derived from the ``runN`` symlink inside ``run_path``,
    whose target is expected to end in ``run<number>``.

    Args:
        run_path: directory expected to contain the ``runN`` symlink;
            a leading ``~`` is expanded.

    Returns:
        int - 1 if ``runN`` cannot be read as a symlink (e.g. it does not
        exist), otherwise the number of the run it points at plus one.

    Raises:
        ValueError: if ``runN`` is a symlink whose target does not end in
            ``run<number>``.

    """
    run_n_path = os.path.expanduser(os.path.join(run_path, "runN"))
    # Keep the try body minimal: only readlink is expected to raise OSError.
    try:
        old_run_path = os.readlink(run_n_path)
    except OSError:
        # No readable runN symlink, so this is treated as the first run.
        return 1
    # Tolerate a trailing separator on the link target, and require at least
    # one digit so a bare "run" does not reach int('').
    match = re.search(r'run(\d+)$', old_run_path.rstrip(os.sep))
    if match is None:
        # Do not guess a number here: the caller would risk reusing an
        # existing run directory.
        raise ValueError(
            f'Cannot determine run number: {run_n_path} points to '
            f'{old_run_path!r}, which does not end in "run<number>".')
    return int(match.group(1)) + 1
57 changes: 15 additions & 42 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import os
from queue import Empty, Queue
from shlex import quote
from shutil import copytree, rmtree
from subprocess import Popen, PIPE, DEVNULL
import sys
from threading import Barrier
Expand Down Expand Up @@ -63,7 +62,7 @@
from cylc.flow.parsec.util import printcfg
from cylc.flow.parsec.validate import DurationFloat
from cylc.flow.pathutil import (
get_suite_run_dir,
get_workflow_run_dir,
get_suite_run_log_dir,
get_suite_run_config_log_dir,
get_suite_run_share_dir,
Expand All @@ -81,7 +80,6 @@
from cylc.flow.suite_db_mgr import SuiteDatabaseManager
from cylc.flow.suite_events import (
SuiteEventContext, SuiteEventHandler)
from cylc.flow.exceptions import SuiteServiceFileError
from cylc.flow.suite_status import StopMode, AutoRestartMode
from cylc.flow import suite_files
from cylc.flow.taskdef import TaskDef
Expand Down Expand Up @@ -261,9 +259,8 @@ def __init__(self, reg, options, is_restart=False):
)

# directory information
self.suite_dir = suite_files.get_suite_source_dir(self.suite)
self.flow_file = suite_files.get_flow_file(self.suite)
self.suite_run_dir = get_suite_run_dir(self.suite)
self.suite_run_dir = get_workflow_run_dir(self.suite)
self.suite_work_dir = get_suite_run_work_dir(self.suite)
self.suite_share_dir = get_suite_run_share_dir(self.suite)
self.suite_log_dir = get_suite_run_log_dir(self.suite)
Expand All @@ -279,19 +276,20 @@ def __init__(self, reg, options, is_restart=False):

async def install(self):
"""Get the filesystem in the right state to run the flow.

* Register.
* Validate flowfiles
* Install authentication files.
* Build the directory tree.
* Copy Python files.

"""
# Register
try:
suite_files.get_suite_source_dir(self.suite)
except SuiteServiceFileError:
# Source path is assumed to be the run directory
suite_files.register(self.suite, get_suite_run_dir(self.suite))
# Install
source = suite_files.get_suite_source_dir()
if source is None:
# register workflow
rund = get_workflow_run_dir(self.suite)
suite_files.register(self.suite, source=rund)

make_suite_run_tree(self.suite)

# Create ZMQ keys
key_housekeeping(self.suite, platform=self.options.host or 'localhost')
Expand All @@ -300,22 +298,12 @@ async def install(self):
extract_resources(
suite_files.get_suite_srv_dir(self.suite),
['etc/job.sh'])

make_suite_run_tree(self.suite)
# Copy local python modules from source to run directory
# Add python dirs to sys.path
for sub_dir in ["python", os.path.join("lib", "python")]:
# TODO - eventually drop the deprecated "python" sub-dir.
suite_py = os.path.join(self.suite_dir, sub_dir)
if (os.path.realpath(self.suite_dir) !=
os.path.realpath(self.suite_run_dir) and
os.path.isdir(suite_py)):
suite_run_py = os.path.join(self.suite_run_dir, sub_dir)
try:
rmtree(suite_run_py)
except OSError:
pass
copytree(suite_py, suite_run_py)
sys.path.append(os.path.join(self.suite_dir, sub_dir))
suite_py = os.path.join(self.suite_run_dir, sub_dir)
if os.path.isdir(suite_py):
sys.path.append(os.path.join(self.suite_run_dir, sub_dir))

async def initialise(self):
"""Initialise the components and sub-systems required to run the flow.
Expand Down Expand Up @@ -376,7 +364,6 @@ async def initialise(self):
proc_pool=self.proc_pool,
suite_run_dir=self.suite_run_dir,
suite_share_dir=self.suite_share_dir,
suite_source_dir=self.suite_dir
)

self.task_events_mgr = TaskEventsManager(
Expand Down Expand Up @@ -419,20 +406,6 @@ async def configure(self):
pri_dao.select_suite_template_vars(self._load_template_vars)
pri_dao.execute_queued_items()

# Copy local python modules from source to run directory
for sub_dir in ["python", os.path.join("lib", "python")]:
# TODO - eventually drop the deprecated "python" sub-dir.
suite_py = os.path.join(self.suite_dir, sub_dir)
if (os.path.realpath(self.suite_dir) !=
os.path.realpath(self.suite_run_dir) and
os.path.isdir(suite_py)):
suite_run_py = os.path.join(self.suite_run_dir, sub_dir)
try:
rmtree(suite_run_py)
except OSError:
pass
copytree(suite_py, suite_run_py)

self.profiler.log_memory("scheduler.py: before load_flow_file")
self.load_flow_file()
self.profiler.log_memory("scheduler.py: after load_flow_file")
Expand Down
Loading