diff --git a/bellows/ezsp/v4/__init__.py b/bellows/ezsp/v4/__init__.py index b70b2664..ebf18001 100644 --- a/bellows/ezsp/v4/__init__.py +++ b/bellows/ezsp/v4/__init__.py @@ -217,6 +217,10 @@ async def set_extended_timeout( ) if t.sl_Status.from_ember_status(status) != t.sl_Status.OK: + # Last-ditch effort + await self.setExtendedTimeout( + remoteEui64=ieee, extendedTimeout=extended_timeout + ) return # Replace a random entry in the address table diff --git a/tests/test_application.py b/tests/test_application.py index c846f6bf..fce1a8b0 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -178,6 +178,7 @@ def form_network(): ) proto.factory_reset = AsyncMock(proto=proto.factory_reset) + proto.set_extended_timeout = AsyncMock(proto=proto.set_extended_timeout) proto.read_link_keys = MagicMock() proto.read_link_keys.return_value.__aiter__.return_value = [ diff --git a/tests/test_ezsp_v4.py b/tests/test_ezsp_v4.py index 69e00243..3f035366 100644 --- a/tests/test_ezsp_v4.py +++ b/tests/test_ezsp_v4.py @@ -1,5 +1,5 @@ import logging -from unittest.mock import MagicMock, call +from unittest.mock import MagicMock, call, patch import pytest import zigpy.state @@ -379,3 +379,110 @@ async def test_read_counters(ezsp_f, length: int) -> None: ) assert counters1 == counters2 == {t.EmberCounterType(i): i for i in range(length)} + + +async def test_set_extended_timeout_no_entry(ezsp_f) -> None: + # Typical invocation + ezsp_f.getExtendedTimeout.return_value = (t.Bool.false,) + ezsp_f.lookupNodeIdByEui64.return_value = (0xFFFF,) # No address table entry + ezsp_f.getConfigurationValue.return_value = (t.EmberStatus.SUCCESS, 8) + ezsp_f.replaceAddressTableEntry.return_value = ( + t.EmberStatus.SUCCESS, + t.EUI64.convert("ff:ff:ff:ff:ff:ff:ff:ff"), + 0xFFFF, + t.Bool.false, + ) + + with patch("bellows.ezsp.v4.random.randint") as mock_random: + mock_random.return_value = 0 + await ezsp_f.set_extended_timeout( + nwk=0x1234, + ieee=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11"), + extended_timeout=True, + ) + + assert ezsp_f.getExtendedTimeout.mock_calls == [ + call(remoteEui64=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11")) + ] + assert ezsp_f.lookupNodeIdByEui64.mock_calls == [ + call(eui64=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11")) + ] + assert ezsp_f.getConfigurationValue.mock_calls == [ + call(t.EzspConfigId.CONFIG_ADDRESS_TABLE_SIZE) + ] + assert mock_random.mock_calls == [call(0, 8 - 1)] + assert ezsp_f.replaceAddressTableEntry.mock_calls == [ + call( + addressTableIndex=0, + newEui64=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11"), + newId=0x1234, + newExtendedTimeout=True, + ) + ] + + +async def test_set_extended_timeout_already_set(ezsp_f) -> None: + # No-op, it's already set + ezsp_f.setExtendedTimeout.return_value = () + ezsp_f.getExtendedTimeout.return_value = (t.Bool.true,) + + await ezsp_f.set_extended_timeout( + nwk=0x1234, + ieee=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11"), + extended_timeout=True, + ) + + assert ezsp_f.getExtendedTimeout.mock_calls == [ + call(remoteEui64=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11")) + ] + assert ezsp_f.setExtendedTimeout.mock_calls == [] + + +async def test_set_extended_timeout_already_have_entry(ezsp_f) -> None: + # An address table entry is present + ezsp_f.setExtendedTimeout.return_value = () + ezsp_f.getExtendedTimeout.return_value = (t.Bool.false,) + ezsp_f.lookupNodeIdByEui64.return_value = (0x1234,) + + await ezsp_f.set_extended_timeout( + nwk=0x1234, + ieee=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11"), + extended_timeout=True, + ) + + assert ezsp_f.getExtendedTimeout.mock_calls == [ + call(remoteEui64=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11")) + ] + assert ezsp_f.lookupNodeIdByEui64.mock_calls == [ + call(eui64=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11")) + ] + assert ezsp_f.setExtendedTimeout.mock_calls == [ + call( + remoteEui64=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11"), extendedTimeout=True + ) + ] + + +async def test_set_extended_timeout_bad_table_size(ezsp_f) -> None: + ezsp_f.setExtendedTimeout.return_value = () + ezsp_f.getExtendedTimeout.return_value = (t.Bool.false,) + ezsp_f.lookupNodeIdByEui64.return_value = (0xFFFF,) + ezsp_f.getConfigurationValue.return_value = (t.EmberStatus.ERR_FATAL, 0xFF) + + with patch("bellows.ezsp.v4.random.randint") as mock_random: + mock_random.return_value = 0 + await ezsp_f.set_extended_timeout( + nwk=0x1234, + ieee=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11"), + extended_timeout=True, + ) + + assert ezsp_f.getExtendedTimeout.mock_calls == [ + call(remoteEui64=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11")) + ] + assert ezsp_f.lookupNodeIdByEui64.mock_calls == [ + call(eui64=t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11")) + ] + assert ezsp_f.getConfigurationValue.mock_calls == [ + call(t.EzspConfigId.CONFIG_ADDRESS_TABLE_SIZE) + ]