Skip to content

Commit

Permalink
[Xcvrd] Soak duplicate events and process only updated interested eve…
Browse files Browse the repository at this point in the history
…nts (#285)

* Subscribe to CONFIG_DB instead of APPL_DB

* Filter out events

* Fix build

* improve code coverage
  • Loading branch information
prgeor authored and yxieca committed Sep 21, 2022
1 parent 36ba7c0 commit 157f483
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 27 deletions.
24 changes: 21 additions & 3 deletions sonic-xcvrd/tests/test_xcvrd.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,24 @@ def test_is_error_sfp_status(self):
assert not is_error_block_eeprom_reading(int(SFP_STATUS_INSERTED))
assert not is_error_block_eeprom_reading(int(SFP_STATUS_REMOVED))

@patch('swsscommon.swsscommon.Select.addSelectable', MagicMock())
@patch('swsscommon.swsscommon.SubscriberStateTable')
@patch('swsscommon.swsscommon.Select.select')
def test_handle_port_update_event(self, mock_select, mock_sub_table):
mock_selectable = MagicMock()
mock_selectable.pop = MagicMock(
side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (None, None, None)])
mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable)
mock_sub_table.return_value = mock_selectable
logger = MagicMock()

sel, asic_context = subscribe_port_update_event(DEFAULT_NAMESPACE, logger)
port_mapping = PortMapping()
stop_event = threading.Event()
stop_event.is_set = MagicMock(return_value=False)
handle_port_update_event(sel, asic_context, stop_event,
logger, port_mapping.handle_port_change_event)

@patch('swsscommon.swsscommon.Select.addSelectable', MagicMock())
@patch('swsscommon.swsscommon.SubscriberStateTable')
@patch('swsscommon.swsscommon.Select.select')
Expand Down Expand Up @@ -443,7 +461,7 @@ def test_CmisManagerTask_handle_port_change_event(self):
task.on_port_update_event(port_change_event)
assert len(task.port_dict) == 1


@patch('xcvrd.xcvrd.XcvrTableHelper')
def test_CmisManagerTask_get_configured_freq(self, mock_table_helper):
port_mapping = PortMapping()
Expand Down Expand Up @@ -474,6 +492,7 @@ def test_CmisManagerTask_task_run_stop(self, mock_chassis):

port_mapping = PortMapping()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping)
task.wait_for_port_config_done = MagicMock()
task.task_run()
task.task_stop()
assert task.task_process is None
Expand All @@ -482,6 +501,7 @@ def test_CmisManagerTask_task_run_stop(self, mock_chassis):
@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_update_event', MagicMock(return_value=(None, None)))
@patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_update_event', MagicMock())
@patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD'))
@patch('xcvrd.xcvrd.CmisManagerTask.wait_for_port_config_done', MagicMock())
def test_CmisManagerTask_task_worker(self, mock_chassis):
mock_xcvr_api = MagicMock()
mock_xcvr_api.set_datapath_deinit = MagicMock(return_value=True)
Expand Down Expand Up @@ -553,7 +573,6 @@ def test_CmisManagerTask_task_worker(self, mock_chassis):
'DP8State': 'DataPathActivated'
}
])

mock_sfp = MagicMock()
mock_sfp.get_presence = MagicMock(return_value=True)
mock_sfp.get_xcvr_api = MagicMock(return_value=mock_xcvr_api)
Expand Down Expand Up @@ -584,7 +603,6 @@ def test_CmisManagerTask_task_worker(self, mock_chassis):
task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True])
task.task_worker()
assert task.port_dict['Ethernet0']['cmis_state'] == 'DP_DEINIT'

