Skip to content

Joining with link key #150

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 1, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
@@ -416,6 +416,26 @@ def test_handle_tx_status_duplicate(api):
assert send_fut.set_exception.call_count == 0


def test_handle_registration_status(api):
frame_id = 0x12
status = xbee_api.RegistrationStatus.SUCCESS
fut = asyncio.Future()
api._awaiting[frame_id] = (fut,)
api._handle_registration_status(frame_id, status)
assert fut.done() is True
assert fut.result() == xbee_api.RegistrationStatus.SUCCESS
assert fut.exception() is None

frame_id = 0x13
status = xbee_api.RegistrationStatus.KEY_TABLE_IS_FULL
fut = asyncio.Future()
api._awaiting[frame_id] = (fut,)
api._handle_registration_status(frame_id, status)
assert fut.done() is True
with pytest.raises(RuntimeError, match="Registration Status: KEY_TABLE_IS_FULL"):
fut.result()


async def test_command_mode_at_cmd(api):
command = "+++"

26 changes: 26 additions & 0 deletions tests/test_application.py
Original file line number Diff line number Diff line change
@@ -413,6 +413,32 @@ async def test_permit(app):
assert app._api._at_command.call_args_list[0][0][1] == time_s


async def test_permit_with_key(app):
app._api._command = mock.AsyncMock(return_value=xbee_t.TXStatus.SUCCESS)
app._api._at_command = mock.AsyncMock(return_value="OK")
node = t.EUI64(b"\x01\x02\x03\x04\x05\x06\x07\x08")
code = b"\xC9\xA7\xD2\x44\x1A\x71\x16\x95\xCD\x62\x17\x0D\x33\x28\xEA\x2B\x42\x3D"
time_s = 500
await app.permit_with_key(node=node, code=code, time_s=time_s)
app._api._at_command.assert_called_once_with("KT", time_s)
app._api._command.assert_called_once_with(
"register_joining_device", node, 0xFFFE, 1, code
)


async def test_permit_with_link_key(app):
app._api._command = mock.AsyncMock(return_value=xbee_t.TXStatus.SUCCESS)
app._api._at_command = mock.AsyncMock(return_value="OK")
node = t.EUI64(b"\x01\x02\x03\x04\x05\x06\x07\x08")
link_key = b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F"
time_s = 500
await app.permit_with_link_key(node=node, link_key=link_key, time_s=time_s)
app._api._at_command.assert_called_once_with("KT", time_s)
app._api._command.assert_called_once_with(
"register_joining_device", node, 0xFFFE, 0, link_key
)


async def _test_request(
app, expect_reply=True, send_success=True, send_timeout=False, **kwargs
):
31 changes: 30 additions & 1 deletion zigpy_xbee/api.py
Original file line number Diff line number Diff line change
@@ -89,6 +89,20 @@ class ModemStatus(t.uint8_t, t.UndefinedEnum):
_UNDEFINED = 0xFF


class RegistrationStatus(t.uint8_t, t.UndefinedEnum):
SUCCESS = 0x00
KEY_TOO_LONG = 0x01
TRANSIENT_KEY_TABLE_IS_FULL = 0x18
ADDRESS_NOT_FOUND_IN_THE_KEY_TABLE = 0xB1
KEY_IS_INVALID_OR_RESERVED = 0xB2
INVALID_ADDRESS = 0xB3
KEY_TABLE_IS_FULL = 0xB4
SECURITY_DATA_IS_INVALID_INSTALL_CODE_CRC_FAILS = 0xBD

UNKNOWN_MODEM_STATUS = 0xFF
_UNDEFINED = 0xFF


# https://www.digi.com/resources/documentation/digidocs/PDFs/90000976.pdf
COMMAND_REQUESTS = {
"at": (0x08, (t.FrameId, t.ATCommand, t.Bytes), 0x88),
@@ -120,7 +134,11 @@ class ModemStatus(t.uint8_t, t.UndefinedEnum):
(t.FrameId, t.EUI64, t.NWK, t.uint8_t, t.Relays),
None,
),
"register_joining_device": (0x24, (), None),
"register_joining_device": (
0x24,
(t.FrameId, t.EUI64, t.uint16_t, t.uint8_t, t.Bytes),
0xA4,
),
}
COMMAND_RESPONSES = {
"at_response": (0x88, (t.FrameId, t.ATCommand, t.uint8_t, t.Bytes), None),
@@ -155,6 +173,7 @@ class ModemStatus(t.uint8_t, t.UndefinedEnum):
"extended_status": (0x98, (), None),
"route_record_indicator": (0xA1, (t.EUI64, t.NWK, t.uint8_t, t.Relays), None),
"many_to_one_rri": (0xA3, (t.EUI64, t.NWK, t.uint8_t), None),
"registration_status": (0xA4, (t.FrameId, RegistrationStatus), None),
"node_id_indicator": (0x95, (), None),
}

@@ -201,6 +220,7 @@ class ModemStatus(t.uint8_t, t.UndefinedEnum):
"EO": t.uint8_t,
"NK": t.Bytes, # 128-bit value
"KY": t.Bytes, # 128-bit value
"KT": t.uint16_t, # 0x1E - 0xFFFF
# RF interfacing commands
"PL": t.uint8_t, # 0 - 4 (basically an Enum)
"PM": t.Bool,
@@ -549,6 +569,15 @@ def _handle_tx_status(self, frame_id, nwk, tries, tx_status, dsc_status):
except asyncio.InvalidStateError as ex:
LOGGER.debug("duplicate tx_status for %s nwk? State: %s", nwk, ex)

def _handle_registration_status(self, frame_id, status):
(fut,) = self._awaiting.pop(frame_id)
if status:
fut.set_exception(RuntimeError(f"Registration Status: {status.name}"))
return
LOGGER.debug(f"Registration Status: {status.name}")

fut.set_result(status)

def set_application(self, app):
self._app = app

17 changes: 15 additions & 2 deletions zigpy_xbee/zigbee/application.py
Original file line number Diff line number Diff line change
@@ -319,8 +319,21 @@ async def permit_ncp(self, time_s=60):
await self._api._at_command("NJ", time_s)
await self._api._at_command("AC")

async def permit_with_key(self, node, code, time_s=60):
raise NotImplementedError("XBee does not support install codes")
async def permit_with_link_key(
self, node: EUI64, link_key: zigpy.types.KeyData, time_s: int = 500, key_type=0
):
"""Permits a new device to join with the given IEEE and link key."""
assert 0x1E <= time_s <= 0xFFFF
await self._api._at_command("KT", time_s)
reserved = 0xFFFE
# Key type:
# 0 = Pre-configured Link Key (KY command of the joining device)
# 1 = Install Code With CRC (I? command of the joining device)
await self._api.register_joining_device(node, reserved, key_type, link_key)

async def permit_with_key(self, node: EUI64, code: bytes, time_s=500):
"""Permits a new device to join with the given IEEE and Install Code."""
await self.permit_with_link_key(node, code, time_s, key_type=1)

def handle_modem_status(self, status):
LOGGER.info("Modem status update: %s (%s)", status.name, status.value)