Skip to content

Commit

Permalink
Support firmware extensions (#611)
Browse files Browse the repository at this point in the history
* Support firmware extensions

* Fix startup

* Expand the custom command protocol

* Fix existing unit tests

* Handle empty `customFrame` response

* Support setting source routes as well

* Rename custom commands to XNCP and support board string overrides

* Use a generic interface to override manufacturing tokens

* Hide firmware-level manual source routing behind `manual_source_routing`

* Fix ruff issue

* Remove erroneous `getMulticastTableEntry(0)`

* Fix unit tests

* Support `GET_BUILD_STRING_REQ`

* Fix up after rebase

* Add support for the `GET_FLOW_CONTROL_TYPE` XNCP command

* Fix up after rebase

* Coverage

* Unit test XNCP commands

* Move XNCP tests into their own module

* Get coverage up to 100%
  • Loading branch information
puddly authored Nov 9, 2024
1 parent ecce1ba commit 06eb3d9
Show file tree
Hide file tree
Showing 8 changed files with 569 additions and 47 deletions.
9 changes: 9 additions & 0 deletions bellows/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
cv_boolean,
)

CONF_BELLOWS_CONFIG = "bellows_config"
CONF_MANUAL_SOURCE_ROUTING = "manual_source_routing"

CONF_USE_THREAD = "use_thread"
CONF_EZSP_CONFIG = "ezsp_config"
CONF_EZSP_POLICIES = "ezsp_policies"
Expand All @@ -31,6 +34,12 @@
{vol.Optional(str): int}
),
vol.Optional(CONF_USE_THREAD, default=True): cv_boolean,
# The above config really should belong in here
vol.Optional(CONF_BELLOWS_CONFIG, default={}): vol.Schema(
{
vol.Optional(CONF_MANUAL_SOURCE_ROUTING, default=False): bool,
}
),
}
)

Expand Down
101 changes: 91 additions & 10 deletions bellows/ezsp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@

import bellows.config as conf
from bellows.exception import EzspError, InvalidCommandError
from bellows.ezsp import xncp
from bellows.ezsp.config import DEFAULT_CONFIG, RuntimeConfig, ValueConfig
from bellows.ezsp.xncp import FirmwareFeatures, FlowControlType
import bellows.types as t
import bellows.uart

Expand Down Expand Up @@ -62,6 +64,7 @@ def __init__(self, device_config: dict, application: Any | None = None):
self._callbacks = {}
self._ezsp_event = asyncio.Event()
self._ezsp_version = v4.EZSPv4.VERSION
self._xncp_features = FirmwareFeatures.NONE
self._gw = None
self._protocol = None
self._application = application
Expand Down Expand Up @@ -124,6 +127,7 @@ async def startup_reset(self) -> None:
await self.reset()

await self.version()
await self.get_xncp_features()

async def connect(self, *, use_thread: bool = True) -> None:
assert self._gw is None
Expand Down Expand Up @@ -167,13 +171,22 @@ async def version(self):
if ver != self.ezsp_version:
self._switch_protocol_version(ver)
await self._command("version", desiredProtocolVersion=ver)

LOGGER.debug(
"EZSP Stack Type: %s, Stack Version: %04x, Protocol version: %s",
("EZSP Stack Type: %s" ", Stack Version: %04x" ", Protocol version: %s"),
stack_type,
stack_version,
ver,
)

async def get_xncp_features(self) -> None:
try:
self._xncp_features = await self.xncp_get_supported_firmware_features()
except InvalidCommandError:
self._xncp_features = xncp.FirmwareFeatures.NONE

LOGGER.debug("XNCP features: %s", self._xncp_features)

async def disconnect(self):
self.stop_ezsp()
if self._gw:
Expand Down Expand Up @@ -308,11 +321,10 @@ async def get_board_info(
) -> tuple[str, str, str | None] | tuple[None, None, str | None]:
"""Return board info."""

tokens = {}
tokens: dict[t.EzspMfgTokenId, str | None] = {}

for token in (t.EzspMfgTokenId.MFG_STRING, t.EzspMfgTokenId.MFG_BOARD_NAME):
(value,) = await self.getMfgToken(tokenId=token)
LOGGER.debug("Read %s token: %s", token.name, value)
for token_id in (t.EzspMfgTokenId.MFG_STRING, t.EzspMfgTokenId.MFG_BOARD_NAME):
value = await self.get_mfg_token(token_id)

