Skip to content

Commit

Permalink
[show][config] cli support for firmware upgrade on Y-Cable (sonic-net…
Browse files Browse the repository at this point in the history
…#1528)


Summary:
This PR provides the support for adding CLI commands for activatie, download, upgrade firmware on the Y-cable
In particular these Cli commands are supported:
sudo config muxcable firmware download Ethernet0
sudo config muxcable firmware rollback Ethernet0
sudo config muxcable firmware download ~/AEC_WYOMING_B52Yb0_MS_0.6_20201218.bin Ethernet0

Signed-off-by: vaibhav-dahiya vdahiya@microsoft.com

What I did
Added the support for firmware upgrade CLI on Y cable

How I did it
added the changes in sonic-utilities/show and sonic-utilities/config by changing the muxcable.py

Signed-off-by: vaibhav-dahiya <vdahiya@microsoft.com>
  • Loading branch information
vdahiya12 authored and gitsabari committed Jun 15, 2021
1 parent 80bdcfc commit ff565ad
Show file tree
Hide file tree
Showing 3 changed files with 420 additions and 0 deletions.
231 changes: 231 additions & 0 deletions config/muxcable.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,3 +631,234 @@ def setswitchmode(state, port):
if rc == False:
click.echo("ERR: Unable to set switching mode one or more ports to {}".format(state))
sys.exit(CONFIG_FAIL)


def get_per_npu_statedb(per_npu_statedb, port_table_keys):

# Getting all front asic namespace and correspding config and state DB connector

namespaces = multi_asic.get_front_end_namespaces()
for namespace in namespaces:
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
# replace these with correct macros
per_npu_statedb[asic_id] = SonicV2Connector(use_unix_socket_path=True, namespace=namespace)
per_npu_statedb[asic_id].connect(per_npu_statedb[asic_id].STATE_DB)

port_table_keys[asic_id] = per_npu_statedb[asic_id].keys(
per_npu_statedb[asic_id].STATE_DB, 'MUX_CABLE_TABLE|*')


def get_physical_port_list(port):

physical_port_list = []
if platform_sfputil is not None:
physical_port_list = platform_sfputil_helper.logical_port_name_to_physical_port_list(port)

asic_index = None
if platform_sfputil is not None:
asic_index = platform_sfputil_helper.get_asic_id_for_logical_port(port)
if asic_index is None:
# TODO this import is only for unit test purposes, and should be removed once sonic_platform_base
# is fully mocked
import sonic_platform_base.sonic_sfp.sfputilhelper
asic_index = sonic_platform_base.sonic_sfp.sfputilhelper.SfpUtilHelper().get_asic_id_for_logical_port(port)
if asic_index is None:
click.echo("Got invalid asic index for port {}, cant retreive mux status".format(port))

if not isinstance(physical_port_list, list):
click.echo(("ERR: Unable to locate physical port information for {}".format(port)))
sys.exit(CONFIG_FAIL)

if len(physical_port_list) != 1:
click.echo("ERR: Found multiple physical ports ({}) associated with {}".format(
", ".join(physical_port_list), port))
sys.exit(CONFIG_FAIL)

return (physical_port_list, asic_index)


def perform_download_firmware(physical_port, fwfile, port):
import sonic_y_cable.y_cable
result = sonic_y_cable.y_cable.download_firmware(physical_port, fwfile)
if result == sonic_y_cable.y_cable.FIRMWARE_DOWNLOAD_SUCCESS:
click.echo("firmware download successful {}".format(port))
return True
else:
click.echo("firmware download failure {}".format(port))
return False


def perform_activate_firmware(physical_port, port):
import sonic_y_cable.y_cable
result = sonic_y_cable.y_cable.activate_firmware(physical_port)
if result == sonic_y_cable.y_cable.FIRMWARE_ACTIVATE_SUCCESS:
click.echo("firmware activate successful for {}".format(port))
return True
else:
click.echo("firmware activate failure for {}".format(port))
return False


def perform_rollback_firmware(physical_port, port):
import sonic_y_cable.y_cable
result = sonic_y_cable.y_cable.rollback_firmware(physical_port)
if result == sonic_y_cable.y_cable.FIRMWARE_ROLLBACK_SUCCESS:
click.echo("firmware rollback successful {}".format(port))
return True
else:
click.echo("firmware rollback failure {}".format(port))
return False


@muxcable.group(cls=clicommon.AbbreviationGroup)
def firmware():
"""Configure muxcable firmware command"""
pass


@firmware.command()
@click.argument('fwfile', metavar='<firmware_file>', required=True)
@click.argument('port', metavar='<port_name>', required=True, default=None)
def download(fwfile, port):
"""Config muxcable firmware download"""

per_npu_statedb = {}
y_cable_asic_table_keys = {}
port_table_keys = {}

get_per_npu_statedb(per_npu_statedb, port_table_keys)

if port is not None and port != "all":

physical_port_list = []
physical_port_list, asic_index = get_physical_port_list(port)
physical_port = physical_port_list[0]
if per_npu_statedb[asic_index] is not None:
y_cable_asic_table_keys = port_table_keys[asic_index]
logical_key = "MUX_CABLE_TABLE|{}".format(port)
if logical_key in y_cable_asic_table_keys:
perform_download_firmware(physical_port, fwfile, port)

else:
click.echo("this is not a valid port present on mux_cable".format(port))
sys.exit(CONFIG_FAIL)
else:
click.echo("there is not a valid asic table for this asic_index".format(asic_index))
sys.exit(CONFIG_FAIL)

elif port == "all" and port is not None:

rc = CONFIG_SUCCESSFUL
for namespace in namespaces:
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
for key in port_table_keys[asic_id]:
port = key.split("|")[1]

physical_port_list = []
(physical_port_list, asic_index) = get_physical_port_list(port)

physical_port = physical_port_list[0]

status = perform_download_firmware(physical_port, fwfile, port)

if status is not True:
rc = CONFIG_FAIL

sys.exit(rc)


@firmware.command()
@click.argument('port', metavar='<port_name>', required=True, default=None)
def activate(port):
"""Config muxcable firmware activate"""

per_npu_statedb = {}
y_cable_asic_table_keys = {}
port_table_keys = {}

get_per_npu_statedb(per_npu_statedb, port_table_keys)

if port is not None and port != "all":

physical_port_list = []
(physical_port_list, asic_index) = get_physical_port_list(port)
physical_port = physical_port_list[0]
if per_npu_statedb[asic_index] is not None:
y_cable_asic_table_keys = port_table_keys[asic_index]
logical_key = "MUX_CABLE_TABLE|{}".format(port)
if logical_key in y_cable_asic_table_keys:
perform_activate_firmware(physical_port, port)

else:
click.echo("this is not a valid port present on mux_cable".format(port))
sys.exit(CONFIG_FAIL)
else:
click.echo("there is not a valid asic table for this asic_index".format(asic_index))
sys.exit(CONFIG_FAIL)

elif port == "all" and port is not None:

rc = CONFIG_SUCCESSFUL
for namespace in namespaces:
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
for key in port_table_keys[asic_id]:
port = key.split("|")[1]

physical_port_list = []

(physical_port_list, asic_index) = get_physical_port_list(port)
physical_port = physical_port_list[0]
status = perform_activate_firmware(physical_port, port)

if status is not True:
rc = CONFIG_FAIL

sys.exit(rc)


@firmware.command()
@click.argument('port', metavar='<port_name>', required=True, default=None)
def rollback(port):
"""Config muxcable firmware rollback"""

port_table_keys = {}
y_cable_asic_table_keys = {}
per_npu_statedb = {}

get_per_npu_statedb(per_npu_statedb, port_table_keys)

if port is not None and port != "all":

physical_port_list = []
(physical_port_list, asic_index) = get_physical_port_list(port)
physical_port = physical_port_list[0]
if per_npu_statedb[asic_index] is not None:
y_cable_asic_table_keys = port_table_keys[asic_index]
logical_key = "MUX_CABLE_TABLE|{}".format(port)
if logical_key in y_cable_asic_table_keys:
perform_rollback_firmware(physical_port, port)

else:
click.echo("this is not a valid port present on mux_cable".format(port))
sys.exit(CONFIG_FAIL)
else:
click.echo("there is not a valid asic table for this asic_index".format(asic_index))
sys.exit(CONFIG_FAIL)

elif port == "all" and port is not None:

rc = CONFIG_SUCCESSFUL
for namespace in namespaces:
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
for key in port_table_keys[asic_id]:
port = key.split("|")[1]

physical_port_list = []
(physical_port_list, asic_index) = get_physical_port_list(port)
physical_port = physical_port_list[0]
status = perform_rollback_firmware(physical_port, port)

if status is not True:
rc = CONFIG_FAIL

sys.exit(rc)
105 changes: 105 additions & 0 deletions show/muxcable.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from natsort import natsorted
from sonic_py_common import multi_asic
from swsscommon.swsscommon import SonicV2Connector, ConfigDBConnector
from swsscommon import swsscommon
from tabulate import tabulate
from utilities_common import platform_sfputil_helper

Expand Down Expand Up @@ -820,3 +821,107 @@ def switchmode(port):
click.echo(tabulate(body, headers=headers))
if rc == False:
sys.exit(EXIT_FAIL)


def get_firmware_dict(physical_port, target, side, mux_info_dict):

import sonic_y_cable.y_cable
result = sonic_y_cable.y_cable.get_firmware_version(physical_port, target)

if result is not None and isinstance(result, dict):
mux_info_dict[("version_{}_active".format(side))] = result.get("version_active", None)
mux_info_dict[("version_{}_inactive".format(side))] = result.get("version_inactive", None)
mux_info_dict[("version_{}_next".format(side))] = result.get("version_next", None)

else:
mux_info_dict[("version_{}_active".format(side))] = "N/A"
mux_info_dict[("version_{}_inactive".format(side))] = "N/A"
mux_info_dict[("version_{}_next".format(side))] = "N/A"


@muxcable.group(cls=clicommon.AbbreviationGroup)
def firmware():
"""Show muxcable firmware command"""
pass


@firmware.command()
@click.argument('port', metavar='<port_name>', required=True, default=None)
def version(port):
"""Show muxcable firmware version"""

port_table_keys = {}
y_cable_asic_table_keys = {}
per_npu_statedb = {}
physical_port_list = []

# Getting all front asic namespace and correspding config and state DB connector

namespaces = multi_asic.get_front_end_namespaces()
for namespace in namespaces:
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
# replace these with correct macros
per_npu_statedb[asic_id] = swsscommon.SonicV2Connector(use_unix_socket_path=True, namespace=namespace)
per_npu_statedb[asic_id].connect(per_npu_statedb[asic_id].STATE_DB)

port_table_keys[asic_id] = per_npu_statedb[asic_id].keys(
per_npu_statedb[asic_id].STATE_DB, 'MUX_CABLE_TABLE|*')

if port is not None:

logical_port_list = platform_sfputil_helper.get_logical_list()

if port not in logical_port_list:
click.echo(("ERR: Not a valid logical port for muxcable firmware {}".format(port)))
sys.exit(CONFIG_FAIL)

asic_index = None
if platform_sfputil is not None:
asic_index = platform_sfputil_helper.get_asic_id_for_logical_port(port)
if asic_index is None:
# TODO this import is only for unit test purposes, and should be removed once sonic_platform_base
# is fully mocked
import sonic_platform_base.sonic_sfp.sfputilhelper
asic_index = sonic_platform_base.sonic_sfp.sfputilhelper.SfpUtilHelper().get_asic_id_for_logical_port(port)
if asic_index is None:
click.echo("Got invalid asic index for port {}, cant retreive mux status".format(port))

if platform_sfputil is not None:
physical_port_list = platform_sfputil_helper.logical_port_name_to_physical_port_list(port)

if not isinstance(physical_port_list, list):
click.echo(("ERR: Unable to locate physical port information for {}".format(port)))
sys.exit(CONFIG_FAIL)

if len(physical_port_list) != 1:
click.echo("ERR: Found multiple physical ports ({}) associated with {}".format(
", ".join(physical_port_list), port))
sys.exit(CONFIG_FAIL)

mux_info_dict = {}
physical_port = physical_port_list[0]
if per_npu_statedb[asic_index] is not None:
y_cable_asic_table_keys = port_table_keys[asic_index]
logical_key = "MUX_CABLE_TABLE|{}".format(port)
import sonic_y_cable.y_cable
read_side = sonic_y_cable.y_cable.check_read_side(physical_port)
if logical_key in y_cable_asic_table_keys:
if read_side == 1:
get_firmware_dict(physical_port, 1, "self", mux_info_dict)
get_firmware_dict(physical_port, 2, "peer", mux_info_dict)
get_firmware_dict(physical_port, 0, "nic", mux_info_dict)
click.echo("{}".format(json.dumps(mux_info_dict, indent=4)))
elif read_side == 2:
get_firmware_dict(physical_port, 2, "self", mux_info_dict)
get_firmware_dict(physical_port, 1, "peer", mux_info_dict)
get_firmware_dict(physical_port, 0, "nic", mux_info_dict)
click.echo("{}".format(json.dumps(mux_info_dict, indent=4)))
else:
click.echo("Did not get a valid read_side for muxcable".format(port))
sys.exit(CONFIG_FAIL)

else:
click.echo("this is not a valid port present on mux_cable".format(port))
sys.exit(CONFIG_FAIL)
else:
click.echo("there is not a valid asic table for this asic_index".format(asic_index))
Loading

0 comments on commit ff565ad

Please sign in to comment.