Skip to content

Commit

Permalink
Merge pull request #20 from djarecka/removing_newinterface
Browse files Browse the repository at this point in the history
Removing newinterface
  • Loading branch information
effigies authored Oct 16, 2018
2 parents 552bac5 + 22258c5 commit 4c528b0
Show file tree
Hide file tree
Showing 4 changed files with 298 additions and 560 deletions.
45 changes: 0 additions & 45 deletions pydra/engine/auxiliary.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,51 +200,6 @@ def _add_name(mlist, name):
return mlist


#Function interface


class FunctionInterface(object):
""" A new function interface """

def __init__(self, function, output_nm, out_read=False, input_map=None):
self.function = function
if type(output_nm) is list:
self._output_nm = output_nm
else:
raise Exception("output_nm should be a list")
if not input_map:
self.input_map = {}
# TODO use signature
for key in inspect.getargspec(function)[0]:
if key not in self.input_map.keys():
self.input_map[key] = key
# flags if we want to read the txt file to save in node.output
self.out_read = out_read

def run(self, input):
self.output = {}
if self.input_map:
for (key_fun, key_inp) in self.input_map.items():
try:
input[key_fun] = input.pop(key_inp)
except KeyError:
raise Exception("no {} in the input dictionary".format(key_inp))
fun_output = self.function(**input)
logger.debug("Function Interf, input={}, fun_out={}".format(input, fun_output))
if type(fun_output) is tuple:
if len(self._output_nm) == len(fun_output):
for i, out in enumerate(fun_output):
self.output[self._output_nm[i]] = out
else:
raise Exception("length of output_nm doesnt match length of the function output")
elif len(self._output_nm) == 1:
self.output[self._output_nm[0]] = fun_output
else:
raise Exception("output_nm doesnt match length of the function output")

return fun_output


# want to use to access input as dot,
# but it doesnt work since im using "." within names (using my old syntax with - also cant work)
# https://stackoverflow.com/questions/2352181/how-to-use-a-dot-to-access-members-of-dictionary
Expand Down
151 changes: 41 additions & 110 deletions pydra/engine/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import numpy as np

from nipype.utils.filemanip import loadpkl
from nipype import logging
from nipype import logging, Function

from . import state
from . import auxiliary as aux
Expand Down Expand Up @@ -164,23 +164,14 @@ def get_input_el(self, ind):
])
if not from_node.mapper:
dir_nm_el_from = ""

