Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recursive pipeline submission #28

Merged
merged 4 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 54 additions & 8 deletions gaps/cli/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,60 @@ def template_pipeline_config(commands):


@click.pass_context
def pipeline(
    ctx, config_file, cancel, monitor, background=False, recursive=False
):
    """Execute multiple steps in an analysis pipeline.

    Parameters
    ----------
    ctx : click.Context
        Click context, forwarded unchanged to the pipeline runner.
    config_file : path-like or None
        Pipeline config file. If ``None``, the current directory is
        searched for exactly one file with ``pipeline`` in its name.
        Ignored entirely when `recursive` is set.
    cancel, monitor
        Forwarded unchanged to the pipeline runner.
    background : bool, optional
        Forwarded to the logging setup and to the pipeline runner.
        By default, ``False``.
    recursive : bool, optional
        If set, pipelines are submitted for the current directory and
        its sub-directories instead of for a single config file.
        By default, ``False``.

    Raises
    ------
    gapsExecutionError
        If `config_file` is ``None`` and the current directory does not
        contain exactly one candidate pipeline config file.
    """

    if recursive:
        _submit_recursive_pipelines(ctx, cancel, monitor, background)
        return

    if config_file is None:
        config_files = _find_pipeline_config_files(Path("."))
        if len(config_files) != 1:
            # Report the candidates that were actually found; in this
            # branch `config_file` is always None and carries no
            # information for the user.
            msg = (
                f"Could not determine config file - multiple (or no) files "
                f" detected with 'pipeline' in the name exist: "
                f"{config_files}"
            )
            raise gapsExecutionError(msg)

        config_file = config_files[0]

    init_logging_from_config_file(config_file, background=background)
    _run_pipeline(ctx, config_file, cancel, monitor, background)


def _submit_recursive_pipelines(ctx, cancel, monitor, background):
    """Submit pipelines in all recursive subdirectories.

    Every directory matched by the ``**/`` glob below the current
    working directory is inspected. A directory is processed only if
    it contains exactly one file with ``pipeline`` in its name:

    - directories named ``Status.HIDDEN_SUB_DIR`` are skipped,
    - directories without a candidate file are skipped silently,
    - directories with several candidates are skipped with a
      ``gapsWarning``.

    Parameters
    ----------
    ctx, cancel, monitor, background
        Forwarded unchanged to the pipeline runner for every config
        file that is found.
    """
    start_dir = Path(".")
    for ind, sub_dir in enumerate(start_dir.glob("**/")):
        # Check the directory name first so that the status directory
        # is never scanned for config files.
        if sub_dir.name == Status.HIDDEN_SUB_DIR:
            continue

        config_files = _find_pipeline_config_files(sub_dir)
        if not config_files:
            continue

        if len(config_files) > 1:
            msg = (
                f"Could not determine config file - multiple files detected "
                f"with 'pipeline' in the name in the {str(sub_dir)!r} "
                "directory!"
            )
            warn(msg, gapsWarning)
            continue

        # NOTE(review): the logging `background` flag is True only for
        # the first directory yielded by the glob and does not depend
        # on the `background` argument - confirm this is intended.
        init_logging_from_config_file(config_files[0], background=ind == 0)
        _run_pipeline(ctx, config_files[0], cancel, monitor, background)


def _find_pipeline_config_files(directory):
    """Find all files matching ``*pipeline*`` in a directory.

    Only the directory itself is searched (no recursion), and
    sub-directories whose name matches the pattern are ignored.

    Parameters
    ----------
    directory : path-like
        Directory to search.

    Returns
    -------
    list of pathlib.Path
        Matching files in sorted order, so that the result does not
        depend on the order in which the filesystem lists entries.
    """
    return sorted(
        fp for fp in Path(directory).glob("*pipeline*") if fp.is_file()
    )


def _run_pipeline(ctx, config_file, cancel, monitor, background):
"""Run a GAPs pipeline for an existing config file."""

