21
21
22
22
23
23
def _get_function_dict (flow ):
24
- return {
25
- job .uuid : job .function
26
- for job in flow .jobs
27
- }
24
+ return {job .uuid : job .function for job in flow .jobs }
28
25
29
26
30
27
def _get_nodes_dict (function_dict ):
@@ -37,7 +34,7 @@ def _get_nodes_dict(function_dict):
37
34
38
35
39
36
def _get_edge_from_dict (target , key , value_dict , nodes_mapping_dict ):
40
- if len (value_dict [' attributes' ]) == 1 :
37
+ if len (value_dict [" attributes" ]) == 1 :
41
38
return {
42
39
TARGET_LABEL : target ,
43
40
TARGET_PORT_LABEL : key ,
@@ -57,72 +54,152 @@ def _get_edges_and_extend_nodes(flow_dict, nodes_mapping_dict, nodes_dict):
57
54
edges_lst = []
58
55
for job in flow_dict ["jobs" ]:
59
56
for k , v in job ["function_kwargs" ].items ():
60
- if isinstance (v , dict ) and "@module" in v and "@class" in v and "@version" in v :
61
- edges_lst .append (_get_edge_from_dict (
62
- target = nodes_mapping_dict [job ["uuid" ]],
63
- key = k ,
64
- value_dict = v ,
65
- nodes_mapping_dict = nodes_mapping_dict ,
66
- ))
67
- elif isinstance (v , dict ) and any ([isinstance (el , dict ) and "@module" in el and "@class" in el and "@version" in el for el in v .values ()]):
57
+ if (
58
+ isinstance (v , dict )
59
+ and "@module" in v
60
+ and "@class" in v
61
+ and "@version" in v
62
+ ):
63
+ edges_lst .append (
64
+ _get_edge_from_dict (
65
+ target = nodes_mapping_dict [job ["uuid" ]],
66
+ key = k ,
67
+ value_dict = v ,
68
+ nodes_mapping_dict = nodes_mapping_dict ,
69
+ )
70
+ )
71
+ elif isinstance (v , dict ) and any (
72
+ [
73
+ isinstance (el , dict )
74
+ and "@module" in el
75
+ and "@class" in el
76
+ and "@version" in el
77
+ for el in v .values ()
78
+ ]
79
+ ):
68
80
node_dict_index = len (nodes_dict )
69
81
nodes_dict [node_dict_index ] = get_dict
70
82
for kt , vt in v .items ():
71
- if isinstance (vt , dict ) and "@module" in vt and "@class" in vt and "@version" in vt :
72
- edges_lst .append (_get_edge_from_dict (
73
- target = node_dict_index ,
74
- key = kt ,
75
- value_dict = vt ,
76
- nodes_mapping_dict = nodes_mapping_dict ,
77
- ))
83
+ if (
84
+ isinstance (vt , dict )
85
+ and "@module" in vt
86
+ and "@class" in vt
87
+ and "@version" in vt
88
+ ):
89
+ edges_lst .append (
90
+ _get_edge_from_dict (
91
+ target = node_dict_index ,
92
+ key = kt ,
93
+ value_dict = vt ,
94
+ nodes_mapping_dict = nodes_mapping_dict ,
95
+ )
96
+ )
78
97
else :
79
98
if vt not in nodes_dict .values ():
80
99
node_index = len (nodes_dict )
81
100
nodes_dict [node_index ] = vt
82
101
else :
83
- node_index = {str (tv ): tk for tk , tv in nodes_dict .items ()}[str (vt )]
84
- edges_lst .append ({TARGET_LABEL : node_dict_index , TARGET_PORT_LABEL : kt , SOURCE_LABEL : node_index , SOURCE_PORT_LABEL : None })
85
- edges_lst .append ({TARGET_LABEL : nodes_mapping_dict [job ["uuid" ]], TARGET_PORT_LABEL : k , SOURCE_LABEL : node_dict_index , SOURCE_PORT_LABEL : None })
86
- elif isinstance (v , list ) and any ([isinstance (el , dict ) and "@module" in el and "@class" in el and "@version" in el for el in v ]):
102
+ node_index = {str (tv ): tk for tk , tv in nodes_dict .items ()}[
103
+ str (vt )
104
+ ]
105
+ edges_lst .append (
106
+ {
107
+ TARGET_LABEL : node_dict_index ,
108
+ TARGET_PORT_LABEL : kt ,
109
+ SOURCE_LABEL : node_index ,
110
+ SOURCE_PORT_LABEL : None ,
111
+ }
112
+ )
113
+ edges_lst .append (
114
+ {
115
+ TARGET_LABEL : nodes_mapping_dict [job ["uuid" ]],
116
+ TARGET_PORT_LABEL : k ,
117
+ SOURCE_LABEL : node_dict_index ,
118
+ SOURCE_PORT_LABEL : None ,
119
+ }
120
+ )
121
+ elif isinstance (v , list ) and any (
122
+ [
123
+ isinstance (el , dict )
124
+ and "@module" in el
125
+ and "@class" in el
126
+ and "@version" in el
127
+ for el in v
128
+ ]
129
+ ):
87
130
node_list_index = len (nodes_dict )
88
131
nodes_dict [node_list_index ] = get_list
89
132
for kt , vt in enumerate (v ):
90
- if isinstance (vt , dict ) and "@module" in vt and "@class" in vt and "@version" in vt :
91
- edges_lst .append (_get_edge_from_dict (
92
- target = node_list_index ,
93
- key = str (kt ),
94
- value_dict = vt ,
95
- nodes_mapping_dict = nodes_mapping_dict ,
96
- ))
133
+ if (
134
+ isinstance (vt , dict )
135
+ and "@module" in vt
136
+ and "@class" in vt
137
+ and "@version" in vt
138
+ ):
139
+ edges_lst .append (
140
+ _get_edge_from_dict (
141
+ target = node_list_index ,
142
+ key = str (kt ),
143
+ value_dict = vt ,
144
+ nodes_mapping_dict = nodes_mapping_dict ,
145
+ )
146
+ )
97
147
else :
98
148
if vt not in nodes_dict .values ():
99
149
node_index = len (nodes_dict )
100
150
nodes_dict [node_index ] = vt
101
151
else :
102
- node_index = {str (tv ): tk for tk , tv in nodes_dict .items ()}[str (vt )]
103
- edges_lst .append ({TARGET_LABEL : node_list_index , TARGET_PORT_LABEL : kt , SOURCE_LABEL : node_index , SOURCE_PORT_LABEL : None })
104
- edges_lst .append ({TARGET_LABEL : nodes_mapping_dict [job ["uuid" ]], TARGET_PORT_LABEL : k , SOURCE_LABEL : node_list_index , SOURCE_PORT_LABEL : None })
152
+ node_index = {str (tv ): tk for tk , tv in nodes_dict .items ()}[
153
+ str (vt )
154
+ ]
155
+ edges_lst .append (
156
+ {
157
+ TARGET_LABEL : node_list_index ,
158
+ TARGET_PORT_LABEL : kt ,
159
+ SOURCE_LABEL : node_index ,
160
+ SOURCE_PORT_LABEL : None ,
161
+ }
162
+ )
163
+ edges_lst .append (
164
+ {
165
+ TARGET_LABEL : nodes_mapping_dict [job ["uuid" ]],
166
+ TARGET_PORT_LABEL : k ,
167
+ SOURCE_LABEL : node_list_index ,
168
+ SOURCE_PORT_LABEL : None ,
169
+ }
170
+ )
105
171
else :
106
172
if v not in nodes_dict .values ():
107
173
node_index = len (nodes_dict )
108
174
nodes_dict [node_index ] = v
109
175
else :
110
176
node_index = {tv : tk for tk , tv in nodes_dict .items ()}[v ]
111
- edges_lst .append ({TARGET_LABEL : nodes_mapping_dict [job ["uuid" ]], TARGET_PORT_LABEL : k , SOURCE_LABEL : node_index , SOURCE_PORT_LABEL : None })
177
+ edges_lst .append (
178
+ {
179
+ TARGET_LABEL : nodes_mapping_dict [job ["uuid" ]],
180
+ TARGET_PORT_LABEL : k ,
181
+ SOURCE_LABEL : node_index ,
182
+ SOURCE_PORT_LABEL : None ,
183
+ }
184
+ )
112
185
return edges_lst , nodes_dict
113
186
114
187
115
188
def _resort_total_lst (total_dict , nodes_dict ):
116
189
nodes_with_dep_lst = list (sorted (total_dict .keys ()))
117
- nodes_without_dep_lst = [k for k in nodes_dict .keys () if k not in nodes_with_dep_lst ]
190
+ nodes_without_dep_lst = [
191
+ k for k in nodes_dict .keys () if k not in nodes_with_dep_lst
192
+ ]
118
193
ordered_lst = []
119
194
total_new_dict = {}
120
195
while len (total_new_dict ) < len (total_dict ):
121
196
for ind in sorted (total_dict .keys ()):
122
197
connect = total_dict [ind ]
123
198
if ind not in ordered_lst :
124
199
source_lst = [sd [SOURCE_LABEL ] for sd in connect .values ()]
125
- if all ([s in ordered_lst or s in nodes_without_dep_lst for s in source_lst ]):
200
+ if all (
201
+ [s in ordered_lst or s in nodes_without_dep_lst for s in source_lst ]
202
+ ):
126
203
ordered_lst .append (ind )
127
204
total_new_dict [ind ] = connect
128
205
return total_new_dict
@@ -142,7 +219,7 @@ def _group_edges(edges_lst):
142
219
143
220
144
221
def _get_input_dict (nodes_dict ):
145
- return {k :v for k , v in nodes_dict .items () if not isfunction (v )}
222
+ return {k : v for k , v in nodes_dict .items () if not isfunction (v )}
146
223
147
224
148
225
def _get_workflow (nodes_dict , input_dict , total_dict , source_handles_dict ):
@@ -157,12 +234,21 @@ def get_attr_helper(obj, source_handle):
157
234
v = nodes_dict [k ]
158
235
if isfunction (v ):
159
236
if k in source_handles_dict .keys ():
160
- fn = job (method = v , data = [el for el in source_handles_dict [k ] if el is not None ])
237
+ fn = job (
238
+ method = v ,
239
+ data = [el for el in source_handles_dict [k ] if el is not None ],
240
+ )
161
241
else :
162
242
fn = job (method = v )
163
243
kwargs = {
164
- kw : input_dict [vw [SOURCE_LABEL ]] if vw [SOURCE_LABEL ] in input_dict else get_attr_helper (
165
- obj = memory_dict [vw [SOURCE_LABEL ]], source_handle = vw [SOURCE_PORT_LABEL ])
244
+ kw : (
245
+ input_dict [vw [SOURCE_LABEL ]]
246
+ if vw [SOURCE_LABEL ] in input_dict
247
+ else get_attr_helper (
248
+ obj = memory_dict [vw [SOURCE_LABEL ]],
249
+ source_handle = vw [SOURCE_PORT_LABEL ],
250
+ )
251
+ )
166
252
for kw , vw in total_dict [k ].items ()
167
253
}
168
254
memory_dict [k ] = fn (** kwargs )
@@ -197,7 +283,7 @@ def load_workflow_json(file_name):
197
283
nodes_new_dict = {}
198
284
for k , v in convert_nodes_list_to_dict (nodes_list = content [NODES_LABEL ]).items ():
199
285
if isinstance (v , str ) and "." in v :
200
- p , m = v .rsplit ('.' , 1 )
286
+ p , m = v .rsplit ("." , 1 )
201
287
mod = import_module (p )
202
288
nodes_new_dict [int (k )] = getattr (mod , m )
203
289
else :
@@ -229,7 +315,9 @@ def write_workflow_json(flow, file_name="workflow.json"):
229
315
nodes_store_lst = []
230
316
for k , v in nodes_dict .items ():
231
317
if isfunction (v ):
232
- nodes_store_lst .append ({"id" : k , "function" : v .__module__ + "." + v .__name__ })
318
+ nodes_store_lst .append (
319
+ {"id" : k , "function" : v .__module__ + "." + v .__name__ }
320
+ )
233
321
elif isinstance (v , np .ndarray ):
234
322
nodes_store_lst .append ({"id" : k , "value" : v .tolist ()})
235
323
else :
0 commit comments