-
Notifications
You must be signed in to change notification settings - Fork 59
/
bluemaestro.py
executable file
Β·244 lines (205 loc) Β· 9.66 KB
/
bluemaestro.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
#!/usr/bin/python3
"""
[[https://bluemaestro.com/products/product-details/bluetooth-environmental-monitor-and-logger][Bluemaestro]] temperature/humidity/pressure monitor
"""
# todo most of it belongs to DAL... but considering so few people use it I didn't bother for now
from datetime import datetime, timedelta
from pathlib import Path
import re
import sqlite3
from typing import Iterable, Sequence, Set, Optional
from .core import get_files, LazyLogger, dataclass
from my.config import bluemaestro as config
# todo control level via env variable?
# i.e. HPI_LOGGING_MY_BLUEMAESTRO_LEVEL=debug
logger = LazyLogger(__name__, level='debug')
def inputs() -> Sequence[Path]:
    """Return the paths of all bluemaestro database exports."""
    export_path = config.export_path
    return get_files(export_path)
# Unit aliases: purely for readability of the field annotations below.
Celsius = float
Percent = float
mBar = float


@dataclass
class Measurement:
    """A single datapoint recorded by the bluemaestro device."""
    dt: datetime  # todo aware/naive
    temp: Celsius
    humidity: Percent
    pressure: mBar
    dewpoint: Celsius
# fixme: later, rely on the timezone provider
# NOTE: the timezone should be set with respect to the export date!!!
import pytz # type: ignore
tz = pytz.timezone('Europe/London')
# TODO when I change tz, check the diff
def is_bad_table(name: str) -> bool:
    """Return True if the user's config asks to exclude table `name` from processing."""
    # todo hmm would be nice to have a hook that can patch any module up to
    delegate = getattr(config, 'is_bad_table', None)
    if delegate is None:
        return False
    return delegate(name)
from .core.cachew import cache_dir
from .core.common import mcachew
@mcachew(depends_on=lambda: inputs(), cache_path=cache_dir() / 'bluemaestro.cache')
def measurements() -> Iterable[Measurement]:
    """
    Yield measurements from all exported databases, in ascending time order.

    Handles both the old export format (a single 'data' table with preformatted
    datetime strings) and the new one (per-device '*_log' tables with unix
    timestamps in milliseconds). Datapoints already seen in a previous export
    (dt <= last yielded) are skipped.

    Raises RuntimeError if a timestamp is implausibly far from the export date.
    """
    # todo ideally this would be via arguments... but needs to be lazy
    dbs = inputs()
    last: Optional[datetime] = None

    # sanity bounds on a datapoint's distance from the export date (hoisted: loop-invariant)
    lower = timedelta(days=6000 / 24)  # ugh some time ago I only did it once in an hour.. in theory can detect from meta?
    upper = timedelta(days=10)  # kinda arbitrary

    # tables are immutable, so can save on processing..
    processed_tables: Set[str] = set()
    for f in dbs:
        logger.debug('processing %s', f)
        tot = 0
        new = 0
        # todo assert increasing timestamp?
        # immutable=1: the exports are read-only snapshots, so let sqlite skip locking
        with sqlite3.connect(f'file:{f}?immutable=1', uri=True) as db:
            db_dt: Optional[datetime] = None
            try:
                # old format: one 'data' table with preformatted local datetime strings
                datas = db.execute(f'SELECT "{f.name}" as name, Time, Temperature, Humidity, Pressure, Dewpoint FROM data ORDER BY log_index')
                oldfmt = True
                db_dts = list(db.execute('SELECT last_download FROM info'))[0][0]
                if db_dts == 'N/A':
                    # ??? happens for 20180923-20180928
                    continue
                if db_dts.endswith(':'):
                    db_dts += '00'  # wtf.. happens on some day
                db_dt = tz.localize(datetime.strptime(db_dts, '%Y-%m-%d %H:%M:%S'))
            except sqlite3.OperationalError:
                # new format: no 'data' table, so the query above fails and we land here.
                #
                # Right, this looks really bad.
                # The device doesn't have internal time & what it does is:
                # 1. every X seconds, record a datapoint, store it in the internal memory
                # 2. on sync, take the phone's datetime ('now') and then ASSIGN the timestamps to the collected data
                #    as now, now - X, now - 2X, etc
                #
                # that basically means that for example, hourly timestamps are completely useless? because their error is about 1h
                # yep, confirmed on some historic exports. seriously, what the fuck???
                #
                # The device _does_ have an internal clock, but it's basically set to 0 every time you update settings
                # So, e.g. if, say, at 17:15 you set the interval to 3600, the 'real' timestamps would be
                # 17:15, 18:15, 19:15, etc
                # But depending on when you export, you might get
                # 17:35, 18:35, 19:35; or 17:55, 18:55, 19:55, etc
                # basically all you guaranteed is that the 'correct' interval is within the frequency
                # it doesn't seem to keep the reference time in the database
                #
                # UPD: fucking hell, so you can set the reference date in the settings (calcReferenceUnix field in meta db)
                # but it's not set by default.
                log_tables = [c[0] for c in db.execute('SELECT name FROM sqlite_sequence WHERE name LIKE "%_log"')]
                log_tables = [t for t in log_tables if t not in processed_tables]
                processed_tables |= set(log_tables)

                # todo use later?
                frequencies = [list(db.execute(f'SELECT interval from {t.replace("_log", "_meta")}'))[0][0] for t in log_tables]

                # todo could just filter out the older datapoints?? dunno.

                # eh. a bit horrible, but seems the easiest way to do it?
                # note: for some reason everything in the new table multiplied by 10
                query = ' UNION '.join(
                    f'SELECT "{t}" AS name, unix, tempReadings / 10.0, humiReadings / 10.0, pressReadings / 10.0, dewpReadings / 10.0 FROM {t}'
                    for t in log_tables
                )
                if len(log_tables) > 0:  # ugh. otherwise end up with syntax error..
                    query = f'SELECT * FROM ({query}) ORDER BY name, unix'
                datas = db.execute(query)
                oldfmt = False
                db_dt = None

            # fix: dropped unused enumerate() index
            for name, tsc, temp, hum, pres, dewp in datas:
                if is_bad_table(name):
                    continue

                # note: bluemaestro keeps local datetime
                if oldfmt:
                    tss = tsc.replace('Juli', 'Jul').replace('Aug.', 'Aug')
                    dt = datetime.strptime(tss, '%Y-%b-%d %H:%M')
                    dt = tz.localize(dt)
                    assert db_dt is not None
                else:
                    # export date is embedded in the table name, e.g. <device>_<millis>_log
                    # todo cache?
                    m = re.search(r'_(\d+)_', name)
                    assert m is not None
                    export_ts = int(m.group(1))
                    db_dt = datetime.fromtimestamp(export_ts / 1000, tz=tz)
                    dt = datetime.fromtimestamp(tsc / 1000, tz=tz)

                ## sanity checks (todo make defensive/configurable?)
                # not sure how that happens.. but basically they'd better be excluded
                # fix: use 'upper' instead of a duplicated timedelta(days=10) literal
                if not (db_dt - lower < dt < db_dt + upper):
                    # todo could be more defenive??
                    raise RuntimeError('timestamp too far out', f, name, db_dt, dt)
                assert -60 <= temp <= 60, (f, dt, temp)
                ##

                tot += 1
                if last is not None and last >= dt:
                    continue
                # todo for performance, pass 'last' to sqlite instead?
                last = dt
                new += 1
                p = Measurement(
                    dt=dt,
                    temp=temp,
                    pressure=pres,
                    humidity=hum,
                    dewpoint=dewp,
                )
                yield p
        logger.debug('%s: new %d/%d', f, new, tot)
