diff --git a/streamflow/cwl/translator.py b/streamflow/cwl/translator.py index 48c096bef..a978aa4dd 100644 --- a/streamflow/cwl/translator.py +++ b/streamflow/cwl/translator.py @@ -82,7 +82,12 @@ OnlyNonNullTransformer, ValueFromTransformer, ) -from streamflow.cwl.utils import LoadListing, SecondaryFile, resolve_dependencies +from streamflow.cwl.utils import ( + LoadListing, + SecondaryFile, + process_inner_process, + resolve_dependencies, +) from streamflow.cwl.workflow import CWLWorkflow from streamflow.deployment.utils import get_binding_config from streamflow.log_handler import logger @@ -2112,44 +2117,9 @@ def _translate_workflow_step( ] # Process inner element - run_command = cwl_element.run - if cwl_utils.parser.is_process(run_command): - run_command.cwlVersion = context["version"] - cwl_utils.parser.utils.convert_stdstreams_to_files(run_command) - if ":" in run_command.id.split("#")[-1]: - cwl_step_name = utils.get_name( - name_prefix, - cwl_name_prefix, - cwl_element.id, - preserve_cwl_prefix=True, - ) - inner_cwl_name_prefix = ( - step_name - if context["version"] == "v1.0" - else posixpath.join(cwl_step_name, "run") - ) - else: - inner_cwl_name_prefix = utils.get_name( - name_prefix, - cwl_name_prefix, - run_command.id, - preserve_cwl_prefix=True, - ) - inner_context = context - else: - run_command = cwl_element.loadingOptions.fetcher.urljoin( - cwl_element.loadingOptions.fileuri, run_command - ) - run_command = cwl_utils.parser.load_document_by_uri( - run_command, loadingOptions=cwl_element.loadingOptions - ) - cwl_utils.parser.utils.convert_stdstreams_to_files(run_command) - inner_cwl_name_prefix = ( - utils.get_name(posixpath.sep, posixpath.sep, run_command.id) - if "#" in run_command.id - else posixpath.sep - ) - inner_context = {**context, **{"version": run_command.cwlVersion}} + run_command, inner_cwl_name_prefix, inner_context = process_inner_process( + cwl_element, step_name, name_prefix, cwl_name_prefix, context + ) # Handle optional input variables default_ports = {} diff --git a/streamflow/cwl/utils.py b/streamflow/cwl/utils.py index fb869aafd..ad2a4d4c9 100644 --- a/streamflow/cwl/utils.py +++ b/streamflow/cwl/utils.py @@ -12,6 +12,7 @@ import cwl_utils.expression import cwl_utils.parser +import cwl_utils.parser.utils from cwl_utils.parser.cwl_v1_2_utils import CONTENT_LIMIT from streamflow.core.context import StreamFlowContext @@ -663,6 +664,54 @@ class LoadListing(Enum): deep_listing = 2 +def process_inner_process( + cwl_element: cwl_utils.parser.WorkflowStep, + cwl_name_prefix: str, + step_name: str, + name_prefix: str, + context: MutableMapping[str, Any], +): + run_command = cwl_element.run + inner_context = {**context} + if cwl_utils.parser.is_process(run_command): + run_command.cwlVersion = context["version"] + cwl_utils.parser.utils.convert_stdstreams_to_files(run_command) + if ":" in run_command.id.split("#")[-1]: + cwl_step_name = get_name( + name_prefix, + cwl_name_prefix, + cwl_element.id, + preserve_cwl_prefix=True, + ) + inner_cwl_name_prefix = ( + step_name + if context["version"] == "v1.0" + else posixpath.join(cwl_step_name, "run") + ) + else: + inner_cwl_name_prefix = get_name( + name_prefix, + cwl_name_prefix, + run_command.id, + preserve_cwl_prefix=True, + ) + else: + run_command = cwl_element.loadingOptions.fetcher.urljoin( + cwl_element.loadingOptions.fileuri, run_command + ) + run_command = cwl_utils.parser.load_document_by_uri( + run_command, loadingOptions=cwl_element.loadingOptions + ) + cwl_utils.parser.utils.convert_stdstreams_to_files(run_command) + inner_cwl_name_prefix = ( + get_name(posixpath.sep, posixpath.sep, run_command.id) + if "#" in run_command.id + else posixpath.sep + ) + inner_context |= {"version": run_command.cwlVersion} + return run_command, inner_cwl_name_prefix, inner_context + + async def process_secondary_files( context: StreamFlowContext, cwl_version: str, diff --git a/streamflow/provenance/run_crate.py b/streamflow/provenance/run_crate.py index 19aa462cd..4299afb30 100644 --- a/streamflow/provenance/run_crate.py +++ b/streamflow/provenance/run_crate.py @@ -76,47 +76,6 @@ def _get_action_status(status: Status) -> str: raise WorkflowProvenanceException(f"Action status {status.name} not supported.") -def _get_cwl_embedded_tool( - cwl_prefix: str, - prefix: str, - cwl_step: cwl_utils.parser.WorkflowStep, - step_name: str, - version: str, -) -> tuple[cwl_utils.parser.Process, str]: - run_command = cwl_step.run - if cwl_utils.parser.is_process(run_command): - run_command.cwlVersion = version - cwl_utils.parser.utils.convert_stdstreams_to_files(run_command) - if ":" in run_command.id.split("#")[-1]: - cwl_step_name = streamflow.cwl.utils.get_name( - prefix, cwl_prefix, cwl_step.id, preserve_cwl_prefix=True - ) - cwl_prefix = ( - step_name if version == "v1.0" else posixpath.join(cwl_step_name, "run") - ) - else: - cwl_prefix = streamflow.cwl.utils.get_name( - prefix, - cwl_prefix, - run_command.id, - preserve_cwl_prefix=True, - ) - else: - run_command = cwl_step.loadingOptions.fetcher.urljoin( - cwl_step.loadingOptions.fileuri, run_command - ) - run_command = cwl_utils.parser.load_document_by_uri( - run_command, loadingOptions=cwl_step.loadingOptions - ) - cwl_utils.parser.utils.convert_stdstreams_to_files(run_command) - cwl_prefix = ( - streamflow.cwl.utils.get_name(posixpath.sep, posixpath.sep, run_command.id) - if "#" in run_command.id - else posixpath.sep - ) - return run_command, cwl_prefix - - def _get_cwl_entity_id(entity_id: str) -> str: tokens = entity_id.split("#") if len(tokens) > 1: @@ -1118,12 +1077,12 @@ def _get_step( "@type": "HowToStep", } step_name = streamflow.cwl.utils.get_name(prefix, cwl_prefix, cwl_step.id) - embedded_tool, cwl_prefix = _get_cwl_embedded_tool( - cwl_prefix=cwl_prefix, - prefix=prefix, - cwl_step=cwl_step, + embedded_tool, cwl_prefix, _ = streamflow.cwl.utils.process_inner_process( + cwl_name_prefix=cwl_prefix, + name_prefix=prefix, + cwl_element=cwl_step, step_name=step_name, - version=version, + context={"version": version}, ) if isinstance(embedded_tool, get_args(cwl_utils.parser.Workflow)): work_example = self._get_workflow( @@ -1317,12 +1276,14 @@ def _register_steps( cwl_step_name = streamflow.cwl.utils.get_name( prefix, cwl_prefix, cwl_step.id, preserve_cwl_prefix=True ) - embedded_tool, inner_cwl_prefix = _get_cwl_embedded_tool( - cwl_prefix=cwl_prefix, - prefix=prefix, - step_name=step_name, - cwl_step=cwl_step, - version=version, + embedded_tool, inner_cwl_prefix, _ = ( + streamflow.cwl.utils.process_inner_process( + cwl_name_prefix=cwl_prefix, + name_prefix=prefix, + cwl_element=cwl_step, + step_name=step_name, + context={"version": version}, + ) ) # Register step jsonld_entity["step"].append({"@id": jsonld_step["@id"]})