-
Notifications
You must be signed in to change notification settings - Fork 1
/
GetAllSensors.py
82 lines (65 loc) · 2.47 KB
/
GetAllSensors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
""
Get latest sensor data for all sensors
Send updated data asynchronously to server with aiohttp.
Example sends data (application/json) to:
POST http://10.0.0.1:5000/api/sensordatas
PUT http://10.0.0.1:5000/api/sensors/{mac}
Requires:
asyncio - Python 3.5
aiohttp - pip install aiohttp
""
from datetime import datetime
from multiprocessing import Manager
from urllib.parse import quote
import json
from concurrent.futures import ProcessPoolExecutor
import asyncio
from aiohttp import ClientSession
from ruuvitag_sensor.ruuvi import RuuviTagSensor
# Latest update seen per sensor, keyed by sensor MAC. Each value is the dict
# ({'mac', 'data', 'timestamp'}) that was last put on the queue for that
# sensor; it is used to skip readings whose data has not changed.
all_data = {}
# Base URL of the receiving API; the '/sensordatas' and '/sensors/{mac}'
# endpoints are built from it.
server_url = 'http://10.0.0.1:5000/api'
async def handle_queue(queue):
    """
    Forward queued sensor updates to the server, forever.

    Polls ``queue`` in a loop. Every update taken from it is sent twice, each
    time with a JSON body: as a PUT to ``{server_url}/sensors/{mac}`` and as a
    POST to ``{server_url}/sensordatas``. Requests are issued concurrently in
    batches of at most ``max_batch_requests``. When the queue is empty the
    coroutine sleeps for 0.2 seconds before polling again.

    A failed request is logged and does not stop the loop.

    Args:
        queue: Queue-like object providing ``empty()`` and ``get()``. Each
            item must be a JSON-serialisable dict with a ``'mac'`` key.
    """
    import logging

    logger = logging.getLogger(__name__)
    # Upper bound on requests started per batch (two requests per update).
    max_batch_requests = 50

    async def send_post(session, update_data):
        """POST one update to the sensordatas collection and drain the reply."""
        async with session.post(
            '{url}/sensordatas'.format(url=server_url),
            data=json.dumps(update_data),
            headers={'content-type': 'application/json'}
        ) as response:
            # Read the body so the response is fully consumed before release.
            await response.read()

    async def send_put(session, update_data):
        """PUT one update to the resource of the sensor it belongs to."""
        async with session.put(
            '{url}/sensors/{mac}'.format(url=server_url, mac=quote(update_data['mac'])),
            data=json.dumps(update_data),
            headers={'content-type': 'application/json'}
        ) as response:
            await response.read()

    async with ClientSession() as session:
        while True:
            if queue.empty():
                await asyncio.sleep(0.2)
                continue

            pending = []
            while not queue.empty():
                update_data = queue.get()
                pending.append(send_put(session, update_data))
                pending.append(send_post(session, update_data))
                if len(pending) >= max_batch_requests:
                    # Cap the batch; whatever is still queued is picked up on
                    # the next pass of the outer loop.
                    break

            if pending:
                # gather() accepts bare coroutines, which asyncio.wait() has
                # rejected since Python 3.11. return_exceptions=True keeps one
                # failed request from cancelling the others or ending the loop.
                results = await asyncio.gather(*pending, return_exceptions=True)
                for result in results:
                    if isinstance(result, BaseException):
                        logger.error('Sending sensor update failed: %r', result)
def run_get_datas_background(queue):
    """
    Receive sensor readings and enqueue the ones that carry new data.

    Registers a callback with ``RuuviTagSensor.get_datas``. For every reading
    the callback compares the data with the last update stored for that MAC
    in ``all_data`` and, when the sensor is new or the data differs, stores
    and queues a fresh update dict.

    Args:
        queue: Object with a ``put()`` method that receives update dicts of
            the form ``{'mac': ..., 'data': ..., 'timestamp': ...}``.
    """
    def handle_new_data(new_data):
        """Queue ``new_data`` if its sensor is unseen or its data changed."""
        received_at = datetime.now()
        mac, reading = new_data[0], new_data[1]

        first_sighting = mac not in all_data
        if first_sighting or all_data[mac]['data'] != reading:
            record = {
                'mac': mac,
                'data': reading,
                'timestamp': received_at.isoformat(),
            }
            # Remember the update so identical follow-up readings are skipped.
            all_data[mac] = record
            queue.put(record)

    RuuviTagSensor.get_datas(handle_new_data)
if __name__ == '__main__':
    # Guarded entry point: with the "spawn" start method the worker process
    # re-imports this module, and unguarded start-up code would then try to
    # create another manager and executor from inside the worker.

    # Managed queue that is shared between the worker process and this one.
    m = Manager()
    q = m.Queue()

    # Collect sensor readings in a separate process; updates come back
    # through the queue.
    executor = ProcessPoolExecutor()
    executor.submit(run_get_datas_background, q)

    # Create the loop explicitly: calling asyncio.get_event_loop() without a
    # running loop is deprecated on current Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(handle_queue(q))
    finally:
        loop.close()