Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add JK BMS software v10.x support #116

Merged
merged 9 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ This integration allows to monitor Bluetooth Low Energy (BLE) battery management
- JBD BMS, Jiabaida (show up as `SP..S`…)
- accurat batteries (show up as `GJ-`…)
- Supervolt v3 batteries (show up as `SX1*`…)
- JK BMS, Jikong, (HW version >=11 required)
- JK BMS, Jikong, (HW version >=6 required)
- Offgridtec LiFePo4 Smart Pro: type A & B (show up as `SmartBat-A`… or `SmartBat-B`…)
- LiTime, Power Queen, and Redodo batteries
- Seplos v3 (show up as `SP0`… or `SP1`…)
Expand Down
2 changes: 1 addition & 1 deletion custom_components/bms_ble/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,5 @@
"issue_tracker": "https://github.com/patman15/BMS_BLE-HA/issues",
"loggers": ["bleak_retry_connector"],
"requirements": [],
"version": "1.10.0"
"version": "1.11.0"
}
83 changes: 57 additions & 26 deletions custom_components/bms_ble/plugins/jikong_bms.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ class BMS(BaseBMS):
INFO_LEN: Final[int] = 300
_FIELDS: Final[list[tuple[str, int, int, bool, Callable[[int], int | float]]]] = (
[ # Protocol: JK02_32S; JK02_24S has offset -32
(KEY_CELL_COUNT, 70, 4, False, lambda x: x.bit_count()),
(ATTR_DELTA_VOLTAGE, 76, 2, False, lambda x: float(x / 1000)),
(ATTR_VOLTAGE, 150, 4, False, lambda x: float(x / 1000)),
(ATTR_CURRENT, 158, 4, True, lambda x: float(x / 1000)),
(ATTR_BATTERY_LEVEL, 173, 1, False, lambda x: x),
Expand All @@ -55,9 +53,11 @@ def __init__(self, ble_device: BLEDevice, reconnect: bool = False) -> None:
"""Intialize private BMS members."""
super().__init__(LOGGER, self._notification_handler, ble_device, reconnect)
self._data: bytearray = bytearray()
self._data_final: bytearray | None = None
self._data_final: bytearray = bytearray()
self._char_write_handle: int | None = None
self._valid_replies: list[int] = [0x2] # BMS ready confirmation
self._bms_info: dict[str, str] = {}
self._prot_offset: int = 0
self._valid_reply: int = 0x02

@staticmethod
def matcher_dict_list() -> list[dict[str, Any]]:
Expand Down Expand Up @@ -131,7 +131,7 @@ def _notification_handler(self, _sender, data: bytearray) -> None:
return

# check that message type is expected
if self._data[BMS.TYPE_POS] not in self._valid_replies:
if self._data[BMS.TYPE_POS] != self._valid_reply:
LOGGER.debug(
"%s: unexpected message type 0x%X (length %i): %s",
self.name,
Expand Down Expand Up @@ -159,10 +159,9 @@ def _notification_handler(self, _sender, data: bytearray) -> None:
self._data[-1],
crc,
)
self._data_final = None # reset invalid data
else:
self._data_final = self._data
return

self._data_final = self._data
self._data_event.set()

async def _init_characteristics(self) -> None:
Expand Down Expand Up @@ -202,13 +201,20 @@ async def _init_characteristics(self) -> None:
char_notify_handle or 0, self._notification_handler
)

# query device info frame and wait for BMS ready (0xC8)
self._valid_replies.append(0xC8)
# query device info frame (0x03) and wait for BMS ready (0xC8)
self._valid_reply = 0x03
await self._client.write_gatt_char(
self._char_write_handle or 0, data=self._cmd(b"\x97")
)
await asyncio.wait_for(self._wait_event(), timeout=BAT_TIMEOUT)
self._valid_replies.remove(0xC8)
self._bms_info = BMS._dec_devinfo(self._data_final or bytearray())
LOGGER.debug("%s: device information: %s", self.name, self._bms_info)
self._prot_offset = (
-32 if int(self._bms_info.get("sw_version", "")[:2]) < 11 else 0
)
self._valid_reply = 0xC8 # BMS ready confirmation
await asyncio.wait_for(self._wait_event(), timeout=BAT_TIMEOUT)
self._valid_reply = 0x02 # cell information

@staticmethod
def _cmd(cmd: bytes, value: list[int] | None = None) -> bytes:
Expand All @@ -221,6 +227,10 @@ def _cmd(cmd: bytes, value: list[int] | None = None) -> bytes:
frame += bytes([crc_sum(frame)])
return frame

@staticmethod
def _dec_devinfo(data: bytearray) -> dict[str, str]:
return {"hw_version": data[22:27].decode(), "sw_version": data[30:35].decode()}

@staticmethod
def _cell_voltages(data: bytearray, cells: int) -> dict[str, float]:
"""Return cell voltages from status message."""
Expand All @@ -235,41 +245,62 @@ def _cell_voltages(data: bytearray, cells: int) -> dict[str, float]:
}

@staticmethod
def _temp_sensors(data: bytearray) -> dict[str, float]:
def _temp_sensors(data: bytearray, offs: int) -> dict[str, float]:
temp_pos: Final[list[tuple[int, int]]] = (
[(0, 130), (1, 132), (2, 134)]
if offs
else [(0, 144), (1, 162), (2, 164), (3, 256), (4, 258)]
)
return {
f"{KEY_TEMP_VALUE}{idx}": int.from_bytes(
data[pos : pos + 2], byteorder="little", signed=False
)
/ 10
for idx, pos in [(0, 144), (1, 162), (2, 164), (3, 256), (4, 258)]
for idx, pos in temp_pos
if int.from_bytes(data[pos : pos + 2], byteorder="little", signed=False)
}

@staticmethod
def _decode_data(data: bytearray) -> BMSsample:
def _decode_data(data: bytearray, offs: int) -> BMSsample:
"""Return BMS data from status message."""
return {
key: func(
int.from_bytes(data[idx : idx + size], byteorder="little", signed=sign)
)
for key, idx, size, sign, func in BMS._FIELDS
}
return (
{
KEY_CELL_COUNT: int.from_bytes(
data[70 + (offs >> 1) : 74 + (offs >> 1)],
byteorder="little",
).bit_count()
}
| {
ATTR_DELTA_VOLTAGE: int.from_bytes(
data[76 + (offs >> 1) : 78 + (offs >> 1)],
byteorder="little",
)
/ 1000
}
| {
key: func(
int.from_bytes(
data[idx + offs : idx + offs + size],
byteorder="little",
signed=sign,
)
)
for key, idx, size, sign, func in BMS._FIELDS
}
)

async def _async_update(self) -> BMSsample:
"""Update battery status information."""
if not self._data_event.is_set():
if not self._data_event.is_set() or self._data_final[4] != 0x02:
# request cell info (only if data is not constantly published)
LOGGER.debug("%s: request cell info", self.name)
await self._client.write_gatt_char(
self._char_write_handle or 0, data=BMS._cmd(b"\x96")
)
await asyncio.wait_for(self._wait_event(), timeout=BAT_TIMEOUT)

if self._data_final is None:
return {}

data = self._decode_data(self._data_final)
data.update(BMS._temp_sensors(self._data_final))
data: BMSsample = self._decode_data(self._data_final, self._prot_offset)
data.update(BMS._temp_sensors(self._data_final, self._prot_offset))
data.update(BMS._cell_voltages(self._data_final, int(data[KEY_CELL_COUNT])))

return data
Loading
Loading