Skip to content

Commit

Permalink
Xcvrd should restart if any child thread crashes (#326)
Browse files Browse the repository at this point in the history
* Xcvrd should restart if any child thread crashes

Signed-off-by: Mihir Patel <patelmi@microsoft.com>

* Resolved test_SfpStateUpdateTask_task_run_stop test failure

* Added comment for raise_exception

Signed-off-by: Mihir Patel <patelmi@microsoft.com>

* Added check for avoiding cmis_manager.start() if CMIS thread is supposed to be skipped. Also, moidified join function to handle accordingly. Added logs for showing names of threads spawned

Signed-off-by: Mihir Patel <patelmi@microsoft.com>

Signed-off-by: Mihir Patel <patelmi@microsoft.com>
  • Loading branch information
mihirpat1 authored Jan 19, 2023
1 parent 753b550 commit 2211b7e
Show file tree
Hide file tree
Showing 2 changed files with 266 additions and 88 deletions.
173 changes: 140 additions & 33 deletions sonic-xcvrd/tests/test_xcvrd.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,95 @@
global_media_settings = media_settings_with_comma_dict['GLOBAL_MEDIA_SETTINGS'].pop('1-32')
media_settings_with_comma_dict['GLOBAL_MEDIA_SETTINGS']['1-5,6,7-20,21-32'] = global_media_settings

class TestXcvrdThreadException(object):

@patch('xcvrd.xcvrd.platform_chassis', MagicMock())
def test_CmisManagerTask_task_run_with_exception(self):
port_mapping = PortMapping()
stop_event = threading.Event()
cmis_manager = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
cmis_manager.wait_for_port_config_done = MagicMock(side_effect = NotImplementedError)
exception_received = None
trace = None
try:
cmis_manager.start()
cmis_manager.join()
except Exception as e1:
exception_received = e1
trace = traceback.format_exc()

assert not cmis_manager.is_alive()
assert(type(exception_received) == NotImplementedError)
assert("NotImplementedError" in str(trace) and "effect" in str(trace))
assert("sonic-xcvrd/xcvrd/xcvrd.py" in str(trace))
assert("wait_for_port_config_done" in str(trace))

@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(side_effect = NotImplementedError))
def test_DomInfoUpdateTask_task_run_with_exception(self):
port_mapping = PortMapping()
stop_event = threading.Event()
dom_info_update = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
exception_received = None
trace = None
try:
dom_info_update.start()
dom_info_update.join()
except Exception as e1:
exception_received = e1
trace = traceback.format_exc()

assert not dom_info_update.is_alive()
assert(type(exception_received) == NotImplementedError)
assert("NotImplementedError" in str(trace) and "effect" in str(trace))
assert("sonic-xcvrd/xcvrd/xcvrd.py" in str(trace))
assert("subscribe_port_config_change" in str(trace))

@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(side_effect = NotImplementedError))
def test_SfpStateUpdateTask_task_run_with_exception(self):
port_mapping = PortMapping()
retry_eeprom_set = set()
stop_event = threading.Event()
sfp_error_event = threading.Event()
sfp_state_update = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
exception_received = None
trace = None
try:
sfp_state_update.start()
sfp_state_update.join()
except Exception as e1:
exception_received = e1
trace = traceback.format_exc()

assert not sfp_state_update.is_alive()
assert(type(exception_received) == NotImplementedError)
assert("NotImplementedError" in str(trace) and "effect" in str(trace))
assert("sonic-xcvrd/xcvrd/xcvrd.py" in str(trace))
assert("subscribe_port_config_change" in str(trace))

@patch('xcvrd.xcvrd.SfpStateUpdateTask.is_alive', MagicMock(return_value = False))
@patch('xcvrd.xcvrd.DomInfoUpdateTask.is_alive', MagicMock(return_value = False))
@patch('xcvrd.xcvrd.CmisManagerTask.is_alive', MagicMock(return_value = False))
@patch('xcvrd.xcvrd.CmisManagerTask.join', MagicMock(side_effect = NotImplementedError))
@patch('xcvrd.xcvrd.CmisManagerTask.start', MagicMock())
@patch('xcvrd.xcvrd.DomInfoUpdateTask.start', MagicMock())
@patch('xcvrd.xcvrd.SfpStateUpdateTask.start', MagicMock())
@patch('xcvrd.xcvrd.DaemonXcvrd.deinit', MagicMock())
@patch('os.kill')
@patch('xcvrd.xcvrd.DaemonXcvrd.init')
@patch('xcvrd.xcvrd.DomInfoUpdateTask.join')
@patch('xcvrd.xcvrd.SfpStateUpdateTask.join')
def test_DaemonXcvrd_run_with_exception(self, mock_task_join1, mock_task_join2, mock_init, mock_os_kill):
mock_init.return_value = (PortMapping(), set())
xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER)
xcvrd.stop_event.wait = MagicMock()
xcvrd.run()

assert len(xcvrd.threads) == 3
assert mock_init.call_count == 1
assert mock_task_join1.call_count == 1
assert mock_task_join2.call_count == 1
assert mock_os_kill.call_count == 1

class TestXcvrdScript(object):

@patch('xcvrd.xcvrd._wrapper_get_sfp_type')
Expand Down Expand Up @@ -482,10 +571,10 @@ def test_DaemonXcvrd_wait_for_port_config_done(self, mock_select, mock_sub_table

@patch('xcvrd.xcvrd.DaemonXcvrd.init')
@patch('xcvrd.xcvrd.DaemonXcvrd.deinit')
@patch('xcvrd.xcvrd.DomInfoUpdateTask.task_run')
@patch('xcvrd.xcvrd.SfpStateUpdateTask.task_run')
@patch('xcvrd.xcvrd.DomInfoUpdateTask.task_stop')
@patch('xcvrd.xcvrd.SfpStateUpdateTask.task_stop')
@patch('xcvrd.xcvrd.DomInfoUpdateTask.start')
@patch('xcvrd.xcvrd.SfpStateUpdateTask.start')
@patch('xcvrd.xcvrd.DomInfoUpdateTask.join')
@patch('xcvrd.xcvrd.SfpStateUpdateTask.join')
def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, mock_task_run2, mock_deinit, mock_init):
mock_init.return_value = (PortMapping(), set())
xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER)
Expand All @@ -501,7 +590,8 @@ def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1,
@patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD'))
def test_CmisManagerTask_handle_port_change_event(self):
port_mapping = PortMapping()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping)
stop_event = threading.Event()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event)

assert not task.isPortConfigDone
port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET)
Expand All @@ -528,7 +618,8 @@ def test_CmisManagerTask_handle_port_change_event(self):
@patch('xcvrd.xcvrd.XcvrTableHelper')
def test_CmisManagerTask_get_configured_freq(self, mock_table_helper):
port_mapping = PortMapping()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping)
stop_event = threading.Event()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
cfg_port_tbl = MagicMock()
cfg_port_tbl.get = MagicMock(return_value=(True, (('laser_freq', 193100),)))
mock_table_helper.get_cfg_port_tbl = MagicMock(return_value=cfg_port_tbl)
Expand All @@ -538,7 +629,8 @@ def test_CmisManagerTask_get_configured_freq(self, mock_table_helper):
@patch('xcvrd.xcvrd.XcvrTableHelper')
def test_CmisManagerTask_get_configured_tx_power_from_db(self, mock_table_helper):
port_mapping = PortMapping()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping)
stop_event = threading.Event()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
cfg_port_tbl = MagicMock()
cfg_port_tbl.get = MagicMock(return_value=(True, (('tx_power', -10),)))
mock_table_helper.get_cfg_port_tbl = MagicMock(return_value=cfg_port_tbl)
Expand All @@ -554,11 +646,12 @@ def test_CmisManagerTask_task_run_stop(self, mock_chassis):
mock_chassis.get_all_sfps = MagicMock(return_value=[mock_object, mock_object])

port_mapping = PortMapping()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping)
task.wait_for_port_config_done = MagicMock()
task.task_run()
task.task_stop()
assert task.task_process is None
stop_event = threading.Event()
cmis_manager = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
cmis_manager.wait_for_port_config_done = MagicMock()
cmis_manager.start()
cmis_manager.join()
assert not cmis_manager.is_alive()

@patch('xcvrd.xcvrd.platform_chassis')
@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_update_event', MagicMock(return_value=(None, None)))
Expand Down Expand Up @@ -657,7 +750,8 @@ def test_CmisManagerTask_task_worker(self, mock_chassis):
mock_chassis.get_sfp = MagicMock(return_value=mock_sfp)

port_mapping = PortMapping()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping)
stop_event = threading.Event()
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event)

port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET)
task.on_port_update_event(port_change_event)
Expand Down Expand Up @@ -708,7 +802,8 @@ def test_CmisManagerTask_task_worker(self, mock_chassis):
@patch('xcvrd.xcvrd.delete_port_from_status_table_hw')
def test_DomInfoUpdateTask_handle_port_change_event(self, mock_del_status_tbl_hw):
port_mapping = PortMapping()
task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping)
stop_event = threading.Event()
task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE)
port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD)
task.on_port_config_change(port_change_event)
Expand All @@ -730,10 +825,11 @@ def test_DomInfoUpdateTask_handle_port_change_event(self, mock_del_status_tbl_hw
@patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock())
def test_DomInfoUpdateTask_task_run_stop(self):
port_mapping = PortMapping()
task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping)
task.task_run()
task.task_stop()
assert not task.task_thread.is_alive()
stop_event = threading.Event()
task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
task.start()
task.join()
assert not task.is_alive()

@patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock())
@patch('xcvrd.xcvrd_utilities.sfp_status_helper.detect_port_in_error_status')
Expand All @@ -754,7 +850,8 @@ def test_DomInfoUpdateTask_task_worker(self, mock_post_pm_info, mock_update_stat
mock_sub_table.return_value = mock_selectable

port_mapping = PortMapping()
task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping)
stop_event = threading.Event()
task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE)
task.task_stopping_event.wait = MagicMock(side_effect=[False, True])
mock_detect_error.return_value = True
Expand Down Expand Up @@ -785,10 +882,11 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_update_status_hw
mock_table_helper.get_int_tbl = MagicMock(return_value=mock_table)
mock_table_helper.get_dom_tbl = MagicMock(return_value=mock_table)
mock_table_helper.get_dom_threshold_tbl = MagicMock(return_value=mock_table)
stopping_event = multiprocessing.Event()
stop_event = threading.Event()
sfp_error_event = threading.Event()
port_mapping = PortMapping()
retry_eeprom_set = set()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set)
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE)
task.xcvr_table_helper.get_status_tbl = mock_table_helper.get_status_tbl
task.xcvr_table_helper.get_intf_tbl = mock_table_helper.get_intf_tbl
Expand Down Expand Up @@ -821,15 +919,18 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_update_status_hw
assert not task.port_mapping.logical_to_asic
assert mock_update_status_hw.call_count == 1

@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None)))
def test_SfpStateUpdateTask_task_run_stop(self):
port_mapping = PortMapping()
retry_eeprom_set = set()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set)
sfp_error_event = multiprocessing.Event()
task.task_run(sfp_error_event)
assert wait_until(5, 1, task.task_process.is_alive)
task.task_stop()
assert wait_until(5, 1, lambda: task.task_process.is_alive() is False)
stop_event = threading.Event()
sfp_error_event = threading.Event()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
task.start()
assert wait_until(5, 1, task.is_alive)
task.raise_exception()
task.join()
assert wait_until(5, 1, lambda: task.is_alive() is False)

@patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock())
@patch('xcvrd.xcvrd.post_port_sfp_info_to_db')
Expand All @@ -840,7 +941,9 @@ def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_update_status_hw, mo

port_mapping = PortMapping()
retry_eeprom_set = set()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set)
stop_event = threading.Event()
sfp_error_event = threading.Event()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE)
task.xcvr_table_helper.get_intf_tbl = MagicMock(return_value=mock_table)
task.xcvr_table_helper.get_dom_tbl = MagicMock(return_value=mock_table)
Expand Down Expand Up @@ -872,7 +975,9 @@ def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_update_status_hw, mo
def test_SfpStateUpdateTask_mapping_event_from_change_event(self):
port_mapping = PortMapping()
retry_eeprom_set = set()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set)
stop_event = threading.Event()
sfp_error_event = threading.Event()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
port_dict = {}
assert task._mapping_event_from_change_event(False, port_dict) == SYSTEM_FAIL
assert port_dict[EVENT_ON_ALL_SFP] == SYSTEM_FAIL
Expand Down Expand Up @@ -909,10 +1014,10 @@ def test_SfpStateUpdateTask_task_worker(self, mock_post_pm_info, mock_del_status
mock_del_dom, mock_change_event, mock_mapping_event, mock_os_kill):
port_mapping = PortMapping()
retry_eeprom_set = set()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set)
stop_event = threading.Event()
sfp_error_event = threading.Event()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE)
stop_event = multiprocessing.Event()
sfp_error_event = multiprocessing.Event()
mock_change_event.return_value = (True, {0: 0}, {})
mock_mapping_event.return_value = SYSTEM_NOT_READY

Expand Down Expand Up @@ -1035,7 +1140,9 @@ class MockTable:

port_mapping = PortMapping()
retry_eeprom_set = set()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set)
stop_event = threading.Event()
sfp_error_event = threading.Event()
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE)
task.xcvr_table_helper.get_status_tbl = mock_table_helper.get_status_tbl
task.xcvr_table_helper.get_intf_tbl = mock_table_helper.get_intf_tbl
Expand Down
Loading

0 comments on commit 2211b7e

Please sign in to comment.