forked from FS-smile/TUAF-2023
-
Notifications
You must be signed in to change notification settings - Fork 0
/
convert_triple_graph.py
50 lines (45 loc) · 1.86 KB
/
convert_triple_graph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import networkx as nx
import numpy as np
def convert_to_triple_graph(g):
adj = np.array(nx.to_numpy_matrix(g))
node_o_num = len(adj)
adj_tri_upper = np.triu(adj, k=0)
node_o_pairs = np.argwhere(adj_tri_upper == 1)
tuple_list = [str(i[0])+'_'+str(i[1]) for i in node_o_pairs]
node_t_index_list = np.arange(1, len(tuple_list)+1)
t_o_node_dict = zip(tuple_list, node_t_index_list)
t_o_node_dict = dict(t_o_node_dict)
# construct a new adjacent matrix
subgraph_list = []
adj_t = np.zeros((len(tuple_list), len(tuple_list)))
for i in range(1, node_o_num+1):
sub_node_o = []
for item in tuple_list:
tmp = item.split("_")
tmp1 = tmp[0]
tmp2 = tmp[1]
if (str(i-1) == str(tmp1)) | (str(i-1) == str(tmp2)):
sub_node_o.append(t_o_node_dict[item]-1)
if len(sub_node_o) > 1:
subgraph_list.append(sub_node_o)
sub_node_o = np.array(sub_node_o)
rows = [[i]*len(sub_node_o) for i in sub_node_o]
cols = [[sub_node_o]*len(sub_node_o)]
adj_t[rows, cols] = 1
row, col = np.diag_indices_from(adj_t)
adj_t[row, col] = 0
# convert to triple-uint graph
G_1 = nx.from_numpy_matrix(adj_t)
for k in range(0, len(G_1.nodes)):
tuple_node_name = list(t_o_node_dict.keys())[
list(t_o_node_dict.values()).index(k+1)]
node_tuple = tuple_node_name.split("_")
index1 = node_tuple[0]
index2 = node_tuple[1]
tuple_node1 = g.nodes[int(index1)]['node_label']
tuple_node2 = g.nodes[int(index2)]['node_label']
tuple_edge_label = g.edges[(int(index1), int(index2))]['edge_label']
G_1.nodes[k]['feat_triple'] = np.concatenate(
(tuple_node1, tuple_edge_label, tuple_node2), axis=0)
G_1.graph['label'] = g.graph['label']
return G_1