Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
puddly committed Aug 14, 2024
1 parent 66a32a0 commit 7792587
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 1 deletion.
4 changes: 4 additions & 0 deletions bellows/ezsp/v4/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ async def set_extended_timeout(
)

if t.sl_Status.from_ember_status(status) != t.sl_Status.OK:
# Last-ditch effort
await self.setExtendedTimeout(
remoteEui64=ieee, extendedTimeout=extended_timeout
)
return

# Replace a random entry in the address table
Expand Down
1 change: 1 addition & 0 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def form_network():
)

proto.factory_reset = AsyncMock(proto=proto.factory_reset)
proto.set_extended_timeout = AsyncMock(proto=proto.set_extended_timeout)

proto.read_link_keys = MagicMock()
proto.read_link_keys.return_value.__aiter__.return_value = [
Expand Down
109 changes: 108 additions & 1 deletion tests/test_ezsp_v4.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from unittest.mock import MagicMock, call
from unittest.mock import MagicMock, call, patch

import pytest
import zigpy.state
Expand Down Expand Up @@ -379,3 +379,110 @@ async def test_read_counters(ezsp_f, length: int) -> None:
)

assert counters1 == counters2 == {t.EmberCounterType(i): i for i in range(length)}


async def test_set_extended_timeout_no_entry(ezsp_f) -> None:
# Typical invocation
ezsp_f.getExtendedTimeout.return_value = (t.Bool.false,)
ezsp_f.lookupNodeIdByEui64.return_value = (0xFFFF,) # No address table entry
ezsp_f.getConfigurationValue.return_value = (t.EmberStatus.SUCCESS, 8)
ezsp_f.replaceAddressTableEntry.return_value = (
t.EmberStatus.SUCCESS,
t.EUI64.convert("ff:ff:ff:ff:ff:ff:ff:ff"),
0xFFFF,
t.Bool.false,
)

with patch("bellows.ezsp.v4.random.randint") as mock_random:
mock_random.return_value = 0
await ezsp_f.set_extended_timeout(
nwk=0x1234,
ieee=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11"),
extended_timeout=True,
)

assert ezsp_f.getExtendedTimeout.mock_calls == [
call(remoteEui64=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11"))
]
assert ezsp_f.lookupNodeIdByEui64.mock_calls == [
call(eui64=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11"))
]
assert ezsp_f.getConfigurationValue.mock_calls == [
call(t.EzspConfigId.CONFIG_ADDRESS_TABLE_SIZE)
]
assert mock_random.mock_calls == [call(0, 8 - 1)]
assert ezsp_f.replaceAddressTableEntry.mock_calls == [
call(
addressTableIndex=0,
newEui64=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11"),
newId=0x1234,
newExtendedTimeout=True,
)
]


async def test_set_extended_timeout_already_set(ezsp_f) -> None:
# No-op, it's already set
ezsp_f.setExtendedTimeout.return_value = ()
ezsp_f.getExtendedTimeout.return_value = (t.Bool.true,)

await ezsp_f.set_extended_timeout(
nwk=0x1234,
ieee=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11"),
extended_timeout=True,
)

assert ezsp_f.getExtendedTimeout.mock_calls == [
call(remoteEui64=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11"))
]
assert ezsp_f.setExtendedTimeout.mock_calls == []


async def test_set_extended_timeout_already_have_entry(ezsp_f) -> None:
# An address table entry is present
ezsp_f.setExtendedTimeout.return_value = ()
ezsp_f.getExtendedTimeout.return_value = (t.Bool.false,)
ezsp_f.lookupNodeIdByEui64.return_value = (0x1234,)

await ezsp_f.set_extended_timeout(
nwk=0x1234,
ieee=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11"),
extended_timeout=True,
)

assert ezsp_f.getExtendedTimeout.mock_calls == [
call(remoteEui64=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11"))
]
assert ezsp_f.lookupNodeIdByEui64.mock_calls == [
call(eui64=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11"))
]
assert ezsp_f.setExtendedTimeout.mock_calls == [
call(
remoteEui64=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11"), extendedTimeout=True
)
]


async def test_set_extended_timeout_bad_table_size(ezsp_f) -> None:
ezsp_f.setExtendedTimeout.return_value = ()
ezsp_f.getExtendedTimeout.return_value = (t.Bool.false,)
ezsp_f.lookupNodeIdByEui64.return_value = (0xFFFF,)
ezsp_f.getConfigurationValue.return_value = (t.EmberStatus.ERR_FATAL, 0xFF)

with patch("bellows.ezsp.v4.random.randint") as mock_random:
mock_random.return_value = 0
await ezsp_f.set_extended_timeout(
nwk=0x1234,
ieee=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11"),
extended_timeout=True,
)

assert ezsp_f.getExtendedTimeout.mock_calls == [
call(remoteEui64=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11"))
]
assert ezsp_f.lookupNodeIdByEui64.mock_calls == [
call(eui64=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11"))
]
assert ezsp_f.getConfigurationValue.mock_calls == [
call(t.EzspConfigId.CONFIG_ADDRESS_TABLE_SIZE)
]

0 comments on commit 7792587

Please sign in to comment.