-
Notifications
You must be signed in to change notification settings - Fork 1
/
preprocess.py
188 lines (157 loc) · 6.8 KB
/
preprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
from scipy.sparse.linalg import eigs
class Dataset(object):
    '''Container for the normalized train/val/test splits and their z-score stats.'''
    def __init__(self, data, stats):
        # data: dict mapping split name ('train' / 'val' / 'test') to np.ndarray.
        # stats: dict carrying the 'mean' and 'std' used for normalization.
        self.__data = data
        self.mean = stats['mean']
        self.std = stats['std']
    def get_data(self, type):
        '''Return the split named *type*.'''
        return self.__data[type]
    def get_stats(self):
        '''Return the normalization statistics as a {'mean': ..., 'std': ...} dict.'''
        return dict(mean=self.mean, std=self.std)
    def get_len(self, type):
        '''Return the number of sequence units in the split named *type*.'''
        return len(self.__data[type])
    def z_inverse(self, type):
        '''Undo the z-score normalization of the split named *type*.'''
        return self.mean + self.__data[type] * self.std
def weight_matrix(file_path, sigma=1000):
    '''
    Load a weighted adjacency matrix and apply an exponential kernel.

    Every positive entry w is mapped to exp(-w / sigma); zero entries
    (no edge / self-distance) are left as-is.

    :param file_path: str, the path of saved weight matrix file (CSV, no header).
    :param sigma: float, scalar of matrix W (kernel scale).
    :return: np.ndarray, [n_route, n_route], dtype float.
    :raise FileNotFoundError: if file_path does not exist.
    '''
    try:
        W = pd.read_csv(file_path, header=None).values
    except FileNotFoundError:
        # Re-raise: the previous version printed and fell through, then
        # crashed with a NameError because W was never assigned.
        print(f'ERROR: input file was not found in {file_path}.')
        raise
    # Cast to float so the kernel values are not truncated when the CSV
    # holds integer distances, then vectorize the per-element double loop.
    W = W.astype(float)
    return np.where(W > 0, np.exp(-W / sigma), W)
def scaled_laplacian(W):
    '''
    Rescaled normalized graph Laplacian.

    Computes L_norm = D^{-1/2} (D - W) D^{-1/2} (skipping zero-degree
    nodes), then rescales by its largest eigenvalue:
    2 * L_norm / lambda_max - I, which maps the spectrum into [-1, 1].

    :param W: np.ndarray, [n_route, n_route], weighted adjacency matrix of G.
    :return: np.matrix, [n_route, n_route].
    '''
    num_nodes = W.shape[0]
    degree = np.sum(W, axis=1)
    # Combinatorial Laplacian: negated weights off-diagonal, degrees on it.
    lap = -W
    lap[np.diag_indices_from(lap)] = degree
    # Symmetric normalization; isolated nodes (zero degree) are left untouched.
    for row in range(num_nodes):
        for col in range(num_nodes):
            if degree[row] > 0 and degree[col] > 0:
                lap[row, col] /= np.sqrt(degree[row] * degree[col])
    # lambda_max is ~2.0 (the top eigenvalue of a normalized Laplacian).
    lambda_max = eigs(lap, k=1, which='LR')[0][0].real
    return np.mat(2 * lap / lambda_max - np.identity(num_nodes))
def cheb_poly_approx(L, Ks, n):
    '''
    Chebyshev polynomials approximation function.

    Builds [T_0(L), T_1(L), ..., T_{Ks-1}(L)] with the recurrence
    T_k = 2 * L * T_{k-1} - T_{k-2}, concatenated along the last axis.

    :param L: np.matrix, [n_route, n_route], scaled graph Laplacian.
    :param Ks: int, kernel size of spatial convolution, must be >= 1.
    :param n: int, number of routes / size of graph.
    :return: np.ndarray, [n_route, Ks*n_route].
    :raise ValueError: if Ks < 1.
    '''
    if Ks < 1:
        # Fixed message: Ks == 1 is accepted, so the bound is ">= 1",
        # not "> 1" as the old message claimed.
        raise ValueError(f'ERROR: the size of spatial kernel must be at least 1, but received "{Ks}".')
    T_prev = np.mat(np.identity(n))
    if Ks == 1:
        return np.asarray(T_prev)
    T_curr = np.mat(np.copy(L))
    terms = [np.copy(T_prev), np.copy(T_curr)]
    for _ in range(Ks - 2):
        # Chebyshev recurrence; tuple assignment avoids the extra copies
        # the previous version made on every iteration.
        T_prev, T_curr = T_curr, np.mat(2 * L * T_curr - T_prev)
        terms.append(np.copy(T_curr))
    # terms: Ks arrays of [n, n] -> result [n, Ks*n].
    return np.concatenate(terms, axis=-1)
