Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

model_id included in tracker dict #237

Merged
merged 9 commits into from
Feb 13, 2024
42 changes: 34 additions & 8 deletions TheengsGateway/ble_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import struct
import sys
from contextlib import suppress
from dataclasses import dataclass
from datetime import datetime
from random import randrange
from threading import Thread
Expand Down Expand Up @@ -85,6 +86,18 @@
DataJSONType = Dict[str, Union[str, int, float, bool]]


@dataclass
class TnM:
"""Keep track of a timestamp and model_id for a tracker."""

time: int
model_id: str

def __str__(self) -> str:
"""Show time and model_id."""
return f"({self.time}, {self.model_id})"


def get_address(data: DataJSONType) -> str:
"""Return the device address from a data JSON."""
try:
Expand Down Expand Up @@ -117,7 +130,7 @@ def __init__(
self.stopped = False
self.clock_updates: dict[str, float] = {}
self.published_messages = 0
self.discovered_trackers: dict[str, int] = {}
self.discovered_trackers: dict[str, TnM] = {}

def connect_mqtt(self) -> None:
"""Connect to MQTT broker."""
Expand Down Expand Up @@ -350,24 +363,34 @@ def check_tracker_timeout(self) -> None:
"""Check if tracker timeout is over timeout limit."""
# If the timestamp is later than current time minus tracker_timeout
# Publish offline message
for address, timestamp in self.discovered_trackers.copy().items():
for address, time_model in self.discovered_trackers.copy().items():
if (
round(time()) - timestamp >= self.configuration["tracker_timeout"]
and timestamp != 0
round(time()) - time_model.time >= self.configuration["tracker_timeout"]
and time_model.time != 0
and (
self.configuration["discovery"]
or self.configuration["general_presence"]
)
):
message = json.dumps({"id": address, "presence": "absent"})
if (
time_model.model_id in ("APPLEWATCH", "APPLEDEVICE")
and not self.configuration["discovery"]
):
message = json.dumps(
{"id": address, "presence": "absent", "unlocked": False}
)
else:
message = json.dumps({"id": address, "presence": "absent"})

self.publish(
message,
self.configuration["publish_topic"]
+ "/"
+ address.replace(":", ""),
)
self.discovered_trackers[address] = 0
time_model.time = 0
self.discovered_trackers[address] = time_model
logger.debug("Discovered Trackers: %s", self.discovered_trackers)

async def ble_scan_loop(self) -> None:
"""Scan for BLE devices."""
Expand Down Expand Up @@ -566,7 +589,7 @@ def publish_json(
data_json["id"] not in self.discovered_trackers
or (
data_json["id"] in self.discovered_trackers
and self.discovered_trackers[str(data_json["id"])] == 0
and self.discovered_trackers[str(data_json["id"])].time == 0
)
) and self.configuration["general_presence"]:
message = json.dumps({"id": data_json["id"], "presence": "present"})
Expand All @@ -576,7 +599,10 @@ def publish_json(
+ "/"
+ get_address(data_json).replace(":", ""),
)
self.discovered_trackers[str(data_json["id"])] = round(time())
self.discovered_trackers[str(data_json["id"])] = TnM(
round(time()),
str(data_json["model_id"]),
)
logger.debug("Discovered Trackers: %s", self.discovered_trackers)

# Remove "track" if PUBLISH_ADVDATA is 0
Expand Down
6 changes: 4 additions & 2 deletions TheengsGateway/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

from TheengsDecoder import getProperties

from .ble_gateway import DataJSONType, Gateway, logger
from .ble_gateway import DataJSONType, Gateway, TnM, logger

ha_dev_classes = [
"battery",
Expand Down Expand Up @@ -264,7 +264,9 @@ def copy_pub_device(self, device: dict) -> dict:
"""Copy pub_device and remove "track" if publish_advdata is false."""
# Update tracker last received time
if "track" in device:
self.discovered_trackers[device["id"]] = round(time())
self.discovered_trackers[device["id"]] = TnM(
round(time()), device["model_id"]
)
logger.debug("Discovered Trackers: %s", self.discovered_trackers)

pub_device_copy = device.copy()
Expand Down
Loading