Skip to content

Commit

Permalink
Merge pull request #29 from NREL/pp/duplicate_commands
Browse files Browse the repository at this point in the history
Duplicate commands
  • Loading branch information
ppinchuk authored Sep 29, 2023
2 parents 324acba + 2567432 commit 1b9f51e
Show file tree
Hide file tree
Showing 20 changed files with 509 additions and 279 deletions.
1 change: 1 addition & 0 deletions gaps/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,5 @@ def _main_cb(ctx, verbose):
"""Set the obj and verbose settings of the commands."""
ctx.ensure_object(dict)
ctx.obj["VERBOSE"] = verbose
ctx.obj["PIPELINE_STEP"] = ctx.invoked_subcommand
ctx.max_content_width = 92
16 changes: 16 additions & 0 deletions gaps/cli/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ def __init__(
command_name : str
Name of the command being run. This is equivalent to
the ``name`` input argument.
pipeline_step : str
Name of the pipeline step being run. This is often
the same as `command_name`, but can be different if
a pipeline contains duplicate steps.
config_file : str
Path to the configuration file specified by the
user.
Expand Down Expand Up @@ -260,6 +264,10 @@ def __init__(
command_name : str
Name of the command being run. This is equivalent to
the ``name`` input above.
pipeline_step : str
Name of the pipeline step being run. This is often
the same as `command_name`, but can be different if
a pipeline contains duplicate steps.
config_file : Path
Path to the configuration file specified by the
user.
Expand Down Expand Up @@ -436,6 +444,10 @@ def __init__(
command_name : str
Name of the command being run. This is equivalent to
the ``name`` input argument.
pipeline_step : str
Name of the pipeline step being run. This is often
the same as `command_name`, but can be different if
a pipeline contains duplicate steps.
config_file : str
Path to the configuration file specified by the
user.
Expand Down Expand Up @@ -566,6 +578,10 @@ def __init__(
command_name : str
Name of the command being run. This is equivalent to
the ``name`` input above.
pipeline_step : str
Name of the pipeline step being run. This is often
the same as `command_name`, but can be different if
a pipeline contains duplicate steps.
config_file : Path
Path to the configuration file specified by the
user.
Expand Down
20 changes: 17 additions & 3 deletions gaps/cli/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
_CMD_LIST = [
"from gaps.cli.config import run_with_status_updates",
"from {run_func_module} import {run_func_name}",
'su_args = "{project_dir}", "{command}", "{job_name}"',
'su_args = "{project_dir}", "{pipeline_step}", "{job_name}"',
"run_with_status_updates("
" {run_func_name}, {node_specific_config}, {logging_options}, su_args, "
" {exclude_from_status}"
Expand All @@ -42,6 +42,7 @@
GAPS_SUPPLIED_ARGS = {
"tag",
"command_name",
"pipeline_step",
"config_file",
"project_dir",
"job_name",
Expand All @@ -66,6 +67,9 @@ def __init__(self, ctx, config_file, command_config):
config_file : path-like
Path to input file containing key-value pairs as input to
function.
step : str
Name of step being run. This will be used to key the status
dictionary, so it must be unique to the pipeline.
command_config : `gaps.cli.cli.CLICommandFromFunction`
A command configuration object containing info such as the
command name, run function, pre-processing function,
Expand Down Expand Up @@ -95,6 +99,11 @@ def command_name(self):
"""str: Name of command being run."""
return self.command_config.name

@property
def pipeline_step(self):
"""str: Name of pipeline_step being run."""
return self.ctx.obj.get("PIPELINE_STEP", self.command_name)

@property
def job_name(self):
"""str: Name of job being run."""
Expand Down Expand Up @@ -131,6 +140,7 @@ def preprocess_config(self):
preprocessor_kwargs = {
"config": self.config,
"command_name": self.command_name,
"pipeline_step": self.pipeline_step,
"config_file": self.config_file,
"project_dir": self.project_dir,
"job_name": self.job_name,
Expand Down Expand Up @@ -214,6 +224,7 @@ def set_exclude_from_status(self):
def prepare_context(self):
"""Add required key-val;ue pairs to context object."""
self.ctx.obj["COMMAND_NAME"] = self.command_name
self.ctx.obj["PIPELINE_STEP"] = self.pipeline_step
self.ctx.obj["OUT_DIR"] = self.project_dir
return self

Expand Down Expand Up @@ -245,6 +256,7 @@ def kickoff_jobs(self):
{
"tag": tag,
"command_name": self.command_name,
"pipeline_step": self.pipeline_step,
"config_file": self.config_file.as_posix(),
"project_dir": self.project_dir.as_posix(),
"job_name": job_name,
Expand All @@ -270,7 +282,7 @@ def kickoff_jobs(self):
project_dir=self.project_dir.as_posix(),
logging_options=as_script_str(self.logging_options),
exclude_from_status=as_script_str(self.exclude_from_status),
command=self.command_name,
pipeline_step=self.pipeline_step,
job_name=job_name,
)
cmd = f"python -c {cmd!r}"
Expand Down Expand Up @@ -372,8 +384,10 @@ def run(self):


@click.pass_context
def from_config(ctx, config_file, command_config):
def from_config(ctx, config_file, command_config, pipeline_step=None):
"""Run command from a config file."""
if pipeline_step is not None:
ctx.obj["PIPELINE_STEP"] = pipeline_step
_FromConfig(ctx, config_file, command_config).run()


Expand Down
21 changes: 17 additions & 4 deletions gaps/cli/documentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,23 @@
----------
pipeline : list of dicts
A list of dictionaries, where each dictionary represents one
step in the pipeline. Each dictionary should have one
key-value pair, where the key is the name of the CLI
command to run, and the value is the path to a config
file containing the configuration for that command.
step in the pipeline. Each dictionary should have one of two
configurations:
- A single key-value pair, where the key is the name of
the CLI command to run, and the value is the path to
a config file containing the configuration for that
command
- Exactly two key-value pairs, where one of the keys is
``"command"``, with a value that points to the name of
a command to execute, while the second key is a _unique_
user-defined name of the pipeline step to execute, with
a value that points to the path to a config file
containing the configuration for the command specified
by the other key. This configuration allows users to
specify duplicate commands as part of their pipeline
execution.
logging : dict, optional
Dictionary containing keyword-argument pairs to pass to
`init_logger <https://tinyurl.com/47hakp7f/>`_. This
Expand Down
8 changes: 5 additions & 3 deletions gaps/cli/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ def kickoff_job(ctx, cmd, exec_kwargs):
Path to output directory.
COMMAND_NAME : str
Name of command being run.
PIPELINE_STEP: str
Name of pipeline step being run.
cmd : str
String form of command to kickoff.
Expand Down Expand Up @@ -120,7 +122,7 @@ def _kickoff_local_job(ctx, cmd):
logger.debug("Submitting the following command:\n%s", cmd)
Status.mark_job_as_submitted(
ctx.obj["OUT_DIR"],
command=ctx.obj["COMMAND_NAME"],
pipeline_step=ctx.obj["PIPELINE_STEP"],
job_name=name,
replace=True,
job_attrs={
Expand Down Expand Up @@ -153,7 +155,7 @@ def _kickoff_hpc_job(ctx, cmd, hardware_option, **kwargs):

Status.mark_job_as_submitted(
ctx.obj["OUT_DIR"],
command=ctx.obj["COMMAND_NAME"],
pipeline_step=ctx.obj["PIPELINE_STEP"],
job_name=name,
replace=True,
job_attrs={
Expand All @@ -173,7 +175,7 @@ def _should_run(ctx):
out_dir = ctx.obj["OUT_DIR"]
status = Status.retrieve_job_status(
out_dir,
command=ctx.obj["COMMAND_NAME"],
pipeline_step=ctx.obj["PIPELINE_STEP"],
job_name=name,
subprocess_manager=ctx.obj.get("MANAGER"),
)
Expand Down
10 changes: 4 additions & 6 deletions gaps/cli/reset.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ def _reset_status(ctx, directory, force=False, after_step=None):
)
warn(msg, gapsWarning)
continue
logger.info(
"Resetting status for all commands after %r", after_step
)
logger.info("Resetting status for all steps after %r", after_step)
status.update_from_all_job_files()
status.reset_after(after_step)
status.dump()
Expand Down Expand Up @@ -89,9 +87,9 @@ def reset_command():
param_decls=["--after-step", "-a"],
multiple=False,
default=None,
help="Reset pipeline starting after the given command. The status "
"of this command will remain unaffected, but the status of "
"commands following it will be reset completely.",
help="Reset pipeline starting after the given pipeline step. The "
"status of this step will remain unaffected, but the status of "
"steps following it will be reset completely.",
),
]
return _WrappedCommand(
Expand Down
34 changes: 17 additions & 17 deletions gaps/cli/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,12 @@ def _color_string(string):
return string


def _print_intro(print_folder, commands, status, monitor_pid):
"""Print intro including project folder and command/status filters."""
def _print_intro(print_folder, steps, status, monitor_pid):
"""Print intro including project folder and steps/status filters."""
extras = []
commands = ", ".join(commands or [])
if commands:
extras.append(f"commands={commands}")
steps = ", ".join(steps or [])
if steps:
extras.append(f"steps={steps}")
status = ", ".join(status or [])
if status:
extras.append(f"status={status}")
Expand Down Expand Up @@ -273,7 +273,7 @@ def _print_disclaimer():
def _color_print(
df,
print_folder,
commands,
steps,
status,
walltime,
runtime_stats,
Expand All @@ -283,7 +283,7 @@ def _color_print(
):
"""Color the status portion of the print out."""

_print_intro(print_folder, commands, status, monitor_pid)
_print_intro(print_folder, steps, status, monitor_pid)
_print_df(df)
_print_job_status_statistics(df)

Expand All @@ -296,15 +296,15 @@ def _color_print(
_print_disclaimer()


def main_monitor(folder, commands, status, include, recursive):
def main_monitor(folder, pipe_steps, status, include, recursive):
"""Run the appropriate monitor functions for a folder.
Parameters
----------
folder : path-like
Path to folder for which to print status.
commands : container of str
Container with the commands to display.
pipe_steps : container of str
Container with the pipeline steps to display.
status : container of str
Container with the statuses to display.
include : container of str
Expand All @@ -329,7 +329,7 @@ def main_monitor(folder, commands, status, include, recursive):

include_with_runtime = list(include) + [StatusField.RUNTIME_SECONDS]
df = pipe_status.as_df(
commands=commands, include_cols=include_with_runtime
pipe_steps=pipe_steps, include_cols=include_with_runtime
)
if status:
df = _filter_df_for_status(df, status)
Expand All @@ -348,7 +348,7 @@ def main_monitor(folder, commands, status, include, recursive):
_color_print(
df[list(df.columns)[:-1]].copy(),
directory.name,
commands,
pipe_steps,
status,
walltime,
runtime_stats,
Expand All @@ -375,13 +375,13 @@ def status_command():
type=click.Path(exists=True),
),
click.Option(
param_decls=["--commands", "-c"],
param_decls=["--pipe_steps", "-ps"],
multiple=True,
default=None,
help="Filter status for the given command(s). Multiple commands "
"can be specified by repeating this option (e.g. :code:`-c "
"command1 -c command2 ...`) By default, the status of all "
"commands is displayed.",
help="Filter status for the given pipeline step(s). Multiple "
"steps can be specified by repeating this option (e.g. :code:`-ps "
"step1 -ps step2 ...`) By default, the status of all "
"pipeline steps is displayed.",
),
click.Option(
param_decls=["--status", "-s"],
Expand Down
11 changes: 8 additions & 3 deletions gaps/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def add_job(
"""
cls.mark_job_as_submitted(
status_dir=status_dir,
command=module,
pipeline_step=module,
job_name=job_name,
replace=replace,
job_attrs=job_attrs,
Expand All @@ -147,7 +147,7 @@ def make_job_file(status_dir, module, job_name, attrs):
"""
gaps.status.Status.make_single_job_file(
status_dir=status_dir,
command=module,
pipeline_step=module,
job_name=job_name,
attrs=attrs,
)
Expand Down Expand Up @@ -215,7 +215,7 @@ def __init__(self, pipeline, monitor=True, verbose=False):
represents the command config filepath to substitute into
the :attr:`CMD_BASE` string.
You must also call :meth:`_init_status` in the initializer.
You must also call `self._init_status()` in the initializer.
If you want logging outputs during the submit step, make sure
to init the "gaps" logger.
Expand Down Expand Up @@ -281,6 +281,11 @@ def _submit(self, step):
if stderr:
logger.warning("Subprocess received stderr: \n%s", stderr)

def _get_command_config(self, step):
"""Get the (command, config) key pair."""
pipe_step = self._run_list[step]
return pipe_step.command, pipe_step.config_path

def _get_cmd(self, command, f_config):
"""Get the python cli call string."""

Expand Down
Loading

0 comments on commit 1b9f51e

Please sign in to comment.