Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process undecoded messages from MQTT the same way as from BLE #192

Merged
merged 1 commit into from
Dec 22, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 29 additions & 31 deletions TheengsGateway/ble_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,13 @@ def get_address(data: DataJSONType) -> str:

def add_manufacturer(
data: DataJSONType,
company_id: int | None,
) -> None:
"""Add the name of the manufacturer based on the company ID."""
if company_id is not None:
if "manufacturerdata" in data:
company_id = int.from_bytes(
bytes.fromhex(data["manufacturerdata"][:4]), # type: ignore[index]
byteorder="little",
)
with suppress(No16BitIntegerError, UnknownCICError):
data["mfr"] = company[company_id]

Expand Down Expand Up @@ -207,20 +210,9 @@ def on_message(client, userdata, msg) -> None: # noqa: ANN001,ARG001
except (json.JSONDecodeError, UnicodeDecodeError) as exception:
logger.warning(exception)
return
decoded_json = decodeBLE(json.dumps(msg_json))

if decoded_json:
decoded_json = json.loads(decoded_json)
if self.configuration["presence"]:
self.hass_presence(decoded_json)
if self.configuration["discovery"]:
self.publish_device_info(
decoded_json,
) # Publish sensor data to Home Assistant MQTT discovery
else:
self.publish_json(decoded_json, decoded=True)
elif self.configuration["publish_all"]:
self.publish_json(msg_json, decoded=False)
msg_json["id"] = self.rpa2id(msg_json["id"])
self.decode_advertisement(msg_json)

self.client.subscribe(sub_topic)
self.client.on_message = on_message
Expand Down Expand Up @@ -402,22 +394,12 @@ def detection_callback(
logger.debug("%s:%s", device.address, advertisement_data)

# Try to resolve private addresses with known IRKs
address = device.address
for identity, irk in self.configuration["identities"].items():
if resolve_private_address(address, irk):
address = identity
logger.debug(
"Using identity address %s instead of random private address %s",
address,
device.address,
)
break
address = self.rpa2id(device.address)

# Try to add the device to dictionary of clocks to synchronize time.
self.add_clock(address)

data_json: DataJSONType = {}
company_id = None

if advertisement_data.manufacturer_data:
# Only look at the first manufacturer data in the advertisement
Expand All @@ -443,16 +425,15 @@ def detection_callback(
data_json_copy = data_json.copy()
data_json_copy["servicedatauuid"] = uuid[4:8]
data_json_copy["servicedata"] = data.hex()
self.decode_advertisement(data_json_copy, company_id)
elif data_json:
data_json["id"] = address
data_json["rssi"] = advertisement_data.rssi
self.decode_advertisement(data_json, company_id)

self.decode_advertisement(data_json)

def decode_advertisement(
self,
data_json: DataJSONType,
company_id: int | None,
) -> None:
"""Decode device from data JSON."""
decoded_json = decodeBLE(json.dumps(data_json))
Expand All @@ -471,7 +452,7 @@ def decode_advertisement(
decoded_json.get("cidc", True)
and decoded_json.get("type", "UNIQ") != "BCON"
):
add_manufacturer(data_json, company_id)
add_manufacturer(data_json)

if self.configuration["presence"]:
self.hass_presence(decoded_json)
Expand All @@ -495,9 +476,26 @@ def decode_advertisement(
else:
self.publish_json(decoded_json, decoded=True)
elif self.configuration["publish_all"]:
add_manufacturer(data_json, company_id)
add_manufacturer(data_json)
self.publish_json(data_json, decoded=False)

def rpa2id(self, address: str) -> str:
"""Replace a random private address by its corresponding identity address.

If there's no identity known for the address, this function just returns
the original address.
"""
for identity, irk in self.configuration["identities"].items():
if resolve_private_address(address, irk):
logger.debug(
"Using identity address %s instead of random private address %s",
identity,
address,
)
return identity

return address

def process_prmacs(
self,
decoded_json: DataJSONType,
Expand Down