Skip to content

Commit

Permalink
feat: add check for increasing encryption counter (#92)
Browse files Browse the repository at this point in the history
* feat: add check for increasing encryption counter

* feat: add check for increasing encryption counter

* feat: add check for increasing encryption counter

* fix: verification of counter

* fix: verification of counter

* fix: do not update encryption counter if not increasing

* fix: add exceptions for equal counter

* fix: allow reset of the counter
  • Loading branch information
Ernst79 authored Dec 13, 2023
1 parent 4cb6858 commit a58cf7b
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 6 deletions.
60 changes: 54 additions & 6 deletions src/bthome_ble/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ def __init__(self, bindkey: bytes | None = None) -> None:
# Encryption to expect, based on flags in the UUID.
self.encryption_scheme = EncryptionScheme.NONE

# The encryption counter can be used to verify that the counter of encrypted
# advertisements is increasing, to have some replay protection. We always
# start at zero allow the first message after a restart.
self.encryption_counter = 0.0

# If True, then we know the actual MAC of the device.
# On macOS, we don't unless the device includes it in the advertisement
# (CoreBluetooth uses UUID's generated by CoreBluetooth instead of the MAC)
Expand Down Expand Up @@ -253,7 +258,9 @@ def _parse_bthome_v1(
source_mac = bytes.fromhex(mac_readable.replace(":", ""))

try:
payload = self._decrypt_bthome(data, source_mac, sw_version)
payload = self._decrypt_bthome(
service_info, data, source_mac, sw_version
)
except (ValueError, TypeError):
return True

Expand Down Expand Up @@ -355,7 +362,7 @@ def _parse_bthome_v2(
# Decode encrypted payload
try:
payload = self._decrypt_bthome(
payload, bthome_mac, sw_version, adv_info
service_info, payload, bthome_mac, sw_version, adv_info
)
except (ValueError, TypeError):
return True
Expand Down Expand Up @@ -531,7 +538,12 @@ def _parse_payload(self, payload: bytes, sw_version: int) -> bool:
return True

def _decrypt_bthome(
self, data: bytes, bthome_mac: bytes, sw_version: int, adv_info: int = 65
self,
service_info: BluetoothServiceInfo,
data: bytes,
bthome_mac: bytes,
sw_version: int,
adv_info: int = 65,
) -> bytes:
"""Decrypt encrypted BTHome BLE advertisements"""
if not self.bindkey:
Expand All @@ -555,17 +567,53 @@ def _decrypt_bthome(
else:
uuid = b"\xd2\xfc" + bytes([adv_info])
encrypted_payload = data[:-8]
count_id = data[-8:-4]
last_encryption_counter = self.encryption_counter
counter = data[-8:-4]
new_encryption_counter = parse_uint(counter)
mic = data[-4:]

# nonce: mac [6], uuid16 [2 (v1) or 3 (v2)], count_id [4]
nonce = b"".join([bthome_mac, uuid, count_id])
# nonce: mac [6], uuid16 [2 (v1) or 3 (v2)], counter [4]
nonce = b"".join([bthome_mac, uuid, counter])

associated_data = None
if sw_version == 1:
associated_data = b"\x11"

assert self.cipher is not None # nosec

# verify that the encryption counter is the same or increasing, compared the previous value
if (
self.last_service_info
and new_encryption_counter == last_encryption_counter
and service_info.service_data == self.last_service_info.service_data
):
# the counter and service data are exactly the same as the previous, skipping the adv.
_LOGGER.debug(
"The new encryption counter (%i) and service data are the same as the previous "
"encryption counter (%i) and service data. Skipping this message.",
new_encryption_counter,
last_encryption_counter,
)
raise ValueError
elif new_encryption_counter <= last_encryption_counter:
# the counter is lower than the previous counter or equal, but with different service
# data.
if new_encryption_counter < 100 and last_encryption_counter >= 4294967195:
# the counter has (most likely) restarted from 0 after reaching the highest number.
self.encryption_counter = new_encryption_counter
else:
# in all other cases, we assume the data has been comprimised and skip the
# advertisement
_LOGGER.warning(
"The new encryption counter (%i) is lower than or equal to the previous value "
"(%i). The data might be compromised. BLE advertisement will be skipped.",
new_encryption_counter,
last_encryption_counter,
)
raise ValueError
else:
self.encryption_counter = new_encryption_counter

# decrypt the data
try:
decrypted_payload = self.cipher.decrypt(
Expand Down
179 changes: 179 additions & 0 deletions tests/test_parser_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,123 @@ def test_bindkey_verified_can_be_unset():
assert not device.bindkey_verified


def test_increasing_encryption_counter(caplog):
"""Test BTHome parser with increasing encryption counter."""
bindkey = "231d39c1d7cc1ab1aee224cd096db932"
data_string = b"\x41\xe4\x45\xf3\xc9\x96\x2b\x33\x22\x11\x00\x6c\x7c\x45\x19"
advertisement = bytes_to_service_info(
data_string,
local_name="TEST DEVICE",
address="54:48:E6:8F:80:A5",
)

device = BTHomeBluetoothDeviceData(bindkey=bytes.fromhex(bindkey))
assert device.supported(advertisement)
assert device.bindkey_verified
assert device.encryption_counter == 1122867

data_string = b"\x41\x3e\x93\x2c\xc7\x17\x5f\x34\x22\x11\x00\x55\x38\x76\xaf"
advertisement = bytes_to_service_info(
data_string,
local_name="TEST DEVICE",
address="54:48:E6:8F:80:A5",
)
assert device.supported(advertisement)
assert device.bindkey_verified
assert device.encryption_counter == 1122868


def test_same_encryption_counter_same_data(caplog):
"""Test BTHome parser with the same encryption counter and service data."""
bindkey = "231d39c1d7cc1ab1aee224cd096db932"
data_string = b"\x41\xe4\x45\xf3\xc9\x96\x2b\x33\x22\x11\x00\x6c\x7c\x45\x19"
advertisement = bytes_to_service_info(
data_string,
local_name="TEST DEVICE",
address="54:48:E6:8F:80:A5",
)

device = BTHomeBluetoothDeviceData(bindkey=bytes.fromhex(bindkey))
assert device.supported(advertisement)
assert device.bindkey_verified
assert device.encryption_counter == 1122867

data_string = b"\x41\xe4\x45\xf3\xc9\x96\x2b\x33\x22\x11\x00\x6c\x7c\x45\x19"
advertisement = bytes_to_service_info(
data_string,
local_name="TEST DEVICE",
address="54:48:E6:8F:80:A5",
)
assert device.supported(advertisement)
assert device.bindkey_verified
# encryption counter should not be updated as it is lower
assert device.encryption_counter == 1122867
assert (
"The new encryption counter (1122867) and service data are the same as the previous "
"encryption counter (1122867) and service data. Skipping this message."
in caplog.text
)


def test_decreasing_encryption_counter(caplog):
"""Test BTHome parser with decreasing encryption counter."""
bindkey = "231d39c1d7cc1ab1aee224cd096db932"
data_string = b"\x41\xe4\x45\xf3\xc9\x96\x2b\x33\x22\x11\x00\x6c\x7c\x45\x19"
advertisement = bytes_to_service_info(
data_string,
local_name="TEST DEVICE",
address="54:48:E6:8F:80:A5",
)

device = BTHomeBluetoothDeviceData(bindkey=bytes.fromhex(bindkey))
assert device.supported(advertisement)
assert device.bindkey_verified
assert device.encryption_counter == 1122867

data_string = b"\x41\x72\x3d\x30\x35\xfb\x88\x32\x22\x11\x00\x9e\x74\x14\xc0"
advertisement = bytes_to_service_info(
data_string,
local_name="TEST DEVICE",
address="54:48:E6:8F:80:A5",
)
assert device.supported(advertisement)
assert device.bindkey_verified
# encryption counter should not be updated as it is lower
assert device.encryption_counter == 1122867
assert (
"The new encryption counter (1122866) is lower than or equal to the previous value "
"(1122867). The data might be compromised. BLE advertisement will be skipped."
in caplog.text
)


def test_reset_encryption_counter(caplog):
"""Test BTHome parser during reset of the encryption counter."""
bindkey = "231d39c1d7cc1ab1aee224cd096db932"
# data_string = b"\x41\xba\x0c\xb0\x7a\xee\xf6\xff\xff\xff\xff\x9c\x6d\xbe\xcc"
data_string = b"\x41\xba\x0d\xb0\x7a\xee\xf6\xff\xff\xff\xff\x21\xfd\x46\x00"
advertisement = bytes_to_service_info(
data_string,
local_name="TEST DEVICE",
address="54:48:E6:8F:80:A5",
)

device = BTHomeBluetoothDeviceData(bindkey=bytes.fromhex(bindkey))
assert device.supported(advertisement)
assert device.bindkey_verified
assert device.encryption_counter == 4294967295

data_string = b"\x41\xde\x40\xb5\x0e\x67\x66\x00\x00\x00\x00\xd7\xcb\x95\xde"
advertisement = bytes_to_service_info(
data_string,
local_name="TEST DEVICE",
address="54:48:E6:8F:80:A5",
)
assert device.supported(advertisement)
assert device.bindkey_verified
assert device.encryption_counter == 0


def test_bthome_wrong_object_id(caplog):
"""Test BTHome parser for a non-existing Object ID xFE."""
data_string = b"\x40\xFE\xca\x09"
Expand Down Expand Up @@ -3053,3 +3170,65 @@ def test_bthome_shelly_button_no_press(caplog):
},
events={},
)


def test_bthome_test(caplog):
"""Test BTHome parser for acceleration in m/s°."""
data_string = b"@\x00\xa7\x0cx\x0b\x10\x00"
advertisement = bytes_to_service_info(
data_string, local_name="TEST DEVICE", address="A4:C1:38:8D:18:B2"
)

device = BTHomeBluetoothDeviceData()

assert device.update(advertisement) == SensorUpdate(
title="TEST DEVICE 18B2",
devices={
None: SensorDeviceInfo(
name="TEST DEVICE 18B2",
manufacturer=None,
model="BTHome sensor",
sw_version="BTHome BLE v2",
hw_version=None,
)
},
entity_descriptions={
KEY_PACKET_ID: SensorDescription(
device_key=KEY_PACKET_ID,
device_class=SensorDeviceClass.PACKET_ID,
native_unit_of_measurement=None,
),
KEY_VOLTAGE: SensorDescription(
device_key=KEY_VOLTAGE,
device_class=SensorDeviceClass.VOLTAGE,
native_unit_of_measurement=Units.ELECTRIC_POTENTIAL_VOLT,
),
KEY_SIGNAL_STRENGTH: SensorDescription(
device_key=KEY_SIGNAL_STRENGTH,
device_class=SensorDeviceClass.SIGNAL_STRENGTH,
native_unit_of_measurement=Units.SIGNAL_STRENGTH_DECIBELS_MILLIWATT,
),
},
entity_values={
KEY_PACKET_ID: SensorValue(
device_key=KEY_PACKET_ID, name="Packet Id", native_value=167
),
KEY_VOLTAGE: SensorValue(
device_key=KEY_VOLTAGE, name="Voltage", native_value=2.936
),
KEY_SIGNAL_STRENGTH: SensorValue(
device_key=KEY_SIGNAL_STRENGTH, name="Signal Strength", native_value=-60
),
},
binary_entity_descriptions={
KEY_BINARY_POWER: BinarySensorDescription(
device_key=KEY_BINARY_POWER,
device_class=BinarySensorDeviceClass.POWER,
),
},
binary_entity_values={
KEY_BINARY_POWER: BinarySensorValue(
device_key=KEY_BINARY_POWER, name="Power", native_value=False
),
},
)

0 comments on commit a58cf7b

Please sign in to comment.