Skip to content

Commit

Permalink
Fix command deserialization for getTokenData (#619)
Browse files Browse the repository at this point in the history
* Allow passing structs as command schemas

* Fix uses of `getTokenData`

* Fix unit tests

* Bring test coverage up to 100%
  • Loading branch information
puddly authored Apr 29, 2024
1 parent 7833647 commit fbf754f
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 30 deletions.
6 changes: 3 additions & 3 deletions bellows/ezsp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,14 +379,14 @@ async def _get_nv3_restored_eui64_key(self) -> t.NV3KeyId | None:
t.NV3KeyId.NVM3KEY_STACK_RESTORED_EUI64, # RCP firmware
):
try:
status, data = await self.getTokenData(key, 0)
rsp = await self.getTokenData(key, 0)
except (InvalidCommandError, AttributeError):
# Either the command doesn't exist in the EZSP version, or the command
# is not implemented in the firmware
return None

if status == t.EmberStatus.SUCCESS:
nv3_restored_eui64, _ = t.EUI64.deserialize(data)
if rsp.status == t.EmberStatus.SUCCESS:
nv3_restored_eui64, _ = t.EUI64.deserialize(rsp.value)
LOGGER.debug("NV3 restored EUI64: %s=%s", key, nv3_restored_eui64)

return key
Expand Down
13 changes: 8 additions & 5 deletions bellows/ezsp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ async def command(self, name, *args) -> Any:
LOGGER.debug("Send command %s: %s", name, args)
data = self._ezsp_frame(name, *args)
self._gw.data(data)
c = self.COMMANDS[name]
future = asyncio.Future()
self._awaiting[self._seq] = (c[0], c[2], future)
cmd_id, _, rx_schema = self.COMMANDS[name]
future = asyncio.get_running_loop().create_future()
self._awaiting[self._seq] = (cmd_id, rx_schema, future)
self._seq = (self._seq + 1) % 256

async with asyncio_timeout(EZSP_CMD_TIMEOUT):
Expand All @@ -89,7 +89,7 @@ def __call__(self, data: bytes) -> None:
sequence, frame_id, data = self._ezsp_frame_rx(data)

