Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Duplicate commands #29

Merged
merged 9 commits into from
Sep 29, 2023
1 change: 1 addition & 0 deletions gaps/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,5 @@ def _main_cb(ctx, verbose):
"""Set the obj and verbose settings of the commands."""
ctx.ensure_object(dict)
ctx.obj["VERBOSE"] = verbose
ctx.obj["PIPELINE_STEP"] = ctx.invoked_subcommand
ctx.max_content_width = 92
16 changes: 16 additions & 0 deletions gaps/cli/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ def __init__(
command_name : str
Name of the command being run. This is equivalent to
the ``name`` input argument.
pipeline_step : str
Name of the pipeline step being run. This is often
the same as `command_name`, but can be different if
a pipeline contains duplicate steps.
config_file : str
Path to the configuration file specified by the
user.
Expand Down Expand Up @@ -260,6 +264,10 @@ def __init__(
command_name : str
Name of the command being run. This is equivalent to
the ``name`` input above.
pipeline_step : str
Name of the pipeline step being run. This is often
the same as `command_name`, but can be different if
a pipeline contains duplicate steps.
config_file : Path
Path to the configuration file specified by the
user.
Expand Down Expand Up @@ -436,6 +444,10 @@ def __init__(
command_name : str
Name of the command being run. This is equivalent to
the ``name`` input argument.
pipeline_step : str
Name of the pipeline step being run. This is often
the same as `command_name`, but can be different if
a pipeline contains duplicate steps.
config_file : str
Path to the configuration file specified by the
user.
Expand Down Expand Up @@ -566,6 +578,10 @@ def __init__(
command_name : str
Name of the command being run. This is equivalent to
the ``name`` input above.
pipeline_step : str
Name of the pipeline step being run. This is often
the same as `command_name`, but can be different if
a pipeline contains duplicate steps.
config_file : Path
Path to the configuration file specified by the
user.
Expand Down
20 changes: 17 additions & 3 deletions gaps/cli/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
_CMD_LIST = [
"from gaps.cli.config import run_with_status_updates",
"from {run_func_module} import {run_func_name}",
'su_args = "{project_dir}", "{command}", "{job_name}"',
'su_args = "{project_dir}", "{pipeline_step}", "{job_name}"',
"run_with_status_updates("
" {run_func_name}, {node_specific_config}, {logging_options}, su_args, "
" {exclude_from_status}"
Expand All @@ -42,6 +42,7 @@
GAPS_SUPPLIED_ARGS = {
"tag",
"command_name",
"pipeline_step",
"config_file",
"project_dir",
"job_name",
Expand All @@ -66,6 +67,9 @@ def __init__(self, ctx, config_file, command_config):
config_file : path-like
Path to input file containing key-value pairs as input to
function.
step : str
Name of step being run. This will be used to key the status
dictionary, so it must be unique to the pipeline.
command_config : `gaps.cli.cli.CLICommandFromFunction`
A command configuration object containing info such as the
command name, run function, pre-processing function,
Expand Down Expand Up @@ -95,6 +99,11 @@ def command_name(self):
"""str: Name of command being run."""
return self.command_config.name

@property
def pipeline_step(self):
"""str: Name of pipeline_step being run."""
return self.ctx.obj.get("PIPELINE_STEP", self.command_name)

@property
def job_name(self):
"""str: Name of job being run."""
Expand Down Expand Up @@ -131,6 +140,7 @@ def preprocess_config(self):
preprocessor_kwargs = {
"config": self.config,
"command_name": self.command_name,
"pipeline_step": self.pipeline_step,
"config_file": self.config_file,
"project_dir": self.project_dir,
"job_name": self.job_name,
Expand Down Expand Up @@ -214,6 +224,7 @@ def set_exclude_from_status(self):
def prepare_context(self):
"""Add required key-val;ue pairs to context object."""
self.ctx.obj["COMMAND_NAME"] = self.command_name
self.ctx.obj["PIPELINE_STEP"] = self.pipeline_step
self.ctx.obj["OUT_DIR"] = self.project_dir
return self

Expand Down Expand Up @@ -245,6 +256,7 @@ def kickoff_jobs(self):
{
"tag": tag,
"command_name": self.command_name,
"pipeline_step": self.pipeline_step,
"config_file": self.config_file.as_posix(),
"project_dir": self.project_dir.as_posix(),
"job_name": job_name,
Expand All @@ -270,7 +282,7 @@ def kickoff_jobs(self):
project_dir=self.project_dir.as_posix(),
logging_options=as_script_str(self.logging_options),
exclude_from_status=as_script_str(self.exclude_from_status),
command=self.command_name,
pipeline_step=self.pipeline_step,
job_name=job_name,
)
cmd = f"python -c {cmd!r}"
Expand Down Expand Up @@ -372,8 +384,10 @@ def run(self):


@click.pass_context
def from_config(ctx, config_file, command_config):
def from_config(ctx, config_file, command_config, pipeline_step=None):
"""Run command from a config file."""
if pipeline_step is not None:
ctx.obj["PIPELINE_STEP"] = pipeline_step
_FromConfig(ctx, config_file, command_config).run()


Expand Down
21 changes: 17 additions & 4 deletions gaps/cli/documentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,23 @@
----------
pipeline : list of dicts
A list of dictionaries, where each dictionary represents one
step in the pipeline. Each dictionary should have one
key-value pair, where the key is the name of the CLI
command to run, and the value is the path to a config
file containing the configuration for that command.
step in the pipeline. Each dictionary should have one of two
configurations:

- A single key-value pair, where the key is the name of
the CLI command to run, and the value is the path to
a config file containing the configuration for that
command
- Exactly two key-value pairs, where one of the keys is
``"command"``, with a value that points to the name of
a command to execute, while the second key is a _unique_
user-defined name of the pipeline step to execute, with
a value that points to the path to a config file
containing the configuration for the command specified
by the other key. This configuration allows users to
specify duplicate commands as part of their pipeline
execution.

logging : dict, optional
Dictionary containing keyword-argument pairs to pass to
`init_logger <https://tinyurl.com/47hakp7f/>`_. This
Expand Down
8 changes: 5 additions & 3 deletions gaps/cli/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ def kickoff_job(ctx, cmd, exec_kwargs):
Path to output directory.
COMMAND_NAME : str
Name of command being run.
PIPELINE_STEP: str
Name of pipeline step being run.

cmd : str
String form of command to kickoff.
Expand Down Expand Up @@ -120,7 +122,7 @@ def _kickoff_local_job(ctx, cmd):
logger.debug("Submitting the following command:\n%s", cmd)
Status.mark_job_as_submitted(
ctx.obj["OUT_DIR"],
command=ctx.obj["COMMAND_NAME"],
pipeline_step=ctx.obj["PIPELINE_STEP"],
job_name=name,
replace=True,
job_attrs={
Expand Down Expand Up @@ -153,7 +155,7 @@ def _kickoff_hpc_job(ctx, cmd, hardware_option, **kwargs):

Status.mark_job_as_submitted(
ctx.obj["OUT_DIR"],
command=ctx.obj["COMMAND_NAME"],
pipeline_step=ctx.obj["PIPELINE_STEP"],
job_name=name,
replace=True,
job_attrs={
Expand All @@ -173,7 +175,7 @@ def _should_run(ctx):
out_dir = ctx.obj["OUT_DIR"]
status = Status.retrieve_job_status(
out_dir,
command=ctx.obj["COMMAND_NAME"],
pipeline_step=ctx.obj["PIPELINE_STEP"],
job_name=name,
subprocess_manager=ctx.obj.get("MANAGER"),
)
Expand Down
10 changes: 4 additions & 6 deletions gaps/cli/reset.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ def _reset_status(ctx, directory, force=False, after_step=None):
)
warn(msg, gapsWarning)
continue
logger.info(
"Resetting status for all commands after %r", after_step
)
logger.info("Resetting status for all steps after %r", after_step)
status.update_from_all_job_files()
status.reset_after(after_step)
status.dump()
Expand Down Expand Up @@ -89,9 +87,9 @@ def reset_command():
param_decls=["--after-step", "-a"],
multiple=False,
default=None,
help="Reset pipeline starting after the given command. The status "
"of this command will remain unaffected, but the status of "
"commands following it will be reset completely.",
help="Reset pipeline starting after the given pipeline step. The "
"status of this step will remain unaffected, but the status of "
"steps following it will be reset completely.",
),
]
return _WrappedCommand(
Expand Down
34 changes: 17 additions & 17 deletions gaps/cli/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,12 @@ def _color_string(string):
return string


def _print_intro(print_folder, commands, status, monitor_pid):
"""Print intro including project folder and command/status filters."""
def _print_intro(print_folder, steps, status, monitor_pid):
"""Print intro including project folder and steps/status filters."""
extras = []
commands = ", ".join(commands or [])
if commands:
extras.append(f"commands={commands}")
steps = ", ".join(steps or [])
if steps:
extras.append(f"steps={steps}")
status = ", ".join(status or [])
if status:
extras.append(f"status={status}")
Expand Down Expand Up @@ -273,7 +273,7 @@ def _print_disclaimer():
def _color_print(
df,
print_folder,
commands,
steps,
status,
walltime,
runtime_stats,
Expand All @@ -283,7 +283,7 @@ def _color_print(
):
"""Color the status portion of the print out."""

_print_intro(print_folder, commands, status, monitor_pid)
_print_intro(print_folder, steps, status, monitor_pid)
_print_df(df)
_print_job_status_statistics(df)

Expand All @@ -296,15 +296,15 @@ def _color_print(
_print_disclaimer()


def main_monitor(folder, commands, status, include, recursive):
def main_monitor(folder, pipe_steps, status, include, recursive):
"""Run the appropriate monitor functions for a folder.

Parameters
----------
folder : path-like
Path to folder for which to print status.
commands : container of str
Container with the commands to display.
pipe_steps : container of str
Container with the pipeline steps to display.
status : container of str
Container with the statuses to display.
include : container of str
Expand All @@ -329,7 +329,7 @@ def main_monitor(folder, commands, status, include, recursive):

include_with_runtime = list(include) + [StatusField.RUNTIME_SECONDS]
df = pipe_status.as_df(
commands=commands, include_cols=include_with_runtime
pipe_steps=pipe_steps, include_cols=include_with_runtime
)
if status:
df = _filter_df_for_status(df, status)
Expand All @@ -348,7 +348,7 @@ def main_monitor(folder, commands, status, include, recursive):
_color_print(
df[list(df.columns)[:-1]].copy(),
directory.name,
commands,
pipe_steps,
status,
walltime,
runtime_stats,
Expand All @@ -375,13 +375,13 @@ def status_command():
type=click.Path(exists=True),
),
click.Option(
param_decls=["--commands", "-c"],
param_decls=["--pipe_steps", "-ps"],
multiple=True,
default=None,
help="Filter status for the given command(s). Multiple commands "
"can be specified by repeating this option (e.g. :code:`-c "
"command1 -c command2 ...`) By default, the status of all "
"commands is displayed.",
help="Filter status for the given pipeline step(s). Multiple "
"steps can be specified by repeating this option (e.g. :code:`-ps "
"step1 -ps step2 ...`) By default, the status of all "
"pipeline steps is displayed.",
),
click.Option(
param_decls=["--status", "-s"],
Expand Down
11 changes: 8 additions & 3 deletions gaps/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def add_job(
"""
cls.mark_job_as_submitted(
status_dir=status_dir,
command=module,
pipeline_step=module,
job_name=job_name,
replace=replace,
job_attrs=job_attrs,
Expand All @@ -147,7 +147,7 @@ def make_job_file(status_dir, module, job_name, attrs):
"""
gaps.status.Status.make_single_job_file(
status_dir=status_dir,
command=module,
pipeline_step=module,
job_name=job_name,
attrs=attrs,
)
Expand Down Expand Up @@ -215,7 +215,7 @@ def __init__(self, pipeline, monitor=True, verbose=False):
represents the command config filepath to substitute into
the :attr:`CMD_BASE` string.

You must also call :meth:`_init_status` in the initializer.
You must also call `self._init_status()` in the initializer.
If you want logging outputs during the submit step, make sure
to init the "gaps" logger.

Expand Down Expand Up @@ -281,6 +281,11 @@ def _submit(self, step):
if stderr:
logger.warning("Subprocess received stderr: \n%s", stderr)

def _get_command_config(self, step):
"""Get the (command, config) key pair."""
pipe_step = self._run_list[step]
return pipe_step.command, pipe_step.config_path

def _get_cmd(self, command, f_config):
"""Get the python cli call string."""

Expand Down
Loading
Loading