Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Xcvrd] Soak duplicate events and process only updated interested events #285

Merged
merged 4 commits into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions sonic-xcvrd/tests/test_xcvrd.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,24 @@ def test_is_error_sfp_status(self):
assert not is_error_block_eeprom_reading(int(SFP_STATUS_INSERTED))
assert not is_error_block_eeprom_reading(int(SFP_STATUS_REMOVED))

@patch('swsscommon.swsscommon.Select.addSelectable', MagicMock())
@patch('swsscommon.swsscommon.SubscriberStateTable')
@patch('swsscommon.swsscommon.Select.select')
def test_handle_port_update_event(self, mock_select, mock_sub_table):
mock_selectable = MagicMock()
mock_selectable.pop = MagicMock(
side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (None, None, None)])
mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable)
mock_sub_table.return_value = mock_selectable
logger = MagicMock()

sel, asic_context = subscribe_port_update_event(DEFAULT_NAMESPACE, logger)
port_mapping = PortMapping()
stop_event = threading.Event()
stop_event.is_set = MagicMock(return_value=False)
handle_port_update_event(sel, asic_context, stop_event,
logger, port_mapping.handle_port_change_event)

@patch('swsscommon.swsscommon.Select.addSelectable', MagicMock())
@patch('swsscommon.swsscommon.SubscriberStateTable')
@patch('swsscommon.swsscommon.Select.select')
Expand Down Expand Up @@ -443,7 +461,7 @@ def test_CmisManagerTask_handle_port_change_event(self):
task.on_port_update_event(port_change_event)
assert len(task.port_dict) == 1


@patch('xcvrd.xcvrd.XcvrTableHelper')
def test_CmisManagerTask_get_configured_freq(self, mock_table_helper):
port_mapping = PortMapping()
Expand Down Expand Up @@ -474,6 +492,7 @@ def test_CmisManagerTask_task_run_stop(self, mock_chassis):

port_mapping = PortMapping()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping)
task.wait_for_port_config_done = MagicMock()
task.task_run()
task.task_stop()
assert task.task_process is None
Expand All @@ -482,6 +501,7 @@ def test_CmisManagerTask_task_run_stop(self, mock_chassis):
@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_update_event', MagicMock(return_value=(None, None)))
@patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_update_event', MagicMock())
@patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD'))
@patch('xcvrd.xcvrd.CmisManagerTask.wait_for_port_config_done', MagicMock())
def test_CmisManagerTask_task_worker(self, mock_chassis):
mock_xcvr_api = MagicMock()
mock_xcvr_api.set_datapath_deinit = MagicMock(return_value=True)
Expand Down Expand Up @@ -553,7 +573,6 @@ def test_CmisManagerTask_task_worker(self, mock_chassis):
'DP8State': 'DataPathActivated'
}
])

mock_sfp = MagicMock()
mock_sfp.get_presence = MagicMock(return_value=True)
mock_sfp.get_xcvr_api = MagicMock(return_value=mock_xcvr_api)
Expand Down Expand Up @@ -584,7 +603,6 @@ def test_CmisManagerTask_task_worker(self, mock_chassis):
task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True])
task.task_worker()
assert task.port_dict['Ethernet0']['cmis_state'] == 'DP_DEINIT'

task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True])
task.task_worker()
assert mock_xcvr_api.set_datapath_deinit.call_count == 1
Expand Down
35 changes: 26 additions & 9 deletions sonic-xcvrd/xcvrd/xcvrd.py
Original file line number Diff line number Diff line change
Expand Up @@ -994,10 +994,7 @@ def on_port_update_event(self, port_change_event):
self.port_dict[lport]['lanes'] = port_change_event.port_dict['lanes']
if 'host_tx_ready' in port_change_event.port_dict:
self.port_dict[lport]['host_tx_ready'] = port_change_event.port_dict['host_tx_ready']
if 'admin_status' in port_change_event.port_dict and 'oper_status' in port_change_event.port_dict:
# At times 'admin_status' is NOT the same in the PORT_TABLE of APPL_DB and STATE_DB
# We dont have better way to check if 'admin_status' is from APPL_DB or STATE_DB so this
# check is put temporarily to listen only to APPL_DB's admin_status and ignore that of STATE_DB
if 'admin_status' in port_change_event.port_dict:
self.port_dict[lport]['admin_status'] = port_change_event.port_dict['admin_status']
if 'laser_freq' in port_change_event.port_dict:
self.port_dict[lport]['laser_freq'] = int(port_change_event.port_dict['laser_freq'])
Expand Down Expand Up @@ -1277,13 +1274,36 @@ def configure_laser_frequency(self, api, lport, freq):
self.log_error("{} Tuning in progress, channel selection may fail!".format(lport))
return api.set_laser_freq(freq)

def wait_for_port_config_done(self, namespace):
# Connect to APPL_DB and subscribe to PORT table notifications
appl_db = daemon_base.db_connect("APPL_DB", namespace=namespace)

sel = swsscommon.Select()
port_tbl = swsscommon.SubscriberStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME)
sel.addSelectable(port_tbl)

# Make sure this daemon started after all port configured
while not self.task_stopping_event.is_set():
(state, c) = sel.select(port_mapping.SELECT_TIMEOUT_MSECS)
if state == swsscommon.Select.TIMEOUT:
continue
if state != swsscommon.Select.OBJECT:
self.log_warning("sel.select() did not return swsscommon.Select.OBJECT")
continue

(key, op, fvp) = port_tbl.pop()
if key in ["PortConfigDone", "PortInitDone"]:
break

def task_worker(self):
self.xcvr_table_helper = XcvrTableHelper(self.namespaces)

self.log_notice("Starting...")
self.log_notice("Waiting for PortConfigDone...")
for namespace in self.namespaces:
self.wait_for_port_config_done(namespace)