if is_node(from_node) and is_current_interface(from_node.interface):
file_from = self._reading_ci_output(
# TODO: do I need this if, what if this is wf?
if is_node(from_node):
out_from = self._reading_ci_output(
node=from_node, dir_nm_el=dir_nm_el_from, out_nm=from_socket)
if file_from and os.path.exists(file_from):
inputs_dict["{}.{}".format(self.name, to_socket)] = file_from
if out_from:
inputs_dict["{}.{}".format(self.name, to_socket)] = out_from
else:
raise Exception("{} doesnt exist".format(file_from))
else: # assuming here that I want to read the file (will not be used with the current interfaces)
file_from = os.path.join(from_node.workingdir, dir_nm_el_from,
from_socket + ".txt")
with open(file_from) as f:
content = f.readline()
try:
inputs_dict["{}.{}".format(self.name, to_socket)] = eval(content)
except NameError:
inputs_dict["{}.{}".format(self.name, to_socket)] = content
raise Exception("output from {} doesnt exist".format(from_node))

return state_dict, inputs_dict

Expand All @@ -191,11 +182,10 @@ def _reading_ci_output(self, dir_nm_el, out_nm, node=None):
result_pklfile = os.path.join(os.getcwd(), node.workingdir, dir_nm_el,
node.interface.nn.name, "result_{}.pklz".format(
node.interface.nn.name))
if os.path.exists(result_pklfile):
out_file = getattr(loadpkl(result_pklfile).outputs, out_nm)
if os.path.exists(out_file):
return out_file

if os.path.exists(result_pklfile) and os.stat(result_pklfile).st_size > 0:
out = getattr(loadpkl(result_pklfile).outputs, out_nm)
if out:
return out
return False

# checking if all outputs are saved
Expand Down Expand Up @@ -253,15 +243,8 @@ def __init__(self,
self.workingdir = workingdir
self.interface = interface

if is_function_interface(self.interface):
# adding node name to the interface's name mapping
self.interface.input_map = dict((key, "{}.{}".format(self.name, value))
for (key, value) in self.interface.input_map.items())
# list of output names taken from interface output name
self.output_names = self.interface._output_nm
elif is_current_interface(self.interface):
# list of interf_key_out
self.output_names = output_names
# list of interf_key_out
self.output_names = output_names
if not self.output_names:
self.output_names = []

Expand Down Expand Up @@ -293,19 +276,12 @@ def run_interface_el(self, i, ind):
print("Run interface el, dict={}".format(state_dict))
logger.debug("Run interface el, name={}, inputs_dict={}, state_dict={}".format(
self.name, inputs_dict, state_dict))
if is_function_interface(self.interface):
res = self.interface.run(inputs_dict)
output = self.interface.output
print("Run fun interface el, output={}".format(output))
logger.debug("Run fun interface el, output={}".format(output))
self._writting_results_tmp(state_dict, dir_nm_el, output)
elif is_current_interface(self.interface):
if not self.mapper:
dir_nm_el = ""
res = self.interface.run(
inputs=inputs_dict,
base_dir=os.path.join(os.getcwd(), self.workingdir),
dir_nm_el=dir_nm_el)
if not self.mapper:
dir_nm_el = ""
res = self.interface.run(
inputs=inputs_dict,
base_dir=os.path.join(os.getcwd(), self.workingdir),
dir_nm_el=dir_nm_el)

# TODO when join
#if self._joinByKey:
Expand All @@ -317,14 +293,6 @@ def run_interface_el(self, i, ind):
# dir_nm_el = os.path.join(dir_join, dir_nm_el)
return res

def _writting_results_tmp(self, state_dict, dir_nm_el, output):
"""temporary method to write the results in the files (this is usually part of a interface)"""
if not self.mapper:
dir_nm_el = ''
os.makedirs(os.path.join(self.workingdir, dir_nm_el), exist_ok=True)
for key_out, val_out in output.items():
with open(os.path.join(self.workingdir, dir_nm_el, key_out + ".txt"), "w") as fout:
fout.write(str(val_out))

def get_output(self):
"""collecting all outputs and updating self._output"""
Expand All @@ -337,32 +305,11 @@ def get_output(self):
state_dict = self.state.state_ind(ind)
dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(state_dict.items())])
if self.mapper:
if is_function_interface(self.interface):
output = os.path.join(self.workingdir, dir_nm_el, key_out + ".txt")
if self.interface.out_read:
with open(output) as fout:
content = fout.readline()
try:
output = eval(content)
except NameError:
output = content
self._output[key_out][dir_nm_el] = (state_dict, output)
elif is_current_interface(self.interface):
self._output[key_out][dir_nm_el] = \
(state_dict, (state_dict, self._reading_ci_output(dir_nm_el=dir_nm_el, out_nm=key_out)))
self._output[key_out][dir_nm_el] = \
(state_dict, self._reading_ci_output(dir_nm_el=dir_nm_el, out_nm=key_out))
else:
if is_function_interface(self.interface):
output = os.path.join(self.workingdir, key_out + ".txt")
if self.interface.out_read:
with open(output) as fout:
try:
output = eval(fout.readline())
except Workflow:
output = fout.readline()
self._output[key_out] = (state_dict, output)
elif is_current_interface(self.interface):
self._output[key_out] = \
(state_dict, self._reading_ci_output(dir_nm_el="", out_nm=key_out))
self._output[key_out] = \
(state_dict, self._reading_ci_output(dir_nm_el="", out_nm=key_out))
return self._output

