-
Notifications
You must be signed in to change notification settings - Fork 0
/
viterbicont.py
executable file
·148 lines (143 loc) · 7.67 KB
/
viterbicont.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import numpy as np,numpy.random
from init_gaussian import hmmgaussian
from scipy import stats
from forwardcont import forwardcont
from backwardcont import backwardcont
from forward_backward_cont import forward_backwardcont
from sklearn.preprocessing import normalize
def _l1norm(v):
    """L1-normalize a 1-D nonnegative vector in place of sklearn's
    ``normalize(..., norm='l1')``; an all-zero vector is returned unchanged,
    matching sklearn's handling of zero rows."""
    total = np.sum(np.abs(v))
    return v / total if total > 0 else v


def _emission_probs_1d(obsmtrx, observations):
    """Gaussian emission probabilities for one 1-D observation sequence.

    Input : obsmtrx rows are (mean, std) per state; observations is a
            (timelength,) array.
    Output: (timelength, numstates) array of pdf values.
    """
    numstates = np.shape(obsmtrx)[0]
    timelength = np.shape(observations)[0]
    emis = np.ones((timelength, numstates))
    for state in range(numstates):
        # pdf is vectorized over the whole time axis at once
        emis[:, state] = stats.norm(obsmtrx[state, 0], obsmtrx[state, 1]).pdf(observations)
    return emis


def _emission_probs_multifeat(obsmtrx, sample_obs):
    """Gaussian emission probabilities for one sample with several features.

    Input : obsmtrx[feat, state] is (mean, std); sample_obs is
            (numfeats, timelength).
    Output: (timelength, numstates) array — features are treated as
            independent, so their per-state pdfs are multiplied together.
    """
    numfeats = np.shape(obsmtrx)[0]
    numstates = np.shape(obsmtrx)[1]
    timelength = np.shape(sample_obs)[1]
    emis = np.ones((timelength, numstates))
    for state in range(numstates):
        for feat in range(numfeats):
            emis[:, state] *= stats.norm(obsmtrx[feat, state, 0], obsmtrx[feat, state, 1]).pdf(sample_obs[feat, :])
    return emis


def _viterbi_path(transmtrx, pie, emisprobs):
    """Viterbi decoding for a single sequence given precomputed emission probs.

    Input : transmtrx (numstates, numstates), pie (numstates,),
            emisprobs (timelength, numstates).
    Output: (optzis, deltas) — optzis is the most likely state index per time
            point (float array, as in the original implementation), deltas the
            per-step L1-renormalized Viterbi scores.

    Each delta row is renormalized to sum to 1 to avoid numerical underflow on
    long sequences; renormalization never changes the argmax, so the decoded
    path is unaffected.
    """
    timelength, numstates = np.shape(emisprobs)
    deltas = np.zeros((timelength, numstates))
    backptrs = np.zeros((timelength, numstates))  # best predecessor per (t, state)
    optzis = np.zeros(timelength)
    # delta_1(j) = pi_j * b_j(o_1)
    deltas[0, :] = _l1norm(emisprobs[0, :] * pie)
    for t in range(1, timelength):
        for j in range(numstates):
            # delta_t(j) = max_i delta_{t-1}(i) * a_ij * b_j(o_t);
            # the same score vector yields both the max and its argmax
            scores = deltas[t - 1, :] * transmtrx[:, j] * emisprobs[t, j]
            backptrs[t, j] = int(np.argmax(scores))
            deltas[t, j] = np.max(scores)
        deltas[t, :] = _l1norm(deltas[t, :])
    # backtrack from the best final state
    optzis[timelength - 1] = int(np.argmax(deltas[timelength - 1, :]))
    for k in range(timelength - 2, -1, -1):
        optzis[k] = backptrs[k + 1, int(optzis[k + 1])]
    return optzis, deltas


def viterbicont(transmtrx, obsmtrx, pie, observations):
    ''' Input : Transition matrix, pie, state_observation probs (Gaussian
        (mean, std) parameters), observations
        Output : The most likely sequence of states (and also the probability
        for each time point) and its probability for the given observations
        Unlike forward backward, considers the most probable sequence given
        the state for all time points, not just the optimum state individually
        for each time point.

        observations may be:
          1-D (timelength,)                   -> single sequence
          2-D (numsamples, timelength)        -> one sequence per sample
          3-D (numsamples, numfeats, timelength) -> multiple independent
                                                   Gaussian features per state
        Returns (optzis, deltas); for 2-D/3-D input both carry a leading
        sample axis.'''
    ndim = len(np.shape(observations))
    if ndim == 1:
        # single sequence: decode directly
        return _viterbi_path(transmtrx, pie, _emission_probs_1d(obsmtrx, observations))
    numstates = np.shape(transmtrx)[0]
    numsamples = np.shape(observations)[0]
    timelength = np.shape(observations)[-1]
    deltas = np.zeros((numsamples, timelength, numstates))
    optzis = np.zeros((numsamples, timelength))
    for sample in range(numsamples):
        if ndim == 2:
            emis = _emission_probs_1d(obsmtrx, observations[sample])
        else:
            # multiple features case
            emis = _emission_probs_multifeat(obsmtrx, observations[sample])
        optzis[sample], deltas[sample] = _viterbi_path(transmtrx, pie, emis)
    return (optzis, deltas)
# def main():
# exmodel = hmmgaussian(2,1,10,100,2,False)
# observations = exmodel.observations
# pie = exmodel.pie
# transmtrx = exmodel.transitionmtrx
# obsmtrx = exmodel.obsmtrx
# seqofstates = exmodel.seqofstates
# (gammas,betas,alphas,log_prob_most_likely_seq,most_likely_seq,forward_most_likely_seq,forward_log_prob_most_likely_seq,Ziis,logobservations) = forward_backwardcont(transmtrx,obsmtrx,pie,observations)
# # print "forward_backward acc"
# # print np.sum(seqofstates==most_likely_seq) / float(exmodel.obserlength)
# # print "forward acc"
# # print np.sum(seqofstates==forward_most_likely_seq) / float(exmodel.obserlength)
# # print "forward_backward prob"
# # print log_prob_most_likely_seq
# # print "forward prob"
# # print forward_log_prob_most_likely_seq
# (mlpath,deltas) = viterbicont(transmtrx,obsmtrx,pie,observations)
# print mlpath
# print "forward_backward is more certain at each time point"
# numwins = 0
# for i in range(exmodel.obserlength):
# if max(alphas[i,:]) <= max(gammas[i,:]):
# numwins +=1
# print numwins / float(exmodel.obserlength)
# # # print mlpath
# # print "viterbi similar to reality"
# # print np.sum(seqofstates==mlpath) / float(exmodel.obserlength)
# # print "vitervi similarity to forward_backward"
# # print np.sum(mlpath==most_likely_seq) / float(exmodel.obserlength)
# # print "viterbi similarity to forward seq"
# # print np.sum(mlpath==forward_most_likely_seq) / float(exmodel.obserlength)
# main()