task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True])
task.task_worker()
assert mock_xcvr_api.set_datapath_deinit.call_count == 1
Expand Down
35 changes: 26 additions & 9 deletions sonic-xcvrd/xcvrd/xcvrd.py
Original file line number Diff line number Diff line change
Expand Up @@ -994,10 +994,7 @@ def on_port_update_event(self, port_change_event):
self.port_dict[lport]['lanes'] = port_change_event.port_dict['lanes']
if 'host_tx_ready' in port_change_event.port_dict:
self.port_dict[lport]['host_tx_ready'] = port_change_event.port_dict['host_tx_ready']
if 'admin_status' in port_change_event.port_dict and 'oper_status' in port_change_event.port_dict:
# At times 'admin_status' is NOT the same in the PORT_TABLE of APPL_DB and STATE_DB
# We dont have better way to check if 'admin_status' is from APPL_DB or STATE_DB so this
# check is put temporarily to listen only to APPL_DB's admin_status and ignore that of STATE_DB
if 'admin_status' in port_change_event.port_dict:
self.port_dict[lport]['admin_status'] = port_change_event.port_dict['admin_status']
if 'laser_freq' in port_change_event.port_dict:
self.port_dict[lport]['laser_freq'] = int(port_change_event.port_dict['laser_freq'])
Expand Down Expand Up @@ -1277,13 +1274,36 @@ def configure_laser_frequency(self, api, lport, freq):
self.log_error("{} Tuning in progress, channel selection may fail!".format(lport))
return api.set_laser_freq(freq)

def wait_for_port_config_done(self, namespace):
# Connect to APPL_DB and subscribe to PORT table notifications
appl_db = daemon_base.db_connect("APPL_DB", namespace=namespace)

sel = swsscommon.Select()
port_tbl = swsscommon.SubscriberStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME)
sel.addSelectable(port_tbl)

# Make sure this daemon started after all port configured
while not self.task_stopping_event.is_set():
(state, c) = sel.select(port_mapping.SELECT_TIMEOUT_MSECS)
if state == swsscommon.Select.TIMEOUT:
continue
if state != swsscommon.Select.OBJECT:
self.log_warning("sel.select() did not return swsscommon.Select.OBJECT")
continue

(key, op, fvp) = port_tbl.pop()
if key in ["PortConfigDone", "PortInitDone"]:
break

def task_worker(self):
self.xcvr_table_helper = XcvrTableHelper(self.namespaces)

self.log_notice("Starting...")
self.log_notice("Waiting for PortConfigDone...")
for namespace in self.namespaces:
self.wait_for_port_config_done(namespace)

# APPL_DB for CONFIG updates, and STATE_DB for insertion/removal
sel, asic_context = port_mapping.subscribe_port_update_event(self.namespaces)
sel, asic_context = port_mapping.subscribe_port_update_event(self.namespaces, helper_logger)
while not self.task_stopping_event.is_set():
# Handle port change event from main thread
port_mapping.handle_port_update_event(sel,
Expand All @@ -1292,9 +1312,6 @@ def task_worker(self):
helper_logger,
self.on_port_update_event)

if not self.isPortConfigDone:
continue

for lport, info in self.port_dict.items():
if self.task_stopping_event.is_set():
break
Expand Down
75 changes: 60 additions & 15 deletions sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class PortChangeEvent:
PORT_REMOVE = 1
PORT_SET = 2
PORT_DEL = 3
PORT_EVENT = {}

def __init__(self, port_name, port_index, asic_id, event_type, port_dict=None):
# Logical port name, e.g. Ethernet0
Expand Down Expand Up @@ -105,11 +106,16 @@ def subscribe_port_config_change(namespaces):
sel.addSelectable(port_tbl)
return sel, asic_context

def subscribe_port_update_event(namespaces):
def subscribe_port_update_event(namespaces, logger):
"""
Subscribe to a particular DB's table and listen to only interested fields
Format :
{ <DB name> : <Table name> , <field1>, <field2>, .. } where only field<n> update will be received
"""
port_tbl_map = [
{'APPL_DB': swsscommon.APP_PORT_TABLE_NAME},
{'CONFIG_DB': swsscommon.CFG_PORT_TABLE_NAME},
{'STATE_DB': 'TRANSCEIVER_INFO'},
{'STATE_DB': 'PORT_TABLE'},
{'STATE_DB': 'PORT_TABLE', 'FILTER': ['host_tx_ready']},
]

