from math import exp
from keras.callbacks import Callback
import numpy as np


class TestPerformanceCallback(Callback):
    """
    Callback class for testing normal model performance at the beginning of every epoch.
    """
    def __init__(self, X_test, y_test, model):
        super().__init__()
        self.X_test = X_test
        self.y_test = y_test
        self.model = model    # this is only a reference, not a deep copy
        self.accuracies = []

    def on_epoch_begin(self, epoch, logs=None):
        loss, accuracy = self.model.evaluate(self.X_test, self.y_test, verbose=2)
        self.accuracies.append(accuracy * 100)
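
# Example usage (a minimal sketch, not part of the original file): `model`, `X_train`, `y_train`,
# `X_test` and `y_test` are assumed to be defined by the calling code.
#
#     test_callback = TestPerformanceCallback(X_test, y_test, model)
#     model.fit(X_train, y_train, epochs=10, callbacks=[test_callback])
#     print(test_callback.accuracies)    # test accuracy (%) measured at the start of each epoch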


class TestSuperpositionPerformanceCallback(Callback):
    """
    Callback class for testing superposition NN model performance at the beginning of every epoch.
    """
    def __init__(self, X_test, y_test, context_matrices, model, task_index):
        super().__init__()
        self.X_test = X_test
        self.y_test = y_test
        self.context_matrices = context_matrices
        self.model = model    # this is only a reference, not a deep copy
        self.task_index = task_index
        self.accuracies = []

    def on_epoch_begin(self, epoch, logs=None):
        if self.task_index == 0:    # first task (original MNIST images) - no context has been used yet
            loss, accuracy = self.model.evaluate(self.X_test, self.y_test, verbose=2)
            self.accuracies.append(accuracy * 100)
            return

        # save the current model weights and biases
        curr_w_matrices = []
        curr_bias_vectors = []
        for layer in self.model.layers[1:]:    # the first layer is Flatten, so we skip it
            curr_w_matrices.append(layer.get_weights()[0])
            curr_bias_vectors.append(layer.get_weights()[1])

        # temporarily transform the weight matrices to be suitable for the first task (biases are left unchanged)
        for i, layer in enumerate(self.model.layers[1:]):    # the first layer is Flatten, so we skip it
            # no multiplication with an explicit inverse: in binary superposition with {-1, 1} on the
            # diagonal the context is its own inverse, so element-wise multiplication of the diagonal
            # vectors is enough and much faster
            context_inverse_multiplied = self.context_matrices[self.task_index][i]
            for task_i in range(self.task_index - 1, 0, -1):
                context_inverse_multiplied = np.multiply(context_inverse_multiplied, self.context_matrices[task_i][i])
            context_inverse_multiplied = np.diag(context_inverse_multiplied)    # vector to diagonal matrix

            layer.set_weights([context_inverse_multiplied @ curr_w_matrices[i], curr_bias_vectors[i]])

        # evaluate accuracy
        loss, accuracy = self.model.evaluate(self.X_test, self.y_test, verbose=2)
        self.accuracies.append(accuracy * 100)

        # restore the current model weights
        for i, layer in enumerate(self.model.layers[1:]):    # the first layer is Flatten, so we skip it
            layer.set_weights([curr_w_matrices[i], curr_bias_vectors[i]])
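
# Why no explicit inverse is needed (a small illustrative sketch, not part of the original file):
# a binary context vector c with entries in {-1, 1} is its own inverse, since c * c = 1
# element-wise, so binding and unbinding are the same diagonal multiplication.
#
#     c = np.random.choice([-1, 1], size=5)
#     W = np.random.rand(5, 3)
#     bound = np.diag(c) @ W          # bind the weight matrix with the context
#     unbound = np.diag(c) @ bound    # applying the same context again...
#     assert np.allclose(unbound, W)  # ...recovers the original weights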


class TestSuperpositionPerformanceCallback_CNN(Callback):
    """
    Callback class for testing superposition CNN model performance at the beginning of every epoch.
    """
    def __init__(self, X_test, y_test, context_matrices, model, task_index):
        super().__init__()
        self.X_test = X_test
        self.y_test = y_test
        self.context_matrices = context_matrices
        self.model = model    # this is only a reference, not a deep copy
        self.task_index = task_index
        self.accuracies = []

    def on_epoch_begin(self, epoch, logs=None):
        if self.task_index == 0:    # first task - no context has been used yet
            loss, accuracy = self.model.evaluate(self.X_test, self.y_test, verbose=2)
            self.accuracies.append(accuracy * 100)
            return

        # save the current model weights and biases (layers at indices 2 and 3 are skipped - they are neither conv nor dense)
        curr_w_matrices = []
        curr_bias_vectors = []
        for i, layer in enumerate(self.model.layers):
            if i < 2 or i > 3:    # conv or dense layer
                curr_w_matrices.append(layer.get_weights()[0])
                curr_bias_vectors.append(layer.get_weights()[1])

        # temporarily transform the weight matrices to be suitable for the first task (biases are left unchanged)
        for i, layer in enumerate(self.model.layers):
            if i < 2 or i > 3:    # conv or dense layer
                # no multiplication with an explicit inverse: in binary superposition with {-1, 1} on the
                # diagonal the context is its own inverse, so element-wise multiplication is enough
                if i < 2:    # conv layer: flatten the kernel, multiply element-wise, reshape back
                    context_vector = self.context_matrices[self.task_index][i]
                    for task_i in range(self.task_index - 1, 0, -1):
                        context_vector = np.multiply(context_vector, self.context_matrices[task_i][i])

                    new_w = np.reshape(np.multiply(curr_w_matrices[i].flatten(), context_vector), curr_w_matrices[i].shape)
                    layer.set_weights([new_w, curr_bias_vectors[i]])
                else:    # dense layer
                    context_inverse_multiplied = self.context_matrices[self.task_index][i - 2]
                    for task_i in range(self.task_index - 1, 0, -1):
                        context_inverse_multiplied = np.multiply(context_inverse_multiplied, self.context_matrices[task_i][i - 2])
                    context_inverse_multiplied = np.diag(context_inverse_multiplied)    # vector to diagonal matrix

                    layer.set_weights([context_inverse_multiplied @ curr_w_matrices[i - 2], curr_bias_vectors[i - 2]])

        # evaluate accuracy
        loss, accuracy = self.model.evaluate(self.X_test, self.y_test, verbose=2)
        self.accuracies.append(accuracy * 100)

        # restore the current model weights
        for i, layer in enumerate(self.model.layers):
            if i < 2 or i > 3:    # conv or dense layer
                if i < 2:    # conv layer
                    layer.set_weights([curr_w_matrices[i], curr_bias_vectors[i]])
                else:    # dense layer
                    layer.set_weights([curr_w_matrices[i - 2], curr_bias_vectors[i - 2]])
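
# Note on the expected structure of `context_matrices` (inferred from the code above, not stated
# in the original file): context_matrices[task][layer] is assumed to be a 1-D vector with entries
# in {-1, 1}; for the two conv layers its length matches the number of elements of the flattened
# kernel, and for the dense layers it matches the number of input units (rows of the weight
# matrix), so it can be placed on the diagonal and left-multiplied with the weights.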


lr_over_time = []    # global variable to store the changing learning rates


def lr_scheduler(epoch, lr):
    """
    Learning rate scheduler function to set how the learning rate changes each epoch.

    :param epoch: current epoch number
    :param lr: current learning rate
    :return: new learning rate
    """
    global lr_over_time
    lr_over_time.append(lr)

    decay_type = 'exponential'
    if decay_type == 'linear':
        lr -= 10 ** -5
    elif decay_type == 'exponential':
        initial_lr = 0.0001
        k = 0.07
        t = len(lr_over_time)
        lr = initial_lr * exp(-k * t)

    if len(lr_over_time) % 10 == 0:    # start each new task with the same learning rate as the first one (1 task = 10 epochs)
        lr_over_time = []    # reset the stored learning rates

    return max(lr, 0.000001)    # don't let the learning rate decay all the way to 0
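
# Example usage (a minimal sketch, not part of the original file): the scheduler function is meant
# to be wrapped in Keras' LearningRateScheduler callback, which calls it once per epoch. `model`,
# `X_train` and `y_train` are assumed to be defined by the calling code.
#
#     from keras.callbacks import LearningRateScheduler
#
#     model.fit(X_train, y_train, epochs=10, callbacks=[LearningRateScheduler(lr_scheduler)])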