-
Notifications
You must be signed in to change notification settings - Fork 7
/
Wifi_sqlwriter.py
64 lines (52 loc) · 2.04 KB
/
Wifi_sqlwriter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from time import gmtime, strftime
import paho.mqtt.client as mqtt
import sqlite3
from sqlite3 import Error
import json
status_topic = "#"
dbFile = "data.db"
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
print(client)
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe(status_topic)
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
theTime = strftime("%Y-%m-%d %H:%M:%S", gmtime())
result = (theTime + "\t" + str(msg.payload))
print(msg.topic + ":\t" + result)
# if (msg.topic == status_topic):
p = json.loads(msg.payload)
print (json.dumps(p))
print("New message recieved")
writeToDb(theTime, p["DeviceID"], p["topic"], p["MessageID"], p["Payload"], p["path"],p["hops"],p["duckType"])
return
def writeToDb(theTime, duckId, topic, messageId, payload, path, hops, duckType):
conn = sqlite3.connect(dbFile)
c = conn.cursor()
print ("Writing to db...")
try:
c.execute("INSERT INTO clusterData VALUES (?,?,?,?,?,?,?,?)", (theTime, duckId, topic, messageId, payload, path, hops, duckType))
conn.commit()
conn.close()
except Error as e:
print("Not Correct Packet")
print(e)
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("127.0.1.1", 1883, 60)
try:
db = sqlite3.connect(dbFile)
db.cursor().execute("CREATE TABLE IF NOT EXISTS clusterData (timestamp datetime, duck_id TEXT, topic TEXT, message_id TEXT, payload TEXT, path TEXT, hops INT, duck_type INT)")
db.commit()
db.close()
except Error as e:
print(e)
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()