# Linear_sysmdl.py
# Forked from KalmanNet/KalmanNet_TSP
import torch
from torch.distributions.multivariate_normal import MultivariateNormal
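
# The class below wraps a linear Gaussian state-space model. As a reading aid,
# a summary of what GenerateSequence implements (with the default diagonal noise):
#   x_t = F x_{t-1} + e_q,   e_q ~ N(0, Q),   Q = q^2 * I
#   y_t = H x_t     + e_r,   e_r ~ N(0, R),   R = r^2 * I
# With probability outlier_p, a Rayleigh-distributed offset of scale
# rayleigh_sigma is additionally added to the observation y_t.
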
class SystemModel:
    def __init__(self, F, q, H, r, T, T_test, outlier_p=0, rayleigh_sigma=10000):
        self.outlier_p = outlier_p
        self.rayleigh_sigma = rayleigh_sigma

        ####################
        ### Motion Model ###
        ####################
        self.F = F
        self.m = self.F.size()[0]
        self.q = q
        self.Q = q * q * torch.eye(self.m)

        #########################
        ### Observation Model ###
        #########################
        self.H = H
        self.n = self.H.size()[0]
        self.r = r
        self.R = r * r * torch.eye(self.n)

        # Assign T and T_test
        self.T = T
        self.T_test = T_test

    #####################
    ### Init Sequence ###
    #####################
    def InitSequence(self, m1x_0, m2x_0):
        self.m1x_0 = m1x_0
        self.m2x_0 = m2x_0

    #########################
    ### Update Covariance ###
    #########################
    def UpdateCovariance_Gain(self, q, r):
        self.q = q
        self.Q = q * q * torch.eye(self.m)
        self.r = r
        self.R = r * r * torch.eye(self.n)

    def UpdateCovariance_Matrix(self, Q, R):
        self.Q = Q
        self.R = R
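
    # Illustrative note: the two updaters differ in granularity.
    # UpdateCovariance_Gain rebuilds diagonal covariances from scalar gains,
    # while UpdateCovariance_Matrix accepts full matrices, e.g. a correlated Q.
    #
    #   sys_model.UpdateCovariance_Gain(q=0.1, r=1.0)          # Q = 0.01*I, R = 1.0*I
    #   sys_model.UpdateCovariance_Matrix(Q_custom, R_custom)  # full SPD matrices
    #
    # Here sys_model, Q_custom and R_custom are hypothetical names.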

    #########################
    ### Generate Sequence ###
    #########################
    def GenerateSequence(self, Q_gen, R_gen, T):
        # Pre allocate an array for current state
        self.x = torch.empty(size=[self.m, T])
        # Pre allocate an array for current observation
        self.y = torch.empty(size=[self.n, T])
        # Set x0 to be x previous
        self.x_prev = self.m1x_0

        # Outliers
        if self.outlier_p > 0:
            b_matrix = torch.bernoulli(self.outlier_p * torch.ones(T))

        # Generate Sequence Iteratively
        for t in range(0, T):

            ########################
            #### State Evolution ###
            ########################
            # Process Noise
            if self.q == 0:
                xt = self.F.matmul(self.x_prev)
            else:
                xt = self.F.matmul(self.x_prev)
                mean = torch.zeros([self.m])
                distrib = MultivariateNormal(loc=mean, covariance_matrix=Q_gen)
                eq = distrib.rsample()
                # eq = torch.normal(mean, self.q)
                eq = torch.reshape(eq[:], [self.m, 1])
                # Additive Process Noise
                xt = torch.add(xt, eq)

            ################
            ### Emission ###
            ################
            # Observation Noise
            if self.r == 0:
                yt = self.H.matmul(xt)
            else:
                yt = self.H.matmul(xt)
                mean = torch.zeros([self.n])
                distrib = MultivariateNormal(loc=mean, covariance_matrix=R_gen)
                er = distrib.rsample()
                er = torch.reshape(er[:], [self.n, 1])
                # mean = torch.zeros([self.n,1])
                # er = torch.normal(mean, self.r)
                # Additive Observation Noise
                yt = torch.add(yt, er)

            # Outliers
            if self.outlier_p > 0:
                if b_matrix[t] != 0:
                    btdt = self.rayleigh_sigma * torch.sqrt(-2 * torch.log(torch.rand(self.n, 1)))
                    yt = torch.add(yt, btdt)

            ########################
            ### Squeeze to Array ###
            ########################
            # Save Current State to Trajectory Array
            self.x[:, t] = torch.squeeze(xt)
            # Save Current Observation to Trajectory Array
            self.y[:, t] = torch.squeeze(yt)

            ################################
            ### Save Current to Previous ###
            ################################
            self.x_prev = xt
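
    # Illustrative single-trajectory call (assuming the initial moments were set
    # via InitSequence): after it returns, self.x is m x T (states) and self.y is
    # n x T (observations). sys_model is a hypothetical instance name; any SPD
    # Q_gen / R_gen of the right sizes may be passed instead of the model's own.
    #
    #   sys_model.InitSequence(torch.zeros(sys_model.m, 1), torch.eye(sys_model.m))
    #   sys_model.GenerateSequence(sys_model.Q, sys_model.R, sys_model.T)
    #   states, observations = sys_model.x, sys_model.y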

    ######################
    ### Generate Batch ###
    ######################
    def GenerateBatch(self, size, T, randomInit=False, seqInit=False, T_test=0):
        # Allocate Empty Array for Input
        self.Input = torch.empty(size, self.n, T)
        # Allocate Empty Array for Target
        self.Target = torch.empty(size, self.m, T)

        ### Generate Examples
        initConditions = self.m1x_0

        for i in range(0, size):
            # Generate Sequence
            # Randomize initial conditions to get a rich dataset
            if randomInit:
                variance = 100
                initConditions = torch.rand_like(self.m1x_0) * variance

            if seqInit:
                initConditions = self.x_prev
                if (i * T % T_test) == 0:
                    initConditions = torch.zeros_like(self.m1x_0)

            self.InitSequence(initConditions, self.m2x_0)
            self.GenerateSequence(self.Q, self.R, T)

            # Training sequence input
            self.Input[i, :, :] = self.y
            # Training sequence output
            self.Target[i, :, :] = self.x
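
# Minimal usage sketch with illustrative values: builds a 2-state constant-velocity
# model with a position-only observation, generates a batch, and checks the tensor
# shapes. F_cv, H_cv, and the noise levels below are hypothetical, not taken from
# the upstream repository. Note that when seqInit=True, a non-zero T_test must be
# passed, since GenerateBatch computes i*T % T_test.
if __name__ == "__main__":
    dt = 1.0
    F_cv = torch.tensor([[1.0, dt],
                         [0.0, 1.0]])   # constant-velocity state transition
    H_cv = torch.tensor([[1.0, 0.0]])   # observe position only

    sys_model = SystemModel(F=F_cv, q=0.1, H=H_cv, r=1.0, T=100, T_test=200)
    sys_model.InitSequence(m1x_0=torch.zeros(2, 1), m2x_0=torch.eye(2))

    sys_model.GenerateBatch(size=8, T=100)
    print(sys_model.Input.shape)   # torch.Size([8, 1, 100]) -- observations y
    print(sys_model.Target.shape)  # torch.Size([8, 2, 100]) -- states x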