# Tokens are fixed-length and initially filled with \xFF but also can end
# with \x00
Expand All @@ -324,10 +336,7 @@ async def get_board_info(
except UnicodeDecodeError:
result = "0x" + value.hex().upper()

if not result:
result = None

tokens[token] = result
tokens[token_id] = result or None

(status, ver_info_bytes) = await self.getValue(
valueId=t.EzspValueId.VALUE_VERSION_INFO
Expand All @@ -342,6 +351,14 @@ async def get_board_info(
special, ver_info_bytes = t.uint8_t.deserialize(ver_info_bytes)
version = f"{major}.{minor}.{patch}.{special} build {build}"

try:
build_string = await self.xncp_get_build_string()
except InvalidCommandError:
build_string = None

if build_string:
version = f"{version} ({build_string})"

return (
tokens[t.EzspMfgTokenId.MFG_STRING],
tokens[t.EzspMfgTokenId.MFG_BOARD_NAME],
Expand Down Expand Up @@ -369,9 +386,23 @@ async def _get_nv3_restored_eui64_key(self) -> t.NV3KeyId | None:

return None

async def get_mfg_token(self, token: t.EzspMfgTokenId) -> bytes:
(value,) = await self.getMfgToken(tokenId=token)
LOGGER.debug("Read manufacturing token %s: %s", token.name, value)

override_value = None

if FirmwareFeatures.MFG_TOKEN_OVERRIDES in self._xncp_features:
with contextlib.suppress(InvalidCommandError):
override_value = await self.xncp_get_mfg_token_override(token)

LOGGER.debug("XNCP override token %s: %s", token.name, override_value)

return override_value or value

async def _get_mfg_custom_eui_64(self) -> t.EUI64 | None:
"""Get the custom EUI 64 manufacturing token, if it has a valid value."""
(data,) = await self.getMfgToken(tokenId=t.EzspMfgTokenId.MFG_CUSTOM_EUI_64)
data = await self.get_mfg_token(t.EzspMfgTokenId.MFG_CUSTOM_EUI_64)

# Manufacturing tokens do not exist in RCP firmware: all reads are empty
if not data:
Expand Down Expand Up @@ -616,3 +647,53 @@ async def write_config(self, config: dict) -> None:
status,
)
continue

async def send_xncp_frame(
self, payload: xncp.XncpCommandPayload
) -> xncp.XncpCommandPayload:
"""Send an XNCP frame."""
req_frame = xncp.XncpCommand.from_payload(payload)
LOGGER.debug("Sending XNCP frame: %s", req_frame)
status, data = await self.customFrame(req_frame.serialize())

if status != t.EmberStatus.SUCCESS:
raise InvalidCommandError("XNCP is not supported")

rsp_frame = xncp.XncpCommand.from_bytes(data)
LOGGER.debug("Received XNCP frame: %s", rsp_frame)

if rsp_frame.status != t.EmberStatus.SUCCESS:
raise InvalidCommandError(f"XNCP response error: {rsp_frame.status}")

return rsp_frame.payload

async def xncp_get_supported_firmware_features(self) -> xncp.FirmwareFeatures:
"""Get supported firmware extensions."""
rsp = await self.send_xncp_frame(xncp.GetSupportedFeaturesReq())
return rsp.features

async def xncp_set_manual_source_route(
self, destination: t.NWK, route: list[t.NWK]
) -> None:
"""Set a manual source route."""
await self.send_xncp_frame(
xncp.SetSourceRouteReq(
destination=destination,
source_route=route,
)
)

async def xncp_get_mfg_token_override(self, token: t.EzspMfgTokenId) -> bytes:
"""Get manufacturing token override."""
rsp = await self.send_xncp_frame(xncp.GetMfgTokenOverrideReq(token=token))
return rsp.value

async def xncp_get_build_string(self) -> str:
"""Get build string."""
rsp = await self.send_xncp_frame(xncp.GetBuildStringReq())
return rsp.build_string.decode("utf-8")

async def xncp_get_flow_control_type(self) -> FlowControlType:
"""Get flow control type."""
rsp = await self.send_xncp_frame(xncp.GetFlowControlTypeReq())
return rsp.flow_control_type
161 changes: 161 additions & 0 deletions bellows/ezsp/xncp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
"""Custom EZSP commands."""
from __future__ import annotations

import dataclasses
import logging
from typing import Callable

import zigpy.types as t

from bellows.types import EmberStatus, EzspMfgTokenId

_LOGGER = logging.getLogger(__name__)

COMMANDS: dict[XncpCommandId, type[XncpCommandPayload]] = {}
REV_COMMANDS: dict[type[XncpCommandPayload], XncpCommandId] = {}


def register_command(command_id: XncpCommandId) -> Callable[[type], type]:
def decorator(cls: type) -> type:
COMMANDS[command_id] = cls
REV_COMMANDS[cls] = command_id
return cls

return decorator


class Bytes(bytes):
def serialize(self) -> Bytes:
return self

@classmethod
def deserialize(cls, data: bytes) -> tuple[Bytes, bytes]:
return cls(data), b""


class XncpCommandId(t.enum16):
GET_SUPPORTED_FEATURES_REQ = 0x0000
SET_SOURCE_ROUTE_REQ = 0x0001
GET_MFG_TOKEN_OVERRIDE_REQ = 0x0002
GET_BUILD_STRING_REQ = 0x0003
GET_FLOW_CONTROL_TYPE_REQ = 0x0004

GET_SUPPORTED_FEATURES_RSP = GET_SUPPORTED_FEATURES_REQ | 0x8000
SET_SOURCE_ROUTE_RSP = SET_SOURCE_ROUTE_REQ | 0x8000
GET_MFG_TOKEN_OVERRIDE_RSP = GET_MFG_TOKEN_OVERRIDE_REQ | 0x8000
GET_BUILD_STRING_RSP = GET_BUILD_STRING_REQ | 0x8000
GET_FLOW_CONTROL_TYPE_RSP = GET_FLOW_CONTROL_TYPE_REQ | 0x8000

UNKNOWN = 0xFFFF


@dataclasses.dataclass
class XncpCommand:
command_id: XncpCommandId
status: EmberStatus
payload: XncpCommandPayload

@classmethod
def from_payload(cls, payload: XncpCommandPayload) -> XncpCommand:
return cls(
command_id=REV_COMMANDS[type(payload)],
status=EmberStatus.SUCCESS,
payload=payload,
)

@classmethod
def from_bytes(cls, data: bytes) -> XncpCommand:
command_id, data = XncpCommandId.deserialize(data)
status, data = EmberStatus.deserialize(data)
payload, rest = COMMANDS[command_id].deserialize(data)

if rest:
_LOGGER.debug("Unparsed data remains after %s frame: %s", payload, rest)

return cls(command_id=command_id, status=status, payload=payload)

def serialize(self) -> Bytes:
return (
self.command_id.serialize()
+ self.status.serialize()
+ self.payload.serialize()
)


class FirmwareFeatures(t.bitmap32):
NONE = 0

# The firmware passes through all group traffic, regardless of group membership
MEMBER_OF_ALL_GROUPS = 1 << 0

# Source routes can be overridden by the application
MANUAL_SOURCE_ROUTE = 1 << 1

# The firmware supports overriding some manufacturing tokens
MFG_TOKEN_OVERRIDES = 1 << 2

# The firmware contains a free-form build string
BUILD_STRING = 1 << 3

# The flow control type (software or hardware) can be queried
FLOW_CONTROL_TYPE = 1 << 4


class XncpCommandPayload(t.Struct):
pass


class FlowControlType(t.enum8):
Software = 0x00
Hardware = 0x01


@register_command(XncpCommandId.GET_SUPPORTED_FEATURES_REQ)
class GetSupportedFeaturesReq(XncpCommandPayload):
pass


@register_command(XncpCommandId.GET_SUPPORTED_FEATURES_RSP)
class GetSupportedFeaturesRsp(XncpCommandPayload):
features: FirmwareFeatures


@register_command(XncpCommandId.SET_SOURCE_ROUTE_REQ)
class SetSourceRouteReq(XncpCommandPayload):
destination: t.NWK
source_route: t.List[t.NWK]


@register_command(XncpCommandId.SET_SOURCE_ROUTE_RSP)
class SetSourceRouteRsp(XncpCommandPayload):
pass


@register_command(XncpCommandId.GET_MFG_TOKEN_OVERRIDE_REQ)
class GetMfgTokenOverrideReq(XncpCommandPayload):
token: EzspMfgTokenId


@register_command(XncpCommandId.GET_MFG_TOKEN_OVERRIDE_RSP)
class GetMfgTokenOverrideRsp(XncpCommandPayload):
value: Bytes


@register_command(XncpCommandId.GET_BUILD_STRING_REQ)
class GetBuildStringReq(XncpCommandPayload):
pass


@register_command(XncpCommandId.GET_BUILD_STRING_RSP)
class GetBuildStringRsp(XncpCommandPayload):
build_string: Bytes


@register_command(XncpCommandId.GET_FLOW_CONTROL_TYPE_REQ)
class GetFlowControlTypeReq(XncpCommandPayload):
pass


@register_command(XncpCommandId.GET_FLOW_CONTROL_TYPE_RSP)
class GetFlowControlTypeRsp(XncpCommandPayload):
flow_control_type: FlowControlType
Loading

0 comments on commit 06eb3d9

Please sign in to comment.