From 0df3ba812fbdc700b4170ebf29b5a97f87407eb1 Mon Sep 17 00:00:00 2001 From: Ying Xie Date: Fri, 12 Aug 2022 15:34:46 +0000 Subject: [PATCH] Revert "Improve the way to check port type of RJ45 port (#2249)" This reverts commit a1a09e4beefb55e99ed8c9fd45ab4f4aa2b3770e. --- scripts/intfutil | 40 +++++----- scripts/sfpshow | 75 +++++++++---------- sfputil/main.py | 71 ++++++++++-------- .../mock_platform_sfputil.py | 41 ---------- tests/mock_platform_sfputil/portmap.json | 8 -- tests/sfp_test.py | 8 -- tests/sfputil_test.py | 44 +++++++---- utilities_common/platform_sfputil_helper.py | 45 +---------- 8 files changed, 122 insertions(+), 210 deletions(-) delete mode 100644 tests/mock_platform_sfputil/mock_platform_sfputil.py delete mode 100644 tests/mock_platform_sfputil/portmap.json diff --git a/scripts/intfutil b/scripts/intfutil index fb351687a8..e327d1a607 100755 --- a/scripts/intfutil +++ b/scripts/intfutil @@ -5,6 +5,13 @@ import os import re import sys +from natsort import natsorted +from tabulate import tabulate +from utilities_common import constants +from utilities_common import multi_asic as multi_asic_util +from utilities_common.intf_filter import parse_interface_in_filter +from sonic_py_common.interface import get_intf_longname + # mock the redis for unit test purposes # try: if os.environ["UTILITIES_UNIT_TESTING"] == "2": @@ -13,8 +20,6 @@ try: sys.path.insert(0, modules_path) sys.path.insert(0, tests_path) import mock_tables.dbconnector - from mock_platform_sfputil.mock_platform_sfputil import mock_platform_sfputil_helper - mock_platform_sfputil_helper() if os.environ["UTILITIES_UNIT_TESTING_TOPOLOGY"] == "multi_asic": import mock_tables.mock_multi_asic mock_tables.dbconnector.load_namespace_config() @@ -22,14 +27,6 @@ try: except KeyError: pass -from natsort import natsorted -from tabulate import tabulate -from utilities_common import constants -from utilities_common import multi_asic as multi_asic_util -from utilities_common.intf_filter import parse_interface_in_filter -from utilities_common.platform_sfputil_helper import is_rj45_port, RJ45_PORT_TYPE -from sonic_py_common.interface import get_intf_longname - # ========================== Common interface-utils logic ========================== @@ -52,7 +49,7 @@ PORT_RMT_ADV_SPEEDS = 'rmt_adv_speeds' PORT_INTERFACE_TYPE = 'interface_type' PORT_ADV_INTERFACE_TYPES = 'adv_interface_types' PORT_TPID = "tpid" -OPTICS_TYPE_RJ45 = RJ45_PORT_TYPE +OPTICS_TYPE_RJ45 = 'RJ45' PORT_LINK_TRAINING = 'link_training' PORT_LINK_TRAINING_STATUS = 'link_training_status' @@ -164,10 +161,10 @@ def appl_db_port_status_get(appl_db, intf_name, status_type): if status is None: return "N/A" if status_type == PORT_SPEED and status != "N/A": - optics_type = port_optics_get(appl_db, intf_name, PORT_OPTICS_TYPE) + optics_type = state_db_port_optics_get(appl_db, intf_name, PORT_OPTICS_TYPE) status = port_speed_parse(status, optics_type) elif status_type == PORT_ADV_SPEEDS and status != "N/A" and status != "all": - optics_type = port_optics_get(appl_db, intf_name, PORT_OPTICS_TYPE) + optics_type = state_db_port_optics_get(appl_db, intf_name, PORT_OPTICS_TYPE) speed_list = status.split(',') new_speed_list = [] for s in natsorted(speed_list): @@ -184,7 +181,7 @@ def state_db_port_status_get(db, intf_name, field): if not status: return "N/A" if field in [PORT_RMT_ADV_SPEEDS] and status not in ["N/A", "all"]: - optics_type = port_optics_get(db, intf_name, PORT_OPTICS_TYPE) + optics_type = state_db_port_optics_get(db, intf_name, PORT_OPTICS_TYPE) speed_list = status.split(',') new_speed_list = [] for s in natsorted(speed_list): @@ -201,7 +198,7 @@ def port_oper_speed_get(db, intf_name): if oper_speed is None or oper_speed == "N/A" or oper_status != "up": return appl_db_port_status_get(db, intf_name, PORT_SPEED) else: - optics_type = port_optics_get(db, intf_name, PORT_OPTICS_TYPE) + optics_type = state_db_port_optics_get(db, intf_name, PORT_OPTICS_TYPE) return port_speed_parse(oper_speed, optics_type) def port_oper_speed_get_raw(db, intf_name): @@ -214,17 +211,14 @@ def port_oper_speed_get_raw(db, intf_name): speed = db.get(db.APPL_DB, PORT_STATUS_TABLE_PREFIX + intf_name, PORT_SPEED) return speed -def port_optics_get(state_db, intf_name, type): +def state_db_port_optics_get(state_db, intf_name, type): """ Get optic type info for port """ full_table_id = PORT_TRANSCEIVER_TABLE_PREFIX + intf_name optics_type = state_db.get(state_db.STATE_DB, full_table_id, type) if optics_type is None: - if is_rj45_port(intf_name): - return OPTICS_TYPE_RJ45 - else: - return "N/A" + return "N/A" return optics_type def merge_dicts(x,y): @@ -331,13 +325,13 @@ def po_speed_dict(po_int_dict, appl_db): # If no speed was returned, append None without format po_list.append(None) else: - optics_type = port_optics_get(appl_db, value[0], PORT_OPTICS_TYPE) + optics_type = state_db_port_optics_get(appl_db, value[0], PORT_OPTICS_TYPE) interface_speed = port_speed_parse(interface_speed, optics_type) po_list.append(interface_speed) elif len(value) > 1: for intf in value: temp_speed = port_oper_speed_get_raw(appl_db, intf) - optics_type = port_optics_get(appl_db, intf, PORT_OPTICS_TYPE) + optics_type = state_db_port_optics_get(appl_db, intf, PORT_OPTICS_TYPE) temp_speed = int(temp_speed) if temp_speed else 0 agg_speed_list.append(temp_speed) interface_speed = sum(agg_speed_list) @@ -483,7 +477,7 @@ class IntfStatus(object): config_db_vlan_port_keys_get(self.combined_int_to_vlan_po_dict, self.front_panel_ports_list, key), appl_db_port_status_get(self.db, key, PORT_OPER_STATUS), appl_db_port_status_get(self.db, key, PORT_ADMIN_STATUS), - port_optics_get(self.db, key, PORT_OPTICS_TYPE), + state_db_port_optics_get(self.db, key, PORT_OPTICS_TYPE), appl_db_port_status_get(self.db, key, PORT_PFC_ASYM_STATUS))) for po, value in self.portchannel_speed_dict.items(): diff --git a/scripts/sfpshow b/scripts/sfpshow index d292dddb85..3d71408202 100755 --- a/scripts/sfpshow +++ b/scripts/sfpshow @@ -17,6 +17,7 @@ from natsort import natsorted from sonic_py_common.interface import front_panel_prefix, backplane_prefix, inband_prefix, recirc_prefix from sonic_py_common import multi_asic from tabulate import tabulate +from utilities_common import multi_asic as multi_asic_util # Mock the redis DB for unit test purposes try: @@ -26,17 +27,12 @@ try: sys.path.insert(0, modules_path) sys.path.insert(0, test_path) import mock_tables.dbconnector - from mock_platform_sfputil.mock_platform_sfputil import mock_platform_sfputil_helper - mock_platform_sfputil_helper() if os.environ["UTILITIES_UNIT_TESTING_TOPOLOGY"] == "multi_asic": import mock_tables.mock_multi_asic mock_tables.dbconnector.load_namespace_config() except KeyError: pass -from utilities_common import multi_asic as multi_asic_util -from utilities_common.platform_sfputil_helper import is_rj45_port, RJ45_PORT_TYPE - # TODO: We should share these maps and the formatting functions between sfputil and sfpshow QSFP_DATA_MAP = { 'model': 'Vendor PN', @@ -219,6 +215,8 @@ QSFP_DD_DOM_VALUE_UNIT_MAP = { 'voltage': 'Volts' } +RJ45_PORT_TYPE = 'RJ45' + def display_invalid_intf_eeprom(intf_name): output = intf_name + ': SFP EEPROM Not detected\n' @@ -233,6 +231,7 @@ def display_invalid_intf_presence(intf_name): class SFPShow(object): + def __init__(self, intf_name, namespace_option, dump_dom=False): super(SFPShow, self).__init__() self.db = None @@ -395,47 +394,41 @@ class SFPShow(object): output = '' sfp_info_dict = state_db.get_all(state_db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(interface_name)) - if sfp_info_dict: - if sfp_info_dict['type'] == RJ45_PORT_TYPE: - output = 'SFP EEPROM is not applicable for RJ45 port\n' - else: - output = 'SFP EEPROM detected\n' - sfp_info_output = self.convert_sfp_info_to_output_string(sfp_info_dict) - output += sfp_info_output - - if dump_dom: - sfp_type = sfp_info_dict['type'] - dom_info_dict = state_db.get_all(state_db.STATE_DB, 'TRANSCEIVER_DOM_SENSOR|{}'.format(interface_name)) - dom_output = self.convert_dom_to_output_string(sfp_type, dom_info_dict) - output += dom_output + if sfp_info_dict['type'] == RJ45_PORT_TYPE: + output = 'SFP EEPROM is not applicable for RJ45 port\n' else: - if is_rj45_port(interface_name): - output = 'SFP EEPROM is not applicable for RJ45 port\n' - else: - output = "SFP EEPROM Not detected\n" + output = 'SFP EEPROM detected\n' + sfp_info_output = self.convert_sfp_info_to_output_string(sfp_info_dict) + output += sfp_info_output + + if dump_dom: + sfp_type = sfp_info_dict['type'] + dom_info_dict = state_db.get_all(state_db.STATE_DB, 'TRANSCEIVER_DOM_SENSOR|{}'.format(interface_name)) + dom_output = self.convert_dom_to_output_string(sfp_type, dom_info_dict) + output += dom_output return output @multi_asic_util.run_on_multi_asic def get_eeprom(self): if self.intf_name is not None: - self.intf_eeprom[self.intf_name] = self.convert_interface_sfp_info_to_cli_output_string( - self.db, self.intf_name, self.dump_dom) + presence = self.db.exists(self.db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(self.intf_name)) + if presence: + self.intf_eeprom[self.intf_name] = self.convert_interface_sfp_info_to_cli_output_string( + self.db, self.intf_name, self.dump_dom) + else: + self.intf_eeprom[self.intf_name] = "SFP EEPROM Not detected\n" else: port_table_keys = self.db.keys(self.db.APPL_DB, "PORT_TABLE:*") for i in port_table_keys: interface = re.split(':', i, maxsplit=1)[-1].strip() if interface and interface.startswith(front_panel_prefix()) and not interface.startswith((backplane_prefix(), inband_prefix(), recirc_prefix())): - self.intf_eeprom[interface] = self.convert_interface_sfp_info_to_cli_output_string( - self.db, interface, self.dump_dom) - - def convert_interface_sfp_presence_state_to_cli_output_string(self, state_db, interface_name): - sfp_info_dict = state_db.get_all(self.db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(interface_name)) - if sfp_info_dict: - output = 'Present' - else: - output = 'Not present' - return output + presence = self.db.exists(self.db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(interface)) + if presence: + self.intf_eeprom[interface] = self.convert_interface_sfp_info_to_cli_output_string( + self.db, interface, self.dump_dom) + else: + self.intf_eeprom[interface] = "SFP EEPROM Not detected\n" @multi_asic_util.run_on_multi_asic @@ -443,15 +436,21 @@ class SFPShow(object): port_table = [] if self.intf_name is not None: - presence_string = self.convert_interface_sfp_presence_state_to_cli_output_string(self.db, self.intf_name) - port_table.append((self.intf_name, presence_string)) + presence = self.db.exists(self.db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(self.intf_name)) + if presence: + port_table.append((self.intf_name, 'Present')) + else: + port_table.append((self.intf_name, 'Not present')) else: port_table_keys = self.db.keys(self.db.APPL_DB, "PORT_TABLE:*") for i in port_table_keys: key = re.split(':', i, maxsplit=1)[-1].strip() if key and key.startswith(front_panel_prefix()) and not key.startswith((backplane_prefix(), inband_prefix(), recirc_prefix())): - presence_string = self.convert_interface_sfp_presence_state_to_cli_output_string(self.db, key) - port_table.append((key, presence_string)) + presence = self.db.exists(self.db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(key)) + if presence: + port_table.append((key, 'Present')) + else: + port_table.append((key, 'Not present')) self.table += port_table diff --git a/sfputil/main.py b/sfputil/main.py index 96653bb622..d567f39a0d 100644 --- a/sfputil/main.py +++ b/sfputil/main.py @@ -16,7 +16,6 @@ import click import sonic_platform import sonic_platform_base.sonic_sfp.sfputilhelper -from sonic_platform_base.sfp_base import SfpBase from swsscommon.swsscommon import SonicV2Connector from natsort import natsorted from sonic_py_common import device_info, logger, multi_asic @@ -292,16 +291,33 @@ def is_sfp_present(port_name): return bool(presence) -def is_port_type_rj45(port_name): +# Below defined two flavors of functions to determin whether a port is a RJ45 port. +# They serve different types of SFP utilities. One type of SFP utility consume the +# info stored in the STATE_DB, these utilities shall call 'is_rj45_port_from_db' +# to judge the port type. Another type of utilities will call the platform API +# directly to access SFP, for them shall use 'is_rj45_port_from_api'. +def is_rj45_port_from_db(port_name, db): + intf_type = db.get(db.STATE_DB, 'TRANSCEIVER_INFO|{}'.format(port_name), 'type') + return intf_type == RJ45_PORT_TYPE + + +def is_rj45_port_from_api(port_name): physical_port = logical_port_to_physical_port_index(port_name) + sfp = platform_chassis.get_sfp(physical_port) try: - port_types = platform_chassis.get_port_or_cage_type(physical_port) - return SfpBase.SFP_PORT_TYPE_BIT_RJ45 == port_types + port_type = sfp.get_transceiver_info()['type'] except NotImplementedError: - pass + click.echo("Not able to judge the port type due to get_transceiver_info not implemented!", err=True) + sys.exit(ERROR_NOT_IMPLEMENTED) - return False + return port_type == RJ45_PORT_TYPE + + +def skip_if_port_is_rj45(port_name): + if is_rj45_port_from_api(port_name): + click.echo("This functionality is not applicable for RJ45 port {}.".format(port_name)) + sys.exit(EXIT_FAIL) # ========================== Methods for formatting output ========================== # Convert dict values to cli output string @@ -643,7 +659,7 @@ def eeprom(port, dump_dom, namespace): for physical_port in physical_port_list: port_name = get_physical_port_name(logical_port_name, i, ganged) - if is_port_type_rj45(port_name): + if is_rj45_port_from_api(port_name): output += "{}: SFP EEPROM is not applicable for RJ45 port\n".format(port_name) output += '\n' continue @@ -709,7 +725,6 @@ def presence(port): logical_port_list = [port] - logical_port_list = natsort.natsorted(logical_port_list) for logical_port_name in logical_port_list: ganged = False i = 1 @@ -802,7 +817,7 @@ def fetch_error_status_from_platform_api(port): physical_port_list = logical_port_name_to_physical_port_list(logical_port_name) port_name = get_physical_port_name(logical_port_name, 1, False) - if is_port_type_rj45(logical_port_name): + if is_rj45_port_from_api(logical_port_name): output.append([port_name, "N/A"]) else: output.append([port_name, output_dict.get(physical_port_list[0])]) @@ -828,7 +843,7 @@ def fetch_error_status_from_state_db(port, state_db): sorted_ports = natsort.natsorted(status) output = [] for port in sorted_ports: - if is_port_type_rj45(port): + if is_rj45_port_from_db(port, state_db): description = "N/A" else: statestring = status[port].get('status') @@ -904,7 +919,7 @@ def lpmode(port): click.echo("Error: No physical ports found for logical port '{}'".format(logical_port_name)) return - if is_port_type_rj45(logical_port_name): + if is_rj45_port_from_api(logical_port_name): output_table.append([logical_port_name, "N/A"]) else: if len(physical_port_list) > 1: @@ -947,7 +962,7 @@ def fwversion(port_name): physical_port = logical_port_to_physical_port_index(port_name) sfp = platform_chassis.get_sfp(physical_port) - if is_port_type_rj45(port_name): + if is_rj45_port_from_api(port_name): click.echo("Show firmware version is not applicable for RJ45 port {}.".format(port_name)) sys.exit(EXIT_FAIL) @@ -986,7 +1001,7 @@ def set_lpmode(logical_port, enable): click.echo("Error: No physical ports found for logical port '{}'".format(logical_port)) return - if is_port_type_rj45(logical_port): + if is_rj45_port_from_api(logical_port): click.echo("{} low-power mode is not applicable for RJ45 port {}.".format("Enabling" if enable else "Disabling", logical_port)) sys.exit(EXIT_FAIL) @@ -1046,7 +1061,7 @@ def reset(port_name): click.echo("Error: No physical ports found for logical port '{}'".format(port_name)) return - if is_port_type_rj45(port_name): + if is_rj45_port_from_api(port_name): click.echo("Reset is not applicable for RJ45 port {}.".format(port_name)) sys.exit(EXIT_FAIL) @@ -1211,14 +1226,12 @@ def download_firmware(port_name, filepath): def run(port_name, mode): """Run the firmware with default mode=1""" - if is_port_type_rj45(port_name): - click.echo("This functionality is not applicable for RJ45 port {}.".format(port_name)) - sys.exit(EXIT_FAIL) - if not is_sfp_present(port_name): click.echo("{}: SFP EEPROM not detected\n".format(port_name)) sys.exit(EXIT_FAIL) + skip_if_port_is_rj45(port_name) + status = run_firmware(port_name, int(mode)) if status != 1: click.echo('Failed to run firmware in mode={}! CDB status: {}'.format(mode, status)) @@ -1232,14 +1245,12 @@ def run(port_name, mode): def commit(port_name): """Commit the running firmware""" - if is_port_type_rj45(port_name): - click.echo("This functionality is not applicable for RJ45 port {}.".format(port_name)) - sys.exit(EXIT_FAIL) - if not is_sfp_present(port_name): click.echo("{}: SFP EEPROM not detected\n".format(port_name)) sys.exit(EXIT_FAIL) + skip_if_port_is_rj45(port_name) + status = commit_firmware(port_name) if status != 1: click.echo('Failed to commit firmware! CDB status: {}'.format(status)) @@ -1256,14 +1267,12 @@ def upgrade(port_name, filepath): physical_port = logical_port_to_physical_port_index(port_name) - if is_port_type_rj45(port_name): - click.echo("This functionality is not applicable for RJ45 port {}.".format(port_name)) - sys.exit(EXIT_FAIL) - if not is_sfp_present(port_name): click.echo("{}: SFP EEPROM not detected\n".format(port_name)) sys.exit(EXIT_FAIL) + skip_if_port_is_rj45(port_name) + show_firmware_version(physical_port) status = download_firmware(port_name, filepath) @@ -1294,14 +1303,12 @@ def upgrade(port_name, filepath): def download(port_name, filepath): """Download firmware on the transceiver""" - if is_port_type_rj45(port_name): - click.echo("This functionality is not applicable for RJ45 port {}.".format(port_name)) - sys.exit(EXIT_FAIL) - if not is_sfp_present(port_name): click.echo("{}: SFP EEPROM not detected\n".format(port_name)) sys.exit(EXIT_FAIL) + skip_if_port_is_rj45(port_name) + start = time.time() status = download_firmware(port_name, filepath) if status == 1: @@ -1322,9 +1329,7 @@ def unlock(port_name, password): physical_port = logical_port_to_physical_port_index(port_name) sfp = platform_chassis.get_sfp(physical_port) - if is_port_type_rj45(port_name): - click.echo("This functionality is not applicable for RJ45 port {}.".format(port_name)) - sys.exit(EXIT_FAIL) + skip_if_port_is_rj45(port_name) if not is_sfp_present(port_name): click.echo("{}: SFP EEPROM not detected\n".format(port_name)) diff --git a/tests/mock_platform_sfputil/mock_platform_sfputil.py b/tests/mock_platform_sfputil/mock_platform_sfputil.py deleted file mode 100644 index 5c3ea2e248..0000000000 --- a/tests/mock_platform_sfputil/mock_platform_sfputil.py +++ /dev/null @@ -1,41 +0,0 @@ -import json -import os -from sonic_platform_base.platform_base import PlatformBase -from sonic_platform_base.chassis_base import ChassisBase -from sonic_platform_base.sfp_base import SfpBase -import utilities_common.platform_sfputil_helper as platform_sfputil_helper - -portMap = None -RJ45Ports = None - -class mock_Chassis(ChassisBase): - def __init__(self): - ChassisBase.__init__(self) - - def get_port_or_cage_type(self, index): - if index in RJ45Ports: - return SfpBase.SFP_PORT_TYPE_BIT_RJ45 - else: - raise NotImplementedError - -def mock_logical_port_name_to_physical_port_list(port_name): - index = portMap.get(port_name) - if not index: - index = 0 - return [index] - -def mock_platform_sfputil_read_porttab_mappings(): - global portMap - global RJ45Ports - - with open(os.path.join(os.path.dirname(__file__), 'portmap.json')) as pm: - jsonobj = json.load(pm) - portMap = jsonobj['portMap'] - RJ45Ports = jsonobj['RJ45Ports'] - -def mock_platform_sfputil_helper(): - platform_sfputil_helper.platform_chassis = mock_Chassis() - platform_sfputil_helper.platform_sfputil = True - platform_sfputil_helper.platform_porttab_mapping_read = False - platform_sfputil_helper.platform_sfputil_read_porttab_mappings = mock_platform_sfputil_read_porttab_mappings - platform_sfputil_helper.logical_port_name_to_physical_port_list = mock_logical_port_name_to_physical_port_list diff --git a/tests/mock_platform_sfputil/portmap.json b/tests/mock_platform_sfputil/portmap.json deleted file mode 100644 index eec9a9f40d..0000000000 --- a/tests/mock_platform_sfputil/portmap.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "portMap": { - "Ethernet29": 1 - }, - "RJ45Ports": [ - 1 - ] -} diff --git a/tests/sfp_test.py b/tests/sfp_test.py index b894b39469..a69872ab76 100644 --- a/tests/sfp_test.py +++ b/tests/sfp_test.py @@ -364,14 +364,6 @@ def test_sfp_presence(self): assert result.exit_code == 0 assert result.output == expected - result = runner.invoke(show.cli.commands["interfaces"].commands["transceiver"].commands["presence"], ["Ethernet29"]) - expected = """Port Presence ----------- ----------- -Ethernet29 Not present -""" - assert result.exit_code == 0 - assert result.output == expected - result = runner.invoke(show.cli.commands["interfaces"].commands["transceiver"].commands["presence"], ["Ethernet36"]) expected = """Port Presence ---------- ---------- diff --git a/tests/sfputil_test.py b/tests/sfputil_test.py index 1231ba67d7..a4d568d20e 100644 --- a/tests/sfputil_test.py +++ b/tests/sfputil_test.py @@ -268,16 +268,15 @@ def test_version(self): result = runner.invoke(sfputil.cli.commands['version'], []) assert result.output.rstrip() == 'sfputil version {}'.format(sfputil.VERSION) - @patch('sfputil.main.is_port_type_rj45', MagicMock(return_value=False)) def test_error_status_from_db(self): db = Db() expected_output = [['Ethernet0', 'Blocking Error|High temperature'], ['Ethernet4', 'OK'], ['Ethernet8', 'Unplugged'], ['Ethernet12', 'Unknown state: 255'], - ['Ethernet16', 'Unplugged'], - ['Ethernet28', 'Unplugged'], - ['Ethernet36', 'Unknown']] + ['Ethernet16', 'N/A'], + ['Ethernet28', 'N/A'], + ['Ethernet36', 'N/A']] output = sfputil.fetch_error_status_from_state_db(None, db.db) assert output == expected_output @@ -285,7 +284,11 @@ def test_error_status_from_db(self): output = sfputil.fetch_error_status_from_state_db('Ethernet0', db.db) assert output == expected_output_ethernet0 - @patch('sfputil.main.is_port_type_rj45', MagicMock(return_value=True)) + expected_output_ethernet16 = expected_output[4:5] + output = sfputil.fetch_error_status_from_state_db('Ethernet16', db.db) + assert output == expected_output_ethernet16 + + @patch('sfputil.main.is_rj45_port_from_db', MagicMock(return_value=True)) def test_error_status_from_db_RJ45(self): db = Db() expected_output = [['Ethernet0', 'N/A'], @@ -302,9 +305,13 @@ def test_error_status_from_db_RJ45(self): output = sfputil.fetch_error_status_from_state_db('Ethernet0', db.db) assert output == expected_output_ethernet0 + expected_output_ethernet16 = expected_output[4:5] + output = sfputil.fetch_error_status_from_state_db('Ethernet16', db.db) + assert output == expected_output_ethernet16 + @patch('sfputil.main.logical_port_name_to_physical_port_list', MagicMock(return_value=[1])) @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) - @patch('sfputil.main.is_port_type_rj45', MagicMock(return_value=False)) + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=False)) @patch('subprocess.check_output', MagicMock(return_value="['0:OK']")) def test_fetch_error_status_from_platform_api(self): output = sfputil.fetch_error_status_from_platform_api('Ethernet0') @@ -313,7 +320,7 @@ def test_fetch_error_status_from_platform_api(self): @patch('sfputil.main.logical_port_name_to_physical_port_list', MagicMock(return_value=[1])) @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) @patch('subprocess.check_output', MagicMock(return_value="['0:OK']")) - @patch('sfputil.main.is_port_type_rj45', MagicMock(return_value=True)) + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) def test_fetch_error_status_from_platform_api_RJ45(self): output = sfputil.fetch_error_status_from_platform_api('Ethernet0') assert output == [['Ethernet0', 'N/A']] @@ -394,7 +401,6 @@ def test_show_lpmode(self, mock_chassis): mock_sfp.get_lpmode.return_value = False mock_sfp.get_transceiver_info = MagicMock(return_value={'type': sfputil.RJ45_PORT_TYPE}) - mock_chassis.get_port_or_cage_type = MagicMock(return_value=sfputil.SfpBase.SFP_PORT_TYPE_BIT_RJ45) result = runner.invoke(sfputil.cli.commands['show'].commands['lpmode'], ["-p", "Ethernet0"]) assert result.exit_code == 0 expected_output = """Port Low-power Mode @@ -407,7 +413,7 @@ def test_show_lpmode(self, mock_chassis): @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) @patch('sfputil.main.logical_port_name_to_physical_port_list', MagicMock(return_value=[1])) @patch('sfputil.main.platform_sfputil', MagicMock(is_logical_port=MagicMock(return_value=1))) - @patch('sfputil.main.is_port_type_rj45', MagicMock(return_value=True)) + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) def test_show_eeprom_RJ45(self, mock_chassis): mock_sfp = MagicMock() mock_api = MagicMock() @@ -418,8 +424,14 @@ def test_show_eeprom_RJ45(self, mock_chassis): expected_output = "Ethernet16: SFP EEPROM is not applicable for RJ45 port\n\n\n" assert result.output == expected_output + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) + @patch('sys.exit', MagicMock(return_value=EXIT_FAIL)) + def test_skip_if_port_is_rj45(self): + result = sfputil.skip_if_port_is_rj45('Ethernet0') + assert result == None + @patch('sfputil.main.logical_port_name_to_physical_port_list', MagicMock(return_value=1)) - @patch('sfputil.main.is_port_type_rj45', MagicMock(return_value=True)) + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) @patch('sfputil.main.platform_sfputil', MagicMock(is_logical_port=MagicMock(return_value=1))) def test_lpmode_set(self): runner = CliRunner() @@ -428,7 +440,7 @@ def test_lpmode_set(self): assert result.exit_code == EXIT_FAIL @patch('sfputil.main.logical_port_name_to_physical_port_list', MagicMock(return_value=1)) - @patch('sfputil.main.is_port_type_rj45', MagicMock(return_value=True)) + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) @patch('sfputil.main.platform_sfputil', MagicMock(is_logical_port=MagicMock(return_value=1))) def test_reset_RJ45(self): runner = CliRunner() @@ -451,7 +463,7 @@ def test_unlock_firmware(self, mock_chassis): @patch('sfputil.main.platform_chassis') @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) - @patch('sfputil.main.is_port_type_rj45', MagicMock(return_value=True)) + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) def test_show_fwversion_Rj45(self, mock_chassis): mock_sfp = MagicMock() mock_api = MagicMock() @@ -488,7 +500,7 @@ def test_commit_firmwre(self, mock_chassis): assert status == 1 @patch('sfputil.main.is_sfp_present', MagicMock(return_value=True)) - @patch('sfputil.main.is_port_type_rj45', MagicMock(return_value=True)) + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) def test_firmware_run_RJ45(self): runner = CliRunner() result = runner.invoke(sfputil.cli.commands['firmware'].commands['run'], ["--mode", "0", "Ethernet0"]) @@ -496,7 +508,7 @@ def test_firmware_run_RJ45(self): assert result.exit_code == EXIT_FAIL @patch('sfputil.main.is_sfp_present', MagicMock(return_value=True)) - @patch('sfputil.main.is_port_type_rj45', MagicMock(return_value=True)) + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) def test_firmware_commit_RJ45(self): runner = CliRunner() result = runner.invoke(sfputil.cli.commands['firmware'].commands['commit'], ["Ethernet0"]) @@ -504,7 +516,7 @@ def test_firmware_commit_RJ45(self): assert result.exit_code == EXIT_FAIL @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) - @patch('sfputil.main.is_port_type_rj45', MagicMock(return_value=True)) + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) @patch('sfputil.main.is_sfp_present', MagicMock(return_value=1)) def test_firmware_upgrade_RJ45(self): runner = CliRunner() @@ -513,7 +525,7 @@ def test_firmware_upgrade_RJ45(self): assert result.exit_code == EXIT_FAIL @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) - @patch('sfputil.main.is_port_type_rj45', MagicMock(return_value=True)) + @patch('sfputil.main.is_rj45_port_from_api', MagicMock(return_value=True)) @patch('sfputil.main.is_sfp_present', MagicMock(return_value=1)) def test_firmware_download_RJ45(self): runner = CliRunner() diff --git a/utilities_common/platform_sfputil_helper.py b/utilities_common/platform_sfputil_helper.py index 8b6afd5ef5..a7f4477660 100644 --- a/utilities_common/platform_sfputil_helper.py +++ b/utilities_common/platform_sfputil_helper.py @@ -6,11 +6,7 @@ from sonic_py_common import multi_asic, device_info platform_sfputil = None -platform_chassis = None -platform_sfp_base = None -platform_porttab_mapping_read = False -RJ45_PORT_TYPE = 'RJ45' def load_platform_sfputil(): @@ -26,10 +22,6 @@ def load_platform_sfputil(): def platform_sfputil_read_porttab_mappings(): - global platform_porttab_mapping_read - - if platform_porttab_mapping_read: - return 0 try: @@ -43,8 +35,6 @@ def platform_sfputil_read_porttab_mappings(): # For single ASIC platforms we pass port_config_file_path and the asic_inst as 0 port_config_file_path = device_info.get_path_to_port_config_file() platform_sfputil.read_porttab_mappings(port_config_file_path, 0) - - platform_porttab_mapping_read = True except Exception as e: click.echo("Error reading port info (%s)" % str(e)) sys.exit(1) @@ -80,7 +70,7 @@ def get_physical_to_logical(): def get_interface_name(port, db): - if port != "all" and port is not None: + if port is not "all" and port is not None: alias = port iface_alias_converter = clicommon.InterfaceAliasConverter(db) if clicommon.get_interface_naming_mode() == "alias": @@ -93,7 +83,7 @@ def get_interface_name(port, db): def get_interface_alias(port, db): - if port != "all" and port is not None: + if port is not "all" and port is not None: alias = port iface_alias_converter = clicommon.InterfaceAliasConverter(db) if clicommon.get_interface_naming_mode() == "alias": @@ -103,34 +93,3 @@ def get_interface_alias(port, db): sys.exit(1) return port - - -def is_rj45_port(port_name): - global platform_sfputil - global platform_chassis - global platform_sfp_base - global platform_sfputil_loaded - - if not platform_chassis: - import sonic_platform - platform_chassis = sonic_platform.platform.Platform().get_chassis() - if not platform_sfp_base: - import sonic_platform_base - platform_sfp_base = sonic_platform_base.sfp_base.SfpBase - - if platform_chassis and platform_sfp_base: - if not platform_sfputil: - load_platform_sfputil() - - if not platform_porttab_mapping_read: - platform_sfputil_read_porttab_mappings() - - physical_port = logical_port_name_to_physical_port_list(port_name)[0] - try: - port_type = platform_chassis.get_port_or_cage_type(physical_port) - except NotImplementedError as e: - port_type = None - - return port_type == platform_sfp_base.SFP_PORT_TYPE_BIT_RJ45 - - return False