-
-
Notifications
You must be signed in to change notification settings - Fork 32k
/
Copy pathutils.py
488 lines (429 loc) · 17 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
"""Utilities used by insteon component."""
from __future__ import annotations
import asyncio
from collections.abc import Callable
import logging
from typing import TYPE_CHECKING, Any
from pyinsteon import devices
from pyinsteon.address import Address
from pyinsteon.constants import ALDBStatus, DeviceAction
from pyinsteon.device_types.device_base import Device
from pyinsteon.events import OFF_EVENT, OFF_FAST_EVENT, ON_EVENT, ON_FAST_EVENT, Event
from pyinsteon.managers.link_manager import (
async_enter_linking_mode,
async_enter_unlinking_mode,
)
from pyinsteon.managers.scene_manager import (
async_trigger_scene_off,
async_trigger_scene_on,
)
from pyinsteon.managers.x10_manager import (
async_x10_all_lights_off,
async_x10_all_lights_on,
async_x10_all_units_off,
)
from pyinsteon.x10_address import create as create_x10_address
from serial.tools import list_ports
from homeassistant.components import usb
from homeassistant.const import (
CONF_ADDRESS,
CONF_ENTITY_ID,
CONF_PLATFORM,
ENTITY_MATCH_ALL,
Platform,
)
from homeassistant.core import HomeAssistant, ServiceCall, callback
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
dispatcher_send,
)
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .const import (
CONF_CAT,
CONF_DIM_STEPS,
CONF_HOUSECODE,
CONF_SUBCAT,
CONF_UNITCODE,
DOMAIN,
EVENT_CONF_BUTTON,
EVENT_GROUP_OFF,
EVENT_GROUP_OFF_FAST,
EVENT_GROUP_ON,
EVENT_GROUP_ON_FAST,
SIGNAL_ADD_DEFAULT_LINKS,
SIGNAL_ADD_DEVICE_OVERRIDE,
SIGNAL_ADD_ENTITIES,
SIGNAL_ADD_X10_DEVICE,
SIGNAL_LOAD_ALDB,
SIGNAL_PRINT_ALDB,
SIGNAL_REMOVE_DEVICE_OVERRIDE,
SIGNAL_REMOVE_ENTITY,
SIGNAL_REMOVE_HA_DEVICE,
SIGNAL_REMOVE_INSTEON_DEVICE,
SIGNAL_REMOVE_X10_DEVICE,
SIGNAL_SAVE_DEVICES,
SRV_ADD_ALL_LINK,
SRV_ADD_DEFAULT_LINKS,
SRV_ALL_LINK_GROUP,
SRV_ALL_LINK_MODE,
SRV_CONTROLLER,
SRV_DEL_ALL_LINK,
SRV_HOUSECODE,
SRV_LOAD_ALDB,
SRV_LOAD_DB_RELOAD,
SRV_PRINT_ALDB,
SRV_PRINT_IM_ALDB,
SRV_SCENE_OFF,
SRV_SCENE_ON,
SRV_X10_ALL_LIGHTS_OFF,
SRV_X10_ALL_LIGHTS_ON,
SRV_X10_ALL_UNITS_OFF,
)
from .ipdb import get_device_platform_groups, get_device_platforms
from .schemas import (
ADD_ALL_LINK_SCHEMA,
ADD_DEFAULT_LINKS_SCHEMA,
DEL_ALL_LINK_SCHEMA,
LOAD_ALDB_SCHEMA,
PRINT_ALDB_SCHEMA,
TRIGGER_SCENE_SCHEMA,
X10_HOUSECODE_SCHEMA,
)
if TYPE_CHECKING:
    # Needed only for the type annotations below; TYPE_CHECKING is False at
    # runtime, so this import is never executed when the module runs.
    from .entity import InsteonEntity

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
def _register_event(event: Event, listener: Callable) -> None:
    """Subscribe *listener* to a single device event.

    The event's address, group and name are written to the debug log, then
    the listener is subscribed with a strong reference.
    """
    address_text = str(event.address)
    _LOGGER.debug(
        "Registering on/off event for %s %d %s", address_text, event.group, event.name
    )
    event.subscribe(listener, force_strong_ref=True)
