# coding:utf-8
from __future__ import absolute_import
from __future__ import print_function
import numpy as np
import random
from keras import optimizers
from keras.datasets import mnist
from keras.models import Model
from keras.layers import Input, Flatten, Dense, Dropout, Lambda
from keras.optimizers import RMSprop
from keras import backend as K

num_classes = 10
epochs = 20


def euclidean_distance(vects):
    x, y = vects
    return K.sqrt(K.maximum(K.sum(K.square(x - y), axis=1, keepdims=True), K.epsilon()))  # K.epsilon() == 1e-07


def eucl_dist_output_shape(shapes):
    shape1, shape2 = shapes
    return (shape1[0], 1)  # (None, 1)
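# Note: clamping the summed squares at K.epsilon() before the sqrt keeps the
# distance away from exactly zero, where sqrt has an unbounded gradient; the
# explicit output_shape is handed to the Lambda layer further below so Keras
# can infer that the layer emits one scalar distance per pair, shape (batch_size, 1).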


def contrastive_loss(y_true, y_pred):  # contrastive loss for the siamese network
    margin = 1
    return K.mean(y_true * K.square(y_pred) +
                  (1 - y_true) * K.square(K.maximum(margin - y_pred, 0)))
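# Note: with d = euclidean_distance(a, b) this is the standard contrastive loss of
# Hadsell, Chopra & LeCun (2006),
#     L = mean( y * d^2 + (1 - y) * max(margin - d, 0)^2 ),
# where y = 1 marks a "same class" pair and y = 0 a "different class" pair.
# Sanity check with margin = 1: a same pair at d = 0 costs 0, a different pair at
# d = 0 costs 1, and a different pair already at d >= 1 costs 0.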


'''
create_pairs input:
    x             : x_train / x_test
    digit_indices : [array([...]), ...] -- a list containing 10 index arrays, one per digit
'''


def create_pairs(x, digit_indices):
    '''Positive and negative pair creation.
    Alternates between positive and negative pairs.
    '''
    pairs = []
    labels = []
    n = min([len(digit_indices[d]) for d in range(num_classes)]) - 1  # 5420 (train), 891 (test)
    for d in range(num_classes):
        for i in range(n):
            z1, z2 = digit_indices[d][i], digit_indices[d][i + 1]
            pairs += [[x[z1], x[z2]]]
            inc = random.randrange(1, num_classes)
            dn = (d + inc) % num_classes
            z1, z2 = digit_indices[d][i], digit_indices[dn][i]  # dn is guaranteed to differ from d
            pairs += [[x[z1], x[z2]]]
            labels += [1, 0]
    # print(np.array(pairs).shape)  # (108400/17820, 2, 28, 28)
    # print(len(labels))            # 108400/17820
    return np.array(pairs), np.array(labels)
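# Note: each class contributes 2 * n pairs (one positive and one negative per
# index i), so the totals are 2 * n * num_classes: 2 * 5420 * 10 = 108400
# training pairs and 2 * 891 * 10 = 17820 test pairs, matching the shapes
# noted above. Labels alternate 1 (same class) and 0 (different class).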


def create_base_network(input_shape):
    '''Base network to be shared (eq. to feature extraction).
    '''
    input = Input(shape=input_shape)
    x = Flatten()(input)
    x = Dense(128, activation='relu')(x)
    x = Dropout(0.1)(x)
    x = Dense(128, activation='relu')(x)
    x = Dropout(0.1)(x)
    x = Dense(128, activation='relu')(x)
    return Model(input, x)
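# Note: the shared tower maps a 28x28 image to a 128-dimensional embedding:
# Flatten -> three Dense(128, relu) layers, with Dropout(0.1) after the first two.
# Both branches below call this single Model instance, so the two inputs are
# embedded with identical weights.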


def compute_accuracy(y_true, y_pred):
    '''Compute classification accuracy with a fixed threshold on distances.
    '''
    pred = y_pred.ravel() < 0.5
    return np.mean(pred == y_true)


def accuracy(y_true, y_pred):
    '''Compute classification accuracy with a fixed threshold on distances.
    '''
    return K.mean(K.equal(y_true, K.cast(y_pred < 0.5, y_true.dtype)))
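# Note: both helpers apply the same fixed threshold -- a predicted distance
# below 0.5 is read as "same class" (label 1). compute_accuracy works on NumPy
# arrays returned by model.predict(), while accuracy is the backend (symbolic)
# version passed to model.compile() as a metric in the training block below.
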
# the data, split between train and test sets
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255
x_test /= 255
input_shape = x_train.shape[1:]
# create training+test positive and negative pairs
digit_indices = [np.where(y_train == i)[0] for i in range(num_classes)]  # [array([...]), ...] -- 10 index arrays, one per digit
tr_pairs, tr_y = create_pairs(x_train, digit_indices) # (108400, 2, 28, 28) 108400
digit_indices = [np.where(y_test == i)[0] for i in range(num_classes)]
te_pairs, te_y = create_pairs(x_test, digit_indices) # (17820, 2, 28, 28) 17820
# network definition
base_network = create_base_network(input_shape)
input_a = Input(shape=input_shape)
input_b = Input(shape=input_shape)
# because we re-use the same instance `base_network`,
# the weights of the network will be shared across the two branches
processed_a = base_network(input_a) # (None, 128)
processed_b = base_network(input_b)
distance = Lambda(euclidean_distance,
                  output_shape=eucl_dist_output_shape)([processed_a, processed_b])
# Lambda(function, output_shape): the function takes a single argument (the output
# of the previous layer(s)), here the list [processed_a, processed_b]
model = Model([input_a, input_b], distance)
model.load_weights('siamese.h5', by_name=True)
for i in range(10):
    y_pred = model.predict([te_pairs[np.newaxis, i, 0], te_pairs[np.newaxis, i, 1]])
    print(y_pred)
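
# Note: te_pairs[np.newaxis, i, 0] selects the first image of test pair i while
# keeping a leading batch axis of size 1, so each predict() call returns a single
# distance. By the 0.5 rule above, a small value means "same digit"; since
# create_pairs alternates positive and negative pairs, the printed distances
# should (roughly) alternate between small and large values.
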
'''
# train
opt = optimizers.Adam(lr=1e-3, decay=1e-5, amsgrad=True)  # RMSprop()
model.compile(loss=contrastive_loss, optimizer=opt, metrics=[accuracy])
model.fit([tr_pairs[:, 0], tr_pairs[:, 1]], tr_y,
          batch_size=128,
          epochs=epochs,
          validation_data=([te_pairs[:, 0], te_pairs[:, 1]], te_y))
model.save('siamese.h5')
# compute final accuracy on training and test sets
y_pred = model.predict([tr_pairs[:, 0], tr_pairs[:, 1]])
tr_acc = compute_accuracy(tr_y, y_pred)
y_pred = model.predict([te_pairs[:, 0], te_pairs[:, 1]])
te_acc = compute_accuracy(te_y, y_pred)
print('* Accuracy on training set: %0.2f%%' % (100 * tr_acc))
print('* Accuracy on test set: %0.2f%%' % (100 * te_acc))
'''
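# Note: the quoted block above is the (currently disabled) training workflow. One
# possible way to retrain instead of loading 'siamese.h5' -- an assumption about
# intent, not something the script states -- is to comment out the
# model.load_weights(...) line and the prediction loop, and uncomment that block;
# it only uses the tr_pairs / te_pairs already built earlier in this file.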