forked from tgalarneau/bms
-
Notifications
You must be signed in to change notification settings - Fork 0
/
jbdbms-4-socket-2temps.py
189 lines (173 loc) · 7.57 KB
/
jbdbms-4-socket-2temps.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
#!/usr/bin/env python3
# using python 3.9
from bluepy.btle import Peripheral, DefaultDelegate, BTLEException
import struct
import argparse
import sys
import time
import binascii
import socket
import atexit
# Command-line options: the BLE address of the BMS, the pause between polling
# rounds (passed to time.sleep, so seconds) and the meter name that is written
# into every output record.
parser = argparse.ArgumentParser(
    description='Fetches and outputs JBD bms data',
)
parser.add_argument(
    "-b", "--BLEaddress",
    required=True,
    help="Device BLE Address",
)
parser.add_argument(
    "-i", "--interval",
    required=True,
    type=int,
    help="Data fetch interval",
)
parser.add_argument(
    "-m", "--meter",
    required=True,
    help="meter name",
)
args = parser.parse_args()
z, meter = args.interval, args.meter
class StatsReporter:
    """Send text records to a socket, reporting failures instead of raising.

    The socket is opened when the object is constructed.  Socket errors are
    printed to stdout and swallowed so that a missing or restarted listener
    does not stop the caller; ``send_data`` re-opens the socket after a
    failed send (the record that failed is dropped, not resent).
    """

    def __init__(
        self,
        socket_type,
        socket_address,
        socket_data,
        encoding='utf-8',
    ):
        """Store the connection settings and open the socket.

        Args:
            socket_type: Iterable of the leading positional arguments for
                ``socket.socket`` (the address family), e.g.
                ``(socket.AF_UNIX,)``.
            socket_address: Address handed to ``connect``.
            socket_data: Socket kind, e.g. ``socket.SOCK_DGRAM``.
            encoding: Text encoding applied to outgoing records.
        """
        self._socket_type = socket_type
        self._socket_address = socket_address
        self._encoding = encoding
        self._socket_data = socket_data
        # Always defined, so the other methods can tell "never opened" apart
        # from a real socket without relying on a missing attribute.
        self._sock = None
        self.create_socket()

    def create_socket(self):
        """Open and connect a socket using the settings given at construction.

        On failure the error is printed, the half-created socket is closed and
        ``self._sock`` is left as ``None``.
        """
        sock = None
        try:
            sock = socket.socket(*self._socket_type, self._socket_data)
            sock.connect(self._socket_address)
        except socket.error as e:
            if sock is not None:
                sock.close()  # do not leak the descriptor when connect fails
            self._sock = None
            print(f'Error creating socket: {e}')
        else:
            self._sock = sock
            print('Created socket')

    def close_socket(self):
        """Close the socket if one is open; errors are printed, not raised."""
        if self._sock is None:
            return
        try:
            self._sock.close()
            print('Closed socket')
        except (AttributeError, socket.error) as e:
            print(f'Error closing socket: {e}')
        finally:
            self._sock = None

    def send_data(self, data):
        """Encode ``data`` (a str) and send it, echoing it to stdout on success.

        If the send fails, or no socket is open, the error is printed and the
        socket is closed and re-created so that the next call can succeed.
        """
        try:
            self._sock.send(data.encode(self._encoding))
            print(data)
        except (AttributeError, socket.error) as e:
            print(f'Error sending data on socket: {e}')
            # attempt to recreate socket on error
            self.close_socket()
            self.create_socket()
def cellinfo1(data):
    """Decode the first part of a pack-info reply and print three CSV records.

    Eight big-endian 16-bit fields are read starting at byte 4 (bytes 0-3 are
    the frame header).  Voltage, current, remaining capacity and total
    capacity are divided by 100; watts is voltage times current.  The first
    balance word is expanded into one 0/1 flag per cell for cells 1-16; the
    manufacture-date field and the second balance word (cells 17-32) are read
    but not reported.

    Each record is a header line and a value line joined by CRLF, prefixed
    with the module-level ``meter`` name, and is only printed.
    """
    (raw_volts, raw_amps, raw_remain, raw_capacity,
     cycles, _mdate, balance1, _balance2) = struct.unpack_from('>HhHHHHHH', data, 4)

    volts = raw_volts / 100
    amps = raw_amps / 100
    capacity = raw_capacity / 100
    remain = raw_remain / 100
    watts = volts * amps  # extra field for the database

    # Least-significant bit is cell 1, so balancing[0] is c01 ... balancing[15] is c16.
    balancing = [(balance1 >> cell) & 1 for cell in range(16)]

    print("meter,volts,amps,watts,remain,capacity,cycles\r\n%s,%0.2f,%0.2f,%0.2f,%0i,%0i,%0i"
          % (meter, volts, amps, watts, remain, capacity, cycles))
    print("meter,c01,c02,c03,c04,c05,c06,c07,c08\r\n%s,%0i,%0i,%0i,%0i,%0i,%0i,%0i,%0i"
          % (meter, *balancing[:8]))
    print("meter,c09,c10,c11,c12,c13,c14,c15,c16\r\n%s,%0i,%0i,%0i,%0i,%0i,%0i,%0i,%0i"
          % (meter, *balancing[8:]))