def add_insteon_events(hass: HomeAssistant, device: Device) -> None:
    """Register Insteon device events.

    Every event found in ``device.events`` is subscribed to a listener that
    re-fires it on the Home Assistant event bus. Devices whose address starts
    with ``"X10"`` are skipped and get no listeners.
    """
    # Maps the on/off event names imported from pyinsteon to the event types
    # fired on the Home Assistant bus.
    group_events = {
        ON_EVENT: EVENT_GROUP_ON,
        OFF_EVENT: EVENT_GROUP_OFF,
        ON_FAST_EVENT: EVENT_GROUP_ON_FAST,
        OFF_FAST_EVENT: EVENT_GROUP_OFF_FAST,
    }

    @callback
    def async_fire_insteon_event(
        name: str, address: Address, group: int, button: str | None = None
    ) -> None:
        """Fire a Home Assistant event when a button is pressed."""
        # A button name ending in "_<char>" carries the button id in its last
        # character. Slicing (instead of indexing ``button[-2]``) means a
        # one-character name yields no button id rather than an IndexError.
        button_id = None
        if button and button[-2:-1] == "_":
            button_id = button[-1].lower()

        schema = {CONF_ADDRESS: address, "group": group}
        if button_id:
            schema[EVENT_CONF_BUTTON] = button_id

        # Names outside the table are fired under the "insteon." prefix.
        event = group_events.get(name, f"insteon.{name}")
        _LOGGER.debug("Firing event %s with %s", event, schema)
        hass.bus.async_fire(event, schema)

    if str(device.address).startswith("X10"):
        return

    for name_or_group, event_or_events in device.events.items():
        if isinstance(name_or_group, int):
            # An integer key holds a mapping of events for that group.
            for group_event in event_or_events.values():
                _register_event(group_event, async_fire_insteon_event)
        else:
            _register_event(event_or_events, async_fire_insteon_event)
def register_new_device_callback(hass):
    """Subscribe a listener on ``devices`` that sets up newly added devices."""

    async def async_create_new_entities(address):
        """Save the devices, refresh the new one and announce its entities."""
        _LOGGER.debug(
            "Adding new INSTEON device to Home Assistant with address %s", address
        )
        await devices.async_save(workdir=hass.config.config_dir)
        new_device = devices[address]
        await new_device.async_status()
        # One signal per platform, carrying the groups that platform handles.
        for platform in get_device_platforms(new_device):
            payload = {
                "address": new_device.address,
                "groups": get_device_platform_groups(new_device, platform),
            }
            dispatcher_send(hass, f"{SIGNAL_ADD_ENTITIES}_{platform}", payload)
        add_insteon_events(hass, new_device)

    @callback
    def async_new_insteon_device(address, action: DeviceAction):
        """Schedule entity creation when a device is reported as added."""
        if action != DeviceAction.ADDED:
            return
        hass.async_create_task(async_create_new_entities(address))

    devices.subscribe(async_new_insteon_device, force_strong_ref=True)
