forked from leech001/hass-mqtt-discovery
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathha_mqtt_device.py
152 lines (120 loc) · 4.27 KB
/
ha_mqtt_device.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import json
import logging
import os
import yaml
import paho.mqtt.client as mqtt
# Root topic segment used for the discovery/config topics built in this module.
DISCOVERY_PREFIX = "homeassistant"

logger = logging.getLogger(__name__)

# The mere presence of a DEBUG environment variable (whatever its value,
# even empty) switches this module's logger to debug level.
if os.environ.get("DEBUG") is not None:
    logger.setLevel(logging.DEBUG)
class Device(dict):
    """Description of a physical device, usable directly as a JSON mapping.

    The instance *is* the dictionary that gets embedded in discovery
    payloads (subclassing ``dict`` lets ``json.dumps`` serialise it as-is).
    The device name is additionally exposed as the ``name`` attribute so
    other code can use ``device.name`` when building topic strings.

    Args:
        identifiers: Stored under the ``"identifiers"`` key.
        name: Stored under the ``"name"`` key and as ``self.name``.
        sw_version: Stored under the ``"sw_version"`` key.
        model: Stored under the ``"model"`` key.
        manufacturer: Stored under the ``"manufacturer"`` key.
    """

    def __init__(self, identifiers, name, sw_version, model, manufacturer):
        super().__init__(
            identifiers=identifiers,
            name=name,
            sw_version=sw_version,
            model=model,
            manufacturer=manufacturer,
        )
        self.name = name

    @classmethod
    def from_config(cls, config_yaml_path):
        """Build a device from a YAML file.

        The file must contain a mapping whose keys match the constructor's
        parameter names; they are passed on as keyword arguments.

        Args:
            config_yaml_path: Path of the YAML file to read.

        Returns:
            A new instance of the class this method was called on, so
            subclasses get instances of their own type.

        Raises:
            TypeError: If the YAML document is not a mapping (for example
                an empty file), or its keys do not match the constructor.
            OSError: If the file cannot be opened.
        """
        # Explicit encoding: the platform default is locale dependent and
        # would garble non-ASCII names on some systems.
        with open(config_yaml_path, encoding="utf-8") as file:
            device_config = yaml.safe_load(file)
        if not isinstance(device_config, dict):
            raise TypeError(
                f"{config_yaml_path}: expected a YAML mapping of device "
                f"fields, got {type(device_config).__name__}"
            )
        return cls(**device_config)
class Component:
    """Common base holding a component type name and an optional reader.

    Attributes are plain instance attributes:
    ``component`` is the name given at construction and
    ``value_read_function`` starts out as ``None`` until one is registered.
    """

    def __init__(self, name):
        """Remember the component type ``name``; no reader is set yet."""
        self.value_read_function = None
        self.component = name

    def set_value_read_function(self, function):
        """Register ``function`` as the callable used to obtain a value.

        The callable is only stored here, replacing any previous one.
        """
        self.value_read_function = function
class Sensor(Component):
    """A sensor announced via MQTT discovery and updated over MQTT.

    Constructing the object immediately publishes a retained discovery
    config message and waits for that publish to complete; ``send`` then
    publishes individual readings to ``<topic>/state``.

    Args:
        client: MQTT client used for every publish.
        name: Display name; lower-cased with spaces replaced by underscores
            to form ``object_id``.
        parent_device: Device the sensor belongs to. Its ``name`` attribute
            is used in topic strings and the object itself is embedded in
            the JSON config, so it must be JSON serialisable.
        unit_of_measurement: Passed through to the discovery config.
        icon: Optional icon; only added to the config when truthy.
        topic_parent_level: Optional extra topic level placed between the
            component name and ``object_id``. When empty (the default) the
            level is left out entirely.
    """

    def __init__(
        self,
        client: mqtt.Client,
        name,
        parent_device,
        unit_of_measurement,
        icon=None,
        topic_parent_level="",
    ):
        super().__init__("sensor")
        self.client = client
        self.name = name
        self.parent_device = parent_device
        self.object_id = self.name.replace(" ", "_").lower()
        self.unit_of_measurement = unit_of_measurement
        self.icon = icon
        self.topic_parent_level = topic_parent_level
        # Home Assistant only ties an entity to its ``device`` entry when a
        # unique_id is supplied, so derive a stable one from device + sensor.
        self.unique_id = (
            f"{self.parent_device.name}_{self.object_id}".replace(" ", "_").lower()
        )
        # Leave the optional level out when it is empty; formatting it in
        # unconditionally produced an empty topic level ("dev/sensor//obj").
        if self.topic_parent_level:
            sub_path = f"{self.topic_parent_level}/{self.object_id}"
        else:
            sub_path = self.object_id
        self.topic = f"{self.parent_device.name}/{self.component}/{sub_path}"
        self._send_config()

    def _send_config(self):
        """Publish the retained discovery config and wait until it is sent."""
        _config = {
            "~": self.topic,
            "name": self.name,
            "unique_id": self.unique_id,
            "state_topic": "~/state",
            "unit_of_measurement": self.unit_of_measurement,
            "device": self.parent_device,
        }
        if self.icon:
            _config["icon"] = self.icon
        self.client.publish(
            f"{DISCOVERY_PREFIX}/{self.component}/{self.parent_device.name}/{self.object_id}/config",
            json.dumps(_config),
            retain=True,
        ).wait_for_publish()

    def send(self, value=None, blocking=False):
        """Publish a reading to ``<topic>/state``.

        Args:
            value: Reading to publish. When ``None``, the registered
                ``value_read_function`` is called to obtain it.
            blocking: When true, wait for the publish to complete before
                returning.

        Raises:
            ValueError: If ``value`` is ``None`` and no read function has
                been registered.
        """
        if value is None:
            if not self.value_read_function:
                raise ValueError("Set either value or value_read_function")
            publish_value = self.value_read_function()
        else:
            publish_value = value
        # Lazy %-formatting: the message is only built when debug is enabled.
        logger.debug("%s: %s", self.topic, publish_value)
        message_info = self.client.publish(f"{self.topic}/state", publish_value)
        if blocking:
            message_info.wait_for_publish()