def cellinfo2(data):
    """Decode the second part of a pack-info reply and print two CSV records.

    Fields are read from byte 0 as big-endian: protection word (16 bit),
    version, percent, FET state, cell count, sensor count (8 bit each), two
    temperatures (16 bit each) and one trailing byte that is unpacked only to
    consume it.  Temperatures are converted with ``(raw - 2731) / 10``.
    The version and the sensor count are not reported.

    FET state: 3 = both on, 2 = discharge on, 1 = charge on, 0 = both off.

    Protection flags are taken from the protection word least-significant bit
    first, following the JBD protocol's bit numbering (bit 0 = cell
    over-voltage ... bit 12 = FET lock; bits 13-15 are reserved).  Reading the
    word from the most-significant end would report the reserved bits as
    ``ovp``/``uvp``/``bov`` and shift every other flag onto the wrong bit.

    Each record is a header line and a value line joined by CRLF, prefixed
    with the module-level ``meter`` name, and is only printed.
    """
    (protect, _vers, percent, fet, cells, _sensors,
     temp1, temp2, _tail) = struct.unpack_from('>HBBBBBHHB', data, 0)
    temp1 = (temp1 - 2731) / 10
    temp2 = (temp2 - 2731) / 10

    # Protection triggers (0 = off, 1 = on), in bit order 0..12:
    #   ovp  cell overvoltage          uvp  cell undervoltage
    #   bov  pack overvoltage          buv  pack undervoltage
    #   cot  charge over temp          cut  charge under temp
    #   dot  discharge over temp       dut  discharge under temp
    #   coc  charge over current       duc  discharge over current
    #   sc   short circuit             ic   ic failure
    #   cnf  fet config problem
    flags = [(protect >> bit) & 1 for bit in range(13)]

    message = ("meter,ovp,uvp,bov,buv,cot,cut,dot,dut,coc,duc,sc,ic,cnf\r\n%s,%0i,%0i,%0i,%0i,%0i,%0i,%0i,%0i,%0i,%0i,%0i,%0i,%0i" % (meter, *flags))
    print(message)
    #reporter.send_data(message)
    message = ("meter,protect,percent,fet,cells,temp1,temp2\r\n%s,%0000i,%00i,%00i,%0i,%0.1f,%0.1f" % (meter, protect, percent, fet, cells, temp1, temp2))
    print(message)
    #reporter.send_data(message)
def cellvolts1(data):
    """Decode four cell readings from a cell-voltage reply and print them.

    Four big-endian unsigned 16-bit values are read starting at byte 4
    (bytes 0-3 are the frame header) and stored in the module-level list
    ``cells1``.  Two CSV records are printed, each a header line and a value
    line joined by CRLF and prefixed with the module-level ``meter`` name:
    the four readings, then their minimum, maximum and spread.
    """
    global cells1
    cells1 = list(struct.unpack_from('>HHHH', data, 4))  # kept for max, min, delta
    print("meter,cell1,cell2,cell3,cell4\r\n%s,%0i,%0i,%0i,%0i" % (meter, *cells1))

    lowest, highest = min(cells1), max(cells1)
    print("meter,cellmin,cellmax,delta\r\n%s,%0i,%0i,%0i"
          % (meter, lowest, highest, highest - lowest))
class MyDelegate(DefaultDelegate):  # notification responses
    """Route each incoming BLE notification to the matching decoder.

    Packets are classified on the raw bytes rather than on a hex string, so a
    marker can only match on a byte boundary and only where the decoders
    expect it: the decoders skip "header bytes 0-3", so the ``dd 04`` /
    ``dd 03`` marker is checked at the start of the packet, and the ``77``
    end-of-message byte is checked at the end.
    """

    def __init__(self):
        DefaultDelegate.__init__(self)

    def handleNotification(self, cHandle, data):
        """Dispatch one notification payload; unrecognised packets are ignored.

        Args:
            cHandle: Characteristic handle supplied by bluepy (unused).
            data: Raw notification payload as bytes.
        """
        if data.startswith(b'\xdd\x04'):  # x04 (1-8 cells)
            cellvolts1(data)
        elif data.startswith(b'\xdd\x03'):  # x03
            cellinfo1(data)
        #elif text_string.find('77') != -1 and len(text_string) == 38: # x04 (9-16 cells)
        #    cellvolts2(data)
        # Tail of the x03 reply: 14 or 18 bytes (28 or 36 hex digits) ending in
        # 0x77.  Both lengths require the terminator; the earlier
        # "a and b or c" test let every 18-byte packet through unchecked.
        elif data.endswith(b'\x77') and len(data) in (14, 18):  # x03
            cellinfo2(data)
# Connect to the BMS.  A failed first attempt is retried once after a 10 s
# pause; if the retry fails too, the script reports it and exits with a
# non-zero status.  (A second `except BTLEException` on the same `try` can
# never run, so the retry needs its own handler.)
bms = None
for attempt in (1, 2):
    print('attempting to connect' if attempt == 1 else '2nd try connect')
    try:
        bms = Peripheral(args.BLEaddress, addrType="public")
        break
    except BTLEException:
        if attempt == 2:
            print('cannot connect')
            sys.exit(1)
        time.sleep(10)
print('connected ', args.BLEaddress)
bms.setDelegate(MyDelegate())  # setup delegate for notifications
reporter = StatsReporter(
    (socket.AF_UNIX, ),
    '/tmp/telegraf.sock',
    socket.SOCK_DGRAM)
atexit.register(reporter.close_socket)
# write empty data to 0x15 for notification request -- address x03 handle for info & x04 handle for cell voltage
# using waitForNotifications(5) as less than 5 seconds has caused some missed notifications
while True:
    #print('sending')
    result = bms.writeCharacteristic(0x15, b'\xdd\xa5\x03\x00\xff\xfd\x77', False)  # write x03 w/o response cell info
    bms.waitForNotifications(5)
    result = bms.writeCharacteristic(0x15, b'\xdd\xa5\x04\x00\xff\xfc\x77', False)  # write x04 w/o response cell voltages
    bms.waitForNotifications(5)
    time.sleep(z)  # pause for the interval given on the command line