@callback
def async_register_services(hass):  # noqa: C901
    """Register services used by insteon component.

    Defines the service handlers and dispatcher listeners as closures over
    ``hass``, registers each service under ``DOMAIN`` with its schema, and
    connects the listeners to their dispatcher signals.
    """

    # Held while the device configuration is written (async_srv_save_devices).
    save_lock = asyncio.Lock()

    async def async_srv_add_all_link(service: ServiceCall) -> None:
        """Add an INSTEON All-Link between two devices."""
        group = service.data[SRV_ALL_LINK_GROUP]
        mode = service.data[SRV_ALL_LINK_MODE]
        # True when the lower-cased requested mode equals SRV_CONTROLLER.
        link_mode = mode.lower() == SRV_CONTROLLER
        await async_enter_linking_mode(link_mode, group)

    async def async_srv_del_all_link(service: ServiceCall) -> None:
        """Delete an INSTEON All-Link between two devices."""
        group = service.data.get(SRV_ALL_LINK_GROUP)
        await async_enter_unlinking_mode(group)

    async def async_srv_load_aldb(service: ServiceCall) -> None:
        """Load the device All-Link database.

        When the entity id matches ENTITY_MATCH_ALL every device is loaded;
        otherwise a load signal is dispatched for the named entity.
        """
        entity_id = service.data[CONF_ENTITY_ID]
        reload = service.data[SRV_LOAD_DB_RELOAD]
        if entity_id.lower() == ENTITY_MATCH_ALL:
            await async_srv_load_aldb_all(reload)
        else:
            signal = f"{entity_id}_{SIGNAL_LOAD_ALDB}"
            async_dispatcher_send(hass, signal, reload)

    async def async_srv_load_aldb_all(reload):
        """Load the All-Link database for all devices."""
        # Cannot be done concurrently due to issues with the underlying protocol.
        for address in devices:
            device = devices[address]
            # Skip the modem itself and any device whose category is 0x03.
            if device != devices.modem and device.cat != 0x03:
                await device.aldb.async_load(refresh=reload)
                # Save after each device that was loaded.
                await async_srv_save_devices()

    async def async_srv_save_devices():
        """Write the Insteon device configuration to file."""
        async with save_lock:
            _LOGGER.debug("Saving Insteon devices")
            await devices.async_save(hass.config.config_dir)

    def print_aldb(service: ServiceCall) -> None:
        """Print the All-Link Database for a device."""
        # For now this sends logs to the log file.
        # Future direction is to create an INSTEON control panel.
        entity_id = service.data[CONF_ENTITY_ID]
        signal = f"{entity_id}_{SIGNAL_PRINT_ALDB}"
        dispatcher_send(hass, signal)

    def print_im_aldb(service: ServiceCall) -> None:
        """Print the All-Link Database for the modem (``devices.modem``)."""
        # For now this sends logs to the log file.
        # Future direction is to create an INSTEON control panel.
        print_aldb_to_log(devices.modem.aldb)

    async def async_srv_x10_all_units_off(service: ServiceCall) -> None:
        """Send the X10 All Units Off command."""
        housecode = service.data.get(SRV_HOUSECODE)
        await async_x10_all_units_off(housecode)

    async def async_srv_x10_all_lights_off(service: ServiceCall) -> None:
        """Send the X10 All Lights Off command."""
        housecode = service.data.get(SRV_HOUSECODE)
        await async_x10_all_lights_off(housecode)

    async def async_srv_x10_all_lights_on(service: ServiceCall) -> None:
        """Send the X10 All Lights On command."""
        housecode = service.data.get(SRV_HOUSECODE)
        await async_x10_all_lights_on(housecode)

    async def async_srv_scene_on(service: ServiceCall) -> None:
        """Trigger an INSTEON scene ON."""
        group = service.data.get(SRV_ALL_LINK_GROUP)
        await async_trigger_scene_on(group)

    async def async_srv_scene_off(service: ServiceCall) -> None:
        """Trigger an INSTEON scene OFF."""
        group = service.data.get(SRV_ALL_LINK_GROUP)
        await async_trigger_scene_off(group)

    @callback
    def async_add_default_links(service: ServiceCall) -> None:
        """Add the default All-Link entries to a device."""
        entity_id = service.data[CONF_ENTITY_ID]
        signal = f"{entity_id}_{SIGNAL_ADD_DEFAULT_LINKS}"
        async_dispatcher_send(hass, signal)

    async def async_add_device_override(override):
        """Apply a device override.

        Removes the Home Assistant device for the override's address, sets
        the device id from the override's CONF_CAT and CONF_SUBCAT, and saves
        the devices.
        """
        address = Address(override[CONF_ADDRESS])
        await async_remove_ha_device(address)
        devices.set_id(address, override[CONF_CAT], override[CONF_SUBCAT], 0)
        await async_srv_save_devices()

    async def async_remove_device_override(address):
        """Remove a device override.

        Removes the Home Assistant device for the address, clears the device
        id, asks ``devices`` to identify the device again, and saves.
        """
        address = Address(address)
        await async_remove_ha_device(address)
        devices.set_id(address, None, None, None)
        await devices.async_identify_device(address)
        await async_srv_save_devices()

    @callback
    def async_add_x10_device(x10_config):
        """Add X10 device."""
        housecode = x10_config[CONF_HOUSECODE]
        unitcode = x10_config[CONF_UNITCODE]
        platform = x10_config[CONF_PLATFORM]
        steps = x10_config.get(CONF_DIM_STEPS, 22)
        # Platform -> X10 type: "light" is dimmable, "binary_sensor" is a
        # sensor, anything else is on/off.
        x10_type = "on_off"
        if platform == "light":
            x10_type = "dimmable"
        elif platform == "binary_sensor":
            x10_type = "sensor"
        _LOGGER.debug(
            "Adding X10 device to Insteon: %s %d %s", housecode, unitcode, x10_type
        )
        # This must be run in the event loop
        devices.add_x10_device(housecode, unitcode, x10_type, steps)

    async def async_remove_x10_device(housecode, unitcode):
        """Remove an X10 device and associated entities."""
        address = create_x10_address(housecode, unitcode)
        devices.pop(address)
        await async_remove_ha_device(address)

    async def async_remove_ha_device(address: Address, remove_all_refs: bool = False):
        """Remove the device and all entities from hass.

        ``remove_all_refs`` is accepted but not used by this function.
        """
        signal = f"{address.id}_{SIGNAL_REMOVE_ENTITY}"
        async_dispatcher_send(hass, signal)
        dev_registry = dr.async_get(hass)
        device = dev_registry.async_get_device(identifiers={(DOMAIN, str(address))})
        if device:
            dev_registry.async_remove_device(device.id)

    async def async_remove_insteon_device(
        address: Address, remove_all_refs: bool = False
    ):
        """Remove the underlying Insteon device from the network."""
        await devices.async_remove_device(
            address=address, force=False, remove_all_refs=remove_all_refs
        )
        await async_srv_save_devices()

    # Service registrations.
    hass.services.async_register(
        DOMAIN, SRV_ADD_ALL_LINK, async_srv_add_all_link, schema=ADD_ALL_LINK_SCHEMA
    )
    hass.services.async_register(
        DOMAIN, SRV_DEL_ALL_LINK, async_srv_del_all_link, schema=DEL_ALL_LINK_SCHEMA
    )
    hass.services.async_register(
        DOMAIN, SRV_LOAD_ALDB, async_srv_load_aldb, schema=LOAD_ALDB_SCHEMA
    )
    hass.services.async_register(
        DOMAIN, SRV_PRINT_ALDB, print_aldb, schema=PRINT_ALDB_SCHEMA
    )
    hass.services.async_register(DOMAIN, SRV_PRINT_IM_ALDB, print_im_aldb, schema=None)
    hass.services.async_register(
        DOMAIN,
        SRV_X10_ALL_UNITS_OFF,
        async_srv_x10_all_units_off,
        schema=X10_HOUSECODE_SCHEMA,
    )
    hass.services.async_register(
        DOMAIN,
        SRV_X10_ALL_LIGHTS_OFF,
        async_srv_x10_all_lights_off,
        schema=X10_HOUSECODE_SCHEMA,
    )
    hass.services.async_register(
        DOMAIN,
        SRV_X10_ALL_LIGHTS_ON,
        async_srv_x10_all_lights_on,
        schema=X10_HOUSECODE_SCHEMA,
    )
    hass.services.async_register(
        DOMAIN, SRV_SCENE_ON, async_srv_scene_on, schema=TRIGGER_SCENE_SCHEMA
    )
    hass.services.async_register(
        DOMAIN, SRV_SCENE_OFF, async_srv_scene_off, schema=TRIGGER_SCENE_SCHEMA
    )
    hass.services.async_register(
        DOMAIN,
        SRV_ADD_DEFAULT_LINKS,
        async_add_default_links,
        schema=ADD_DEFAULT_LINKS_SCHEMA,
    )

    # Dispatcher listeners.
    async_dispatcher_connect(hass, SIGNAL_SAVE_DEVICES, async_srv_save_devices)
    async_dispatcher_connect(
        hass, SIGNAL_ADD_DEVICE_OVERRIDE, async_add_device_override
    )
    async_dispatcher_connect(
        hass, SIGNAL_REMOVE_DEVICE_OVERRIDE, async_remove_device_override
    )
    async_dispatcher_connect(hass, SIGNAL_ADD_X10_DEVICE, async_add_x10_device)
    async_dispatcher_connect(hass, SIGNAL_REMOVE_X10_DEVICE, async_remove_x10_device)
    async_dispatcher_connect(hass, SIGNAL_REMOVE_HA_DEVICE, async_remove_ha_device)
    async_dispatcher_connect(
        hass, SIGNAL_REMOVE_INSTEON_DEVICE, async_remove_insteon_device
    )
    _LOGGER.debug("Insteon Services registered")
