Skip to content

Commit

Permalink
EZSP v13 (#603)
Browse files Browse the repository at this point in the history
* Create EZSPv13 definitions

* [WIP] Key reading rewrite

* Abstract adding transient link keys

* Fix `setChildData` schema

* Remove extraneous `key = ...`

* Drop `EmberEUI64` alias

* Drop `EmberKeyData` alias

* Log when frames have unparsed trailing data

* Add undocumented `timeout_remaining` field to `EmberChildData`

* Fix `getNetworkKeyInfo` command format

* Ensure v13 types override v12

* Speed up key table scanning by reading the table size

* Speed up reading of address tables

* Set frame counters before setting security state

* Ensure child restoration works

* Ensure unit tests run on 3.8

* Use v12 config as base

* `setSourceRoute` was only removed from the docs, it still exists?

* Make sure proper protocol version is being used

* Fix existing unit tests

* Add a test for v13

* Use `CONFIG_KEY_TABLE_SIZE` for all versions

* Mock network state loading for v13

* Fix `test_ezsp_v13` command tests

* Ensure child table is restored

* Increase coverage
  • Loading branch information
puddly authored Dec 30, 2023
1 parent d917e99 commit f9f4c3f
Show file tree
Hide file tree
Showing 52 changed files with 1,635 additions and 704 deletions.
10 changes: 5 additions & 5 deletions bellows/cli/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@
ATTR_KEY_FRAME_COUNTER_OUT: cv_hex,
ATTR_KEY_FRAME_COUNTER_IN: cv_hex,
ATTR_KEY_SEQ: cv_hex,
ATTR_KEY_PARTNER: vol.All(str, t.EmberEUI64.convert),
ATTR_KEY_PARTNER: vol.All(str, t.EUI64.convert),
}
)
SCHEMA_BAK = vol.Schema(
{
ATTR_CHANNELS: cv_hex,
ATTR_NODE_TYPE: cv_hex,
ATTR_NODE_ID: cv_hex,
ATTR_NODE_EUI64: vol.All(str, t.EmberEUI64.convert),
ATTR_NODE_EUI64: vol.All(str, t.EUI64.convert),
ATTR_NWK_UPDATE_ID: cv_hex,
ATTR_PAN_ID: cv_hex,
ATTR_RADIO_CHANNEL: cv_hex,
Expand Down Expand Up @@ -214,7 +214,7 @@ async def _restore(
return

if update_eui64_token:
ncp_eui64 = t.EmberEUI64(backup_data[ATTR_NODE_EUI64]).serialize()
ncp_eui64 = t.EUI64(backup_data[ATTR_NODE_EUI64]).serialize()
(status,) = await ezsp.setMfgToken(
t.EzspMfgTokenId.MFG_CUSTOM_EUI_64, ncp_eui64
)
Expand Down Expand Up @@ -244,7 +244,7 @@ async def _restore(
init_sec_state.bitmask |= (
t.EmberInitialSecurityBitmask.TRUST_CENTER_USES_HASHED_LINK_KEY
)
init_sec_state.preconfiguredKey = t.EmberKeyData(os.urandom(16))
init_sec_state.preconfiguredKey = t.KeyData(os.urandom(16))

(status,) = await ezsp.setInitialSecurityState(init_sec_state)
LOGGER.debug("Set initial security state: %s", status)
Expand Down Expand Up @@ -344,5 +344,5 @@ async def _update_nwk_id(ezsp, nwk_update_id):

def is_well_known_key(tc_link_key):
"""Return True if this is a well known key."""
well_known_key = t.EmberKeyData.deserialize(b"ZigBeeAlliance09")[0]
well_known_key = t.KeyData.deserialize(b"ZigBeeAlliance09")[0]
return tc_link_key == well_known_key
2 changes: 1 addition & 1 deletion bellows/cli/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class ZigbeeNodeParamType(click.ParamType):
def convert(self, value, param, ctx):
if ":" not in value or len(value) != 23:
self.fail("Node format should be a 8 byte hex string separated by ':'")
return t.EmberEUI64.convert(value)
return t.EUI64.convert(value)


def background(f):
Expand Down
26 changes: 13 additions & 13 deletions bellows/ezsp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import bellows.types as t
import bellows.uart

from . import v4, v5, v6, v7, v8, v9, v10, v11, v12
from . import v4, v5, v6, v7, v8, v9, v10, v11, v12, v13

EZSP_LATEST = v12.EZSPv12.VERSION
EZSP_LATEST = v13.EZSPv13.VERSION
LOGGER = logging.getLogger(__name__)
MTOR_MIN_INTERVAL = 60
MTOR_MAX_INTERVAL = 3600
Expand All @@ -55,6 +55,7 @@ class EZSP:
v10.EZSPv10.VERSION: v10.EZSPv10,
v11.EZSPv11.VERSION: v11.EZSPv11,
v12.EZSPv12.VERSION: v12.EZSPv12,
v13.EZSPv13.VERSION: v13.EZSPv13,
}

def __init__(self, device_config: dict):
Expand Down Expand Up @@ -154,20 +155,19 @@ async def reset(self):
self.start_ezsp()

def _switch_protocol_version(self, version: int) -> None:
LOGGER.debug("Switching to EZSP protocol version %d", version)
self._ezsp_version = version
LOGGER.debug("Switching to EZSP protocol version %d", self.ezsp_version)

try:
protcol_cls = self._BY_VERSION[version]
except KeyError:
if version not in self._BY_VERSION:
LOGGER.warning(
"Protocol version %s is not supported, using version %s instead",
version,
EZSP_LATEST,
)
protcol_cls = self._BY_VERSION[EZSP_LATEST]
# We replace the protocol object but keep the version correct
version = EZSP_LATEST

self._protocol = protcol_cls(self.handle_callback, self._gw)
self._protocol = self._BY_VERSION[version](self.handle_callback, self._gw)

async def version(self):
ver, stack_type, stack_version = await self._command(
Expand Down Expand Up @@ -386,24 +386,24 @@ async def _get_nv3_restored_eui64_key(self) -> t.NV3KeyId | None:
return None

if status == t.EmberStatus.SUCCESS:
nv3_restored_eui64, _ = t.EmberEUI64.deserialize(data)
nv3_restored_eui64, _ = t.EUI64.deserialize(data)
LOGGER.debug("NV3 restored EUI64: %s=%s", key, nv3_restored_eui64)

return key

return None

async def _get_mfg_custom_eui_64(self) -> t.EmberEUI64 | None:
async def _get_mfg_custom_eui_64(self) -> t.EUI64 | None:
"""Get the custom EUI 64 manufacturing token, if it has a valid value."""
(data,) = await self.getMfgToken(t.EzspMfgTokenId.MFG_CUSTOM_EUI_64)

# Manufacturing tokens do not exist in RCP firmware: all reads are empty
if not data:
raise ValueError("Firmware does not support MFG_CUSTOM_EUI_64 token")

mfg_custom_eui64, _ = t.EmberEUI64.deserialize(data)
mfg_custom_eui64, _ = t.EUI64.deserialize(data)

if mfg_custom_eui64 == t.EmberEUI64.convert("FF:FF:FF:FF:FF:FF:FF:FF"):
if mfg_custom_eui64 == t.EUI64.convert("FF:FF:FF:FF:FF:FF:FF:FF"):
return None

return mfg_custom_eui64
Expand All @@ -429,7 +429,7 @@ async def reset_custom_eui64(self) -> None:
(status,) = await self.setTokenData(
nv3_eui64_key,
0,
t.LVBytes32(t.EmberEUI64.convert("FF:FF:FF:FF:FF:FF:FF:FF").serialize()),
t.LVBytes32(t.EUI64.convert("FF:FF:FF:FF:FF:FF:FF:FF").serialize()),
)
assert status == t.EmberStatus.SUCCESS

Expand Down
1 change: 1 addition & 0 deletions bellows/ezsp/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,5 @@ class ValueConfig:
10: DEFAULT_CONFIG_NEW,
11: DEFAULT_CONFIG_NEW,
12: DEFAULT_CONFIG_NEW,
13: DEFAULT_CONFIG_NEW,
}
9 changes: 9 additions & 0 deletions bellows/ezsp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from bellows.config import CONF_EZSP_POLICIES
from bellows.exception import InvalidCommandError
import bellows.types as t
from bellows.typing import GatewayType

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -54,6 +55,11 @@ def _ezsp_frame_tx(self, name: str) -> bytes:
async def pre_permit(self, time_s: int) -> None:
"""Schedule task before allowing new joins."""

async def add_transient_link_key(
self, ieee: t.EUI64, key: t.KeyData
) -> t.EmberStatus:
"""Add a transient link key."""

async def command(self, name, *args) -> Any:
"""Serialize command and send it."""
LOGGER.debug("Send command %s: %s", name, args)
Expand Down Expand Up @@ -103,6 +109,9 @@ def __call__(self, data: bytes) -> None:

LOGGER.debug("Application frame received %s: %s", frame_name, result)

if data:
LOGGER.debug("Frame contains trailing data: %s", data)

if sequence in self._awaiting:
expected_id, schema, future = self._awaiting.pop(sequence)
try:
Expand Down
Loading

0 comments on commit f9f4c3f

Please sign in to comment.