diff --git a/pydra/engine/auxiliary.py b/pydra/engine/auxiliary.py index 09e2e6fed1..471a2fd5d3 100644 --- a/pydra/engine/auxiliary.py +++ b/pydra/engine/auxiliary.py @@ -200,51 +200,6 @@ def _add_name(mlist, name): return mlist -#Function interface - - -class FunctionInterface(object): - """ A new function interface """ - - def __init__(self, function, output_nm, out_read=False, input_map=None): - self.function = function - if type(output_nm) is list: - self._output_nm = output_nm - else: - raise Exception("output_nm should be a list") - if not input_map: - self.input_map = {} - # TODO use signature - for key in inspect.getargspec(function)[0]: - if key not in self.input_map.keys(): - self.input_map[key] = key - # flags if we want to read the txt file to save in node.output - self.out_read = out_read - - def run(self, input): - self.output = {} - if self.input_map: - for (key_fun, key_inp) in self.input_map.items(): - try: - input[key_fun] = input.pop(key_inp) - except KeyError: - raise Exception("no {} in the input dictionary".format(key_inp)) - fun_output = self.function(**input) - logger.debug("Function Interf, input={}, fun_out={}".format(input, fun_output)) - if type(fun_output) is tuple: - if len(self._output_nm) == len(fun_output): - for i, out in enumerate(fun_output): - self.output[self._output_nm[i]] = out - else: - raise Exception("length of output_nm doesnt match length of the function output") - elif len(self._output_nm) == 1: - self.output[self._output_nm[0]] = fun_output - else: - raise Exception("output_nm doesnt match length of the function output") - - return fun_output - - # want to use to access input as dot, # but it doesnt work since im using "." within names (using my old syntax with - also cant work) # https://stackoverflow.com/questions/2352181/how-to-use-a-dot-to-access-members-of-dictionary diff --git a/pydra/engine/node.py b/pydra/engine/node.py index bede18ac63..ea04e7ab9a 100644 --- a/pydra/engine/node.py +++ b/pydra/engine/node.py @@ -6,7 +6,7 @@ import numpy as np from nipype.utils.filemanip import loadpkl -from nipype import logging +from nipype import logging, Function from . import state from . import auxiliary as aux @@ -164,23 +164,14 @@ def get_input_el(self, ind): ]) if not from_node.mapper: dir_nm_el_from = "" - - if is_node(from_node) and is_current_interface(from_node.interface): - file_from = self._reading_ci_output( + # TODO: do I need this if, what if this is wf? + if is_node(from_node): + out_from = self._reading_ci_output( node=from_node, dir_nm_el=dir_nm_el_from, out_nm=from_socket) - if file_from and os.path.exists(file_from): - inputs_dict["{}.{}".format(self.name, to_socket)] = file_from + if out_from: + inputs_dict["{}.{}".format(self.name, to_socket)] = out_from else: - raise Exception("{} doesnt exist".format(file_from)) - else: # assuming here that I want to read the file (will not be used with the current interfaces) - file_from = os.path.join(from_node.workingdir, dir_nm_el_from, - from_socket + ".txt") - with open(file_from) as f: - content = f.readline() - try: - inputs_dict["{}.{}".format(self.name, to_socket)] = eval(content) - except NameError: - inputs_dict["{}.{}".format(self.name, to_socket)] = content + raise Exception("output from {} doesnt exist".format(from_node)) return state_dict, inputs_dict @@ -191,11 +182,10 @@ def _reading_ci_output(self, dir_nm_el, out_nm, node=None): result_pklfile = os.path.join(os.getcwd(), node.workingdir, dir_nm_el, node.interface.nn.name, "result_{}.pklz".format( node.interface.nn.name)) - if os.path.exists(result_pklfile): - out_file = getattr(loadpkl(result_pklfile).outputs, out_nm) - if os.path.exists(out_file): - return out_file - + if os.path.exists(result_pklfile) and os.stat(result_pklfile).st_size > 0: + out = getattr(loadpkl(result_pklfile).outputs, out_nm) + if out: + return out return False # checking if all outputs are saved @@ -253,15 +243,8 @@ def __init__(self, self.workingdir = workingdir self.interface = interface - if is_function_interface(self.interface): - # adding node name to the interface's name mapping - self.interface.input_map = dict((key, "{}.{}".format(self.name, value)) - for (key, value) in self.interface.input_map.items()) - # list of output names taken from interface output name - self.output_names = self.interface._output_nm - elif is_current_interface(self.interface): - # list of interf_key_out - self.output_names = output_names + # list of interf_key_out + self.output_names = output_names if not self.output_names: self.output_names = [] @@ -293,19 +276,12 @@ def run_interface_el(self, i, ind): print("Run interface el, dict={}".format(state_dict)) logger.debug("Run interface el, name={}, inputs_dict={}, state_dict={}".format( self.name, inputs_dict, state_dict)) - if is_function_interface(self.interface): - res = self.interface.run(inputs_dict) - output = self.interface.output - print("Run fun interface el, output={}".format(output)) - logger.debug("Run fun interface el, output={}".format(output)) - self._writting_results_tmp(state_dict, dir_nm_el, output) - elif is_current_interface(self.interface): - if not self.mapper: - dir_nm_el = "" - res = self.interface.run( - inputs=inputs_dict, - base_dir=os.path.join(os.getcwd(), self.workingdir), - dir_nm_el=dir_nm_el) + if not self.mapper: + dir_nm_el = "" + res = self.interface.run( + inputs=inputs_dict, + base_dir=os.path.join(os.getcwd(), self.workingdir), + dir_nm_el=dir_nm_el) # TODO when join #if self._joinByKey: @@ -317,14 +293,6 @@ def run_interface_el(self, i, ind): # dir_nm_el = os.path.join(dir_join, dir_nm_el) return res - def _writting_results_tmp(self, state_dict, dir_nm_el, output): - """temporary method to write the results in the files (this is usually part of a interface)""" - if not self.mapper: - dir_nm_el = '' - os.makedirs(os.path.join(self.workingdir, dir_nm_el), exist_ok=True) - for key_out, val_out in output.items(): - with open(os.path.join(self.workingdir, dir_nm_el, key_out + ".txt"), "w") as fout: - fout.write(str(val_out)) def get_output(self): """collecting all outputs and updating self._output""" @@ -337,32 +305,11 @@ def get_output(self): state_dict = self.state.state_ind(ind) dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(state_dict.items())]) if self.mapper: - if is_function_interface(self.interface): - output = os.path.join(self.workingdir, dir_nm_el, key_out + ".txt") - if self.interface.out_read: - with open(output) as fout: - content = fout.readline() - try: - output = eval(content) - except NameError: - output = content - self._output[key_out][dir_nm_el] = (state_dict, output) - elif is_current_interface(self.interface): - self._output[key_out][dir_nm_el] = \ - (state_dict, (state_dict, self._reading_ci_output(dir_nm_el=dir_nm_el, out_nm=key_out))) + self._output[key_out][dir_nm_el] = \ + (state_dict, self._reading_ci_output(dir_nm_el=dir_nm_el, out_nm=key_out)) else: - if is_function_interface(self.interface): - output = os.path.join(self.workingdir, key_out + ".txt") - if self.interface.out_read: - with open(output) as fout: - try: - output = eval(fout.readline()) - except Workflow: - output = fout.readline() - self._output[key_out] = (state_dict, output) - elif is_current_interface(self.interface): - self._output[key_out] = \ - (state_dict, self._reading_ci_output(dir_nm_el="", out_nm=key_out)) + self._output[key_out] = \ + (state_dict, self._reading_ci_output(dir_nm_el="", out_nm=key_out)) return self._output # dj: version without join @@ -378,13 +325,8 @@ def _check_all_results(self): dir_nm_el = "" for key_out in self.output_names: - if is_function_interface(self.interface): - if not os.path.isfile( - os.path.join(self.workingdir, dir_nm_el, key_out + ".txt")): - return False - elif is_current_interface(self.interface): - if not self._reading_ci_output(dir_nm_el, key_out): - return False + if not self._reading_ci_output(dir_nm_el, key_out): + return False self._is_complete = True return True @@ -394,18 +336,15 @@ def _reading_results(self): """ for key_out in self.output_names: self._result[key_out] = [] - #pdb.set_trace() if self._state_inputs: val_l = self._dict_tuple2list(self._output[key_out]) - for (st_dict, filename) in val_l: - with open(filename) as fout: - self._result[key_out].append((st_dict, eval(fout.readline()))) + for (st_dict, out) in val_l: + self._result[key_out].append((st_dict, out)) else: # st_dict should be {} # not sure if this is used (not tested) - (st_dict, filename) = self._output[key_out][None] - with open(filename) as fout: - self._result[key_out].append(({}, eval(fout.readline()))) + (st_dict, out) = self._output[key_out][None] + self._result[key_out].append(({}, out)) # dj: removing temp. from Node class # def run(self, plugin="serial"): @@ -559,22 +498,13 @@ def _reading_results(self): res_l = [] val_l = self._dict_tuple2list(self.output[key_out][dir_nm_el]) for val in val_l: - with open(val[1]) as fout: - logger.debug('Reading Results: file={}, st_dict={}'.format( - val[1], val[0])) - res_l.append((val[0], eval(fout.readline()))) + res_l.append(val) self._result[key_out].append((wf_inputs_dict, res_l)) else: val_l = self._dict_tuple2list(self.output[key_out]) for val in val_l: - #TODO: I think that val shouldn't be dict here... - # TMP solution - if type(val) is dict: - val = [v for k, v in val.items()][0] - with open(val[1]) as fout: - logger.debug('Reading Results: file={}, st_dict={}'.format( - val[1], val[0])) - self._result[key_out].append((val[0], eval(fout.readline()))) + self._result[key_out].append(val) + def add_nodes(self, nodes): """adding nodes without defining connections @@ -594,6 +524,7 @@ def add(self, name=None, workingdir=None, inputs=None, + input_names=None, output_names=None, mapper=None, write_state=True, @@ -602,10 +533,13 @@ def add(self, if is_function(runnable): if not output_names: output_names = ["out"] - interface = aux.FunctionInterface( - function=runnable, output_nm=output_names, out_read=out_read) + if input_names is None: + raise Exception("you need to specify input_names") if not name: raise Exception("you have to specify name for the node") + nipype1_interf = Function(function=runnable, input_names=input_names, + output_names=output_names) + interface = aux.CurrentInterface(interface=nipype1_interf, name="addtwo") if not workingdir: workingdir = name node = Node( @@ -615,8 +549,9 @@ def add(self, inputs=inputs, mapper=mapper, other_mappers=self._node_mappers, - write_state=write_state) - elif is_function_interface(runnable) or is_current_interface(runnable): + write_state=write_state, + output_names=output_names) + elif is_current_interface(runnable): if not name: raise Exception("you have to specify name for the node") if not workingdir: @@ -735,10 +670,6 @@ def is_function(obj): return hasattr(obj, '__call__') -def is_function_interface(obj): - return type(obj) is aux.FunctionInterface - - def is_current_interface(obj): return type(obj) is aux.CurrentInterface diff --git a/pydra/engine/tests/test_node.py b/pydra/engine/tests/test_node.py index 17c2e6d90e..676984acd2 100644 --- a/pydra/engine/tests/test_node.py +++ b/pydra/engine/tests/test_node.py @@ -6,9 +6,10 @@ from nipype.utils.filemanip import save_json, makedirs, to_str from nipype.interfaces import fsl +from nipype import Function from ..node import Node, Workflow -from ..auxiliary import FunctionInterface, CurrentInterface +from ..auxiliary import CurrentInterface from ..submitter import Submitter import pytest @@ -38,19 +39,25 @@ def move2orig(): def fun_addtwo(a): + import time time.sleep(1) if a == 3: time.sleep(2) return a + 2 +_interf_addtwo = Function(function=fun_addtwo, input_names=["a"], output_names=["out"]) +interf_addtwo = CurrentInterface(interface=_interf_addtwo, name="addtwo") -def fun_addvar(a, b): - return a + b + +def fun_addvar(b, c): + return b + c + +_interf_addvar = Function(function=fun_addvar, input_names=["b", "c"], output_names=["out"]) +interf_addvar = CurrentInterface(interface=_interf_addvar, name="addvar") def test_node_1(): """Node with mandatory arguments only""" - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) nn = Node(name="NA", interface=interf_addtwo) assert nn.mapper is None assert nn.inputs == {} @@ -59,7 +66,6 @@ def test_node_1(): def test_node_2(): """Node with interface and inputs""" - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) nn = Node(name="NA", interface=interf_addtwo, inputs={"a": 3}) assert nn.mapper is None # adding NA to the name of the variable @@ -69,7 +75,6 @@ def test_node_2(): def test_node_3(): """Node with interface, inputs and mapper""" - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) nn = Node(name="NA", interface=interf_addtwo, inputs={"a": [3, 5]}, mapper="a") assert nn.mapper == "NA.a" assert (nn.inputs["NA.a"] == np.array([3, 5])).all() @@ -83,7 +88,6 @@ def test_node_3(): def test_node_4(): """Node with interface and inputs. mapper set using map method""" - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) nn = Node(name="NA", interface=interf_addtwo, inputs={"a": [3, 5]}) nn.map(mapper="a") assert nn.mapper == "NA.a" @@ -97,7 +101,6 @@ def test_node_4(): def test_node_4a(): """Node with interface, mapper and inputs set with the map method""" - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) nn = Node(name="NA", interface=interf_addtwo) nn.map(mapper="a", inputs={"a": [3, 5]}) assert nn.mapper == "NA.a" @@ -113,12 +116,8 @@ def test_node_4a(): @python35_only def test_node_5(plugin, change_dir): """Node with interface and inputs, no mapper, running interface""" - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - nn = Node( - name="NA", - inputs={"a": 3}, - interface=interf_addtwo, - workingdir="test_nd5_{}".format(plugin)) + nn = Node(name="NA", inputs={"a": 3}, interface=interf_addtwo, + workingdir="test_nd5_{}".format(plugin), output_names=["out"]) assert (nn.inputs["NA.a"] == np.array([3])).all() @@ -142,8 +141,8 @@ def test_node_5(plugin, change_dir): @python35_only def test_node_6(plugin, change_dir): """Node with interface, inputs and the simplest mapper, running interface""" - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - nn = Node(name="NA", interface=interf_addtwo, workingdir="test_nd6_{}".format(plugin)) + nn = Node(name="NA", interface=interf_addtwo, workingdir="test_nd6_{}".format(plugin), + output_names=["out"]) nn.map(mapper="a", inputs={"a": [3, 5]}) assert nn.mapper == "NA.a" @@ -169,21 +168,21 @@ def test_node_6(plugin, change_dir): @python35_only def test_node_7(plugin, change_dir): """Node with interface, inputs and scalar mapper, running interface""" - interf_addvar = FunctionInterface(fun_addvar, ["out"]) - nn = Node(name="NA", interface=interf_addvar, workingdir="test_nd7_{}".format(plugin)) + nn = Node(name="NA", interface=interf_addvar, workingdir="test_nd7_{}".format(plugin), + output_names=["out"]) # scalar mapper - nn.map(mapper=("a", "b"), inputs={"a": [3, 5], "b": [2, 1]}) + nn.map(mapper=("b", "c"), inputs={"b": [3, 5], "c": [2, 1]}) - assert nn.mapper == ("NA.a", "NA.b") - assert (nn.inputs["NA.a"] == np.array([3, 5])).all() - assert (nn.inputs["NA.b"] == np.array([2, 1])).all() + assert nn.mapper == ("NA.b", "NA.c") + assert (nn.inputs["NA.b"] == np.array([3, 5])).all() + assert (nn.inputs["NA.c"] == np.array([2, 1])).all() sub = Submitter(plugin=plugin, runnable=nn) sub.run() sub.close() # checking the results - expected = [({"NA.a": 3, "NA.b": 2}, 5), ({"NA.a": 5, "NA.b": 1}, 6)] + expected = [({"NA.b": 3, "NA.c": 2}, 5), ({"NA.b": 5, "NA.c": 1}, 6)] # to be sure that there is the same order (not sure if node itself should keep the order) key_sort = list(expected[0][0].keys()) expected.sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -198,33 +197,22 @@ def test_node_7(plugin, change_dir): @python35_only def test_node_8(plugin, change_dir): """Node with interface, inputs and vector mapper, running interface""" - interf_addvar = FunctionInterface(fun_addvar, ["out"]) - nn = Node(name="NA", interface=interf_addvar, workingdir="test_nd8_{}".format(plugin)) + nn = Node(name="NA", interface=interf_addvar, workingdir="test_nd8_{}".format(plugin), + output_names=["out"]) # [] for outer product - nn.map(mapper=["a", "b"], inputs={"a": [3, 5], "b": [2, 1]}) + nn.map(mapper=["b", "c"], inputs={"b": [3, 5], "c": [2, 1]}) - assert nn.mapper == ["NA.a", "NA.b"] - assert (nn.inputs["NA.a"] == np.array([3, 5])).all() - assert (nn.inputs["NA.b"] == np.array([2, 1])).all() + assert nn.mapper == ["NA.b", "NA.c"] + assert (nn.inputs["NA.b"] == np.array([3, 5])).all() + assert (nn.inputs["NA.c"] == np.array([2, 1])).all() sub = Submitter(plugin=plugin, runnable=nn) sub.run() sub.close() # checking teh results - expected = [({ - "NA.a": 3, - "NA.b": 1 - }, 4), ({ - "NA.a": 3, - "NA.b": 2 - }, 5), ({ - "NA.a": 5, - "NA.b": 1 - }, 6), ({ - "NA.a": 5, - "NA.b": 2 - }, 7)] + expected = [({"NA.b": 3, "NA.c": 1}, 4), ({"NA.b": 3, "NA.c": 2}, 5), + ({"NA.b": 5, "NA.c": 1}, 6), ({"NA.b": 5, "NA.c": 2}, 7)] # to be sure that there is the same order (not sure if node itself should keep the order) key_sort = list(expected[0][0].keys()) expected.sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -241,9 +229,8 @@ def test_node_8(plugin, change_dir): def test_workflow_0(plugin="serial"): """workflow (without run) with one node with a mapper""" wf = Workflow(name="wf0", workingdir="test_wf0_{}".format(plugin)) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) # defining a node with mapper and inputs first - na = Node(name="NA", interface=interf_addtwo, workingdir="na") + na = Node(name="NA", interface=interf_addtwo, workingdir="na", output_names=["a"]) na.map(mapper="a", inputs={"a": [3, 5]}) # one of the way of adding nodes to the workflow wf.add_nodes([na]) @@ -257,8 +244,7 @@ def test_workflow_0(plugin="serial"): def test_workflow_1(plugin, change_dir): """workflow with one node with a mapper""" wf = Workflow(name="wf1", workingdir="test_wf1_{}".format(plugin)) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") + na = Node(name="NA", interface=interf_addtwo, workingdir="na", output_names=["out"]) na.map(mapper="a", inputs={"a": [3, 5]}) wf.add_nodes([na]) @@ -279,18 +265,18 @@ def test_workflow_1(plugin, change_dir): @python35_only def test_workflow_2(plugin, change_dir): """workflow with two nodes, second node without mapper""" - wf = Workflow(name="wf2", workingdir="test_wf2_{}".format(plugin)) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") + wf = Workflow(name="wf2", workingdir="test_wf2_{}".format(plugin), + wf_output_names=[("NB", "out")]) + na = Node(name="NA", interface=interf_addtwo, workingdir="na", output_names=["out"]) na.map(mapper="a", inputs={"a": [3, 5]}) # the second node does not have explicit mapper (but keeps the mapper from the NA node) - interf_addvar = FunctionInterface(fun_addvar, ["out"]) - nb = Node(name="NB", interface=interf_addvar, inputs={"b": 10}, workingdir="nb") + nb = Node(name="NB", interface=interf_addvar, inputs={"c": 10}, workingdir="nb", + output_names=["out"]) # adding 2 nodes and create a connection (as it is now) wf.add_nodes([na, nb]) - wf.connect("NA", "out", "NB", "a") + wf.connect("NA", "out", "NB", "b") assert wf.nodes[0].mapper == "NA.a" sub = Submitter(runnable=wf, plugin=plugin) @@ -307,7 +293,7 @@ def test_workflow_2(plugin, change_dir): # results from NB keeps the "state input" from the first node # two elements as in NA - expected_B = [({"NA.a": 3, "NB.b": 10}, 15), ({"NA.a": 5, "NB.b": 10}, 17)] + expected_B = [({"NA.a": 3, "NB.c": 10}, 15), ({"NA.a": 5, "NB.c": 10}, 17)] key_sort = list(expected_B[0][0].keys()) expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -315,26 +301,32 @@ def test_workflow_2(plugin, change_dir): assert wf.nodes[1].result["out"][i][0] == res[0] assert wf.nodes[1].result["out"][i][1] == res[1] + #output of the wf + wf.result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_B): + assert wf.result["out"][i][0] == res[0] + assert wf.result["out"][i][1] == res[1] + + @pytest.mark.parametrize("plugin", Plugins) @python35_only def test_workflow_2a(plugin, change_dir): """workflow with two nodes, second node with a scalar mapper""" - wf = Workflow(name="wf2", workingdir="test_wf2a_{}".format(plugin)) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") + wf = Workflow(name="wf2", workingdir="test_wf2a_{}".format(plugin), + wf_output_names=[("NB", "out")]) + na = Node(name="NA", interface=interf_addtwo, workingdir="na", output_names=["out"]) na.map(mapper="a", inputs={"a": [3, 5]}) - interf_addvar = FunctionInterface(fun_addvar, ["out"]) - nb = Node(name="NB", interface=interf_addvar, workingdir="nb") + nb = Node(name="NB", interface=interf_addvar, workingdir="nb", output_names=["out"]) # explicit scalar mapper between "a" from NA and b - nb.map(mapper=("NA.a", "b"), inputs={"b": [2, 1]}) + nb.map(mapper=("NA.a", "c"), inputs={"c": [2, 1]}) wf.add_nodes([na, nb]) - wf.connect("NA", "out", "NB", "a") + wf.connect("NA", "out", "NB", "b") assert wf.nodes[0].mapper == "NA.a" - assert wf.nodes[1].mapper == ("NA.a", "NB.b") + assert wf.nodes[1].mapper == ("NA.a", "NB.c") sub = Submitter(runnable=wf, plugin=plugin) sub.run() @@ -349,7 +341,7 @@ def test_workflow_2a(plugin, change_dir): assert wf.nodes[0].result["out"][i][1] == res[1] # two elements (scalar mapper) - expected_B = [({"NA.a": 3, "NB.b": 2}, 7), ({"NA.a": 5, "NB.b": 1}, 8)] + expected_B = [({"NA.a": 3, "NB.c": 2}, 7), ({"NA.a": 5, "NB.c": 1}, 8)] key_sort = list(expected_B[0][0].keys()) expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -357,26 +349,30 @@ def test_workflow_2a(plugin, change_dir): assert wf.nodes[1].result["out"][i][0] == res[0] assert wf.nodes[1].result["out"][i][1] == res[1] + # output of the wf + wf.result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_B): + assert wf.result["out"][i][0] == res[0] + assert wf.result["out"][i][1] == res[1] + @pytest.mark.parametrize("plugin", Plugins) @python35_only def test_workflow_2b(plugin): """workflow with two nodes, second node with a vector mapper""" - wf = Workflow(name="wf2", workingdir="test_wf2b_{}".format(plugin)) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") + wf = Workflow(name="wf2", workingdir="test_wf2b_{}".format(plugin), + wf_output_names=[("NB", "out")]) + na = Node(name="NA", interface=interf_addtwo, workingdir="na", output_names=["out"]) na.map(mapper="a", inputs={"a": [3, 5]}) - - interf_addvar = FunctionInterface(fun_addvar, ["out"]) - nb = Node(name="NB", interface=interf_addvar, workingdir="nb") + nb = Node(name="NB", interface=interf_addvar, workingdir="nb", output_names=["out"]) # outer mapper - nb.map(mapper=["NA.a", "b"], inputs={"b": [2, 1]}) + nb.map(mapper=["NA.a", "c"], inputs={"c": [2, 1]}) wf.add_nodes([na, nb]) - wf.connect("NA", "out", "NB", "a") + wf.connect("NA", "out", "NB", "b") assert wf.nodes[0].mapper == "NA.a" - assert wf.nodes[1].mapper == ["NA.a", "NB.b"] + assert wf.nodes[1].mapper == ["NA.a", "NB.c"] sub = Submitter(runnable=wf, plugin=plugin) sub.run() @@ -391,19 +387,8 @@ def test_workflow_2b(plugin): assert wf.nodes[0].result["out"][i][1] == res[1] # four elements (outer product) - expected_B = [({ - "NA.a": 3, - "NB.b": 1 - }, 6), ({ - "NA.a": 3, - "NB.b": 2 - }, 7), ({ - "NA.a": 5, - "NB.b": 1 - }, 8), ({ - "NA.a": 5, - "NB.b": 2 - }, 9)] + expected_B = [({"NA.a": 3, "NB.c": 1}, 6), ({"NA.a": 3, "NB.c": 2}, 7), + ({"NA.a": 5, "NB.c": 1}, 8), ({"NA.a": 5, "NB.c": 2}, 9)] key_sort = list(expected_B[0][0].keys()) expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -411,17 +396,21 @@ def test_workflow_2b(plugin): assert wf.nodes[1].result["out"][i][0] == res[0] assert wf.nodes[1].result["out"][i][1] == res[1] + # output of the wf + wf.result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_B): + assert wf.result["out"][i][0] == res[0] + assert wf.result["out"][i][1] == res[1] -# using add method to add nodes +# using add method to add nodes @pytest.mark.parametrize("plugin", Plugins) @python35_only def test_workflow_3(plugin, change_dir): """using add(node) method""" wf = Workflow(name="wf3", workingdir="test_wf3_{}".format(plugin)) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") + na = Node(name="NA", interface=interf_addtwo, workingdir="na", output_names=["out"]) na.map(mapper="a", inputs={"a": [3, 5]}) # using add method (as in the Satra's example) with a node wf.add(na) @@ -446,10 +435,9 @@ def test_workflow_3(plugin, change_dir): def test_workflow_3a(plugin, change_dir): """using add(interface) method""" wf = Workflow(name="wf3a", workingdir="test_wf3a_{}".format(plugin)) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - # using the add method with an interface - wf.add(interf_addtwo, workingdir="na", mapper="a", inputs={"a": [3, 5]}, name="NA") + wf.add(interf_addtwo, workingdir="na", mapper="a", inputs={"a": [3, 5]}, name="NA", + output_names=["out"]) assert wf.nodes[0].mapper == "NA.a" @@ -472,7 +460,8 @@ def test_workflow_3b(plugin, change_dir): """using add (function) method""" wf = Workflow(name="wf3b", workingdir="test_wf3b_{}".format(plugin)) # using the add method with a function - wf.add(fun_addtwo, workingdir="na", mapper="a", inputs={"a": [3, 5]}, name="NA") + wf.add(fun_addtwo, input_names=["a"], workingdir="na", mapper="a", + inputs={"a": [3, 5]}, name="NA", output_names=["out"]) assert wf.nodes[0].mapper == "NA.a" @@ -496,19 +485,16 @@ def test_workflow_4(plugin, change_dir): using wf.connect to connect two nodes """ wf = Workflow(name="wf4", workingdir="test_wf4_{}".format(plugin)) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") + na = Node(name="NA", interface=interf_addtwo, workingdir="na", output_names=["out"]) na.map(mapper="a", inputs={"a": [3, 5]}) wf.add(na) - - interf_addvar = FunctionInterface(fun_addvar, ["out"]) - nb = Node(name="NB", interface=interf_addvar, workingdir="nb") + nb = Node(name="NB", interface=interf_addvar, workingdir="nb", output_names=["out"]) # explicit mapper with a variable from the previous node # providing inputs with b - nb.map(mapper=("NA.a", "b"), inputs={"b": [2, 1]}) + nb.map(mapper=("NA.a", "c"), inputs={"c": [2, 1]}) wf.add(nb) # connect method as it is in the current version - wf.connect("NA", "out", "NB", "a") + wf.connect("NA", "out", "NB", "b") sub = Submitter(runnable=wf, plugin=plugin) sub.run() @@ -522,7 +508,7 @@ def test_workflow_4(plugin, change_dir): assert wf.nodes[0].result["out"][i][0] == res[0] assert wf.nodes[0].result["out"][i][1] == res[1] - expected_B = [({"NA.a": 3, "NB.b": 2}, 7), ({"NA.a": 5, "NB.b": 1}, 8)] + expected_B = [({"NA.a": 3, "NB.c": 2}, 7), ({"NA.a": 5, "NB.c": 1}, 8)] key_sort = list(expected_B[0][0].keys()) expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -536,17 +522,16 @@ def test_workflow_4(plugin, change_dir): def test_workflow_4a(plugin, change_dir): """ using add(node) method with kwarg arg to connect nodes (instead of wf.connect) """ wf = Workflow(name="wf4a", workingdir="test_wf4a_{}".format(plugin)) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") + na = Node(name="NA", interface=interf_addtwo, workingdir="na", output_names=["out"]) na.map(mapper="a", inputs={"a": [3, 5]}) wf.add(na) - interf_addvar = FunctionInterface(fun_addvar, ["out"]) - nb = Node(name="NB", interface=interf_addvar, workingdir="nb") + + nb = Node(name="NB", interface=interf_addvar, workingdir="nb", output_names=["out"]) # explicit mapper with a variable from the previous node - nb.map(mapper=("NA.a", "b"), inputs={"b": [2, 1]}) + nb.map(mapper=("NA.a", "c"), inputs={"c": [2, 1]}) # instead of "connect", using kwrg argument in the add method as in the example - wf.add(nb, a="NA.out") + wf.add(nb, b="NA.out") sub = Submitter(runnable=wf, plugin=plugin) sub.run() @@ -560,7 +545,7 @@ def test_workflow_4a(plugin, change_dir): assert wf.nodes[0].result["out"][i][0] == res[0] assert wf.nodes[0].result["out"][i][1] == res[1] - expected_B = [({"NA.a": 3, "NB.b": 2}, 7), ({"NA.a": 5, "NB.b": 1}, 8)] + expected_B = [({"NA.a": 3, "NB.c": 2}, 7), ({"NA.a": 5, "NB.c": 1}, 8)] key_sort = list(expected_B[0][0].keys()) expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -577,8 +562,7 @@ def test_workflow_4a(plugin, change_dir): def test_workflow_5(plugin, change_dir): """using a map method for one node""" wf = Workflow(name="wf5", workingdir="test_wf5_{}".format(plugin)) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") + na = Node(name="NA", interface=interf_addtwo, workingdir="na", output_names=["out"]) wf.add(na) # using the map method after add (using mapper for the last added node as default) @@ -602,8 +586,7 @@ def test_workflow_5(plugin, change_dir): def test_workflow_5a(plugin, change_dir): """using a map method for one node (using add and map in one chain)""" wf = Workflow(name="wf5a", workingdir="test_wf5a_{}".format(plugin)) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") + na = Node(name="NA", interface=interf_addtwo, workingdir="na", output_names=["out"]) wf.add(na).map_node(mapper="a", inputs={"a": [3, 5]}) @@ -625,17 +608,15 @@ def test_workflow_5a(plugin, change_dir): def test_workflow_6(plugin, change_dir): """using a map method for two nodes (using last added node as default)""" wf = Workflow(name="wf6", workingdir="test_wf6_{}".format(plugin)) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") + na = Node(name="NA", interface=interf_addtwo, workingdir="na", output_names=["out"]) - interf_addvar = FunctionInterface(fun_addvar, ["out"]) - nb = Node(name="NB", interface=interf_addvar, workingdir="nb") + nb = Node(name="NB", interface=interf_addvar, workingdir="nb", output_names=["out"]) # using the map methods after add (using mapper for the last added nodes as default) wf.add(na) wf.map_node(mapper="a", inputs={"a": [3, 5]}) wf.add(nb) - wf.map_node(mapper=("NA.a", "b"), inputs={"b": [2, 1]}) - wf.connect("NA", "out", "NB", "a") + wf.map_node(mapper=("NA.a", "c"), inputs={"c": [2, 1]}) + wf.connect("NA", "out", "NB", "b") sub = Submitter(runnable=wf, plugin=plugin) sub.run() @@ -649,7 +630,7 @@ def test_workflow_6(plugin, change_dir): assert wf.nodes[0].result["out"][i][0] == res[0] assert wf.nodes[0].result["out"][i][1] == res[1] - expected_B = [({"NA.a": 3, "NB.b": 2}, 7), ({"NA.a": 5, "NB.b": 1}, 8)] + expected_B = [({"NA.a": 3, "NB.c": 2}, 7), ({"NA.a": 5, "NB.c": 1}, 8)] key_sort = list(expected_B[0][0].keys()) expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -663,18 +644,16 @@ def test_workflow_6(plugin, change_dir): def test_workflow_6a(plugin, change_dir): """using a map method for two nodes (specifying the node)""" wf = Workflow(name="wf6a", workingdir="test_wf6a_{}".format(plugin)) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") + na = Node(name="NA", interface=interf_addtwo, workingdir="na", output_names=["out"]) - interf_addvar = FunctionInterface(fun_addvar, ["out"]) - nb = Node(name="NB", interface=interf_addvar, workingdir="nb") + nb = Node(name="NB", interface=interf_addvar, workingdir="nb", output_names=["out"]) # using the map method after add (specifying the node) wf.add(na) wf.add(nb) wf.map_node(mapper="a", inputs={"a": [3, 5]}, node=na) - # TODO: should we se ("a", "c") instead?? shold I forget "NA.a" value? - wf.map_node(mapper=("NA.a", "b"), inputs={"b": [2, 1]}, node=nb) - wf.connect("NA", "out", "NB", "a") + # TODO: should we se ("b", "c") instead?? shold I forget "NA.a" value? + wf.map_node(mapper=("NA.a", "c"), inputs={"c": [2, 1]}, node=nb) + wf.connect("NA", "out", "NB", "b") sub = Submitter(runnable=wf, plugin=plugin) sub.run() @@ -688,7 +667,7 @@ def test_workflow_6a(plugin, change_dir): assert wf.nodes[0].result["out"][i][0] == res[0] assert wf.nodes[0].result["out"][i][1] == res[1] - expected_B = [({"NA.a": 3, "NB.b": 2}, 7), ({"NA.a": 5, "NB.b": 1}, 8)] + expected_B = [({"NA.a": 3, "NB.c": 2}, 7), ({"NA.a": 5, "NB.c": 1}, 8)] key_sort = list(expected_B[0][0].keys()) expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -702,16 +681,13 @@ def test_workflow_6a(plugin, change_dir): def test_workflow_6b(plugin, change_dir): """using a map method for two nodes (specifying the node), using kwarg arg instead of connect""" wf = Workflow(name="wf6b", workingdir="test_wf6b_{}".format(plugin)) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") - - interf_addvar = FunctionInterface(fun_addvar, ["out"]) - nb = Node(name="NB", interface=interf_addvar, workingdir="nb") + na = Node(name="NA", interface=interf_addtwo, workingdir="na", output_names=["out"]) + nb = Node(name="NB", interface=interf_addvar, workingdir="nb", output_names=["out"]) wf.add(na) - wf.add(nb, a="NA.out") + wf.add(nb, b="NA.out") wf.map_node(mapper="a", inputs={"a": [3, 5]}, node=na) - wf.map_node(mapper=("NA.a", "b"), inputs={"b": [2, 1]}, node=nb) + wf.map_node(mapper=("NA.a", "c"), inputs={"c": [2, 1]}, node=nb) sub = Submitter(runnable=wf, plugin=plugin) sub.run() @@ -725,7 +701,7 @@ def test_workflow_6b(plugin, change_dir): assert wf.nodes[0].result["out"][i][0] == res[0] assert wf.nodes[0].result["out"][i][1] == res[1] - expected_B = [({"NA.a": 3, "NB.b": 2}, 7), ({"NA.a": 5, "NB.b": 1}, 8)] + expected_B = [({"NA.a": 3, "NB.c": 2}, 7), ({"NA.a": 5, "NB.c": 1}, 8)] key_sort = list(expected_B[0][0].keys()) expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -743,8 +719,7 @@ def test_workflow_7(plugin, change_dir): """using inputs for workflow and connect_workflow""" # adding inputs to the workflow directly wf = Workflow(name="wf7", inputs={"wfa": [3, 5]}, workingdir="test_wf7_{}".format(plugin)) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") + na = Node(name="NA", interface=interf_addtwo, workingdir="na", output_names=["out"]) wf.add(na) # connecting the node with inputs from the workflow @@ -769,8 +744,7 @@ def test_workflow_7(plugin, change_dir): def test_workflow_7a(plugin, change_dir): """using inputs for workflow and kwarg arg in add (instead of connect)""" wf = Workflow(name="wf7a", inputs={"wfa": [3, 5]}, workingdir="test_wf7a_{}".format(plugin)) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") + na = Node(name="NA", interface=interf_addtwo, workingdir="na", output_names=["out"]) # using kwrg argument in the add method (instead of connect or connect_wf_input wf.add(na, a="wfa") wf.map_node(mapper="a") @@ -792,17 +766,14 @@ def test_workflow_7a(plugin, change_dir): @python35_only def test_workflow_8(plugin, change_dir): """using inputs for workflow and connect_wf_input for the second node""" - wf = Workflow(name="wf8", workingdir="test_wf8_{}".format(plugin), inputs={"b": 10}) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") + wf = Workflow(name="wf8", workingdir="test_wf8_{}".format(plugin), inputs={"c": 10}) + na = Node(name="NA", interface=interf_addtwo, workingdir="na", output_names=["out"]) na.map(mapper="a", inputs={"a": [3, 5]}) - - interf_addvar = FunctionInterface(fun_addvar, ["out"]) - nb = Node(name="NB", interface=interf_addvar, workingdir="nb") + nb = Node(name="NB", interface=interf_addvar, workingdir="nb", output_names=["out"]) wf.add_nodes([na, nb]) - wf.connect("NA", "out", "NB", "a") - wf.connect_wf_input("b", "NB", "b") + wf.connect("NA", "out", "NB", "b") + wf.connect_wf_input("c", "NB", "c") assert wf.nodes[0].mapper == "NA.a" sub = Submitter(runnable=wf, plugin=plugin) @@ -817,7 +788,7 @@ def test_workflow_8(plugin, change_dir): assert wf.nodes[0].result["out"][i][0] == res[0] assert wf.nodes[0].result["out"][i][1] == res[1] - expected_B = [({"NA.a": 3, "NB.b": 10}, 15), ({"NA.a": 5, "NB.b": 10}, 17)] + expected_B = [({"NA.a": 3, "NB.c": 10}, 15), ({"NA.a": 5, "NB.c": 10}, 17)] key_sort = list(expected_B[0][0].keys()) expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -834,15 +805,11 @@ def test_workflow_8(plugin, change_dir): def test_workflow_9(plugin, change_dir): """using add(interface) method and mapper from previous nodes""" wf = Workflow(name="wf9", workingdir="test_wf9_{}".format(plugin)) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - wf.add( - name="NA", runnable=interf_addtwo, workingdir="na").map_node( - mapper="a", inputs={"a": [3, 5]}) - interf_addvar = FunctionInterface(fun_addvar, ["out"]) + wf.add(name="NA", runnable=interf_addtwo, workingdir="na", + output_names=["out"]).map_node(mapper="a", inputs={"a": [3, 5]}) # _NA means that I'm using mapper from the NA node, it's the same as ("NA.a", "b") - wf.add( - name="NB", runnable=interf_addvar, workingdir="nb", a="NA.out").map_node( - mapper=("_NA", "b"), inputs={"b": [2, 1]}) + wf.add(name="NB", runnable=interf_addvar, workingdir="nb", b="NA.out", + output_names=["out"]).map_node(mapper=("_NA", "c"), inputs={"c": [2, 1]}) sub = Submitter(runnable=wf, plugin=plugin) sub.run() @@ -856,7 +823,7 @@ def test_workflow_9(plugin, change_dir): assert wf.nodes[0].result["out"][i][0] == res[0] assert wf.nodes[0].result["out"][i][1] == res[1] - expected_B = [({"NA.a": 3, "NB.b": 2}, 7), ({"NA.a": 5, "NB.b": 1}, 8)] + expected_B = [({"NA.a": 3, "NB.c": 2}, 7), ({"NA.a": 5, "NB.c": 1}, 8)] key_sort = list(expected_B[0][0].keys()) expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -870,24 +837,18 @@ def test_workflow_9(plugin, change_dir): def test_workflow_10(plugin, change_dir): """using add(interface) method and scalar mapper from previous nodes""" wf = Workflow(name="wf10", workingdir="test_wf10_{}".format(plugin)) - interf_addvar1 = FunctionInterface(fun_addvar, ["out"]) - wf.add( - name="NA", runnable=interf_addvar1, workingdir="na").map_node( - mapper=("a", "b"), inputs={ - "a": [3, 5], - "b": [0, 10] - }) - interf_addvar2 = FunctionInterface(fun_addvar, ["out"]) + wf.add(name="NA", runnable=interf_addvar, workingdir="na", + output_names=["out"]).map_node( + mapper=("b", "c"), inputs={"b": [3, 5], "c": [0, 10]}) # _NA means that I'm using mapper from the NA node, it's the same as (("NA.a", NA.b), "b") - wf.add( - name="NB", runnable=interf_addvar2, workingdir="nb", a="NA.out").map_node( - mapper=("_NA", "b"), inputs={"b": [2, 1]}) + wf.add(name="NB", runnable=interf_addvar, workingdir="nb", b="NA.out", + output_names=["out"]).map_node(mapper=("_NA", "c"), inputs={"c": [2, 1]}) sub = Submitter(runnable=wf, plugin=plugin) sub.run() sub.close() - expected = [({"NA.a": 3, "NA.b": 0}, 3), ({"NA.a": 5, "NA.b": 10}, 15)] + expected = [({"NA.b": 3, "NA.c": 0}, 3), ({"NA.b": 5, "NA.c": 10}, 15)] key_sort = list(expected[0][0].keys()) expected.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -895,7 +856,7 @@ def test_workflow_10(plugin, change_dir): assert wf.nodes[0].result["out"][i][0] == res[0] assert wf.nodes[0].result["out"][i][1] == res[1] - expected_B = [({"NA.a": 3, "NA.b": 0, "NB.b": 2}, 5), ({"NA.a": 5, "NA.b": 10, "NB.b": 1}, 16)] + expected_B = [({"NA.b": 3, "NA.c": 0, "NB.c": 2}, 5), ({"NA.b": 5, "NA.c": 10, "NB.c": 1}, 16)] key_sort = list(expected_B[0][0].keys()) expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -909,36 +870,20 @@ def test_workflow_10(plugin, change_dir): def test_workflow_10a(plugin, change_dir): """using add(interface) method and vector mapper from previous nodes""" wf = Workflow(name="wf10a", workingdir="test_wf10a_{}".format(plugin)) - interf_addvar1 = FunctionInterface(fun_addvar, ["out"]) - wf.add( - name="NA", runnable=interf_addvar1, workingdir="na").map_node( - mapper=["a", "b"], inputs={ - "a": [3, 5], - "b": [0, 10] - }) - interf_addvar2 = FunctionInterface(fun_addvar, ["out"]) + wf.add(name="NA", runnable=interf_addvar, workingdir="na", + output_names=["out"]).map_node( + mapper=["b", "c"], inputs={"b": [3, 5], "c": [0, 10]}) # _NA means that I'm using mapper from the NA node, it's the same as (["NA.a", NA.b], "b") - wf.add( - name="NB", runnable=interf_addvar2, workingdir="nb", a="NA.out").map_node( - mapper=("_NA", "b"), inputs={"b": [[2, 1], [0, 0]]}) + wf.add(name="NB", runnable=interf_addvar, workingdir="nb", b="NA.out", + output_names=["out"]).map_node( + mapper=("_NA", "c"), inputs={"c": [[2, 1], [0, 0]]}) sub = Submitter(runnable=wf, plugin=plugin) sub.run() sub.close() - expected = [({ - "NA.a": 3, - "NA.b": 0 - }, 3), ({ - "NA.a": 3, - "NA.b": 10 - }, 13), ({ - "NA.a": 5, - "NA.b": 0 - }, 5), ({ - "NA.a": 5, - "NA.b": 10 - }, 15)] + expected = [({"NA.b": 3, "NA.c": 0}, 3), ({"NA.b": 3, "NA.c": 10}, 13), + ({"NA.b": 5, "NA.c": 0}, 5), ({"NA.b": 5, "NA.c": 10}, 15)] key_sort = list(expected[0][0].keys()) expected.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -946,23 +891,10 @@ def test_workflow_10a(plugin, change_dir): assert wf.nodes[0].result["out"][i][0] == res[0] assert wf.nodes[0].result["out"][i][1] == res[1] - expected_B = [({ - "NA.a": 3, - "NA.b": 0, - "NB.b": 2 - }, 5), ({ - "NA.a": 3, - "NA.b": 10, - "NB.b": 1 - }, 14), ({ - "NA.a": 5, - "NA.b": 0, - "NB.b": 0 - }, 5), ({ - "NA.a": 5, - "NA.b": 10, - "NB.b": 0 - }, 15)] + expected_B = [({"NA.b": 3, "NA.c": 0, "NB.c": 2}, 5), + ({"NA.b": 3, "NA.c": 10, "NB.c": 1}, 14), + ({"NA.b": 5, "NA.c": 0, "NB.c": 0}, 5), + ({"NA.b": 5, "NA.c": 10, "NB.c": 0}, 15)] key_sort = list(expected_B[0][0].keys()) expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -971,33 +903,26 @@ def test_workflow_10a(plugin, change_dir): assert wf.nodes[1].result["out"][i][1] == res[1] +# TODO: this test started sometimes failing for mp and cf @pytest.mark.parametrize("plugin", Plugins) @python35_only def test_workflow_11(plugin, change_dir): """using add(interface) method and vector mapper from previous two nodes""" wf = Workflow(name="wf11", workingdir="test_wf11_{}".format(plugin)) - interf_addvar1 = FunctionInterface(fun_addvar, ["out"]) - wf.add( - name="NA", runnable=interf_addvar1, workingdir="na").map_node( - mapper=("a", "b"), inputs={ - "a": [3, 5], - "b": [0, 10] - }) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - wf.add( - name="NB", runnable=interf_addtwo, workingdir="nb").map_node( - mapper="a", inputs={"a": [2, 1]}) - interf_addvar2 = FunctionInterface(fun_addvar, ["out"]) + wf.add(name="NA", runnable=interf_addvar, workingdir="na", + output_names=["out"]).map_node( + mapper=("b", "c"), inputs={"b": [3, 5],"c": [0, 10]}) + wf.add(name="NB", runnable=interf_addtwo, workingdir="nb", + output_names=["out"]).map_node(mapper="a", inputs={"a": [2, 1]}) # _NA, _NB means that I'm using mappers from the NA/NB nodes, it's the same as [("NA.a", NA.b), "NB.a"] - wf.add( - name="NC", runnable=interf_addvar2, workingdir="nc", a="NA.out", b="NB.out").map_node( - mapper=["_NA", "_NB"]) # TODO: this should eb default? + wf.add(name="NC", runnable=interf_addvar, workingdir="nc", b="NA.out", c="NB.out", + output_names=["out"]).map_node(mapper=["_NA", "_NB"]) # TODO: this should eb default? sub = Submitter(runnable=wf, plugin=plugin) sub.run() sub.close() - expected = [({"NA.a": 3, "NA.b": 0}, 3), ({"NA.a": 5, "NA.b": 10}, 15)] + expected = [({"NA.b": 3, "NA.c": 0}, 3), ({"NA.b": 5, "NA.c": 10}, 15)] key_sort = list(expected[0][0].keys()) expected.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -1005,23 +930,10 @@ def test_workflow_11(plugin, change_dir): assert wf.nodes[0].result["out"][i][0] == res[0] assert wf.nodes[0].result["out"][i][1] == res[1] - expected_C = [({ - "NA.a": 3, - "NA.b": 0, - "NB.a": 1 - }, 6), ({ - "NA.a": 3, - "NA.b": 0, - "NB.a": 2 - }, 7), ({ - "NA.a": 5, - "NA.b": 10, - "NB.a": 1 - }, 18), ({ - "NA.a": 5, - "NA.b": 10, - "NB.a": 2 - }, 19)] + expected_C = [({"NA.b": 3,"NA.c": 0,"NB.a": 1}, 6), + ({"NA.b": 3,"NA.c": 0,"NB.a": 2}, 7), + ({"NA.b": 5,"NA.c": 10,"NB.a": 1}, 18), + ({"NA.b": 5,"NA.c": 10,"NB.a": 2}, 19)] key_sort = list(expected_C[0][0].keys()) expected_C.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.nodes[2].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -1037,21 +949,16 @@ def test_workflow_11(plugin, change_dir): @python35_only def test_workflow_12(plugin, change_dir): """testing if wf.result works (the same workflow as in test_workflow_6)""" - wf = Workflow( - name="wf12", - workingdir="test_wf12_{}".format(plugin), + wf = Workflow(name="wf12", workingdir="test_wf12_{}".format(plugin), wf_output_names=[("NA", "out", "NA_out"), ("NB", "out")]) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") - - interf_addvar = FunctionInterface(fun_addvar, ["out"]) - nb = Node(name="NB", interface=interf_addvar, workingdir="nb") + na = Node(name="NA", interface=interf_addtwo, workingdir="na", output_names=["out"]) + nb = Node(name="NB", interface=interf_addvar, workingdir="nb", output_names=["out"]) # using the map methods after add (using mapper for the last added nodes as default) wf.add(na) wf.map_node(mapper="a", inputs={"a": [3, 5]}) wf.add(nb) - wf.map_node(mapper=("NA.a", "b"), inputs={"b": [2, 1]}) - wf.connect("NA", "out", "NB", "a") + wf.map_node(mapper=("NA.a", "c"), inputs={"c": [2, 1]}) + wf.connect("NA", "out", "NB", "b") sub = Submitter(runnable=wf, plugin=plugin) sub.run() @@ -1066,13 +973,13 @@ def test_workflow_12(plugin, change_dir): key_sort = list(expected[0][0].keys()) expected.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.result["NA_out"].sort(key=lambda t: [t[0][key] for key in key_sort]) - #pdb.set_trace() + assert wf.is_complete for i, res in enumerate(expected): assert wf.result["NA_out"][i][0] == res[0] assert wf.result["NA_out"][i][1] == res[1] - expected_B = [({"NA.a": 3, "NB.b": 2}, 7), ({"NA.a": 5, "NB.b": 1}, 8)] + expected_B = [({"NA.a": 3, "NB.c": 2}, 7), ({"NA.a": 5, "NB.c": 1}, 8)] key_sort = list(expected_B[0][0].keys()) expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -1085,21 +992,16 @@ def test_workflow_12(plugin, change_dir): @python35_only def test_workflow_12a(plugin, change_dir): """testing if wf.result raises exceptione (the same workflow as in test_workflow_6)""" - wf = Workflow( - name="wf12a", - workingdir="test_wf12a_{}".format(plugin), + wf = Workflow(name="wf12a",workingdir="test_wf12a_{}".format(plugin), wf_output_names=[("NA", "out", "wf_out"), ("NB", "out", "wf_out")]) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") - - interf_addvar = FunctionInterface(fun_addvar, ["out"]) - nb = Node(name="NB", interface=interf_addvar, workingdir="nb") + na = Node(name="NA", interface=interf_addtwo, workingdir="na", output_names=["out"]) + nb = Node(name="NB", interface=interf_addvar, workingdir="nb", output_names=["out"]) # using the map methods after add (using mapper for the last added nodes as default) wf.add(na) wf.map_node(mapper="a", inputs={"a": [3, 5]}) wf.add(nb) - wf.map_node(mapper=("NA.a", "b"), inputs={"b": [2, 1]}) - wf.connect("NA", "out", "NB", "a") + wf.map_node(mapper=("NA.a", "c"), inputs={"c": [2, 1]}) + wf.connect("NA", "out", "NB", "b") sub = Submitter(runnable=wf, plugin=plugin) # wf_out can't be used twice @@ -1110,19 +1012,14 @@ def test_workflow_12a(plugin, change_dir): # tests for a workflow that have its own input and mapper - @pytest.mark.parametrize("plugin", Plugins) @python35_only def test_workflow_13(plugin, change_dir): """using inputs for workflow and connect_wf_input""" - wf = Workflow( - name="wf13", - inputs={"wfa": [3, 5]}, - mapper="wfa", + wf = Workflow(name="wf13", inputs={"wfa": [3, 5]},mapper="wfa", workingdir="test_wf13_{}".format(plugin), wf_output_names=[("NA", "out", "NA_out")]) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") + na = Node(name="NA", interface=interf_addtwo, workingdir="na", output_names=["out"]) wf.add(na) wf.connect_wf_input("wfa", "NA", "a") @@ -1142,57 +1039,64 @@ def test_workflow_13(plugin, change_dir): @python35_only def test_workflow_13a(plugin, change_dir): """using inputs for workflow and connect_wf_input (the node has 2 inputs)""" - wf = Workflow( - name="wf13a", - inputs={"wfa": [3, 5]}, - mapper="wfa", + wf = Workflow(name="wf13a", inputs={"wfa": [3, 5]}, mapper="wfa", workingdir="test_wf13a_{}".format(plugin), wf_output_names=[("NA", "out", "NA_out")]) - interf_addvar = FunctionInterface(fun_addvar, ["out"]) - na = Node( - name="NA", interface=interf_addvar, workingdir="na", mapper="b", inputs={"b": [10, 20]}) + na = Node(name="NA", interface=interf_addvar, workingdir="na", mapper="c", + inputs={"c": [10, 20]}, output_names=["out"]) wf.add(na) - wf.connect_wf_input("wfa", "NA", "a") + wf.connect_wf_input("wfa", "NA", "b") sub = Submitter(runnable=wf, plugin=plugin) sub.run() sub.close() assert wf.is_complete - expected = [({ - "wf13a.wfa": 3 - }, [({ - "NA.a": 3, - "NA.b": 10 - }, 13), ({ - "NA.a": 3, - "NA.b": 20 - }, 23)]), ({ - 'wf13a.wfa': 5 - }, [({ - "NA.a": 5, - "NA.b": 10 - }, 15), ({ - "NA.a": 5, - "NA.b": 20 - }, 25)])] + expected = [({"wf13a.wfa": 3}, [({"NA.b": 3, "NA.c": 10}, 13), + ({"NA.b": 3, "NA.c": 20}, 23)]), + ({'wf13a.wfa': 5}, [({"NA.b": 5, "NA.c": 10}, 15), + ({"NA.b": 5, "NA.c": 20}, 25)])] for i, res in enumerate(expected): assert wf.result["NA_out"][i][0] == res[0] for j in range(len(res[1])): assert wf.result["NA_out"][i][1][j][0] == res[1][j][0] assert wf.result["NA_out"][i][1][j][1] == res[1][j][1] +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_13b(plugin, change_dir): + """using inputs for workflow and connect_wf_input, using wf.map(mapper)""" + wf = Workflow(name="wf13b", inputs={"wfa": [3, 5]}, + workingdir="test_wf13b_{}".format(plugin), + wf_output_names=[("NA", "out", "NA_out")]) + na = Node(name="NA", interface=interf_addtwo, workingdir="na", + output_names=["out"]) + wf.add(na).map(mapper="wfa") + wf.connect_wf_input("wfa", "NA", "a") + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + assert wf.is_complete + expected = [({"wf13b.wfa": 3}, [({"NA.a": 3}, 5)]), + ({'wf13b.wfa': 5}, [({"NA.a": 5}, 7)])] + for i, res in enumerate(expected): + assert wf.result["NA_out"][i][0] == res[0] + assert wf.result["NA_out"][i][1][0][0] == res[1][0][0] + assert wf.result["NA_out"][i][1][0][1] == res[1][0][1] + + + @pytest.mark.parametrize("plugin", Plugins) @python35_only def test_workflow_13c(plugin, change_dir): """using inputs for workflow and connect_wf_input, using wf.map(mapper, inputs)""" - wf = Workflow( - name="wf13c", - workingdir="test_wf13c_{}".format(plugin), + wf = Workflow(name="wf13c", workingdir="test_wf13c_{}".format(plugin), wf_output_names=[("NA", "out", "NA_out")]) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") + na = Node(name="NA", interface=interf_addtwo, workingdir="na", + output_names=["out"]) wf.add(na).map(mapper="wfa", inputs={"wfa": [3, 5]}) wf.connect_wf_input("wfa", "NA", "a") @@ -1207,48 +1111,20 @@ def test_workflow_13c(plugin, change_dir): assert wf.result["NA_out"][i][1][0][0] == res[1][0][0] assert wf.result["NA_out"][i][1][0][1] == res[1][0][1] - @pytest.mark.parametrize("plugin", Plugins) - @python35_only - def test_workflow_13b(plugin, change_dir): - """using inputs for workflow and connect_wf_input, using wf.map(mapper)""" - wf = Workflow( - name="wf13b", - inputs={"wfa": [3, 5]}, - workingdir="test_wf13b_{}".format(plugin), - wf_output_names=[("NA", "out", "NA_out")]) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") - wf.add(na).map(mapper="wfa") - wf.connect_wf_input("wfa", "NA", "a") - - sub = Submitter(runnable=wf, plugin=plugin) - sub.run() - sub.close() - - assert wf.is_complete - expected = [({"wf13b.wfa": 3}, [({"NA.a": 3}, 5)]), ({'wf13b.wfa': 5}, [({"NA.a": 5}, 7)])] - for i, res in enumerate(expected): - assert wf.result["NA_out"][i][0] == res[0] - assert wf.result["NA_out"][i][1][0][0] == res[1][0][0] - assert wf.result["NA_out"][i][1][0][1] == res[1][0][1] - # workflow as a node - @pytest.mark.parametrize("plugin", Plugins) @python35_only def test_workflow_14(plugin, change_dir): """workflow with a workflow as a node (no mapper)""" - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na", inputs={"a": 3}) + na = Node(name="NA", interface=interf_addtwo, workingdir="na", inputs={"a": 3}, + output_names=["out"]) wfa = Workflow(name="wfa", workingdir="test_wfa", wf_output_names=[("NA", "out", "NA_out")]) wfa.add(na) - wf = Workflow( - name="wf14", - workingdir="test_wf14_{}".format(plugin), - wf_output_names=[("wfa", "NA_out", "wfa_out")]) + wf = Workflow(name="wf14", workingdir="test_wf14_{}".format(plugin), + wf_output_names=[("wfa", "NA_out", "wfa_out")]) wf.add(wfa) sub = Submitter(runnable=wf, plugin=plugin) @@ -1266,20 +1142,15 @@ def test_workflow_14(plugin, change_dir): @python35_only def test_workflow_14a(plugin, change_dir): """workflow with a workflow as a node (no mapper, using connect_wf_input in wfa)""" - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") - wfa = Workflow( - name="wfa", - workingdir="test_wfa", - inputs={"a": 3}, - wf_output_names=[("NA", "out", "NA_out")]) + na = Node(name="NA", interface=interf_addtwo, workingdir="na", + output_names=["out"]) + wfa = Workflow(name="wfa", workingdir="test_wfa", inputs={"a": 3}, + wf_output_names=[("NA", "out", "NA_out")]) wfa.add(na) wfa.connect_wf_input("a", "NA", "a") - wf = Workflow( - name="wf14a", - workingdir="test_wf14a_{}".format(plugin), - wf_output_names=[("wfa", "NA_out", "wfa_out")]) + wf = Workflow(name="wf14a", workingdir="test_wf14a_{}".format(plugin), + wf_output_names=[("wfa", "NA_out", "wfa_out")]) wf.add(wfa) sub = Submitter(runnable=wf, plugin=plugin) @@ -1297,17 +1168,13 @@ def test_workflow_14a(plugin, change_dir): @python35_only def test_workflow_14b(plugin, change_dir): """workflow with a workflow as a node (no mapper, using connect_wf_input in wfa and wf)""" - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") + na = Node(name="NA", interface=interf_addtwo, workingdir="na", output_names=["out"]) wfa = Workflow(name="wfa", workingdir="test_wfa", wf_output_names=[("NA", "out", "NA_out")]) wfa.add(na) wfa.connect_wf_input("a", "NA", "a") - wf = Workflow( - name="wf14b", - workingdir="test_wf14b_{}".format(plugin), - wf_output_names=[("wfa", "NA_out", "wfa_out")], - inputs={"a": 3}) + wf = Workflow(name="wf14b", workingdir="test_wf14b_{}".format(plugin), + wf_output_names=[("wfa", "NA_out", "wfa_out")], inputs={"a": 3}) wf.add(wfa) wf.connect_wf_input("a", "wfa", "a") @@ -1326,16 +1193,13 @@ def test_workflow_14b(plugin, change_dir): @python35_only def test_workflow_15(plugin, change_dir): """workflow with a workflow as a node with mapper (like 14 but with a mapper)""" - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node( - name="NA", interface=interf_addtwo, workingdir="na", inputs={"a": [3, 5]}, mapper="a") + na = Node(name="NA", interface=interf_addtwo, workingdir="na", + inputs={"a": [3, 5]}, mapper="a", output_names=["out"]) wfa = Workflow(name="wfa", workingdir="test_wfa", wf_output_names=[("NA", "out", "NA_out")]) wfa.add(na) - wf = Workflow( - name="wf15", - workingdir="test_wf15_{}".format(plugin), - wf_output_names=[("wfa", "NA_out", "wfa_out")]) + wf = Workflow(name="wf15", workingdir="test_wf15_{}".format(plugin), + wf_output_names=[("wfa", "NA_out", "wfa_out")]) wf.add(wfa) sub = Submitter(runnable=wf, plugin=plugin) @@ -1353,28 +1217,23 @@ def test_workflow_15(plugin, change_dir): @python35_only def test_workflow_16(plugin, change_dir): """workflow with two nodes, and one is a workflow (no mapper)""" - wf = Workflow( - name="wf16", - workingdir="test_wf16_{}".format(plugin), - wf_output_names=[("wfb", "NB_out"), ("NA", "out", "NA_out")]) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na", inputs={"a": 3}) + wf = Workflow(name="wf16", workingdir="test_wf16_{}".format(plugin), + wf_output_names=[("wfb", "NB_out"), ("NA", "out", "NA_out")]) + na = Node(name="NA", interface=interf_addtwo, workingdir="na", inputs={"a": 3}, + output_names=["out"]) wf.add(na) # the second node does not have explicit mapper (but keeps the mapper from the NA node) - interf_addvar = FunctionInterface(fun_addvar, ["out"]) - nb = Node(name="NB", interface=interf_addvar, workingdir="nb") - wfb = Workflow( - name="wfb", - workingdir="test_wfb", - inputs={"b": 10}, - wf_output_names=[("NB", "out", "NB_out")]) + nb = Node(name="NB", interface=interf_addvar, workingdir="nb", + output_names=["out"]) + wfb = Workflow(name="wfb", workingdir="test_wfb", inputs={"c": 10}, + wf_output_names=[("NB", "out", "NB_out")]) wfb.add(nb) wfb.connect_wf_input("b", "NB", "b") - wfb.connect_wf_input("a", "NB", "a") + wfb.connect_wf_input("c", "NB", "c") wf.add(wfb) - wf.connect("NA", "out", "wfb", "a") + wf.connect("NA", "out", "wfb", "b") sub = Submitter(runnable=wf, plugin=plugin) sub.run() @@ -1386,9 +1245,9 @@ def test_workflow_16(plugin, change_dir): assert wf.result["NA_out"][i][0] == res[0] assert wf.result["NA_out"][i][1] == res[1] - # TODO: the naming rememebrs only the node, doesnt remember that a came from NA... + # TODO (res): the naming rememebrs only the node, doesnt remember that a came from NA... # the naming should have names with workflows?? - expected_B = [({"NB.a": 5, "NB.b": 10}, 15)] + expected_B = [({"NB.b": 5, "NB.c": 10}, 15)] for i, res in enumerate(expected_B): assert wf.result["NB_out"][i][0] == res[0] assert wf.result["NB_out"][i][1] == res[1] @@ -1398,30 +1257,24 @@ def test_workflow_16(plugin, change_dir): @python35_only def test_workflow_16a(plugin, change_dir): """workflow with two nodes, and one is a workflow (with mapper)""" - wf = Workflow( - name="wf16a", - workingdir="test_wf16a_{}".format(plugin), + wf = Workflow(name="wf16a", workingdir="test_wf16a_{}".format(plugin), wf_output_names=[("wfb", "NB_out"), ("NA", "out", "NA_out")]) - interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) - na = Node(name="NA", interface=interf_addtwo, workingdir="na") + na = Node(name="NA", interface=interf_addtwo, workingdir="na", + output_names=["out"]) na.map(mapper="a", inputs={"a": [3, 5]}) wf.add(na) - # the second node does not have explicit mapper (but keeps the mapper from the NA node) - interf_addvar = FunctionInterface(fun_addvar, ["out"]) - nb = Node(name="NB", interface=interf_addvar, workingdir="nb") - wfb = Workflow( - name="wfb", - workingdir="test_wfb", - inputs={"b": 10}, + nb = Node(name="NB", interface=interf_addvar, workingdir="nb", + output_names=["out"]) + wfb = Workflow(name="wfb", workingdir="test_wfb", inputs={"c": 10}, wf_output_names=[("NB", "out", "NB_out")]) wfb.add(nb) wfb.connect_wf_input("b", "NB", "b") - wfb.connect_wf_input("a", "NB", "a") + wfb.connect_wf_input("c", "NB", "c") # adding 2 nodes and create a connection (as it is now) wf.add(wfb) - wf.connect("NA", "out", "wfb", "a") + wf.connect("NA", "out", "wfb", "b") assert wf.nodes[0].mapper == "NA.a" sub = Submitter(runnable=wf, plugin=plugin) @@ -1435,9 +1288,9 @@ def test_workflow_16a(plugin, change_dir): assert wf.result["NA_out"][i][0] == res[0] assert wf.result["NA_out"][i][1] == res[1] - # TODO: the naming rememebrs only the node, doesnt remember that a came from NA... + # TODO (res): the naming rememebrs only the node, doesnt remember that came from NA... # the naming should have names with workflows?? - expected_B = [({"NB.a": 5, "NB.b": 10}, 15), ({"NB.a": 7, "NB.b": 10}, 17)] + expected_B = [({"NB.b": 5, "NB.c": 10}, 15), ({"NB.b": 7, "NB.c": 10}, 17)] key_sort = list(expected_B[0][0].keys()) expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) wf.result["NB_out"].sort(key=lambda t: [t[0][key] for key in key_sort]) @@ -1447,7 +1300,11 @@ def test_workflow_16a(plugin, change_dir): # testing CurrentInterface that is a temporary wrapper for current interfaces - +T1_file = "/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz" +T1_file_list = [ + "/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz", + "/Users/dorota/nipype_workshop/data/ds000114/sub-02/ses-test/anat/sub-02_ses-test_T1w.nii.gz" + ] @pytest.mark.skipif(not DS114_DIR.exists(), reason="Missing $PYDRA_TEST_DATA/ds000114") @pytest.mark.parametrize("plugin", Plugins) @@ -1455,7 +1312,6 @@ def test_workflow_16a(plugin, change_dir): def test_current_node_1(change_dir, plugin): """Node with a current interface and inputs, no mapper, running interface""" interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface") - nn = Node( name="NA", inputs={"in_file": str(DS114_DIR / "sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz")}, @@ -1466,7 +1322,7 @@ def test_current_node_1(change_dir, plugin): sub = Submitter(plugin=plugin, runnable=nn) sub.run() sub.close() - # TODO: nodes only returns relative path + # TODO (res): nodes only returns relative path assert "out_file" in nn.output.keys() diff --git a/pydra/engine/tests/test_node_neuro.py b/pydra/engine/tests/test_node_neuro.py index 2043bf5fb1..01637f952f 100644 --- a/pydra/engine/tests/test_node_neuro.py +++ b/pydra/engine/tests/test_node_neuro.py @@ -18,7 +18,7 @@ @pytest.fixture() def change_dir(request): orig_dir = os.getcwd() - test_dir = os.path.join(orig_dir, "/pydra/engine/test_neuro") + test_dir = os.path.join(orig_dir, "/pydra/pydra/engine/test_neuro") os.makedirs(test_dir, exist_ok=True) os.chdir(test_dir) @@ -34,8 +34,7 @@ def move2orig(): DEFAULT_MEMORY_MIN_GB = None # TODO, adding fields to Inputs (subject_id) Inputs = { - "subject_id": - "sub-01", + "subject_id": "sub-01", "output_spaces": ["fsaverage", "fsaverage5"], "source_file": "/fmriprep_test/workdir1/fmriprep_wf/single_subject_01_wf/func_preproc_ses_test_task_fingerfootlips_wf/bold_t1_trans_wf/merge/vol0000_xform-00000_merged.nii", @@ -68,11 +67,8 @@ def test_neuro(change_dir, plugin): # #dj: why do I need outputs? - wf = Workflow( - name=Name, - inputs=Inputs, - workingdir="test_neuro_{}".format(plugin), - write_state=False, + wf = Workflow(name=Name, inputs=Inputs, + workingdir="test_neuro_{}".format(plugin), write_state=False, wf_output_names=[("sampler", "out_file", "sampler_out"), ("targets", "out", "target_out")]) # @interface @@ -86,7 +82,8 @@ def test_neuro(change_dir, plugin): #dj: don't have option in map to connect with wf input - wf.add(runnable=select_target, name="targets", subject_id="subject_id", output_names=["out"], + wf.add(runnable=select_target, name="targets", subject_id="subject_id", + input_names=["subject_id", "space"], output_names=["out"], out_read=True, write_state=False)\ .map_node(mapper="space", inputs={"space": [space for space in Inputs["output_spaces"] if space.startswith("fs")]}) @@ -117,8 +114,7 @@ def test_neuro(change_dir, plugin): wf.add(name='resampling_xfm', runnable=fs.utils.LTAConvert(in_lta='identity.nofile', out_lta=True), source_file="source_file", target_file="t1_preproc", - output_names=["out_lta"], - write_state=False)\ + output_names=["out_lta"], write_state=False)\ .add(name='set_xfm_source', runnable=ConcatenateLTA(out_type='RAS2RAS'), in_lta2="t1_2_fsnative_forward_transform", in_lta1="resampling_xfm.out_lta", output_names=["out_file"], write_state=False)