def print_aldb_to_log(aldb):
    """Write every record of an All-Link Database to the log.

    Records go to the ``<module name>.links`` logger at INFO level. When the
    database status is neither LOADED nor PARTIAL a warning is logged on the
    module logger, and the header and any records present are still written.
    """

    def _yes_no(flag):
        # Render a truthy/falsy record flag as "Y"/"N".
        return "Y" if flag else "N"

    links_log = logging.getLogger(f"{__name__}.links")
    links_log.info("%s ALDB load status is %s", aldb.address, aldb.status.name)

    is_loaded = aldb.status in (ALDBStatus.LOADED, ALDBStatus.PARTIAL)
    if not is_loaded:
        _LOGGER.warning("All-Link database not loaded")

    links_log.info("RecID In Use Mode HWM Group Address Data 1 Data 2 Data 3")
    links_log.info("----- ------ ---- --- ----- -------- ------ ------ ------")
    # For now the records are written to the log; the roadmap is to create a
    # configuration panel.
    for mem_addr in aldb:
        record = aldb[mem_addr]
        in_use = _yes_no(record.is_in_use)
        mode = "C" if record.is_controller else "R"
        hwm = _yes_no(record.is_high_water_mark)
        links_log.info(
            f" {record.mem_addr:04x} {in_use:s} {mode:s} {hwm:s} "
            f"{record.group:3d} {record.target!s:s} {record.data1:3d} "
            f"{record.data2:3d} {record.data3:3d}"
        )
