Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CLI] MACsec support #1727

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 168 additions & 0 deletions config/macsec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import click
import utilities_common.cli as clicommon

#
# 'macsec' group ('config macsec ...')
#
@click.group(cls=clicommon.AbbreviationGroup, name='macsec')
def macsec():
"""MACsec-related configuration tasks"""
pass


#
# 'port' group ('config macsec port ...')
#
@macsec.group(cls=clicommon.AbbreviationGroup, name='port')
def macsec_port():
"""Enable MACsec or disable MACsec on the specified port"""
pass

#
# 'add' command ('config macsec port add ...')
#
@macsec_port.command('add')
@click.argument('port', metavar='<port_name>', required=True)
@click.argument('profile', metavar='<profile_name>', required=True)
@clicommon.pass_db
def add_port(db, port, profile):
"""
Add MACsec port
"""
ctx = click.get_current_context()

if clicommon.get_interface_naming_mode() == "alias":
alias = port
iface_alias_converter = clicommon.InterfaceAliasConverter(db)
port = iface_alias_converter.alias_to_name(alias)
if port is None:
ctx.fail("cannot find port name for alias {}".format(alias))

profile_entry = db.cfgdb.get_entry('MACSEC_PROFILE', profile)
if len(profile_entry) == 0:
ctx.fail("profile {} doesn't exist".format(profile))

db.cfgdb.mod_entry("PORT", port, {'macsec': profile})


#
# 'del' command ('config macsec port del ...')
#
@macsec_port.command('del')
@click.argument('port', metavar='<port_name>', required=True)
@clicommon.pass_db
def del_port(db, port):
"""
Delete MACsec port
"""
ctx = click.get_current_context()

if clicommon.get_interface_naming_mode() == "alias":
alias = port
iface_alias_converter = clicommon.InterfaceAliasConverter(db)
port = iface_alias_converter.alias_to_name(alias)
if port is None:
ctx.fail("cannot find port name for alias {}".format(alias))

db.cfgdb.mod_entry("PORT", port, {'macsec': ""})


#
# 'profile' group ('config macsec profile ...')
#
@macsec.group(cls=clicommon.AbbreviationGroup, name='profile')
def macsec_profile():
pass


def is_hexstring(hexstring: str):
try:
int(hexstring, 16)
return True
except ValueError:
return False


#
# 'add' command ('config macsec profile add ...')
#
@macsec_profile.command('add')
@click.argument('profile', metavar='<profile_name>', required=True)
@click.option('--priority', metavar='<priority>', required=False, default=255, show_default=True, type=click.IntRange(0, 255), help="For Key server election. In 0-255 range with 0 being the highest priority.")
@click.option('--cipher_suite', metavar='<cipher_suite>', required=False, default="GCM-AES-128", show_default=True, type=click.Choice(["GCM-AES-128", "GCM-AES-256", "GCM-AES-XPN-128", "GCM-AES-XPN-256"]), help="The cipher suite for MACsec.")
@click.option('--primary_cak', metavar='<primary_cak>', required=True, type=str, help="Primary Connectivity Association Key.")
@click.option('--primary_ckn', metavar='<primary_cak>', required=True, type=str, help="Primary CAK Name.")
@click.option('--policy', metavar='<policy>', required=False, default="security", show_default=True, type=click.Choice(["integrity_only", "security"]), help="MACsec policy. INTEGRITY_ONLY: All traffic, except EAPOL, will be converted to MACsec packets without encryption. SECURITY: All traffic, except EAPOL, will be encrypted by SecY.")
@click.option('--enable_replay_protect/--disable_replay_protect', metavar='<replay_protect>', required=False, default=False, show_default=True, is_flag=True, help="Whether enable replay protect.")
@click.option('--replay_window', metavar='<enable_replay_protect>', required=False, default=0, show_default=True, type=click.IntRange(0, 2**32), help="Replay window size that is the number of packets that could be out of order. This field works only if ENABLE_REPLAY_PROTECT is true.")
@click.option('--send_sci/--no_send_sci', metavar='<send_sci>', required=False, default=True, show_default=True, is_flag=True, help="Send SCI in SecTAG field of MACsec header.")
@click.option('--rekey_period', metavar='<rekey_period>', required=False, default=0, show_default=True, type=click.IntRange(min=0), help="The period of proactively refresh (Unit second).")
@clicommon.pass_db
def add_profile(db, profile, priority, cipher_suite, primary_cak, primary_ckn, policy, enable_replay_protect, replay_window, send_sci, rekey_period):
"""
Add MACsec profile
"""
ctx = click.get_current_context()

profile_entry = db.cfgdb.get_entry('MACSEC_PROFILE', profile)
if not len(profile_entry) == 0:
ctx.fail("{} already exists".format(profile))

profile_table = {}

profile_table["priority"] = priority

profile_table["cipher_suite"] = cipher_suite

if "128" in cipher_suite:
if len(primary_cak) != 32:
ctx.fail("Expect the length of CAK is 32, but got {}".format(len(primary_cak)))
elif "256" in cipher_suite:
if len(primary_cak) != 64:
ctx.fail("Expect the length of CAK is 64, but got {}".format(len(primary_cak)))
if not is_hexstring(primary_cak):
ctx.fail("Expect the primary_cak is valid hex string")
if not is_hexstring(primary_ckn):
ctx.fail("Expect the primary_ckn is valid hex string")
profile_table["primary_cak"] = primary_cak
profile_table["primary_ckn"] = primary_ckn

profile_table["policy"] = policy

if enable_replay_protect and replay_window > 0:
profile_table["enable_replay_protect"] = enable_replay_protect
profile_table["replay_window"] = replay_window

profile_table["send_sci"] = send_sci

if rekey_period > 0:
profile_table["rekey_period"] = rekey_period

for k, v in profile_table.items():
if isinstance(v, bool):
if v:
profile_table[k] = "true"
else:
profile_table[k] = "false"
else:
profile_table[k] = str(v)
db.cfgdb.set_entry("MACSEC_PROFILE", profile, profile_table)


#
# 'del' command ('config macsec profile del ...')
#
@macsec_profile.command('del')
@click.argument('profile', metavar='<profile_name>', required=True)
@clicommon.pass_db
def del_profile(db, profile):
"""
Delete MACsec profile
"""
ctx = click.get_current_context()

profile_entry = db.cfgdb.get_entry('MACSEC_PROFILE', profile)
if len(profile_entry) == 0:
ctx.fail("{} doesn't exist".format(profile))

db.cfgdb.set_entry("MACSEC_PROFILE", profile, None)
2 changes: 2 additions & 0 deletions config/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from . import feature
from . import kdump
from . import kube
from . import macsec
from . import muxcable
from . import nat
from . import vlan
Expand Down Expand Up @@ -1033,6 +1034,7 @@ def config(ctx):
config.add_command(feature.feature)
config.add_command(kdump.kdump)
config.add_command(kube.kubernetes)
config.add_command(macsec.macsec)
config.add_command(muxcable.muxcable)
config.add_command(nat.nat)
config.add_command(vlan.vlan)
Expand Down
Loading