From 157f4830b4271e743c5c6e455fe6b9c6a0605582 Mon Sep 17 00:00:00 2001 From: Prince George <45705344+prgeor@users.noreply.github.com> Date: Thu, 15 Sep 2022 18:03:04 -0700 Subject: [PATCH] [Xcvrd] Soak duplicate events and process only updated interested events (#285) * Subscribe to CONFIG_DB instead of APPL_DB * Filter out events * Fix build * improve code coverage --- sonic-xcvrd/tests/test_xcvrd.py | 24 +++++- sonic-xcvrd/xcvrd/xcvrd.py | 35 ++++++--- .../xcvrd/xcvrd_utilities/port_mapping.py | 75 +++++++++++++++---- 3 files changed, 107 insertions(+), 27 deletions(-) diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index 298b26f3e..4eedfead5 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -331,6 +331,24 @@ def test_is_error_sfp_status(self): assert not is_error_block_eeprom_reading(int(SFP_STATUS_INSERTED)) assert not is_error_block_eeprom_reading(int(SFP_STATUS_REMOVED)) + @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) + @patch('swsscommon.swsscommon.SubscriberStateTable') + @patch('swsscommon.swsscommon.Select.select') + def test_handle_port_update_event(self, mock_select, mock_sub_table): + mock_selectable = MagicMock() + mock_selectable.pop = MagicMock( + side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (None, None, None)]) + mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) + mock_sub_table.return_value = mock_selectable + logger = MagicMock() + + sel, asic_context = subscribe_port_update_event(DEFAULT_NAMESPACE, logger) + port_mapping = PortMapping() + stop_event = threading.Event() + stop_event.is_set = MagicMock(return_value=False) + handle_port_update_event(sel, asic_context, stop_event, + logger, port_mapping.handle_port_change_event) + @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) @patch('swsscommon.swsscommon.SubscriberStateTable') @patch('swsscommon.swsscommon.Select.select') @@ -443,7 +461,7 @@ def 
test_CmisManagerTask_handle_port_change_event(self): task.on_port_update_event(port_change_event) assert len(task.port_dict) == 1 - + @patch('xcvrd.xcvrd.XcvrTableHelper') def test_CmisManagerTask_get_configured_freq(self, mock_table_helper): port_mapping = PortMapping() @@ -474,6 +492,7 @@ def test_CmisManagerTask_task_run_stop(self, mock_chassis): port_mapping = PortMapping() task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping) + task.wait_for_port_config_done = MagicMock() task.task_run() task.task_stop() assert task.task_process is None @@ -482,6 +501,7 @@ def test_CmisManagerTask_task_run_stop(self, mock_chassis): @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_update_event', MagicMock(return_value=(None, None))) @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_update_event', MagicMock()) @patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD')) + @patch('xcvrd.xcvrd.CmisManagerTask.wait_for_port_config_done', MagicMock()) def test_CmisManagerTask_task_worker(self, mock_chassis): mock_xcvr_api = MagicMock() mock_xcvr_api.set_datapath_deinit = MagicMock(return_value=True) @@ -553,7 +573,6 @@ def test_CmisManagerTask_task_worker(self, mock_chassis): 'DP8State': 'DataPathActivated' } ]) - mock_sfp = MagicMock() mock_sfp.get_presence = MagicMock(return_value=True) mock_sfp.get_xcvr_api = MagicMock(return_value=mock_xcvr_api) @@ -584,7 +603,6 @@ def test_CmisManagerTask_task_worker(self, mock_chassis): task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) task.task_worker() assert task.port_dict['Ethernet0']['cmis_state'] == 'DP_DEINIT' - task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) task.task_worker() assert mock_xcvr_api.set_datapath_deinit.call_count == 1 diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index c73ebadb9..a5c53b0eb 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -994,10 +994,7 @@ def 
on_port_update_event(self, port_change_event): self.port_dict[lport]['lanes'] = port_change_event.port_dict['lanes'] if 'host_tx_ready' in port_change_event.port_dict: self.port_dict[lport]['host_tx_ready'] = port_change_event.port_dict['host_tx_ready'] - if 'admin_status' in port_change_event.port_dict and 'oper_status' in port_change_event.port_dict: - # At times 'admin_status' is NOT the same in the PORT_TABLE of APPL_DB and STATE_DB - # We dont have better way to check if 'admin_status' is from APPL_DB or STATE_DB so this - # check is put temporarily to listen only to APPL_DB's admin_status and ignore that of STATE_DB + if 'admin_status' in port_change_event.port_dict: self.port_dict[lport]['admin_status'] = port_change_event.port_dict['admin_status'] if 'laser_freq' in port_change_event.port_dict: self.port_dict[lport]['laser_freq'] = int(port_change_event.port_dict['laser_freq']) @@ -1277,13 +1274,36 @@ def configure_laser_frequency(self, api, lport, freq): self.log_error("{} Tuning in progress, channel selection may fail!".format(lport)) return api.set_laser_freq(freq) + def wait_for_port_config_done(self, namespace): + # Connect to APPL_DB and subscribe to PORT table notifications + appl_db = daemon_base.db_connect("APPL_DB", namespace=namespace) + + sel = swsscommon.Select() + port_tbl = swsscommon.SubscriberStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME) + sel.addSelectable(port_tbl) + + # Make sure this daemon started after all port configured + while not self.task_stopping_event.is_set(): + (state, c) = sel.select(port_mapping.SELECT_TIMEOUT_MSECS) + if state == swsscommon.Select.TIMEOUT: + continue + if state != swsscommon.Select.OBJECT: + self.log_warning("sel.select() did not return swsscommon.Select.OBJECT") + continue + + (key, op, fvp) = port_tbl.pop() + if key in ["PortConfigDone", "PortInitDone"]: + break + def task_worker(self): self.xcvr_table_helper = XcvrTableHelper(self.namespaces) - self.log_notice("Starting...") + 
self.log_notice("Waiting for PortConfigDone...") + for namespace in self.namespaces: + self.wait_for_port_config_done(namespace) # APPL_DB for CONFIG updates, and STATE_DB for insertion/removal - sel, asic_context = port_mapping.subscribe_port_update_event(self.namespaces) + sel, asic_context = port_mapping.subscribe_port_update_event(self.namespaces, helper_logger) while not self.task_stopping_event.is_set(): # Handle port change event from main thread port_mapping.handle_port_update_event(sel, @@ -1292,9 +1312,6 @@ def task_worker(self): helper_logger, self.on_port_update_event) - if not self.isPortConfigDone: - continue - for lport, info in self.port_dict.items(): if self.task_stopping_event.is_set(): break diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py index 3c24415a4..fe8f04cda 100644 --- a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py +++ b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py @@ -11,6 +11,7 @@ class PortChangeEvent: PORT_REMOVE = 1 PORT_SET = 2 PORT_DEL = 3 + PORT_EVENT = {} def __init__(self, port_name, port_index, asic_id, event_type, port_dict=None): # Logical port name, e.g. Ethernet0 @@ -105,11 +106,16 @@ def subscribe_port_config_change(namespaces): sel.addSelectable(port_tbl) return sel, asic_context -def subscribe_port_update_event(namespaces): +def subscribe_port_update_event(namespaces, logger): + """ + Subscribe to a particular DB's table and listen to only interested fields + Format : + { <DB name> : <Table name>, <field1>, <field2>, .. 
} where only field<n> update will be received + """ port_tbl_map = [ - {'APPL_DB': swsscommon.APP_PORT_TABLE_NAME}, + {'CONFIG_DB': swsscommon.CFG_PORT_TABLE_NAME}, {'STATE_DB': 'TRANSCEIVER_INFO'}, - {'STATE_DB': 'PORT_TABLE'}, + {'STATE_DB': 'PORT_TABLE', 'FILTER': ['host_tx_ready']}, ] sel = swsscommon.Select() @@ -119,13 +125,18 @@ def subscribe_port_update_event(namespaces): db = daemon_base.db_connect(list(d.keys())[0], namespace=namespace) asic_id = multi_asic.get_asic_index_from_namespace(namespace) port_tbl = swsscommon.SubscriberStateTable(db, list(d.values())[0]) + port_tbl.db_name = list(d.keys())[0] + port_tbl.table_name = list(d.values())[0] + port_tbl.filter = d['FILTER'] if 'FILTER' in d else None asic_context[port_tbl] = asic_id sel.addSelectable(port_tbl) + logger.log_warning("subscribing to port_tbl {} - {} DB of namespace {} ".format( + port_tbl, list(d.values())[0], namespace)) return sel, asic_context def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_event_handler): """ - Select PORT update events, notify the observers upon a port update in APPL_DB/CONFIG_DB + Select PORT update events, notify the observers upon a port update in CONFIG_DB or a XCVR insertion/removal in STATE_DB """ if not stop_event.is_set(): @@ -135,6 +146,8 @@ def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_ if state != swsscommon.Select.OBJECT: logger.log_warning('sel.select() did not return swsscommon.Select.OBJECT') return + + port_event_cache = {} for port_tbl in asic_context.keys(): while True: (key, op, fvp) = port_tbl.pop() @@ -143,24 +156,56 @@ def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_ if not validate_port(key): continue fvp = dict(fvp) if fvp is not None else {} + logger.log_warning("$$$ {} handle_port_update_event() : op={} DB:{} Table:{} fvp {}".format( + key, op, port_tbl.db_name, port_tbl.table_name, fvp)) + if 'index' not in fvp: - fvp['index'] = '-1' - port_index 
= int(fvp['index']) - port_change_event = None - if op == swsscommon.SET_COMMAND: - port_change_event = PortChangeEvent(key, + fvp['index'] = '-1' + fvp['key'] = key + fvp['asic_id'] = asic_context[port_tbl] + fvp['op'] = op + fvp['FILTER'] = port_tbl.filter + # Soak duplicate events and consider only the last event + port_event_cache[key+port_tbl.db_name+port_tbl.table_name] = fvp + + # Now apply filter over soaked events + for key, fvp in port_event_cache.items(): + port_index = int(fvp['index']) + port_change_event = None + diff = {} + filter = fvp['FILTER'] + del fvp['FILTER'] + if key in PortChangeEvent.PORT_EVENT: + diff = dict(set(fvp.items()) - set(PortChangeEvent.PORT_EVENT[key].items())) + # Ignore duplicate events + if not diff: + PortChangeEvent.PORT_EVENT[key] = fvp + continue + # Ensure only interested field update gets through for processing + if filter is not None: + if not (set(filter) & set(diff.keys())): + PortChangeEvent.PORT_EVENT[key] = fvp + continue + PortChangeEvent.PORT_EVENT[key] = fvp + + if fvp['op'] == swsscommon.SET_COMMAND: + port_change_event = PortChangeEvent(fvp['key'], port_index, - asic_context[port_tbl], + fvp['asic_id'], PortChangeEvent.PORT_SET, fvp) - elif op == swsscommon.DEL_COMMAND: - port_change_event = PortChangeEvent(key, + elif fvp['op'] == swsscommon.DEL_COMMAND: + port_change_event = PortChangeEvent(fvp['key'], port_index, - asic_context[port_tbl], + fvp['asic_id'], PortChangeEvent.PORT_DEL, fvp) - if port_change_event is not None: - port_change_event_handler(port_change_event) + # This is the final event considered for processing + logger.log_warning("*** {} handle_port_update_event() fvp {}".format( + key, fvp)) + if port_change_event is not None: + port_change_event_handler(port_change_event) + def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_change_event_handler): """Select CONFIG_DB PORT table changes, once there is a port configuration add/remove, notify observers