Skip to content

Commit

Permalink
Validate input of config mirror_session add (sonic-net#1825)
Browse files Browse the repository at this point in the history
* Validate input of add mirror session

Signed-off-by: bingwang <bingwang@microsoft.com>
  • Loading branch information
bingwang-ms authored Oct 14, 2021
1 parent 9ab20fd commit c950a55
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 13 deletions.
44 changes: 31 additions & 13 deletions config/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@

asic_type = None

DSCP_RANGE = click.IntRange(min=0, max=63)
TTL_RANGE = click.IntRange(min=0, max=255)
QUEUE_RANGE = click.IntRange(min=0, max=255)
GRE_TYPE_RANGE = click.IntRange(min=0, max=65535)

#
# Helper functions
#
Expand Down Expand Up @@ -953,6 +958,19 @@ def cache_arp_entries():
open(restore_flag_file, 'w').close()
return success


def validate_ipv4_address(ctx, param, ip_addr):
"""Helper function to validate ipv4 address
"""
try:
ip_n = ipaddress.ip_network(ip_addr, False)
if ip_n.version != 4:
raise click.UsageError("{} is not a valid IPv4 address".format(ip_addr))
return ip_addr
except ValueError as e:
raise click.UsageError(str(e))


# This is our main entrypoint - the main 'config' command
@click.group(cls=clicommon.AbbreviationGroup, context_settings=CONTEXT_SETTINGS)
@click.pass_context
Expand Down Expand Up @@ -1775,12 +1793,12 @@ def mirror_session():

@mirror_session.command('add')
@click.argument('session_name', metavar='<session_name>', required=True)
@click.argument('src_ip', metavar='<src_ip>', required=True)
@click.argument('dst_ip', metavar='<dst_ip>', required=True)
@click.argument('dscp', metavar='<dscp>', required=True)
@click.argument('ttl', metavar='<ttl>', required=True)
@click.argument('gre_type', metavar='[gre_type]', required=False)
@click.argument('queue', metavar='[queue]', required=False)
@click.argument('src_ip', metavar='<src_ip>', callback=validate_ipv4_address, required=True)
@click.argument('dst_ip', metavar='<dst_ip>', callback=validate_ipv4_address, required=True)
@click.argument('dscp', metavar='<dscp>', type=DSCP_RANGE, required=True)
@click.argument('ttl', metavar='<ttl>', type=TTL_RANGE, required=True)
@click.argument('gre_type', metavar='[gre_type]', type=GRE_TYPE_RANGE, required=False)
@click.argument('queue', metavar='[queue]', type=QUEUE_RANGE, required=False)
@click.option('--policer')
def add(session_name, src_ip, dst_ip, dscp, ttl, gre_type, queue, policer):
""" Add ERSPAN mirror session.(Legacy support) """
Expand All @@ -1799,12 +1817,12 @@ def erspan(ctx):

@erspan.command('add')
@click.argument('session_name', metavar='<session_name>', required=True)
@click.argument('src_ip', metavar='<src_ip>', required=True)
@click.argument('dst_ip', metavar='<dst_ip>', required=True)
@click.argument('dscp', metavar='<dscp>', required=True)
@click.argument('ttl', metavar='<ttl>', required=True)
@click.argument('gre_type', metavar='[gre_type]', required=False)
@click.argument('queue', metavar='[queue]', required=False)
@click.argument('src_ip', metavar='<src_ip>', callback=validate_ipv4_address, required=True)
@click.argument('dst_ip', metavar='<dst_ip>', callback=validate_ipv4_address,required=True)
@click.argument('dscp', metavar='<dscp>', type=DSCP_RANGE, required=True)
@click.argument('ttl', metavar='<ttl>', type=TTL_RANGE, required=True)
@click.argument('gre_type', metavar='[gre_type]', type=GRE_TYPE_RANGE, required=False)
@click.argument('queue', metavar='[queue]', type=QUEUE_RANGE, required=False)
@click.argument('src_port', metavar='[src_port]', required=False)
@click.argument('direction', metavar='[direction]', required=False)
@click.option('--policer')
Expand Down Expand Up @@ -1877,7 +1895,7 @@ def span(ctx):
@click.argument('dst_port', metavar='<dst_port>', required=True)
@click.argument('src_port', metavar='[src_port]', required=False)
@click.argument('direction', metavar='[direction]', required=False)
@click.argument('queue', metavar='[queue]', required=False)
@click.argument('queue', metavar='[queue]', type=QUEUE_RANGE, required=False)
@click.option('--policer')
def add(session_name, dst_port, src_port, direction, queue, policer):
""" Add SPAN mirror session """
Expand Down
150 changes: 150 additions & 0 deletions tests/config_mirror_session_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import pytest
import config.main as config
from unittest import mock
from click.testing import CliRunner

ERR_MSG_IP_FAILURE = "does not appear to be an IPv4 or IPv6 network"
ERR_MSG_IP_VERSION_FAILURE = "not a valid IPv4 address"
ERR_MSG_VALUE_FAILURE = "Invalid value for"

def test_mirror_session_add():
runner = CliRunner()

# Verify invalid src_ip
result = runner.invoke(
config.config.commands["mirror_session"].commands["add"],
["test_session", "400.1.1.1", "2.2.2.2", "8", "63", "10", "100"])
assert result.exit_code != 0
assert ERR_MSG_IP_FAILURE in result.stdout

# Verify invalid dst_ip
result = runner.invoke(
config.config.commands["mirror_session"].commands["add"],
["test_session", "1.1.1.1", "256.2.2.2", "8", "63", "10", "100"])
assert result.exit_code != 0
assert ERR_MSG_IP_FAILURE in result.stdout

# Verify invalid ip version
result = runner.invoke(
config.config.commands["mirror_session"].commands["add"],
["test_session", "1::1", "2::2", "8", "63", "10", "100"])
assert result.exit_code != 0
assert ERR_MSG_IP_VERSION_FAILURE in result.stdout

# Verify invalid dscp
result = runner.invoke(
config.config.commands["mirror_session"].commands["add"],
["test_session", "1.1.1.1", "2.2.2.2", "65536", "63", "10", "100"])
assert result.exit_code != 0
assert ERR_MSG_VALUE_FAILURE in result.stdout

# Verify invalid ttl
result = runner.invoke(
config.config.commands["mirror_session"].commands["add"],
["test_session", "1.1.1.1", "2.2.2.2", "6", "256", "10", "100"])
assert result.exit_code != 0
assert ERR_MSG_VALUE_FAILURE in result.stdout

# Verify invalid gre
result = runner.invoke(
config.config.commands["mirror_session"].commands["add"],
["test_session", "1.1.1.1", "2.2.2.2", "6", "63", "65536", "100"])
assert result.exit_code != 0
assert ERR_MSG_VALUE_FAILURE in result.stdout

# Verify invalid queue
result = runner.invoke(
config.config.commands["mirror_session"].commands["add"],
["test_session", "1.1.1.1", "2.2.2.2", "6", "63", "65", "65536"])
assert result.exit_code != 0
assert ERR_MSG_VALUE_FAILURE in result.stdout

# Positive case
with mock.patch('config.main.add_erspan') as mocked:
result = runner.invoke(
config.config.commands["mirror_session"].commands["add"],
["test_session", "100.1.1.1", "2.2.2.2", "8", "63", "10", "100"])

mocked.assert_called_with("test_session", "100.1.1.1", "2.2.2.2", 8, 63, 10, 100, None)



def test_mirror_session_erspan_add():
runner = CliRunner()

# Verify invalid src_ip
result = runner.invoke(
config.config.commands["mirror_session"].commands["erspan"].commands["add"],
["test_session", "400.1.1.1", "2.2.2.2", "8", "63", "10", "100"])
assert result.exit_code != 0
assert ERR_MSG_IP_FAILURE in result.stdout

# Verify invalid dst_ip
result = runner.invoke(
config.config.commands["mirror_session"].commands["erspan"].commands["add"],
["test_session", "1.1.1.1", "256.2.2.2", "8", "63", "10", "100"])
assert result.exit_code != 0
assert ERR_MSG_IP_FAILURE in result.stdout

# Verify invalid ip version
result = runner.invoke(
config.config.commands["mirror_session"].commands["erspan"].commands["add"],
["test_session", "1::1", "2::2", "8", "63", "10", "100"])
assert result.exit_code != 0
assert ERR_MSG_IP_VERSION_FAILURE in result.stdout

# Verify invalid dscp
result = runner.invoke(
config.config.commands["mirror_session"].commands["erspan"].commands["add"],
["test_session", "1.1.1.1", "2.2.2.2", "65536", "63", "10", "100"])
assert result.exit_code != 0
assert ERR_MSG_VALUE_FAILURE in result.stdout

# Verify invalid ttl
result = runner.invoke(
config.config.commands["mirror_session"].commands["erspan"].commands["add"],
["test_session", "1.1.1.1", "2.2.2.2", "6", "256", "10", "100"])
assert result.exit_code != 0
assert ERR_MSG_VALUE_FAILURE in result.stdout

# Verify invalid gre
result = runner.invoke(
config.config.commands["mirror_session"].commands["erspan"].commands["add"],
["test_session", "1.1.1.1", "2.2.2.2", "6", "63", "65536", "100"])
assert result.exit_code != 0
assert ERR_MSG_VALUE_FAILURE in result.stdout

# Verify invalid queue
result = runner.invoke(
config.config.commands["mirror_session"].commands["erspan"].commands["add"],
["test_session", "1.1.1.1", "2.2.2.2", "6", "63", "65", "65536"])
assert result.exit_code != 0
assert ERR_MSG_VALUE_FAILURE in result.stdout

# Positive case
with mock.patch('config.main.add_erspan') as mocked:
result = runner.invoke(
config.config.commands["mirror_session"].commands["erspan"].commands["add"],
["test_session", "100.1.1.1", "2.2.2.2", "8", "63", "10", "100"])

mocked.assert_called_with("test_session", "100.1.1.1", "2.2.2.2", 8, 63, 10, 100, None, None, None)


def test_mirror_session_span_add():
runner = CliRunner()

# Verify invalid queue
result = runner.invoke(
config.config.commands["mirror_session"].commands["span"].commands["add"],
["test_session", "Ethernet0", "Ethernet4", "rx", "65536"])
assert result.exit_code != 0
assert ERR_MSG_VALUE_FAILURE in result.stdout

# Positive case
with mock.patch('config.main.add_span') as mocked:
result = runner.invoke(
config.config.commands["mirror_session"].commands["span"].commands["add"],
["test_session", "Ethernet0", "Ethernet4", "rx", "100"])

mocked.assert_called_with("test_session", "Ethernet0", "Ethernet4", "rx", 100, None)

0 comments on commit c950a55

Please sign in to comment.