-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathuart.py
139 lines (118 loc) · 4.46 KB
/
uart.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from __future__ import annotations
import asyncio
import binascii
import logging
import struct
from typing import Any
import zigpy.config
import zigpy.serial
from . import common as c
LOGGER = logging.getLogger(__name__)
class Gateway(zigpy.serial.SerialProtocol):
START = b"\x01"
END = b"\x03"
def __init__(self, api):
super().__init__()
self._api = api
def connection_lost(self, exc: Exception | None) -> None:
"""Port was closed expectedly or unexpectedly."""
super().connection_lost(exc)
if self._api is not None:
self._api.connection_lost(exc)
def close(self):
super().close()
self._api = None
def send(self, cmd, data=b""):
"""Send data, taking care of escaping and framing"""
LOGGER.debug("Send: 0x%04x %s", cmd, binascii.hexlify(data))
length = len(data)
byte_head = struct.pack("!HH", cmd, length)
checksum = self._checksum(byte_head, data)
frame = struct.pack("!HHB%ds" % length, cmd, length, checksum, data)
LOGGER.debug("Frame to send: %s", frame)
frame = self._escape(frame)
LOGGER.debug("Frame escaped: %s", frame)
self._transport.write(self.START + frame + self.END)
def data_received(self, data):
"""Callback when there is data received from the uart"""
super().data_received(data)
endpos = self._buffer.find(self.END)
while endpos != -1:
startpos = self._buffer.rfind(self.START, 0, endpos)
if startpos != -1 and startpos < endpos:
frame = self._buffer[startpos : endpos + 1]
frame = self._unescape(frame[1:-1])
cmd, length, checksum, f_data, lqi = struct.unpack(
"!HHB%dsB" % (len(frame) - 6), frame
)
if len(frame) - 5 != length:
LOGGER.warning(
"Invalid length: %s, data: %s", length, len(frame) - 6
)
self._buffer = self._buffer[endpos + 1 :]
endpos = self._buffer.find(self.END)
continue
if self._checksum(frame[:4], lqi, f_data) != checksum:
LOGGER.warning(
"Invalid checksum: %s, data: 0x%s",
checksum,
binascii.hexlify(frame).decode(),
)
self._buffer = self._buffer[endpos + 1 :]
endpos = self._buffer.find(self.END)
continue
LOGGER.debug("Frame received: %s", binascii.hexlify(frame).decode())
self._api.data_received(cmd, f_data, lqi)
else:
LOGGER.warning("Malformed packet received, ignore it")
self._buffer = self._buffer[endpos + 1 :]
endpos = self._buffer.find(self.END)
def _unescape(self, data):
flip = False
ret = []
for b in data:
if flip:
flip = False
ret.append(b ^ 0x10)
elif b == 0x02:
flip = True
else:
ret.append(b)
return bytes(ret)
def _escape(self, data):
ret = []
for b in data:
if b < 0x10:
ret.extend([0x02, 0x10 ^ b])
else:
ret.append(b)
return bytes(ret)
def _checksum(self, *args):
chcksum = 0
for arg in args:
if isinstance(arg, int):
chcksum ^= arg
continue
for x in arg:
chcksum ^= x
return chcksum
async def connect(device_config: dict[str, Any], api, loop=None):
loop = asyncio.get_running_loop()
port = device_config[zigpy.config.CONF_DEVICE_PATH]
if await c.async_is_pizigate(port):
LOGGER.debug("PiZiGate detected")
await c.async_set_pizigate_running_mode()
port = port.replace("pizigate:", "", 1)
elif await c.async_is_zigate_din(port):
LOGGER.debug("ZiGate USB DIN detected")
await c.async_set_zigatedin_running_mode()
protocol = Gateway(api)
_, protocol = await zigpy.serial.create_serial_connection(
loop,
lambda: protocol,
url=port,
baudrate=device_config[zigpy.config.CONF_DEVICE_BAUDRATE],
flow_control=device_config[zigpy.config.CONF_DEVICE_FLOW_CONTROL],
)
await protocol.wait_until_connected()
return protocol