# logger.info('total items: %d', len(merged))
# for k, v in merged.items():
# # TODO shit. quite a few of them have varying values... how is that freaking possible????
# # most of them are within 0.5 degree though... so just ignore?
# if isinstance(v, set) and len(v) > 1:
# print(k, v)
# for k, v in merged.items():
# yield Point(dt=k, temp=v) # meh?
from .core import stat, Stats
def stats() -> Stats:
    """Summary statistics over all measurements (HPI stats interface)."""
    res = stat(measurements)
    return res
from .core.pandas import DataFrameT, as_dataframe
def dataframe() -> DataFrameT:
    """
    Return all measurements as a dataframe indexed by timestamp.

    Usage example:
    %matplotlib gtk
    from my.bluemaestro import dataframe
    dataframe().plot()
    """
    res = as_dataframe(measurements(), schema=Measurement)
    # todo not sure how it would handle mixed timezones??
    # todo hmm, not sure about setting the index
    return res.set_index('dt')
def fill_influxdb() -> None:
    """
    Rebuild the influxdb series for this module from measurements().

    Deletes the existing series first, then writes every datapoint in batches.
    Connects to a local InfluxDB instance with default credentials.
    """
    # fix: removed unused 'from itertools import islice'
    from .core.common import asdict
    from influxdb import InfluxDBClient  # type: ignore
    client = InfluxDBClient()
    db = 'db'
    mname = __name__.replace('.', '_')
    # start from scratch so stale/duplicate points don't accumulate
    client.delete_series(database=db, measurement=mname)
    def dissoc(d, k):
        # remove key k from d in place and return d (dt goes into 'time', not 'fields')
        del d[k]
        return d  # meh
    jsons = ({
        'measurement': mname,
        # todo maybe good idea to tags with database file/name? to inspect inconsistencies etc..
        # 'tags': {'activity': e.activity},
        'time': e.dt.isoformat(),
        'fields': dissoc(asdict(e), 'dt'),
    } for e in measurements())
    from more_itertools import chunked
    # "The optimal batch size is 5000 lines of line protocol."
    # some chunking is def necessary, otherwise it fails
    for chunk in chunked(jsons, n=5000):
        cl = list(chunk)
        logger.debug('writing next chunk %s', cl[-1])
        client.write_points(cl, database=db)
    # todo "Specify timestamp precision when writing to InfluxDB."?
def check() -> None:
temps = list(measurements())
latest = temps[:-2]
prev = latest[-2].dt
last = latest[-1].dt
# todo stat should expose a dataclass?
# TODO ugh. might need to warn about points past 'now'??
# the default shouldn't allow points in the future...
#
# TODO also needs to be filtered out on processing, should be rejected on the basis of export date?
POINTS_STORED = 6000 # on device?
FREQ_SEC = 60
SECS_STORED = POINTS_STORED * FREQ_SEC
HOURS_STORED = POINTS_STORED / (60 * 60 / FREQ_SEC) # around 4 days
NOW = datetime.now()
assert NOW - last < timedelta(hours=HOURS_STORED / 2), f'old backup! {last}'
assert last - prev < timedelta(minutes=3), f'bad interval! {last - prev}'
single = (last - prev).seconds