Skip to content

Commit

Permalink
Merge pull request #5184 from hjoliver/scan-after-install
Browse files Browse the repository at this point in the history
Scan workflow name during install.
  • Loading branch information
hjoliver authored Oct 25, 2022
2 parents 7972114 + d64c0d9 commit 32d2633
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 21 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ ones in. -->

### Enhancements

[#5184](https://github.com/cylc/cylc-flow/pull/5184) - scan for active
runs of the same workflow at install time.

[#5032](https://github.com/cylc/cylc-flow/pull/5032) - set a default limit of
100 for the "default" queue.

Expand Down
119 changes: 106 additions & 13 deletions cylc/flow/scripts/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,37 @@
multiple workflow run directories that link to the same workflow definition.
"""

from ansimarkup import ansiprint as cprint
import asyncio
from optparse import Values
from pathlib import Path
from typing import Optional, TYPE_CHECKING, Dict, Any
from typing import Optional, Dict, Any

from cylc.flow.scripts.scan import (
get_pipe,
_format_plain,
FLOW_STATE_SYMBOLS,
FLOW_STATE_CMAP
)
from cylc.flow import iter_entry_points
from cylc.flow.exceptions import PluginError, InputError
from cylc.flow.option_parsers import CylcOptionParser as COP
from cylc.flow.pathutil import EXPLICIT_RELATIVE_PATH_REGEX, expand_path
from cylc.flow.loggingutil import CylcLogFormatter
from cylc.flow.option_parsers import (
CylcOptionParser as COP,
Options
)
from cylc.flow.pathutil import (
EXPLICIT_RELATIVE_PATH_REGEX,
expand_path,
get_workflow_run_dir
)
from cylc.flow.workflow_files import (
install_workflow, search_install_source_dirs, parse_cli_sym_dirs
install_workflow,
parse_cli_sym_dirs,
search_install_source_dirs
)
from cylc.flow.terminal import cli_function

if TYPE_CHECKING:
from optparse import Values


def get_option_parser() -> COP:
parser = COP(
Expand Down Expand Up @@ -150,6 +166,16 @@ def get_option_parser() -> COP:
default=False,
dest="no_run_name")

parser.add_option(
"--no-ping",
help=(
"When scanning for active instances of the workflow, "
"do not attempt to contact the schedulers to get status."
),
action="store_true",
default=False,
dest="no_ping")

parser.add_cylc_rose_options()

return parser
Expand All @@ -162,7 +188,7 @@ def get_source_location(path: Optional[str]) -> Path:
"""
if path is None:
return Path.cwd()
path = path.strip()
path = str(path).strip()
expanded_path = Path(expand_path(path))
if expanded_path.is_absolute():
return expanded_path
Expand All @@ -171,14 +197,79 @@ def get_source_location(path: Optional[str]) -> Path:
return search_install_source_dirs(expanded_path)


async def scan(wf_name: str, ping: bool = True) -> None:
"""Print any instances of wf_name that are already active."""
opts = Values({
'name': [f'{wf_name}/*'],
'states': {'running', 'paused', 'stopping'},
'source': False,
'ping': ping, # get status of scanned workflows
})
active = [
item async for item in get_pipe(
opts, None,
scan_dir=get_workflow_run_dir(wf_name) # restricted scan
)
]
if active:
n = len(active)
grammar = (
["s", "are", "them all"]
if n > 1 else
["", "is", "it"]
)
print(
CylcLogFormatter.COLORS['WARNING'].format(
f'NOTE: {n} run%s of "{wf_name}"'
' %s already active:' % tuple(grammar[:2])
)
)
for item in active:
if opts.ping:
status = item['status']
tag = FLOW_STATE_CMAP[status]
symbol = f" <{tag}>{FLOW_STATE_SYMBOLS[status]}</{tag}>"
else:
symbol = " "
cprint(symbol, _format_plain(item, opts))
pattern = (
f"'{wf_name}/*'"
if n > 1 else
f"{item['name']}"
)
print(
f'You can stop %s with:\n cylc stop {pattern}'
'\nSee "cylc stop --help" for options.' % grammar[-1]
)


InstallOptions = Options(get_option_parser())


@cli_function(get_option_parser)
def main(parser, opts, reg=None):
install(parser, opts, reg)
def main(
_parser: COP,
opts: 'Values',
reg: Optional[str] = None
) -> None:
"""CLI wrapper."""
install_cli(opts, reg)


def install(
parser: COP, opts: 'Values', reg: Optional[str] = None
def install_cli(
opts: 'Values',
reg: Optional[str] = None
) -> None:
"""Install workflow and scan for already-running instances."""
wf_name = install(opts, reg)
asyncio.run(
scan(wf_name, not opts.no_ping)
)


def install(
opts: 'Values', reg: Optional[str] = None
) -> str:
if opts.no_run_name and opts.run_name:
raise InputError(
"options --no-run-name and --run-name are mutually exclusive."
Expand All @@ -204,7 +295,7 @@ def install(
elif opts.symlink_dirs:
cli_symdirs = parse_cli_sym_dirs(opts.symlink_dirs)

source_dir, rundir, _workflow_name = install_workflow(
source_dir, rundir, workflow_name = install_workflow(
source=source,
workflow_name=opts.workflow_name,
run_name=opts.run_name,
Expand All @@ -229,3 +320,5 @@ def install(
entry_point.name,
exc
) from None

return workflow_name
150 changes: 150 additions & 0 deletions tests/integration/test_install.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Test cylc install."""

import pytest
from pathlib import Path

from .test_scan import init_flows

from cylc.flow.async_util import pipe
from cylc.flow.scripts import scan
from cylc.flow.workflow_files import WorkflowFiles
from cylc.flow.scripts.install import (
InstallOptions,
install_cli
)

from typing import Callable, Tuple

SRV_DIR = Path(WorkflowFiles.Service.DIRNAME)
CONTACT = Path(WorkflowFiles.Service.CONTACT)
RUN_N = Path(WorkflowFiles.RUN_N)
INSTALL = Path(WorkflowFiles.Install.DIRNAME)

INSTALLED_MSG = "INSTALLED {wfrun} from"
WF_ACTIVE_MSG = '1 run of "{wf}" is already active:'
BAD_CONTACT_MSG = "Bad contact file:"


@pytest.fixture()
def patch_graphql_query(
monkeypatch: pytest.MonkeyPatch
):
# Define a mocked graphql_query pipe function.
@pipe
async def _graphql_query(flow, fields, filters=None):
flow.update({"status": "running"})
return flow

# Swap out the function that cylc.flow.scripts.scan.
monkeypatch.setattr(
'cylc.flow.scripts.scan.graphql_query',
_graphql_query,
)


@pytest.fixture()
def src_run_dirs(
mock_glbl_cfg: Callable,
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path
) -> Tuple[Path, Path]:
"""Create some workflow source and run dirs for testing.
Source dirs:
<tmp-src>/w1
<tmp-src>/w2
Run dir:
<tmp-run>/w1/run1
"""
tmp_src_path = tmp_path / 'cylc-src'
tmp_run_path = tmp_path / 'cylc-run'
tmp_src_path.mkdir()
tmp_run_path.mkdir()

init_flows(
tmp_run_path=tmp_run_path,
running=('w1/run1',),
tmp_src_path=tmp_src_path,
src=('w1', 'w2')
)
mock_glbl_cfg(
'cylc.flow.workflow_files.glbl_cfg',
f'''
[install]
source dirs = {tmp_src_path}
'''
)
monkeypatch.setattr('cylc.flow.pathutil._CYLC_RUN_DIR', tmp_run_path)

return tmp_src_path, tmp_run_path


def test_install_scan_no_ping(
src_run_dirs: Callable,
capsys: pytest.CaptureFixture,
caplog: pytest.LogCaptureFixture
) -> None:
"""At install, running intances should be reported.
Ping = False case: don't query schedulers.
"""

opts = InstallOptions()
opts.no_ping = True

install_cli(opts, reg='w1')
out = capsys.readouterr().out
assert INSTALLED_MSG.format(wfrun='w1/run2') in out
assert WF_ACTIVE_MSG.format(wf='w1') in out
# Empty contact file faked with "touch":
assert f"{BAD_CONTACT_MSG} w1/run1" in caplog.text

install_cli(opts, reg='w2')
out = capsys.readouterr().out
assert WF_ACTIVE_MSG.format(wf='w2') not in out
assert INSTALLED_MSG.format(wfrun='w2/run1') in out


def test_install_scan_ping(
src_run_dirs: Callable,
capsys: pytest.CaptureFixture,
caplog: pytest.LogCaptureFixture,
patch_graphql_query: Callable
) -> None:
"""At install, running intances should be reported.
Ping = True case: but mock scan's scheduler query method.
"""
opts = InstallOptions()
opts.no_ping = False

install_cli(opts, reg='w1')
out = capsys.readouterr().out
assert INSTALLED_MSG.format(wfrun='w1/run2') in out
assert WF_ACTIVE_MSG.format(wf='w1') in out
assert scan.FLOW_STATE_SYMBOLS["running"] in out
# Empty contact file faked with "touch":
assert f"{BAD_CONTACT_MSG} w1/run1" in caplog.text

install_cli(opts, reg='w2')
out = capsys.readouterr().out
assert INSTALLED_MSG.format(wfrun='w2/run1') in out
assert WF_ACTIVE_MSG.format(wf='w2') not in out
26 changes: 18 additions & 8 deletions tests/integration/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,20 @@
INSTALL = Path(WorkflowFiles.Install.DIRNAME)


def init_flows(tmp_path, running=None, registered=None, un_registered=None):
def init_flows(tmp_run_path=None, running=None, registered=None,
un_registered=None, tmp_src_path=None, src=None):
"""Create some dummy workflows for scan to discover.
Assume "run1, run2, ..., runN" structure if flow name constains "run".
Optionally create workflow source dirs in a give location too.
"""
def make_registered(name, running=False):
run_d = Path(tmp_path, name)
run_d = Path(tmp_run_path, name)
run_d.mkdir(parents=True, exist_ok=True)
(run_d / "flow.cylc").touch()
if "run" in name:
root = Path(tmp_path, name).parent
root = Path(tmp_run_path, name).parent
with suppress(FileExistsError):
(root / "runN").symlink_to(run_d, target_is_directory=True)
else:
Expand All @@ -63,12 +66,19 @@ def make_registered(name, running=False):
if running:
(srv_d / CONTACT).touch()

def make_src(name):
src_d = Path(tmp_src_path, name)
src_d.mkdir(parents=True, exist_ok=True)
(src_d / "flow.cylc").touch()

for name in (running or []):
make_registered(name, running=True)
for name in (registered or []):
make_registered(name)
for name in (un_registered or []):
Path(tmp_path, name).mkdir(parents=True, exist_ok=True)
Path(tmp_run_path, name).mkdir(parents=True, exist_ok=True)
for name in (src or []):
make_src(name)


@pytest.fixture(scope='session')
Expand Down Expand Up @@ -157,14 +167,14 @@ def source_dirs(mock_glbl_cfg):
src1 = src / '1'
src1.mkdir()
init_flows(
src1,
registered=('a', 'b/c')
tmp_src_path=src1,
src=('a', 'b/c')
)
src2 = src / '2'
src2.mkdir()
init_flows(
src2,
registered=('d', 'e/f')
tmp_src_path=src2,
src=('d', 'e/f')
)
mock_glbl_cfg(
'cylc.flow.scripts.scan.glbl_cfg',
Expand Down

0 comments on commit 32d2633

Please sign in to comment.