Skip to content

Commit

Permalink
Add JK BMS software v10.x support (#116)
Browse files Browse the repository at this point in the history
* remove non-available temp sensors
* integrated JK02_24S protocol
* corrected protocol sequence
* update docu
* fixed tests for both protocols
  • Loading branch information
patman15 authored Dec 18, 2024
1 parent f9ed521 commit 6d9ad87
Show file tree
Hide file tree
Showing 3 changed files with 308 additions and 267 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ This integration allows to monitor Bluetooth Low Energy (BLE) battery management
- JBD BMS, Jiabaida (show up as `SP..S`…)
- accurat batteries (show up as `GJ-`…)
- Supervolt v3 batteries (show up as `SX1*`…)
- JK BMS, Jikong, (HW version >=11 required)
- JK BMS, Jikong, (HW version >=6 required)
- Offgridtec LiFePo4 Smart Pro: type A & B (show up as `SmartBat-A`… or `SmartBat-B`…)
- LiTime, Redodo batteries
- Seplos v2 (show up as `BP0`?)
Expand Down
83 changes: 57 additions & 26 deletions custom_components/bms_ble/plugins/jikong_bms.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ class BMS(BaseBMS):
INFO_LEN: Final[int] = 300
_FIELDS: Final[list[tuple[str, int, int, bool, Callable[[int], int | float]]]] = (
[ # Protocol: JK02_32S; JK02_24S has offset -32
(KEY_CELL_COUNT, 70, 4, False, lambda x: x.bit_count()),
(ATTR_DELTA_VOLTAGE, 76, 2, False, lambda x: float(x / 1000)),
(ATTR_VOLTAGE, 150, 4, False, lambda x: float(x / 1000)),
(ATTR_CURRENT, 158, 4, True, lambda x: float(x / 1000)),
(ATTR_BATTERY_LEVEL, 173, 1, False, lambda x: x),
Expand All @@ -55,9 +53,11 @@ def __init__(self, ble_device: BLEDevice, reconnect: bool = False) -> None:
"""Intialize private BMS members."""
super().__init__(LOGGER, self._notification_handler, ble_device, reconnect)
self._data: bytearray = bytearray()
self._data_final: bytearray | None = None
self._data_final: bytearray = bytearray()
self._char_write_handle: int | None = None
self._valid_replies: list[int] = [0x2] # BMS ready confirmation
self._bms_info: dict[str, str] = {}
self._prot_offset: int = 0
self._valid_reply: int = 0x02

@staticmethod
def matcher_dict_list() -> list[dict[str, Any]]:
Expand Down Expand Up @@ -131,7 +131,7 @@ def _notification_handler(self, _sender, data: bytearray) -> None:
return

# check that message type is expected
if self._data[BMS.TYPE_POS] not in self._valid_replies:
if self._data[BMS.TYPE_POS] != self._valid_reply:
LOGGER.debug(
"%s: unexpected message type 0x%X (length %i): %s",
self.name,
Expand Down Expand Up @@ -159,10 +159,9 @@ def _notification_handler(self, _sender, data: bytearray) -> None:
self._data[-1],
crc,
)
self._data_final = None # reset invalid data
else:
self._data_final = self._data
return

self._data_final = self._data
self._data_event.set()

async def _init_characteristics(self) -> None:
Expand Down Expand Up @@ -202,13 +201,20 @@ async def _init_characteristics(self) -> None:
char_notify_handle or 0, self._notification_handler
)

# query device info frame and wait for BMS ready (0xC8)
self._valid_replies.append(0xC8)
# query device info frame (0x03) and wait for BMS ready (0xC8)
self._valid_reply = 0x03
await self._client.write_gatt_char(
self._char_write_handle or 0, data=self._cmd(b"\x97")
)
await asyncio.wait_for(self._wait_event(), timeout=BAT_TIMEOUT)
self._valid_replies.remove(0xC8)
self._bms_info = BMS._dec_devinfo(self._data_final or bytearray())
LOGGER.debug("%s: device information: %s", self.name, self._bms_info)
self._prot_offset = (
-32 if int(self._bms_info.get("sw_version", "")[:2]) < 11 else 0
)
self._valid_reply = 0xC8 # BMS ready confirmation
await asyncio.wait_for(self._wait_event(), timeout=BAT_TIMEOUT)
self._valid_reply = 0x02 # cell information

@staticmethod
def _cmd(cmd: bytes, value: list[int] | None = None) -> bytes:
Expand All @@ -221,6 +227,10 @@ def _cmd(cmd: bytes, value: list[int] | None = None) -> bytes:
frame += bytes([crc_sum(frame)])
return frame

@staticmethod
def _dec_devinfo(data: bytearray) -> dict[str, str]:
return {"hw_version": data[22:27].decode(), "sw_version": data[30:35].decode()}

@staticmethod
def _cell_voltages(data: bytearray, cells: int) -> dict[str, float]:
"""Return cell voltages from status message."""
Expand All @@ -235,41 +245,62 @@ def _cell_voltages(data: bytearray, cells: int) -> dict[str, float]:
}

@staticmethod
def _temp_sensors(data: bytearray) -> dict[str, float]:
def _temp_sensors(data: bytearray, offs: int) -> dict[str, float]:
temp_pos: Final[list[tuple[int, int]]] = (
[(0, 130), (1, 132), (2, 134)]
if offs
else [(0, 144), (1, 162), (2, 164), (3, 256), (4, 258)]
)
return {
f"{KEY_TEMP_VALUE}{idx}": int.from_bytes(
data[pos : pos + 2], byteorder="little", signed=False
)
/ 10
for idx, pos in [(0, 144), (1, 162), (2, 164), (3, 256), (4, 258)]
for idx, pos in temp_pos
if int.from_bytes(data[pos : pos + 2], byteorder="little", signed=False)
}

@staticmethod
def _decode_data(data: bytearray) -> BMSsample:
def _decode_data(data: bytearray, offs: int) -> BMSsample:
"""Return BMS data from status message."""
return {
key: func(
int.from_bytes(data[idx : idx + size], byteorder="little", signed=sign)
)
for key, idx, size, sign, func in BMS._FIELDS
}
return (
{
KEY_CELL_COUNT: int.from_bytes(
data[70 + (offs >> 1) : 74 + (offs >> 1)],
byteorder="little",
).bit_count()
}
| {
ATTR_DELTA_VOLTAGE: int.from_bytes(
data[76 + (offs >> 1) : 78 + (offs >> 1)],
byteorder="little",
)
/ 1000
}
| {
key: func(
int.from_bytes(
data[idx + offs : idx + offs + size],
byteorder="little",
signed=sign,
)
)
for key, idx, size, sign, func in BMS._FIELDS
}
)

async def _async_update(self) -> BMSsample:
"""Update battery status information."""
if not self._data_event.is_set():
if not self._data_event.is_set() or self._data_final[4] != 0x02:
# request cell info (only if data is not constantly published)
LOGGER.debug("%s: request cell info", self.name)
await self._client.write_gatt_char(
self._char_write_handle or 0, data=BMS._cmd(b"\x96")
)
await asyncio.wait_for(self._wait_event(), timeout=BAT_TIMEOUT)

if self._data_final is None:
return {}

data = self._decode_data(self._data_final)
data.update(BMS._temp_sensors(self._data_final))
data: BMSsample = self._decode_data(self._data_final, self._prot_offset)
data.update(BMS._temp_sensors(self._data_final, self._prot_offset))
data.update(BMS._cell_voltages(self._data_final, int(data[KEY_CELL_COUNT])))

return data
Loading

0 comments on commit 6d9ad87

Please sign in to comment.