-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathzdo.py
115 lines (85 loc) · 3.37 KB
/
zdo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import asyncio
import logging
import zigpy.zdo.types as zdo_t
import zigpy.types as t
import zigpy.device
import zigpy.zdo
# Module-level logger shared by every command handler in this file.
LOGGER = logging.getLogger(__name__)
async def leave(app, listener, ieee, cmd, data, service):
    """Send a ZDO ``Mgmt_Leave_req`` through a parent device.

    Args:
        app: Controller application; must provide ``get_device(nwk=...)``.
        listener: Unused here.
        ieee: IEEE address placed in the leave request. Required.
        cmd: Unused here.
        data: Hexadecimal string holding the NWK address of the device
            that receives the request. Required (must be truthy).
        service: Original service call; only used in log messages.

    Returns:
        None. The response of the request is only logged.
    """
    if not (ieee is not None and data):
        LOGGER.warning("Incorrect parameters for 'zdo.leave' command: %s", service)
        return

    LOGGER.debug(
        "running 'leave' command. Telling 0x%s to remove %s: %s", data, ieee, service
    )

    # ``data`` is parsed as base-16 and used to look up the target device.
    parent_nwk = int(data, 16)
    parent_dev = app.get_device(nwk=parent_nwk)

    # NOTE(review): 0x02 is passed as the request's last field, presumably the
    # leave options bitmap -- confirm its meaning against the ZDO spec.
    response = await parent_dev.zdo.request(
        zdo_t.ZDOCmd.Mgmt_Leave_req, ieee, 0x02
    )
    LOGGER.debug("0x%04x: Mgmt_Leave_req: %s", parent_dev.nwk, response)
async def ieee_ping(app, listener, ieee, cmd, data, service):
    """Send a ZDO ``IEEE_addr_req`` to a device and log the reply.

    Args:
        app: Controller application; must provide ``get_device(ieee=...)``.
        listener: Unused here.
        ieee: IEEE address of the device to query. Required.
        cmd: Unused here.
        data: Unused here.
        service: Original service call; only used in the warning message.

    Returns:
        None. The response of the request is only logged.
    """
    if ieee is None:
        LOGGER.warning("Incorrect parameters for 'ieee_ping' command: %s", service)
        return

    target = app.get_device(ieee=ieee)
    LOGGER.debug("running 'ieee_ping' command to 0x%s", target.nwk)

    # The request carries the device's own NWK address followed by two zero
    # fields (presumably request type and start index -- confirm).
    reply = await target.zdo.request(
        zdo_t.ZDOCmd.IEEE_addr_req, target.nwk, 0x00, 0x00
    )
    LOGGER.debug("0x%04x: IEEE_addr_req: %s", target.nwk, reply)
async def join_with_code(app, listener, ieee, cmd, data, service):
    """Register a transient link key for a hard-coded node, then permit joins.

    All handler arguments except ``app`` are ignored: the node address and
    the key are fixed values inside the function. The function talks to the
    radio directly through ``app._ezsp`` and opens joining for 60 via
    ``app.permit``.

    Returns:
        None.
    """
    # Imported lazily so the module does not require bellows at import time.
    import bellows.types as bt

    # Currently unused; kept for the alternative
    # ``app.permit_with_key(node, code, 60)`` path.
    code = (
        b"\xA8\x16\x92\x7F\xB1\x9B\x78\x55\xC1"
        b"\xD7\x76\x0D\x5C\xAD\x63\x7F\x69\xCC"
    )

    joining_node = t.EUI64.convert("04:cf:8c:df:3c:75:e1:e7")
    transient_key = bt.EmberKeyData(b"ZigBeeAlliance09")

    status = await app._ezsp.addTransientLinkKey(joining_node, transient_key)
    LOGGER.debug("permit with key: %s", status)

    await app.permit(60)