# APPL_DB for CONFIG updates, and STATE_DB for insertion/removal
sel, asic_context = port_mapping.subscribe_port_update_event(self.namespaces)
sel, asic_context = port_mapping.subscribe_port_update_event(self.namespaces, helper_logger)
while not self.task_stopping_event.is_set():
# Handle port change event from main thread
port_mapping.handle_port_update_event(sel,
Expand All @@ -1292,9 +1312,6 @@ def task_worker(self):
helper_logger,
self.on_port_update_event)

if not self.isPortConfigDone:
continue

for lport, info in self.port_dict.items():
if self.task_stopping_event.is_set():
break
Expand Down
75 changes: 60 additions & 15 deletions sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class PortChangeEvent:
PORT_REMOVE = 1
PORT_SET = 2
PORT_DEL = 3
PORT_EVENT = {}

def __init__(self, port_name, port_index, asic_id, event_type, port_dict=None):
# Logical port name, e.g. Ethernet0
Expand Down Expand Up @@ -105,11 +106,16 @@ def subscribe_port_config_change(namespaces):
sel.addSelectable(port_tbl)
return sel, asic_context

def subscribe_port_update_event(namespaces):
def subscribe_port_update_event(namespaces, logger):
"""
Subscribe to a particular DB's table and listen to only interested fields
Format :
{ <DB name> : <Table name> , <field1>, <field2>, .. } where only field<n> update will be received
"""
port_tbl_map = [
{'APPL_DB': swsscommon.APP_PORT_TABLE_NAME},
{'CONFIG_DB': swsscommon.CFG_PORT_TABLE_NAME},
{'STATE_DB': 'TRANSCEIVER_INFO'},
{'STATE_DB': 'PORT_TABLE'},
{'STATE_DB': 'PORT_TABLE', 'FILTER': ['host_tx_ready']},
]

sel = swsscommon.Select()
Expand All @@ -119,13 +125,18 @@ def subscribe_port_update_event(namespaces):
db = daemon_base.db_connect(list(d.keys())[0], namespace=namespace)
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
port_tbl = swsscommon.SubscriberStateTable(db, list(d.values())[0])
port_tbl.db_name = list(d.keys())[0]
port_tbl.table_name = list(d.values())[0]
port_tbl.filter = d['FILTER'] if 'FILTER' in d else None
asic_context[port_tbl] = asic_id
sel.addSelectable(port_tbl)
logger.log_warning("subscribing to port_tbl {} - {} DB of namespace {} ".format(
port_tbl, list(d.values())[0], namespace))
return sel, asic_context

def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_event_handler):
"""
Select PORT update events, notify the observers upon a port update in APPL_DB/CONFIG_DB
Select PORT update events, notify the observers upon a port update in CONFIG_DB
or a XCVR insertion/removal in STATE_DB
"""
if not stop_event.is_set():
Expand All @@ -135,6 +146,8 @@ def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_
if state != swsscommon.Select.OBJECT:
logger.log_warning('sel.select() did not return swsscommon.Select.OBJECT')
return

port_event_cache = {}
for port_tbl in asic_context.keys():
while True:
(key, op, fvp) = port_tbl.pop()
Expand All @@ -143,24 +156,56 @@ def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_
if not validate_port(key):
continue
fvp = dict(fvp) if fvp is not None else {}
logger.log_warning("$$$ {} handle_port_update_event() : op={} DB:{} Table:{} fvp {}".format(
key, op, port_tbl.db_name, port_tbl.table_name, fvp))

if 'index' not in fvp:
fvp['index'] = '-1'
port_index = int(fvp['index'])
port_change_event = None
if op == swsscommon.SET_COMMAND:
port_change_event = PortChangeEvent(key,
fvp['index'] = '-1'
fvp['key'] = key
fvp['asic_id'] = asic_context[port_tbl]
fvp['op'] = op
fvp['FILTER'] = port_tbl.filter
# Soak duplicate events and consider only the last event
port_event_cache[key+port_tbl.db_name+port_tbl.table_name] = fvp

# Now apply filter over soaked events
for key, fvp in port_event_cache.items():
port_index = int(fvp['index'])
port_change_event = None
diff = {}
filter = fvp['FILTER']
del fvp['FILTER']
if key in PortChangeEvent.PORT_EVENT:
diff = dict(set(fvp.items()) - set(PortChangeEvent.PORT_EVENT[key].items()))
# Ignore duplicate events
if not diff:
PortChangeEvent.PORT_EVENT[key] = fvp
continue
# Ensure only interested field update gets through for processing
if filter is not None:
if not (set(filter) & set(diff.keys())):
PortChangeEvent.PORT_EVENT[key] = fvp
continue
PortChangeEvent.PORT_EVENT[key] = fvp

if fvp['op'] == swsscommon.SET_COMMAND:
port_change_event = PortChangeEvent(fvp['key'],
port_index,
asic_context[port_tbl],
fvp['asic_id'],
PortChangeEvent.PORT_SET,
fvp)
elif op == swsscommon.DEL_COMMAND:
port_change_event = PortChangeEvent(key,
elif fvp['op'] == swsscommon.DEL_COMMAND:
port_change_event = PortChangeEvent(fvp['key'],
port_index,
asic_context[port_tbl],
fvp['asic_id'],
PortChangeEvent.PORT_DEL,
fvp)
if port_change_event is not None:
port_change_event_handler(port_change_event)
# This is the final event considered for processing
logger.log_warning("*** {} handle_port_update_event() fvp {}".format(
key, fvp))
if port_change_event is not None:
port_change_event_handler(port_change_event)


def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_change_event_handler):
"""Select CONFIG_DB PORT table changes, once there is a port configuration add/remove, notify observers
Expand Down