Skip to content

Commit

Permalink
tests: Added tests for new AutoRP functionality
Browse files Browse the repository at this point in the history
Uses hardcoded sample AutoRP packets injected in to test
message parsing and proper application of AutoRP learned
RP info. Tests mix of AutoRP and static RP's.

Signed-off-by: Nathan Bahr <nbahr@atcorp.com>
  • Loading branch information
nabahr committed Sep 17, 2024
1 parent 70ab5e1 commit b4b9158
Show file tree
Hide file tree
Showing 5 changed files with 329 additions and 1 deletion.
91 changes: 90 additions & 1 deletion tests/topotests/lib/pim.py
Original file line number Diff line number Diff line change
Expand Up @@ -1745,6 +1745,49 @@ def verify_pim_rp_info(
return True


@retry(retry_timeout=60, diag_pct=0)
def verify_pim_rp_info_is_empty(tgen, dut, af="ipv4"):
"""
Verify pim rp info by running "show ip pim rp-info" cli
Parameters
----------
* `tgen`: topogen object
* `dut`: device under test
Usage
-----
dut = "r1"
result = verify_pim_rp_info_is_empty(tgen, dut)
Returns
-------
errormsg(str) or True
"""

logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

if dut not in tgen.routers():
return False

rnode = tgen.routers()[dut]

ip_cmd = "ip"
if af == "ipv6":
ip_cmd = "ipv6"

logger.info("[DUT: %s]: Verifying %s rp info", dut, ip_cmd)
cmd = "show {} pim rp-info json".format(ip_cmd)
show_ip_rp_info_json = run_frr_cmd(rnode, cmd, isjson=True)

if show_ip_rp_info_json:
errormsg = "[DUT %s]: Verifying empty rp-info [FAILED]!!" % (dut)
return errormsg

logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True


@retry(retry_timeout=60, diag_pct=0)
def verify_pim_state(
tgen,
Expand Down Expand Up @@ -2751,6 +2794,48 @@ def scapy_send_bsr_raw_packet(tgen, topo, senderRouter, receiverRouter, packet=N
return True


def scapy_send_autorp_raw_packet(tgen, senderRouter, senderInterface, packet=None):
"""
Using scapy Raw() method to send AutoRP raw packet from one FRR
to other
Parameters:
-----------
* `tgen` : Topogen object
* `senderRouter` : Sender router
* `senderInterface` : SenderInterface
* `packet` : AutoRP packet in raw format
returns:
--------
errormsg or True
"""

global CWD
result = ""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

python3_path = tgen.net.get_exec_path(["python3", "python"])
# send_bsr_packet.py has no direct ties to bsr, just sends a raw packet out
# a given interface, so just reuse it
script_path = os.path.join(CWD, "send_bsr_packet.py")
node = tgen.net[senderRouter]

cmd = [
python3_path,
script_path,
packet,
senderInterface,
"--interval=1",
"--count=1",
]
logger.info("Scapy cmd: \n %s", cmd)
node.cmd_raises(cmd)

logger.debug("Exiting lib API: scapy_send_autorp_raw_packet")
return True


def find_rp_from_bsrp_info(tgen, dut, bsr, grp=None):
"""
Find which RP is having lowest prioriy and returns rp IP
Expand Down Expand Up @@ -4260,6 +4345,7 @@ def verify_local_igmp_groups(tgen, dut, interface, group_addresses):
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True


@retry(retry_timeout=62)
def verify_static_groups(tgen, dut, interface, group_addresses):
"""
Expand Down Expand Up @@ -4293,7 +4379,9 @@ def verify_static_groups(tgen, dut, interface, group_addresses):
rnode = tgen.routers()[dut]

logger.info("[DUT: %s]: Verifying static groups received:", dut)
show_static_group_json = run_frr_cmd(rnode, "show ip igmp static-group json", isjson=True)
show_static_group_json = run_frr_cmd(
rnode, "show ip igmp static-group json", isjson=True
)

if type(group_addresses) is not list:
group_addresses = [group_addresses]
Expand Down Expand Up @@ -4330,6 +4418,7 @@ def verify_static_groups(tgen, dut, interface, group_addresses):
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return True


def verify_pim_interface_traffic(tgen, input_dict, return_stats=True, addr_type="ipv4"):
"""
Verify ip pim interface traffic by running
Expand Down
Empty file.
16 changes: 16 additions & 0 deletions tests/topotests/pim_autorp/r1/frr.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
!
hostname r1
password zebra
log file /tmp/r1-frr.log
debug pim autorp
!
interface r1-eth0
ip address 10.10.76.1/24
ip igmp
ip pim
!
ip forwarding
!
router pim
autorp discovery
!
16 changes: 16 additions & 0 deletions tests/topotests/pim_autorp/r2/frr.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
!
hostname r2
password zebra
log file /tmp/r2-frr.log
debug pim autorp
!
interface r2-eth0
ip address 10.10.76.2/24
ip igmp
ip pim
!
ip forwarding
!
router pim
autorp discovery
!
207 changes: 207 additions & 0 deletions tests/topotests/pim_autorp/test_pim_autorp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
#!/usr/bin/env python
# SPDX-License-Identifier: ISC

#
# test_pim_autorp.py
#
# Copyright (c) 2024 ATCorp
# Nathan Bahr
#

import os
import sys
import pytest

# pylint: disable=C0413
# Import topogen and topotest helpers
from lib.topogen import Topogen, get_topogen
from lib.topolog import logger
from lib.pim import scapy_send_autorp_raw_packet, verify_pim_rp_info, verify_pim_rp_info_is_empty
from lib.common_config import step, write_test_header

from time import sleep

"""
test_pim_autorp.py: Test general PIM AutoRP functionality
"""

TOPOLOGY = """
Basic AutoRP functionality
+---+---+ +---+---+
| | 10.10.76.0/24 | |
+ R1 + <------------------> + R2 |
| | .1 .2 | |
+---+---+ +---+---+
"""

# Save the Current Working Directory to find configuration files.
CWD = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(CWD, "../"))

# Required to instantiate the topology builder class.
pytestmark = [pytest.mark.pimd]


def build_topo(tgen):
"Build function"

# Create routers
tgen.add_router("r1")
tgen.add_router("r2")

# Create link between router 1 and 2
switch = tgen.add_switch("s1-2")
switch.add_link(tgen.gears["r1"])
switch.add_link(tgen.gears["r2"])

def setup_module(mod):
logger.info("PIM AutoRP basic functionality:\n {}".format(TOPOLOGY))

tgen = Topogen(build_topo, mod.__name__)
tgen.start_topology()

# Router 1 will be the router configured with "fake" autorp configuration, so give it a default route
# to router 2 so that routing to the RP address is not an issue
# r1_defrt_setup_cmds = [
# "ip route add default via 10.10.76.1 dev r1-eth0",
# ]
# for cmd in r1_defrt_setup_cmds:
# tgen.net["r1"].cmd(cmd)

logger.info("Testing PIM AutoRP support")
router_list = tgen.routers()
for rname, router in router_list.items():
logger.info("Loading router %s" % rname)
router.load_frr_config(os.path.join(CWD, "{}/frr.conf".format(rname)))

# Initialize all routers.
tgen.start_router()
for router in router_list.values():
if router.has_version("<", "4.0"):
tgen.set_error("unsupported version")


def teardown_module(mod):
"Teardown the pytest environment"
tgen = get_topogen()
tgen.stop_topology()

def test_pim_autorp_discovery_single_rp(request):
"Test PIM AutoRP Discovery with single RP"
tgen = get_topogen()
tc_name = request.node.name
write_test_header(tc_name)

if tgen.routers_have_failure():
pytest.skip(tgen.errors)

step("Start with no RP configuration")
result = verify_pim_rp_info_is_empty(tgen, "r1")
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)

step("Send AutoRP packet from r1 to r2")
# 1 RP(s), hold time 5 secs, 10.10.76.1, group(s) 224.0.0.0/4
data = "01005e00012800127f55cfb1080045c00030700c000008110abe0a0a4c01e000012801f001f0001c798b12010005000000000a0a4c0103010004e0000000"
scapy_send_autorp_raw_packet(tgen, "r1", "r1-eth0", data)

step("Verify rp-info from AutoRP packet")
result = verify_pim_rp_info(tgen, None, "r2", "224.0.0.0/4", "r2-eth0", "10.10.76.1", "AutoRP", False, "ipv4", True)
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)

step("Verify AutoRP configuration times out")
result = verify_pim_rp_info_is_empty(tgen, "r2")
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)

def test_pim_autorp_discovery_multiple_rp(request):
"Test PIM AutoRP Discovery with multiple RP's"
tgen = get_topogen()
tc_name = request.node.name
write_test_header(tc_name)

if tgen.routers_have_failure():
pytest.skip("skipped because of router(s) failure")

step("Start with no RP configuration")
result = verify_pim_rp_info_is_empty(tgen, "r2")
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)

step("Send AutoRP packet from r1 to r2")
# 2 RP(s), hold time 5 secs, 10.10.76.1, group(s) 224.0.0.0/8, 10.10.76.3, group(s) 225.0.0.0/8
data = "01005e00012800127f55cfb1080045c0003c700c000008110ab20a0a4c01e000012801f001f000283f5712020005000000000a0a4c0103010008e00000000a0a4c0303010008e1000000"
scapy_send_autorp_raw_packet(tgen, "r1", "r1-eth0", data)

step("Verify rp-info from AutoRP packet")
result = verify_pim_rp_info(tgen, None, "r2", "224.0.0.0/8", "r2-eth0", "10.10.76.1", "AutoRP", False, "ipv4", True)
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
result = verify_pim_rp_info(tgen, None, "r2", "225.0.0.0/8", "r2-eth0", "10.10.76.3", "AutoRP", False, "ipv4", True)
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)


def test_pim_autorp_discovery_static(request):
"Test PIM AutoRP Discovery with Static RP"
tgen = get_topogen()
tc_name = request.node.name
write_test_header(tc_name)

if tgen.routers_have_failure():
pytest.skip("skipped because of router(s) failure")

step("Start with no RP configuration")
result = verify_pim_rp_info_is_empty(tgen, "r2")
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)

step("Add static RP configuration to r2")
rnode = tgen.routers()["r2"]
rnode.cmd("vtysh -c 'conf t' -c 'router pim' -c 'rp 10.10.76.3 224.0.0.0/4'")

step("Verify static rp-info from r2")
result = verify_pim_rp_info(tgen, None, "r2", "224.0.0.0/4", "r2-eth0", "10.10.76.3", "Static", False, "ipv4", True)
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)

step("Send AutoRP packet from r1 to r2")
# 1 RP(s), hold time 5 secs, 10.10.76.1, group(s) 224.0.0.0/4
data = "01005e00012800127f55cfb1080045c00030700c000008110abe0a0a4c01e000012801f001f0001c798b12010005000000000a0a4c0103010004e0000000"
scapy_send_autorp_raw_packet(tgen, "r1", "r1-eth0", data)

step("Verify rp-info from AutoRP packet")
result = verify_pim_rp_info(tgen, None, "r2", "224.0.0.0/4", "r2-eth0", "10.10.76.1", "AutoRP", False, "ipv4", True)
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)


def test_pim_autorp_announce_group(request):
"Test PIM AutoRP Announcement with a single group"
tgen = get_topogen()
tc_name = request.node.name
write_test_header(tc_name)

if tgen.routers_have_failure():
pytest.skip("skipped because of router(s) failure")

step("Add candidate RP configuration to r1")
rnode = tgen.routers()["r1"]
rnode.cmd("vtysh -c 'conf t' -c 'router pim' -c 'send-rp-announce 10.10.76.1 224.0.0.0/4'")
step("Verify Announcement sent data")
# TODO: Verify AutoRP mapping agent receives candidate RP announcement
# Mapping agent is not yet implemented
#sleep(10)
step("Change AutoRP Announcement packet parameters")
rnode.cmd("vtysh -c 'conf t' -c 'router pim' -c 'send-rp-announce scope 8 interval 10 holdtime 60'")
step("Verify Announcement sent data")
# TODO: Verify AutoRP mapping agent receives updated candidate RP announcement
# Mapping agent is not yet implemented
#sleep(10)


def test_memory_leak():
"Run the memory leak test and report results."
tgen = get_topogen()
if not tgen.is_memleak_enabled():
pytest.skip("Memory leak test/report is disabled")

tgen.report_memory_leaks()


if __name__ == "__main__":
args = ["-s"] + sys.argv[1:]
sys.exit(pytest.main(args))

0 comments on commit b4b9158

Please sign in to comment.