Skip to content

Commit

Permalink
Use TimeAndModel class instead of tuple
Browse files Browse the repository at this point in the history
  • Loading branch information
koenvervloesem committed Feb 13, 2024
1 parent a7fce34 commit 7182bb8
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 16 deletions.
47 changes: 33 additions & 14 deletions TheengsGateway/ble_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import struct
import sys
from contextlib import suppress
from dataclasses import dataclass
from datetime import datetime
from random import randrange
from threading import Thread
Expand Down Expand Up @@ -85,6 +86,18 @@
DataJSONType = Dict[str, Union[str, int, float, bool]]


@dataclass
class TimeAndModel:
"""Keep track of a timestamp and model for a tracker."""

time: int
model: str

def __str__(self) -> str:
"""Show time and model."""
return f"({self.time}, '{self.model}')"


def get_address(data: DataJSONType) -> str:
"""Return the device address from a data JSON."""
try:
Expand Down Expand Up @@ -117,7 +130,7 @@ def __init__(
self.stopped = False
self.clock_updates: dict[str, float] = {}
self.published_messages = 0
self.discovered_trackers: dict[str, dict[int, str]] = {}
self.discovered_trackers: dict[str, TimeAndModel] = {}

def connect_mqtt(self) -> None:
"""Connect to MQTT broker."""
Expand Down Expand Up @@ -350,19 +363,17 @@ def check_tracker_timeout(self) -> None:
"""Check if tracker timeout is over timeout limit."""
# If the timestamp is later than current time minus tracker_timeout
# Publish offline message
for address, timemodeldict in self.discovered_trackers.copy().items():
for address, time_model in self.discovered_trackers.copy().items():
if (
round(time()) - timemodeldict[0]
>= self.configuration["tracker_timeout"]
and timemodeldict[0] != 0
round(time()) - time_model.time >= self.configuration["tracker_timeout"]
and time_model.time != 0
and (
self.configuration["discovery"]
or self.configuration["general_presence"]
)
):
if (
timemodeldict[1] == "APPLEWATCH"
or timemodeldict[1] == "APPLEDEVICE"
time_model.model in ("APPLEWATCH", "APPLEDEVICE")
) and not self.configuration["discovery"]:
message = json.dumps(
{"id": address, "presence": "absent", "unlocked": False}
Expand All @@ -376,12 +387,20 @@ def check_tracker_timeout(self) -> None:
+ "/"
+ address.replace(":", ""),
)
tup1 = list(timemodeldict)
tup1[0] = 0
logger.info("Dictionary: %s", tup1)
self.discovered_trackers[address] = tuple(tup1)
time_model.time = 0
logger.info("Dictionary: %s", time_model)
self.discovered_trackers[address] = time_model
logger.info(
"Dictionary - Discovered Trackers: %s", self.discovered_trackers
"Dictionary - Discovered Trackers: %s",
", ".join(
[
f"{address}: {time_model}"
for (
address,
time_model,
) in self.discovered_trackers.items()
]
),
)

async def ble_scan_loop(self) -> None:
Expand Down Expand Up @@ -581,7 +600,7 @@ def publish_json(
data_json["id"] not in self.discovered_trackers
or (
data_json["id"] in self.discovered_trackers
and self.discovered_trackers[str(data_json["id"])][0] == 0
and self.discovered_trackers[str(data_json["id"])].time == 0
)
) and self.configuration["general_presence"]:
message = json.dumps({"id": data_json["id"], "presence": "present"})
Expand All @@ -591,7 +610,7 @@ def publish_json(
+ "/"
+ get_address(data_json).replace(":", ""),
)
self.discovered_trackers[str(data_json["id"])] = (
self.discovered_trackers[str(data_json["id"])] = TimeAndModel(
round(time()),
str(data_json["model_id"]),
)
Expand Down
6 changes: 4 additions & 2 deletions TheengsGateway/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

from TheengsDecoder import getProperties

from .ble_gateway import DataJSONType, Gateway, logger
from .ble_gateway import DataJSONType, Gateway, TimeAndModel, logger

ha_dev_classes = [
"battery",
Expand Down Expand Up @@ -264,7 +264,9 @@ def copy_pub_device(self, device: dict) -> dict:
"""Copy pub_device and remove "track" if publish_advdata is false."""
# Update tracker last received time
if "track" in device:
self.discovered_trackers[device["id"]] = (round(time()), device["model_id"])
self.discovered_trackers[device["id"]] = TimeAndModel(
round(time()), device["model_id"]
)
logger.info("Discovered Trackers: %s", self.discovered_trackers)

pub_device_copy = device.copy()
Expand Down

0 comments on commit 7182bb8

Please sign in to comment.