-
Notifications
You must be signed in to change notification settings - Fork 177
/
threading.py
153 lines (130 loc) · 5.49 KB
/
threading.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# TinyTuya Example
# -*- coding: utf-8 -*-
"""
TinyTuya - Multi-threaded Example
This demonstrates how to use threading to listen for status updates from multiple
Tuya devices.
Setup:
Set the config for each device and the script will create a thread for each device
to listen for status updates. The main thread will continue to run and can be used
to send commands to the devices.
Author: Jason A. Cox
For more information see https://github.com/jasonacox/tinytuya
"""
import threading
import time
import tinytuya
# Devices to monitor. Each entry supplies the four values that are later
# passed to tinytuya.Device: network address, device ID, local key and
# protocol version. Edit these entries to match your own devices.
config = {
    "TuyaDevices": [
        {
            "Address": "192.168.1.10",
            "Device ID": "00112233445566778899",
            "Local Key": "1234567890123abc",
            "Version": "3.3",
        },
        {
            "Address": "192.168.1.11",
            "Device ID": "10112233445566778899",
            "Local Key": "1234567890123abc",
            "Version": "3.3",
        },
        {
            "Address": "192.168.1.12",
            "Device ID": "20112233445566778899",
            "Local Key": "1234567890123abc",
            "Version": "3.3",
        },
        {
            "Address": "192.168.1.13",
            "Device ID": "30112233445566778899",
            "Local Key": "1234567890123abc",
            "Version": "3.3",
        },
    ],
}

# Settings
TTL_HEARTBEAT = 12  # Minimum number of seconds between heartbeats sent to a device
# Build `devices`: one tinytuya.Device per entry of config["TuyaDevices"],
# kept in the same order as the config list so that a device's position in
# `devices` matches its position in the config.
devices = [
    tinytuya.Device(
        entry["Device ID"],
        entry["Address"],
        entry["Local Key"],
        version=entry["Version"],
    )
    for entry in config["TuyaDevices"]
]
# Status listeners: one background thread per device keeps statuses[index] current
def _read_dps(response):
    """
    Return the "dps" entry of a device response.

    Raises ValueError when the response cannot be indexed by "dps" (for
    example None, or a payload without that key), so the message printed by
    the caller names the actual problem instead of a bare KeyError/TypeError.
    """
    try:
        return response["dps"]
    except (KeyError, TypeError) as err:
        raise ValueError("response contains no 'dps' data") from err


def _reconnect(device, index, cooldown=5):
    """
    Try once to re-establish the connection to `device`.

    Sleeps `cooldown` seconds, re-enables the persistent socket and requests
    a fresh status. On success statuses[index] is replaced with the new
    status and True is returned; on any error the failure is printed and
    False is returned (statuses[index] is left as the caller set it).
    """
    time.sleep(cooldown)  # Cool-down before reconnecting
    try:
        print(f"Reconnecting to {device.id}...")
        device.set_socketPersistent(True)
        initial_status = device.status()
        print(f"Reconnected and got status from {device.id}: {initial_status}")
        statuses[index] = {"id": device.id, "status": _read_dps(initial_status)}
        return True
    except Exception as e:
        print(f"Failed to reconnect to {device.id}: {e}")
        return False


def _listen_for_status_updates(device, index):
    """
    Thread function to continuously listen for status updates and send heartbeats.

    Writes only to statuses[index]. The function returns (ending the thread)
    when the initial status request fails or when a reconnect attempt fails;
    in both cases the slot's status is left as "Disconnected".
    """
    try:
        # Call status() once to establish connection and get initial status
        device.set_socketPersistent(True)
        initial_status = device.status()
        print(f"INITIAL status from {device.id}: {initial_status}")
        statuses[index] = {"id": device.id, "status": _read_dps(initial_status)}
    except Exception as e:
        print(f"Error getting initial status from {device.id}: {e}")
        statuses[index] = {"id": device.id, "status": "Disconnected"}
        return

    # Time at which the last heartbeat was sent (starts at connection time)
    last_heartbeat_time = time.time()

    # Infinite loop to listen for status updates
    while True:
        # Broad catch on purpose: this is the thread's top-level boundary, and
        # any failure is handled the same way (mark disconnected, reconnect).
        try:
            # Send a heartbeat once TTL_HEARTBEAT seconds have passed since the last one
            if time.time() - last_heartbeat_time >= TTL_HEARTBEAT:
                try:
                    device.heartbeat()
                    print(f"Heartbeat sent to {device.id}")
                    last_heartbeat_time = time.time()
                except Exception as hb_error:
                    print(f"Failed to send heartbeat to {device.id}: {hb_error}")
                    # Try to reconnect if the heartbeat fails
                    if not _reconnect(device, index):
                        statuses[index]["status"] = "Disconnected"
                        break  # Exit the loop if reconnection fails

            # Listen for updates from the device
            updated_status = device.receive()

            if updated_status:
                print(f"UPDATE status from {device.id}: {updated_status}")
                # We may only get one DPS, so just update that one item
                if "dps" in updated_status:
                    for key, value in updated_status["dps"].items():
                        statuses[index]["status"][key] = value
                        print(f" - Updated status for {device.id} DPS {key} to {value}")

            # Small delay to avoid tight loops
            time.sleep(0.1)
        except Exception as e:
            print(f"Error receiving status from {device.id}: {e}")
            statuses[index]["status"] = "Disconnected"
            if not _reconnect(device, index):
                break  # Exit the loop if reconnection fails


def getDeviceStatuses():
    """
    Start one daemon listener thread per entry in the module-level `devices` list.

    Thread number i monitors devices[i] and keeps statuses[i] up to date, so
    the module-level `statuses` list must already exist and hold at least
    len(devices) slots before this function is called.

    Returns the list of started threading.Thread objects (in device order) so
    the caller can join or monitor them; callers that ignore the return value
    behave exactly as before.
    """
    threads = []

    # Create and start a thread for each device
    for index, device in enumerate(devices):
        print(f"Starting thread for device {device.id}")
        thread = threading.Thread(
            target=_listen_for_status_updates,
            args=(device, index),
            daemon=True,  # Daemon threads exit with the main program
        )
        threads.append(thread)
        thread.start()

    return threads
# Example usage
# One result slot per device; the listener thread for devices[i] stores its
# results in statuses[i], so the list must exist before the threads start.
statuses = [None for _ in devices]

# Start the per-device listener threads.
getDeviceStatuses()

# Optionally, keep the main program running indefinitely. The listeners are
# daemon threads, so they end as soon as the main thread exits.
while True:
    time.sleep(1)  # Keep the main thread alive