@callback
def async_add_insteon_entities(
    hass: HomeAssistant,
    platform: Platform,
    entity_type: type[InsteonEntity],
    async_add_entities: AddEntitiesCallback,
    discovery_info: dict[str, Any],
) -> None:
    """Create one entity per discovered group and hand them to the platform.

    ``discovery_info`` supplies the device ``"address"`` and the ``"groups"``
    to build entities for. ``hass`` and ``platform`` are not used here.
    """
    insteon_device = devices[discovery_info["address"]]
    async_add_entities(
        [
            entity_type(device=insteon_device, group=group)
            for group in discovery_info["groups"]
        ]
    )
@callback
def async_add_insteon_devices(
    hass: HomeAssistant,
    platform: Platform,
    entity_type: type[InsteonEntity],
    async_add_entities: AddEntitiesCallback,
) -> None:
    """Add the entities of every known device to a platform.

    For each address in ``devices`` the groups for ``platform`` are looked up
    and passed on to ``async_add_insteon_entities``.
    """
    for address in devices:
        platform_groups = get_device_platform_groups(devices[address], platform)
        async_add_insteon_entities(
            hass,
            platform,
            entity_type,
            async_add_entities,
            {"address": address, "groups": platform_groups},
        )
def get_usb_ports() -> dict[str, str]:
    """Return the serial ports found on the host with readable names.

    Keys are the paths returned by ``usb.get_serial_by_id`` for each port;
    values are the names built by ``usb.human_readable_device_name``.
    """
    descriptions = {}
    for port in list_ports.comports():
        # The vid/pid are only passed along when the port reports both.
        if port.vid is not None and port.pid is not None:
            usb_device = usb.usb_device_from_port(port)
            vid, pid = usb_device.vid, usb_device.pid
        else:
            vid = pid = None
        dev_path = usb.get_serial_by_id(port.device)
        descriptions[dev_path] = usb.human_readable_device_name(
            dev_path,
            port.serial_number,
            port.manufacturer,
            port.description,
            vid,
            pid,
        )
    return descriptions
async def async_get_usb_ports(hass: HomeAssistant) -> dict[str, str]:
    """Run ``get_usb_ports`` as an executor job and return its result."""
    usb_ports = await hass.async_add_executor_job(get_usb_ports)
    return usb_ports
def compute_device_name(ha_device) -> str:
    """Return the user-assigned device name, or the default name if unset."""
    # A falsy ``name_by_user`` (None or "") falls through to ``name``.
    return ha_device.name_by_user or ha_device.name
async def async_device_name(dev_registry: dr.DeviceRegistry, address: Address) -> str:
    """Resolve a display name for the Insteon device at *address*.

    The device registry entry is used when one exists. Otherwise the name is
    "<description> (<model>)" from the ``devices`` entry, or an empty string
    when that entry is falsy.
    """
    registry_entry = dev_registry.async_get_device(
        identifiers={(DOMAIN, str(address))}
    )
    if registry_entry:
        return compute_device_name(registry_entry)

    insteon_device = devices[address]
    if insteon_device:
        return f"{insteon_device.description} ({insteon_device.model})"
    return ""