forked from andiburger/growatt2mqtt
-
-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathserial_frame_client.py
168 lines (127 loc) · 5.43 KB
/
serial_frame_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
from typing import Callable
import serial
import threading
import time
class serial_frame_client():
''' basic serial client implenting a empty SOI/EOI frame'''
client : serial.Serial
running : bool = False
soi : bytes
'''start of information'''
eoi : bytes
'''end of information'''
pending_frames : list[bytes] = []
max_frame_size : int = 256
port : str = '/dev/ttyUSB0'
baud : int = 9600
timeout : float = 5
''' timeout in seconds '''
#region asyncronous
asynchronous : bool = False
''' if set, runs main loop'''
on_message : Callable[[bytes], None] = None
''' async mode only'''
thread : threading.Thread
''' main thread for read loop'''
callback_lock : threading.Lock = threading.Lock()
'''lock for callback'''
#endregion asyncronous
def __init__(self, port : str , baud : int , soi : bytes, eoi : bytes, **kwrgs) -> None:
self.soi = soi
self.eoi = eoi
self.port = port
self.baud = baud
self.client = serial.Serial(port, baud, **kwrgs)
def connect(self):
if self.asynchronous:
self.running = True
self.pending_frames = []
self.thread = threading.Thread(target=self.read_thread)
self.thread.daemon = True
self.thread.start()
return True
def write(self, data : bytes):
''' write data, excluding SOI and EOI bytes'''
data = self.soi + data + self.eoi
self.client.write(data)
def read(self, reset_buffer = True, frames = 1) -> list[bytes] | bytes:
''' returns list of frames, if frames > 1 '''
buffer = bytearray()
self.pending_frames.clear()
#for shatty order sensitive protocols.
# Clear input buffers
if reset_buffer:
self.client.reset_input_buffer()
timedout = time.time() + self.timeout
self.client.timeout = self.timeout
frameCount = 0
while time.time() < timedout:
# Read data from serial port
data = self.client.read()
# Check if data is available
if data:
# Append data to buffer
buffer += data
# Find SOI index in buffer
soi_index = buffer.find(self.soi)
# Process all occurrences of SOI in buffer
while soi_index != -1:
# Remove data before SOI sequence
buffer = buffer[soi_index:]
# Find EOI index in buffer
eoi_index = buffer.find(self.eoi)
if eoi_index != -1:
frame = buffer[len(self.soi):eoi_index]
if frames == 1:
return frame
if frameCount > 1:
# Extract and store the complete frame
self.pending_frames.append(frame)
if self.pending_frames.count() == frames:
return self.pending_frames
# Remove the processed data from the buffer
buffer = buffer[eoi_index + len(self.eoi) : ]
# Find next SOI index in the remaining buffer
soi_index = buffer.find(self.soi)
else:
# If no EOI is found and buffer size exceeds max_frame_size, clear buffer
if len(buffer) > self.max_frame_size:
buffer.clear()
break #no eoi, continue waiting
time.sleep(0.01)
def read_thread(self):
buffer = bytearray()
self.running = True
while self.running:
# Read data from serial port
data = self.client.read()
# Check if data is available
if data:
# Append data to buffer
buffer += data
# Find SOI index in buffer
soi_index = buffer.find(self.soi)
# Process all occurrences of SOI in buffer
while soi_index != -1:
# Remove data before SOI sequence
buffer = buffer[soi_index:]
# Find EOI index in buffer
eoi_index = buffer.find(self.eoi)
if eoi_index != -1:
# Extract and store the complete frame
self.pending_frames.append(buffer[len(self.soi):eoi_index])
# Remove the processed data from the buffer
buffer = buffer[eoi_index + len(self.eoi) : ]
# Find next SOI index in the remaining buffer
soi_index = buffer.find(self.soi)
else:
# If no EOI is found and buffer size exceeds max_frame_size, clear buffer
if len(buffer) > self.max_frame_size:
buffer.clear()
break #no eoi, continue waiting
#can probably be in the loop, but being cautious
for frame in self.pending_frames:
with self.callback_lock:
if self.on_message:
self.on_message(frame)
time.sleep(0.01)