From 412159b533471cb03c50d53954fc0fbfb3abfc51 Mon Sep 17 00:00:00 2001 From: Robin GROSS Date: Mon, 27 Nov 2023 15:41:21 +0100 Subject: [PATCH 1/6] add memory extended services --- test/telegram_tests/apci_test.py | 239 +++++++++++++++++++++++++++++++ xknx/telegram/apci.py | 227 +++++++++++++++++++++++++++++ 2 files changed, 466 insertions(+) diff --git a/test/telegram_tests/apci_test.py b/test/telegram_tests/apci_test.py index 8ddc3cdbd..3772b4315 100644 --- a/test/telegram_tests/apci_test.py +++ b/test/telegram_tests/apci_test.py @@ -24,6 +24,10 @@ IndividualAddressSerialResponse, IndividualAddressSerialWrite, IndividualAddressWrite, + MemoryExtendedRead, + MemoryExtendedReadResponse, + MemoryExtendedWrite, + MemoryExtendedWriteResponse, MemoryRead, MemoryResponse, MemoryWrite, @@ -298,6 +302,241 @@ def test_str(self): assert str(payload) == '' +class TestMemoryExtendedWrite: + """Test class for MemoryExtendedWrite objects.""" + + def test_calculated_length(self): + """Test the test_calculated_length method.""" + payload = MemoryExtendedWrite( + address=0x123456, count=3, data=bytes([0xAA, 0xBB, 0xCC]) + ) + assert payload.calculated_length() == 9 + + def test_from_knx(self): + """Test the from_knx method.""" + payload = APCI.from_knx( + bytes([0x01, 0xFB, 0x03, 0x12, 0x34, 0x56, 0xAA, 0xBB, 0xCC]) + ) + + assert payload == MemoryExtendedWrite( + address=0x123456, count=3, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + def test_to_knx(self): + """Test the to_knx method.""" + payload = MemoryExtendedWrite( + address=0x123456, count=3, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + assert payload.to_knx() == bytes( + [0x01, 0xFB, 0x03, 0x12, 0x34, 0x56, 0xAA, 0xBB, 0xCC] + ) + + def test_to_knx_conversion_error(self): + """Test the to_knx method for conversion errors.""" + payload = MemoryExtendedWrite( + address=0xAABBCCDD, count=3, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + with pytest.raises(ConversionError, match=r".*Address.*"): + payload.to_knx() + + payload = MemoryExtendedWrite( + address=0x123456, count=256, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + with pytest.raises(ConversionError, match=r".*Count.*"): + payload.to_knx() + + def test_str(self): + """Test the __str__ method.""" + payload = MemoryExtendedWrite( + address=0x123456, count=3, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + assert ( + str(payload) + == '' + ) + + +class TestMemoryExtendedWriteResponse: + """Test class for MemoryExtendedWrite objects.""" + + def test_calculated_length(self): + """Test the test_calculated_length method.""" + payload = MemoryExtendedWriteResponse(return_code=0, address=0x123456) + assert payload.calculated_length() == 6 + + def test_calculated_lengt_with_confirmation_data(self): + """Test the test_calculated_length method.""" + payload = MemoryExtendedWriteResponse( + return_code=0, address=0x123456, confirmation_data=bytes([0xAA, 0xBB]) + ) + assert payload.calculated_length() == 8 + + def test_from_knx(self): + """Test the from_knx method.""" + payload = APCI.from_knx(bytes([0x01, 0xFC, 0x00, 0x12, 0x34, 0x56])) + + assert payload == MemoryExtendedWriteResponse( + return_code=0, address=0x123456, confirmation_data=b"" + ) + + def test_from_knx_with_confirmation_data(self): + """Test the from_knx method.""" + payload = APCI.from_knx(bytes([0x01, 0xFC, 0x01, 0x12, 0x34, 0x56, 0xAA, 0xBB])) + + assert payload == MemoryExtendedWriteResponse( + return_code=1, address=0x123456, confirmation_data=bytes([0xAA, 0xBB]) + ) + + def test_to_knx(self): + """Test the to_knx method.""" + payload = MemoryExtendedWriteResponse(return_code=0, address=0x123456) + + assert payload.to_knx() == bytes([0x01, 0xFC, 0x00, 0x12, 0x34, 0x56]) + + def test_to_knx_with_confirmation_data(self): + """Test the to_knx method.""" + payload = MemoryExtendedWriteResponse( + return_code=1, address=0x123456, confirmation_data=bytes([0xAA, 0xBB]) + ) + + assert payload.to_knx() == bytes( + [0x01, 0xFC, 0x01, 0x12, 0x34, 0x56, 0xAA, 0xBB] + ) + + def test_to_knx_conversion_error(self): + """Test the to_knx method for conversion errors.""" + payload = MemoryExtendedWriteResponse(return_code=0, address=0xAABBCCDD) + + with pytest.raises(ConversionError, match=r".*Address.*"): + payload.to_knx() + + payload = MemoryExtendedWriteResponse(return_code=0x100, address=0x123456) + + with pytest.raises(ConversionError, match=r".*Return code.*"): + payload.to_knx() + + def test_str(self): + """Test the __str__ method.""" + payload = MemoryExtendedWriteResponse(return_code=0, address=0x123456) + + assert ( + str(payload) + == '' + ) + + def test_str_with_confirmation_data(self): + """Test the __str__ method.""" + payload = MemoryExtendedWriteResponse( + return_code=1, address=0x123456, confirmation_data=bytes([0xAA, 0xBB]) + ) + + assert ( + str(payload) + == '' + ) + + +class TestMemoryExtendedRead: + """Test class for MemoryExtendedRead objects.""" + + def test_calculated_length(self): + """Test the test_calculated_length method.""" + payload = MemoryExtendedRead(address=0x123456, count=3) + assert payload.calculated_length() == 6 + + def test_from_knx(self): + """Test the from_knx method.""" + payload = APCI.from_knx(bytes([0x01, 0xFD, 0x03, 0x12, 0x34, 0x56])) + + assert payload == MemoryExtendedRead(address=0x123456, count=3) + + def test_to_knx(self): + """Test the to_knx method.""" + payload = MemoryExtendedRead(address=0x123456, count=3) + + assert payload.to_knx() == bytes([0x01, 0xFD, 0x03, 0x12, 0x34, 0x56]) + + def test_to_knx_conversion_error(self): + """Test the to_knx method for conversion errors.""" + payload = MemoryExtendedRead(address=0xAABBCCDD, count=3) + + with pytest.raises(ConversionError, match=r".*Address.*"): + payload.to_knx() + + payload = MemoryExtendedRead(address=0x123456, count=256) + + with pytest.raises(ConversionError, match=r".*Count.*"): + payload.to_knx() + + def test_str(self): + """Test the __str__ method.""" + payload = MemoryExtendedRead(address=0x123456, count=3) + + assert str(payload) == '' + + +class TestMemoryExtendedReadResponse: + """Test class for MemoryExtendedReadResponse objects.""" + + def test_calculated_length(self): + """Test the test_calculated_length method.""" + payload = MemoryExtendedReadResponse( + return_code=0, address=0x123456, data=bytes([0xAA, 0xBB, 0xCC]) + ) + assert payload.calculated_length() == 9 + + def test_from_knx(self): + """Test the from_knx method.""" + payload = APCI.from_knx( + bytes([0x01, 0xFE, 0x00, 0x12, 0x34, 0x56, 0xAA, 0xBB, 0xCC]) + ) + + assert payload == MemoryExtendedReadResponse( + return_code=0, address=0x123456, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + def test_to_knx(self): + """Test the to_knx method.""" + payload = MemoryExtendedReadResponse( + return_code=0, address=0x123456, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + assert payload.to_knx() == bytes( + [0x01, 0xFE, 0x00, 0x12, 0x34, 0x56, 0xAA, 0xBB, 0xCC] + ) + + def test_to_knx_conversion_error(self): + """Test the to_knx method for conversion errors.""" + payload = MemoryExtendedReadResponse( + return_code=0, address=0xAABBCCDD, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + with pytest.raises(ConversionError, match=r".*Address.*"): + payload.to_knx() + + payload = MemoryExtendedReadResponse( + return_code=0x100, address=0x123456, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + with pytest.raises(ConversionError, match=r".*Return code.*"): + payload.to_knx() + + def test_str(self): + """Test the __str__ method.""" + payload = MemoryExtendedReadResponse( + return_code=0, address=0x123456, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + assert ( + str(payload) + == '' + ) + + class TestMemoryRead: """Test class for MemoryRead objects.""" diff --git a/xknx/telegram/apci.py b/xknx/telegram/apci.py index 80e183d17..4b77cdfef 100644 --- a/xknx/telegram/apci.py +++ b/xknx/telegram/apci.py @@ -50,6 +50,11 @@ class APCIService(Enum): ADC_READ = 0x0180 ADC_RESPONSE = 0x1C0 + MEMORY_EXTENDED_WRITE = 0x1FB + MEMORY_EXTENDED_WRITE_RESPONSE = 0x1FC + MEMORY_EXTENDED_READ = 0x1FD + MEMORY_EXTENDED_READ_RESPONSE = 0x1FE + MEMORY_READ = 0x0200 MEMORY_RESPONSE = 0x0240 MEMORY_WRITE = 0x0280 @@ -158,6 +163,14 @@ def from_knx(cls, raw: bytes) -> APCI: if service == APCIService.ADC_READ.value: return ADCRead.from_knx(raw) if service == APCIService.ADC_RESPONSE.value: + if apci == APCIService.MEMORY_EXTENDED_WRITE.value: + return MemoryExtendedWrite.from_knx(raw) + if apci == APCIService.MEMORY_EXTENDED_WRITE_RESPONSE.value: + return MemoryExtendedWriteResponse.from_knx(raw) + if apci == APCIService.MEMORY_EXTENDED_READ.value: + return MemoryExtendedRead.from_knx(raw) + if apci == APCIService.MEMORY_EXTENDED_READ_RESPONSE.value: + return MemoryExtendedReadResponse.from_knx(raw) return ADCResponse.from_knx(raw) if service == APCIService.MEMORY_READ.value: return MemoryRead.from_knx(raw) @@ -492,6 +505,220 @@ def __str__(self) -> str: return f'' +class MemoryExtendedWrite(APCI): + """ + MemoryExtendedWrite service. + + Payload indicates address (16 MiB), count (1-255 bytes) and data. + """ + + CODE = APCIService.MEMORY_EXTENDED_WRITE + + def __init__(self, address: int, data: bytes, count: int | None = None) -> None: + """Initialize a new instance of MemoryExtendedWrite.""" + if count is None: + count = len(data) + self.address = address + self.count = count + self.data = data + + def calculated_length(self) -> int: + """Get length of APCI payload.""" + return 6 + len(self.data) + + @classmethod + def from_knx(cls, raw: bytes) -> MemoryExtendedWrite: + """Parse/deserialize from KNX/IP raw data.""" + size = len(raw) - 6 + + # inject [0x00] before 3 bytes address to enable unsigned int unpack + count, address, data = struct.unpack( + f"!BI{size}s", bytes([raw[2], 0x00]) + raw[3:] + ) + return cls( + count=count, + address=address, + data=data, + ) + + def to_knx(self) -> bytearray: + """Serialize to KNX/IP raw data.""" + if not 0 <= self.address <= 0xFFFFFF: + raise ConversionError("Address out of range.") + if not 0 <= self.count <= 250: + raise ConversionError("Count out of range.") + + size = len(self.data) + payload = struct.pack(f"!BI{size}s", self.count, self.address, self.data) + # suppress first byte of address + payload = payload[:1] + payload[2:] + + return encode_cmd_and_payload(self.CODE, 0, appended_payload=payload) + + def __str__(self) -> str: + """Return object as readable string.""" + return f'' + + +class MemoryExtendedWriteResponse(APCI): + """ + MemoryExtendedWriteResponse service. + + Payload indicates return code, address (16 MiB) and confirmation data. + """ + + CODE = APCIService.MEMORY_EXTENDED_WRITE_RESPONSE + + def __init__( + self, return_code: int, address: int, confirmation_data: bytes = b"" + ) -> None: + """Initialize a new instance of MemoryExtendedWriteResponse.""" + self.return_code = return_code + self.address = address + self.confirmation_data = confirmation_data + + def calculated_length(self) -> int: + """Get length of APCI payload.""" + return 6 + len(self.confirmation_data) + + @classmethod + def from_knx(cls, raw: bytes) -> MemoryExtendedWriteResponse: + """Parse/deserialize from KNX/IP raw data.""" + size = len(raw) - 6 + + # inject [0x00] before 3 bytes address to enable unsigned int unpack + return_code, address, confirmation_data = struct.unpack( + f"!BI{size}s", bytes([raw[2], 0x00]) + raw[3:] + ) + return cls( + return_code=return_code, + address=address, + confirmation_data=confirmation_data, + ) + + def to_knx(self) -> bytearray: + """Serialize to KNX/IP raw data.""" + if not 0 <= self.address <= 0xFFFFFF: + raise ConversionError("Address out of range.") + if not 0 <= self.return_code <= 255: + raise ConversionError("Return code out of range.") + + size = len(self.confirmation_data) + payload = struct.pack( + f"!BI{size}s", self.return_code, self.address, self.confirmation_data + ) + # suppress first byte of address + payload = payload[:1] + payload[2:] + + return encode_cmd_and_payload(self.CODE, 0, appended_payload=payload) + + def __str__(self) -> str: + """Return object as readable string.""" + return f'' + + +class MemoryExtendedRead(APCI): + """ + MemoryExtendedRead service. + + Payload indicates count and address (16 MiB). + """ + + CODE = APCIService.MEMORY_EXTENDED_READ + + def __init__(self, count: int, address: int) -> None: + """Initialize a new instance of MemoryExtendedRead.""" + self.count = count + self.address = address + + def calculated_length(self) -> int: + """Get length of APCI payload.""" + return 6 + + @classmethod + def from_knx(cls, raw: bytes) -> MemoryExtendedRead: + """Parse/deserialize from KNX/IP raw data.""" + # inject [0x00] before 3 bytes address to enable unsigned int unpack + count, address = struct.unpack("!BI", bytes([raw[2], 0x00]) + raw[3:]) + return cls( + count=count, + address=address, + ) + + def to_knx(self) -> bytearray: + """Serialize to KNX/IP raw data.""" + if not 0 <= self.address <= 0xFFFFFF: + raise ConversionError("Address out of range.") + if not 0 <= self.count <= 250: + raise ConversionError("Count out of range.") + + payload = struct.pack("!BI", self.count, self.address) + # suppress first byte of address + payload = payload[:1] + payload[2:] + + return encode_cmd_and_payload(self.CODE, 0, appended_payload=payload) + + def __str__(self) -> str: + """Return object as readable string.""" + return ( + f'' + ) + + +class MemoryExtendedReadResponse(APCI): + """ + MemoryExtendedReadResponse service. + + Payload indicates return code, address (16 MiB) and data. + """ + + CODE = APCIService.MEMORY_EXTENDED_READ_RESPONSE + + def __init__(self, return_code: int, address: int, data: bytes = b"") -> None: + """Initialize a new instance of MemoryExtendedReadResponse.""" + self.return_code = return_code + self.address = address + self.data = data + + def calculated_length(self) -> int: + """Get length of APCI payload.""" + return 6 + len(self.data) + + @classmethod + def from_knx(cls, raw: bytes) -> MemoryExtendedReadResponse: + """Parse/deserialize from KNX/IP raw data.""" + size = len(raw) - 6 + + # inject [0x00] before 3 bytes address to enable unsigned int unpack + return_code, address, data = struct.unpack( + f"!BI{size}s", bytes([raw[2], 0x00]) + raw[3:] + ) + return cls( + return_code=return_code, + address=address, + data=data, + ) + + def to_knx(self) -> bytearray: + """Serialize to KNX/IP raw data.""" + if not 0 <= self.address <= 0xFFFFFF: + raise ConversionError("Address out of range.") + + if not 0 <= self.return_code <= 255: + raise ConversionError("Return code out of range.") + + size = len(self.data) + payload = struct.pack(f"!BI{size}s", self.return_code, self.address, self.data) + # suppress first byte of address + payload = payload[:1] + payload[2:] + + return encode_cmd_and_payload(self.CODE, 0, appended_payload=payload) + + def __str__(self) -> str: + """Return object as readable string.""" + return f'' + + class MemoryRead(APCI): """ MemoryRead service. From 336887736eb54a0d8977032ce2b9ac020aec69b4 Mon Sep 17 00:00:00 2001 From: Robin GROSS Date: Mon, 27 Nov 2023 21:27:00 +0100 Subject: [PATCH 2/6] fix calculated_length --- test/telegram_tests/apci_test.py | 10 +++++----- xknx/telegram/apci.py | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/test/telegram_tests/apci_test.py b/test/telegram_tests/apci_test.py index 3772b4315..f8d857725 100644 --- a/test/telegram_tests/apci_test.py +++ b/test/telegram_tests/apci_test.py @@ -310,7 +310,7 @@ def test_calculated_length(self): payload = MemoryExtendedWrite( address=0x123456, count=3, data=bytes([0xAA, 0xBB, 0xCC]) ) - assert payload.calculated_length() == 9 + assert payload.calculated_length() == 8 def test_from_knx(self): """Test the from_knx method.""" @@ -366,14 +366,14 @@ class TestMemoryExtendedWriteResponse: def test_calculated_length(self): """Test the test_calculated_length method.""" payload = MemoryExtendedWriteResponse(return_code=0, address=0x123456) - assert payload.calculated_length() == 6 + assert payload.calculated_length() == 5 def test_calculated_lengt_with_confirmation_data(self): """Test the test_calculated_length method.""" payload = MemoryExtendedWriteResponse( return_code=0, address=0x123456, confirmation_data=bytes([0xAA, 0xBB]) ) - assert payload.calculated_length() == 8 + assert payload.calculated_length() == 7 def test_from_knx(self): """Test the from_knx method.""" @@ -446,7 +446,7 @@ class TestMemoryExtendedRead: def test_calculated_length(self): """Test the test_calculated_length method.""" payload = MemoryExtendedRead(address=0x123456, count=3) - assert payload.calculated_length() == 6 + assert payload.calculated_length() == 5 def test_from_knx(self): """Test the from_knx method.""" @@ -487,7 +487,7 @@ def test_calculated_length(self): payload = MemoryExtendedReadResponse( return_code=0, address=0x123456, data=bytes([0xAA, 0xBB, 0xCC]) ) - assert payload.calculated_length() == 9 + assert payload.calculated_length() == 8 def test_from_knx(self): """Test the from_knx method.""" diff --git a/xknx/telegram/apci.py b/xknx/telegram/apci.py index 4b77cdfef..d97a5e60d 100644 --- a/xknx/telegram/apci.py +++ b/xknx/telegram/apci.py @@ -524,12 +524,12 @@ def __init__(self, address: int, data: bytes, count: int | None = None) -> None: def calculated_length(self) -> int: """Get length of APCI payload.""" - return 6 + len(self.data) + return 5 + len(self.data) @classmethod def from_knx(cls, raw: bytes) -> MemoryExtendedWrite: """Parse/deserialize from KNX/IP raw data.""" - size = len(raw) - 6 + size = len(raw) - 5 # inject [0x00] before 3 bytes address to enable unsigned int unpack count, address, data = struct.unpack( @@ -579,7 +579,7 @@ def __init__( def calculated_length(self) -> int: """Get length of APCI payload.""" - return 6 + len(self.confirmation_data) + return 5 + len(self.confirmation_data) @classmethod def from_knx(cls, raw: bytes) -> MemoryExtendedWriteResponse: @@ -633,7 +633,7 @@ def __init__(self, count: int, address: int) -> None: def calculated_length(self) -> int: """Get length of APCI payload.""" - return 6 + return 5 @classmethod def from_knx(cls, raw: bytes) -> MemoryExtendedRead: @@ -682,7 +682,7 @@ def __init__(self, return_code: int, address: int, data: bytes = b"") -> None: def calculated_length(self) -> int: """Get length of APCI payload.""" - return 6 + len(self.data) + return 5 + len(self.data) @classmethod def from_knx(cls, raw: bytes) -> MemoryExtendedReadResponse: From 7a42f20cd3d759591af3d773089286a64532c500 Mon Sep 17 00:00:00 2001 From: Robin GROSS Date: Tue, 28 Nov 2023 00:46:56 +0100 Subject: [PATCH 3/6] fix from_knx for MemoryExtendedWrite --- xknx/telegram/apci.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xknx/telegram/apci.py b/xknx/telegram/apci.py index d97a5e60d..a440ceffd 100644 --- a/xknx/telegram/apci.py +++ b/xknx/telegram/apci.py @@ -529,7 +529,7 @@ def calculated_length(self) -> int: @classmethod def from_knx(cls, raw: bytes) -> MemoryExtendedWrite: """Parse/deserialize from KNX/IP raw data.""" - size = len(raw) - 5 + size = len(raw) - 6 # inject [0x00] before 3 bytes address to enable unsigned int unpack count, address, data = struct.unpack( From d2781e02d58678499336efe87bcc97db7c31adaa Mon Sep 17 00:00:00 2001 From: RealByron <1749192+RealByron@users.noreply.github.com> Date: Sat, 2 Dec 2023 12:36:48 +0100 Subject: [PATCH 4/6] cleaner decoding --- xknx/telegram/apci.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/xknx/telegram/apci.py b/xknx/telegram/apci.py index a440ceffd..47d70e2b3 100644 --- a/xknx/telegram/apci.py +++ b/xknx/telegram/apci.py @@ -531,10 +531,10 @@ def from_knx(cls, raw: bytes) -> MemoryExtendedWrite: """Parse/deserialize from KNX/IP raw data.""" size = len(raw) - 6 - # inject [0x00] before 3 bytes address to enable unsigned int unpack - count, address, data = struct.unpack( - f"!BI{size}s", bytes([raw[2], 0x00]) + raw[3:] - ) + count = raw[2] + address = int.from_bytes(raw[3:6], "big") + data = raw[6:] + return cls( count=count, address=address, @@ -586,10 +586,10 @@ def from_knx(cls, raw: bytes) -> MemoryExtendedWriteResponse: """Parse/deserialize from KNX/IP raw data.""" size = len(raw) - 6 - # inject [0x00] before 3 bytes address to enable unsigned int unpack - return_code, address, confirmation_data = struct.unpack( - f"!BI{size}s", bytes([raw[2], 0x00]) + raw[3:] - ) + return_code = raw[2] + address = int.from_bytes(raw[3:6], "big") + confirmation_data = raw[6:] + return cls( return_code=return_code, address=address, From ae7058b91f9e1c3523ba3c0ca3a6db213763f64b Mon Sep 17 00:00:00 2001 From: RealByron <1749192+RealByron@users.noreply.github.com> Date: Sun, 3 Dec 2023 09:53:02 +0100 Subject: [PATCH 5/6] cleaner decoding 2 --- xknx/telegram/apci.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/xknx/telegram/apci.py b/xknx/telegram/apci.py index 47d70e2b3..35d08e9d1 100644 --- a/xknx/telegram/apci.py +++ b/xknx/telegram/apci.py @@ -638,8 +638,9 @@ def calculated_length(self) -> int: @classmethod def from_knx(cls, raw: bytes) -> MemoryExtendedRead: """Parse/deserialize from KNX/IP raw data.""" - # inject [0x00] before 3 bytes address to enable unsigned int unpack - count, address = struct.unpack("!BI", bytes([raw[2], 0x00]) + raw[3:]) + count = raw[2] + address = int.from_bytes(raw[3:6], "big") + return cls( count=count, address=address, @@ -689,10 +690,10 @@ def from_knx(cls, raw: bytes) -> MemoryExtendedReadResponse: """Parse/deserialize from KNX/IP raw data.""" size = len(raw) - 6 - # inject [0x00] before 3 bytes address to enable unsigned int unpack - return_code, address, data = struct.unpack( - f"!BI{size}s", bytes([raw[2], 0x00]) + raw[3:] - ) + return_code = raw[2] + address = int.from_bytes(raw[3:6], "big") + data = raw[6:] + return cls( return_code=return_code, address=address, From d5ef03b50ce289f2841a02c4bc4a3682e18044a3 Mon Sep 17 00:00:00 2001 From: RealByron <1749192+RealByron@users.noreply.github.com> Date: Sun, 3 Dec 2023 11:24:26 +0100 Subject: [PATCH 6/6] fix lint warnings --- xknx/telegram/apci.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/xknx/telegram/apci.py b/xknx/telegram/apci.py index 35d08e9d1..d5da00ee8 100644 --- a/xknx/telegram/apci.py +++ b/xknx/telegram/apci.py @@ -529,8 +529,6 @@ def calculated_length(self) -> int: @classmethod def from_knx(cls, raw: bytes) -> MemoryExtendedWrite: """Parse/deserialize from KNX/IP raw data.""" - size = len(raw) - 6 - count = raw[2] address = int.from_bytes(raw[3:6], "big") data = raw[6:] @@ -584,8 +582,6 @@ def calculated_length(self) -> int: @classmethod def from_knx(cls, raw: bytes) -> MemoryExtendedWriteResponse: """Parse/deserialize from KNX/IP raw data.""" - size = len(raw) - 6 - return_code = raw[2] address = int.from_bytes(raw[3:6], "big") confirmation_data = raw[6:] @@ -688,8 +684,6 @@ def calculated_length(self) -> int: @classmethod def from_knx(cls, raw: bytes) -> MemoryExtendedReadResponse: """Parse/deserialize from KNX/IP raw data.""" - size = len(raw) - 6 - return_code = raw[2] address = int.from_bytes(raw[3:6], "big") data = raw[6:]