-
Notifications
You must be signed in to change notification settings - Fork 2
/
dataController.py
156 lines (122 loc) · 5.56 KB
/
dataController.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
from typing import Callable, List, Awaitable
from clock import TimeController
from db import Database
from csvwriter import CsvWriter
from scipy import signal
from normalization import Normalization
class ModelMachine:
    """Buffer three sensor streams for one machine and emit fixed-size batches.

    Left-vibration, right-vibration and temperature samples are accumulated in
    separate lists.  Whenever all three lists hold at least ``batch_size``
    samples, the oldest ``batch_size`` samples of each stream are passed
    through ``norm.norm`` and the result is handed to ``callback`` together
    with the machine name.
    """

    def __init__(self, name: str,
                 norm: Normalization,
                 callback: Callable[[List[float], List[float], List[float], str], Awaitable[None]],
                 batch_size: int = 10):
        """Create a machine with empty buffers.

        Args:
            name: Identifier passed as the last argument to ``callback``.
            norm: Object whose awaitable ``norm(left, right, temp)`` returns
                the three sequences that are forwarded to ``callback``.
            callback: Coroutine function awaited as
                ``callback(left, right, temp, name)`` for every full batch.
            batch_size: Number of samples per stream that form one batch.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        # A non-positive size would make every (empty) slice count as a full
        # batch, so the drain loop in ``trigger`` could never terminate.
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
        self.vib_left = []
        self.vib_right = []
        self.temp = []
        self.batch_size = batch_size
        self.callback = callback
        self.name = name
        self.norm = norm

    async def trigger(self):
        """Normalise and emit every complete batch currently buffered.

        A single incoming message may carry more samples than one batch, so
        the buffers are drained in a loop; otherwise the surplus would pile up
        and never be delivered.  A batch is removed from the buffers only
        after ``callback`` has returned, so an exception raised by ``norm`` or
        ``callback`` leaves those samples buffered.
        """
        while self.is_batch():
            size = self.batch_size
            left, right, temp = await self.norm.norm(self.vib_left[:size],
                                                     self.vib_right[:size],
                                                     self.temp[:size])
            await self.callback(left, right, temp, self.name)
            self.clear_batch()

    def is_batch(self):
        """Return True when every stream holds at least ``batch_size`` samples."""
        return all(len(buffer) >= self.batch_size
                   for buffer in (self.vib_left, self.temp, self.vib_right))

    def clear_batch(self):
        """Drop the oldest ``batch_size`` samples from each of the three buffers."""
        del self.vib_left[:self.batch_size]
        del self.vib_right[:self.batch_size]
        del self.temp[:self.batch_size]

    def add_vib_left(self, data):
        """Append ``data`` to the left-vibration buffer without triggering."""
        self.vib_left.extend(data)

    def add_vib_right(self, data):
        """Append ``data`` to the right-vibration buffer without triggering."""
        self.vib_right.extend(data)

    async def add_vib(self, left_data, right_data):
        """Append both vibration channels, then emit any complete batches."""
        self.add_vib_left(left_data)
        self.add_vib_right(right_data)
        await self.trigger()

    async def add_temp(self, data):
        """Append temperature samples, then emit any complete batches."""
        self.temp.extend(data)
        await self.trigger()
class StatMachine:
    """Accumulate per-stream statistics for one machine and persist them.

    Three ``Statistics`` accumulators (left vibration, right vibration,
    temperature) are fed by ``add_vib`` / ``add_temp``.  After every feed the
    ``TimeController`` is asked whether the hour changed; if it reports so,
    the three averages are written through ``db.save_now``.
    """

    def __init__(self, name, db: Database):
        """Create fresh accumulators and a clock for the machine ``name``.

        Args:
            name: Identifier stored on the instance.
            db: Object providing the awaitable
                ``save_now(avr_left, avr_right, avr_temp)``.
        """
        self.name = name
        self.left, self.right, self.temp = Statistics(), Statistics(), Statistics()
        self.time = TimeController()
        self.db = db

    async def callback(self):
        """Fetch the three averages (left, right, temp) and save them."""
        # ``get_average`` is called on every accumulator, in this order,
        # before anything is written to the database.
        averages = [stat.get_average()
                    for stat in (self.left, self.right, self.temp)]
        await self.db.save_now(*averages)

    async def trigger(self):
        """Run ``callback`` when the clock reports an hour change."""
        if not self.time.is_hour_change():
            return
        await self.callback()

    async def add_vib(self, left_data, right_data):
        """Feed both vibration accumulators, then check for an hour change."""
        self.left.add(left_data)
        self.right.add(right_data)
        await self.trigger()

    async def add_temp(self, datas):
        """Feed the temperature accumulator, then check for an hour change."""
        self.temp.add(datas)
        await self.trigger()
class Statistics:
    """Running mean of the absolute values of all samples added so far."""

    def __init__(self):
        self.data_sum = 0  # sum of abs(sample) since the last reset
        self.size = 0      # number of samples since the last reset

    def add(self, datas):
        """Accumulate the absolute values of ``datas``.

        Args:
            datas: Sized iterable of numbers (must support ``len``).
        """
        self.data_sum += sum(map(abs, datas))
        self.size += len(datas)

    def reset(self):
        """Forget every sample accumulated so far."""
        self.data_sum = 0
        self.size = 0

    def get_average(self):
        """Return the mean absolute value and reset the accumulator.

        Returns:
            ``data_sum / size``, or ``0.0`` when no samples were added since
            the last reset (instead of raising ``ZeroDivisionError``).
        """
        # An hour change can arrive before a stream has delivered any sample;
        # report a neutral average rather than crashing the caller.
        if self.size == 0:
            return 0.0
        average = self.data_sum / self.size
        self.reset()
        return average
class DataController:
    """Route incoming sensor messages for two machines to all consumers.

    Every message is forwarded, in this order, to the per-machine
    ``StatMachine`` (raw samples), to a ``CsvWriter`` (raw samples plus a time
    column) and, after resampling, to the per-machine ``ModelMachine``.
    """

    def __init__(self, model_req: Callable[[List[float], List[float], List[float], str], Awaitable[None]],
                 norm: Normalization, batch_size,
                 sampling_rate: int, db_1_path, db_2_path, raw_directory):
        """Wire up the model, statistics and CSV consumers.

        Args:
            model_req: Coroutine function used as the ``ModelMachine``
                callback; it is awaited as ``model_req(left, right, temp, name)``.
            norm: Normalisation object handed to both ``ModelMachine`` instances.
            batch_size: Batch size handed to both ``ModelMachine`` instances.
            sampling_rate: Number of samples each channel of a message is
                resampled to (the ``num`` argument of ``scipy.signal.resample``).
            db_1_path: Path given to the ``Database`` backing machine1 statistics.
            db_2_path: Path given to the ``Database`` backing machine2 statistics.
            raw_directory: Directory given to both ``CsvWriter`` instances.
        """
        db1 = Database(db_1_path)
        db2 = Database(db_2_path)

        self.machine1 = ModelMachine('machine1', norm, model_req, batch_size)
        self.machine2 = ModelMachine('machine2', norm, model_req, batch_size)
        self.machine1_stat = StatMachine('machine1', db1)
        self.machine2_stat = StatMachine('machine2', db2)

        self.vib_writer = CsvWriter(raw_directory, 'vib',
                                    ['time', 'machine1_left', 'machine1_right',
                                     'machine2_left', 'machine2_right'])
        self.temp_writer = CsvWriter(raw_directory, 'temp', ['time', 'machine1', 'machine2'])
        self.sampling_rate = sampling_rate

    def _resample(self, samples):
        """Resample one channel to ``self.sampling_rate`` points."""
        return signal.resample(samples, self.sampling_rate)

    async def add_vib(self, message: dict):
        """Process one vibration message.

        Args:
            message: Mapping with the keys ``'time'``, ``'machine1_left'``,
                ``'machine1_right'``, ``'machine2_left'`` and
                ``'machine2_right'``.

        Raises:
            KeyError: If one of the keys above is missing.
        """
        machine1_left = message['machine1_left']
        machine1_right = message['machine1_right']
        machine2_left = message['machine2_left']
        machine2_right = message['machine2_right']

        await self.machine1_stat.add_vib(machine1_left, machine1_right)
        await self.machine2_stat.add_vib(machine2_left, machine2_right)

        # The message carries a single timestamp; repeat it so the time column
        # has one entry per machine1_left sample.
        time_column = [message['time']] * len(machine1_left)
        await self.vib_writer.save([time_column,
                                    machine1_left, machine1_right,
                                    machine2_left, machine2_right])

        # Resample every channel before feeding any model buffer, so a
        # resampling failure leaves both ModelMachine buffers untouched.
        machine1_left_resampled = self._resample(machine1_left)
        machine1_right_resampled = self._resample(machine1_right)
        machine2_left_resampled = self._resample(machine2_left)
        machine2_right_resampled = self._resample(machine2_right)

        await self.machine1.add_vib(machine1_left_resampled, machine1_right_resampled)
        await self.machine2.add_vib(machine2_left_resampled, machine2_right_resampled)

    async def add_temp(self, message: dict):
        """Process one temperature message.

        Args:
            message: Mapping with the keys ``'time'``, ``'machine1'`` and
                ``'machine2'``.

        Raises:
            KeyError: If one of the keys above is missing.
        """
        machine1 = message['machine1']
        machine2 = message['machine2']

        await self.machine1_stat.add_temp(machine1)
        await self.machine2_stat.add_temp(machine2)

        # One timestamp per message, repeated once per machine1 sample.
        time_column = [message['time']] * len(machine1)
        await self.temp_writer.save([time_column, machine1, machine2])

        machine1_resampled = self._resample(machine1)
        machine2_resampled = self._resample(machine2)

        await self.machine1.add_temp(machine1_resampled)
        await self.machine2.add_temp(machine2_resampled)