try:
frame_name, _, schema = self.COMMANDS_BY_ID[frame_id]
frame_name, _, rx_schema = self.COMMANDS_BY_ID[frame_id]
except KeyError:
LOGGER.warning(
"Unknown application frame 0x%04X received: %s (%s). This is a bug!",
Expand All @@ -100,7 +100,10 @@ def __call__(self, data: bytes) -> None:
return

try:
result, data = self.types.deserialize(data, schema)
if isinstance(rx_schema, tuple):
result, data = self.types.deserialize(data, rx_schema)
else:
result, data = rx_schema.deserialize(data)
except Exception:
LOGGER.warning(
"Failed to parse frame %s: %s", frame_name, binascii.hexlify(data)
Expand Down
4 changes: 3 additions & 1 deletion bellows/ezsp/v10/commands.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from bellows.ezsp.v9.commands import GetTokenDataRsp

from . import types as t

COMMANDS = {
Expand Down Expand Up @@ -691,7 +693,7 @@
"getTokenData": (
0x0102,
(t.uint32_t, t.uint32_t),
(t.EmberStatus, t.LVBytes32),
GetTokenDataRsp,
),
"setTokenData": (
0x0103,
Expand Down
12 changes: 11 additions & 1 deletion bellows/ezsp/v9/commands.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
from zigpy.types import Struct, StructField

from . import types as t


class GetTokenDataRsp(Struct):
status: t.EmberStatus
value: t.LVBytes32 = StructField(
requires=lambda rsp: rsp.status == t.EmberStatus.SUCCESS
)


COMMANDS = {
# 4. Configuration frames
"version": (0x0000, (t.uint8_t,), (t.uint8_t, t.uint8_t, t.uint16_t)),
Expand Down Expand Up @@ -687,7 +697,7 @@
"getTokenData": (
0x0102,
(t.uint32_t, t.uint32_t),
(t.EmberStatus, t.LVBytes32),
GetTokenDataRsp,
),
"setTokenData": (
0x0103,
Expand Down
8 changes: 3 additions & 5 deletions bellows/zigbee/repairs.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ async def fix_invalid_tclk_partner_ieee(ezsp: EZSP) -> bool:
)

try:
(status, value) = await ezsp.getTokenData(
t.NV3KeyId.NVM3KEY_STACK_TRUST_CENTER, 0
)
assert status == t.EmberStatus.SUCCESS
rsp = await ezsp.getTokenData(t.NV3KeyId.NVM3KEY_STACK_TRUST_CENTER, 0)
assert rsp.status == t.EmberStatus.SUCCESS
except (InvalidCommandError, AttributeError, AssertionError):
LOGGER.warning("NV3 interface not available in this firmware, please upgrade!")
return False

token, remaining = t.NV3StackTrustCenterToken.deserialize(value)
token, remaining = t.NV3StackTrustCenterToken.deserialize(rsp.value)
assert not remaining
assert token.eui64 == state.trustCenterLongAddress

Expand Down
5 changes: 4 additions & 1 deletion tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import bellows.ezsp.v6.types as ezsp_t6
import bellows.ezsp.v7.types as ezsp_t7
import bellows.ezsp.v8.types as ezsp_t8
from bellows.ezsp.v9.commands import GetTokenDataRsp
import bellows.types.struct
import bellows.uart as uart
import bellows.zigbee.application
Expand Down Expand Up @@ -154,7 +155,9 @@ async def nop_mock():
ezsp_mock.readAndClearCounters = AsyncMock(side_effect=nop_mock)
ezsp_mock._protocol = AsyncMock()
ezsp_mock.setConcentrator = AsyncMock()
ezsp_mock.getTokenData = AsyncMock(return_value=[t.EmberStatus.ERR_FATAL, b""])
ezsp_mock.getTokenData = AsyncMock(
return_value=GetTokenDataRsp(status=t.EmberStatus.ERR_FATAL)
)
ezsp_mock._command = AsyncMock(return_value=t.EmberStatus.SUCCESS)
ezsp_mock.addEndpoint = AsyncMock(return_value=[t.EmberStatus.SUCCESS])
ezsp_mock.setConfigurationValue = AsyncMock(return_value=[t.EmberStatus.SUCCESS])
Expand Down
5 changes: 4 additions & 1 deletion tests/test_application_network_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from bellows.exception import EzspError
from bellows.ezsp import EZSP
from bellows.ezsp.v9.commands import GetTokenDataRsp
import bellows.types as t

from tests.async_mock import AsyncMock, PropertyMock
Expand Down Expand Up @@ -518,7 +519,9 @@ def form_network(params):

ezsp.setValue = AsyncMock(return_value=[t.EmberStatus.SUCCESS])
ezsp.setMfgToken = AsyncMock(return_value=[t.EmberStatus.SUCCESS])
ezsp.getTokenData = AsyncMock(return_value=[t.EmberStatus.LIBRARY_NOT_PRESENT, b""])
ezsp.getTokenData = AsyncMock(
return_value=GetTokenDataRsp(status=t.EmberStatus.LIBRARY_NOT_PRESENT)
)


@pytest.mark.parametrize("ezsp_ver", [4, 7, 13])
Expand Down
14 changes: 10 additions & 4 deletions tests/test_ezsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

from unittest.mock import ANY, AsyncMock, MagicMock, call, patch, sentinel

from bellows.ezsp.v9.commands import GetTokenDataRsp

DEVICE_CONFIG = {
zigpy.config.CONF_DEVICE_PATH: "/dev/null",
zigpy.config.CONF_DEVICE_BAUDRATE: 115200,
Expand Down Expand Up @@ -524,9 +526,9 @@ async def test_can_rewrite_custom_eui64(ezsp_f, tokens, expected_key, expected_r

def get_token_data(key, index):
if key not in tokens or index != 0:
return [t.EmberStatus.ERR_FATAL, b""]
return GetTokenDataRsp(status=t.EmberStatus.ERR_FATAL)

return [t.EmberStatus.SUCCESS, tokens[key]]
return GetTokenDataRsp(status=t.EmberStatus.SUCCESS, value=tokens[key])

ezsp_f.getTokenData = AsyncMock(side_effect=get_token_data)

Expand Down Expand Up @@ -623,7 +625,9 @@ async def test_write_custom_eui64_rcp(ezsp_f):

# RCP firmware does not support manufacturing tokens
ezsp_f.getMfgToken = AsyncMock(return_value=[b""])
ezsp_f.getTokenData = AsyncMock(return_value=[t.EmberStatus.SUCCESS, b"\xFF" * 8])
ezsp_f.getTokenData = AsyncMock(
return_value=GetTokenDataRsp(status=t.EmberStatus.SUCCESS, value=b"\xFF" * 8)
)

await ezsp_f.write_custom_eui64(new_eui64)

Expand Down Expand Up @@ -858,7 +862,9 @@ async def test_reset_custom_eui64(ezsp_f):
assert len(ezsp_f.setTokenData.mock_calls) == 0

# With NV3 interface
ezsp_f.getTokenData = AsyncMock(return_value=[t.EmberStatus.SUCCESS, b"\xAB" * 8])
ezsp_f.getTokenData = AsyncMock(
return_value=GetTokenDataRsp(status=t.EmberStatus.SUCCESS, value=b"\xAB" * 8)
)
await ezsp_f.reset_custom_eui64()
assert ezsp_f.setTokenData.mock_calls == [
call(t.NV3KeyId.CREATOR_STACK_RESTORED_EUI64, 0, t.LVBytes32(b"\xFF" * 8))
Expand Down
27 changes: 27 additions & 0 deletions tests/test_ezsp_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
from bellows.ezsp import EZSP
import bellows.ezsp.v4
import bellows.ezsp.v4.types as t
import bellows.ezsp.v9
from bellows.ezsp.v9.commands import GetTokenDataRsp
from bellows.types import NV3KeyId

from .async_mock import ANY, AsyncMock, MagicMock, call, patch

Expand All @@ -16,6 +19,12 @@ def prot_hndl():
return bellows.ezsp.v4.EZSPv4(MagicMock(), MagicMock())


@pytest.fixture
def prot_hndl_v9():
"""Protocol handler mock."""
return bellows.ezsp.v9.EZSPv9(MagicMock(), MagicMock())


async def test_command(prot_hndl):
coro = prot_hndl.command("nop")
asyncio.get_running_loop().call_soon(
Expand Down Expand Up @@ -94,3 +103,21 @@ async def test_logging_frame_parsing_failure(prot_hndl, caplog) -> None:
prot_hndl(b"\xAA\xAA\x71\x22")

assert "Failed to parse frame getKeyTableEntry: b'22'" in caplog.text


async def test_parsing_schema_response(prot_hndl_v9):
"""Test parsing data with a struct schema."""

coro = prot_hndl_v9.command(
"getTokenData", NV3KeyId.CREATOR_STACK_RESTORED_EUI64, 0
)
asyncio.get_running_loop().call_soon(
lambda: prot_hndl_v9(
bytes([prot_hndl_v9._seq - 1, 0x00, 0x00])
+ t.uint16_t(prot_hndl_v9.COMMANDS["getTokenData"][0]).serialize()
+ bytes([0xB5])
)
)

rsp = await coro
assert rsp == GetTokenDataRsp(status=t.EmberStatus.LIBRARY_NOT_PRESENT)
21 changes: 12 additions & 9 deletions tests/test_zigbee_repairs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from bellows.exception import InvalidCommandError
from bellows.ezsp import EZSP
from bellows.ezsp.v9.commands import GetTokenDataRsp
import bellows.types as t
from bellows.zigbee import repairs

Expand Down Expand Up @@ -71,16 +72,16 @@ async def test_fix_invalid_tclk(ezsp_tclk_f: EZSP, caplog) -> None:

ezsp_tclk_f.setTokenData = AsyncMock(return_value=[t.EmberStatus.SUCCESS])
ezsp_tclk_f.getTokenData = AsyncMock(
return_value=[
t.EmberStatus.SUCCESS,
t.NV3StackTrustCenterToken(
return_value=GetTokenDataRsp(
status=t.EmberStatus.SUCCESS,
value=t.NV3StackTrustCenterToken(
mode=228,
eui64=t.EUI64.convert("BB:BB:BB:BB:BB:BB:BB:BB"),
key=t.KeyData.convert(
"21:8e:df:b8:50:a0:4a:b6:8b:c6:10:25:bc:4e:93:6a"
),
).serialize(),
]
)
)
ezsp_tclk_f.getEui64.return_value[0] = t.EUI64.convert("AA:AA:AA:AA:AA:AA:AA:AA")
ezsp_tclk_f.getCurrentSecurityState.return_value[
Expand Down Expand Up @@ -121,21 +122,23 @@ async def test_fix_invalid_tclk_all_versions(
if fw_has_token_interface:
ezsp.setTokenData = AsyncMock(return_value=[t.EmberStatus.SUCCESS])
ezsp.getTokenData = AsyncMock(
return_value=[
t.EmberStatus.SUCCESS,
t.NV3StackTrustCenterToken(
return_value=GetTokenDataRsp(
status=t.EmberStatus.SUCCESS,
value=t.NV3StackTrustCenterToken(
mode=228,
eui64=t.EUI64.convert("BB:BB:BB:BB:BB:BB:BB:BB"),
key=t.KeyData.convert(
"21:8e:df:b8:50:a0:4a:b6:8b:c6:10:25:bc:4e:93:6a"
),
).serialize(),
]
)
)

if not has_library:
ezsp.setTokenData = AsyncMock(return_value=[t.EmberStatus.LIBRARY_NOT_LOADED])
ezsp.getTokenData = AsyncMock(return_value=[t.EmberStatus.LIBRARY_NOT_LOADED])
ezsp.getTokenData = AsyncMock(
return_value=GetTokenDataRsp(status=t.EmberStatus.LIBRARY_NOT_LOADED)
)

ezsp.getEui64 = ezsp_tclk_f.getEui64
ezsp.getCurrentSecurityState = ezsp_tclk_f.getCurrentSecurityState
Expand Down

0 comments on commit fbf754f

Please sign in to comment.