From 0d90023b4ae973bd2fa01381d4d1ba4992b45025 Mon Sep 17 00:00:00 2001 From: vdahiya12 <67608553+vdahiya12@users.noreply.github.com> Date: Tue, 31 May 2022 10:32:00 -0700 Subject: [PATCH] grpc client implementation for active-active dualtor (#248) This PR is to support the gRPC interaction with to the SoC/Nic-Simulator for link manager state machine ActiveActiveStateMachine to work if the port is configured as active-active cable type. It supports the RPC's support for SONiC by creating channels/stubs when a cable is pulled in as well as when the ycabled is initialized from supervisord. The logic to treat a cable/port as "active-active" comes from minigraph/config_db and then this PR has the logic to take care of serving RPC's as requested by other daemons. It does the RPC call when an appropriate request lands the ycabled as described below The following Tables are served by ycabled for gRPC RPC by listening to changes in app DB request from linkmgr/orchagent and corresponding results are written to state DB. HW_MUX_CABLE_TABLE HW_MUX_TABLE_TABLE_PEER This PR also adds logic to listening to the forwarding state command table and get the response back from gRPC and write to forwarding state response FORWARDING_STATE_COMMAND -> FORWARDING_STATE_RESPONSE for getting the forwarding state request/response using gRPC This PR also has logic for gRPC library build using build_ext extension. The proto definition is present in proto/proto_out/ The setup.py changes make sure that gRPC libs are generated correctly. the corresponding gRPC libs are generated in proto_out directory in python packages directory and are imported by ycabled Motivation and Context DualToR active-active support for gRPC interface to support the state machine How Has This Been Tested? Unit-Tests and deploying changes on a DualToR testbed --- .../proto/proto_out/linkmgr_grpc_driver.proto | 47 + sonic-ycabled/proto_out/__init__.py | 0 sonic-ycabled/setup.py | 32 +- sonic-ycabled/tests/test_y_cable_helper.py | 486 ++++++++- .../ycable/ycable_utilities/y_cable_helper.py | 925 +++++++++++++++--- 5 files changed, 1373 insertions(+), 117 deletions(-) create mode 100644 sonic-ycabled/proto/proto_out/linkmgr_grpc_driver.proto create mode 100644 sonic-ycabled/proto_out/__init__.py diff --git a/sonic-ycabled/proto/proto_out/linkmgr_grpc_driver.proto b/sonic-ycabled/proto/proto_out/linkmgr_grpc_driver.proto new file mode 100644 index 000000000000..5426898bc491 --- /dev/null +++ b/sonic-ycabled/proto/proto_out/linkmgr_grpc_driver.proto @@ -0,0 +1,47 @@ +syntax = "proto3"; + +service DualToRActive { + rpc QueryAdminForwardingPortState(AdminRequest) returns (AdminReply) {} + rpc SetAdminForwardingPortState(AdminRequest) returns (AdminReply) {} + rpc QueryOperationPortState(OperationRequest) returns (OperationReply) {} + rpc QueryLinkState(LinkStateRequest) returns (LinkStateReply) {} + rpc QueryServerVersion(ServerVersionRequest) returns (ServerVersionReply) {} +} + +message AdminRequest { + repeated int32 portid = 1; + repeated bool state = 2; +} + +message AdminReply { + repeated int32 portid = 1; + repeated bool state = 2; +} + +message OperationRequest { + repeated int32 portid = 1; +} + +message OperationReply { + repeated int32 portid = 1; + repeated bool state = 2; +} + +message LinkStateRequest { + repeated int32 portid = 1; +} + +message LinkStateReply { + repeated int32 portid = 1; + repeated bool state = 2; +} + +message ServerVersionRequest { + string version = 1; +} + +message ServerVersionReply { + string version = 1; +} + + diff --git a/sonic-ycabled/proto_out/__init__.py b/sonic-ycabled/proto_out/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sonic-ycabled/setup.py b/sonic-ycabled/setup.py index 778ed72ae7fb..60ee1414ccc2 100644 --- a/sonic-ycabled/setup.py +++ b/sonic-ycabled/setup.py @@ -1,9 +1,34 @@ from setuptools import setup, find_packages +from distutils.command.build_ext import build_ext as _build_ext +import distutils.command + +class GrpcTool(distutils.cmd.Command): + def initialize_options(self): + pass + + def finalize_options(self): + pass + + def run(self): + import grpc_tools.protoc + + grpc_tools.protoc.main([ + 'grpc_tools.protoc', + '-Iproto', + '--python_out=.', + '--grpc_python_out=.', + 'proto/proto_out/linkmgr_grpc_driver.proto' + ]) + +class BuildExtCommand (_build_ext, object): + def run(self): + self.run_command('GrpcTool') + super(BuildExtCommand, self).run() setup( name='sonic-ycabled', version='1.0', - description='Y-cable configuration daemon for SONiC', + description='Y-cable and smart nic configuration daemon for SONiC', license='Apache 2.0', author='SONiC Team', author_email='linuxnetdev@microsoft.com', @@ -16,13 +41,16 @@ 'ycabled = ycable.ycable:main', ] }, + cmdclass={'build_ext': BuildExtCommand, + 'GrpcTool': GrpcTool}, install_requires=[ # NOTE: This package also requires swsscommon, but it is not currently installed as a wheel 'enum34; python_version < "3.4"', 'sonic-py-common', ], setup_requires=[ - 'wheel' + 'wheel', + 'grpcio-tools' ], tests_require=[ 'pytest', diff --git a/sonic-ycabled/tests/test_y_cable_helper.py b/sonic-ycabled/tests/test_y_cable_helper.py index 8048cbcbcead..bf7d187e4ac9 100644 --- a/sonic-ycabled/tests/test_y_cable_helper.py +++ b/sonic-ycabled/tests/test_y_cable_helper.py @@ -4561,6 +4561,7 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_no_port(self, mock_swssc xcvrd_show_hwmode_dir_cmd_sts_tbl = mock_swsscommon_table xcvrd_show_hwmode_dir_rsp_tbl = mock_swsscommon_table xcvrd_show_hwmode_dir_res_tbl = mock_swsscommon_table + port_tbl = mock_swsscommon_table asic_index = 0 task_download_firmware_thread = {} @@ -4568,7 +4569,7 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_no_port(self, mock_swssc fvp = {"state": "active"} rc = handle_show_hwmode_state_cmd_arg_tbl_notification( - fvp, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) + fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) assert(rc == -1) @patch('swsscommon.swsscommon.Table') @@ -4588,6 +4589,7 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_else_condition(self, moc xcvrd_show_hwmode_dir_res_tbl = mock_swsscommon_table xcvrd_config_hwmode_state_cmd_sts_tbl = mock_swsscommon_table xcvrd_config_hwmode_state_rsp_tbl = mock_swsscommon_table + port_tbl = mock_swsscommon_table asic_index = 0 task_download_firmware_thread = {} @@ -4595,7 +4597,7 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_else_condition(self, moc fvp = {"down_firmware": "null"} rc = handle_show_hwmode_state_cmd_arg_tbl_notification( - fvp, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) + fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) assert(rc == None) @patch('swsscommon.swsscommon.Table') @@ -4606,6 +4608,7 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_else_condition(self, moc @patch('ycable.ycable_utilities.y_cable_helper.get_ycable_physical_port_from_logical_port', MagicMock(return_value=(0))) @patch('ycable.ycable_utilities.y_cable_helper.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('ycable.ycable_utilities.y_cable_helper.y_cable_wrapper_get_presence', MagicMock(return_value=True)) + @patch('ycable.ycable_utilities.y_cable_helper.check_mux_cable_port_type', MagicMock(return_value=(True,"active-standby"))) @patch('ycable.ycable_utilities.y_cable_helper.y_cable_port_locks', MagicMock(return_value=[0])) @patch('os.path.isfile', MagicMock(return_value=True)) @patch('time.sleep', MagicMock(return_value=True)) @@ -4619,6 +4622,7 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_with_instance_manual(sel xcvrd_show_hwmode_dir_res_tbl = mock_swsscommon_table xcvrd_config_hwmode_state_cmd_sts_tbl = mock_swsscommon_table xcvrd_config_hwmode_state_rsp_tbl = mock_swsscommon_table + port_tbl = mock_swsscommon_table asic_index = 0 task_download_firmware_thread = {} port = "Ethernet0" @@ -4651,7 +4655,7 @@ def get_switching_mode(self): patched_util.get.return_value = PortInstanceHelper() rc = handle_show_hwmode_state_cmd_arg_tbl_notification( - fvp, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) + fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) assert(rc == None) @patch('swsscommon.swsscommon.Table') @@ -4659,16 +4663,20 @@ def get_switching_mode(self): "lane_mask": "0", "direction": "0"}))) @patch('ycable.ycable_utilities.y_cable_helper.get_ycable_physical_port_from_logical_port', MagicMock(return_value=(0))) + @patch('ycable.ycable_utilities.y_cable_helper.check_mux_cable_port_type', MagicMock(return_value=(True,"active-standby"))) @patch('ycable.ycable_utilities.y_cable_helper.y_cable_port_locks', MagicMock(return_value=[0])) @patch('os.path.isfile', MagicMock(return_value=True)) def test_handle_show_mux_state_cmd_arg_tbl_notification_no_instance(self, mock_swsscommon_table): mock_table = MagicMock() + mock_table.get = MagicMock( + side_effect=[(True, (('state', "auto"), ("soc_ipv4", "192.168.0.1/32"))), (True, (('index', 2), ))]) mock_swsscommon_table.return_value = mock_table xcvrd_show_hwmode_dir_cmd_sts_tbl = mock_swsscommon_table xcvrd_show_hwmode_dir_rsp_tbl = mock_swsscommon_table xcvrd_show_hwmode_dir_res_tbl = mock_swsscommon_table + port_tbl = mock_swsscommon_table asic_index = 0 task_download_firmware_thread = {} @@ -4676,7 +4684,7 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_no_instance(self, mock_s fvp = {"state": "active"} rc = handle_show_hwmode_state_cmd_arg_tbl_notification( - fvp, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) + fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) assert(rc == -1) @patch('swsscommon.swsscommon.Table') @@ -4688,11 +4696,14 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_no_instance(self, mock_s @patch('ycable.ycable_utilities.y_cable_helper.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('ycable.ycable_utilities.y_cable_helper.y_cable_wrapper_get_presence', MagicMock(return_value=True)) @patch('ycable.ycable_utilities.y_cable_helper.y_cable_port_locks', MagicMock(return_value=[0])) + @patch('ycable.ycable_utilities.y_cable_helper.check_mux_cable_port_type', MagicMock(return_value=(True,"active-standby"))) @patch('os.path.isfile', MagicMock(return_value=True)) @patch('time.sleep', MagicMock(return_value=True)) def test_handle_show_mux_state_cmd_arg_tbl_notification_with_instance_auto(self, mock_swsscommon_table, platform_sfputil): mock_table = MagicMock() + mock_table.get = MagicMock( + side_effect=[(True, (('state', "auto"), ("soc_ipv4", "192.168.0.1/32"))), (True, (('index', 2), ))]) mock_swsscommon_table.return_value = mock_table xcvrd_config_hwmode_state_cmd_sts_tbl = mock_swsscommon_table @@ -4700,6 +4711,7 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_with_instance_auto(self, xcvrd_show_hwmode_dir_cmd_sts_tbl = mock_swsscommon_table xcvrd_show_hwmode_dir_rsp_tbl = mock_swsscommon_table xcvrd_show_hwmode_dir_res_tbl = mock_swsscommon_table + port_tbl = mock_swsscommon_table asic_index = 0 task_download_firmware_thread = {} port = "Ethernet0" @@ -4732,9 +4744,473 @@ def get_mux_direction(self): patched_util.get.return_value = PortInstanceHelper() rc = handle_show_hwmode_state_cmd_arg_tbl_notification( - fvp, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) + fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) assert(rc == None) + def test_retry_setup_grpc_channel_for_port_incorrect(self): + + status = True + fvs = [('state', "auto"), ('read_side', 1)] + Table = MagicMock() + Table.get.return_value = (status, fvs) + swsscommon.Table.return_value.get.return_value = ( + False, {"read_side": "2"}) + rc = retry_setup_grpc_channel_for_port("Ethernet0", 0) + assert(rc == False) + + @patch('ycable.ycable_utilities.y_cable_helper.setup_grpc_channel_for_port', MagicMock(return_value=(True,True))) + def test_retry_setup_grpc_channel_for_port_correct(self): + + status = True + fvs = [('state', "auto"), ('read_side', 1)] + Table = MagicMock() + Table.get.return_value = (status, fvs) + swsscommon.Table.return_value.get.return_value = ( + True, {"cable_type": "active-active", "soc_ipv4":"192.168.0.1/32", "state":"active"}) + rc = retry_setup_grpc_channel_for_port("Ethernet0", 0) + assert(rc == True) + + @patch('ycable.ycable_utilities.y_cable_helper.setup_grpc_channel_for_port', MagicMock(return_value=(None,None))) + def test_retry_setup_grpc_channel_for_port_correct_none_val(self): + + status = True + fvs = [('state', "auto"), ('read_side', 1)] + Table = MagicMock() + Table.get.return_value = (status, fvs) + swsscommon.Table.return_value.get.return_value = ( + True, {"cable_type": "active-active", "soc_ipv4":"192.168.0.1/32", "state":"active"}) + rc = retry_setup_grpc_channel_for_port("Ethernet0", 0) + assert(rc == False) + + def test_process_loopback_interface_and_get_read_side_rc(self): + + loopback_keys = [["Loopback3|10.212.64.2/3", "Loopback3|2603:1010:100:d::1/128"]] + rc = process_loopback_interface_and_get_read_side(loopback_keys) + assert(rc == 0) + + def test_process_loopback_interface_and_get_read_side_rc_true(self): + + loopback_keys = [["Loopback3|10.212.64.1/3", "Loopback3|2603:1010:100:d::1/128"]] + rc = process_loopback_interface_and_get_read_side(loopback_keys) + assert(rc == 1) + + def test_process_loopback_interface_and_get_read_side_false(self): + + loopback_keys = [["Loopback2|10.212.64.1/3", "Loopback3|2603:1010:100:d::1/128"]] + rc = process_loopback_interface_and_get_read_side(loopback_keys) + assert(rc == -1) + + @patch('ycable.ycable_utilities.y_cable_helper.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + def test_check_identifier_presence_and_setup_channel(self): + + status = True + fvs = [('state', "auto"), ('read_side', 1), ('cable_type','active-active'), ('soc_ipv4','192.168.0.1')] + + state_db = {} + test_db = "TEST_DB" + y_cable_tbl = {} + static_tbl = {} + mux_tbl = {} + hw_mux_cable_tbl = {} + hw_mux_cable_tbl_peer = {} + port_tbl = {} + read_side = 0 + asic_index = 0 + y_cable_presence = [True] + delete_change_event = [True] + + port_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "PORT_INFO_TABLE") + port_tbl[asic_index].get.return_value = (status, fvs) + hw_mux_cable_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "HW_TABLE1") + hw_mux_cable_tbl_peer[asic_index] = swsscommon.Table( + test_db[asic_index], "HW_TABLE2") + + rc = check_identifier_presence_and_setup_channel("Ethernet0", port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, y_cable_presence) + assert(rc == None) + + @patch('ycable.ycable_utilities.y_cable_helper.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('ycable.ycable_utilities.y_cable_helper.setup_grpc_channel_for_port', MagicMock(return_value=(None, None))) + def test_check_identifier_presence_and_setup_channel_with_mock(self): + + status = True + fvs = [('state', "auto"), ('read_side', 1), ('cable_type','active-active'), ('soc_ipv4','192.168.0.1')] + + state_db = {} + test_db = "TEST_DB" + y_cable_tbl = {} + static_tbl = {} + mux_tbl = {} + hw_mux_cable_tbl = {} + hw_mux_cable_tbl_peer = {} + port_tbl = {} + read_side = 0 + asic_index = 0 + y_cable_presence = [True] + delete_change_event = [True] + + port_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "PORT_INFO_TABLE") + port_tbl[asic_index].get.return_value = (status, fvs) + hw_mux_cable_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "HW_TABLE1") + hw_mux_cable_tbl_peer[asic_index] = swsscommon.Table( + test_db[asic_index], "HW_TABLE2") + + rc = check_identifier_presence_and_setup_channel("Ethernet0", port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, y_cable_presence) + assert(rc == None) + + + @patch('ycable.ycable_utilities.y_cable_helper.y_cable_wrapper_get_presence', MagicMock(return_value=True)) + @patch('ycable.ycable_utilities.y_cable_helper.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('ycable.ycable_utilities.y_cable_helper.setup_grpc_channel_for_port', MagicMock(return_value=(None, None))) + @patch('ycable.ycable_utilities.y_cable_helper.grpc_port_stubs', MagicMock(return_value={})) + @patch('ycable.ycable_utilities.y_cable_helper.grpc_port_channels', MagicMock(return_value={})) + def test_check_identifier_presence_and_setup_channel_with_mock_not_none(self): + + status = True + fvs = [('state', "auto"), ('read_side', 1), ('cable_type','active-active'), ('soc_ipv4','192.168.0.1')] + + state_db = {} + test_db = "TEST_DB" + y_cable_tbl = {} + static_tbl = {} + mux_tbl = {} + hw_mux_cable_tbl = {} + hw_mux_cable_tbl_peer = {} + port_tbl = {} + read_side = 0 + asic_index = 0 + y_cable_presence = [True] + delete_change_event = [True] + + port_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "PORT_INFO_TABLE") + port_tbl[asic_index].get.return_value = (status, fvs) + hw_mux_cable_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "HW_TABLE1") + hw_mux_cable_tbl_peer[asic_index] = swsscommon.Table( + test_db[asic_index], "HW_TABLE2") + + rc = check_identifier_presence_and_setup_channel("Ethernet0", port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, y_cable_presence) + assert(rc == None) + + @patch('proto_out.linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub', MagicMock(return_value=True)) + def test_setup_grpc_channel_for_port(self): + + rc = setup_grpc_channel_for_port("Ethernet0", "192.168.0.1") + + assert(rc == (None, None)) + + + def test_setup_grpc_channels(self): + + stop_event = MagicMock() + stop_event.is_set.return_value = False + with patch('ycable.ycable_utilities.y_cable_helper.y_cable_platform_sfputil') as patched_util: + + patched_util.logical.return_value = ['Ethernet0', 'Ethernet4'] + patched_util.get_asic_id_for_logical_port.return_value = 0 + rc = setup_grpc_channels(stop_event) + + assert(rc == None) + + + def test_check_mux_cable_port_type_get_none(self): + + stop_event = MagicMock() + test_db = "TEST_DB" + status = False + asic_index = 0 + fvs = [('state', "auto"), ('read_side', 1), ('cable_type','active-active'), ('soc_ipv4','192.168.0.1')] + stop_event.is_set.return_value = False + port_tbl = {} + port_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "PORT_INFO_TABLE") + port_tbl[asic_index].get.return_value = (status, fvs) + + rc = check_mux_cable_port_type("Ethernet0", port_tbl, 0) + assert(rc == (False, None)) + + + def test_check_mux_cable_port_type_get_correct(self): + + stop_event = MagicMock() + status = True + asic_index = 0 + test_db = "TEST_DB" + fvs = [('state', "auto"), ('read_side', 1), ('cable_type','active-active'), ('soc_ipv4','192.168.0.1')] + stop_event.is_set.return_value = False + port_tbl = {} + port_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "PORT_INFO_TABLE") + port_tbl[asic_index].get.return_value = (status, fvs) + + rc = check_mux_cable_port_type("Ethernet0", port_tbl, 0) + assert(rc == (True, "active-active")) + + + def test_check_mux_cable_port_type_get_correct_standby(self): + + stop_event = MagicMock() + status = True + asic_index = 0 + test_db = "TEST_DB" + fvs = [('state', "auto"), ('read_side', 1), ('cable_type','active-standby'), ('soc_ipv4','192.168.0.1')] + stop_event.is_set.return_value = False + port_tbl = {} + port_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "PORT_INFO_TABLE") + port_tbl[asic_index].get.return_value = (status, fvs) + + rc = check_mux_cable_port_type("Ethernet0", port_tbl, 0) + assert(rc == (True, "active-standby")) + + + def test_parse_grpc_response_hw_mux_cable_change_state(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0] + self.state = [True] + + + response = Response_Helper() + + rc = parse_grpc_response_hw_mux_cable_change_state(True, response, 0, "Ethernet0") + assert(rc == "active") + + + def test_parse_grpc_response_hw_mux_cable_change_state_standby(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0] + self.state = [False] + + + response = Response_Helper() + + rc = parse_grpc_response_hw_mux_cable_change_state(True, response, 0, "Ethernet0") + assert(rc == "standby") + + + def test_parse_grpc_response_hw_mux_cable_change_state_unknown(self): + + class Response_Helper(): + def __init__(self): + self.portid = [1] + self.state = [False] + + + response = Response_Helper() + + rc = parse_grpc_response_hw_mux_cable_change_state(True, response, 0, "Ethernet0") + assert(rc == "unknown") + + + def test_parse_grpc_response_hw_mux_cable_change_state_unknown_false(self): + + class Response_Helper(): + def __init__(self): + self.portid = [1] + self.state = [False] + + + response = Response_Helper() + + rc = parse_grpc_response_hw_mux_cable_change_state(False, response, 0, "Ethernet0") + assert(rc == "unknown") + + + def test_parse_grpc_response_forwarding_state_unknown_false(self): + + class Response_Helper(): + def __init__(self): + self.portid = [1] + self.state = [False] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(False, None, 0) + assert(rc == ("unknown", "unknown")) + + + def test_parse_grpc_response_forwarding_state_active_standby_true(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [True,False] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 0) + assert(rc == ("active", "standby")) + + + def test_parse_grpc_response_forwarding_state_active_active_true(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [True,True] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 0) + assert(rc == ("active", "active")) + + + def test_parse_grpc_response_forwarding_state_active_standby_true_read_side(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [True,False] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 1) + assert(rc == ("standby", "active")) + + + def test_parse_grpc_response_forwarding_state_active_active_true_read_side(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [True,True] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 1) + assert(rc == ("active", "active")) + + + def test_parse_grpc_response_forwarding_state_active_active_true(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [True,True] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 1) + assert(rc == ("active", "active")) + + + def test_parse_grpc_response_forwarding_state_active_standby_true(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [False,True] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 1) + assert(rc == ("standby", "active")) + + + def test_parse_grpc_response_forwarding_state_active_standby_true(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [False,True] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 0) + assert(rc == ("standby", "active")) + + + def test_parse_grpc_response_forwarding_state_standby_standby_true(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [False,False] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 1) + assert(rc == ("standby", "standby")) + + + def test_parse_grpc_response_forwarding_state_standby_standby_true(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [False,False] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 0) + assert(rc == ("standby", "standby")) + + + def test_parse_grpc_response_forwarding_state_active_active_with_true_read_side(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [True,True] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 0) + assert(rc == ("active", "active")) + + + def test_parse_grpc_response_forwarding_state_standby_standby_with_true_read_side(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [False,False] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 1) + assert(rc == ("standby", "standby")) + + + @patch('ycable.ycable_utilities.y_cable_helper.grpc_port_stubs', MagicMock(return_value={})) + @patch('ycable.ycable_utilities.y_cable_helper.grpc_port_channels', MagicMock(return_value={})) + def test_parse_grpc_response_forwarding_state_standby_standby_with_true_read_side(self): + + status = True + asic_index = 0 + test_db = "TEST_DB" + port = "Ethernet0" + fvs_m = [('command', "probe"), ('read_side', 1), ('cable_type','active-standby'), ('soc_ipv4','192.168.0.1')] + hw_mux_cable_tbl = {} + fwd_state_response_tbl = {} + hw_mux_cable_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "PORT_INFO_TABLE") + fwd_state_response_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "PORT_INFO_TABLE") + hw_mux_cable_tbl[asic_index].get.return_value = (status, fvs_m) + + rc = handle_fwd_state_command_grpc_notification(fvs_m, hw_mux_cable_tbl, fwd_state_response_tbl, asic_index, port, "TestDB") + assert(rc == True) def test_get_mux_cable_static_info_without_presence(self): rc = get_muxcable_static_info_without_presence() diff --git a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py index 3504e111992c..da247650603a 100644 --- a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py +++ b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py @@ -4,18 +4,28 @@ """ import datetime +import ipaddress import os import re +import sys import threading import time from importlib import import_module + +import grpc +from proto_out import linkmgr_grpc_driver_pb2_grpc +from proto_out import linkmgr_grpc_driver_pb2 from sonic_py_common import daemon_base, logger from sonic_py_common import multi_asic from sonic_y_cable import y_cable_vendor_mapping from swsscommon import swsscommon +if sys.version_info.major == 3: + UNICODE_TYPE = str +else: + UNICODE_TYPE = unicode SELECT_TIMEOUT = 1000 @@ -23,6 +33,25 @@ y_cable_platform_chassis = None y_cable_is_platform_vs = None +# Global port channels for gRPC RPC's +grpc_port_channels = {} +# Global port channel stubs for gRPC RPC's +grpc_port_stubs = {} + +GRPC_PORT = 50075 + +read_side = -1 + +DEFAULT_NAMESPACE = "" + +LOOPBACK_INTERFACE_T0 = "10.212.64.1/32" +LOOPBACK_INTERFACE_LT0 = "10.212.64.2/32" +LOOPBACK_INTERFACE_T0_NIC = "10.1.0.37/32" +LOOPBACK_INTERFACE_LT0_NIC = "10.1.0.38/32" +# rename and put in right place +# port id 0 -> maps to T0 +# port id 1 -> maps to LT0 + SYSLOG_IDENTIFIER = "y_cable_helper" helper_logger = logger.Logger(SYSLOG_IDENTIFIER) @@ -234,6 +263,313 @@ def set_show_firmware_fields(port, mux_info_dict, xcvrd_show_fw_rsp_tbl): return 0 + +def check_mux_cable_port_type(logical_port_name, port_tbl, asic_index): + + (status, fvs) = port_tbl[asic_index].get(logical_port_name) + if status is False: + helper_logger.log_warning( + "Could not retreive fieldvalue pairs for {}, inside config_db table {}".format(logical_port_name, port_tbl[asic_index].getTableName())) + return (False, None) + + else: + # Convert list of tuples to a dictionary + mux_table_dict = dict(fvs) + if "state" in mux_table_dict: + + val = mux_table_dict.get("state", None) + cable_type = mux_table_dict.get("cable_type", None) + + if val in ["active", "standby", "auto", "manual"]: + if cable_type == "active-active": + helper_logger.log_debug("Y_CABLE_DEBUG:check_mux_cable_port_type returning True active-active port {}".format(logical_port_name)) + return (True , "active-active") + else: + helper_logger.log_debug("Y_CABLE_DEBUG:check_mux_cable_port_type returning True active-standby port {}".format(logical_port_name)) + return (True, "active-standby") + else: + helper_logger.log_debug("Y_CABLE_DEBUG:check_mux_cable_port_type returning False None port {}".format(logical_port_name)) + return (False, None) + + +def hook_grpc_nic_simulated(target, soc_ip): + """ + Args: + target (function): The function collecting transceiver info. + """ + + #NIC_SIMULATOR_CONFIG_FILE = "/etc/sonic/nic_simulator.json" + + def wrapper(*args, **kwargs): + #res = target(*args, **kwargs) + if os.path.exists(MUX_SIMULATOR_CONFIG_FILE): + """setup channels for all downlinks + NIC simulator will run on same port number + Todo put a task for secure channel""" + channel = grpc.insecure_channel("server_ip:GRPC_PORT".format(host)) + stub = None + #metadata_interceptor = MetadataInterceptor(("grpc_server", soc_ipv4)) + #intercept_channel = grpc.intercept_channel(channel, metadata_interceptor) + #stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(intercept_channel) + # TODO hook the interceptor appropriately + return channel, stub + + wrapper.__name__ = target.__name__ + + return wrapper + + +def retry_setup_grpc_channel_for_port(port, asic_index): + + config_db, port_tbl = {}, {} + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + config_db[asic_id] = daemon_base.db_connect("CONFIG_DB", namespace) + port_tbl[asic_id] = swsscommon.Table(config_db[asic_id], "MUX_CABLE") + + (status, fvs) = port_tbl[asic_index].get(port) + if status is False: + helper_logger.log_warning( + "Could not retreive fieldvalue pairs for {}, inside config_db table {}".format(port, port_tbl[asic_index].getTableName())) + return False + + else: + # Convert list of tuples to a dictionary + mux_table_dict = dict(fvs) + if "state" in mux_table_dict and "soc_ipv4" in mux_table_dict: + + soc_ipv4_full = mux_table_dict.get("soc_ipv4", None) + if soc_ipv4_full is not None: + soc_ipv4 = soc_ipv4_full.split('/')[0] + + channel, stub = setup_grpc_channel_for_port(port, soc_ipv4) + if channel is None or stub is None: + helper_logger.log_notice( + "stub is None, while reattempt setting up channels did not work {}".format(port)) + return False + else: + grpc_port_channels[port] = channel + grpc_port_stubs[port] = stub + return True + +def setup_grpc_channel_for_port(port, soc_ip): + "TODO make these configurable like RESTAPI" + """ + root_cert = open('/etc/sonic/credentials/ca-chain-bundle.cert.pem', 'rb').read() + key = open('/etc/sonic/credentials/client.key.pem', 'rb').read() + cert_chain = open('/etc/sonic/credentials/client.cert.pem', 'rb').read() + + """ + """ + Dummy values for lab for now + TODO remove these once done + root_cert = open('/home/admin/proto_out1/proto_out/ca-chain-bundle.cert.pem', 'rb').read() + key = open('/home/admin/proto_out1/proto_out/client.key.pem', 'rb').read() + cert_chain = open('/home/admin/proto_out1/proto_out/client.cert.pem', 'rb').read() + """ + """credential = grpc.ssl_channel_credentials( + root_certificates=root_cert, + private_key=key, + certificate_chain=cert_chain) + """ + helper_logger.log_debug("Y_CABLE_DEBUG:setting up gRPC channel for RPC's {} {}".format(port,soc_ip)) + channel = grpc.insecure_channel("{}:{}".format(soc_ip, GRPC_PORT), options=[('grpc.keepalive_timeout_ms', 1000)]) + stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel) + + channel_ready = grpc.channel_ready_future(channel) + + try: + channel_ready.result(timeout=0.2) + except grpc.FutureTimeoutError: + channel = None + stub = None + + if stub is None: + helper_logger.log_warning("stub was not setup gRPC ip {} port {}, no gRPC server running ".format(soc_ip, port)) + if channel is None: + helper_logger.log_warning("channel was not setup gRPC ip {} port {}, no gRPC server running".format(soc_ip, port)) + + return channel, stub + + +def process_loopback_interface_and_get_read_side(loopback_keys): + + asic_index = multi_asic.get_asic_index_from_namespace(DEFAULT_NAMESPACE) + + for key in loopback_keys[asic_index]: + helper_logger.log_debug("Y_CABLE_DEBUG:Loopback key = {} ".format(key)) + if key.startswith("Loopback3|") and "/" in key and "::" not in key: + helper_logger.log_debug("Y_CABLE_DEBUG:Loopback split 1 {} ".format(key)) + temp_list = key.split('|') + addr = temp_list[1].split('/')[0] + helper_logger.log_debug("Y_CABLE_DEBUG:Loopback split 2 {} ".format(addr)) + loopback_prefix = ipaddress.ip_network(UNICODE_TYPE(addr)) + loopback_address = str(loopback_prefix) + helper_logger.log_debug("Y_CABLE_DEBUG:Loopback address parsed = {} ".format(loopback_address)) + if loopback_address == LOOPBACK_INTERFACE_LT0 or loopback_address == LOOPBACK_INTERFACE_LT0_NIC: + return 0 + elif loopback_address == LOOPBACK_INTERFACE_T0 or loopback_address == LOOPBACK_INTERFACE_T0_NIC: + return 1 + else: + # Loopback3 should be present, if not present log a warning + helper_logger.log_warning("Could not get any address associated with Loopback3") + return -1 + + return -1 + + +def check_identifier_presence_and_setup_channel(logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, y_cable_presence): + + global grpc_port_stubs + global grpc_port_channels + + (status, fvs) = port_tbl[asic_index].get(logical_port_name) + if status is False: + helper_logger.log_warning( + "Could not retreive fieldvalue pairs for {}, inside config_db table {}".format(logical_port_name, port_tbl[asic_index].getTableName())) + return + + else: + # Convert list of tuples to a dictionary + mux_table_dict = dict(fvs) + if "state" in mux_table_dict and "soc_ipv4" in mux_table_dict: + + val = mux_table_dict.get("state", None) + soc_ipv4_full = mux_table_dict.get("soc_ipv4", None) + if soc_ipv4_full is not None: + soc_ipv4 = soc_ipv4_full.split('/')[0] + cable_type = mux_table_dict.get("cable_type", None) + + if val in ["active", "standby", "auto", "manual"] and cable_type == "active-active": + + # import the module and load the port instance + y_cable_presence[:] = [True] + physical_port_list = logical_port_name_to_physical_port_list( + logical_port_name) + + if len(physical_port_list) == 1: + + physical_port = physical_port_list[0] + if y_cable_wrapper_get_presence(physical_port): + prev_stub = grpc_port_stubs.get(logical_port_name, None) + prev_channel = grpc_port_channels.get(logical_port_name, None) + if prev_channel is not None and prev_stub is not None: + return + + channel, stub = setup_grpc_channel_for_port(logical_port_name, soc_ipv4) + if channel is not None: + grpc_port_channels[logical_port_name] = channel + helper_logger.log_notice( + "channel is not None, Cable-Insert or daemon init, daemon able to set up channel for gRPC SOC IP {}, port {}".format(soc_ipv4, logical_port_name)) + if stub is not None: + grpc_port_stubs[logical_port_name] = stub + helper_logger.log_notice( + "stub is not None, Cable-Insert or daemon init, daemon able to set up channel for gRPC SOC IP {}, port {}".format(soc_ipv4, logical_port_name)) + + fvs_updated = swsscommon.FieldValuePairs([('read_side', str(read_side))]) + hw_mux_cable_tbl[asic_index].set(logical_port_name, fvs_updated) + hw_mux_cable_tbl_peer[asic_index].set(logical_port_name, fvs_updated) + else: + helper_logger.log_warning( + "DAC cable not present while Channel setup Port {} for gRPC channel initiation".format(logical_port_name)) + + else: + helper_logger.log_warning( + "DAC cable logical to physical port mapping returned more than one physical ports while Channel setup Port {}".format(logical_port_name)) + else: + helper_logger.log_warning( + "DAC cable logical to physical port mapping returned more than one physical ports while Channel setup Port {}".format(logical_port_name)) + + +def setup_grpc_channels(stop_event): + + global read_side + helper_logger.log_debug("Y_CABLE_DEBUG:setting up channels for active-active") + config_db, state_db, port_tbl, loopback_tbl, port_table_keys = {}, {}, {}, {}, {} + loopback_keys = {} + hw_mux_cable_tbl = {} + hw_mux_cable_tbl_peer = {} + + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + config_db[asic_id] = daemon_base.db_connect("CONFIG_DB", namespace) + port_tbl[asic_id] = swsscommon.Table(config_db[asic_id], "MUX_CABLE") + loopback_tbl[asic_id] = swsscommon.Table( + config_db[asic_id], "LOOPBACK_INTERFACE") + loopback_keys[asic_id] = loopback_tbl[asic_id].getKeys() + port_table_keys[asic_id] = port_tbl[asic_id].getKeys() + state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) + hw_mux_cable_tbl[asic_id] = swsscommon.Table( + state_db[asic_id], swsscommon.STATE_HW_MUX_CABLE_TABLE_NAME) + hw_mux_cable_tbl_peer[asic_id] = swsscommon.Table( + state_db[asic_id], "HW_MUX_CABLE_TABLE_PEER") + + if read_side == -1: + read_side = process_loopback_interface_and_get_read_side(loopback_keys) + + helper_logger.log_debug("Y_CABLE_DEBUG:while setting up grpc channels read side = {}".format(read_side)) + + # Init PORT_STATUS table if ports are on Y cable + logical_port_list = y_cable_platform_sfputil.logical + for logical_port_name in logical_port_list: + if stop_event.is_set(): + break + + # Get the asic to which this port belongs + asic_index = y_cable_platform_sfputil.get_asic_id_for_logical_port( + logical_port_name) + if asic_index is None: + helper_logger.log_warning( + "Got invalid asic index for {}, ignored".format(logical_port_name)) + continue + + if logical_port_name in port_table_keys[asic_index]: + check_identifier_presence_and_setup_channel( + logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, y_cable_presence) + else: + # This port does not exist in Port table of config but is present inside + # logical_ports after loading the port_mappings from port_config_file + # This should not happen + helper_logger.log_warning( + "Could not retreive port inside config_db PORT table {} for gRPC channel initiation".format(logical_port_name)) + + +def try_grpc(callback, *args, **kwargs): + """ + Handy function to invoke the callback and catch NotImplementedError + :param callback: Callback to be invoked + :param args: Arguments to be passed to callback + :param kwargs: Default return value if exception occur + :return: Default return value if exception occur else return value of the callback + """ + + return_val = True + try: + resp = callback(*args) + if resp is None: + return_val = False + except grpc.RpcError as e: + #err_msg = 'Grpc error code '+str(e.code()) + if e.code() == grpc.StatusCode.CANCELLED: + helper_logger.log_notice("rpc cancelled for port= {}".format(str(e.code()))) + elif e.code() == grpc.StatusCode.UNAVAILABLE: + helper_logger.log_notice("rpc unavailable for port= {}".format(str(e.code()))) + elif e.code() == grpc.StatusCode.INVALID_ARGUMENT: + helper_logger.log_notice("rpc invalid for port= {}".format(str(e.code()))) + else: + helper_logger.log_notice("rpc exception error for port= {}".format(str(e.code()))) + resp = None + return_val = False + + return return_val, resp + + +def close(channel): + "Close the channel" + channel.close() + def set_result_and_delete_port(result, actual_result, command_table, response_table, port): fvs = swsscommon.FieldValuePairs([(result, str(actual_result))]) response_table.set(port, fvs) @@ -819,11 +1155,16 @@ def init_ports_status_for_y_cable(platform_sfp, platform_chassis, y_cable_presen global y_cable_platform_chassis global y_cable_port_instances global y_cable_is_platform_vs + global read_side # Connect to CONFIG_DB and create port status table inside state_db config_db, state_db, port_tbl, y_cable_tbl = {}, {}, {}, {} static_tbl, mux_tbl = {}, {} port_table_keys = {} xcvrd_log_tbl = {} + loopback_tbl= {} + loopback_keys = {} + hw_mux_cable_tbl = {} + hw_mux_cable_tbl_peer = {} y_cable_platform_sfputil = platform_sfp y_cable_platform_chassis = platform_chassis @@ -839,6 +1180,17 @@ def init_ports_status_for_y_cable(platform_sfp, platform_chassis, y_cable_presen port_table_keys[asic_id] = port_tbl[asic_id].getKeys() xcvrd_log_tbl[asic_id] = swsscommon.Table(config_db[asic_id], "XCVRD_LOG") xcvrd_log_tbl[asic_id].set("Y_CABLE", fvs_updated) + loopback_tbl[asic_id] = swsscommon.Table( + config_db[asic_id], "LOOPBACK_INTERFACE") + loopback_keys[asic_id] = loopback_tbl[asic_id].getKeys() + state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) + hw_mux_cable_tbl[asic_id] = swsscommon.Table( + state_db[asic_id], swsscommon.STATE_HW_MUX_CABLE_TABLE_NAME) + hw_mux_cable_tbl_peer[asic_id] = swsscommon.Table( + state_db[asic_id], "HW_MUX_CABLE_TABLE_PEER") + + if read_side == -1: + read_side = process_loopback_interface_and_get_read_side(loopback_keys) # Init PORT_STATUS table if ports are on Y cable logical_port_list = y_cable_platform_sfputil.logical @@ -855,8 +1207,13 @@ def init_ports_status_for_y_cable(platform_sfp, platform_chassis, y_cable_presen continue if logical_port_name in port_table_keys[asic_index]: - check_identifier_presence_and_update_mux_table_entry( - state_db, port_tbl, y_cable_tbl, static_tbl, mux_tbl, asic_index, logical_port_name, y_cable_presence) + (status, cable_type) = check_mux_cable_port_type(logical_port_name, port_tbl, asic_index) + if status and cable_type == "active-standby": + check_identifier_presence_and_update_mux_table_entry( + state_db, port_tbl, y_cable_tbl, static_tbl, mux_tbl, asic_index, logical_port_name, y_cable_presence) + if status and cable_type == "active-active": + check_identifier_presence_and_setup_channel( + logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, y_cable_presence) else: # This port does not exist in Port table of config but is present inside # logical_ports after loading the port_mappings from port_config_file @@ -867,10 +1224,15 @@ def init_ports_status_for_y_cable(platform_sfp, platform_chassis, y_cable_presen def change_ports_status_for_y_cable_change_event(port_dict, y_cable_presence, stop_event=threading.Event()): # Connect to CONFIG_DB and create port status table inside state_db + global read_side config_db, state_db, port_tbl, y_cable_tbl = {}, {}, {}, {} static_tbl, mux_tbl = {}, {} port_table_keys = {} delete_change_event = [False] + loopback_tbl= {} + loopback_keys = {} + hw_mux_cable_tbl = {} + hw_mux_cable_tbl_peer = {} # Get the namespaces in the platform namespaces = multi_asic.get_front_end_namespaces() @@ -880,6 +1242,18 @@ def change_ports_status_for_y_cable_change_event(port_dict, y_cable_presence, st config_db[asic_id] = daemon_base.db_connect("CONFIG_DB", namespace) port_tbl[asic_id] = swsscommon.Table(config_db[asic_id], "MUX_CABLE") port_table_keys[asic_id] = port_tbl[asic_id].getKeys() + loopback_tbl[asic_id] = swsscommon.Table( + config_db[asic_id], "LOOPBACK_INTERFACE") + loopback_keys[asic_id] = loopback_tbl[asic_id].getKeys() + state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) + hw_mux_cable_tbl[asic_id] = swsscommon.Table( + state_db[asic_id], swsscommon.STATE_HW_MUX_CABLE_TABLE_NAME) + hw_mux_cable_tbl_peer[asic_id] = swsscommon.Table( + state_db[asic_id], "HW_MUX_CABLE_TABLE_PEER") + + if read_side == -1: + read_side = process_loopback_interface_and_get_read_side(loopback_keys) + # Init PORT_STATUS table if ports are on Y cable and an event is received for logical_port_name, value in port_dict.items(): @@ -895,8 +1269,13 @@ def change_ports_status_for_y_cable_change_event(port_dict, y_cable_presence, st if logical_port_name in port_table_keys[asic_index]: if value == SFP_STATUS_INSERTED: helper_logger.log_info("Got SFP inserted ycable event") - check_identifier_presence_and_update_mux_table_entry( - state_db, port_tbl, y_cable_tbl, static_tbl, mux_tbl, asic_index, logical_port_name, y_cable_presence) + (status, cable_type) = check_mux_cable_port_type(logical_port_name, port_tbl, asic_index) + if status and cable_type == "active-standby": + check_identifier_presence_and_update_mux_table_entry( + state_db, port_tbl, y_cable_tbl, static_tbl, mux_tbl, asic_index, logical_port_name, y_cable_presence) + if status and cable_type == "active-active": + check_identifier_presence_and_setup_channel( + logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, y_cable_presence) elif value == SFP_STATUS_REMOVED: helper_logger.log_info("Got SFP deleted ycable event") check_identifier_presence_and_delete_mux_table_entry( @@ -992,7 +1371,7 @@ def check_identifier_presence_and_update_mux_info_entry(state_db, mux_tbl, asic_ (status, fvs) = port_tbl[asic_index].get(logical_port_name) if status is False: - helper_logger.log_debug("Could not retreive fieldvalue pairs for {}, inside config_db table {}".format(logical_port_name, port_tbl[asic_index].getTableName())) + helper_logger.log_warning("Could not retreive fieldvalue pairs for {}, inside config_db table {}".format(logical_port_name, port_tbl[asic_index].getTableName())) return else: @@ -1526,7 +1905,7 @@ def post_port_mux_info_to_db(logical_port_name, table): else: mux_info_dict = get_muxcable_info(physical_port, logical_port_name) - if mux_info_dict is not None and mux_info_dict != -1: + if mux_info_dict is not None and mux_info_dict != -1: #transceiver_dict[physical_port] = port_info_dict fvs = swsscommon.FieldValuePairs( [('tor_active', mux_info_dict["tor_active"]), @@ -1663,7 +2042,7 @@ def gather_arg_from_db_and_check_for_type(arg_tbl, port, key, fvp_dict, arg): """ def task_download_firmware_worker(port, physical_port, port_instance, file_full_path, xcvrd_down_fw_rsp_tbl, xcvrd_down_fw_cmd_sts_tbl, rc): - helper_logger.log_debug("worker thread launched for downloading physical port {} path {}".format(physical_port, file_full_path)) + helper_logger.log_debug("Y_CABLE_DEBUG:worker thread launched for downloading physical port {} path {}".format(physical_port, file_full_path)) try: status = port_instance.download_firmware(file_full_path) time.sleep(5) @@ -1672,9 +2051,9 @@ def task_download_firmware_worker(port, physical_port, port_instance, file_full_ helper_logger.log_warning("Failed to execute the download firmware API for port {} due to {}".format(physical_port,repr(e))) set_result_and_delete_port('status', status, xcvrd_down_fw_cmd_sts_tbl, xcvrd_down_fw_rsp_tbl, port) - helper_logger.log_debug(" downloading complete {} {} {}".format(physical_port, file_full_path, status)) + helper_logger.log_debug("Y_CABLE_DEBUG:downloading complete {} {} {}".format(physical_port, file_full_path, status)) rc[0] = status - helper_logger.log_debug("download thread finished port {} physical_port {}".format(port, physical_port)) + helper_logger.log_debug("Y_CABLE_DEBUG:download thread finished port {} physical_port {}".format(port, physical_port)) def handle_config_prbs_cmd_arg_tbl_notification(fvp, xcvrd_config_prbs_cmd_arg_tbl, xcvrd_config_prbs_cmd_sts_tbl, xcvrd_config_prbs_rsp_tbl, asic_index, port): @@ -2444,7 +2823,10 @@ def handle_config_hwmode_state_cmd_arg_tbl_notification(fvp, xcvrd_config_hwmode helper_logger.log_error("Error: Wrong input param for cli command config mux hwmode state active/standby logical port {}".format(port)) set_result_and_delete_port('result', 'False', xcvrd_config_hwmode_state_cmd_sts_tbl[asic_index], xcvrd_config_hwmode_state_rsp_tbl[asic_index], port) -def handle_show_hwmode_state_cmd_arg_tbl_notification(fvp, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port): +def handle_show_hwmode_state_cmd_arg_tbl_notification(fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port): + state_db = {} + hw_mux_cable_tbl = {} + fvp_dict = dict(fvp) if "state" in fvp_dict: @@ -2466,60 +2848,322 @@ def handle_show_hwmode_state_cmd_arg_tbl_notification(fvp, xcvrd_show_hwmode_dir set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) return -1 + (cable_status, cable_type) = check_mux_cable_port_type(port, port_tbl, asic_index) - port_instance = get_ycable_port_instance_from_logical_port(port) - if port_instance is None or port_instance in port_mapping_error_values: - # error scenario update table accordingly - state = 'not Y-Cable port' - helper_logger.log_error( - "Error: Could not get port instance for cli command show mux hwmode muxdirection Y cable port {}".format(port)) - set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) - return -1 + if cable_status and cable_type == "active-standby": - with y_cable_port_locks[physical_port]: - try: - read_side = port_instance.get_read_side() - except Exception as e: - read_side = None - helper_logger.log_warning("Failed to execute the get_read_side API for port {} due to {}".format(physical_port,repr(e))) + port_instance = get_ycable_port_instance_from_logical_port(port) + if port_instance is None or port_instance in port_mapping_error_values: + # error scenario update table accordingly + state = 'not Y-Cable port' + helper_logger.log_error( + "Error: Could not get port instance for cli command show mux hwmode muxdirection Y cable port {}".format(port)) + set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) + return -1 - if read_side is None or read_side == port_instance.EEPROM_ERROR or read_side < 0: + with y_cable_port_locks[physical_port]: + try: + read_side = port_instance.get_read_side() + except Exception as e: + read_side = None + helper_logger.log_warning("Failed to execute the get_read_side API for port {} due to {}".format(physical_port,repr(e))) - state = 'unknown' - helper_logger.log_warning( - "Error: Could not get read side for cli command show mux hwmode muxdirection logical port {} and physical port {}".format(port, physical_port)) - set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) - return -1 + if read_side is None or read_side == port_instance.EEPROM_ERROR or read_side < 0: - with y_cable_port_locks[physical_port]: - try: - active_side = port_instance.get_mux_direction() - except Exception as e: - active_side = None - helper_logger.log_warning("Failed to execute the get_mux_direction API for port {} due to {}".format(physical_port,repr(e))) + state = 'unknown' + helper_logger.log_warning( + "Error: Could not get read side for cli command show mux hwmode muxdirection logical port {} and physical port {}".format(port, physical_port)) + set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) + return -1 - if active_side is None or active_side == port_instance.EEPROM_ERROR or active_side < 0: + with y_cable_port_locks[physical_port]: + try: + active_side = port_instance.get_mux_direction() + except Exception as e: + active_side = None + helper_logger.log_warning("Failed to execute the get_mux_direction API for port {} due to {}".format(physical_port,repr(e))) - state = 'unknown' - helper_logger.log_warning("Error: Could not get active side for cli command show mux hwmode muxdirection logical port {} and physical port {}".format(port, physical_port)) + if active_side is None or active_side == port_instance.EEPROM_ERROR or active_side < 0: + + state = 'unknown' + helper_logger.log_warning("Error: Could not get active side for cli command show mux hwmode muxdirection logical port {} and physical port {}".format(port, physical_port)) + + set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) + return -1 + + if read_side == active_side and (active_side == 1 or active_side == 2): + state = 'active' + elif read_side != active_side and (active_side == 1 or active_side == 2): + state = 'standby' + else: + state = 'unknown' + helper_logger.log_warning("Error: Could not get valid state for cli command show mux hwmode muxdirection logical port {} and physical port {}".format(port, physical_port)) + set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) + return -1 set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) - return -1 - if read_side == active_side and (active_side == 1 or active_side == 2): - state = 'active' - elif read_side != active_side and (active_side == 1 or active_side == 2): - state = 'standby' - else: - state = 'unknown' - helper_logger.log_warning("Error: Could not get valid state for cli command show mux hwmode muxdirection logical port {} and physical port {}".format(port, physical_port)) + elif cable_status and cable_type == "active-active": + + + namespaces = multi_asic.get_front_end_namespaces() + # Get the keys from PORT table inside config db to prepare check for mux_cable identifier + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) + hw_mux_cable_tbl[asic_id] = swsscommon.Table( + state_db[asic_id], swsscommon.STATE_HW_MUX_CABLE_TABLE_NAME) + + (status, fv) = hw_mux_cable_tbl[asic_index].get(port) + if status is False: + helper_logger.log_warning("Could not retreive fieldvalue pairs for {}, inside state_db table while responding to cli cmd show mux status {}".format( + port, hw_mux_cable_tbl[asic_index].getTableName())) + set_result_and_delete_port('state', 'unknown', xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) + + mux_port_dict = dict(fv) + read_side = mux_port_dict.get("read_side", None) + helper_logger.log_debug("Y_CABLE_DEBUG:before invoking RPC fwd_state read_side = {}".format(read_side)) + # TODO state only for dummy value in this request MSG remove this + request = linkmgr_grpc_driver_pb2.AdminRequest(portid=[int(read_side), 1 - int(read_side)], state=[0, 0]) + helper_logger.log_debug( + "Y_CABLE_DEBUG:calling RPC for getting cli forwarding state read_side portid = {} Ethernet port {}".format(read_side, port)) + + stub = grpc_port_stubs.get(port, None) + if stub is None: + helper_logger.log_notice("stub is None for getting forwarding state RPC port {}".format(port)) + retry_setup_grpc_channel_for_port(port, asic_index) + stub = grpc_port_stubs.get(port, None) + if stub is None: + helper_logger.log_warning( + "stub was None for performing cli fwd mux RPC port {}, setting it up again did not work".format(port)) + set_result_and_delete_port('state', 'unknown', xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) + return + + ret, response = try_grpc(stub.QueryAdminForwardingPortState, request, timeout=0.1) + + (self_state, peer_state) = parse_grpc_response_forwarding_state(ret, response, read_side) + state = self_state set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) - return -1 + if response is not None: + # Debug only, remove this section once Server side is Finalized + fwd_response_port_ids = response.portid + fwd_response_port_ids_state = response.state + helper_logger.log_notice( + "forwarding state RPC received response port ids = {} port {}".format(fwd_response_port_ids, port)) + helper_logger.log_notice( + "forwarding state RPC received response state values = {} port {}".format(fwd_response_port_ids_state, port)) + else: + helper_logger.log_notice("response was none cli handle_fwd_state_command_grpc_notification {} ".format(port)) + + else: + helper_logger.log_warning("Error: Wrong input param for cli command show mux hwmode muxdirection logical port {}".format(port)) + set_result_and_delete_port('state', 'unknown', xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) + +def parse_grpc_response_hw_mux_cable_change_state(ret, response, portid, port): + state = 'unknown' + "return a list of states" + if ret is True: + if response.portid[0] == portid: + if response.state[0] == True: + state = 'active' + # No other values expected + elif response.state[0] == False: + state = 'standby' + else: + helper_logger.log_warning("recieved an error state while parsing response hw mux no response state for port".format(port)) + else: + helper_logger.log_warning("recieved an error portid while parsing response hw mux no portid for port".format(port)) - set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) else: - helper_logger.log_warning("Error: Wrong input param for cli command show mux hwmode muxdirection logical port {}".format(port)) - set_result_and_delete_port('state', 'unknown', xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) + helper_logger.log_warning("recieved an error state while parsing response hw mux for port".format(port)) + state = 'unknown' + + return state + + +def parse_grpc_response_forwarding_state(ret, response, read_side): + self_state = peer_state = 'unknown' + + if ret is True and response is not None: + if int(read_side) == 0: + if response.state[0] == True: + self_state = 'active' + elif response.state[0] == False: + self_state = 'standby' + # No other values expected, should we raise exception/msg + # TODO handle other responses + if response.state[1] == True: + peer_state = 'active' + elif response.state[1] == False: + peer_state = 'standby' + + elif int(read_side) == 1: + if response.state[1] == True: + self_state = 'active' + elif response.state[1] == False: + self_state = 'standby' + if response.state[0] == True: + peer_state = 'active' + elif response.state[0] == False: + peer_state = 'standby' + else: + self_state = 'unknown' + peer_state = 'unknown' + + return (self_state, peer_state) + + +def handle_fwd_state_command_grpc_notification(fvp_m, hw_mux_cable_tbl, fwd_state_response_tbl, asic_index, port, appl_db): + + helper_logger.log_debug("Y_CABLE_DEBUG:recevied the notification fwd state port {}".format(port)) + fvp_dict = dict(fvp_m) + + if "command" in fvp_dict: + # check if xcvrd got a probe command + probe_identifier = fvp_dict["command"] + + if probe_identifier == "probe": + helper_logger.log_debug("Y_CABLE_DEBUG:processing the notification fwd_state port {}".format(port)) + (status, fv) = hw_mux_cable_tbl[asic_index].get(port) + if status is False: + helper_logger.log_warning("Could not retreive fieldvalue pairs for {}, inside state_db table {}".format( + port, hw_mux_cable_tbl[asic_index].getTableName())) + return False + mux_port_dict = dict(fv) + read_side = mux_port_dict.get("read_side") + helper_logger.log_debug("Y_CABLE_DEBUG:before invoking RPC fwd_state read_side = {}".format(read_side)) + # TODO state only for dummy value in this request MSG remove this + request = linkmgr_grpc_driver_pb2.AdminRequest(portid=[int(read_side), 1 - int(read_side)], state=[0, 0]) + helper_logger.log_warning( + "calling RPC for getting forwarding state read_side portid = {} Ethernet port {}".format(read_side, port)) + + self_state = "unknown" + peer_state = "unknown" + stub = grpc_port_stubs.get(port, None) + if stub is None: + helper_logger.log_notice("stub is None for getting forwarding state RPC port {}".format(port)) + retry_setup_grpc_channel_for_port(port, asic_index) + stub = grpc_port_stubs.get(port, None) + if stub is None: + helper_logger.log_warning( + "stub was None for performing fwd mux RPC port {}, setting it up again did not work".format(port)) + fvs_updated = swsscommon.FieldValuePairs([('response', str(self_state)), + ('response_peer', str(peer_state))]) + fwd_state_response_tbl[asic_index].set(port, fvs_updated) + return + + ret, response = try_grpc(stub.QueryAdminForwardingPortState, request, timeout=0.1) + + (self_state, peer_state) = parse_grpc_response_forwarding_state(ret, response, read_side) + if response is not None: + # Debug only, remove this section once Server side is Finalized + fwd_response_port_ids = response.portid + fwd_response_port_ids_state = response.state + helper_logger.log_notice( + "forwarding state RPC received response port ids = {} port {}".format(fwd_response_port_ids, port)) + helper_logger.log_notice( + "forwarding state RPC received response state values = {} port {}".format(fwd_response_port_ids_state, port)) + else: + helper_logger.log_notice("response was none handle_fwd_state_command_grpc_notification {} ".format(port)) + + fvs_updated = swsscommon.FieldValuePairs([('response', str(self_state)), + ('response_peer', str(peer_state))]) + fwd_state_response_tbl[asic_index].set(port, fvs_updated) + helper_logger.log_debug("Y_CABLE_DEBUG:processed the notification fwd state cleanly") + return True + else: + helper_logger.log_warning("probe val not present in the notification fwd state handling port {}".format(port)) + else: + helper_logger.log_warning("command key not present in the notification fwd state handling port {}".format(port)) + + +def handle_hw_mux_cable_table_grpc_notification(fvp, hw_mux_cable_tbl, asic_index, grpc_metrics_tbl, peer, port): + + # entering this section signifies a gRPC start for state + # change request from swss so initiate recording in mux_metrics table + time_start = datetime.datetime.utcnow().strftime("%Y-%b-%d %H:%M:%S.%f") + # This check might be redundant, to check, the presence of this Port in keys + # in logical_port_list but keep for now for coherency + # also skip checking in logical_port_list inside sfp_util + + helper_logger.log_debug("Y_CABLE_DEBUG:recevied the notification mux hw state") + fvp_dict = dict(fvp) + toggle_side = "self" + + if "state" in fvp_dict: + # got a state change + new_state = fvp_dict["state"] + requested_status = new_state + if requested_status in ["active", "standby"]: + + (status, fvs) = hw_mux_cable_tbl[asic_index].get(port) + if status is False: + helper_logger.log_warning("Could not retreive fieldvalue pairs for {}, inside state_db table {}".format( + port, hw_mux_cable_tbl[asic_index].getTableName())) + return + helper_logger.log_debug("Y_CABLE_DEBUG processing the notification mux hw state port {}".format(port)) + mux_port_dict = dict(fvs) + old_state = mux_port_dict.get("state", None) + read_side = mux_port_dict.get("read_side", None) + curr_read_side = int(read_side) + # Now whatever is the state requested, call gRPC to update the soc state appropriately + if peer == True: + curr_read_side = 1-int(read_side) + toggle_side = "peer" + + if new_state == "active": + state_req = 1 + elif new_state == "standby": + state_req = 0 + + helper_logger.log_notice( + "calling RPC for hw mux_cable set state state peer = {} portid {} Ethernet port".format(peer, port)) + + request = linkmgr_grpc_driver_pb2.AdminRequest(portid=[curr_read_side], state=[state_req]) + + stub = grpc_port_stubs.get(port, None) + if stub is None: + helper_logger.log_debug("Y_CABLE_DEBUG:stub is None for performing hw mux RPC port {}".format(port)) + retry_setup_grpc_channel_for_port(port, asic_index) + stub = grpc_port_stubs.get(port, None) + if stub is None: + helper_logger.log_notice( + "stub was None for performing hw mux RPC port {}, setting it up again did not work".format(port)) + return + + ret, response = try_grpc(stub.SetAdminForwardingPortState, request, timeout=0.1) + if response is not None: + # Debug only, remove this section once Server side is Finalized + hw_response_port_ids = response.portid + hw_response_port_ids_state = response.state + helper_logger.log_notice( + "Set admin state RPC received response port ids = {}".format(hw_response_port_ids)) + helper_logger.log_notice( + "Set admin state RPC received response state values = {}".format(hw_response_port_ids_state)) + else: + helper_logger.log_notice("response was none hw_mux_cable_table_grpc_notification {} ".format(port)) + + active_side = parse_grpc_response_hw_mux_cable_change_state(ret, response, curr_read_side, port) + + if active_side == "unknown": + helper_logger.log_warning( + "ERR: Got a change event for updating gRPC but could not toggle the mux-direction for port {} state from {} to {}, writing unknown".format(port, old_state, new_state)) + new_state = 'unknown' + + time_end = datetime.datetime.utcnow().strftime("%Y-%b-%d %H:%M:%S.%f") + fvs_metrics = swsscommon.FieldValuePairs([('grpc_switch_{}_{}_start'.format(toggle_side, new_state), str(time_start)), + ('grpc_switch_{}_{}_end'.format(toggle_side, new_state), str(time_end))]) + grpc_metrics_tbl[asic_index].set(port, fvs_metrics) + + fvs_updated = swsscommon.FieldValuePairs([('state', new_state), + ('read_side', read_side), + ('active_side', str(active_side))]) + hw_mux_cable_tbl[asic_index].set(port, fvs_updated) + helper_logger.log_debug("Y_CABLE_DEBUG: processed the notification hw mux state cleanly {}".format(port)) + else: + helper_logger.log_info("Got a change event on port {} of table {} that does not contain state".format( + port, swsscommon.APP_HW_MUX_CABLE_TABLE_NAME)) + # Thread wrapper class to update y_cable status periodically class YCableTableUpdateTask(object): @@ -2536,9 +3180,11 @@ def __init__(self): def task_worker(self): # Connect to STATE_DB and APPL_DB and get both the HW_MUX_STATUS_TABLE info - appl_db, state_db, config_db, status_tbl, y_cable_tbl = {}, {}, {}, {}, {} - y_cable_tbl_keys = {} - mux_cable_command_tbl, y_cable_command_tbl = {}, {} + appl_db, state_db, config_db, status_tbl, status_tbl_peer = {}, {}, {}, {}, {} + hw_mux_cable_tbl, hw_mux_cable_tbl_peer = {}, {} + hw_mux_cable_tbl_keys = {} + port_tbl, port_table_keys = {}, {} + fwd_state_command_tbl, fwd_state_response_tbl, mux_cable_command_tbl = {}, {}, {} mux_metrics_tbl = {} sel = swsscommon.Select() @@ -2550,19 +3196,30 @@ def task_worker(self): asic_id = multi_asic.get_asic_index_from_namespace(namespace) appl_db[asic_id] = daemon_base.db_connect("APPL_DB", namespace) config_db[asic_id] = daemon_base.db_connect("CONFIG_DB", namespace) + state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) status_tbl[asic_id] = swsscommon.SubscriberStateTable( appl_db[asic_id], swsscommon.APP_HW_MUX_CABLE_TABLE_NAME) mux_cable_command_tbl[asic_id] = swsscommon.SubscriberStateTable( appl_db[asic_id], swsscommon.APP_MUX_CABLE_COMMAND_TABLE_NAME) - y_cable_command_tbl[asic_id] = swsscommon.Table( - appl_db[asic_id], swsscommon.APP_MUX_CABLE_COMMAND_TABLE_NAME) - state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) - y_cable_tbl[asic_id] = swsscommon.Table( - state_db[asic_id], swsscommon.STATE_HW_MUX_CABLE_TABLE_NAME) mux_metrics_tbl[asic_id] = swsscommon.Table( state_db[asic_id], swsscommon.STATE_MUX_METRICS_TABLE_NAME) - y_cable_tbl_keys[asic_id] = y_cable_tbl[asic_id].getKeys() + hw_mux_cable_tbl[asic_id] = swsscommon.Table( + state_db[asic_id], swsscommon.STATE_HW_MUX_CABLE_TABLE_NAME) + # TODO add definition inside app DB + status_tbl_peer[asic_id] = swsscommon.SubscriberStateTable( + appl_db[asic_id], "HW_MUX_CABLE_TABLE_PEER") + fwd_state_command_tbl[asic_id] = swsscommon.SubscriberStateTable( + appl_db[asic_id], "FORWARDING_STATE_COMMAND") + fwd_state_response_tbl[asic_id] = swsscommon.Table( + appl_db[asic_id], "FORWARDING_STATE_RESPONSE") + hw_mux_cable_tbl_peer[asic_id] = swsscommon.Table( + state_db[asic_id], "HW_MUX_CABLE_TABLE_PEER") + hw_mux_cable_tbl_keys[asic_id] = hw_mux_cable_tbl[asic_id].getKeys() + port_tbl[asic_id] = swsscommon.Table(config_db[asic_id], "MUX_CABLE") + port_table_keys[asic_id] = port_tbl[asic_id].getKeys() sel.addSelectable(status_tbl[asic_id]) + sel.addSelectable(status_tbl_peer[asic_id]) + sel.addSelectable(fwd_state_command_tbl[asic_id]) sel.addSelectable(mux_cable_command_tbl[asic_id]) @@ -2605,46 +3262,56 @@ def task_worker(self): # This check might be redundant, to check, the presence of this Port in keys # in logical_port_list but keep for now for coherency # also skip checking in logical_port_list inside sfp_util - if port not in y_cable_tbl_keys[asic_index]: + if port not in hw_mux_cable_tbl_keys[asic_index]: continue - fvp_dict = dict(fvp) + (status, cable_type) = check_mux_cable_port_type(port, port_tbl, asic_index) + + if status: + + if cable_type == 'active-standby': + fvp_dict = dict(fvp) + + if "state" in fvp_dict: + # got a state change + new_status = fvp_dict["state"] + requested_status = new_status + (status, fvs) = hw_mux_cable_tbl[asic_index].get(port) + if status is False: + helper_logger.log_warning("Could not retreive fieldvalue pairs for {}, inside state_db table {}".format( + port, hw_mux_cable_tbl[asic_index].getTableName())) + continue + mux_port_dict = dict(fvs) + old_status = mux_port_dict.get("state", None) + read_side = mux_port_dict.get("read_side", None) + # Now whatever is the state requested, toggle the mux appropriately + helper_logger.log_debug("Y_CABLE_DEBUG: xcvrd trying to transition port {} from {} to {} read side {}".format(port, old_status, new_status, read_side)) + (active_side, read_side) = update_tor_active_side(read_side, new_status, port) + if active_side == -1: + helper_logger.log_warning("ERR: Got a change event for toggle but could not toggle the mux-direction for port {} state from {} to {}, writing unknown".format( + port, old_status, new_status)) + new_status = 'unknown' + + helper_logger.log_debug("Y_CABLE_DEBUG: xcvrd successful to transition port {} from {} to {} and write back to the DB {}".format(port, old_status, new_status, threading.currentThread().getName())) + helper_logger.log_notice("Got a change event for toggle the mux-direction active side for port {} state requested {} from old state {} to new state {} read_side {} thread id {}".format(port, requested_status, old_status, new_status, read_side, threading.currentThread().getName())) + time_end = datetime.datetime.utcnow().strftime("%Y-%b-%d %H:%M:%S.%f") + fvs_metrics = swsscommon.FieldValuePairs([('xcvrd_switch_{}_start'.format(new_status), str(time_start)), + ('xcvrd_switch_{}_end'.format(new_status), str(time_end))]) + mux_metrics_tbl[asic_index].set(port, fvs_metrics) + + fvs_updated = swsscommon.FieldValuePairs([('state', new_status), + ('read_side', str(read_side)), + ('active_side', str(active_side))]) + hw_mux_cable_tbl[asic_index].set(port, fvs_updated) + else: + helper_logger.log_info("Got a change event on port {} of table {} that does not contain state".format( + port, swsscommon.APP_HW_MUX_CABLE_TABLE_NAME)) - if "state" in fvp_dict: - # got a state change - new_status = fvp_dict["state"] - requested_status = new_status - (status, fvs) = y_cable_tbl[asic_index].get(port) - if status is False: - helper_logger.log_warning("Could not retreive fieldvalue pairs for {}, inside state_db table {}".format( - port, y_cable_tbl[asic_index].getTableName())) - continue - mux_port_dict = dict(fvs) - old_status = mux_port_dict.get("state", None) - read_side = mux_port_dict.get("read_side", None) - # Now whatever is the state requested, toggle the mux appropriately - helper_logger.log_debug("Y_CABLE_DEBUG: xcvrd trying to transition port {} from {} to {} read side {}".format(port, old_status, new_status, read_side)) - (active_side, read_side) = update_tor_active_side(read_side, new_status, port) - if active_side == -1: - helper_logger.log_warning("ERR: Got a change event for toggle but could not toggle the mux-direction for port {} state from {} to {}, writing unknown".format( - port, old_status, new_status)) - new_status = 'unknown' - - helper_logger.log_debug("Y_CABLE_DEBUG: xcvrd successful to transition port {} from {} to {} and write back to the DB {}".format(port, old_status, new_status, threading.currentThread().getName())) - helper_logger.log_notice("Got a change event for toggle the mux-direction active side for port {} state requested {} from old state {} to new state {} read_side {} thread id {}".format(port, requested_status, old_status, new_status, read_side, threading.currentThread().getName())) - time_end = datetime.datetime.utcnow().strftime("%Y-%b-%d %H:%M:%S.%f") - fvs_metrics = swsscommon.FieldValuePairs([('xcvrd_switch_{}_start'.format(new_status), str(time_start)), - ('xcvrd_switch_{}_end'.format(new_status), str(time_end))]) - mux_metrics_tbl[asic_index].set(port, fvs_metrics) - - fvs_updated = swsscommon.FieldValuePairs([('state', new_status), - ('read_side', str(read_side)), - ('active_side', str(active_side))]) - y_cable_tbl[asic_index].set(port, fvs_updated) - else: - helper_logger.log_info("Got a change event on port {} of table {} that does not contain state".format( - port, swsscommon.APP_HW_MUX_CABLE_TABLE_NAME)) + elif cable_type == "active-active": + if fvp: + handle_hw_mux_cable_table_grpc_notification( + fvp, hw_mux_cable_tbl, asic_index, mux_metrics_tbl, False, port) while True: (port_m, op_m, fvp_m) = mux_cable_command_tbl[asic_index].pop() @@ -2654,25 +3321,59 @@ def task_worker(self): if fvp_m: - if port_m not in y_cable_tbl_keys[asic_index]: + if port_m not in hw_mux_cable_tbl_keys[asic_index]: continue fvp_dict = dict(fvp_m) - if "command" in fvp_dict: - # check if xcvrd got a probe command - probe_identifier = fvp_dict["command"] + (status, cable_type) = check_mux_cable_port_type(port_m, port_tbl, asic_index) + + if status: - if probe_identifier == "probe": - (status, fv) = y_cable_tbl[asic_index].get(port_m) - if status is False: - helper_logger.log_warning("Could not retreive fieldvalue pairs for {}, inside state_db table {}".format( - port_m, y_cable_tbl[asic_index].getTableName())) - continue - mux_port_dict = dict(fv) - read_side = mux_port_dict.get("read_side") - update_appdb_port_mux_cable_response_table(port_m, asic_index, appl_db, int(read_side)) + if cable_type == 'active-standby' and "command" in fvp_dict: + # check if xcvrd got a probe command + probe_identifier = fvp_dict["command"] + + if probe_identifier == "probe": + (status, fv) = hw_mux_cable_tbl[asic_index].get(port_m) + if status is False: + helper_logger.log_warning("Could not retreive fieldvalue pairs for {}, inside state_db table {}".format( + port_m, hw_mux_cable_tbl[asic_index].getTableName())) + continue + mux_port_dict = dict(fv) + read_side = mux_port_dict.get("read_side") + update_appdb_port_mux_cable_response_table(port_m, asic_index, appl_db, int(read_side)) + + while True: + (port_m, op_m, fvp_m) = fwd_state_command_tbl[asic_index].pop() + + if not port_m: + break + + helper_logger.log_debug("Y_CABLE_DEBUG: received a probe for Forwarding state using gRPC port status {} {}".format(port_m, threading.currentThread().getName())) + (status, cable_type) = check_mux_cable_port_type(port_m, port_tbl, asic_index) + + if status is False or cable_type != "active-active": + break + + if fvp_m: + handle_fwd_state_command_grpc_notification( + fvp_m, hw_mux_cable_tbl, fwd_state_response_tbl, asic_index, port_m, appl_db) + + while True: + (port_n, op_n, fvp_n) = status_tbl_peer[asic_index].pop() + if not port_n: + break + + (status, cable_type) = check_mux_cable_port_type(port_n, port_tbl, asic_index) + + if status is False or cable_type != "active-active": + break + + if fvp_n: + handle_hw_mux_cable_table_grpc_notification( + fvp_n, hw_mux_cable_tbl_peer, asic_index, mux_metrics_tbl, True, port_n) def task_cli_worker(self): @@ -2694,6 +3395,8 @@ def task_cli_worker(self): xcvrd_show_event_cmd_tbl, xcvrd_show_event_rsp_tbl , xcvrd_show_event_cmd_sts_tbl, xcvrd_show_event_res_tbl= {}, {}, {}, {} xcvrd_show_fec_cmd_tbl, xcvrd_show_fec_rsp_tbl , xcvrd_show_fec_cmd_sts_tbl, xcvrd_show_fec_res_tbl= {}, {}, {}, {} xcvrd_show_ber_cmd_tbl, xcvrd_show_ber_cmd_arg_tbl, xcvrd_show_ber_rsp_tbl , xcvrd_show_ber_cmd_sts_tbl, xcvrd_show_ber_res_tbl= {}, {}, {}, {}, {} + port_tbl, port_table_keys = {}, {} + status_app_tbl = {} y_cable_tbl, y_cable_tbl_keys = {}, {} @@ -2810,6 +3513,8 @@ def task_cli_worker(self): state_db[asic_id], "XCVRD_GET_BER_RSP") xcvrd_show_ber_res_tbl[asic_id] = swsscommon.Table( state_db[asic_id], "XCVRD_GET_BER_RES") + port_tbl[asic_id] = swsscommon.Table(config_db[asic_id], "MUX_CABLE") + port_table_keys[asic_id] = port_tbl[asic_id].getKeys() status_app_tbl[asic_id] = swsscommon.SubscriberStateTable( appl_db[asic_id], swsscommon.APP_MUX_CABLE_TABLE_NAME) y_cable_tbl[asic_id] = swsscommon.Table( @@ -2888,7 +3593,7 @@ def task_cli_worker(self): break if fvp: - handle_show_hwmode_state_cmd_arg_tbl_notification(fvp, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) + handle_show_hwmode_state_cmd_arg_tbl_notification(fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) break while True: