Skip to content

Commit

Permalink
Refactor redundant code
Browse files Browse the repository at this point in the history
  • Loading branch information
LanderOtto committed Dec 18, 2024
1 parent 44e2c9a commit d7d702e
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 91 deletions.
48 changes: 9 additions & 39 deletions streamflow/cwl/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@
OnlyNonNullTransformer,
ValueFromTransformer,
)
from streamflow.cwl.utils import LoadListing, SecondaryFile, resolve_dependencies
from streamflow.cwl.utils import (
LoadListing,
SecondaryFile,
process_inner_process,
resolve_dependencies,
)
from streamflow.cwl.workflow import CWLWorkflow
from streamflow.deployment.utils import get_binding_config
from streamflow.log_handler import logger
Expand Down Expand Up @@ -2112,44 +2117,9 @@ def _translate_workflow_step(
]

# Process inner element
run_command = cwl_element.run
if cwl_utils.parser.is_process(run_command):
run_command.cwlVersion = context["version"]
cwl_utils.parser.utils.convert_stdstreams_to_files(run_command)
if ":" in run_command.id.split("#")[-1]:
cwl_step_name = utils.get_name(
name_prefix,
cwl_name_prefix,
cwl_element.id,
preserve_cwl_prefix=True,
)
inner_cwl_name_prefix = (
step_name
if context["version"] == "v1.0"
else posixpath.join(cwl_step_name, "run")
)
else:
inner_cwl_name_prefix = utils.get_name(
name_prefix,
cwl_name_prefix,
run_command.id,
preserve_cwl_prefix=True,
)
inner_context = context
else:
run_command = cwl_element.loadingOptions.fetcher.urljoin(
cwl_element.loadingOptions.fileuri, run_command
)
run_command = cwl_utils.parser.load_document_by_uri(
run_command, loadingOptions=cwl_element.loadingOptions
)
cwl_utils.parser.utils.convert_stdstreams_to_files(run_command)
inner_cwl_name_prefix = (
utils.get_name(posixpath.sep, posixpath.sep, run_command.id)
if "#" in run_command.id
else posixpath.sep
)
inner_context = {**context, **{"version": run_command.cwlVersion}}
run_command, inner_cwl_name_prefix, inner_context = process_inner_process(
cwl_element, step_name, name_prefix, cwl_name_prefix, context
)

# Handle optional input variables
default_ports = {}
Expand Down
49 changes: 49 additions & 0 deletions streamflow/cwl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import cwl_utils.expression
import cwl_utils.parser
import cwl_utils.parser.utils
from cwl_utils.parser.cwl_v1_2_utils import CONTENT_LIMIT

from streamflow.core.context import StreamFlowContext
Expand Down Expand Up @@ -663,6 +664,54 @@ class LoadListing(Enum):
deep_listing = 2


def process_inner_process(
cwl_element: cwl_utils.parser.WorkflowStep,
cwl_name_prefix: str,
step_name: str,
name_prefix: str,
context: MutableMapping[str, Any],
):
run_command = cwl_element.run
inner_context = {**context}
if cwl_utils.parser.is_process(run_command):
run_command.cwlVersion = context["version"]
cwl_utils.parser.utils.convert_stdstreams_to_files(run_command)
if ":" in run_command.id.split("#")[-1]:
cwl_step_name = get_name(
name_prefix,
cwl_name_prefix,
cwl_element.id,
preserve_cwl_prefix=True,
)
inner_cwl_name_prefix = (
step_name
if context["version"] == "v1.0"
else posixpath.join(cwl_step_name, "run")
)
else:
inner_cwl_name_prefix = get_name(
name_prefix,
cwl_name_prefix,
run_command.id,
preserve_cwl_prefix=True,
)
else:
run_command = cwl_element.loadingOptions.fetcher.urljoin(
cwl_element.loadingOptions.fileuri, run_command
)
run_command = cwl_utils.parser.load_document_by_uri(
run_command, loadingOptions=cwl_element.loadingOptions
)
cwl_utils.parser.utils.convert_stdstreams_to_files(run_command)
inner_cwl_name_prefix = (
get_name(posixpath.sep, posixpath.sep, run_command.id)
if "#" in run_command.id
else posixpath.sep
)
inner_context |= {"version": run_command.cwlVersion}
return run_command, inner_cwl_name_prefix, inner_context


