forked from tamberg/fhnw-idb
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmqtt_pub_client.py
57 lines (44 loc) · 1.61 KB
/
mqtt_pub_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import board
import busio
import digitalio
import time
from adafruit_esp32spi import adafruit_esp32spi
from adafruit_esp32spi import adafruit_esp32spi_socket
from adafruit_minimqtt import adafruit_minimqtt
# TODO: Set your Wi-Fi ssid, password
wifi_ssid = "MY_SSID"
wifi_password = "MY_PASSWORD"
# FeatherWing ESP32 AirLift, nRF52840
cs = digitalio.DigitalInOut(board.D13)
rdy = digitalio.DigitalInOut(board.D11)
rst = digitalio.DigitalInOut(board.D12)
spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
wifi = adafruit_esp32spi.ESP_SPIcontrol(spi, cs, rdy, rst)
while not wifi.is_connected:
print("\nConnecting to Wi-Fi...")
try:
wifi.connect_AP(wifi_ssid, wifi_password)
except RuntimeError as e:
print("Cannot connect to Wi-Fi", e)
continue
print("Wi-Fi connected to", str(wifi.ssid, "utf-8"))
print("IP address", wifi.pretty_ip(wifi.ip_address))
# MQTT setup
mqtt_broker = "test.mosquitto.org"
mqtt_topic = "hello"
def handle_connect(client, userdata, flags, rc):
print("Connected to {0}".format(client.broker))
def handle_publish(client, userdata, topic, pid):
print("Published to {0} with PID {1}".format(topic, pid))
adafruit_minimqtt.set_socket(adafruit_esp32spi_socket, wifi)
mqtt_client = adafruit_minimqtt.MQTT(broker=mqtt_broker, is_ssl=False)
# Set callback handlers
mqtt_client.on_connect = handle_connect
mqtt_client.on_publish = handle_publish
print("\nConnecting to {0}".format(mqtt_broker))
mqtt_client.connect()
while True:
t = int(time.monotonic())
mqtt_message = "Hello, up for {0}s".format(t)
mqtt_client.publish(mqtt_topic, mqtt_message)
time.sleep(5)