################################################################
# Implemented by Naozumi Hiranuma (hiranumn@uw.edu)            #
#                                                              #
# Keras-compatible implementation of Integrated Gradients      #
# proposed in "Axiomatic Attribution for Deep Networks"        #
# (https://arxiv.org/abs/1703.01365).                          #
#                                                              #
# Keywords: Shapley values, interpretable machine learning     #
################################################################
from __future__ import division, print_function
import numpy as np
import sys
import keras.backend as K
from keras.models import Model, Sequential
'''
Integrated gradients approximates Shapley values by integrating partial
gradients of the output with respect to the input features along a path
from a reference input to the actual input. The class below implements
this attribution method.
'''
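# The attribution of feature i is the path integral of the gradient of the
# output F along the straight line from the reference x' to the sample x
# (Eq. 1 of the paper cited above):
#
#   IG_i(x) = (x_i - x'_i) * integral_{a=0}^{1} dF(x' + a*(x - x'))/dx_i da
#
# explain() approximates this integral with a left Riemann sum over the
# num_steps interpolation points produced by linearly_interpolate().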
class integrated_gradients:
# model: Keras model that you wish to explain.
    # outchannels: if the model is multi-task, specify which output channels to explain.
def __init__(self, model, outchannels=[], verbose=1):
        # Backend: either tensorflow or theano.
self.backend = K.backend()
        # Load the model; both keras.Model and keras.Sequential are supported.
if isinstance(model, Sequential):
self.model = model.model
elif isinstance(model, Model):
self.model = model
else:
            raise TypeError("Invalid input model: expected keras.Model or keras.Sequential.")
# load input tensors
self.input_tensors = []
for i in self.model.inputs:
self.input_tensors.append(i)
# The learning phase flag is a bool tensor (0 = test, 1 = train)
# to be passed as input to any Keras function that uses
# a different behavior at train time and test time.
self.input_tensors.append(K.learning_phase())
        # If outchannels is specified, use it.
        # Otherwise evaluate all output channels.
self.outchannels = outchannels
if len(self.outchannels) == 0:
if verbose: print("Evaluated output channel (0-based index): All")
if K.backend() == "tensorflow":
                self.outchannels = range(self.model.output.shape[1].value)
elif K.backend() == "theano":
                self.outchannels = range(self.model.output._keras_shape[1])
else:
if verbose:
print("Evaluated output channels (0-based index):")
print(','.join([str(i) for i in self.outchannels]))
# Build gradient functions for desired output channels.
self.get_gradients = {}
if verbose: print("Building gradient functions")
# Evaluate over all channels.
        for i, c in enumerate(self.outchannels):
            # Get tensors that compute the gradients of output channel c
            # with respect to the model inputs.
            if K.backend() == "tensorflow":
                gradients = self.model.optimizer.get_gradients(self.model.output[:, c], self.model.input)
            elif K.backend() == "theano":
                gradients = self.model.optimizer.get_gradients(self.model.output[:, c].sum(), self.model.input)
            # Build a function that evaluates these gradient tensors for given inputs.
self.get_gradients[c] = K.function(inputs=self.input_tensors, outputs=gradients)
            # Building these functions can take a long time for a big model
            # with many tasks, so print the progress.
if verbose:
sys.stdout.write('\r')
                sys.stdout.write("Progress: {:.1f}%".format((i + 1) * 100.0 / len(self.outchannels)))
sys.stdout.flush()
# Done
if verbose: print("\nDone.")
    '''
    Input: sample to explain, output channel to explain
    Optional inputs:
        - reference: reference values (default: all zeros).
        - num_steps: number of steps from the reference to the actual sample.
    Output: attribution array with the same shape as the sample
            (a list of arrays for multi-input models).
    '''
def explain(self, sample, outc=0, reference=False, num_steps=50, verbose=0):
# Each element for each input stream.
samples = []
numsteps = []
step_sizes = []
# If multiple inputs are present, feed them as list of np arrays.
if isinstance(sample, list):
# If reference is present, reference and sample size need to be equal.
if reference != False:
assert len(sample) == len(reference)
for i in range(len(sample)):
if reference == False:
_output = integrated_gradients.linearly_interpolate(sample[i], False, num_steps)
else:
_output = integrated_gradients.linearly_interpolate(sample[i], False, num_steps)
samples.append(_output[0])
numsteps.append(_output[1])
step_sizes.append(_output[2])
        # Or you can feed just a single numpy array.
elif isinstance(sample, np.ndarray):
_output = integrated_gradients.linearly_interpolate(sample, reference, num_steps)
samples.append(_output[0])
numsteps.append(_output[1])
step_sizes.append(_output[2])
        # The desired channel must be in the list of output channels.
        assert outc in self.outchannels
        if verbose: print("Explaining output channel " + str(outc) + ".")
        # Assemble the input list: one array of interpolated samples per
        # input stream, plus the learning phase flag (0 = test mode).
        _input = []
        for s in samples:
            _input.append(s)
        _input.append(0)
if K.backend() == "tensorflow":
gradients = self.get_gradients[outc](_input)
elif K.backend() == "theano":
gradients = self.get_gradients[outc](_input)
if len(self.model.inputs) == 1:
gradients = [gradients]
        # Riemann sum: sum the gradients over all interpolation steps and
        # scale by the per-step difference (sample - reference) / num_steps.
        explanation = []
        for i in range(len(gradients)):
            _temp = np.sum(gradients[i], axis=0)
            explanation.append(np.multiply(_temp, step_sizes[i]))
if isinstance(sample, list):
return explanation
elif isinstance(sample, np.ndarray):
return explanation[0]
return -1
    '''
    Input: numpy array of a sample
    Optional inputs:
        - reference: reference values (default: all zeros).
        - num_steps: number of steps from the reference to the actual sample.
    Output: the interpolated inputs to integrate over, the number of steps,
            and the per-step difference (sample - reference) / num_steps.
    '''
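    # Worked example (values are illustrative): sample = [2., 4.], the default
    # zero reference, and num_steps = 4 yield the interpolants [0, 0],
    # [0.5, 1], [1, 2], [1.5, 3] and step sizes [0.5, 1.0]. Note that the
    # endpoint `sample` itself is not included (a left Riemann sum).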
@staticmethod
def linearly_interpolate(sample, reference=False, num_steps=50):
        # Use an all-zeros reference if none is specified.
        if reference is False:
            reference = np.zeros(sample.shape)
        # Reference and sample shapes must match exactly.
        assert sample.shape == reference.shape
        # Interpolate linearly from the reference toward the actual sample.
        ret = np.zeros(tuple([num_steps] + [i for i in sample.shape]))
        for s in range(num_steps):
            ret[s] = reference + (sample - reference) * (s * 1.0 / num_steps)
return ret, num_steps, (sample - reference) * (1.0 / num_steps)
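
# ------------------------------------------------------------------
# Minimal usage sketch. The toy model, layer sizes, and random input
# below are illustrative assumptions, not part of the original module.
if __name__ == "__main__":
    from keras.layers import Dense

    # A tiny two-class classifier over 4 input features. The model must
    # be compiled, because the gradient tensors above are built through
    # model.optimizer.get_gradients.
    toy = Sequential()
    toy.add(Dense(8, activation="relu", input_dim=4))
    toy.add(Dense(2, activation="softmax"))
    toy.compile(optimizer="sgd", loss="categorical_crossentropy")

    ig = integrated_gradients(toy)
    x = np.random.random(4)
    # Attribution of output channel 0 to each of the 4 input features;
    # the result has the same shape as x.
    print(ig.explain(x, outc=0))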