-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMeter.py
144 lines (111 loc) · 7.18 KB
/
Meter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import collections
import time
from datetime import timedelta, datetime
import thread
import logging
class Meter:
    """Track a rotating meter dial and derive usage figures from its positions.

    Measurements are kept as ``(utc_datetime, dial_position)`` tuples in a
    bounded history buffer.  The wrap-around arithmetic used throughout this
    class adds 1 to a position that appears to have gone backwards, i.e. one
    full rotation of the dial is treated as 1.0.
    """

    # Class-level defaults.  ``__init__`` replaces the history with a deque;
    # the two floats are rebound on the instance the first time they change.
    __historical_dial_positions = None
    __furthest_dial_position = float(0)
    __meter_reading = float(0)

    def __init__(self, config):
        """
        :param config: supplies ``meter_max_historical_measurements`` (the
            maximum length of the history buffer) and ``print_debug_info``
            (whether ``get_average_per_min`` prints the readings it used).
        :type config: Config
        """
        self.__config = config
        self.__historical_dial_positions = collections.deque(
            maxlen=config.meter_max_historical_measurements)
        self.__logger = logging.getLogger(__name__)

    def update_with_dial_position(self, reading):
        """Record one measurement in the history buffer.

        :param reading: a ``(utc_datetime, dial_position)`` tuple.  The
            reading is ignored when ``dial_position`` is ``None``.
        """
        # Unpack in the body: a tuple parameter in the signature (the previous
        # form) is a syntax error on Python 3 (PEP 3113).  Callers still pass
        # a single tuple positionally, exactly as before.
        utc_datetime, dial_position = reading
        if dial_position is not None:
            self.__historical_dial_positions.append((utc_datetime, dial_position))

    def get_meter_reading(self):
        """Return the running meter reading (whole rotations + dial position).

        The value is only refreshed by calls to ``get_average_per_min``.
        """
        return self.__meter_reading

    def get_average_per_min(self, lookback_in_seconds, current_date_time):
        """Return the average dial movement per minute over a recent window.

        The window is ``[current_date_time - lookback_in_seconds,
        current_date_time]`` with both ends inclusive.  As a side effect the
        running meter reading is brought up to date with the most recent
        reading found inside the window.

        :param lookback_in_seconds: length of the window in seconds; must be
            non-zero because the total is divided by it.
        :param current_date_time: end of the window; must be comparable with
            the datetimes stored in the history.
        :return: usage per minute rounded to 2 decimal places, or ``0.0`` when
            the history is empty or holds no reading inside the window.
        """
        if not self.__historical_dial_positions:
            return float(0)

        # Snapshot the circular buffer into a list so that the sorting and
        # filtering below never touch the live deque.
        snapshot = list(self.__historical_dial_positions)
        readings = self.__remove_jitter_from_readings(snapshot)

        window_start_time = current_date_time - timedelta(seconds=lookback_in_seconds)
        # sorted() over a generator instead of filter(...).sort(): the latter
        # only works where filter() returns a list.  Earliest reading first.
        readings_in_window = sorted(
            (reading for reading in readings
             if window_start_time <= reading[0] <= current_date_time),
            key=lambda reading: reading[0])

        # History exists but nothing falls inside the window: report no usage
        # rather than failing with IndexError on the lookups below.
        if not readings_in_window:
            return float(0)

        if self.__config.print_debug_info:
            self.__log_debug_info(current_date_time, lookback_in_seconds, readings_in_window)

        total_usage = self.__get_total_usage_over_readings(readings_in_window)
        self.__update_meter_reading(readings_in_window[-1][1])

        # Average the usage over the requested period, not over the span of
        # the readings that happened to fall inside it.
        readings_period_in_min = float(lookback_in_seconds) / 60
        return round(total_usage / readings_period_in_min, 2)

    def start_taking_regular_measurements(self, measurement_internal_ms, fn_get_latest_dial_position):
        """Start a background thread that samples the dial at a fixed interval.

        :param measurement_internal_ms: sampling interval in milliseconds.
        :param fn_get_latest_dial_position: zero-argument callable returning
            the current dial position; a ``None`` result is not recorded.
        """
        # Function-scope import so this method does not depend on the
        # file-level import block.  ``threading`` is the high-level interface
        # and is available on both Python 2 and Python 3.
        import threading

        self.__logger.info("Starting to take measurements every %sms", measurement_internal_ms)
        worker = threading.Thread(
            target=self.__take_regular_measurements,
            args=(measurement_internal_ms, fn_get_latest_dial_position))
        # Daemon thread: like a thread from the low-level start_new_thread it
        # must not keep the process alive once the main thread has finished.
        worker.daemon = True
        worker.start()

    @staticmethod
    def __has_passed_top(reference_position, dial_position):
        """Tell whether a lower ``dial_position`` means the dial wrapped round.

        A position below the reference is taken to have gone past the top
        position when the implied forward movement is less than half a
        rotation; a larger implied movement is treated as jitter.
        """
        return (dial_position < reference_position
                and (dial_position + 1) - reference_position < 0.5)

    def __update_meter_reading(self, last_dial_position):
        """Advance the running meter reading to ``last_dial_position``."""
        # Keep only the whole rotations counted so far; the fractional part is
        # replaced by the new dial position.  (The previous ``abs()`` kept the
        # old fraction too, so every update added the dial position on top of
        # the previous reading.)
        whole_rotations = float(int(self.__meter_reading))

        if last_dial_position > self.__furthest_dial_position:
            # Plain forward movement within the current rotation.
            pass
        elif self.__has_passed_top(self.__furthest_dial_position, last_dial_position):
            # The dial has gone past top position: one more full rotation.
            whole_rotations = whole_rotations + 1
        else:
            # Unchanged position, or backwards jitter: leave the reading alone.
            return

        self.__furthest_dial_position = last_dial_position
        self.__meter_reading = whole_rotations + last_dial_position

    def __remove_jitter_from_readings(self, readings):
        """Return the readings, oldest first, with backwards jitter removed.

        ``readings`` is sorted in place by timestamp.  A reading is kept when
        its dial position is not behind the last kept position, or when it is
        behind by an amount that is explained by the dial passing the top
        position.  The comparison starts from a position of 0.
        """
        readings.sort(key=lambda reading: reading[0])
        clean_readings = []
        last_dial_position = float(0)
        for reading in readings:
            dial_position = reading[1]
            if (dial_position >= last_dial_position
                    or self.__has_passed_top(last_dial_position, dial_position)):
                clean_readings.append(reading)
                last_dial_position = dial_position
        return clean_readings

    def __get_total_usage_over_readings(self, readings):
        """Return the total forward dial movement across ``readings``.

        :param readings: ``(utc_datetime, dial_position)`` tuples, oldest
            first.
        :return: the summed forward movement, counting a wrap past the top
            position as forward movement and ignoring backwards jitter; 0 for
            an empty sequence.
        """
        if not readings:
            return 0

        total_usage = 0
        greatest_dial_position = readings[0][1]
        for reading in readings:
            this_dial_position = float(reading[1])
            if this_dial_position > greatest_dial_position:
                # The dial has moved forward since the last accepted position.
                dial_position_movement = this_dial_position - greatest_dial_position
            elif self.__has_passed_top(greatest_dial_position, this_dial_position):
                # The dial has gone past top position: measure the movement
                # as if the new position were one rotation further on.
                dial_position_movement = (this_dial_position + 1) - greatest_dial_position
            else:
                # No movement, or backwards jitter: nothing to record.
                continue
            total_usage = total_usage + dial_position_movement
            greatest_dial_position = this_dial_position
        return total_usage

    def __take_regular_measurements(self, measurement_internal_ms, fn_get_latest_dial_position):
        """Sample the dial once per interval, forever, recording each result.

        This is the body of the worker thread started by
        ``start_taking_regular_measurements``; it only ends if the callable
        (or the sleep) raises.
        """
        # Divide by a float: on Python 2 ``ms / 1000`` is integer division, so
        # any sub-second interval became 0 and the modulo below raised
        # ZeroDivisionError.
        interval_in_sec = measurement_internal_ms / 1000.0
        start_time = time.time()
        while True:
            # Sleep until the next whole multiple of the interval since
            # start_time, so time spent measuring does not build up as drift.
            time.sleep(interval_in_sec - ((time.time() - start_time) % interval_in_sec))
            latest_dial_position = fn_get_latest_dial_position()
            self.__logger.info("Latest dial position is %s", latest_dial_position)
            self.update_with_dial_position((datetime.utcnow(), latest_dial_position))

    @staticmethod
    def __log_debug_info(current_date_time, lookback_in_seconds, dial_positions):
        """Print the readings used for an average, then the window parameters."""
        Meter.__log_out_dial_positions(dial_positions)
        print("current time: " + current_date_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + ", lookback seconds: " + str(lookback_in_seconds))

    @staticmethod
    def __log_out_dial_positions(dial_positions):
        """Print each reading as a ``(self.__time("..."), position),`` line.

        Timestamps are printed with millisecond precision (the last three
        digits of the microseconds are cut off).
        """
        for reading in dial_positions:
            print("(self.__time(\"" + reading[0].strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + "\"), " + str(reading[1]) + "),")