class Tracker:
    """A device tracker announced via MQTT discovery.

    Constructing the object immediately publishes the discovery config.
    ``send`` publishes a position as JSON to ``<topic>/attributes``.

    NOTE(review): the config declares ``~/state`` as state topic, but
    nothing in this class publishes to it - confirm that attribute-only
    updates are what consumers expect.

    Args:
        client: MQTT client used for every publish.
        name: Display name; lower-cased with spaces replaced by underscores
            to form ``unique_id``, which is also the last topic level.
    """

    def __init__(self, client: mqtt.Client, name):
        self.client = client
        self.name = name
        self.unique_id = self.name.replace(" ", "_").lower()
        self.topic = f"{DISCOVERY_PREFIX}/device_tracker/{self.unique_id}"
        self._send_config()

    def _send_config(self):
        """Publish the discovery config for this tracker."""
        _config = {
            "~": self.topic,
            "name": self.name,
            "unique_id": self.unique_id,
            "stat_t": "~/state",
            "json_attr_t": "~/attributes",
            "payload_home": "home",
            "payload_not_home": "not_home",
        }
        # Retained, like the Sensor config: otherwise a subscriber that
        # connects (or restarts) after this publish never learns about the
        # entity until the config is sent again.
        self.client.publish(
            f"{self.topic}/config", json.dumps(_config), retain=True
        )

    def send(self, latitude, longitude, gps_accuracy):
        """Publish a position as a JSON object to ``<topic>/attributes``.

        Args:
            latitude: Value for the ``"latitude"`` key.
            longitude: Value for the ``"longitude"`` key.
            gps_accuracy: Value for the ``"gps_accuracy"`` key.
        """
        _payload = {
            "latitude": latitude,
            "longitude": longitude,
            "gps_accuracy": gps_accuracy,
        }
        self.client.publish(f"{self.topic}/attributes", json.dumps(_payload))
class Binary:
    """A binary sensor announced via MQTT discovery.

    Constructing the object immediately publishes the discovery config.
    ``send`` publishes a state to ``<topic>/state``.

    Args:
        client: MQTT client used for every publish.
        name: Display name; lower-cased with spaces replaced by underscores
            to form ``unique_id``, which is also the last topic level.
        icon: Optional icon; only added to the config when truthy.
    """

    def __init__(self, client: mqtt.Client, name, icon=None):
        self.client = client
        self.name = name
        self.unique_id = self.name.replace(" ", "_").lower()
        self.topic = f"{DISCOVERY_PREFIX}/binary_sensor/{self.unique_id}"
        self.icon = icon
        self._send_config()

    def _send_config(self):
        """Publish the discovery config for this binary sensor."""
        _config = {
            "~": self.topic,
            "name": self.name,
            "unique_id": self.unique_id,
            "stat_t": "~/state",
        }
        # Same rule as Sensor: leave the key out instead of sending null.
        if self.icon:
            _config["icon"] = self.icon
        # Retained, like the Sensor config, so late subscribers still
        # receive the entity definition.
        self.client.publish(
            f"{self.topic}/config", json.dumps(_config), retain=True
        )

    def send(self, value):
        """Publish ``value`` as the sensor state.

        Booleans are sent as ``"ON"`` / ``"OFF"``: the config published
        here does not override ``payload_on`` / ``payload_off``, and Home
        Assistant's defaults for those are ``"ON"`` / ``"OFF"``, so the
        previous ``"True"`` / ``"False"`` strings were never matched.
        Any other value is published as ``str(value)`` unchanged.
        """
        if isinstance(value, bool):
            payload = "ON" if value else "OFF"
        else:
            payload = str(value)
        self.client.publish(f"{self.topic}/state", payload)