From 2d2fd8611c56da790b6858be68b5d2c1e99b83f0 Mon Sep 17 00:00:00 2001 From: Bas Stottelaar Date: Fri, 1 Jan 2021 14:44:22 +0100 Subject: [PATCH] Add tests for UserMemoryRead/Write/Response --- test/telegram_tests/apci_test.py | 192 +++++++++++++++++++++++++++++-- xknx/telegram/apci.py | 42 ++++--- 2 files changed, 212 insertions(+), 22 deletions(-) diff --git a/test/telegram_tests/apci_test.py b/test/telegram_tests/apci_test.py index d2793c8b57..9145e915cf 100644 --- a/test/telegram_tests/apci_test.py +++ b/test/telegram_tests/apci_test.py @@ -440,7 +440,7 @@ def test_calculated_length(self): def test_from_knx(self): """Test the from_knx method.""" - payload = MemoryRead(address=0x1234, count=11) + payload = MemoryRead() payload.from_knx(bytes([0x02, 0x0B, 0x12, 0x34])) self.assertEqual(payload, MemoryRead(address=0x1234, count=11)) @@ -458,6 +458,11 @@ def test_to_knx_conversion_error(self): with self.assertRaisesRegex(ConversionError, r".*Address.*"): payload.to_knx() + payload = MemoryRead(address=0x1234, count=255) + + with self.assertRaisesRegex(ConversionError, r".*Count.*"): + payload.to_knx() + def test_str(self): """Test the __str__ method.""" payload = MemoryRead(address=0x1234, count=11) @@ -476,7 +481,7 @@ def test_calculated_length(self): def test_from_knx(self): """Test the from_knx method.""" - payload = MemoryWrite(address=0x1234, count=3, data=bytes([0xAA, 0xBB, 0xCC])) + payload = MemoryWrite() payload.from_knx(bytes([0x02, 0x83, 0x12, 0x34, 0xAA, 0xBB, 0xCC])) self.assertEqual( @@ -501,6 +506,11 @@ def test_to_knx_conversion_error(self): with self.assertRaisesRegex(ConversionError, r".*Address.*"): payload.to_knx() + payload = MemoryWrite(address=0x1234, count=255, data=bytes([0xAA, 0xBB, 0xCC])) + + with self.assertRaisesRegex(ConversionError, r".*Count.*"): + payload.to_knx() + def test_str(self): """Test the __str__ method.""" payload = MemoryWrite(address=0x1234, count=3, data=bytes([0xAA, 0xBB, 0xCC])) @@ -523,9 +533,7 @@ def test_calculated_length(self): def test_from_knx(self): """Test the from_knx method.""" - payload = MemoryResponse( - address=0x1234, count=3, data=bytes([0xAA, 0xBB, 0xCC]) - ) + payload = MemoryResponse() payload.from_knx(bytes([0x02, 0x43, 0x12, 0x34, 0xAA, 0xBB, 0xCC])) self.assertEqual( @@ -552,6 +560,13 @@ def test_to_knx_conversion_error(self): with self.assertRaisesRegex(ConversionError, r".*Address.*"): payload.to_knx() + payload = MemoryResponse( + address=0x1234, count=255, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + with self.assertRaisesRegex(ConversionError, r".*Count.*"): + payload.to_knx() + def test_str(self): """Test the __str__ method.""" payload = MemoryResponse( @@ -574,7 +589,7 @@ def test_calculated_length(self): def test_from_knx(self): """Test the from_knx method.""" - payload = DeviceDescriptorRead(13) + payload = DeviceDescriptorRead() payload.from_knx(bytes([0x03, 0x0D])) self.assertEqual(payload, DeviceDescriptorRead(13)) @@ -610,7 +625,7 @@ def test_calculated_length(self): def test_from_knx(self): """Test the from_knx method.""" - payload = DeviceDescriptorResponse(descriptor=13, value=123) + payload = DeviceDescriptorResponse() payload.from_knx(bytes([0x03, 0x4D, 0x00, 0x7B])) self.assertEqual(payload, DeviceDescriptorResponse(descriptor=13, value=123)) @@ -637,6 +652,169 @@ def test_str(self): ) +class TestUserMemoryRead(unittest.TestCase): + """Test class for UserMemoryRead objects.""" + + def test_calculated_length(self): + """Test the test_calculated_length method.""" + payload = UserMemoryRead() + + self.assertEqual(payload.calculated_length(), 4) + + def test_from_knx(self): + """Test the from_knx method.""" + payload = UserMemoryRead() + payload.from_knx(bytes([0x02, 0xC0, 0x1B, 0x23, 0x45])) + + self.assertEqual(payload, UserMemoryRead(address=0x12345, count=11)) + + def test_to_knx(self): + """Test the to_knx method.""" + payload = UserMemoryRead(address=0x12345, count=11) + + self.assertEqual(payload.to_knx(), bytes([0x02, 0xC0, 0x1B, 0x23, 0x45])) + + def test_to_knx_conversion_error(self): + """Test the to_knx method for conversion errors.""" + payload = UserMemoryRead(address=0xAABBCCDD, count=11) + + with self.assertRaisesRegex(ConversionError, r".*Address.*"): + payload.to_knx() + + payload = UserMemoryRead(address=0x12345, count=255) + + with self.assertRaisesRegex(ConversionError, r".*Count.*"): + payload.to_knx() + + def test_str(self): + """Test the __str__ method.""" + payload = UserMemoryRead(address=0x12345, count=11) + + self.assertEqual( + str(payload), '' + ) + + +class TestUserMemoryWrite(unittest.TestCase): + """Test class for UserMemoryWrite objects.""" + + def test_calculated_length(self): + """Test the test_calculated_length method.""" + payload = UserMemoryWrite( + address=0x12345, count=3, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + self.assertEqual(payload.calculated_length(), 7) + + def test_from_knx(self): + """Test the from_knx method.""" + payload = UserMemoryWrite() + payload.from_knx(bytes([0x02, 0xC2, 0x13, 0x23, 0x45, 0xAA, 0xBB, 0xCC])) + + self.assertEqual( + payload, + UserMemoryWrite(address=0x12345, count=3, data=bytes([0xAA, 0xBB, 0xCC])), + ) + + def test_to_knx(self): + """Test the to_knx method.""" + payload = UserMemoryWrite( + address=0x12345, count=3, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + self.assertEqual( + payload.to_knx(), bytes([0x02, 0xC2, 0x13, 0x23, 0x45, 0xAA, 0xBB, 0xCC]) + ) + + def test_to_knx_conversion_error(self): + """Test the to_knx method for conversion errors.""" + payload = UserMemoryWrite( + address=0xAABBCCDD, count=3, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + with self.assertRaisesRegex(ConversionError, r".*Address.*"): + payload.to_knx() + + payload = UserMemoryWrite( + address=0x12345, count=255, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + with self.assertRaisesRegex(ConversionError, r".*Count.*"): + payload.to_knx() + + def test_str(self): + """Test the __str__ method.""" + payload = UserMemoryWrite( + address=0x12345, count=3, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + self.assertEqual( + str(payload), + '', + ) + + +class TestUserMemoryResponse(unittest.TestCase): + """Test class for UserMemoryResponse objects.""" + + def test_calculated_length(self): + """Test the test_calculated_length method.""" + payload = UserMemoryResponse( + address=0x12345, count=3, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + self.assertEqual(payload.calculated_length(), 7) + + def test_from_knx(self): + """Test the from_knx method.""" + payload = UserMemoryResponse() + payload.from_knx(bytes([0x02, 0xC1, 0x13, 0x23, 0x45, 0xAA, 0xBB, 0xCC])) + + self.assertEqual( + payload, + UserMemoryResponse( + address=0x12345, count=3, data=bytes([0xAA, 0xBB, 0xCC]) + ), + ) + + def test_to_knx(self): + """Test the to_knx method.""" + payload = UserMemoryResponse( + address=0x12345, count=3, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + self.assertEqual( + payload.to_knx(), bytes([0x02, 0xC1, 0x13, 0x23, 0x45, 0xAA, 0xBB, 0xCC]) + ) + + def test_to_knx_conversion_error(self): + """Test the to_knx method for conversion errors.""" + payload = UserMemoryResponse( + address=0xAABBCCDD, count=3, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + with self.assertRaisesRegex(ConversionError, r".*Address.*"): + payload.to_knx() + + payload = UserMemoryResponse( + address=0x12345, count=255, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + with self.assertRaisesRegex(ConversionError, r".*Count.*"): + payload.to_knx() + + def test_str(self): + """Test the __str__ method.""" + payload = UserMemoryResponse( + address=0x12345, count=3, data=bytes([0xAA, 0xBB, 0xCC]) + ) + + self.assertEqual( + str(payload), + '', + ) + + class TestRestart(unittest.TestCase): """Test class for Restart objects.""" diff --git a/xknx/telegram/apci.py b/xknx/telegram/apci.py index 60a260d23a..1e9d17c5e1 100644 --- a/xknx/telegram/apci.py +++ b/xknx/telegram/apci.py @@ -490,7 +490,7 @@ class MemoryRead(APCI): """ MemoryRead service. - Payload indicates address and count. + Payload indicates address (64 KiB) and count (1-63 bytes). """ code = APCIService.MEMORY_READ @@ -514,6 +514,8 @@ def to_knx(self) -> bytes: """Serialize to KNX/IP raw data.""" if self.address < 0 or self.address >= 2 ** 16: raise ConversionError("Address out of range.") + if self.count < 0 or self.count >= 2 ** 6: + raise ConversionError("Count out of range.") payload = struct.pack("!BH", self.count, self.address) @@ -530,7 +532,7 @@ class MemoryWrite(APCI): """ MemoryWrite service. - Payload indicates address, count and data. + Payload indicates address (64 KiB), count (1-63 bytes) and data. """ code = APCIService.MEMORY_WRITE @@ -563,6 +565,8 @@ def to_knx(self) -> bytes: """Serialize to KNX/IP raw data.""" if self.address < 0 or self.address >= 2 ** 16: raise ConversionError("Address out of range.") + if self.count < 0 or self.count >= 2 ** 6: + raise ConversionError("Count out of range.") size = len(self.data) payload = struct.pack(f"!BH{size}s", self.count, self.address, self.data) @@ -580,7 +584,7 @@ class MemoryResponse(APCI): """ MemoryResponse service. - Payload indicates address, count and data. + Payload indicates address (64 KiB), count (1-63 bytes) and data. """ code = APCIService.MEMORY_RESPONSE @@ -613,6 +617,8 @@ def to_knx(self) -> bytes: """Serialize to KNX/IP raw data.""" if self.address < 0 or self.address >= 2 ** 16: raise ConversionError("Address out of range.") + if self.count < 0 or self.count >= 2 ** 6: + raise ConversionError("Count out of range.") size = len(self.data) payload = struct.pack(f"!BH{size}s", self.count, self.address, self.data) @@ -733,7 +739,7 @@ class UserMemoryRead(APCI): """ UserMemoryRead service. - Payload indicates address and count. + Payload indicates address (1 MiB) and count (1-15 bytes). """ code = APCIUserService.USER_MEMORY_READ @@ -752,14 +758,16 @@ def from_knx(self, raw: bytes) -> None: byte0, address = struct.unpack("!BH", raw[2:]) self.count = byte0 & 0x0F - self.address = ((byte0 & 0xF0) << 16) + address + self.address = (((byte0 & 0xF0) >> 4) << 16) + address def to_knx(self) -> bytes: """Serialize to KNX/IP raw data.""" if self.address < 0 or self.address >= 2 ** 20: raise ConversionError("Address out of range.") + if self.count < 0 or self.count >= 2 ** 4: + raise ConversionError("Count out of range.") - byte0 = (self.address & 0x0F0000 >> 12) | (self.count & 0x0F) + byte0 = (((self.address & 0x0F0000) >> 16) << 4) | (self.count & 0x0F) address = self.address & 0xFFFF payload = struct.pack("!BH", byte0, address) @@ -775,7 +783,7 @@ class UserMemoryWrite(APCI): """ UserMemoryWrite service. - Payload indicates address, count and data. + Payload indicates address (1 MiB), count and data. """ code = APCIUserService.USER_MEMORY_WRITE @@ -803,20 +811,22 @@ def from_knx(self, raw: bytes) -> None: byte0, address, self.data = struct.unpack(f"!BH{size}s", raw[2:]) self.count = byte0 & 0x0F - self.address = ((byte0 & 0xF0) << 16) + address + self.address = (((byte0 & 0xF0) >> 4) << 16) + address def to_knx(self) -> bytes: """Serialize to KNX/IP raw data.""" if self.address < 0 or self.address >= 2 ** 20: raise ConversionError("Address out of range.") + if self.count < 0 or self.count >= 2 ** 4: + raise ConversionError("Count out of range.") - byte0 = (self.address & 0x0F0000 >> 12) | (self.count & 0x0F) + byte0 = (((self.address & 0x0F0000) >> 16) << 4) | (self.count & 0x0F) address = self.address & 0xFFFF size = len(self.data) payload = struct.pack(f"!BH{size}s", byte0, address, self.data) - return encode_cmd_and_payload(self.code, appended_payload=payload[1:]) + return encode_cmd_and_payload(self.code, appended_payload=payload) def __str__(self) -> str: """Return object as readable string.""" @@ -827,7 +837,7 @@ class UserMemoryResponse(APCI): """ UserMemoryResponse service. - Payload indicates address, count and data. + Payload indicates address (1 MiB), count and data. """ code = APCIUserService.USER_MEMORY_RESPONSE @@ -855,20 +865,22 @@ def from_knx(self, raw: bytes) -> None: byte0, address, self.data = struct.unpack(f"!BH{size}s", raw[2:]) self.count = byte0 & 0x0F - self.address = ((byte0 & 0xF0) << 16) + address + self.address = (((byte0 & 0xF0) >> 4) << 16) + address def to_knx(self) -> bytes: """Serialize to KNX/IP raw data.""" if self.address < 0 or self.address >= 2 ** 20: raise ConversionError("Address out of range.") + if self.count < 0 or self.count >= 2 ** 4: + raise ConversionError("Count out of range.") - byte0 = (self.address & 0x0F0000 >> 12) | (self.count & 0x0F) + byte0 = (((self.address & 0x0F0000) >> 16) << 4) | (self.count & 0x0F) address = self.address & 0xFFFF size = len(self.data) payload = struct.pack(f"!BH{size}s", byte0, address, self.data) - return encode_cmd_and_payload(self.code, appended_payload=payload[1:]) + return encode_cmd_and_payload(self.code, appended_payload=payload) def __str__(self) -> str: """Return object as readable string.""" @@ -1139,7 +1151,7 @@ def __init__( self, object_index: int = 0, property_id: int = 0, - count: int = 1, + count: int = 0, start_index: int = 1, ) -> None: """Initialize a new instance of PropertyValueRead."""