Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add list-outputs #248

Merged
merged 21 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/cromshell/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from .cost import command as cost
from .counts import command as counts
from .list import command as list
from .list_outputs import command as list_outputs
from .logs import command as logs
from .metadata import command as metadata
from .slim_metadata import command as slim_metadata
Expand Down Expand Up @@ -168,6 +169,7 @@ def version():
main_entry.add_command(update_server.main)
main_entry.add_command(timing.main)
main_entry.add_command(list.main)
main_entry.add_command(list_outputs.main)


if __name__ == "__main__":
Expand Down
Empty file.
220 changes: 220 additions & 0 deletions src/cromshell/list_outputs/command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
import logging

import click
import requests

import cromshell.utilities.http_utils as http_utils
import cromshell.utilities.io_utils as io_utils
from cromshell.metadata import command as metadata_command
from cromshell.utilities import command_setup_utils

LOGGER = logging.getLogger(__name__)


@click.command(name="list-outputs")
@click.argument("workflow_ids", required=True, nargs=-1)
@click.option(
"-d",
"--detailed",
is_flag=True,
default=False,
help="Get the output for a workflow at the task level",
)
@click.option(
"-j",
"--json-summary",
is_flag=True,
default=False,
help="Print a json summary of outputs, including non-file types.",
)
@click.pass_obj
def main(config, workflow_ids, detailed, json_summary):
"""List all output files produced by a workflow."""

LOGGER.info("list-outputs")

return_code = 0

for workflow_id in workflow_ids:
command_setup_utils.resolve_workflow_id_and_server(
workflow_id=workflow_id, cromshell_config=config
)

if not detailed:
SHuang-Broad marked this conversation as resolved.
Show resolved Hide resolved
workflow_outputs = get_workflow_level_outputs(config).get("outputs")

if json_summary:
io_utils.pretty_print_json(format_json=workflow_outputs)
else:
print_file_like_value_in_dict(
SHuang-Broad marked this conversation as resolved.
Show resolved Hide resolved
outputs_metadata=workflow_outputs,
indent=False,
)
else:
task_outputs = get_task_level_outputs(config)

if json_summary:
io_utils.pretty_print_json(format_json=task_outputs)
else:
print_task_level_outputs(task_outputs)

return return_code


def get_workflow_level_outputs(config) -> dict:
"""Get the workflow level outputs from the workflow outputs

Args:
config (dict): The cromshell config object
"""

requests_out = requests.get(
f"{config.cromwell_api_workflow_id}/outputs",
timeout=config.requests_connect_timeout,
verify=config.requests_verify_certs,
headers=http_utils.generate_headers(config),
)

if requests_out.ok:
check_for_empty_output(requests_out.json().get("outputs"), config.workflow_id)
return requests_out.json()
else:
http_utils.check_http_request_status_code(
short_error_message="Failed to retrieve outputs for "
f"workflow: {config.workflow_id}",
response=requests_out,
# Raising exception is set false to allow
# command to retrieve outputs of remaining workflows.
raise_exception=False,
)


def get_task_level_outputs(config) -> dict:
"""Get the task level outputs from the workflow metadata

Args:
config (dict): The cromshell config object
"""
# Get metadata
formatted_metadata_parameter = metadata_command.format_metadata_params(
list_of_keys=config.METADATA_KEYS_TO_OMIT,
exclude_keys=True,
expand_subworkflows=True,
)

workflow_metadata = metadata_command.get_workflow_metadata(
meta_params=formatted_metadata_parameter,
api_workflow_id=config.cromwell_api_workflow_id,
timeout=config.requests_connect_timeout,
verify_certs=config.requests_verify_certs,
headers=http_utils.generate_headers(config),
)

return filter_outputs_from_workflow_metadata(workflow_metadata)


def filter_outputs_from_workflow_metadata(workflow_metadata: dict) -> dict:
"""Get the outputs from the workflow metadata

Args:
workflow_metadata (dict): The workflow metadata
"""
calls_metadata = workflow_metadata["calls"]
output_metadata = {}
extract_task_key = "outputs"

