Skip to content
35 changes: 21 additions & 14 deletions cwltool/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def match_types(sinktype, src, iid, inputobj, linkMerge, valueFrom):


def check_types(srctype, sinktype, linkMerge, valueFrom):
# type: (Union[List[Text],Text], Union[List[Text],Text], Text, Text) -> Text
# type: (Any, Any, Text, Text) -> Text
"""Check if the source and sink types are "pass", "warning", or "exception".
"""

Expand All @@ -110,18 +110,25 @@ def check_types(srctype, sinktype, linkMerge, valueFrom):
return "warning"
else:
return "exception"
elif linkMerge == "merge_nested":
return check_types({"items": srctype, "type": "array"}, sinktype, None, None)
elif linkMerge == "merge_flattened":
return check_types(merge_flatten_type(srctype), sinktype, None, None)
else:
if not isinstance(sinktype, dict):
return "exception"
elif linkMerge == "merge_nested":
return check_types(srctype, sinktype["items"], None, None)
elif linkMerge == "merge_flattened":
if not isinstance(srctype, dict):
return check_types(srctype, sinktype["items"], None, None)
else:
return check_types(srctype, sinktype, None, None)
else:
raise WorkflowException(u"Unrecognized linkMerge enum '%s'" % linkMerge)
raise WorkflowException(u"Unrecognized linkMerge enu_m '%s'" % linkMerge)


def merge_flatten_type(src):
# type: (Any) -> Any
"""Return the merge flattened type of the source type
"""

if isinstance(src, list):
return [merge_flatten_type(t) for t in src]
elif isinstance(src, dict) and src.get("type") == "array":
return src
else:
return {"items": src, "type": "array"}


def can_assign_src_to_sink(src, sink, strict=False): # type: (Any, Any, bool) -> bool
Expand Down Expand Up @@ -581,7 +588,7 @@ def static_checker(workflow_inputs, workflow_outputs, step_inputs, step_outputs)
" with sink '%s' of type %s"
% (shortname(sink["id"]), json.dumps(sink["type"])))
if linkMerge:
msg += "\n" + SourceLine(sink).makeError(" sink has linkMerge method %s" % linkMerge)
msg += "\n" + SourceLine(sink).makeError(" source has linkMerge method %s" % linkMerge)
warning_msgs.append(msg)
for exception in exceptions:
src = exception.src
Expand All @@ -594,7 +601,7 @@ def static_checker(workflow_inputs, workflow_outputs, step_inputs, step_outputs)
" with sink '%s' of type %s"
% (shortname(sink["id"]), json.dumps(sink["type"])))
if linkMerge:
msg += "\n" + SourceLine(sink).makeError(" sink has linkMerge method %s" % linkMerge)
msg += "\n" + SourceLine(sink).makeError(" source has linkMerge method %s" % linkMerge)
exception_msgs.append(msg)

for sink in step_inputs:
Expand Down
24 changes: 24 additions & 0 deletions tests/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,12 @@ def test_typecheck(self):
linkMerge="merge_nested", valueFrom=None),
"exception")

# check linkMerge: merge_nested and sinktype is "Any"
self.assertEquals(cwltool.workflow.check_types(
['string', 'int'], "Any",
linkMerge="merge_nested", valueFrom=None),
"pass")

# check linkMerge: merge_flattened
self.assertEquals(cwltool.workflow.check_types(
['string', 'int'],
Expand Down Expand Up @@ -458,6 +464,24 @@ def test_typecheck(self):
linkMerge="merge_flattened", valueFrom=None),
"exception")

# check linkMerge: merge_flattened and sinktype is "Any"
self.assertEquals(cwltool.workflow.check_types(
['string', 'int'], "Any",
linkMerge="merge_flattened", valueFrom=None),
"pass")

self.assertEquals(cwltool.workflow.check_types(
{'items': ['string', 'int'], 'type': 'array'}, "Any",
linkMerge="merge_flattened", valueFrom=None),
"pass")

# check linkMerge: merge_flattened when srctype is a list
self.assertEquals(cwltool.workflow.check_types(
[{'items': 'string', 'type': 'array'}],
{'items': 'string', 'type': 'array'},
linkMerge="merge_flattened", valueFrom=None),
"pass")

# check valueFrom
self.assertEquals(cwltool.workflow.check_types(
{'items': ['File', 'int'], 'type': 'array'},
Expand Down