async def update_nwk_id(app, listener, ieee, cmd, data, service):
    """Broadcast a ``Mgmt_NWK_Update_req`` carrying a new NWK update id.

    Args:
        app: Controller application; ``app._ezsp`` is queried afterwards.
        listener: Unused here.
        ieee: Unused here.
        cmd: Unused here.
        data: New NWK update id; converted with ``t.uint8_t``. Required.
        service: Unused here.

    Returns:
        None. The network parameters read back from the radio are logged.
    """
    if data is None:
        LOGGER.error("Need NWK update id in the data")
        return

    new_update_id = t.uint8_t(data)

    # Payload layout, in order: a leading 0xEE byte (same value as the
    # sequence argument below), the serialized ALL_CHANNELS mask, a 0xFF byte,
    # the new update id and two zero bytes.
    # NOTE(review): 0xFF / 0x0000 are presumably the scan duration and the
    # network manager address -- confirm against the ZDO spec.
    payload = b"".join(
        (
            b"\xee",
            t.Channels.ALL_CHANNELS.serialize(),
            b"\xFF",
            new_update_id.serialize(),
            b"\x00\x00",
        )
    )

    await zigpy.device.broadcast(
        app,
        0,
        zdo_t.ZDOCmd.Mgmt_NWK_Update_req,
        0,
        0,
        0x0000,
        0x00,
        0xEE,
        payload,
    )

    net_params = await app._ezsp.getNetworkParameters()
    LOGGER.debug("Network params: %s", net_params)
async def topo_scan_now(app, listener, ieee, cmd, data, service):
    """Start a topology scan in the background and return immediately.

    Args:
        app: Controller application; ``app.topology.scan()`` is scheduled.
        listener: Unused here.
        ieee: Unused here.
        cmd: Unused here.
        data: Unused here.
        service: Unused here.

    Returns:
        None. The scan runs as an independent asyncio task.
    """
    LOGGER.debug("Scanning topology")
    scan_task = asyncio.create_task(app.topology.scan())

    # The event loop only keeps weak references to tasks, so a task nobody
    # holds on to may be garbage-collected before it finishes. Keep a strong
    # reference until the scan completes, then drop it automatically.
    pending = topo_scan_now.__dict__.setdefault("_pending_scans", set())
    pending.add(scan_task)
    scan_task.add_done_callback(pending.discard)
async def flood_parent_annce(app, listener, ieee, cmd, data, service):
    """Toggle the background task that floods the network with Parent_annce.

    If a flooder task stored on ``app.flooder_task`` is still running, it is
    cancelled and the attribute is reset to ``None``. Otherwise a new task
    running ``_flood_with_parent_annce`` is created and stored there.

    Args:
        app: Controller application; carries the ``flooder_task`` attribute.
        listener: Unused here.
        ieee: Unused here.
        cmd: Unused here.
        data: Unused here.
        service: Unused here.

    Returns:
        None.
    """
    LOGGER.debug("flooding network with parent annce")

    running_task = getattr(app, "flooder_task", None)
    if running_task and not running_task.done():
        running_task.cancel()
        LOGGER.debug("Stop flooding network with parent annce messages")
        # Clear the same attribute that is read above; the previous code
        # wrote to a misspelled name and left the cancelled task in place.
        app.flooder_task = None
        return

    app.flooder_task = asyncio.create_task(_flood_with_parent_annce(app))
async def _flood_with_parent_annce(app):
    """Broadcast ``Parent_annce`` for the coordinator's children, forever.

    Each iteration rebuilds the list of coordinator neighbors whose node
    descriptor reports an end device, broadcasts their IEEE addresses to
    all routers and the coordinator, then sleeps for 0.1. The loop only
    ends when the surrounding task is cancelled or an exception is raised.

    Args:
        app: Controller application; ``app.ieee`` identifies the coordinator.
    """
    coordinator = app.get_device(app.ieee)

    while True:
        # Recomputed every pass so changes in the neighbor table are seen.
        end_devices = []
        for neighbor in coordinator.neighbors:
            if neighbor.device.node_desc.is_end_device:
                end_devices.append(neighbor.device.ieee)

        coordinator.debug("Have the following children: %s", end_devices)

        await zigpy.zdo.broadcast(
            app,
            zigpy.zdo.types.ZDOCmd.Parent_annce,
            0x0000,
            0x00,
            end_devices,
            broadcast_address=t.BroadcastAddress.ALL_ROUTERS_AND_COORDINATOR,
        )
        await asyncio.sleep(0.1)