def z_score(x, mean, std):
    '''
    Standardize *x*: subtract the mean, then divide by the standard deviation.

    :param x: np.ndarray, input array to be normalized.
    :param mean: float, the value of mean.
    :param std: float, the value of standard deviation.
    :return: np.ndarray, z-score normalized array.
    '''
    centered = x - mean
    return centered / std
def seq_gen(len_seq, data_seq, offset, n_frame, n_route, day_slot, C_0=1):
    '''
    Cut the raw time-series into standard sequence units via a sliding window.

    Each day contributes n_slot = day_slot - n_frame + 1 overlapping windows
    of n_frame consecutive time steps.

    :param len_seq: int, the length of target date sequence (in days).
    :param data_seq: np.ndarray, source data / time-series.
    :param offset: int, the starting day index of this dataset split.
    :param n_frame: int, the number of frames within a standard sequence unit.
    :param n_route: int, the number of routes in the graph.
    :param day_slot: int, the number of time slots per day
        (288 for a 5-minute window).
    :param C_0: int, the size of input channel.
    :return: np.ndarray, [len_seq * n_slot, n_frame, n_route, C_0].
    '''
    n_slot = day_slot - n_frame + 1
    seq = np.zeros((len_seq * n_slot, n_frame, n_route, C_0))
    for day in range(len_seq):
        for slot in range(n_slot):
            start = ((day + offset) * day_slot + slot) * C_0
            stop = start + n_frame * C_0
            window = data_seq[start:stop, :].reshape(n_frame, n_route, C_0)
            seq[day * n_slot + slot] = window
    return seq
def data_gen(file_path, data_config, n_route, n_frame=21, day_slot=288):
    '''
    Source file load and dataset generation.

    :param file_path: str, the file path of data source (CSV, no header).
    :param data_config: tuple, (n_train, n_val, n_test) split sizes in days.
    :param n_route: int, the number of routes in the graph.
    :param n_frame: int, the number of frames within a standard sequence unit,
        which contains n_his = 12 and n_pred = 9 (3 /15 min, 6 /30 min & 9 /45 min).
    :param day_slot: int, the number of time slots per day, controlled by the
        time window (5 min as default).
    :return: Dataset, z-score-normalized train/val/test splits plus the
        mean/std computed on the training split.
    :raise FileNotFoundError: if file_path does not exist.
    '''
    n_train, n_val, n_test = data_config
    try:
        data_seq = pd.read_csv(file_path, header=None).values
    except FileNotFoundError:
        # Re-raise: the previous version printed and fell through, then
        # crashed with a NameError because data_seq was never assigned.
        print(f'ERROR: input file was not found in {file_path}.')
        raise
    # NOTE(review): the validation split is carved out AFTER the test split
    # (day offsets n_train and n_train + n_test). Preserved as-is, but
    # confirm this ordering is intentional.
    seq_train = seq_gen(n_train, data_seq, 0, n_frame, n_route, day_slot, 1)
    seq_test = seq_gen(n_test, data_seq, n_train, n_frame, n_route, day_slot, 1)
    seq_val = seq_gen(n_val, data_seq, n_train + n_test, n_frame, n_route, day_slot, 1)
    # Normalization statistics are computed on the training split only.
    x_stats = {'mean': np.mean(seq_train), 'std': np.std(seq_train)}
    # x_train, x_val, x_test: np.array, [sample_size, n_frame, n_route, channel_size].
    x_data = {
        'train': z_score(seq_train, x_stats['mean'], x_stats['std']),
        'val': z_score(seq_val, x_stats['mean'], x_stats['std']),
        'test': z_score(seq_test, x_stats['mean'], x_stats['std']),
    }
    return Dataset(x_data, x_stats)
def gen_batch(inputs, batch_size, dynamic_batch=False, shuffle=False):
    '''
    Yield mini-batches of standard sequence units.

    :param inputs: np.ndarray, [len_seq, n_frame, n_route, C_0], standard sequence units.
    :param batch_size: int, the size of batch.
    :param dynamic_batch: bool, if True the final batch may be smaller than
        batch_size instead of being dropped.
    :param shuffle: bool, if True visit the samples in a random order.
    '''
    total = len(inputs)
    order = None
    if shuffle:
        order = np.arange(total)
        np.random.shuffle(order)
    for lo in range(0, total, batch_size):
        hi = lo + batch_size
        if hi > total:
            # Short final batch: keep it only when dynamic_batch is set.
            if not dynamic_batch:
                break
            hi = total
        picks = order[lo:hi] if shuffle else slice(lo, hi)
        yield inputs[picks]