From 44e2c9a9b248743396518073c0e972701d65fa9e Mon Sep 17 00:00:00 2001 From: LanderOtto Date: Tue, 17 Dec 2024 17:08:24 +0100 Subject: [PATCH] This commit fixes several CWL translator issues. - the retrieval of the step name. Before this commit, it was expected that the `out` attribute of `WorkflowStep` was always a list of `string`. Instead, it is a list of `Any`, following the typing hint of the `cwl_utils.parser.WorkflowStep` constructor. In particular, it can have `WorkflowStepOutput` object elements. - the creation of the `CommandOutputProcessor` when a schema is defined. Before this commit, the `CommandInputRecordSchema` was not included in the `RecordSchema` case. Similar for `EnumSchema` and `ArraySchema` cases, respectively. --- streamflow/cwl/translator.py | 135 ++++++++++++++++++++--------------- 1 file changed, 76 insertions(+), 59 deletions(-) diff --git a/streamflow/cwl/translator.py b/streamflow/cwl/translator.py index 61af6076..48c096be 100644 --- a/streamflow/cwl/translator.py +++ b/streamflow/cwl/translator.py @@ -267,7 +267,7 @@ def _create_command_output_processor( optional: bool = False, ) -> CommandOutputProcessor: # Array type: -> MapCommandOutputProcessor - if isinstance(port_type, get_args(cwl_utils.parser.OutputArraySchema)): + if isinstance(port_type, get_args(cwl_utils.parser.ArraySchema)): return CWLMapCommandOutputProcessor( name=port_name, workflow=workflow, @@ -284,7 +284,7 @@ def _create_command_output_processor( ), ) # Enum type: -> create command output processor - elif isinstance(port_type, get_args(cwl_utils.parser.OutputEnumSchema)): + elif isinstance(port_type, get_args(cwl_utils.parser.EnumSchema)): # Process InlineJavascriptRequirement requirements = context["hints"] | context["requirements"] expression_lib, full_js = _process_javascript_requirement(requirements) @@ -312,7 +312,7 @@ def _create_command_output_processor( optional=optional, ) # Record type: -> ObjectCommandOutputProcessor - elif isinstance(port_type, get_args(cwl_utils.parser.OutputRecordSchema)): + elif isinstance(port_type, get_args(cwl_utils.parser.RecordSchema)): # Process InlineJavascriptRequirement requirements = context["hints"] | context["requirements"] expression_lib, full_js = _process_javascript_requirement(requirements) @@ -1504,6 +1504,7 @@ def _handle_optional_input_variables( cwl_element: cwl_utils.parser.WorkflowStep, inner_cwl_element: cwl_utils.parser.Process, cwl_name_prefix: str, + inner_cwl_name_prefix: str, default_ports: MutableMapping[str, Port], name_prefix: str, step_name: str, @@ -1512,9 +1513,6 @@ def _handle_optional_input_variables( inner_input_ports, outer_input_ports = set(), set() # Get inner CWL object input names for element_input in inner_cwl_element.inputs: - inner_cwl_name_prefix = utils.get_name( - name_prefix, cwl_name_prefix, cwl_element.id, preserve_cwl_prefix=True - ) global_name = utils.get_name( step_name, inner_cwl_name_prefix, element_input.id ) @@ -1522,7 +1520,6 @@ def _handle_optional_input_variables( inner_input_ports.add(port_name) # Get WorkflowStep input names for element_input in cwl_element.in_: - step_name = utils.get_name(name_prefix, cwl_name_prefix, cwl_element.id) cwl_step_name = utils.get_name( name_prefix, cwl_name_prefix, cwl_element.id, preserve_cwl_prefix=True ) @@ -2113,9 +2110,61 @@ def _translate_workflow_step( utils.get_name(step_name, cwl_step_name, n) for n in cwl_element.scatter or [] ] + + # Process inner element + run_command = cwl_element.run + if cwl_utils.parser.is_process(run_command): + run_command.cwlVersion = context["version"] + cwl_utils.parser.utils.convert_stdstreams_to_files(run_command) + if ":" in run_command.id.split("#")[-1]: + cwl_step_name = utils.get_name( + name_prefix, + cwl_name_prefix, + cwl_element.id, + preserve_cwl_prefix=True, + ) + inner_cwl_name_prefix = ( + step_name + if context["version"] == "v1.0" + else posixpath.join(cwl_step_name, "run") + ) + else: + inner_cwl_name_prefix = utils.get_name( + name_prefix, + cwl_name_prefix, + run_command.id, + preserve_cwl_prefix=True, + ) + inner_context = context + else: + run_command = cwl_element.loadingOptions.fetcher.urljoin( + cwl_element.loadingOptions.fileuri, run_command + ) + run_command = cwl_utils.parser.load_document_by_uri( + run_command, loadingOptions=cwl_element.loadingOptions + ) + cwl_utils.parser.utils.convert_stdstreams_to_files(run_command) + inner_cwl_name_prefix = ( + utils.get_name(posixpath.sep, posixpath.sep, run_command.id) + if "#" in run_command.id + else posixpath.sep + ) + inner_context = {**context, **{"version": run_command.cwlVersion}} + + # Handle optional input variables + default_ports = {} + self._handle_optional_input_variables( + cwl_element=cwl_element, + inner_cwl_element=run_command, + cwl_name_prefix=cwl_name_prefix, + inner_cwl_name_prefix=inner_cwl_name_prefix, + default_ports=default_ports, + name_prefix=name_prefix, + step_name=step_name, + workflow=workflow, + ) # Process inputs input_ports = {} - default_ports = {} value_from_transformers = {} input_dependencies = {} for element_input in cwl_element.in_: @@ -2335,7 +2384,15 @@ def _translate_workflow_step( external_output_ports = {} internal_output_ports = {} for element_output in cwl_element.out: - global_name = utils.get_name(step_name, cwl_step_name, element_output) + global_name = utils.get_name( + step_name, + cwl_step_name, + ( + element_output + if isinstance(element_output, str) + else element_output.id + ), + ) port_name = posixpath.relpath(global_name, step_name) # Retrieve or create output port if global_name not in self.output_ports: @@ -2546,7 +2603,15 @@ def _translate_workflow_step( # Add skip ports if there is a condition if cwl_condition: for element_output in cwl_element.out: - global_name = utils.get_name(step_name, cwl_step_name, element_output) + global_name = utils.get_name( + step_name, + cwl_step_name, + ( + element_output + if isinstance(element_output, str) + else element_output.id + ), + ) port_name = posixpath.relpath(global_name, step_name) skip_port = ( external_output_ports[global_name] @@ -2558,62 +2623,14 @@ def _translate_workflow_step( ) # Update output ports with the internal ones self.output_ports |= internal_output_ports - # Process inner element - run_command = cwl_element.run - if cwl_utils.parser.is_process(run_command): - run_command.cwlVersion = context["version"] - cwl_utils.parser.utils.convert_stdstreams_to_files(run_command) - if ":" in run_command.id.split("#")[-1]: - cwl_step_name = utils.get_name( - name_prefix, - cwl_name_prefix, - cwl_element.id, - preserve_cwl_prefix=True, - ) - inner_cwl_name_prefix = ( - step_name - if context["version"] == "v1.0" - else posixpath.join(cwl_step_name, "run") - ) - else: - inner_cwl_name_prefix = utils.get_name( - name_prefix, - cwl_name_prefix, - run_command.id, - preserve_cwl_prefix=True, - ) - else: - run_command = cwl_element.loadingOptions.fetcher.urljoin( - cwl_element.loadingOptions.fileuri, run_command - ) - run_command = cwl_utils.parser.load_document_by_uri( - run_command, loadingOptions=cwl_element.loadingOptions - ) - cwl_utils.parser.utils.convert_stdstreams_to_files(run_command) - inner_cwl_name_prefix = ( - utils.get_name(posixpath.sep, posixpath.sep, run_command.id) - if "#" in run_command.id - else posixpath.sep - ) - context = {**context, **{"version": run_command.cwlVersion}} self._recursive_translate( workflow=workflow, cwl_element=run_command, - context=context + context=inner_context | {"requirements": {k: v for k, v in requirements.items() if k != "Loop"}}, name_prefix=step_name, cwl_name_prefix=inner_cwl_name_prefix, ) - # Handle optional input variables - self._handle_optional_input_variables( - cwl_element=cwl_element, - inner_cwl_element=run_command, - cwl_name_prefix=cwl_name_prefix, - default_ports=default_ports, - name_prefix=name_prefix, - step_name=step_name, - workflow=workflow, - ) # Update output ports with the external ones self.output_ports |= external_output_ports