diff --git a/config/main.py b/config/main.py index 5fc2c0be74..3fa836fbce 100644 --- a/config/main.py +++ b/config/main.py @@ -62,6 +62,14 @@ CFG_LOOPBACK_NO="<0-999>" RELOAD_PIPE_FILE = "/tmp/reload.txt" +CFG_PORTCHANNEL_PREFIX = "PortChannel" +CFG_PORTCHANNEL_PREFIX_LEN = 11 +CFG_PORTCHANNEL_NAME_TOTAL_LEN_MAX = 15 +CFG_PORTCHANNEL_MAX_VAL = 9999 +CFG_PORTCHANNEL_NO="<0-9999>" + +PORT_MTU = "mtu" +PORT_SPEED = "speed" asic_type = None child_pid = 0 @@ -393,6 +401,45 @@ def is_interface_bind_to_vrf(config_db, interface_name): return True return False +def is_portchannel_name_valid(portchannel_name): + """Port channel name validation + """ + + # Return True if Portchannel name is PortChannelXXXX (XXXX can be 0-9999) + if portchannel_name[:CFG_PORTCHANNEL_PREFIX_LEN] != CFG_PORTCHANNEL_PREFIX : + return False + if (portchannel_name[CFG_PORTCHANNEL_PREFIX_LEN:].isdigit() is False or + int(portchannel_name[CFG_PORTCHANNEL_PREFIX_LEN:]) > CFG_PORTCHANNEL_MAX_VAL) : + return False + if len(portchannel_name) > CFG_PORTCHANNEL_NAME_TOTAL_LEN_MAX: + return False + return True + +def is_portchannel_present_in_db(db, portchannel_name): + """Check if Portchannel is present in Config DB + """ + + # Return True if Portchannel name exists in the CONFIG_DB + portchannel_list = db.get_table(CFG_PORTCHANNEL_PREFIX) + if portchannel_list is None: + return False + if portchannel_name in portchannel_list: + return True + return False + +def is_port_member_of_this_portchannel(db, port_name, portchannel_name): + """Check if a port is member of given portchannel + """ + portchannel_list = db.get_table(CFG_PORTCHANNEL_PREFIX) + if portchannel_list is None: + return False + + for k,v in db.get_table('PORTCHANNEL_MEMBER'): + if (k == portchannel_name) and (v == port_name): + return True + + return False + # Return the namespace where an interface belongs # The port name input could be in default mode or in alias mode. def get_port_namespace(port): @@ -1418,9 +1465,64 @@ def add_portchannel_member(ctx, portchannel_name, port_name): ctx.fail("{} is configured as mirror destination port".format(port_name)) # Check if the member interface given by user is valid in the namespace. - if interface_name_is_valid(db, port_name) is False: + if port_name.startswith("Ethernet") is False or interface_name_is_valid(db, port_name) is False: ctx.fail("Interface name is invalid. Please enter a valid interface name!!") + # Dont proceed if the port channel name is not valid + if is_portchannel_name_valid(portchannel_name) is False: + ctx.fail("{} is invalid!, name should have prefix '{}' and suffix '{}'" + .format(portchannel_name, CFG_PORTCHANNEL_PREFIX, CFG_PORTCHANNEL_NO)) + + # Dont proceed if the port channel does not exist + if is_portchannel_present_in_db(db, portchannel_name) is False: + ctx.fail("{} is not present.".format(portchannel_name)) + + # Dont allow a port to be member of port channel if it is configured with an IP address + for key in db.get_table('INTERFACE').keys(): + if type(key) != tuple: + continue + if key[0] == port_name: + ctx.fail(" {} has ip address {} configured".format(port_name, key[1])) + return + + # Dont allow a port to be member of port channel if it is configured as a VLAN member + for k,v in db.get_table('VLAN_MEMBER'): + if v == port_name: + ctx.fail("%s Interface configured as VLAN_MEMBER under vlan : %s" %(port_name,str(k))) + return + + # Dont allow a port to be member of port channel if it is already member of a port channel + for k,v in db.get_table('PORTCHANNEL_MEMBER'): + if v == port_name: + ctx.fail("{} Interface is already member of {} ".format(v,k)) + + # Dont allow a port to be member of port channel if its speed does not match with existing members + for k,v in db.get_table('PORTCHANNEL_MEMBER'): + if k == portchannel_name: + member_port_entry = db.get_entry('PORT', v) + port_entry = db.get_entry('PORT', port_name) + + if member_port_entry is not None and port_entry is not None: + member_port_speed = member_port_entry.get(PORT_SPEED) + + port_speed = port_entry.get(PORT_SPEED) + if member_port_speed != port_speed: + ctx.fail("Port speed of {} is different than the other members of the portchannel {}" + .format(port_name, portchannel_name)) + + # Dont allow a port to be member of port channel if its MTU does not match with portchannel + portchannel_entry = db.get_entry('PORTCHANNEL', portchannel_name) + if portchannel_entry and portchannel_entry.get(PORT_MTU) is not None : + port_entry = db.get_entry('PORT', port_name) + + if port_entry and port_entry.get(PORT_MTU) is not None: + port_mtu = port_entry.get(PORT_MTU) + + portchannel_mtu = portchannel_entry.get(PORT_MTU) + if portchannel_mtu != port_mtu: + ctx.fail("Port MTU of {} is different than the {} MTU size" + .format(port_name, portchannel_name)) + db.set_entry('PORTCHANNEL_MEMBER', (portchannel_name, port_name), {'NULL': 'NULL'}) @@ -1430,12 +1532,25 @@ def add_portchannel_member(ctx, portchannel_name, port_name): @click.pass_context def del_portchannel_member(ctx, portchannel_name, port_name): """Remove member from portchannel""" + # Dont proceed if the port channel name is not valid + if is_portchannel_name_valid(portchannel_name) is False: + ctx.fail("{} is invalid!, name should have prefix '{}' and suffix '{}'" + .format(portchannel_name, CFG_PORTCHANNEL_PREFIX, CFG_PORTCHANNEL_NO)) + db = ctx.obj['db'] # Check if the member interface given by user is valid in the namespace. if interface_name_is_valid(db, port_name) is False: ctx.fail("Interface name is invalid. Please enter a valid interface name!!") + # Dont proceed if the port channel does not exist + if is_portchannel_present_in_db(db, portchannel_name) is False: + ctx.fail("{} is not present.".format(portchannel_name)) + + # Dont proceed if the the port is not an existing member of the port channel + if not is_port_member_of_this_portchannel(db, port_name, portchannel_name): + ctx.fail("{} is not a member of portchannel {}".format(port_name, portchannel_name)) + db.set_entry('PORTCHANNEL_MEMBER', (portchannel_name, port_name), None) db.set_entry('PORTCHANNEL_MEMBER', portchannel_name + '|' + port_name, None) @@ -2498,6 +2613,10 @@ def mtu(ctx, interface_name, interface_mtu, verbose): if interface_name is None: ctx.fail("'interface_name' is None!") + portchannel_member_table = config_db.get_table('PORTCHANNEL_MEMBER') + if interface_is_in_portchannel(portchannel_member_table, interface_name): + ctx.fail("'interface_name' is in portchannel!") + if ctx.obj['namespace'] is DEFAULT_NAMESPACE: command = "portconfig -p {} -m {}".format(interface_name, interface_mtu) else: diff --git a/tests/mock_tables/config_db.json b/tests/mock_tables/config_db.json index fde2f7dc47..21676feb57 100644 --- a/tests/mock_tables/config_db.json +++ b/tests/mock_tables/config_db.json @@ -546,6 +546,12 @@ "PORTCHANNEL_INTERFACE|PortChannel0004|FC00::7D/126": { "NULL": "NULL" }, + "INTERFACE|Ethernet0": { + "NULL": "NULL" + }, + "INTERFACE|Ethernet0|14.14.0.1/24": { + "NULL": "NULL" + }, "DEBUG_COUNTER|DEBUG_0": { "type": "PORT_INGRESS_DROPS" }, diff --git a/tests/portchannel_test.py b/tests/portchannel_test.py new file mode 100644 index 0000000000..a07e205392 --- /dev/null +++ b/tests/portchannel_test.py @@ -0,0 +1,115 @@ +import os +import traceback + +from click.testing import CliRunner + +import config.main as config +import show.main as show +from utilities_common.db import Db + +class TestPortChannel(object): + @classmethod + def setup_class(cls): + os.environ['UTILITIES_UNIT_TESTING'] = "1" + print("SETUP") + + def test_add_portchannel_member_with_invalid_name(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add a portchannel member with invalid portchannel name + result = runner.invoke(config.config.commands["portchannel"].commands["member"].commands["add"], ["PortChan005", "Ethernet0"], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0 + assert "Error: PortChan005 is invalid!, name should have prefix 'PortChannel' and suffix '<0-9999>'" in result.output + + def test_delete_portchannel_member_with_invalid_name(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # delete a portchannel member with invalid portchannel name + result = runner.invoke(config.config.commands["portchannel"].commands["member"].commands["del"], ["PortChan005", "Ethernet0"], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0 + assert "Error: PortChan005 is invalid!, name should have prefix 'PortChannel' and suffix '<0-9999>'" in result.output + + def test_add_non_existing_portchannel_member(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add a portchannel member with portchannel is not yet created + result = runner.invoke(config.config.commands["portchannel"].commands["member"].commands["add"], ["PortChannel0005", "Ethernet0"], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0 + assert "Error: PortChannel0005 is not present." in result.output + + def test_delete_non_existing_portchannel_member(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # delete a portchannel member with portchannel is not yet created + result = runner.invoke(config.config.commands["portchannel"].commands["member"].commands["del"], ["PortChannel0005", "Ethernet0"], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0 + assert "Error: PortChannel0005 is not present." in result.output + + def test_add_portchannel_member_which_has_ipaddress(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add a portchannel member with port which has ip-address + result = runner.invoke(config.config.commands["portchannel"].commands["member"].commands["add"], ["PortChannel1001", "Ethernet0"], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0 + assert "Error: Ethernet0 has ip address 14.14.0.1/24 configured" in result.output + + def test_add_portchannel_member_which_is_member_of_vlan(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add a portchannel member with port which is member of Vlan + result = runner.invoke(config.config.commands["portchannel"].commands["member"].commands["add"], ["PortChannel1001", "Ethernet24"], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0 + assert "Error: Ethernet24 Interface configured as VLAN_MEMBER under vlan : Vlan2000" in result.output + + def test_add_portchannel_member_which_is_member_of_another_po(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # add a portchannel member with port which is member of another PO + result = runner.invoke(config.config.commands["portchannel"].commands["member"].commands["add"], ["PortChannel1001", "Ethernet116"], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0 + assert "Error: Ethernet116 Interface is already member of PortChannel0002 " in result.output + + def test_delete_portchannel_member_which_is_member_of_another_po(self): + runner = CliRunner() + db = Db() + obj = {'db':db.cfgdb} + + # delete a portchannel member with port which is member of another PO + result = runner.invoke(config.config.commands["portchannel"].commands["member"].commands["del"], ["PortChannel1001", "Ethernet116"], obj=obj) + print(result.exit_code) + print(result.output) + assert result.exit_code != 0 + assert "Error: Ethernet116 is not a member of portchannel PortChannel1001" in result.output + + @classmethod + def teardown_class(cls): + os.environ['UTILITIES_UNIT_TESTING'] = "0" + print("TEARDOWN")