-
Notifications
You must be signed in to change notification settings - Fork 12
/
crf.py
123 lines (110 loc) · 4.16 KB
/
crf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import theano
import theano.tensor as T
def log_sum_exp(x, axis=None):
"""
Sum probabilities in the log-space.
"""
xmax = x.max(axis=axis, keepdims=True)
xmax_ = x.max(axis=axis)
return xmax_ + T.log(T.exp(x - xmax).sum(axis=axis))
def forward(observations, transitions, viterbi=False,
return_alpha=False, return_best_sequence=False):
"""
Takes as input:
- observations, sequence of shape (n_steps, n_classes)
- transitions, sequence of shape (n_classes, n_classes)
Probabilities can be given in the log space.
Compute alpha, matrix of size (n_steps, n_classes), such that
alpha[i, j] represents one of these 2 values:
- the probability that the real path at node i is in j
- the maximum probability of a path finishing in j at node i (Viterbi)
Returns one of these 2 values:
- alpha
- the final probability, which can be:
- the sum of the probabilities of all the paths
- the probability of the best path (Viterbi)
"""
assert not return_best_sequence or (viterbi and not return_alpha)
def recurrence(obs, previous, transitions):
previous = previous.dimshuffle(0, 'x')
obs = obs.dimshuffle('x', 0)
if viterbi:
scores = previous + obs + transitions
out = scores.max(axis=0)
if return_best_sequence:
out2 = scores.argmax(axis=0)
return out, out2
else:
return out
else:
return log_sum_exp(previous + obs + transitions, axis=0)
# assert observations.ndim == 2
# assert transitions.ndim == 2
initial = observations[0]
alpha, _ = theano.scan(
fn=recurrence,
outputs_info=(initial, None) if return_best_sequence else initial,
sequences=[observations[1:]],
non_sequences=transitions
)
if return_alpha:
return alpha
elif return_best_sequence:
sequence, _ = theano.scan(
fn=lambda beta_i, previous: beta_i[previous],
outputs_info=T.cast(T.argmax(alpha[0][-1]), 'int32'),
sequences=T.cast(alpha[1][::-1], 'int32')
)
sequence = T.concatenate([sequence[::-1], [T.argmax(alpha[0][-1])]])
return sequence
else:
if viterbi:
return alpha[-1].max(axis=0)
else:
return log_sum_exp(alpha[-1], axis=0)
def forward_dynamic(observations, transitions, log_space=True, viterbi=False,
return_alpha=False):
"""
Takes as input:
- observations, sequence of shape (n_steps, n_classes)
- transitions, sequence of shape (n_classes, n_classes, n_classes)
Probabilities can be given in the log space.
Compute alpha, matrix of size (n_steps, n_classes), such that
alpha[i, j] represents one of these 2 values:
- the probability that the real path at node i is in j
- the maximum probability of a path finishing in j at node i (Viterbi)
Returns one of these 3 values:
- alpha
- the final probability, which can be:
- the sum of the probabilities of all the paths
- the probability of the best path (Viterbi)
- the best sequence using Viterbi decoding
"""
def recurrence(obs, previous, transitions):
previous = previous.dimshuffle(0, 'x')
obs = obs.dimshuffle('x', 0)
local_transitions = transitions.dot(obs)
if viterbi:
out = (previous + obs + local_transitions).max(axis=0)
else:
if log_space:
out = log_sum_exp(previous + obs + local_transitions, axis=0)
else:
out = (previous + obs + local_transitions).sum(axis=0)
return out
# assert observations.ndim == 2
# assert transitions.ndim == 3
initial = observations[0]
alpha, _ = theano.scan(
fn=recurrence,
outputs_info=initial,
sequences=[observations[1:]],
non_sequences=transitions
)
if return_alpha:
return alpha
else:
if viterbi:
return alpha[-1].max(axis=0)
else:
return log_sum_exp(alpha[-1], axis=0)