for call, index_list in calls_metadata.items():
if "subWorkflowMetadata" in calls_metadata[call][0]:
output_metadata[call] = []
for scatter in calls_metadata[call]:
output_metadata[call].append(
filter_outputs_from_workflow_metadata(
scatter["subWorkflowMetadata"]
)
)
else:
output_metadata[call] = []
for index in index_list:
output_metadata[call].append(index.get(extract_task_key))
SHuang-Broad marked this conversation as resolved.
Show resolved Hide resolved

check_for_empty_output(output_metadata, workflow_metadata["id"])

return output_metadata


def print_task_level_outputs(output_metadata: dict) -> None:
"""Print the outputs from the workflow metadata
output_metadata: {call_name:[index1{output_name: outputvalue}, index2{...}, ...], call_name:[], ...}

Args:
output_metadata (dict): The output metadata from the workflow
"""
for call, index_list in output_metadata.items():
print(call)
for call_index in index_list:
if call_index is not None:
print_file_like_value_in_dict(outputs_metadata=call_index, indent=True)


def print_file_like_value_in_dict(outputs_metadata: dict, indent: bool) -> None:
"""Print the file like values in the output metadata dictionary

Args:
outputs_metadata (dict): The output metadata
indent (bool): Whether to indent the output
"""

for output_name, output_value in outputs_metadata.items():
if isinstance(output_value, str):
print_output_name_and_file(output_name, output_value, indent=indent)
elif isinstance(output_value, list):
for output_value_item in output_value:
print_output_name_and_file(
output_name, output_value_item, indent=indent
)


def print_output_name_and_file(
output_name: str, output_value: str, indent: bool = True
) -> None:
"""Print the task name and the file name

Args:
output_name (str): The task output name
output_value (str): The task output value
indent (bool): Whether to indent the output"""

i = "\t" if indent else ""

if isinstance(output_value, str):
if is_path_or_url_like(output_value):
print(f"{i}{output_name}: {output_value}")


def is_path_or_url_like(in_string: str) -> bool:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking about future-proofing us a little here: how about we classify the in_string?
I mean classify it as a local regular path, GCS path, or Azure path, or not a path at all.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Classification would be useful in a command like fetch-outputs where we need to know the classification before downloading the output. Since we're not downloading anything in this function, we'd only be interested in knowing whether its a file.

"""Check if the string is a path or url

Args:
in_string (str): The string to check for path or url like-ness
"""
if (
in_string.startswith("gs://")
or in_string.startswith("/")
or in_string.startswith("http://")
or in_string.startswith("https://")
):
return True
else:
return False


def check_for_empty_output(workflow_outputs: dict, workflow_id: str) -> None:
"""Check if the workflow outputs are empty

Args:
cromwell_outputs (dict): Dictionary of workflow outputs
:param workflow_id: The workflow id
"""
if not workflow_outputs:
LOGGER.error(f"No outputs found for workflow: {workflow_id}")
raise Exception(f"No outputs found for workflow: {workflow_id}")
12 changes: 12 additions & 0 deletions src/cromshell/utilities/command_setup_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,17 @@ def resolve_workflow_id_and_server(workflow_id: str, cromshell_config) -> str:
http_utils.set_and_check_cromwell_server(
config=cromshell_config, workflow_id=resolved_workflow_id
)
set_workflow_id(workflow_id=resolved_workflow_id, cromshell_config=cromshell_config)

return resolved_workflow_id


def set_workflow_id(workflow_id: str, cromshell_config) -> None:
SHuang-Broad marked this conversation as resolved.
Show resolved Hide resolved
"""
Sets the workflow id in the config object

:param workflow_id: workflow UUID
:param cromshell_config:
:return: None
"""
cromshell_config.workflow_id = workflow_id
1 change: 1 addition & 0 deletions src/cromshell/utilities/cromshellconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
]
CROMWELL_API_STRING = "/api/workflows/v1"
WOMTOOL_API_STRING = "/api/womtool/v1"
workflow_id = None
# Concatenate the cromwell url, api string, and workflow ID. Set in subcommand.
cromwell_api_workflow_id = None
# Defaults for variables will be set after functions have been defined
Expand Down
99 changes: 99 additions & 0 deletions tests/integration/test_list_outputs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from pathlib import Path