# dj: version without join
Expand All @@ -378,13 +325,8 @@ def _check_all_results(self):
dir_nm_el = ""

for key_out in self.output_names:
if is_function_interface(self.interface):
if not os.path.isfile(
os.path.join(self.workingdir, dir_nm_el, key_out + ".txt")):
return False
elif is_current_interface(self.interface):
if not self._reading_ci_output(dir_nm_el, key_out):
return False
if not self._reading_ci_output(dir_nm_el, key_out):
return False
self._is_complete = True
return True

Expand All @@ -394,18 +336,15 @@ def _reading_results(self):
"""
for key_out in self.output_names:
self._result[key_out] = []
#pdb.set_trace()
if self._state_inputs:
val_l = self._dict_tuple2list(self._output[key_out])
for (st_dict, filename) in val_l:
with open(filename) as fout:
self._result[key_out].append((st_dict, eval(fout.readline())))
for (st_dict, out) in val_l:
self._result[key_out].append((st_dict, out))
else:
# st_dict should be {}
# not sure if this is used (not tested)
(st_dict, filename) = self._output[key_out][None]
with open(filename) as fout:
self._result[key_out].append(({}, eval(fout.readline())))
(st_dict, out) = self._output[key_out][None]
self._result[key_out].append(({}, out))

# dj: removing temp. from Node class
# def run(self, plugin="serial"):
Expand Down Expand Up @@ -559,22 +498,13 @@ def _reading_results(self):
res_l = []
val_l = self._dict_tuple2list(self.output[key_out][dir_nm_el])
for val in val_l:
with open(val[1]) as fout:
logger.debug('Reading Results: file={}, st_dict={}'.format(
val[1], val[0]))
res_l.append((val[0], eval(fout.readline())))
res_l.append(val)
self._result[key_out].append((wf_inputs_dict, res_l))
else:
val_l = self._dict_tuple2list(self.output[key_out])
for val in val_l:
#TODO: I think that val shouldn't be dict here...
# TMP solution
if type(val) is dict:
val = [v for k, v in val.items()][0]
with open(val[1]) as fout:
logger.debug('Reading Results: file={}, st_dict={}'.format(
val[1], val[0]))
self._result[key_out].append((val[0], eval(fout.readline())))
self._result[key_out].append(val)


def add_nodes(self, nodes):
"""adding nodes without defining connections
Expand All @@ -594,6 +524,7 @@ def add(self,
name=None,
workingdir=None,
inputs=None,
input_names=None,
output_names=None,
mapper=None,
write_state=True,
Expand All @@ -602,10 +533,13 @@ def add(self,
if is_function(runnable):
if not output_names:
output_names = ["out"]
interface = aux.FunctionInterface(
function=runnable, output_nm=output_names, out_read=out_read)
if input_names is None:
raise Exception("you need to specify input_names")
if not name:
raise Exception("you have to specify name for the node")
nipype1_interf = Function(function=runnable, input_names=input_names,
output_names=output_names)
interface = aux.CurrentInterface(interface=nipype1_interf, name="addtwo")
if not workingdir:
workingdir = name
node = Node(
Expand All @@ -615,8 +549,9 @@ def add(self,
inputs=inputs,
mapper=mapper,
other_mappers=self._node_mappers,
write_state=write_state)
elif is_function_interface(runnable) or is_current_interface(runnable):
write_state=write_state,
output_names=output_names)
elif is_current_interface(runnable):
if not name:
raise Exception("you have to specify name for the node")
if not workingdir:
Expand Down Expand Up @@ -735,10 +670,6 @@ def is_function(obj):
return hasattr(obj, '__call__')


def is_function_interface(obj):
return type(obj) is aux.FunctionInterface


def is_current_interface(obj):
return type(obj) is aux.CurrentInterface

Expand Down
Loading

0 comments on commit 4c528b0

Please sign in to comment.