async def process_secondary_files(
context: StreamFlowContext,
cwl_version: str,
Expand Down
65 changes: 13 additions & 52 deletions streamflow/provenance/run_crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,47 +76,6 @@ def _get_action_status(status: Status) -> str:
raise WorkflowProvenanceException(f"Action status {status.name} not supported.")


def _get_cwl_embedded_tool(
cwl_prefix: str,
prefix: str,
cwl_step: cwl_utils.parser.WorkflowStep,
step_name: str,
version: str,
) -> tuple[cwl_utils.parser.Process, str]:
run_command = cwl_step.run
if cwl_utils.parser.is_process(run_command):
run_command.cwlVersion = version
cwl_utils.parser.utils.convert_stdstreams_to_files(run_command)
if ":" in run_command.id.split("#")[-1]:
cwl_step_name = streamflow.cwl.utils.get_name(
prefix, cwl_prefix, cwl_step.id, preserve_cwl_prefix=True
)
cwl_prefix = (
step_name if version == "v1.0" else posixpath.join(cwl_step_name, "run")
)
else:
cwl_prefix = streamflow.cwl.utils.get_name(
prefix,
cwl_prefix,
run_command.id,
preserve_cwl_prefix=True,
)
else:
run_command = cwl_step.loadingOptions.fetcher.urljoin(
cwl_step.loadingOptions.fileuri, run_command
)
run_command = cwl_utils.parser.load_document_by_uri(
run_command, loadingOptions=cwl_step.loadingOptions
)
cwl_utils.parser.utils.convert_stdstreams_to_files(run_command)
cwl_prefix = (
streamflow.cwl.utils.get_name(posixpath.sep, posixpath.sep, run_command.id)
if "#" in run_command.id
else posixpath.sep
)
return run_command, cwl_prefix


def _get_cwl_entity_id(entity_id: str) -> str:
tokens = entity_id.split("#")
if len(tokens) > 1:
Expand Down Expand Up @@ -1118,12 +1077,12 @@ def _get_step(
"@type": "HowToStep",
}
step_name = streamflow.cwl.utils.get_name(prefix, cwl_prefix, cwl_step.id)
embedded_tool, cwl_prefix = _get_cwl_embedded_tool(
cwl_prefix=cwl_prefix,
prefix=prefix,
cwl_step=cwl_step,
embedded_tool, cwl_prefix, _ = streamflow.cwl.utils.process_inner_process(
cwl_name_prefix=cwl_prefix,
name_prefix=prefix,
cwl_element=cwl_step,
step_name=step_name,
version=version,
context={"version": version},
)
if isinstance(embedded_tool, get_args(cwl_utils.parser.Workflow)):
work_example = self._get_workflow(
Expand Down Expand Up @@ -1317,12 +1276,14 @@ def _register_steps(
cwl_step_name = streamflow.cwl.utils.get_name(
prefix, cwl_prefix, cwl_step.id, preserve_cwl_prefix=True
)
embedded_tool, inner_cwl_prefix = _get_cwl_embedded_tool(
cwl_prefix=cwl_prefix,
prefix=prefix,
step_name=step_name,
cwl_step=cwl_step,
version=version,
embedded_tool, inner_cwl_prefix, _ = (
streamflow.cwl.utils.process_inner_process(
cwl_name_prefix=cwl_prefix,
name_prefix=prefix,
cwl_element=cwl_step,
step_name=step_name,
context={"version": version},
)
)
# Register step
jsonld_entity["step"].append({"@id": jsonld_step["@id"]})
Expand Down

0 comments on commit d7d702e

Please sign in to comment.