if cancel:
Pipeline.cancel_all(config_file)
Expand Down Expand Up @@ -128,6 +163,17 @@ def pipeline_command(template_config):
help="Flag to monitor pipeline jobs continuously. Default is not "
"to monitor (kick off jobs and exit).",
),
click.Option(
param_decls=["--recursive", "-r"],
is_flag=True,
help="Flag to recursively submit pipelines, starting from the "
"current directory and checking every sub-directory therein. The "
"`-c` option will be *completely ignored* if you use this option. "
"Instead, the code will check every sub-directory for exactly one "
"file with the word `pipeline` in it. If found, that file is "
"assumed to be the pipeline config and is used to kick off the "
"pipeline. In any other case, the directory is skipped.",
),
]
if _can_run_background():
params += [
Expand Down
83 changes: 67 additions & 16 deletions tests/cli/test_cli_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""
import os
import json
import shutil
from pathlib import Path

import click
Expand Down Expand Up @@ -34,19 +35,6 @@
SUCCESS_CONFIG = {"test": "success"}


@pytest.fixture
def runnable_pipeline(tmp_path):
    """Temporarily register ``run`` as a pipeline command.

    A copy of ``SAMPLE_CONFIG`` is also written to
    ``tmp_path / "config_pipe.json"``. The fixture yields nothing;
    the ``run`` command is removed again on teardown.
    """
    try:
        Pipeline.COMMANDS["run"] = run
        serialized_config = json.dumps(SAMPLE_CONFIG)
        (tmp_path / "config_pipe.json").write_text(serialized_config)
        yield
    finally:
        # Always unregister the command so it cannot leak into other tests.
        Pipeline.COMMANDS.pop("run")


@pytest.fixture
def pipe_config_fp(tmp_path):
"""Add a sample pipeline config to a temp directory."""
Expand All @@ -57,6 +45,16 @@ def pipe_config_fp(tmp_path):
yield pipe_config_fp


@pytest.fixture
def runnable_pipeline(pipe_config_fp):
    """Register ``run`` as a pipeline command for the duration of a test.

    Yields the path supplied by the ``pipe_config_fp`` fixture, so a
    test can request this fixture alone and still get the config path.
    """
    try:
        Pipeline.COMMANDS["run"] = run
        yield pipe_config_fp
    finally:
        # Always unregister the command so it cannot leak into other tests.
        Pipeline.COMMANDS.pop("run")


@click.command()
@click.option("--config", "-c", default=".", help="Path to config file")
def run(config):
Expand Down Expand Up @@ -95,7 +93,6 @@ def test_pipeline_command(
tmp_path,
cli_runner,
runnable_pipeline,
pipe_config_fp,
assert_message_was_logged,
):
"""Test the pipeline_command creation."""
Expand All @@ -108,10 +105,10 @@ def test_pipeline_command(
assert "background" in [opt.name for opt in pipe.params]
else:
assert "background" not in [opt.name for opt in pipe.params]
cli_runner.invoke(pipe, ["-c", pipe_config_fp.as_posix()] + extra_args)
cli_runner.invoke(pipe, ["-c", runnable_pipeline.as_posix()] + extra_args)

if not extra_args:
cli_runner.invoke(pipe, ["-c", pipe_config_fp.as_posix()])
cli_runner.invoke(pipe, ["-c", runnable_pipeline.as_posix()])
else:
assert Status(tmp_path).get(StatusField.MONITOR_PID) == os.getpid()

Expand Down Expand Up @@ -249,5 +246,59 @@ def test_pipeline_command_with_running_pid(
)


def test_pipeline_command_recursive(
    tmp_cwd, cli_runner, runnable_pipeline, assert_message_was_logged
):
    """Test the pipeline command with recursive directories.

    Builds a tree of run directories, leaves one of them with two
    pipeline configs and one with none, and checks that only the
    directories holding exactly one pipeline config are executed.
    """

    (tmp_cwd / "config_run.json").touch()
    runnable_pipeline.rename(runnable_pipeline.parent / "config_pipeline.json")

    nested_dir = tmp_cwd / "test_run_2" / "test_run_3"
    test_dirs = [tmp_cwd, nested_dir.parent, nested_dir]
    test_dirs += [tmp_cwd / f"test_run_{num}" for num in (4, 5, 6)]

    # Each directory is created as a copy of the one listed before it.
    for source_dir, new_dir in zip(test_dirs, test_dirs[1:]):
        shutil.copytree(source_dir, new_dir)

    # Before anything runs, every directory holds an empty run config.
    for test_dir in test_dirs:
        run_config_fp = test_dir / "config_run.json"
        assert run_config_fp.exists()
        assert not run_config_fp.read_text()

    # Second-to-last directory: two pipeline configs (ambiguous).
    # Last directory: no pipeline config at all.
    ambiguous_dir, empty_dir = test_dirs[-2:]
    shutil.copy(
        ambiguous_dir / "config_pipeline.json",
        ambiguous_dir / "config_pipeline2.json",
    )
    (empty_dir / "config_pipeline.json").unlink()

    pipe = pipeline_command({})
    # The command is deliberately invoked twice.
    # NOTE(review): presumably the second call lets the submitted
    # pipelines be reported as complete - confirm.
    for __ in range(2):
        cli_runner.invoke(pipe, ["-r"])

    for executed_dir in test_dirs[:-2]:
        assert_message_was_logged(executed_dir.name, "INFO")
        run_config_text = (executed_dir / "config_run.json").read_text()
        assert json.loads(run_config_text) == SUCCESS_CONFIG

    for skipped_dir in (ambiguous_dir, empty_dir):
        assert not (skipped_dir / "config_run.json").read_text()

    assert_message_was_logged("Pipeline job", "INFO")
    assert_message_was_logged("is complete.", "INFO")
    assert_message_was_logged(
        "Could not determine config file - multiple files detected", "WARNING"
    )
    assert_message_was_logged(ambiguous_dir.name, "WARNING")


if __name__ == "__main__":
    # Run only this module's tests when the file is executed as a script.
    pytest.main(["-q", "--show-capture=all", Path(__file__), "-rapP"])
Loading