sel = swsscommon.Select()
Expand All @@ -119,13 +125,18 @@ def subscribe_port_update_event(namespaces):
db = daemon_base.db_connect(list(d.keys())[0], namespace=namespace)
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
port_tbl = swsscommon.SubscriberStateTable(db, list(d.values())[0])
port_tbl.db_name = list(d.keys())[0]
port_tbl.table_name = list(d.values())[0]
port_tbl.filter = d['FILTER'] if 'FILTER' in d else None
asic_context[port_tbl] = asic_id
sel.addSelectable(port_tbl)
logger.log_warning("subscribing to port_tbl {} - {} DB of namespace {} ".format(
port_tbl, list(d.values())[0], namespace))
return sel, asic_context

def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_event_handler):
"""
Select PORT update events, notify the observers upon a port update in APPL_DB/CONFIG_DB
Select PORT update events, notify the observers upon a port update in CONFIG_DB
or a XCVR insertion/removal in STATE_DB
"""
if not stop_event.is_set():
Expand All @@ -135,6 +146,8 @@ def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_
if state != swsscommon.Select.OBJECT:
logger.log_warning('sel.select() did not return swsscommon.Select.OBJECT')
return

port_event_cache = {}
for port_tbl in asic_context.keys():
while True:
(key, op, fvp) = port_tbl.pop()
Expand All @@ -143,24 +156,56 @@ def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_
if not validate_port(key):
continue
fvp = dict(fvp) if fvp is not None else {}
logger.log_warning("$$$ {} handle_port_update_event() : op={} DB:{} Table:{} fvp {}".format(
key, op, port_tbl.db_name, port_tbl.table_name, fvp))

if 'index' not in fvp:
fvp['index'] = '-1'
port_index = int(fvp['index'])
port_change_event = None
if op == swsscommon.SET_COMMAND:
port_change_event = PortChangeEvent(key,
fvp['index'] = '-1'
fvp['key'] = key
fvp['asic_id'] = asic_context[port_tbl]
fvp['op'] = op
fvp['FILTER'] = port_tbl.filter
# Soak duplicate events and consider only the last event
port_event_cache[key+port_tbl.db_name+port_tbl.table_name] = fvp

# Now apply filter over soaked events
for key, fvp in port_event_cache.items():
port_index = int(fvp['index'])
port_change_event = None
diff = {}
filter = fvp['FILTER']
del fvp['FILTER']
if key in PortChangeEvent.PORT_EVENT:
diff = dict(set(fvp.items()) - set(PortChangeEvent.PORT_EVENT[key].items()))
# Ignore duplicate events
if not diff:
PortChangeEvent.PORT_EVENT[key] = fvp
continue
# Ensure only interested field update gets through for processing
if filter is not None:
if not (set(filter) & set(diff.keys())):
PortChangeEvent.PORT_EVENT[key] = fvp
continue
PortChangeEvent.PORT_EVENT[key] = fvp

if fvp['op'] == swsscommon.SET_COMMAND:
port_change_event = PortChangeEvent(fvp['key'],
port_index,
asic_context[port_tbl],
fvp['asic_id'],
PortChangeEvent.PORT_SET,
fvp)
elif op == swsscommon.DEL_COMMAND:
port_change_event = PortChangeEvent(key,
elif fvp['op'] == swsscommon.DEL_COMMAND:
port_change_event = PortChangeEvent(fvp['key'],
port_index,
asic_context[port_tbl],
fvp['asic_id'],
PortChangeEvent.PORT_DEL,
fvp)
if port_change_event is not None:
port_change_event_handler(port_change_event)
# This is the final event considered for processing
logger.log_warning("*** {} handle_port_update_event() fvp {}".format(
key, fvp))
if port_change_event is not None:
port_change_event_handler(port_change_event)


def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_change_event_handler):
"""Select CONFIG_DB PORT table changes, once there is a port configuration add/remove, notify observers
Expand Down

0 comments on commit 157f483

Please sign in to comment.