diff --git a/pyof/foundation/basic_types.py b/pyof/foundation/basic_types.py index 451b46f2f..fa9688e7c 100644 --- a/pyof/foundation/basic_types.py +++ b/pyof/foundation/basic_types.py @@ -171,19 +171,8 @@ def unpack(self, buff, offset=0): self._value = ':'.join(hexas) -class Char(GenericType): - """Build a double char type according to the length.""" - - def __init__(self, value=None, length=0): - """The constructor takes the optional parameters below. - - Args: - value: The character to be build. - length (int): Character size. - """ - super().__init__(value) - self.length = length - self._fmt = '!{}{}'.format(self.length, 's') +class CharStringBase(GenericType): + """A null terminated ascii char string.""" def pack(self, value=None): """Pack the value as a binary representation. @@ -231,6 +220,19 @@ def unpack(self, buff, offset=0): self._value = unpacked_data.decode('ascii').rstrip('\0') +def Char(value=None, length=0): + """Return a CharString class with given length and initial value.""" + string_length = length + + class CharString(CharStringBase): + """Class for null terminated ascii strings of fixed length.""" + + length = string_length + _fmt = '!{}{}'.format(length, 's') + + return CharString(value) + + class IPAddress(GenericType): """Defines a IP address.""" diff --git a/pyof/utils.py b/pyof/utils.py new file mode 100644 index 000000000..a71d8945d --- /dev/null +++ b/pyof/utils.py @@ -0,0 +1,37 @@ +"""General Unpack utils for python-openflow.""" +import pyof.v0x01.common.header +import pyof.v0x01.common.utils +import pyof.v0x04.common.header +import pyof.v0x04.common.utils +from pyof.foundation.exceptions import UnpackException + +pyof_version_libs = {0x01: pyof.v0x01, + 0x04: pyof.v0x04} + + +def unpack(packet): + """Unpack the OpenFlow Packet and returns a message.""" + try: + version = packet[0] + except IndexError: + raise UnpackException('null packet') + + try: + pyof_lib = pyof_version_libs[version] + except KeyError: + raise UnpackException('Version not supported') + + try: + header = pyof_lib.common.header.Header() + header.unpack(packet[:8]) + message = pyof_lib.common.utils.new_message_from_header(header) + binary_data = packet[8:] + if binary_data: + if len(binary_data) == header.length - 8: + message.unpack(binary_data) + else: + raise UnpackException( + 'packet size does not match packet length field') + return message + except (UnpackException, ValueError) as e: + raise UnpackException(e) diff --git a/tests/test_foundation/test_basic_types.py b/tests/test_foundation/test_basic_types.py index 9cd95afa1..07a877e0a 100644 --- a/tests/test_foundation/test_basic_types.py +++ b/tests/test_foundation/test_basic_types.py @@ -3,152 +3,102 @@ from pyof.foundation import basic_types from pyof.foundation.basic_types import BinaryData +from tests.test_struct import TestStructDump -class TestUBInt8(unittest.TestCase): - """Test of UBInt8 BasicType.""" +class TestUBInt8(TestStructDump): + """Test UBInt8.""" - def setUp(self): - """Basic test setup.""" - self.ubint8 = basic_types.UBInt8() + dump = b'\xff' + obj = basic_types.UBInt8(2**8 - 1) + min_size = 1 - def test_get_size(self): - """[Foundation/BasicTypes/UBInt8] - size 1.""" - self.assertEqual(self.ubint8.get_size(), 1) - @unittest.skip('Not yet implemented') - def test_pack(self): - """[Foundation/BasicTypes/UBInt8] - packing.""" - pass +class TestUBInt16(TestStructDump): + """Test UBInt16.""" - @unittest.skip('Not yet implemented') - def test_unpack(self): - """[Foundation/BasicTypes/UBInt8] - unpacking.""" - pass + dump = b'\xff\xff' + obj = basic_types.UBInt16(2**16 - 1) + min_size = 2 -class TestUBInt16(unittest.TestCase): - """Test of UBInt16 BasicType.""" +class TestUBInt32(TestStructDump): + """Test UBInt32.""" - def setUp(self): - """Basic test setup.""" - self.ubint16 = basic_types.UBInt16() + dump = b'\xff\xff\xff\xff' + obj = basic_types.UBInt32(2**32 - 1) + min_size = 4 - def test_get_size(self): - """[Foundation/BasicTypes/UBInt16] - size 2.""" - self.assertEqual(self.ubint16.get_size(), 2) - @unittest.skip('Not yet implemented') - def test_pack(self): - """[Foundation/BasicTypes/UBInt16] - packing.""" - pass +class TestChar3(TestStructDump): + """Test Char with length 3.""" - @unittest.skip('Not yet implemented') - def test_unpack(self): - """[Foundation/BasicTypes/UBInt16] - unpacking.""" - pass + dump = b'fo\x00' + obj = basic_types.Char('foo', length=3) + min_size = 3 -class TestUBInt32(unittest.TestCase): - """Test of UBInt32 BasicType.""" +class TestChar5(TestStructDump): + """Test Char with length 5.""" - def setUp(self): - """Basic test setup.""" - self.ubint32 = basic_types.UBInt32() + dump = b'foo\x00\x00' + obj = basic_types.Char('foo', length=5) + min_size = 5 - def test_get_size(self): - """[Foundation/BasicTypes/UBInt32] - size 4.""" - self.assertEqual(self.ubint32.get_size(), 4) - @unittest.skip('Not yet implemented') - def test_pack(self): - """[Foundation/BasicTypes/UBInt32] - packing.""" - pass +class TestHWAddressMac(TestStructDump): + """Test HWAddress mac value.""" - @unittest.skip('Not yet implemented') - def test_unpack(self): - """[Foundation/BasicTypes/UBInt32] - unpacking.""" - pass + mac = '0a:d3:98:a5:30:47' + dump = b'\x0a\xd3\x98\xa5\x30\x47' + obj = basic_types.HWAddress(mac) + min_size = 6 + def test_address_value(self): + """Test HWAddress mac value.""" + self.assertEqual(self.obj.value, self.mac) -class TestChar(unittest.TestCase): - """Test of Char BasicType.""" - def setUp(self): - """Basic test setup.""" - self.char1 = basic_types.Char('foo', length=3) - self.char2 = basic_types.Char('foo', length=5) +class TestIPAddressNetmask(TestStructDump): + """Test IPAddress and its default netmask value.""" - def test_get_size(self): - """[Foundation/BasicTypes/Char] - get_size.""" - self.assertEqual(self.char1.get_size(), 3) - self.assertEqual(self.char2.get_size(), 5) + dump = b'\xc0\xa8\x00\x01' + obj = basic_types.IPAddress('192.168.0.1') + netmask = 32 + min_size = 4 - def test_pack(self): - """[Foundation/BasicTypes/Char] - packing.""" - self.assertEqual(self.char1.pack(), b'fo\x00') - self.assertEqual(self.char2.pack(), b'foo\x00\x00') + def test_netmask(self): + """Test IPAddress netmask value.""" + self.assertEqual(self.obj.netmask, self.netmask) - def test_unpack(self): - """[Foundation/BasicTypes/Char] - unpacking.""" - char1 = basic_types.Char(length=3) - char2 = basic_types.Char(length=5) - char1.unpack(b'fo\x00') - char2.unpack(b'foo\x00\x00') - self.assertEqual(char1.value, 'fo') - self.assertEqual(char2.value, 'foo') +class TestIPAddressNoNetmask(TestIPAddressNetmask): + """Test IPAdress and netmask value 16.""" + dump = b'\xc0\xa8\x00\x01' + obj = basic_types.IPAddress('192.168.0.1/16') + netmask = 16 + min_size = 4 -class TestHWaddress(unittest.TestCase): - """Test of HWAddress BasicType.""" - def test_unpack_packed(self): - """Testing unpack of packed HWAddress.""" - mac = '0a:d3:98:a5:30:47' - hw_addr = basic_types.HWAddress(mac) - packed = hw_addr.pack() - unpacked = basic_types.HWAddress() - unpacked.unpack(packed) - self.assertEqual(mac, unpacked.value) +class TestBinaryDataEmpty(TestStructDump): + """Test empty BinaryData.""" - def test_default_value(self): - """Testing default_value for HWAddress.""" - mac = '00:00:00:00:00:00' - hw_addr = basic_types.HWAddress() - packed = hw_addr.pack() - unpacked = basic_types.HWAddress() - unpacked.unpack(packed) - self.assertEqual(mac, unpacked.value) + dump = b'' + obj = BinaryData() + min_size = 0 -class TestIPAddress(unittest.TestCase): - """Test of IPAddress BasicType.""" - - def test_unpack_packed(self): - """Test unpacking of packed IPAddress.""" - ip_addr = basic_types.IPAddress('192.168.0.1') - packed = ip_addr.pack() - unpacked = basic_types.IPAddress() - unpacked.unpack(packed) - self.assertEqual(ip_addr.value, unpacked.value) - - def test_unpack_packed_with_netmask(self): - """Testing unpack of packed IPAddress with netmask.""" - ip_addr = basic_types.IPAddress('192.168.0.1/16') - packed = ip_addr.pack() - unpacked = basic_types.IPAddress() - unpacked.unpack(packed) - self.assertEqual(ip_addr.value, unpacked.value) +class TestBinaryDataBytes(TestStructDump): + """Test 'bytes' BinaryData.""" + + dump = b'bytes' + obj = BinaryData(b'bytes') + min_size = 0 - def test_netmask(self): - """Testing get netmask from IPAddress.""" - ip_addr = basic_types.IPAddress('192.168.0.1/24') - self.assertEqual(ip_addr.netmask, 24) - ip_addr = basic_types.IPAddress('192.168.0.1/16') - self.assertEqual(ip_addr.netmask, 16) - ip_addr = basic_types.IPAddress('192.168.0.1') - self.assertEqual(ip_addr.netmask, 32) + +class TestIPAddress(unittest.TestCase): + """Test of IPAddress BasicType max_prefix.""" def test_max_prefix(self): """Testing get max_prefix from IPAddress.""" @@ -157,32 +107,9 @@ def test_max_prefix(self): ip_addr = basic_types.IPAddress('192.168.0.35/16') self.assertEqual(ip_addr.max_prefix, 32) - def test_get_size(self): - """Testing get_size from IPAddress.""" - ip_addr = basic_types.IPAddress('192.168.0.1/24') - self.assertEqual(ip_addr.get_size(), 4) - class TestBinaryData(unittest.TestCase): - """Test Binary data type.""" - - def test_default_value(self): - """Default packed value should be an empty byte.""" - expected = b'' - actual = BinaryData().pack() - self.assertEqual(expected, actual) - - def test_pack_bytes(self): - """Test packing some bytes.""" - expected = b'forty two' - actual = BinaryData(expected).pack() - self.assertEqual(expected, actual) - - def test_pack_empty_bytes(self): - """Test packing empty bytes.""" - expected = b'' - actual = BinaryData(expected).pack() - self.assertEqual(expected, actual) + """Test Binary data type cannot accept string.""" def test_unexpected_value(self): """Should raise ValueError if constructor value is not bytes.""" diff --git a/tests/test_foundation/test_network_types.py b/tests/test_foundation/test_network_types.py index a72fd42a2..a53379a9a 100644 --- a/tests/test_foundation/test_network_types.py +++ b/tests/test_foundation/test_network_types.py @@ -1,59 +1,25 @@ """Test Python-openflow network types.""" -import unittest - from pyof.foundation.basic_types import BinaryData from pyof.foundation.network_types import GenericTLV, IPv4 +from tests.test_struct import TestStructDump -class TestNetworkTypes(unittest.TestCase): - """Reproduce bugs found.""" - - def test_GenTLV_value_unpack(self): - """Value attribute should be the same after unpacking.""" - value = BinaryData(b'test') - tlv = GenericTLV(value=value) - tlv_unpacked = GenericTLV() - tlv_unpacked.unpack(tlv.pack()) - self.assertEqual(tlv.value.value, tlv_unpacked.value.value) - +class TestGenericTLV(TestStructDump): + """Test the GenericTLV value data.""" -class TestIPv4(unittest.TestCase): - """Test IPv4 packets.""" + dump = b'\xfe\x04test' + obj = GenericTLV(value=BinaryData(b'test')) - def test_IPv4_pack(self): - """Test pack/unpack of IPv4 class.""" - packet = IPv4(dscp=10, ttl=64, protocol=17, source="192.168.0.10", - destination="172.16.10.30", options=b'1000', - data=b'testdata') - packed = packet.pack() - expected = b'F(\x00 \x00\x00\x00\x00@\x11\x02' - expected += b'\xc5\xc0\xa8\x00\n\xac\x10\n\x1e1000testdata' - self.assertEqual(packed, expected) - def test_IPv4_unpack(self): - """Test unpack of IPv4 binary packet.""" - raw = b'FP\x00$\x00\x00\x00\x00\x80\x06W' - raw += b'\xf4\n\x9aN\x81\xc0\xa8\xc7\xcc1000somemoredata' - expected = IPv4(dscp=20, ttl=128, protocol=6, source="10.154.78.129", - destination="192.168.199.204", options=b'1000', - data=b'somemoredata') - expected.pack() - unpacked = IPv4() - unpacked.unpack(raw) - self.assertEqual(unpacked, expected) +class TestIPV4(TestStructDump): + """Test the IPV4 class.""" - def test_IPv4_size(self): - """Test Header size for IPv4 packet.""" - packet = IPv4() - packet.pack() - self.assertEqual(20, packet.get_size()) - self.assertEqual(20, packet.length) - self.assertEqual(20, packet.ihl * 4) + dump = b'F(\x00 \x00\x00\x00\x00@\x11\x02' +\ + b'\xc5\xc0\xa8\x00\n\xac\x10\n\x1e1000testdata' + obj = IPv4(dscp=10, ttl=64, protocol=17, source="192.168.0.10", + destination="172.16.10.30", options=b'1000', + data=b'testdata') def test_IPv4_checksum(self): """Test if the IPv4 checksum is being calculated correclty.""" - packet = IPv4(dscp=10, ttl=64, protocol=17, source="192.168.0.10", - destination="172.16.10.30", options=b'1000', - data=b'testdata') - packet.pack() - self.assertEqual(packet.checksum, 709) + self.assertEqual(self._unpacked_dump.checksum, 709) diff --git a/tests/test_struct.py b/tests/test_struct.py index 057b21033..d5c824c6b 100644 --- a/tests/test_struct.py +++ b/tests/test_struct.py @@ -1,6 +1,7 @@ """Automate struct tests.""" import unittest +from pyof.utils import unpack from tests.raw_dump import RawDump @@ -161,3 +162,148 @@ def test_raw_dump_size(self): self.assertEqual(obj.get_size(), unpacked.get_size()) except FileNotFoundError: raise self.skipTest('No raw dump file found.') + + +class TestStructDump(unittest.TestCase): + """Run message pack, unpack and get_size tests using bytes struct dump. + + Test the lib with raw dumps. We assume the raw files are valid according + to the OF specs to check whether our pack, unpack and get_size + implementations are correct. + + Also, check the minimum size of the struct by instantiating an object with + no parameters. + + To run the tests, just extends this class and set the 'dump' and 'obj' + attributes. You can also optionally set the 'min_size' attribute. + + Example: + .. code-block:: python3 + + from pyof.v0x01.foundation.basic_type import BinaryData + + class TestBinaryData(TestStructDump): + dump = b'data' + obj = BinaryData(xid=0) + min_size = 0 + """ + + dump = b'' + obj = None + min_size = 0 + + def __init__(self, *args, **kwargs): + """Constructor to avoid this base class being executed as a test.""" + if self.__class__ == TestStructDump: + self.run = self.run = lambda self, *args, **kwargs: None + super().__init__(*args, **kwargs) + + def setUp(self): + """Setup the instance before testing.""" + self._msg_cls = type(self.obj) + self._unpacked_dump = self._unpack_dump() + super().setUp() + + def _unpack_dump(self): + obj = self._msg_cls() + obj.unpack(self.dump) + return obj + + def test_pack(self): + """Check whether packed objects equals to dump file.""" + self.assertEqual(self.dump, self.obj.pack()) + + def test_unpack(self): + """Check whether the unpacked dump equals to expected object.""" + self.assertEqual(self._unpacked_dump, self.obj) + + def test_get_size(self): + """Check if get_size method return the correct size.""" + self.assertEqual(self.obj.get_size(), len(self.dump)) + + def test_minimum_size(self): + """Test struct minimum size.""" + if not self.min_size: + raise self.skipTest('skipped, no minimum size set.') + obj = self._msg_cls() + self.assertEqual(obj.get_size(), self.min_size) + + def test_raw_dump_size(self): + """Check whether the unpacked dump has the expected size.""" + self.assertEqual(self.obj.get_size(), self._unpacked_dump.get_size()) + + +class TestMsgDump(TestStructDump): + r"""Run message pack, unpack and get_size tests using bytes message dump. + + Test the lib with raw dumps. We assume the raw files are valid according + to the OF specs to check whether our pack, unpack and get_size + implementations are correct. + + Also, check the minimum size of the struct by instantiating an object with + no parameters. + + To run the tests, just extends this class and set the 'dump' and 'obj' + attributes. You can also optionally set the 'min_size' attribute. + + Example: + .. code-block:: python3 + + class TestHello(TestMsgDump): + dump = b'\x01\x00\x00\x08\x00\x00\x00\x00' + obj = pyof.v0x01.symmetric.hello.Hello(xid=0) + min_size = 8 + """ + + def __init__(self, *args, **kwargs): + """Constructor to avoid this base class beeing executed as a test.""" + if self.__class__ == TestMsgDump: + self.run = self.run = lambda self, *args, **kwargs: None + super().__init__(*args, **kwargs) + + def _unpack_dump(self): + return unpack(self.dump) + + +class TestMsgDumpFile(TestMsgDump): + """Run message pack, unpack and get_size tests using message in a dumpfile. + + Test the lib with raw dumps. We assume the raw files are valid according + to the OF specs to check whether our pack, unpack and get_size + implementations are correct. + + Also, check the minimum size of the message by instantiating an object with + no parameters. + + To run the tests, just extends this class and set the 'dumpfile' and 'obj' + attributes. You can also optionally set the 'min_size' attribute. + + Example: + .. code-block:: python3 + + class TestHelloFileDump(TestMsgDumpFile): + dumpfile = 'v0x01/ofpt_hello.dat' + obj = pyof.v0x01.symmetric.hello.Hello(xid=1) + + """ + + dumpfile = None + + def __init__(self, *args, **kwargs): + """Constructor to avoid this base class beeing executed as a test.""" + if self.__class__ == TestMsgDumpFile: + self.run = self.run = lambda self, *args, **kwargs: None + super().__init__(*args, **kwargs) + + def setUp(self): + """Setup the instance before testing.""" + self._read_dump_file() + super().setUp() + + def _read_dump_file(self): + dumpfile = f'raw/{self.dumpfile}' + try: + with open(dumpfile, 'rb') as fd: + self.dump = fd.read() + except FileNotFoundError: + raise self.skipTest(f'No raw dump file found: {dumpfile}') diff --git a/tests/v0x01/test_asynchronous/test_error_msg.py b/tests/v0x01/test_asynchronous/test_error_msg.py index 79bd3d4c6..4906b7a24 100644 --- a/tests/v0x01/test_asynchronous/test_error_msg.py +++ b/tests/v0x01/test_asynchronous/test_error_msg.py @@ -1,33 +1,27 @@ """Testing Error Message.""" from pyof.v0x01.asynchronous.error_msg import ( BadRequestCode, ErrorMsg, ErrorType, FlowModFailedCode) -from tests.test_struct import TestStruct +from tests.test_struct import TestMsgDump, TestMsgDumpFile -class TestErrorMessage(TestStruct): - """Test the Error Message.""" +class TestErrorMessageFromFile(TestMsgDumpFile): + """Test the ErrorMsg class.""" - @classmethod - def setUpClass(cls): - """Setup TestStruct.""" - super().setUpClass() - super().set_raw_dump_file('v0x01', 'ofpt_error_msg') - super().set_raw_dump_object(ErrorMsg, xid=12, - error_type=ErrorType.OFPET_BAD_REQUEST, - code=BadRequestCode.OFPBRC_BAD_STAT, - data=b'') - super().set_minimum_size(12) + dumpfile = 'v0x01/ofpt_error_msg.dat' + obj = ErrorMsg(xid=12, + error_type=ErrorType.OFPET_BAD_REQUEST, + code=BadRequestCode.OFPBRC_BAD_STAT, + data=b'') + min_size = 12 - def test_unpack_error_msg(self): - """Test Unpack a sample ErrorMsg.""" - expected = b'\x01\x01\x00\x1b\x00\x00\x00\x18\x00\x03\x00\x02FLOW' - error_msg = ErrorMsg(xid=24, - error_type=ErrorType.OFPET_FLOW_MOD_FAILED, - code=FlowModFailedCode.OFPFMFC_EPERM, - data=b'FLOW') +class TestErrorMessageDump(TestMsgDump): + """Test the ErrorMsg class.""" - actual = ErrorMsg(xid=24) - actual.unpack(expected[8:]) - - self.assertEqual(actual, error_msg) + # dump needs to be checked + dump = b'\x01\x01\x00\x10\x00\x00\x00\x18\x00\x03\x00\x02FLOW' + obj = ErrorMsg(xid=24, + error_type=ErrorType.OFPET_FLOW_MOD_FAILED, + code=FlowModFailedCode.OFPFMFC_EPERM, + data=b'FLOW') + min_size = 12 diff --git a/tests/v0x01/test_asynchronous/test_flow_removed.py b/tests/v0x01/test_asynchronous/test_flow_removed.py index 9037fdb45..dbab461e8 100644 --- a/tests/v0x01/test_asynchronous/test_flow_removed.py +++ b/tests/v0x01/test_asynchronous/test_flow_removed.py @@ -2,28 +2,24 @@ from pyof.foundation.basic_types import HWAddress, IPAddress from pyof.v0x01.asynchronous.flow_removed import FlowRemoved, FlowRemovedReason from pyof.v0x01.common.flow_match import Match -from tests.test_struct import TestStruct +from tests.test_struct import TestMsgDumpFile -class TestFlowRemoved(TestStruct): - """Test the FlowRemoved message.""" +class TestFlowRemoved(TestMsgDumpFile): + """Test the FlowRemoved class.""" - @classmethod - def setUpClass(cls): - """Setup TestStruct.""" - reason = FlowRemovedReason.OFPRR_IDLE_TIMEOUT - match = Match(in_port=80, dl_vlan=1, dl_vlan_pcp=1, dl_type=1, - nw_tos=1, nw_proto=1, tp_src=80, tp_dst=80, - dl_src=HWAddress('00:00:00:00:00:00'), - dl_dst=HWAddress('00:00:00:00:00:00'), - nw_src=IPAddress('192.168.0.1'), - nw_dst=IPAddress('192.168.0.2')) + dumpfile = 'v0x01/ofpt_flow_removed.dat' + reason = FlowRemovedReason.OFPRR_IDLE_TIMEOUT + match = Match(in_port=80, dl_vlan=1, dl_vlan_pcp=1, dl_type=1, + nw_tos=1, nw_proto=1, tp_src=80, tp_dst=80, + dl_src=HWAddress('00:00:00:00:00:00'), + dl_dst=HWAddress('00:00:00:00:00:00'), + nw_src=IPAddress('192.168.0.1'), + nw_dst=IPAddress('192.168.0.2')) - super().setUpClass() - super().set_raw_dump_file('v0x01', 'ofpt_flow_removed') - super().set_raw_dump_object(FlowRemoved, xid=12, - match=match, cookie=0, priority=1, - reason=reason, duration_sec=4, - duration_nsec=23, idle_timeout=9, - packet_count=10, byte_count=4) - super().set_minimum_size(88) + obj = FlowRemoved(xid=12, + match=match, cookie=0, priority=1, + reason=reason, duration_sec=4, + duration_nsec=23, idle_timeout=9, + packet_count=10, byte_count=4) + min_size = 88 diff --git a/tests/v0x01/test_asynchronous/test_packet_in.py b/tests/v0x01/test_asynchronous/test_packet_in.py index a223d3228..cd61157ea 100644 --- a/tests/v0x01/test_asynchronous/test_packet_in.py +++ b/tests/v0x01/test_asynchronous/test_packet_in.py @@ -1,19 +1,16 @@ """Packet in message tests.""" from pyof.v0x01.asynchronous.packet_in import PacketIn, PacketInReason -from tests.test_struct import TestStruct +from tests.test_struct import TestMsgDumpFile -class TestPacketIn(TestStruct): +class TestPacketIn(TestMsgDumpFile): """Packet in message tests (also those in :class:`.TestDump`).""" - @classmethod - def setUpClass(cls): - """Configure raw file and its object in parent class (TestDump).""" - super().setUpClass() - super().set_raw_dump_file('v0x01', 'ofpt_packet_in') - super().set_raw_dump_object(PacketIn, xid=15, buffer_id=1, total_len=1, - in_port=1, - reason=PacketInReason.OFPR_ACTION) - # Different from the specification, the minimum size of this class is - # 18, not 20. - super().set_minimum_size(18) + dumpfile = 'v0x01/ofpt_packet_in.dat' + obj = PacketIn(xid=15, buffer_id=1, total_len=1, + in_port=1, + reason=PacketInReason.OFPR_ACTION) + + # Different from the specification, the minimum size of this class is + # 18, not 20. + min_size = 18 diff --git a/tests/v0x01/test_asynchronous/test_port_status.py b/tests/v0x01/test_asynchronous/test_port_status.py index e641718e9..585de1762 100644 --- a/tests/v0x01/test_asynchronous/test_port_status.py +++ b/tests/v0x01/test_asynchronous/test_port_status.py @@ -4,31 +4,37 @@ from pyof.v0x01.asynchronous.port_status import PortReason, PortStatus from pyof.v0x01.common.phy_port import ( PhyPort, PortConfig, PortFeatures, PortState) -from tests.test_struct import TestStruct +from tests.test_struct import TestMsgDump, TestMsgDumpFile -class TestPortStatus(TestStruct): - """Test the Port Status message.""" +class TestPortStatus1(TestMsgDumpFile): + """Test the PortStatus class.""" - @classmethod - def setUpClass(cls): - """Configure raw file and its object in parent class (TestDump).""" - super().setUpClass() - super().set_raw_dump_file('v0x01', 'ofpt_port_status') - super().set_raw_dump_object(_new_portstatus) - super().set_minimum_size(64) + dumpfile = 'v0x01/ofpt_port_status.dat' + + desc_name = 's1-eth1' + desc = PhyPort(port_no=1, + hw_addr=HWAddress('9a:da:11:8a:f4:0c'), + name=desc_name, + config=0, + state=0, + curr=192, + advertised=0, + supported=0, + peer=0) + obj = PortStatus(xid=0, + reason=PortReason.OFPPR_MODIFY, + desc=desc) - def test_pack(self): - """Skip pack test for now.""" - self.skipTest('Need to recover dump contents.') - def test_unpack(self): - """Skipt unpack test for now.""" - self.skipTest('Need to recover dump contents.') +class TestPortStatus2(TestMsgDump): + """Test the PortStatus class.""" + dump = b'\x01\x0c\x00@\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00' + dump += b'\x00\x00\x02\n\n\x12\x12\x10\x10XXXXXXXXXXXXXXX\x00\x00\x00' + dump += b'\x00\x02\x00\x00\x02\x00\x00\x00\x00@\x00\x00\x04\x00\x00' + dump += b'\x00\x02\x00\x00\x00\x02\x00' # needs to be checked -def _new_portstatus(): - """Crate new PortStatus and PhyPort instances.""" desc_name = 'X' * OFP_MAX_PORT_NAME_LEN desc = PhyPort(port_no=2, hw_addr=HWAddress('0a:0a:12:12:10:10'), @@ -39,6 +45,6 @@ def _new_portstatus(): advertised=PortFeatures.OFPPF_PAUSE, supported=PortFeatures.OFPPF_AUTONEG, peer=PortFeatures.OFPPF_AUTONEG) - return PortStatus(xid=1, - reason=PortReason.OFPPR_ADD, - desc=desc) + obj = PortStatus(xid=1, + reason=PortReason.OFPPR_ADD, + desc=desc) diff --git a/tests/v0x01/test_common/test_action.py b/tests/v0x01/test_common/test_action.py index cb9937135..a91bb9590 100644 --- a/tests/v0x01/test_common/test_action.py +++ b/tests/v0x01/test_common/test_action.py @@ -3,122 +3,80 @@ ActionDLAddr, ActionEnqueue, ActionNWAddr, ActionNWTos, ActionOutput, ActionTPPort, ActionType, ActionVendorHeader, ActionVlanPCP, ActionVlanVid) from pyof.v0x01.common.phy_port import Port -from tests.test_struct import TestStruct +from tests.test_struct import TestMsgDumpFile -class TestActionOutput(TestStruct): +class TestActionOutput(TestMsgDumpFile): """ActionOutput message tests (also those in :class:`.TestDump`).""" - @classmethod - def setUpClass(cls): - """Configure raw file and its object in parent class (TestDump).""" - super().setUpClass() - super().set_raw_dump_file('v0x01', 'ofpt_action_output') - super().set_raw_dump_object(ActionOutput, port=Port.OFPP_CONTROLLER, - max_length=8) - super().set_minimum_size(8) + dumpfile = 'v0x01/ofpt_action_output.dat' + obj = ActionOutput(port=Port.OFPP_CONTROLLER, max_length=8) + min_size = 8 -class TestActionEnqueue(TestStruct): +class TestActionEnqueue(TestMsgDumpFile): """ActionEnqueue message tests (also those in :class:`.TestDump`).""" - @classmethod - def setUpClass(cls): - """Configure raw file and its object in parent class (TestDump).""" - super().setUpClass() - super().set_raw_dump_file('v0x01', 'ofpt_action_enqueue') - super().set_raw_dump_object(ActionEnqueue, port=Port.OFPP_CONTROLLER, - queue_id=4) - super().set_minimum_size(16) + dumpfile = 'v0x01/ofpt_action_enqueue.dat' + obj = ActionEnqueue(port=Port.OFPP_CONTROLLER, queue_id=4) + min_size = 16 -class TestActionVlanVid(TestStruct): +class TestActionVlanVid(TestMsgDumpFile): """ActionVlanVid message tests (also those in :class:`.TestDump`).""" - @classmethod - def setUpClass(cls): - """Configure raw file and its object in parent class (TestDump).""" - super().setUpClass() - super().set_raw_dump_file('v0x01', 'ofpt_action_vlan_vid') - super().set_raw_dump_object(ActionVlanVid, vlan_id=5) - super().set_minimum_size(8) + dumpfile = 'v0x01/ofpt_action_vlan_vid.dat' + obj = ActionVlanVid(vlan_id=5) + min_size = 8 -class TestActionVlanPCP(TestStruct): +class TestActionVlanPCP(TestMsgDumpFile): """ActionVlanPCP message tests (also those in :class:`.TestDump`).""" - @classmethod - def setUpClass(cls): - """Configure raw file and its object in parent class (TestDump).""" - super().setUpClass() - super().set_raw_dump_file('v0x01', 'ofpt_action_vlan_pcp') - super().set_raw_dump_object(ActionVlanPCP, vlan_pcp=2) - super().set_minimum_size(8) + dumpfile = 'v0x01/ofpt_action_vlan_pcp.dat' + obj = ActionVlanPCP(vlan_pcp=2) + min_size = 8 -class TestActionDLAddr(TestStruct): +class TestActionDLAddr(TestMsgDumpFile): """ActionDLAddr message tests (also those in :class:`.TestDump`).""" - @classmethod - def setUpClass(cls): - """Configure raw file and its object in parent class (TestDump).""" - super().setUpClass() - super().set_raw_dump_file('v0x01', 'ofpt_action_dl_addr') - super().set_raw_dump_object(ActionDLAddr, - dl_addr_type=ActionType.OFPAT_SET_DL_SRC, - dl_addr=[12, 12, 12, 12, 12, 12]) - super().set_minimum_size(16) + dumpfile = 'v0x01/ofpt_action_dl_addr.dat' + obj = ActionDLAddr(action_type=ActionType.OFPAT_SET_DL_SRC, + dl_addr=[12, 12, 12, 12, 12, 12]) + min_size = 16 -class TestActionNWAddr(TestStruct): +class TestActionNWAddr(TestMsgDumpFile): """ActionNWAddr message tests (also those in :class:`.TestDump`).""" - @classmethod - def setUpClass(cls): - """Configure raw file and its object in parent class (TestDump).""" - super().setUpClass() - super().set_raw_dump_file('v0x01', 'ofpt_action_nw_addr') - super().set_raw_dump_object(ActionNWAddr, - nw_addr_type=ActionType.OFPAT_SET_NW_SRC, - nw_addr=[12, 12, 12, 12, 12, 12]) - super().set_minimum_size(8) + dumpfile = 'v0x01/ofpt_action_nw_addr.dat' + obj = ActionNWAddr(action_type=ActionType.OFPAT_SET_NW_SRC, + nw_addr=[12, 12, 12, 12, 12, 12]) + min_size = 8 -class TestActionNWTos(TestStruct): +class TestActionNWTos(TestMsgDumpFile): """ActionNWTos message tests (also those in :class:`.TestDump`).""" - @classmethod - def setUpClass(cls): - """Configure raw file and its object in parent class (TestDump).""" - super().setUpClass() - super().set_raw_dump_file('v0x01', 'ofpt_action_nw_tos') - super().set_raw_dump_object(ActionNWTos, - nw_tos_type=ActionType.OFPAT_SET_NW_SRC, - nw_tos=123456) - super().set_minimum_size(8) + dumpfile = 'v0x01/ofpt_action_nw_tos.dat' + obj = ActionNWTos(action_type=ActionType.OFPAT_SET_NW_SRC, + nw_tos=123456) + min_size = 8 -class TestActionTPPort(TestStruct): +class TestActionTPPort(TestMsgDumpFile): """ActionTPPort message tests (also those in :class:`.TestDump`).""" - @classmethod - def setUpClass(cls): - """Configure raw file and its object in parent class (TestDump).""" - super().setUpClass() - super().set_raw_dump_file('v0x01', 'ofpt_action_tp_port') - super().set_raw_dump_object(ActionTPPort, - tp_port_type=ActionType.OFPAT_SET_TP_SRC, - tp_port=8888) - super().set_minimum_size(8) + dumpfile = 'v0x01/ofpt_action_tp_port.dat' + obj = ActionTPPort(action_type=ActionType.OFPAT_SET_TP_SRC, + tp_port=8888) + min_size = 8 -class TestActionVendorHeader(TestStruct): +class TestActionVendorHeader(TestMsgDumpFile): """ActionVendorHeader message tests (also those in :class:`.TestDump`).""" - @classmethod - def setUpClass(cls): - """Configure raw file and its object in parent class (TestDump).""" - super().setUpClass() - super().set_raw_dump_file('v0x01', 'ofpt_action_vendor_header') - super().set_raw_dump_object(ActionVendorHeader, length=16, vendor=1) - super().set_minimum_size(8) + dumpfile = 'v0x01/ofpt_action_vendor_header.dat' + obj = ActionVendorHeader(length=16, vendor=1) + min_size = 8 diff --git a/tests/v0x01/test_common/test_flow_match.py b/tests/v0x01/test_common/test_flow_match.py index 4c804c667..5d43dc6cf 100644 --- a/tests/v0x01/test_common/test_flow_match.py +++ b/tests/v0x01/test_common/test_flow_match.py @@ -1,45 +1,38 @@ """Testing FlowMatch structure.""" -import unittest +from pyof.foundation.basic_types import HWAddress, IPAddress +from pyof.v0x01.common.flow_match import Match +from tests.test_struct import TestStructDump -from pyof.v0x01.common import flow_match - -class TestMatch(unittest.TestCase): +class TestMatch(TestStructDump): """Test Match structure.""" - def setUp(self): - """Basic setup for test.""" - self.message = flow_match.Match() - self.message.in_port = 22 - self.message.dl_src = [1, 2, 3, 4, 5, 6] - self.message.dl_dst = [1, 2, 3, 4, 5, 6] - self.message.dl_vlan = 1 - self.message.dl_vlan_pcp = 1 - self.message.dl_type = 1 - self.message.nw_tos = 1 - self.message.nw_proto = 1 - self.message.nw_src = [192, 168, 0, 1] - self.message.nw_dst = [192, 168, 0, 2] - self.message.tp_src = 22 - self.message.tp_dst = 22 - - def test_get_size(self): - """[Common/FlowMatch] - size 40.""" - self.assertEqual(self.message.get_size(), 40) - - def test_pack_unpack(self): - """[Common/FlowMatch] - packing and unpacking.""" - pack = self.message.pack() - unpacked = flow_match.Match() - unpacked.unpack(pack) - self.assertEqual(self.message.pack(), unpacked.pack()) - - @unittest.skip('Not yet implemented') - def test_pack(self): - """[Common/FlowMatch] - packing.""" - pass + dump = b'\x00\x0f\xff\x00\x00\x16\x01\x02\x03\x04\x05\x06\x01\x02\x03\x04' + dump += b'\x05\x06\x00\x01\x01\x00\x00\x01\x01\x01\x00\x00\xc0\xa8\x00\x01' + dump += b'\xc0\xa8\x00\x02\x00\x16\x00\x16' # needs to be checked + obj = Match(in_port=22, + dl_src=[1, 2, 3, 4, 5, 6], + dl_dst=[1, 2, 3, 4, 5, 6], + dl_vlan=1, + dl_vlan_pcp=1, + dl_type=1, + nw_tos=1, + nw_proto=1, + nw_src=[192, 168, 0, 1], + nw_dst=[192, 168, 0, 2], + tp_src=22, + tp_dst=22) + + +class TestMatch2(TestStructDump): + """Test Match structure.""" - @unittest.skip('Not yet implemented') - def test_unpack(self): - """[Common/FlowMatch] - unpacking.""" - pass + dump = b'\x00\x0f\xff\x0c\x00P\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + dump += b'\x00\x00\x00\x01\x01\x00\x00\x01\x01\x01\x00\x00\xc0\xa8\x00' + dump += b'\x01\xc0\xa8\x00\x02\x00P\x00P' + obj = Match(in_port=80, dl_vlan=1, dl_vlan_pcp=1, dl_type=1, + nw_tos=1, nw_proto=1, tp_src=80, tp_dst=80, + dl_src=HWAddress('00:00:00:00:00:00'), + dl_dst=HWAddress('00:00:00:00:00:00'), + nw_src=IPAddress('192.168.0.1'), + nw_dst=IPAddress('192.168.0.2')) diff --git a/tests/v0x01/test_common/test_header.py b/tests/v0x01/test_common/test_header.py index 3c01fd80c..22a39cdd0 100644 --- a/tests/v0x01/test_common/test_header.py +++ b/tests/v0x01/test_common/test_header.py @@ -1,52 +1,27 @@ """Testing Header structure.""" -import os import unittest -from unittest.mock import patch +from pyof.foundation.exceptions import PackException from pyof.v0x01.common.header import Header, Type +from tests.test_struct import TestStructDump -class TestHeader(unittest.TestCase): +class TestHeader(TestStructDump): """Test the message Header.""" - def setUp(self): - """Setup the TestHeader Class instantiating a HELLO header.""" - self.message = Header() - self.message.message_type = Type.OFPT_HELLO - self.message.xid = 1 - self.message.length = 0 - - def test_size(self): - """[Common/Header] - size 8.""" - self.assertEqual(self.message.get_size(), 8) + dump = b'\x01\x00\x00\x08\x00\x00\x00\x01' + obj = Header(message_type=Type.OFPT_HELLO, + xid=1, + length=8) @unittest.expectedFailure def test_pack_empty(self): """[Common/Header] - packing empty header.""" - self.assertRaises(TypeError, + self.assertRaises(PackException, Header().pack()) - def test_pack(self): - """[Common/Header] - packing Hello.""" - packed_header = b'\x01\x00\x00\x00\x00\x00\x00\x01' - self.assertEqual(self.message.pack(), packed_header) - - def test_unpack(self): - """[Common/Header] - unpacking Hello.""" - filename = os.path.join(os.path.dirname(os.path.realpath('__file__')), - 'raw/v0x01/ofpt_hello.dat') - f = open(filename, 'rb') - self.message.unpack(f.read(8)) - - self.assertEqual(self.message.length, 8) - self.assertEqual(self.message.xid, 1) - self.assertEqual(self.message.message_type, Type.OFPT_HELLO) - self.assertEqual(self.message.version, 1) - - f.close() - - @patch('pyof.v0x01.common.header.randint') - def test_random_xid(self, m): - """Each Header instantiations without xid should call randint.""" - Header(), Header() # noqa - self.assertEqual(m.call_count, 2) + # @patch('pyof.v0x01.common.header.randint') + # def test_random_xid(self, m): + # """Each Header instantiations without xid should call randint.""" + # Header(), Header() # noqa + # self.assertEqual(m.call_count, 2) diff --git a/tests/v0x01/test_common/test_phy_port.py b/tests/v0x01/test_common/test_phy_port.py index 35cd83ff8..c7f8a3c12 100644 --- a/tests/v0x01/test_common/test_phy_port.py +++ b/tests/v0x01/test_common/test_phy_port.py @@ -1,37 +1,23 @@ """Testing PhyPort structure.""" -from unittest import TestCase, skip - from pyof.foundation.constants import OFP_MAX_PORT_NAME_LEN from pyof.v0x01.common.phy_port import ( PhyPort, PortConfig, PortFeatures, PortState) +from tests.test_struct import TestStructDump -class TestPhyPort(TestCase): - """Test PhyPort.""" - - def setUp(self): - """Basic setup for test.""" - self.message = PhyPort() - self.message.port_no = 2 - self.message.hw_addr = '1a:2b:3c:4d:5e:6f' - self.message.name = bytes('X' * OFP_MAX_PORT_NAME_LEN, 'utf-8') - self.message.config = PortConfig.OFPPC_NO_STP - self.message.state = PortState.OFPPS_STP_FORWARD - self.message.curr = PortFeatures.OFPPF_10GB_FD - self.message.advertised = PortFeatures.OFPPF_PAUSE - self.message.supported = PortFeatures.OFPPF_AUTONEG - self.message.peer = PortFeatures.OFPPF_AUTONEG - - def test_get_size(self): - """[Common/PhyPort] - size 48.""" - self.assertEqual(self.message.get_size(), 48) +class TestPhyPort(TestStructDump): + """Test PhyPort class.""" - @skip('Not yet implemented') - def test_pack(self): - """[Common/PhyPort] - packing.""" - pass + dump = b'\x00\x02\x1a+