diff --git a/test/telegram_tests/apci_test.py b/test/telegram_tests/apci_test.py index 8ddc3cdbd..f8d857725 100644 --- a/test/telegram_tests/apci_test.py +++ b/test/telegram_tests/apci_test.py @@ -24,6 +24,10 @@ IndividualAddressSerialResponse, IndividualAddressSerialWrite, IndividualAddressWrite, + MemoryExtendedRead, + MemoryExtendedReadResponse, + MemoryExtendedWrite, + MemoryExtendedWriteResponse, MemoryRead, MemoryResponse, MemoryWrite, @@ -298,6 +302,241 @@ def test_str(self): assert str(payload) == '' +class TestMemoryExtendedWrite: + """Test class for MemoryExtendedWrite objects.""" + + def test_calculated_length(self): + """Test the test_calculated_length method.""" + payload = MemoryExtendedWrite( + address=0x123456, count=3, data=bytes([0xAA, 0xBB, 0xCC]) + ) + assert payload.calculated_length() == 8 + + def test_from_knx(self): + """Test the from_knx method.""" + payload = APCI.from_knx( + bytes([0x01, 0xFB, 0x03, 0x12, 0x34, 0x56, 0xAA, 0xBB, 0xCC]) + ) + + assert payload == MemoryExtendedWrite( + address=0x123456, count=3, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + def test_to_knx(self): + """Test the to_knx method.""" + payload = MemoryExtendedWrite( + address=0x123456, count=3, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + assert payload.to_knx() == bytes( + [0x01, 0xFB, 0x03, 0x12, 0x34, 0x56, 0xAA, 0xBB, 0xCC] + ) + + def test_to_knx_conversion_error(self): + """Test the to_knx method for conversion errors.""" + payload = MemoryExtendedWrite( + address=0xAABBCCDD, count=3, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + with pytest.raises(ConversionError, match=r".*Address.*"): + payload.to_knx() + + payload = MemoryExtendedWrite( + address=0x123456, count=256, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + with pytest.raises(ConversionError, match=r".*Count.*"): + payload.to_knx() + + def test_str(self): + """Test the __str__ method.""" + payload = MemoryExtendedWrite( + address=0x123456, count=3, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + assert ( + str(payload) + == '' + ) + + +class TestMemoryExtendedWriteResponse: + """Test class for MemoryExtendedWrite objects.""" + + def test_calculated_length(self): + """Test the test_calculated_length method.""" + payload = MemoryExtendedWriteResponse(return_code=0, address=0x123456) + assert payload.calculated_length() == 5 + + def test_calculated_lengt_with_confirmation_data(self): + """Test the test_calculated_length method.""" + payload = MemoryExtendedWriteResponse( + return_code=0, address=0x123456, confirmation_data=bytes([0xAA, 0xBB]) + ) + assert payload.calculated_length() == 7 + + def test_from_knx(self): + """Test the from_knx method.""" + payload = APCI.from_knx(bytes([0x01, 0xFC, 0x00, 0x12, 0x34, 0x56])) + + assert payload == MemoryExtendedWriteResponse( + return_code=0, address=0x123456, confirmation_data=b"" + ) + + def test_from_knx_with_confirmation_data(self): + """Test the from_knx method.""" + payload = APCI.from_knx(bytes([0x01, 0xFC, 0x01, 0x12, 0x34, 0x56, 0xAA, 0xBB])) + + assert payload == MemoryExtendedWriteResponse( + return_code=1, address=0x123456, confirmation_data=bytes([0xAA, 0xBB]) + ) + + def test_to_knx(self): + """Test the to_knx method.""" + payload = MemoryExtendedWriteResponse(return_code=0, address=0x123456) + + assert payload.to_knx() == bytes([0x01, 0xFC, 0x00, 0x12, 0x34, 0x56]) + + def test_to_knx_with_confirmation_data(self): + """Test the to_knx method.""" + payload = MemoryExtendedWriteResponse( + return_code=1, address=0x123456, confirmation_data=bytes([0xAA, 0xBB]) + ) + + assert payload.to_knx() == bytes( + [0x01, 0xFC, 0x01, 0x12, 0x34, 0x56, 0xAA, 0xBB] + ) + + def test_to_knx_conversion_error(self): + """Test the to_knx method for conversion errors.""" + payload = MemoryExtendedWriteResponse(return_code=0, address=0xAABBCCDD) + + with pytest.raises(ConversionError, match=r".*Address.*"): + payload.to_knx() + + payload = MemoryExtendedWriteResponse(return_code=0x100, address=0x123456) + + with pytest.raises(ConversionError, match=r".*Return code.*"): + payload.to_knx() + + def test_str(self): + """Test the __str__ method.""" + payload = MemoryExtendedWriteResponse(return_code=0, address=0x123456) + + assert ( + str(payload) + == '' + ) + + def test_str_with_confirmation_data(self): + """Test the __str__ method.""" + payload = MemoryExtendedWriteResponse( + return_code=1, address=0x123456, confirmation_data=bytes([0xAA, 0xBB]) + ) + + assert ( + str(payload) + == '' + ) + + +class TestMemoryExtendedRead: + """Test class for MemoryExtendedRead objects.""" + + def test_calculated_length(self): + """Test the test_calculated_length method.""" + payload = MemoryExtendedRead(address=0x123456, count=3) + assert payload.calculated_length() == 5 + + def test_from_knx(self): + """Test the from_knx method.""" + payload = APCI.from_knx(bytes([0x01, 0xFD, 0x03, 0x12, 0x34, 0x56])) + + assert payload == MemoryExtendedRead(address=0x123456, count=3) + + def test_to_knx(self): + """Test the to_knx method.""" + payload = MemoryExtendedRead(address=0x123456, count=3) + + assert payload.to_knx() == bytes([0x01, 0xFD, 0x03, 0x12, 0x34, 0x56]) + + def test_to_knx_conversion_error(self): + """Test the to_knx method for conversion errors.""" + payload = MemoryExtendedRead(address=0xAABBCCDD, count=3) + + with pytest.raises(ConversionError, match=r".*Address.*"): + payload.to_knx() + + payload = MemoryExtendedRead(address=0x123456, count=256) + + with pytest.raises(ConversionError, match=r".*Count.*"): + payload.to_knx() + + def test_str(self): + """Test the __str__ method.""" + payload = MemoryExtendedRead(address=0x123456, count=3) + + assert str(payload) == '' + + +class TestMemoryExtendedReadResponse: + """Test class for MemoryExtendedReadResponse objects.""" + + def test_calculated_length(self): + """Test the test_calculated_length method.""" + payload = MemoryExtendedReadResponse( + return_code=0, address=0x123456, data=bytes([0xAA, 0xBB, 0xCC]) + ) + assert payload.calculated_length() == 8 + + def test_from_knx(self): + """Test the from_knx method.""" + payload = APCI.from_knx( + bytes([0x01, 0xFE, 0x00, 0x12, 0x34, 0x56, 0xAA, 0xBB, 0xCC]) + ) + + assert payload == MemoryExtendedReadResponse( + return_code=0, address=0x123456, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + def test_to_knx(self): + """Test the to_knx method.""" + payload = MemoryExtendedReadResponse( + return_code=0, address=0x123456, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + assert payload.to_knx() == bytes( + [0x01, 0xFE, 0x00, 0x12, 0x34, 0x56, 0xAA, 0xBB, 0xCC] + ) + + def test_to_knx_conversion_error(self): + """Test the to_knx method for conversion errors.""" + payload = MemoryExtendedReadResponse( + return_code=0, address=0xAABBCCDD, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + with pytest.raises(ConversionError, match=r".*Address.*"): + payload.to_knx() + + payload = MemoryExtendedReadResponse( + return_code=0x100, address=0x123456, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + with pytest.raises(ConversionError, match=r".*Return code.*"): + payload.to_knx() + + def test_str(self): + """Test the __str__ method.""" + payload = MemoryExtendedReadResponse( + return_code=0, address=0x123456, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + assert ( + str(payload) + == '' + ) + + class TestMemoryRead: """Test class for MemoryRead objects.""" diff --git a/xknx/telegram/apci.py b/xknx/telegram/apci.py index 80e183d17..d5da00ee8 100644 --- a/xknx/telegram/apci.py +++ b/xknx/telegram/apci.py @@ -50,6 +50,11 @@ class APCIService(Enum): ADC_READ = 0x0180 ADC_RESPONSE = 0x1C0 + MEMORY_EXTENDED_WRITE = 0x1FB + MEMORY_EXTENDED_WRITE_RESPONSE = 0x1FC + MEMORY_EXTENDED_READ = 0x1FD + MEMORY_EXTENDED_READ_RESPONSE = 0x1FE + MEMORY_READ = 0x0200 MEMORY_RESPONSE = 0x0240 MEMORY_WRITE = 0x0280 @@ -158,6 +163,14 @@ def from_knx(cls, raw: bytes) -> APCI: if service == APCIService.ADC_READ.value: return ADCRead.from_knx(raw) if service == APCIService.ADC_RESPONSE.value: + if apci == APCIService.MEMORY_EXTENDED_WRITE.value: + return MemoryExtendedWrite.from_knx(raw) + if apci == APCIService.MEMORY_EXTENDED_WRITE_RESPONSE.value: + return MemoryExtendedWriteResponse.from_knx(raw) + if apci == APCIService.MEMORY_EXTENDED_READ.value: + return MemoryExtendedRead.from_knx(raw) + if apci == APCIService.MEMORY_EXTENDED_READ_RESPONSE.value: + return MemoryExtendedReadResponse.from_knx(raw) return ADCResponse.from_knx(raw) if service == APCIService.MEMORY_READ.value: return MemoryRead.from_knx(raw) @@ -492,6 +505,215 @@ def __str__(self) -> str: return f'' +class MemoryExtendedWrite(APCI): + """ + MemoryExtendedWrite service. + + Payload indicates address (16 MiB), count (1-255 bytes) and data. + """ + + CODE = APCIService.MEMORY_EXTENDED_WRITE + + def __init__(self, address: int, data: bytes, count: int | None = None) -> None: + """Initialize a new instance of MemoryExtendedWrite.""" + if count is None: + count = len(data) + self.address = address + self.count = count + self.data = data + + def calculated_length(self) -> int: + """Get length of APCI payload.""" + return 5 + len(self.data) + + @classmethod + def from_knx(cls, raw: bytes) -> MemoryExtendedWrite: + """Parse/deserialize from KNX/IP raw data.""" + count = raw[2] + address = int.from_bytes(raw[3:6], "big") + data = raw[6:] + + return cls( + count=count, + address=address, + data=data, + ) + + def to_knx(self) -> bytearray: + """Serialize to KNX/IP raw data.""" + if not 0 <= self.address <= 0xFFFFFF: + raise ConversionError("Address out of range.") + if not 0 <= self.count <= 250: + raise ConversionError("Count out of range.") + + size = len(self.data) + payload = struct.pack(f"!BI{size}s", self.count, self.address, self.data) + # suppress first byte of address + payload = payload[:1] + payload[2:] + + return encode_cmd_and_payload(self.CODE, 0, appended_payload=payload) + + def __str__(self) -> str: + """Return object as readable string.""" + return f'' + + +class MemoryExtendedWriteResponse(APCI): + """ + MemoryExtendedWriteResponse service. + + Payload indicates return code, address (16 MiB) and confirmation data. + """ + + CODE = APCIService.MEMORY_EXTENDED_WRITE_RESPONSE + + def __init__( + self, return_code: int, address: int, confirmation_data: bytes = b"" + ) -> None: + """Initialize a new instance of MemoryExtendedWriteResponse.""" + self.return_code = return_code + self.address = address + self.confirmation_data = confirmation_data + + def calculated_length(self) -> int: + """Get length of APCI payload.""" + return 5 + len(self.confirmation_data) + + @classmethod + def from_knx(cls, raw: bytes) -> MemoryExtendedWriteResponse: + """Parse/deserialize from KNX/IP raw data.""" + return_code = raw[2] + address = int.from_bytes(raw[3:6], "big") + confirmation_data = raw[6:] + + return cls( + return_code=return_code, + address=address, + confirmation_data=confirmation_data, + ) + + def to_knx(self) -> bytearray: + """Serialize to KNX/IP raw data.""" + if not 0 <= self.address <= 0xFFFFFF: + raise ConversionError("Address out of range.") + if not 0 <= self.return_code <= 255: + raise ConversionError("Return code out of range.") + + size = len(self.confirmation_data) + payload = struct.pack( + f"!BI{size}s", self.return_code, self.address, self.confirmation_data + ) + # suppress first byte of address + payload = payload[:1] + payload[2:] + + return encode_cmd_and_payload(self.CODE, 0, appended_payload=payload) + + def __str__(self) -> str: + """Return object as readable string.""" + return f'' + + +class MemoryExtendedRead(APCI): + """ + MemoryExtendedRead service. + + Payload indicates count and address (16 MiB). + """ + + CODE = APCIService.MEMORY_EXTENDED_READ + + def __init__(self, count: int, address: int) -> None: + """Initialize a new instance of MemoryExtendedRead.""" + self.count = count + self.address = address + + def calculated_length(self) -> int: + """Get length of APCI payload.""" + return 5 + + @classmethod + def from_knx(cls, raw: bytes) -> MemoryExtendedRead: + """Parse/deserialize from KNX/IP raw data.""" + count = raw[2] + address = int.from_bytes(raw[3:6], "big") + + return cls( + count=count, + address=address, + ) + + def to_knx(self) -> bytearray: + """Serialize to KNX/IP raw data.""" + if not 0 <= self.address <= 0xFFFFFF: + raise ConversionError("Address out of range.") + if not 0 <= self.count <= 250: + raise ConversionError("Count out of range.") + + payload = struct.pack("!BI", self.count, self.address) + # suppress first byte of address + payload = payload[:1] + payload[2:] + + return encode_cmd_and_payload(self.CODE, 0, appended_payload=payload) + + def __str__(self) -> str: + """Return object as readable string.""" + return ( + f'' + ) + + +class MemoryExtendedReadResponse(APCI): + """ + MemoryExtendedReadResponse service. + + Payload indicates return code, address (16 MiB) and data. + """ + + CODE = APCIService.MEMORY_EXTENDED_READ_RESPONSE + + def __init__(self, return_code: int, address: int, data: bytes = b"") -> None: + """Initialize a new instance of MemoryExtendedReadResponse.""" + self.return_code = return_code + self.address = address + self.data = data + + def calculated_length(self) -> int: + """Get length of APCI payload.""" + return 5 + len(self.data) + + @classmethod + def from_knx(cls, raw: bytes) -> MemoryExtendedReadResponse: + """Parse/deserialize from KNX/IP raw data.""" + return_code = raw[2] + address = int.from_bytes(raw[3:6], "big") + data = raw[6:] + + return cls( + return_code=return_code, + address=address, + data=data, + ) + + def to_knx(self) -> bytearray: + """Serialize to KNX/IP raw data.""" + if not 0 <= self.address <= 0xFFFFFF: + raise ConversionError("Address out of range.") + + if not 0 <= self.return_code <= 255: + raise ConversionError("Return code out of range.") + + size = len(self.data) + payload = struct.pack(f"!BI{size}s", self.return_code, self.address, self.data) + # suppress first byte of address + payload = payload[:1] + payload[2:] + + return encode_cmd_and_payload(self.CODE, 0, appended_payload=payload) + + def __str__(self) -> str: + """Return object as readable string.""" + return f'' + + class MemoryRead(APCI): """ MemoryRead service.