import pytest

from tests.integration import utility_test_functions

workflows_path = Path(__file__).parents[1].joinpath("workflows/")


class TestListOutputs:
@pytest.mark.parametrize(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the pytest parameters below, I'm using the same workflow to test different options of list-output. This runs the same workflow multiple times (redundant). I'm thinking of maybe running the workflow once, saving the workflow id in a global variable then have a different function that would run through the different list-output command

"wdl, json_file, options, output_template",
[
(
"tests/workflows/helloWorld.wdl",
"tests/workflows/helloWorld.json",
None,
[
"HelloWorld.output_file: /cromwell-executions/HelloWorld/<workflow-id>/call-HelloWorldTask/execution/stdout",
"",
],
),
(
"tests/workflows/helloWorld.wdl",
"tests/workflows/helloWorld.json",
["-d"],
[
"HelloWorld.HelloWorldTask",
"\toutput_file: /cromwell-executions/HelloWorld/<workflow-id>/call-HelloWorldTask/execution/stdout",
"",
],
),
(
"tests/workflows/helloWorld.wdl",
"tests/workflows/helloWorld.json",
["-j"],
[
"{",
' "HelloWorld.output_file": "/cromwell-executions/HelloWorld/<workflow-id>/call-HelloWorldTask/execution/stdout"',
"}",
"",
],
),
(
"tests/workflows/helloWorld.wdl",
"tests/workflows/helloWorld.json",
["-j", "-d"],
[
"{",
' "HelloWorld.HelloWorldTask": [',
" {",
' "output_file": "/cromwell-executions/HelloWorld/<workflow-id>/call-HelloWorldTask/execution/stdout"',
" }",
" ]",
"}",
"",
],
),
],
)
def test_list_outputs(
self,
local_cromwell_url: str,
wdl: str,
json_file: str,
options: list,
output_template: list,
ansi_escape,
):
# submit workflow
test_workflow_id = utility_test_functions.submit_workflow(
local_cromwell_url=local_cromwell_url,
wdl=wdl,
json_file=json_file,
exit_code=0,
)

utility_test_functions.wait_for_workflow_completion(
test_workflow_id=test_workflow_id
)

# run list-outputs
status_result = utility_test_functions.run_cromshell_command(
command=["list-outputs", test_workflow_id],
exit_code=0,
subcommand_options=options,
)

status_result_per_line = status_result.stdout.split("\n")

workflow_outputs = [
sub.replace("<workflow-id>", test_workflow_id) for sub in output_template
]

print("Print workflow list-outputs results:")
for line in status_result_per_line:
print(line)

assert status_result_per_line == workflow_outputs
13 changes: 11 additions & 2 deletions tests/integration/utility_test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@
from cromshell.utilities import cromshellconfig


def run_cromshell_command(command: list, exit_code: int):
def run_cromshell_command(
command: list, exit_code: int, subcommand_options: list = None
):
"""
Run cromshell alias using CliRunner and assert job is successful

:param subcommand_options: The options to pass to the subcommand
:param command: The subcommand, options, and arguments in list form e.g.
[
"alias",
Expand All @@ -25,12 +28,18 @@ def run_cromshell_command(command: list, exit_code: int):
:return: results from execution
"""

if subcommand_options:
command_with_options = command[:1] + subcommand_options + command[1:]
else:
command_with_options = command

runner = CliRunner(mix_stderr=False)
# The absolute path will be passed to the invoke command because
# the test is being run in temp directory created by CliRunner.
with runner.isolated_filesystem():
result = runner.invoke(cromshell, command)
result = runner.invoke(cromshell, command_with_options)
assert result.exit_code == exit_code, (
f"\nCOMMAND:\n{command_with_options}"
f"\nSTDOUT:\n{result.stdout}"
f"\nSTDERR:\n{result.stderr}"
f"\nExceptions:\n{result.exception}"
Expand Down
Loading