From 06eb3d9223a01b6ea8dce64b40f3c845533c0d2d Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sat, 9 Nov 2024 12:02:26 -0500 Subject: [PATCH] Support firmware extensions (#611) * Support firmware extensions * Fix startup * Expand the custom command protocol * Fix existing unit tests * Handle empty `customFrame` response * Support setting source routes as well * Rename custom commands to XNCP and support board string overrides * Use a generic interface to override manufacturing tokens * Hide firmware-level manual source routing behind `manual_source_routing` * Fix ruff issue * Remove erroneous `getMulticastTableEntry(0)` * Fix unit tests * Support `GET_BUILD_STRING_REQ` * Fix up after rebase * Add support for the `GET_FLOW_CONTROL_TYPE` XNCP command * Fix up after rebase * Coverage * Unit test XNCP commands * Move XNCP tests into their own module * Get coverage up to 100% --- bellows/config/__init__.py | 9 ++ bellows/ezsp/__init__.py | 101 ++++++++++++++++++--- bellows/ezsp/xncp.py | 161 ++++++++++++++++++++++++++++++++++ bellows/zigbee/application.py | 69 +++++++++++---- bellows/zigbee/device.py | 28 +++--- tests/test_application.py | 61 ++++++++++++- tests/test_ezsp.py | 49 ++++++++++- tests/test_xncp.py | 138 +++++++++++++++++++++++++++++ 8 files changed, 569 insertions(+), 47 deletions(-) create mode 100644 bellows/ezsp/xncp.py create mode 100644 tests/test_xncp.py diff --git a/bellows/config/__init__.py b/bellows/config/__init__.py index 29ffe647..74d91d9c 100644 --- a/bellows/config/__init__.py +++ b/bellows/config/__init__.py @@ -18,6 +18,9 @@ cv_boolean, ) +CONF_BELLOWS_CONFIG = "bellows_config" +CONF_MANUAL_SOURCE_ROUTING = "manual_source_routing" + CONF_USE_THREAD = "use_thread" CONF_EZSP_CONFIG = "ezsp_config" CONF_EZSP_POLICIES = "ezsp_policies" @@ -31,6 +34,12 @@ {vol.Optional(str): int} ), vol.Optional(CONF_USE_THREAD, default=True): cv_boolean, + # The above config really should belong in here + vol.Optional(CONF_BELLOWS_CONFIG, default={}): vol.Schema( + { + vol.Optional(CONF_MANUAL_SOURCE_ROUTING, default=False): bool, + } + ), } ) diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index 669fd0f2..e0c1dcfe 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -23,7 +23,9 @@ import bellows.config as conf from bellows.exception import EzspError, InvalidCommandError +from bellows.ezsp import xncp from bellows.ezsp.config import DEFAULT_CONFIG, RuntimeConfig, ValueConfig +from bellows.ezsp.xncp import FirmwareFeatures, FlowControlType import bellows.types as t import bellows.uart @@ -62,6 +64,7 @@ def __init__(self, device_config: dict, application: Any | None = None): self._callbacks = {} self._ezsp_event = asyncio.Event() self._ezsp_version = v4.EZSPv4.VERSION + self._xncp_features = FirmwareFeatures.NONE self._gw = None self._protocol = None self._application = application @@ -124,6 +127,7 @@ async def startup_reset(self) -> None: await self.reset() await self.version() + await self.get_xncp_features() async def connect(self, *, use_thread: bool = True) -> None: assert self._gw is None @@ -167,13 +171,22 @@ async def version(self): if ver != self.ezsp_version: self._switch_protocol_version(ver) await self._command("version", desiredProtocolVersion=ver) + LOGGER.debug( - "EZSP Stack Type: %s, Stack Version: %04x, Protocol version: %s", + ("EZSP Stack Type: %s" ", Stack Version: %04x" ", Protocol version: %s"), stack_type, stack_version, ver, ) + async def get_xncp_features(self) -> None: + try: + self._xncp_features = await self.xncp_get_supported_firmware_features() + except InvalidCommandError: + self._xncp_features = xncp.FirmwareFeatures.NONE + + LOGGER.debug("XNCP features: %s", self._xncp_features) + async def disconnect(self): self.stop_ezsp() if self._gw: @@ -308,11 +321,10 @@ async def get_board_info( ) -> tuple[str, str, str | None] | tuple[None, None, str | None]: """Return board info.""" - tokens = {} + tokens: dict[t.EzspMfgTokenId, str | None] = {} - for token in (t.EzspMfgTokenId.MFG_STRING, t.EzspMfgTokenId.MFG_BOARD_NAME): - (value,) = await self.getMfgToken(tokenId=token) - LOGGER.debug("Read %s token: %s", token.name, value) + for token_id in (t.EzspMfgTokenId.MFG_STRING, t.EzspMfgTokenId.MFG_BOARD_NAME): + value = await self.get_mfg_token(token_id) # Tokens are fixed-length and initially filled with \xFF but also can end # with \x00 @@ -324,10 +336,7 @@ async def get_board_info( except UnicodeDecodeError: result = "0x" + value.hex().upper() - if not result: - result = None - - tokens[token] = result + tokens[token_id] = result or None (status, ver_info_bytes) = await self.getValue( valueId=t.EzspValueId.VALUE_VERSION_INFO @@ -342,6 +351,14 @@ async def get_board_info( special, ver_info_bytes = t.uint8_t.deserialize(ver_info_bytes) version = f"{major}.{minor}.{patch}.{special} build {build}" + try: + build_string = await self.xncp_get_build_string() + except InvalidCommandError: + build_string = None + + if build_string: + version = f"{version} ({build_string})" + return ( tokens[t.EzspMfgTokenId.MFG_STRING], tokens[t.EzspMfgTokenId.MFG_BOARD_NAME], @@ -369,9 +386,23 @@ async def _get_nv3_restored_eui64_key(self) -> t.NV3KeyId | None: return None + async def get_mfg_token(self, token: t.EzspMfgTokenId) -> bytes: + (value,) = await self.getMfgToken(tokenId=token) + LOGGER.debug("Read manufacturing token %s: %s", token.name, value) + + override_value = None + + if FirmwareFeatures.MFG_TOKEN_OVERRIDES in self._xncp_features: + with contextlib.suppress(InvalidCommandError): + override_value = await self.xncp_get_mfg_token_override(token) + + LOGGER.debug("XNCP override token %s: %s", token.name, override_value) + + return override_value or value + async def _get_mfg_custom_eui_64(self) -> t.EUI64 | None: """Get the custom EUI 64 manufacturing token, if it has a valid value.""" - (data,) = await self.getMfgToken(tokenId=t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) + data = await self.get_mfg_token(t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) # Manufacturing tokens do not exist in RCP firmware: all reads are empty if not data: @@ -616,3 +647,53 @@ async def write_config(self, config: dict) -> None: status, ) continue + + async def send_xncp_frame( + self, payload: xncp.XncpCommandPayload + ) -> xncp.XncpCommandPayload: + """Send an XNCP frame.""" + req_frame = xncp.XncpCommand.from_payload(payload) + LOGGER.debug("Sending XNCP frame: %s", req_frame) + status, data = await self.customFrame(req_frame.serialize()) + + if status != t.EmberStatus.SUCCESS: + raise InvalidCommandError("XNCP is not supported") + + rsp_frame = xncp.XncpCommand.from_bytes(data) + LOGGER.debug("Received XNCP frame: %s", rsp_frame) + + if rsp_frame.status != t.EmberStatus.SUCCESS: + raise InvalidCommandError(f"XNCP response error: {rsp_frame.status}") + + return rsp_frame.payload + + async def xncp_get_supported_firmware_features(self) -> xncp.FirmwareFeatures: + """Get supported firmware extensions.""" + rsp = await self.send_xncp_frame(xncp.GetSupportedFeaturesReq()) + return rsp.features + + async def xncp_set_manual_source_route( + self, destination: t.NWK, route: list[t.NWK] + ) -> None: + """Set a manual source route.""" + await self.send_xncp_frame( + xncp.SetSourceRouteReq( + destination=destination, + source_route=route, + ) + ) + + async def xncp_get_mfg_token_override(self, token: t.EzspMfgTokenId) -> bytes: + """Get manufacturing token override.""" + rsp = await self.send_xncp_frame(xncp.GetMfgTokenOverrideReq(token=token)) + return rsp.value + + async def xncp_get_build_string(self) -> str: + """Get build string.""" + rsp = await self.send_xncp_frame(xncp.GetBuildStringReq()) + return rsp.build_string.decode("utf-8") + + async def xncp_get_flow_control_type(self) -> FlowControlType: + """Get flow control type.""" + rsp = await self.send_xncp_frame(xncp.GetFlowControlTypeReq()) + return rsp.flow_control_type diff --git a/bellows/ezsp/xncp.py b/bellows/ezsp/xncp.py new file mode 100644 index 00000000..f09c1064 --- /dev/null +++ b/bellows/ezsp/xncp.py @@ -0,0 +1,161 @@ +"""Custom EZSP commands.""" +from __future__ import annotations + +import dataclasses +import logging +from typing import Callable + +import zigpy.types as t + +from bellows.types import EmberStatus, EzspMfgTokenId + +_LOGGER = logging.getLogger(__name__) + +COMMANDS: dict[XncpCommandId, type[XncpCommandPayload]] = {} +REV_COMMANDS: dict[type[XncpCommandPayload], XncpCommandId] = {} + + +def register_command(command_id: XncpCommandId) -> Callable[[type], type]: + def decorator(cls: type) -> type: + COMMANDS[command_id] = cls + REV_COMMANDS[cls] = command_id + return cls + + return decorator + + +class Bytes(bytes): + def serialize(self) -> Bytes: + return self + + @classmethod + def deserialize(cls, data: bytes) -> tuple[Bytes, bytes]: + return cls(data), b"" + + +class XncpCommandId(t.enum16): + GET_SUPPORTED_FEATURES_REQ = 0x0000 + SET_SOURCE_ROUTE_REQ = 0x0001 + GET_MFG_TOKEN_OVERRIDE_REQ = 0x0002 + GET_BUILD_STRING_REQ = 0x0003 + GET_FLOW_CONTROL_TYPE_REQ = 0x0004 + + GET_SUPPORTED_FEATURES_RSP = GET_SUPPORTED_FEATURES_REQ | 0x8000 + SET_SOURCE_ROUTE_RSP = SET_SOURCE_ROUTE_REQ | 0x8000 + GET_MFG_TOKEN_OVERRIDE_RSP = GET_MFG_TOKEN_OVERRIDE_REQ | 0x8000 + GET_BUILD_STRING_RSP = GET_BUILD_STRING_REQ | 0x8000 + GET_FLOW_CONTROL_TYPE_RSP = GET_FLOW_CONTROL_TYPE_REQ | 0x8000 + + UNKNOWN = 0xFFFF + + +@dataclasses.dataclass +class XncpCommand: + command_id: XncpCommandId + status: EmberStatus + payload: XncpCommandPayload + + @classmethod + def from_payload(cls, payload: XncpCommandPayload) -> XncpCommand: + return cls( + command_id=REV_COMMANDS[type(payload)], + status=EmberStatus.SUCCESS, + payload=payload, + ) + + @classmethod + def from_bytes(cls, data: bytes) -> XncpCommand: + command_id, data = XncpCommandId.deserialize(data) + status, data = EmberStatus.deserialize(data) + payload, rest = COMMANDS[command_id].deserialize(data) + + if rest: + _LOGGER.debug("Unparsed data remains after %s frame: %s", payload, rest) + + return cls(command_id=command_id, status=status, payload=payload) + + def serialize(self) -> Bytes: + return ( + self.command_id.serialize() + + self.status.serialize() + + self.payload.serialize() + ) + + +class FirmwareFeatures(t.bitmap32): + NONE = 0 + + # The firmware passes through all group traffic, regardless of group membership + MEMBER_OF_ALL_GROUPS = 1 << 0 + + # Source routes can be overridden by the application + MANUAL_SOURCE_ROUTE = 1 << 1 + + # The firmware supports overriding some manufacturing tokens + MFG_TOKEN_OVERRIDES = 1 << 2 + + # The firmware contains a free-form build string + BUILD_STRING = 1 << 3 + + # The flow control type (software or hardware) can be queried + FLOW_CONTROL_TYPE = 1 << 4 + + +class XncpCommandPayload(t.Struct): + pass + + +class FlowControlType(t.enum8): + Software = 0x00 + Hardware = 0x01 + + +@register_command(XncpCommandId.GET_SUPPORTED_FEATURES_REQ) +class GetSupportedFeaturesReq(XncpCommandPayload): + pass + + +@register_command(XncpCommandId.GET_SUPPORTED_FEATURES_RSP) +class GetSupportedFeaturesRsp(XncpCommandPayload): + features: FirmwareFeatures + + +@register_command(XncpCommandId.SET_SOURCE_ROUTE_REQ) +class SetSourceRouteReq(XncpCommandPayload): + destination: t.NWK + source_route: t.List[t.NWK] + + +@register_command(XncpCommandId.SET_SOURCE_ROUTE_RSP) +class SetSourceRouteRsp(XncpCommandPayload): + pass + + +@register_command(XncpCommandId.GET_MFG_TOKEN_OVERRIDE_REQ) +class GetMfgTokenOverrideReq(XncpCommandPayload): + token: EzspMfgTokenId + + +@register_command(XncpCommandId.GET_MFG_TOKEN_OVERRIDE_RSP) +class GetMfgTokenOverrideRsp(XncpCommandPayload): + value: Bytes + + +@register_command(XncpCommandId.GET_BUILD_STRING_REQ) +class GetBuildStringReq(XncpCommandPayload): + pass + + +@register_command(XncpCommandId.GET_BUILD_STRING_RSP) +class GetBuildStringRsp(XncpCommandPayload): + build_string: Bytes + + +@register_command(XncpCommandId.GET_FLOW_CONTROL_TYPE_REQ) +class GetFlowControlTypeReq(XncpCommandPayload): + pass + + +@register_command(XncpCommandId.GET_FLOW_CONTROL_TYPE_RSP) +class GetFlowControlTypeRsp(XncpCommandPayload): + flow_control_type: FlowControlType diff --git a/bellows/zigbee/application.py b/bellows/zigbee/application.py index 229e1499..c8cfdd35 100644 --- a/bellows/zigbee/application.py +++ b/bellows/zigbee/application.py @@ -25,17 +25,25 @@ import bellows from bellows.config import ( + CONF_BELLOWS_CONFIG, CONF_EZSP_CONFIG, CONF_EZSP_POLICIES, + CONF_MANUAL_SOURCE_ROUTING, CONF_USE_THREAD, CONFIG_SCHEMA, ) -from bellows.exception import ControllerError, EzspError, StackAlreadyRunning +from bellows.exception import ( + ControllerError, + EzspError, + InvalidCommandError, + StackAlreadyRunning, +) import bellows.ezsp +from bellows.ezsp.xncp import FirmwareFeatures import bellows.multicast import bellows.types as t from bellows.zigbee import repairs -from bellows.zigbee.device import EZSPEndpoint +from bellows.zigbee.device import EZSPEndpoint, EZSPGroupEndpoint import bellows.zigbee.util as util APS_ACK_TIMEOUT = 120 @@ -202,13 +210,19 @@ async def start_network(self): group_membership = {} - try: - db_device = self.get_device(ieee=self.state.node_info.ieee) - except KeyError: - pass + if FirmwareFeatures.MEMBER_OF_ALL_GROUPS in self._ezsp._xncp_features: + # If the firmware passes through all incoming group messages, do nothing + endpoint_cls = EZSPEndpoint else: - if 1 in db_device.endpoints: - group_membership = db_device.endpoints[1].member_of + endpoint_cls = EZSPGroupEndpoint + + try: + db_device = self.get_device(ieee=self.state.node_info.ieee) + except KeyError: + pass + else: + if 1 in db_device.endpoints: + group_membership = db_device.endpoints[1].member_of ezsp_device = zigpy.device.Device( application=self, @@ -220,18 +234,17 @@ async def start_network(self): # The coordinator device does not respond to attribute reads so we have to # divine the internal NCP state. for zdo_desc in self._created_device_endpoints: - ep = EZSPEndpoint(ezsp_device, zdo_desc.endpoint, zdo_desc) + ep = endpoint_cls.from_descriptor(ezsp_device, zdo_desc.endpoint, zdo_desc) ezsp_device.endpoints[zdo_desc.endpoint] = ep ezsp_device.model = ep.model ezsp_device.manufacturer = ep.manufacturer await ezsp_device.schedule_initialize() - # Group membership is stored in the database for EZSP coordinators - ezsp_device.endpoints[1].member_of.update(group_membership) - - self._multicast = bellows.multicast.Multicast(ezsp) - await self._multicast.startup(ezsp_device) + if FirmwareFeatures.MEMBER_OF_ALL_GROUPS not in self._ezsp._xncp_features: + ezsp_device.endpoints[1].member_of.update(group_membership) + self._multicast = bellows.multicast.Multicast(ezsp) + await self._multicast.startup(ezsp_device) async def load_network_info(self, *, load_devices=False) -> None: ezsp = self._ezsp @@ -286,6 +299,11 @@ async def load_network_info(self, *, load_devices=False) -> None: can_burn_userdata_custom_eui64 = await ezsp.can_burn_userdata_custom_eui64() can_rewrite_custom_eui64 = await ezsp.can_rewrite_custom_eui64() + try: + flow_control = await self._ezsp.xncp_get_flow_control_type() + except InvalidCommandError: + flow_control = None + self.state.network_info = zigpy.state.NetworkInfo( source=f"bellows@{LIB_VERSION}", extended_pan_id=zigpy.types.ExtendedPanId(nwk_params.extendedPanId), @@ -306,6 +324,9 @@ async def load_network_info(self, *, load_devices=False) -> None: "stack_version": ezsp.ezsp_version, "can_burn_userdata_custom_eui64": can_burn_userdata_custom_eui64, "can_rewrite_custom_eui64": can_rewrite_custom_eui64, + "flow_control": ( + flow_control.name.lower() if flow_control is not None else None + ), } }, ) @@ -753,10 +774,22 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None: ) if packet.source_route is not None: - await self._ezsp.set_source_route( - nwk=packet.dst.address, - relays=packet.source_route, - ) + if ( + FirmwareFeatures.MANUAL_SOURCE_ROUTE + in self._ezsp._xncp_features + and self.config[CONF_BELLOWS_CONFIG][ + CONF_MANUAL_SOURCE_ROUTING + ] + ): + await self._ezsp.xncp_set_manual_source_route( + nwk=packet.dst.address, + relays=packet.source_route, + ) + else: + await self._ezsp.set_source_route( + nwk=packet.dst.address, + relays=packet.source_route, + ) status, _ = await self._ezsp.send_unicast( nwk=packet.dst.address, diff --git a/bellows/zigbee/device.py b/bellows/zigbee/device.py index f34f3b80..9e6b65f7 100644 --- a/bellows/zigbee/device.py +++ b/bellows/zigbee/device.py @@ -21,30 +21,32 @@ class EZSPEndpoint(zigpy.endpoint.Endpoint): - def __init__( - self, + @classmethod + def from_descriptor( + cls, device: zigpy.device.Device, endpoint_id: int, descriptor: zdo_t.SimpleDescriptor, ) -> None: - super().__init__(device, endpoint_id) + ep = cls(device, endpoint_id) + ep.profile_id = descriptor.profile - self.profile_id = descriptor.profile - - if self.profile_id in PROFILE_TO_DEVICE_TYPE: - self.device_type = PROFILE_TO_DEVICE_TYPE[self.profile_id]( + if ep.profile_id in PROFILE_TO_DEVICE_TYPE: + ep.device_type = PROFILE_TO_DEVICE_TYPE[ep.profile_id]( descriptor.device_type ) else: - self.device_type = descriptor.device_type + ep.device_type = descriptor.device_type for cluster in descriptor.input_clusters: - self.add_input_cluster(cluster) + ep.add_input_cluster(cluster) for cluster in descriptor.output_clusters: - self.add_output_cluster(cluster) + ep.add_output_cluster(cluster) + + ep.status = zigpy.endpoint.Status.ZDO_INIT - self.status = zigpy.endpoint.Status.ZDO_INIT + return ep @property def manufacturer(self) -> str: @@ -56,7 +58,9 @@ def model(self) -> str: """Model.""" return "EZSP" - async def add_to_group(self, grp_id: int, name: str = None) -> None: + +class EZSPGroupEndpoint(EZSPEndpoint): + async def add_to_group(self, grp_id: int, name: str = None) -> t.EmberStatus: if grp_id in self.member_of: return diff --git a/tests/test_application.py b/tests/test_application.py index 8581e52f..ffb2588a 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -17,6 +17,7 @@ from bellows.exception import ControllerError, EzspError import bellows.ezsp as ezsp from bellows.ezsp.v9.commands import GetTokenDataRsp +from bellows.ezsp.xncp import FirmwareFeatures import bellows.types import bellows.types as t import bellows.types.struct @@ -24,6 +25,7 @@ import bellows.zigbee.application from bellows.zigbee.application import ControllerApplication import bellows.zigbee.device +from bellows.zigbee.device import EZSPEndpoint, EZSPGroupEndpoint from bellows.zigbee.util import map_rssi_to_energy from tests.common import mock_ezsp_commands @@ -122,6 +124,13 @@ def _create_app_for_startup( ezsp_mock.wait_for_stack_status.return_value.__enter__ = AsyncMock( return_value=t.EmberStatus.NETWORK_UP ) + ezsp_mock.customFrame = AsyncMock( + return_value=[t.EmberStatus.LIBRARY_NOT_PRESENT, b""] + ) + ezsp_mock.xncp_get_supported_firmware_features = AsyncMock( + return_value=FirmwareFeatures.NONE + ) + ezsp_mock._xncp_features = FirmwareFeatures.NONE if board_info: ezsp_mock.get_board_info = AsyncMock( @@ -844,6 +853,36 @@ async def test_send_packet_unicast_source_route(make_app, packet): ) +async def test_send_packet_unicast_manual_source_route(make_app, packet): + app = make_app( + { + zigpy.config.CONF_SOURCE_ROUTING: True, + config.CONF_BELLOWS_CONFIG: {config.CONF_MANUAL_SOURCE_ROUTING: True}, + } + ) + + app._ezsp._xncp_features |= FirmwareFeatures.MANUAL_SOURCE_ROUTE + + app._ezsp.xncp_set_manual_source_route = AsyncMock( + return_value=None, spec=app._ezsp._protocol.set_source_route + ) + + packet.source_route = [0x0001, 0x0002] + await _test_send_packet_unicast( + app, + packet, + options=( + t.EmberApsOption.APS_OPTION_RETRY + | t.EmberApsOption.APS_OPTION_ENABLE_ADDRESS_DISCOVERY + ), + ) + + app._ezsp.xncp_set_manual_source_route.assert_called_once_with( + nwk=packet.dst.address, + relays=[0x0001, 0x0002], + ) + + async def test_send_packet_unicast_extended_timeout(app, ieee, packet): app.add_device(nwk=packet.dst.address, ieee=ieee) @@ -1290,7 +1329,9 @@ async def test_shutdown(app): @pytest.fixture def coordinator(app, ieee): dev = zigpy.device.Device(app, ieee, 0x0000) - dev.endpoints[1] = bellows.zigbee.device.EZSPEndpoint(dev, 1, MagicMock()) + dev.endpoints[1] = bellows.zigbee.device.EZSPGroupEndpoint.from_descriptor( + dev, 1, MagicMock() + ) dev.model = dev.endpoints[1].model dev.manufacturer = dev.endpoints[1].manufacturer @@ -1623,8 +1664,8 @@ async def test_startup_coordinator_existing_groups_joined(app, ieee): db_device = app.add_device(ieee, 0x0000) db_ep = db_device.add_endpoint(1) - app.groups.add_group(0x1234, "Group Name", suppress_event=True) - app.groups[0x1234].add_member(db_ep, suppress_event=True) + group = app.groups.add_group(0x1234, "Group Name", suppress_event=True) + group.add_member(db_ep, suppress_event=True) await app.start_network() @@ -1636,6 +1677,19 @@ async def test_startup_coordinator_existing_groups_joined(app, ieee): ] +async def test_startup_coordinator_xncp_wildcard_groups(app, ieee): + """Coordinator ignores multicast workarounds with XNCP firmware.""" + with mock_for_startup(app, ieee) as ezsp: + ezsp._xncp_features |= FirmwareFeatures.MEMBER_OF_ALL_GROUPS + + await app.connect() + await app.start_network() + + # No multicast workarounds are present + assert app._multicast is None + assert not isinstance(app._device.endpoints[1], EZSPGroupEndpoint) + + async def test_startup_new_coordinator_no_groups_joined(app, ieee): """Coordinator freshy added to the database has no groups to join.""" with mock_for_startup(app, ieee): @@ -1820,6 +1874,7 @@ def zigpy_backup() -> zigpy.backups.NetworkBackup: metadata={ "ezsp": { "stack_version": 8, + "flow_control": None, "can_burn_userdata_custom_eui64": True, "can_rewrite_custom_eui64": True, } diff --git a/tests/test_ezsp.py b/tests/test_ezsp.py index 28fbf2c2..fa07f3df 100644 --- a/tests/test_ezsp.py +++ b/tests/test_ezsp.py @@ -11,7 +11,7 @@ from bellows import config, uart from bellows.ash import NcpFailure from bellows.exception import EzspError, InvalidCommandError -from bellows.ezsp import EZSP, EZSP_LATEST +from bellows.ezsp import EZSP, EZSP_LATEST, xncp import bellows.types as t if sys.version_info[:2] < (3, 11): @@ -40,6 +40,9 @@ async def mock_command(command, *args, **kwargs): api._mock_commands = {} api._mock_commands["version"] = AsyncMock(return_value=[version, 0, 0]) + api._mock_commands["customFrame"] = AsyncMock( + return_value=[t.EmberStatus.LIBRARY_NOT_PRESENT, b""] + ) api._command = AsyncMock(side_effect=mock_command) return api @@ -320,6 +323,7 @@ async def test_ezsp_newer_version(ezsp_f): ( "mfg_board_name", "mfg_string", + "xncp_build_string", "value_version_info", "expected", ), @@ -327,39 +331,52 @@ async def test_ezsp_newer_version(ezsp_f): ( (b"\xfe\xff\xff\xff",), (b"Manufacturer\xff\xff\xff",), + (InvalidCommandError("XNCP is not supported"),), (t.EmberStatus.SUCCESS, b"\x01\x02\x03\x04\x05\x06"), ("Manufacturer", "0xFE", "3.4.5.6 build 513"), ), ( (b"\xfe\xff\xff\xff",), (b"Manufacturer\xff\xff\xff",), + (InvalidCommandError("XNCP is not supported"),), (t.EmberStatus.ERR_FATAL, b"\x01\x02\x03\x04\x05\x06"), ("Manufacturer", "0xFE", None), ), ( (b"SkyBlue v0.1\x00\xff\xff\xff",), (b"Nabu Casa\x00\xff\xff\xff\xff\xff\xff",), + (InvalidCommandError("XNCP is not supported"),), (t.EmberStatus.SUCCESS, b"\xbf\x00\x07\x01\x00\x00\xaa"), ("Nabu Casa", "SkyBlue v0.1", "7.1.0.0 build 191"), ), ( (b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff",), (b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff",), + (InvalidCommandError("XNCP is not supported"),), (t.EmberStatus.SUCCESS, b"\xbf\x00\x07\x01\x00\x00\xaa"), (None, None, "7.1.0.0 build 191"), ), ( (b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff",), (b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00",), + (InvalidCommandError("XNCP is not supported"),), (t.EmberStatus.SUCCESS, b")\x01\x06\n\x03\x00\xaa"), (None, None, "6.10.3.0 build 297"), ), + ( + (b"SkyBlue v0.1\x00\xff\xff\xff",), + (b"Nabu Casa\x00\xff\xff\xff\xff\xff\xff",), + ("special build",), + (t.EmberStatus.SUCCESS, b"\xbf\x00\x07\x01\x00\x00\xaa"), + ("Nabu Casa", "SkyBlue v0.1", "7.1.0.0 build 191 (special build)"), + ), ], ) async def test_board_info( ezsp_f, mfg_board_name: bytes, mfg_string: bytes, + xncp_build_string: str | Exception, value_version_info: tuple[t.EmberStatus, bytes], expected: tuple[str | None, str | None, str], ): @@ -381,7 +398,7 @@ async def replacement(command_name, tokenId=None, valueId=None): ("getValue", t.EzspValueId.VALUE_VERSION_INFO): value_version_info, } ), - ): + ), patch.object(ezsp_f, "xncp_get_build_string", side_effect=xncp_build_string): mfg, brd, ver = await ezsp_f.get_board_info() assert (mfg, brd, ver) == expected @@ -429,6 +446,28 @@ async def _mock_cmd(*args, **kwargs): await ezsp_f.leaveNetwork(timeout=0.01) +async def test_xncp_token_override(ezsp_f): + ezsp_f.getMfgToken = AsyncMock(return_value=[b"firmware value"]) + ezsp_f.xncp_get_mfg_token_override = AsyncMock(return_value=b"xncp value") + + # Without firmware support, the XNCP command isn't sent + assert ( + await ezsp_f.get_mfg_token(t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) + ) == b"firmware value" + + # With firmware support, it is + ezsp_f._xncp_features |= xncp.FirmwareFeatures.MFG_TOKEN_OVERRIDES + assert ( + await ezsp_f.get_mfg_token(t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) + ) == b"xncp value" + + # Tokens without overrides are still read normally + ezsp_f.xncp_get_mfg_token_override.side_effect = InvalidCommandError + assert ( + await ezsp_f.get_mfg_token(t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) + ) == b"firmware value" + + @pytest.mark.parametrize( "value, expected_result", [ @@ -588,7 +627,8 @@ async def test_write_custom_eui64_rcp(ezsp_f): @patch.object(EZSP, "version", new_callable=AsyncMock) @patch.object(EZSP, "reset", new_callable=AsyncMock) -async def test_ezsp_init_zigbeed(reset_mock, version_mock): +@patch.object(EZSP, "get_xncp_features", new_callable=AsyncMock) +async def test_ezsp_init_zigbeed(xncp_mock, reset_mock, version_mock): """Test initialize method with a received startup reset frame.""" ezsp = make_ezsp( config={ @@ -609,8 +649,9 @@ async def test_ezsp_init_zigbeed(reset_mock, version_mock): @patch.object(EZSP, "version", new_callable=AsyncMock) @patch.object(EZSP, "reset", new_callable=AsyncMock) +@patch.object(EZSP, "get_xncp_features", new_callable=AsyncMock) @patch("bellows.ezsp.NETWORK_COORDINATOR_STARTUP_RESET_WAIT", 0.01) -async def test_ezsp_init_zigbeed_timeout(reset_mock, version_mock): +async def test_ezsp_init_zigbeed_timeout(reset_mock, xncp_mock, version_mock): """Test initialize method with a received startup reset frame.""" ezsp = make_ezsp( config={ diff --git a/tests/test_xncp.py b/tests/test_xncp.py new file mode 100644 index 00000000..60a7daee --- /dev/null +++ b/tests/test_xncp.py @@ -0,0 +1,138 @@ +from __future__ import annotations + +from unittest.mock import AsyncMock, call + +import pytest + +from bellows.exception import InvalidCommandError +from bellows.ezsp import EZSP, xncp +import bellows.types as t + +from tests.test_ezsp import ezsp_f + + +async def test_xncp_failure(ezsp_f: EZSP) -> None: + """Test XNCP failure.""" + + command = xncp.XncpCommand.from_payload( + xncp.GetSupportedFeaturesRsp(features=xncp.FirmwareFeatures.MANUAL_SOURCE_ROUTE) + ) + command.status = t.EmberStatus.ERR_FATAL + + ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( + return_value=[ + t.EmberStatus.SUCCESS, # The frame itself encodes a status code + command.serialize(), + ] + ) + + with pytest.raises(InvalidCommandError): + await ezsp_f.xncp_get_supported_firmware_features() + + assert customFrame.mock_calls == [ + call(xncp.XncpCommand.from_payload(xncp.GetSupportedFeaturesReq()).serialize()) + ] + + +async def test_xncp_get_supported_firmware_features(ezsp_f: EZSP) -> None: + """Test XNCP get_supported_firmware_features.""" + ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( + return_value=[ + t.EmberStatus.SUCCESS, + xncp.XncpCommand.from_payload( + xncp.GetSupportedFeaturesRsp( + features=xncp.FirmwareFeatures.MANUAL_SOURCE_ROUTE + ) + ).serialize(), + ] + ) + + assert ( + await ezsp_f.xncp_get_supported_firmware_features() + ) == xncp.FirmwareFeatures.MANUAL_SOURCE_ROUTE + assert customFrame.mock_calls == [ + call(xncp.XncpCommand.from_payload(xncp.GetSupportedFeaturesReq()).serialize()) + ] + + +async def test_xncp_get_build_string(ezsp_f: EZSP) -> None: + """Test XNCP get_build_string.""" + ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( + return_value=[ + t.EmberStatus.SUCCESS, + xncp.XncpCommand.from_payload( + xncp.GetBuildStringRsp(build_string="Some complex string 🦜".encode()) + ).serialize(), + ] + ) + + assert await ezsp_f.xncp_get_build_string() == "Some complex string 🦜" + assert customFrame.mock_calls == [ + call(xncp.XncpCommand.from_payload(xncp.GetBuildStringReq()).serialize()) + ] + + +async def test_xncp_set_manual_source_route(ezsp_f: EZSP) -> None: + """Test XNCP set_manual_source_route.""" + ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( + return_value=[ + t.EmberStatus.SUCCESS, + ( + xncp.XncpCommand.from_payload(xncp.SetSourceRouteRsp()).serialize() + + b"some extra data" + ), + ] + ) + + await ezsp_f.xncp_set_manual_source_route( + destination=0x1234, route=[0x5678, 0xABCD] + ) + assert customFrame.mock_calls == [ + call( + xncp.XncpCommand.from_payload( + xncp.SetSourceRouteReq( + destination=0x1234, source_route=[0x5678, 0xABCD] + ) + ).serialize() + ) + ] + + +async def test_xncp_get_mfg_token_override(ezsp_f: EZSP) -> None: + """Test XNCP get_mfg_token_override.""" + ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( + return_value=[ + t.EmberStatus.SUCCESS, + xncp.XncpCommand.from_payload( + xncp.GetMfgTokenOverrideRsp(value=b"value") + ).serialize(), + ] + ) + + await ezsp_f.xncp_get_mfg_token_override(token=t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) + assert customFrame.mock_calls == [ + call( + xncp.XncpCommand.from_payload( + xncp.GetMfgTokenOverrideReq(token=t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) + ).serialize() + ) + ] + + +async def test_xncp_get_flow_control_type(ezsp_f: EZSP) -> None: + """Test XNCP get_flow_control_type.""" + ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( + return_value=[ + t.EmberStatus.SUCCESS, + xncp.XncpCommand.from_payload( + xncp.GetFlowControlTypeRsp( + flow_control_type=xncp.FlowControlType.Hardware + ) + ).serialize(), + ] + ) + + assert await ezsp_f.xncp_get_flow_control_type() == xncp.FlowControlType.Hardware + assert customFrame.mock_calls == [ + call(xncp.XncpCommand.from_payload(xncp.GetFlowControlTypeReq()).serialize()) + ]