diff --git a/ansible/group_vars/lab/lab.yml b/ansible/group_vars/lab/lab.yml
index 002893f454..db55c15e02 100644
--- a/ansible/group_vars/lab/lab.yml
+++ b/ansible/group_vars/lab/lab.yml
@@ -39,6 +39,9 @@ snmp_servers: ['10.0.0.9']
# dhcp relay servers
dhcp_servers: ['192.0.0.1', '192.0.0.2', '192.0.0.3', '192.0.0.4']
+# dhcpv6 relay servers
+dhcpv6_servers: ['fc02:2000::1', 'fc02:2000::2', 'fc02:2000::3', 'fc02:2000::4']
+
# snmp variables
snmp_rocommunity: public
snmp_location: testlab
diff --git a/ansible/library/minigraph_facts.py b/ansible/library/minigraph_facts.py
index ba9f6edc3a..437c8fd93b 100644
--- a/ansible/library/minigraph_facts.py
+++ b/ansible/library/minigraph_facts.py
@@ -363,6 +363,7 @@ def parse_dpg(dpg, hname):
vlanintfs = child.find(str(QName(ns, "VlanInterfaces")))
dhcp_servers = []
+ dhcpv6_servers = []
vlans = {}
for vintf in vlanintfs.findall(str(QName(ns, "VlanInterface"))):
vintfname = vintf.find(str(QName(ns, "Name"))).text
@@ -375,6 +376,12 @@ def parse_dpg(dpg, hname):
else:
vlandhcpservers = ""
dhcp_servers = vlandhcpservers.split(";")
+ vintf_node = vintf.find(str(QName(ns, "Dhcpv6Relays")))
+ if vintf_node is not None and vintf_node.text is not None:
+ vlandhcpservers = vintf_node.text
+ else:
+ vlandhcpservers = ""
+ dhcpv6_servers = vlandhcpservers.split(";")
for i, member in enumerate(vmbr_list):
# Skip PortChannel inside Vlan
if member in pcs:
@@ -402,7 +409,7 @@ def parse_dpg(dpg, hname):
if acl_intfs:
acls[aclname] = acl_intfs
- return intfs, lo_intfs, mgmt_intf, vlans, pcs, acls, dhcp_servers
+ return intfs, lo_intfs, mgmt_intf, vlans, pcs, acls, dhcp_servers, dhcpv6_servers
return None, None, None, None, None, None, None
def parse_cpg(cpg, hname):
@@ -578,6 +585,7 @@ def parse_xml(filename, hostname, asic_name=None):
hostname = None
syslog_servers = []
dhcp_servers = []
+ dhcpv6_servers = []
ntp_servers = []
mgmt_routes = []
bgp_peers_with_range = []
@@ -608,7 +616,7 @@ def parse_xml(filename, hostname, asic_name=None):
for child in root:
if asic_name is None:
if child.tag == str(QName(ns, "DpgDec")):
- (intfs, lo_intfs, mgmt_intf, vlans, pcs, acls, dhcp_servers) = parse_dpg(child, hostname)
+ (intfs, lo_intfs, mgmt_intf, vlans, pcs, acls, dhcp_servers, dhcpv6_servers) = parse_dpg(child, hostname)
elif child.tag == str(QName(ns, "CpgDec")):
(bgp_sessions, bgp_asn, bgp_peers_with_range) = parse_cpg(child, hostname)
elif child.tag == str(QName(ns, "PngDec")):
@@ -619,7 +627,7 @@ def parse_xml(filename, hostname, asic_name=None):
(syslog_servers, ntp_servers, mgmt_routes, deployment_id) = parse_meta(child, hostname)
else:
if child.tag == str(QName(ns, "DpgDec")):
- (intfs, lo_intfs, mgmt_intf, vlans, pcs, acls, dhcp_servers) = parse_dpg(child, asic_name)
+ (intfs, lo_intfs, mgmt_intf, vlans, pcs, acls, dhcp_servers, dhcpv6_servers) = parse_dpg(child, asic_name)
host_lo_intfs = parse_host_loopback(child, hostname)
elif child.tag == str(QName(ns, "CpgDec")):
(bgp_sessions, bgp_asn, bgp_peers_with_range) = parse_cpg(child, asic_name)
@@ -700,6 +708,7 @@ def parse_xml(filename, hostname, asic_name=None):
results['minigraph_mgmt'] = get_mgmt_info(devices, mgmt_dev, mgmt_port)
results['syslog_servers'] = syslog_servers
results['dhcp_servers'] = dhcp_servers
+ results['dhcpv6_servers'] = dhcpv6_servers
results['ntp_servers'] = ntp_servers
results['forced_mgmt_routes'] = mgmt_routes
results['deployment_id'] = deployment_id
diff --git a/ansible/roles/test/files/ptftests/dhcpv6_relay_test.py b/ansible/roles/test/files/ptftests/dhcpv6_relay_test.py
new file mode 100644
index 0000000000..9fc96bfecc
--- /dev/null
+++ b/ansible/roles/test/files/ptftests/dhcpv6_relay_test.py
@@ -0,0 +1,306 @@
+import ast
+import subprocess
+
+# Packet Test Framework imports
+import ptf
+import ptf.packet as packet
+import ptf.testutils as testutils
+from ptf import config
+from ptf.base_tests import BaseTest
+from ptf.mask import Mask
+
+IPv6 = scapy.layers.inet6.IPv6
+
+class DataplaneBaseTest(BaseTest):
+ def __init__(self):
+ BaseTest.__init__(self)
+
+ def setUp(self):
+ self.dataplane = ptf.dataplane_instance
+ self.dataplane.flush()
+ if config["log_dir"] is not None:
+ filename = os.path.join(config["log_dir"], str(self)) + ".pcap"
+ self.dataplane.start_pcap(filename)
+
+ def tearDown(self):
+ if config["log_dir"] is not None:
+ self.dataplane.stop_pcap()
+
+"""
+ This test simulates a new host booting up on the VLAN network of a ToR and
+ requesting an IPv6 address via DHCPv6. Setup is as follows:
+ - DHCP client is simulated by listening/sending on an interface connected to VLAN of ToR.
+ - DHCP server is simulated by listening/sending on injected PTF interfaces which link
+ ToR to leaves. This way we can listen for traffic sent from DHCP relay out to would-be DHCPv6 servers
+
+ This test performs the following functionality:
+ 1.) Simulated client broadcasts a DHCPv6 SOLICIT message.
+ 2.) Verify DHCP relay running on ToR receives the DHCPv6 SOLICIT message and send a DHCPv6 RELAY-FORWARD
+ message encapsulating the client DHCPv6 SOLICIT message and relays it to all of its known DHCP servers.
+ 3.) Simulate DHCPv6 RELAY-REPLY message send from a DHCP server to the ToR encapsulating DHCPv6 ADVERTISE message.
+ 4.) Verify DHCP relay receives the DHCPv6 RELAY-REPLY message decapsulate it and forwards DHCPv6 ADVERTISE
+ message to our simulated client.
+ 5.) Simulated client broadcasts a DHCPv6 REQUEST message.
+ 6.) Verify DHCP relay running on ToR receives the DHCPv6 REQUEST message and send a DHCPv6 RELAY-FORWARD
+ message encapsulating the client DHCPv6 REQUEST message and relays it to all of its known DHCP servers.
+ 7.) Simulate DHCPv6 RELAY-REPLY message send from a DHCP server to the ToR encapsulating DHCPv6 REPLY message.
+ 8.) Verify DHCP relay receives the DHCPv6 RELAY-REPLY message decapsulate it and forwards DHCPv6 REPLY
+ message to our simulated client.
+
+"""
+
+class DHCPTest(DataplaneBaseTest):
+
+ BROADCAST_MAC = '33:33:00:01:00:02'
+ BROADCAST_IP = 'ff02::1:2'
+ DHCP_CLIENT_PORT = 546
+ DHCP_SERVER_PORT = 547
+
+ def __init__(self):
+ self.test_params = testutils.test_params_get()
+ self.client_port_index = int(self.test_params['client_port_index'])
+ self.client_link_local = self.generate_client_interace_ipv6_link_local_address(self.client_port_index)
+
+ DataplaneBaseTest.__init__(self)
+
+ def setUp(self):
+ DataplaneBaseTest.setUp(self)
+ self.hostname = self.test_params['hostname']
+
+ # These are the interfaces we are injected into that link to out leaf switches
+ self.server_port_indices = ast.literal_eval(self.test_params['leaf_port_indices'])
+ self.num_dhcp_servers = int(self.test_params['num_dhcp_servers'])
+ self.assertTrue(self.num_dhcp_servers > 0,
+ "Error: This test requires at least one DHCP server to be specified!")
+
+ # We will simulate a responding DHCP server on the first interface in the provided set
+ self.server_ip = self.test_params['server_ip']
+
+ self.relay_iface_ip = self.test_params['relay_iface_ip']
+ self.relay_iface_mac = self.test_params['relay_iface_mac']
+ self.relay_link_local = self.test_params['relay_link_local']
+
+ self.vlan_ip = self.test_params['vlan_ip']
+
+ self.client_mac = self.dataplane.get_mac(0, self.client_port_index)
+
+ def generate_client_interace_ipv6_link_local_address(self, client_port_index):
+ # Shutdown and startup the client interface to generate a proper IPv6 link-local address
+ command = "ifconfig eth{} down".format(client_port_index)
+ proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
+ proc.communicate()
+
+ command = "ifconfig eth{} up".format(client_port_index)
+ proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
+ proc.communicate()
+
+ command = "ip addr show eth{} | grep inet6 | grep 'scope link' | awk '{{print $2}}' | cut -d '/' -f1".format(client_port_index)
+ proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
+ stdout, stderr = proc.communicate()
+
+ return stdout.strip()
+
+ def tearDown(self):
+ DataplaneBaseTest.tearDown(self)
+
+
+ """
+ Packet generation functions/wrappers
+
+ """
+
+ def create_dhcp_solicit_packet(self):
+
+ solicit_packet = Ether(src=self.client_mac, dst=self.BROADCAST_MAC)
+ solicit_packet /= IPv6(src=self.client_link_local, dst=self.BROADCAST_IP)
+ solicit_packet /= UDP(sport=self.DHCP_CLIENT_PORT, dport=self.DHCP_SERVER_PORT)
+ solicit_packet /= DHCP6_Solicit(trid=12345)
+
+ return solicit_packet
+
+ def create_dhcp_solicit_relay_forward_packet(self):
+
+ solicit_relay_forward_packet = Ether(src=self.relay_iface_mac)
+ solicit_relay_forward_packet /= IPv6()
+ solicit_relay_forward_packet /= UDP(sport=self.DHCP_SERVER_PORT, dport=self.DHCP_SERVER_PORT)
+ solicit_relay_forward_packet /= DHCP6_RelayForward(msgtype=12, linkaddr=self.vlan_ip, peeraddr=self.client_link_local)
+ solicit_relay_forward_packet /= DHCP6OptRelayMsg()
+ solicit_relay_forward_packet /= DHCP6_Solicit(trid=12345)
+
+ return solicit_relay_forward_packet
+
+ def create_dhcp_advertise_packet(self):
+
+ advertise_packet = Ether(src=self.relay_iface_mac, dst=self.client_mac)
+ advertise_packet /= IPv6(src=self.relay_link_local, dst=self.client_link_local)
+ advertise_packet /= UDP(sport=self.DHCP_SERVER_PORT, dport=self.DHCP_CLIENT_PORT)
+ advertise_packet /= DHCP6_Advertise(trid=12345)
+
+ return advertise_packet
+
+ def create_dhcp_advertise_relay_reply_packet(self):
+
+ advertise_relay_reply_packet = Ether(dst=self.relay_iface_mac)
+ advertise_relay_reply_packet /= IPv6(src=self.server_ip, dst=self.relay_iface_ip)
+ advertise_relay_reply_packet /= UDP(sport=self.DHCP_SERVER_PORT, dport=self.DHCP_SERVER_PORT)
+ advertise_relay_reply_packet /= DHCP6_RelayReply(msgtype=13, linkaddr=self.vlan_ip, peeraddr=self.client_link_local)
+ advertise_relay_reply_packet /= DHCP6OptRelayMsg()
+ advertise_relay_reply_packet /= DHCP6_Advertise(trid=12345)
+
+ return advertise_relay_reply_packet
+
+ def create_dhcp_request_packet(self):
+
+ request_packet = Ether(src=self.client_mac, dst=self.BROADCAST_MAC)
+ request_packet /= IPv6(src=self.client_link_local, dst=self.BROADCAST_IP)
+ request_packet /= UDP(sport=self.DHCP_CLIENT_PORT, dport=self.DHCP_SERVER_PORT)
+ request_packet /= DHCP6_Request(trid=12345)
+
+ return request_packet
+
+ def create_dhcp_request_relay_forward_packet(self):
+
+ request_relay_forward_packet = Ether(src=self.relay_iface_mac)
+ request_relay_forward_packet /= IPv6()
+ request_relay_forward_packet /= UDP(sport=self.DHCP_SERVER_PORT, dport=self.DHCP_SERVER_PORT)
+ request_relay_forward_packet /= DHCP6_RelayForward(msgtype=12, linkaddr=self.vlan_ip, peeraddr=self.client_link_local)
+ request_relay_forward_packet /= DHCP6OptRelayMsg()
+ request_relay_forward_packet /= DHCP6_Request(trid=12345)
+
+ return request_relay_forward_packet
+
+ def create_dhcp_reply_packet(self):
+
+ reply_packet = Ether(src=self.relay_iface_mac, dst=self.client_mac)
+ reply_packet /= IPv6(src=self.relay_link_local, dst=self.client_link_local)
+ reply_packet /= UDP(sport=self.DHCP_SERVER_PORT, dport=self.DHCP_CLIENT_PORT)
+ reply_packet /= DHCP6_Reply(trid=12345)
+
+ return reply_packet
+
+ def create_dhcp_reply_relay_reply_packet(self):
+
+ reply_relay_reply_packet = Ether(dst=self.relay_iface_mac)
+ reply_relay_reply_packet /= IPv6(src=self.server_ip, dst=self.relay_iface_ip)
+ reply_relay_reply_packet /= UDP(sport=self.DHCP_SERVER_PORT, dport=self.DHCP_SERVER_PORT)
+ reply_relay_reply_packet /= DHCP6_RelayReply(msgtype=13, linkaddr=self.vlan_ip, peeraddr=self.client_link_local)
+ reply_relay_reply_packet /= DHCP6OptRelayMsg()
+ reply_relay_reply_packet /= DHCP6_Reply(trid=12345)
+
+ return reply_relay_reply_packet
+
+
+ """
+ Send/receive functions
+
+ """
+
+ # Simulate client connecting on VLAN and broadcasting a DHCPv6 SOLICIT message
+ def client_send_solicit(self):
+ # Form and send DHCPv6 SOLICIT packet
+ solicit_packet = self.create_dhcp_solicit_packet()
+ testutils.send_packet(self, self.client_port_index, solicit_packet)
+
+ # Verify that the DHCP relay actually received and relayed the DHCPv6 SOLICIT message to all of
+ # its known DHCP servers.
+ def verify_relayed_solicit_relay_forward(self):
+ # Create a packet resembling a DHCPv6 RELAY-FORWARD encapsulating SOLICIT packet
+ solicit_relay_forward_packet = self.create_dhcp_solicit_relay_forward_packet()
+
+ # Mask off fields we don't care about matching
+ masked_packet = Mask(solicit_relay_forward_packet)
+ masked_packet.set_do_not_care_scapy(packet.Ether, "dst")
+ masked_packet.set_do_not_care_scapy(IPv6, "src")
+ masked_packet.set_do_not_care_scapy(IPv6, "dst")
+ masked_packet.set_do_not_care_scapy(IPv6, "fl")
+ masked_packet.set_do_not_care_scapy(IPv6, "tc")
+ masked_packet.set_do_not_care_scapy(IPv6, "plen")
+ masked_packet.set_do_not_care_scapy(IPv6, "nh")
+ masked_packet.set_do_not_care_scapy(packet.UDP, "chksum")
+ masked_packet.set_do_not_care_scapy(packet.UDP, "len")
+
+ # Count the number of these packets received on the ports connected to our leaves
+ solicit_count = testutils.count_matched_packets_all_ports(self, masked_packet, self.server_port_indices)
+ self.assertTrue(solicit_count >= 1,
+ "Failed: Solicit count of %d" % (solicit_count))
+
+ # Simulate a DHCP server sending a DHCPv6 RELAY-REPLY encapsulating ADVERTISE packet message to client.
+ # We do this by injecting a RELAY-REPLY encapsulating ADVERTISE message on the link connected to one
+ # of our leaf switches.
+ def server_send_advertise_relay_reply(self):
+ # Form and send DHCPv6 RELAY-REPLY encapsulating ADVERTISE packet
+ advertise_relay_reply_packet = self.create_dhcp_advertise_relay_reply_packet()
+ testutils.send_packet(self, self.server_port_indices[0], advertise_relay_reply_packet)
+
+ # Verify that the DHCPv6 ADVERTISE would be received by our simulated client
+ def verify_relayed_advertise(self):
+ # Create a packet resembling a DHCPv6 ADVERTISE packet
+ advertise_packet = self.create_dhcp_advertise_packet()
+
+ # Mask off fields we don't care about matching
+ masked_packet = Mask(advertise_packet)
+ masked_packet.set_do_not_care_scapy(IPv6, "fl")
+ masked_packet.set_do_not_care_scapy(packet.UDP, "chksum")
+ masked_packet.set_do_not_care_scapy(packet.UDP, "len")
+
+ # NOTE: verify_packet() will fail for us via an assert, so no need to check a return value here
+ testutils.verify_packet(self, masked_packet, self.client_port_index)
+
+ # Simulate our client sending a DHCPv6 REQUEST message
+ def client_send_request(self):
+ # Form and send DHCPv6 REQUEST packet
+ request_packet = self.create_dhcp_request_packet()
+ testutils.send_packet(self, self.client_port_index, request_packet)
+
+ # Verify that the DHCP relay actually received and relayed the DHCPv6 REQUEST message to all of
+ # its known DHCP servers.
+ def verify_relayed_request_relay_forward(self):
+ # Create a packet resembling a DHCPv6 RELAY-FORWARD encapsulating REQUEST packet
+ request_relay_forward_packet = self.create_dhcp_request_relay_forward_packet()
+
+ # Mask off fields we don't care about matching
+ masked_packet = Mask(request_relay_forward_packet)
+ masked_packet.set_do_not_care_scapy(packet.Ether, "dst")
+ masked_packet.set_do_not_care_scapy(IPv6, "src")
+ masked_packet.set_do_not_care_scapy(IPv6, "dst")
+ masked_packet.set_do_not_care_scapy(IPv6, "fl")
+ masked_packet.set_do_not_care_scapy(IPv6, "tc")
+ masked_packet.set_do_not_care_scapy(IPv6, "plen")
+ masked_packet.set_do_not_care_scapy(IPv6, "nh")
+ masked_packet.set_do_not_care_scapy(packet.UDP, "chksum")
+ masked_packet.set_do_not_care_scapy(packet.UDP, "len")
+
+ # Count the number of these packets received on the ports connected to our leaves
+ request_count = testutils.count_matched_packets_all_ports(self, masked_packet, self.server_port_indices)
+ self.assertTrue(request_count >= 1,
+ "Failed: Request count of %d" % (request_count))
+
+ # Simulate a DHCP server sending a DHCPv6 RELAY-REPLY encapsulating REPLY packet message to client.
+ def server_send_reply_relay_reply(self):
+ # Form and send DHCPv6 RELAY-REPLY encapsulating REPLY packet
+ reply_relay_reply_packet = self.create_dhcp_reply_relay_reply_packet()
+ testutils.send_packet(self, self.server_port_indices[0], reply_relay_reply_packet)
+
+ # Verify that the DHCPv6 REPLY would be received by our simulated client
+ def verify_relayed_reply(self):
+ # Create a packet resembling a DHCPv6 REPLY packet
+ reply_packet = self.create_dhcp_reply_packet()
+
+ # Mask off fields we don't care about matching
+ masked_packet = Mask(reply_packet)
+ masked_packet.set_do_not_care_scapy(IPv6, "fl")
+ masked_packet.set_do_not_care_scapy(packet.UDP, "chksum")
+ masked_packet.set_do_not_care_scapy(packet.UDP, "len")
+
+ # NOTE: verify_packet() will fail for us via an assert, so no need to check a return value here
+ testutils.verify_packet(self, masked_packet, self.client_port_index)
+
+ def runTest(self):
+ self.client_send_solicit()
+ self.verify_relayed_solicit_relay_forward()
+ self.server_send_advertise_relay_reply()
+ self.verify_relayed_advertise()
+ self.client_send_request()
+ self.verify_relayed_request_relay_forward()
+ self.server_send_reply_relay_reply()
+ self.verify_relayed_reply()
diff --git a/ansible/templates/minigraph_dpg.j2 b/ansible/templates/minigraph_dpg.j2
index 4af2f5ff7d..b11f3c4378 100644
--- a/ansible/templates/minigraph_dpg.j2
+++ b/ansible/templates/minigraph_dpg.j2
@@ -130,6 +130,8 @@
{% endif %}
{% set dhcp_servers_str=';'.join(dhcp_servers) %}
{{ dhcp_servers_str }}
+{% set dhcpv6_servers_str=';'.join(dhcpv6_servers) %}
+ {{ dhcpv6_servers_str }}
{{ vlan_param['id'] }}
{{ vlan_param['tag'] }}
{{ vlan_param['prefix'] | ipaddr('network') }}/{{ vlan_param['prefix'] | ipaddr('prefix') }}
diff --git a/tests/dhcp_relay/test_dhcpv6_relay.py b/tests/dhcp_relay/test_dhcpv6_relay.py
new file mode 100644
index 0000000000..e699bfffdb
--- /dev/null
+++ b/tests/dhcp_relay/test_dhcpv6_relay.py
@@ -0,0 +1,214 @@
+import ipaddress
+import pytest
+import random
+import time
+import netaddr
+
+from tests.common.fixtures.ptfhost_utils import copy_ptftests_directory # lgtm[py/unused-import]
+from tests.common.fixtures.ptfhost_utils import change_mac_addresses # lgtm[py/unused-import]
+from tests.ptf_runner import ptf_runner
+
+pytestmark = [
+ pytest.mark.topology('t0'),
+ pytest.mark.device_type('vs')
+]
+
+@pytest.fixture(scope="module")
+def dut_dhcp_relay_data(duthosts, rand_one_dut_hostname, ptfhost, tbinfo):
+ """ Fixture which returns a list of dictionaries where each dictionary contains
+ data necessary to test one instance of a DHCP relay agent running on the DuT.
+ This fixture is scoped to the module, as the data it gathers can be used by
+ all tests in this module. It does not need to be run before each test.
+ """
+ duthost = duthosts[rand_one_dut_hostname]
+ dhcp_relay_data_list = []
+ uplink_interface_link_local = ""
+
+ mg_facts = duthost.get_extended_minigraph_facts(tbinfo)
+
+ # SONiC spawns one DHCP relay agent per VLAN interface configured on the DUT
+ vlan_dict = mg_facts['minigraph_vlans']
+ for vlan_iface_name, vlan_info_dict in vlan_dict.items():
+ # Gather information about the downlink VLAN interface this relay agent is listening on
+ downlink_vlan_iface = {}
+ downlink_vlan_iface['name'] = vlan_iface_name
+
+ for vlan_interface_info_dict in mg_facts['minigraph_vlan_interfaces']:
+ if (vlan_interface_info_dict['attachto'] == vlan_iface_name) and (netaddr.IPAddress(str(vlan_interface_info_dict['addr'])).version == 6):
+ downlink_vlan_iface['addr'] = vlan_interface_info_dict['addr']
+ downlink_vlan_iface['mask'] = vlan_interface_info_dict['mask']
+ break
+
+ # Obtain MAC address of the VLAN interface
+ res = duthost.shell('cat /sys/class/net/{}/address'.format(vlan_iface_name))
+ downlink_vlan_iface['mac'] = res['stdout']
+
+ downlink_vlan_iface['dhcpv6_server_addrs'] = mg_facts['dhcpv6_servers']
+
+ # We choose the physical interface where our DHCP client resides to be index of first interface in the VLAN
+ client_iface = {}
+ client_iface['name'] = vlan_info_dict['members'][0]
+ client_iface['alias'] = mg_facts['minigraph_port_name_to_alias_map'][client_iface['name']]
+ client_iface['port_idx'] = mg_facts['minigraph_ptf_indices'][client_iface['name']]
+
+ # Obtain uplink port indicies for this DHCP relay agent
+ uplink_interfaces = []
+ uplink_port_indices =[]
+ for iface_name, neighbor_info_dict in mg_facts['minigraph_neighbors'].items():
+ if neighbor_info_dict['name'] in mg_facts['minigraph_devices']:
+ neighbor_device_info_dict = mg_facts['minigraph_devices'][neighbor_info_dict['name']]
+ if 'type' in neighbor_device_info_dict and neighbor_device_info_dict['type'] == 'LeafRouter':
+ # If this uplink's physical interface is a member of a portchannel interface,
+ # we record the name of the portchannel interface here, as this is the actual
+ # interface the DHCP relay will listen on.
+ iface_is_portchannel_member = False
+ for portchannel_name, portchannel_info_dict in mg_facts['minigraph_portchannels'].items():
+ if 'members' in portchannel_info_dict and iface_name in portchannel_info_dict['members']:
+ iface_is_portchannel_member = True
+ if portchannel_name not in uplink_interfaces:
+ uplink_interfaces.append(portchannel_name)
+ break
+ # If the uplink's physical interface is not a member of a portchannel, add it to our uplink interfaces list
+ if not iface_is_portchannel_member:
+ uplink_interfaces.append(iface_name)
+ uplink_port_indices.append(mg_facts['minigraph_ptf_indices'][iface_name])
+ if uplink_interface_link_local == "":
+ command = "ip addr show {} | grep inet6 | grep 'scope link' | awk '{{print $2}}' | cut -d '/' -f1".format(uplink_interfaces[0])
+ res = duthost.shell(command)
+ if res['stdout'] != "":
+ uplink_interface_link_local = res['stdout']
+
+ dhcp_relay_data = {}
+ dhcp_relay_data['downlink_vlan_iface'] = downlink_vlan_iface
+ dhcp_relay_data['client_iface'] = client_iface
+ dhcp_relay_data['uplink_interfaces'] = uplink_interfaces
+ dhcp_relay_data['uplink_port_indices'] = uplink_port_indices
+ dhcp_relay_data['uplink_interface_link_local'] = uplink_interface_link_local
+
+ dhcp_relay_data_list.append(dhcp_relay_data)
+
+ return dhcp_relay_data_list
+
+
+@pytest.fixture(scope="module")
+def validate_dut_routes_exist(duthosts, rand_one_dut_hostname, dut_dhcp_relay_data):
+ """Fixture to valid a route to each DHCP server exist
+ """
+ duthost = duthosts[rand_one_dut_hostname]
+ dhcp_servers = set()
+ for dhcp_relay in dut_dhcp_relay_data:
+ dhcp_servers |= set(dhcp_relay['downlink_vlan_iface']['dhcpv6_server_addrs'])
+
+ for dhcp_server in dhcp_servers:
+ rtInfo = duthost.get_ip_route_info(ipaddress.ip_address(dhcp_server))
+ assert len(rtInfo["nexthops"]) > 0, "Failed to find route to DHCP server '{0}'".format(dhcp_server)
+
+
+def test_dhcp_relay_default(ptfhost, duthosts, rand_one_dut_hostname, dut_dhcp_relay_data, validate_dut_routes_exist):
+ """Test DHCP relay functionality on T0 topology.
+ For each DHCP relay agent running on the DuT, verify DHCP packets are relayed properly
+ """
+ duthost = duthosts[rand_one_dut_hostname]
+
+ for dhcp_relay in dut_dhcp_relay_data:
+ # Run the DHCP relay test on the PTF host
+ ptf_runner(ptfhost,
+ "ptftests",
+ "dhcpv6_relay_test.DHCPTest",
+ platform_dir="ptftests",
+ params={"hostname": duthost.hostname,
+ "client_port_index": dhcp_relay['client_iface']['port_idx'],
+ "leaf_port_indices": repr(dhcp_relay['uplink_port_indices']),
+ "num_dhcp_servers": len(dhcp_relay['downlink_vlan_iface']['dhcpv6_server_addrs']),
+ "server_ip": str(dhcp_relay['downlink_vlan_iface']['dhcpv6_server_addrs'][0]),
+ "relay_iface_ip": str(dhcp_relay['downlink_vlan_iface']['addr']),
+ "relay_iface_mac": str(dhcp_relay['downlink_vlan_iface']['mac']),
+ "relay_link_local": str(dhcp_relay['uplink_interface_link_local']),
+ "vlan_ip": str(dhcp_relay['downlink_vlan_iface']['addr'])},
+ log_file="/tmp/dhcpv6_relay_test.DHCPTest.log")
+
+
+def test_dhcp_relay_after_link_flap(ptfhost, duthosts, rand_one_dut_hostname, dut_dhcp_relay_data, validate_dut_routes_exist):
+ """Test DHCP relay functionality on T0 topology after uplinks flap
+ For each DHCP relay agent running on the DuT, with relay agent running, flap the uplinks,
+ then test whether the DHCP relay agent relays packets properly.
+ """
+ duthost = duthosts[rand_one_dut_hostname]
+
+ for dhcp_relay in dut_dhcp_relay_data:
+ # Bring all uplink interfaces down
+ for iface in dhcp_relay['uplink_interfaces']:
+ duthost.shell('ifconfig {} down'.format(iface))
+
+ # Sleep a bit to ensure uplinks are down
+ time.sleep(20)
+
+ # Bring all uplink interfaces back up
+ for iface in dhcp_relay['uplink_interfaces']:
+ duthost.shell('ifconfig {} up'.format(iface))
+
+ # Sleep a bit to ensure uplinks are up
+ time.sleep(20)
+
+ # Run the DHCP relay test on the PTF host
+ ptf_runner(ptfhost,
+ "ptftests",
+ "dhcpv6_relay_test.DHCPTest",
+ platform_dir="ptftests",
+ params={"hostname": duthost.hostname,
+ "client_port_index": dhcp_relay['client_iface']['port_idx'],
+ "leaf_port_indices": repr(dhcp_relay['uplink_port_indices']),
+ "num_dhcp_servers": len(dhcp_relay['downlink_vlan_iface']['dhcpv6_server_addrs']),
+ "server_ip": str(dhcp_relay['downlink_vlan_iface']['dhcpv6_server_addrs'][0]),
+ "relay_iface_ip": str(dhcp_relay['downlink_vlan_iface']['addr']),
+ "relay_iface_mac": str(dhcp_relay['downlink_vlan_iface']['mac']),
+ "relay_link_local": str(dhcp_relay['uplink_interface_link_local']),
+ "vlan_ip": str(dhcp_relay['downlink_vlan_iface']['addr'])},
+ log_file="/tmp/dhcpv6_relay_test.DHCPTest.log")
+
+
+def test_dhcp_relay_start_with_uplinks_down(ptfhost, duthosts, rand_one_dut_hostname, dut_dhcp_relay_data, validate_dut_routes_exist):
+ """Test DHCP relay functionality on T0 topology when relay agent starts with uplinks down
+ For each DHCP relay agent running on the DuT, bring the uplinks down, then restart the
+ relay agent while the uplinks are still down. Then test whether the DHCP relay agent
+ relays packets properly.
+ """
+ duthost = duthosts[rand_one_dut_hostname]
+
+ for dhcp_relay in dut_dhcp_relay_data:
+ # Bring all uplink interfaces down
+ for iface in dhcp_relay['uplink_interfaces']:
+ duthost.shell('ifconfig {} down'.format(iface))
+
+ # Sleep a bit to ensure uplinks are down
+ time.sleep(20)
+
+ # Restart DHCP relay service on DUT
+ duthost.shell('systemctl restart dhcp_relay.service')
+
+ # Sleep to give the DHCP relay container time to start up and
+ # allow the relay agent to begin listening on the down interfaces
+ time.sleep(40)
+
+ # Bring all uplink interfaces back up
+ for iface in dhcp_relay['uplink_interfaces']:
+ duthost.shell('ifconfig {} up'.format(iface))
+
+ # Sleep a bit to ensure uplinks are up
+ time.sleep(20)
+
+ # Run the DHCP relay test on the PTF host
+ ptf_runner(ptfhost,
+ "ptftests",
+ "dhcpv6_relay_test.DHCPTest",
+ platform_dir="ptftests",
+ params={"hostname": duthost.hostname,
+ "client_port_index": dhcp_relay['client_iface']['port_idx'],
+ "leaf_port_indices": repr(dhcp_relay['uplink_port_indices']),
+ "num_dhcp_servers": len(dhcp_relay['downlink_vlan_iface']['dhcpv6_server_addrs']),
+ "server_ip": str(dhcp_relay['downlink_vlan_iface']['dhcpv6_server_addrs'][0]),
+ "relay_iface_ip": str(dhcp_relay['downlink_vlan_iface']['addr']),
+ "relay_iface_mac": str(dhcp_relay['downlink_vlan_iface']['mac']),
+ "relay_link_local": str(dhcp_relay['uplink_interface_link_local']),
+ "vlan_ip": str(dhcp_relay['downlink_vlan_iface']['addr'])},
+ log_file="/tmp/dhcpv6_relay_test.DHCPTest.log")