Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[action] [PR:485] [CMIS] Return the CDB status value for the caller to check the status… (#485) #496

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions sonic_platform_base/sonic_xcvr/api/public/cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1267,7 +1267,7 @@ def get_module_fw_mgmt_feature(self, verbose = False):
"""
txt = ''
if self.cdb is None:
return {'status': False, 'info': "CDB Not supported", 'result': None}
return {'status': False, 'info': "CDB Not supported", 'feature': None}

# get fw upgrade features (CMD 0041h)
starttime = time.time()
Expand All @@ -1279,8 +1279,11 @@ def get_module_fw_mgmt_feature(self, verbose = False):
writelength = (writelength_raw + 1) * 8
txt += 'Auto page support: %s\n' %autopaging_flag
txt += 'Max write length: %d\n' %writelength
rpllen, rpl_chkcode, rpl = self.cdb.get_fw_management_features()
if self.cdb.cdb_chkcode(rpl) == rpl_chkcode:
result = self.cdb.get_fw_management_features()
status = result['status']
_, rpl_chkcode, rpl = result['rpl']

if status == 1 and self.cdb.cdb_chkcode(rpl) == rpl_chkcode:
startLPLsize = rpl[2]
txt += 'Start payload size %d\n' % startLPLsize
maxblocksize = (rpl[4] + 1) * 8
Expand All @@ -1303,8 +1306,10 @@ def get_module_fw_mgmt_feature(self, verbose = False):
txt += 'Read to LPL/EPL {:#x}\n'.format(rpl[6])

else:
txt += 'Reply payload check code error\n'
return {'status': False, 'info': txt, 'result': None}
txt += 'Status or reply payload check code error\n'
logger.error(txt)
logger.error('Fail to get fw mgmt feature, cdb status: {:#x}, cdb_chkcode: {:#x}, rpl_chkcode: {:#x}\n'.format(status, self.cdb.cdb_chkcode(rpl), rpl_chkcode))
return {'status': False, 'info': txt, 'feature': None}
elapsedtime = time.time()-starttime
logger.info('Get module FW upgrade features time: %.2f s\n' %elapsedtime)
logger.info(txt)
Expand All @@ -1324,20 +1329,25 @@ def get_module_fw_info(self):
return {'status': False, 'info': "CDB Not supported", 'result': None}

# get fw info (CMD 0100h)
rpllen, rpl_chkcode, rpl = self.cdb.get_fw_info()
result = self.cdb.get_fw_info()
status = result['status']
rpllen, rpl_chkcode, rpl = result['rpl']

# Interface NACK or timeout
if (rpllen is None) or (rpl_chkcode is None):
return {'status': False, 'info': "Interface fail", 'result': 0} # Return result 0 for distinguishing CDB is maybe in busy or failure.

# password issue
if self.cdb.cdb_chkcode(rpl) != rpl_chkcode:
if status == 0x46:
string = 'Get module FW info: Need to enter password\n'
logger.info(string)
# Reset password for module using CMIS 4.0
self.cdb.module_enter_password(0)
rpllen, rpl_chkcode, rpl = self.cdb.get_fw_info()
result = self.cdb.get_fw_info()
status = result['status']
rpllen, rpl_chkcode, rpl = result['rpl']

if self.cdb.cdb_chkcode(rpl) == rpl_chkcode:
if status == 1 and self.cdb.cdb_chkcode(rpl) == rpl_chkcode:
# Regiter 9Fh:136
fwStatus = rpl[0]
ImageARunning = (fwStatus & 0x01) # bit 0 - image A is running
Expand Down Expand Up @@ -1630,7 +1640,7 @@ def module_fw_upgrade(self, imagepath):
return result['status'], result['info']
result = self.get_module_fw_mgmt_feature()
try:
startLPLsize, maxblocksize, lplonly_flag, autopaging_flag, writelength = result['result']
startLPLsize, maxblocksize, lplonly_flag, autopaging_flag, writelength = result['feature']
except (ValueError, TypeError):
return result['status'], result['info']
download_status, txt = self.module_fw_download(startLPLsize, maxblocksize, lplonly_flag, autopaging_flag, writelength, imagepath)
Expand Down
25 changes: 16 additions & 9 deletions sonic_platform_base/sonic_xcvr/api/public/cmisCDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,13 @@ def query_cdb_status(self):
'''
This QUERY Status command may be used to retrieve the password acceptance
status and to perform a test of the CDB interface.
It returns the reply message of this CDB command 0000h.
It returns the status and reply message of this CDB command 0000h.
'''
cmd = bytearray(b'\x00\x00\x00\x00\x02\x00\x00\x00\x00\x10')
cmd[133-INIT_OFFSET] = self.cdb_chkcode(cmd)
self.write_cdb(cmd)
status = self.cdb1_chkstatus()
if (status != 0x1):
if status != 0x1:
if status > 127:
txt = 'Query CDB status: Busy'
else:
Expand All @@ -170,7 +170,8 @@ def query_cdb_status(self):
else:
txt = 'Query CDB status: Success'
logger.info(txt)
return self.read_cdb()
rpl = self.read_cdb()
return {'status': status, 'rpl': rpl}

# Enter password
def module_enter_password(self, psw = 0x00001011):
Expand Down Expand Up @@ -199,7 +200,7 @@ def module_enter_password(self, psw = 0x00001011):
def get_module_feature(self):
'''
This command is used to query which CDB commands are supported.
It returns the reply message of this CDB command 0040h.
It returns the status and reply message of this CDB command 0040h.
'''
cmd = bytearray(b'\x00\x40\x00\x00\x00\x00\x00\x00')
cmd[133-INIT_OFFSET] = self.cdb_chkcode(cmd)
Expand All @@ -214,13 +215,15 @@ def get_module_feature(self):
else:
txt = 'Get module feature status: Success'
logger.info(txt)
return self.read_cdb()

rpl = self.read_cdb()
return {'status': status, 'rpl': rpl}

# Firmware Update Features Supported
def get_fw_management_features(self):
'''
This command is used to query supported firmware update features
It returns the reply message of this CDB command 0041h.
It returns the status and reply message of this CDB command 0041h.
'''
cmd = bytearray(b'\x00\x41\x00\x00\x00\x00\x00\x00')
cmd[133-INIT_OFFSET] = self.cdb_chkcode(cmd)
Expand All @@ -235,14 +238,16 @@ def get_fw_management_features(self):
else:
txt = 'Get firmware management feature status: Success'
logger.info(txt)
return self.read_cdb()

rpl = self.read_cdb()
return {'status': status, 'rpl': rpl}

# Get FW info
def get_fw_info(self):
'''
This command returns the firmware versions and firmware default running
images that reside in the module
It returns the reply message of this CDB command 0100h.
It returns the status and reply message of this CDB command 0100h.
'''
cmd = bytearray(b'\x01\x00\x00\x00\x00\x00\x00\x00')
cmd[133-INIT_OFFSET] = self.cdb_chkcode(cmd)
Expand All @@ -257,7 +262,9 @@ def get_fw_info(self):
else:
txt = 'Get firmware info status: Success'
logger.info(txt)
return self.read_cdb()

rpl = self.read_cdb()
return {'status': status, 'rpl': rpl}

# Start FW download
def start_fw_download(self, startLPLsize, header, imagesize):
Expand Down
40 changes: 28 additions & 12 deletions tests/sonic_xcvr/test_cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1171,14 +1171,15 @@ def test_get_module_level_flag(self, mock_response, expected):
assert result == expected

@pytest.mark.parametrize("mock_response, expected", [
((128, 1, [0] * 128), {'status': True, 'info': "", 'result': 0}),
((None, 1, [0] * 128), {'status': False, 'info': "", 'result': 0}),
((128, None, [0] * 128), {'status': False, 'info': "", 'result': 0}),
((128, 0, [0] * 128), {'status': False, 'info': "", 'result': None}),
((128, 1, [67, 3, 2, 2, 3, 183] + [0] * 104), {'status': True, 'info': "", 'result': None}),
((128, 1, [52, 3, 2, 2, 3, 183] + [0] * 104), {'status': True, 'info': "", 'result': None}),
((110, 1, [3, 3, 2, 2, 3, 183] + [0] * 104), {'status': True, 'info': "", 'result': None}),
((110, 1, [48, 3, 2, 2, 3, 183] + [0] * 104), {'status': True, 'info': "", 'result': None}),
({'status':1, 'rpl':(128, 1, [0] * 128)}, {'status': True, 'info': "", 'result': 0}),
({'status':1, 'rpl':(None, 1, [0] * 128)}, {'status': False, 'info': "", 'result': 0}),
({'status':1, 'rpl':(128, None, [0] * 128)}, {'status': False, 'info': "", 'result': 0}),
({'status':1, 'rpl':(128, 0, [0] * 128)}, {'status': False, 'info': "", 'result': None}),
({'status':1, 'rpl':(128, 1, [67, 3, 2, 2, 3, 183] + [0] * 104)}, {'status': True, 'info': "", 'result': None}),
({'status':1, 'rpl':(128, 1, [52, 3, 2, 2, 3, 183] + [0] * 104)}, {'status': True, 'info': "", 'result': None}),
({'status':1, 'rpl':(110, 1, [3, 3, 2, 2, 3, 183] + [0] * 104)}, {'status': True, 'info': "", 'result': None}),
({'status':1, 'rpl':(110, 1, [48, 3, 2, 2, 3, 183] + [0] * 104)}, {'status': True, 'info': "", 'result': None}),
({'status':0x46, 'rpl':(128, 0, [0] * 128)}, {'status': False, 'info': "", 'result': None}),
])
def test_get_module_fw_info(self, mock_response, expected):
self.api.cdb = MagicMock()
Expand All @@ -1195,6 +1196,21 @@ def test_get_module_fw_info(self, mock_response, expected):
assert result['result'] == expected['result']
assert result['status'] == expected['status']

@pytest.mark.parametrize("mock_response, expected", [
({'status':0, 'rpl':(18, 0, [0] * 18)}, {'status': False, 'info': "", 'feature': None}),
({'status':1, 'rpl':(18, 1, [0] * 18)}, {'status': True, 'info': "", 'feature': (0, 8, False, True, 16)})
])
def test_get_module_fw_mgmt_feature(self, mock_response, expected):
self.api.cdb = MagicMock()
self.api.cdb.cdb_chkcode = MagicMock()
self.api.cdb.cdb_chkcode.return_value = 1
self.api.xcvr_eeprom.read = MagicMock()
self.api.xcvr_eeprom.read.side_effect = [1, 1]
self.api.cdb.get_fw_management_features = MagicMock()
self.api.cdb.get_fw_management_features.return_value = mock_response
result = self.api.get_module_fw_mgmt_feature()
assert result['feature'] == expected['feature']

@pytest.mark.parametrize("input_param, mock_response, expected", [
(1, 1, (True, 'Module FW run: Success\n')),
(1, 64, (False, 'Module FW run: Fail\nFW_run_status 64\n')),
Expand All @@ -1220,22 +1236,22 @@ def test_module_fw_commit(self, mock_response, expected):
@pytest.mark.parametrize("input_param, mock_response, expected", [
(
'abc',
[{'status': True, 'info': '', 'result': ('a', 1, 1, 0, 'b', 0, 0, 0, 'a', 'b')}, {'status': True, 'info': '', 'result': (112, 2048, True, True, 2048)}, (True, ''), (True, '')],
[{'status': True, 'info': '', 'result': ('a', 1, 1, 0, 'b', 0, 0, 0, 'a', 'b')}, {'status': True, 'info': '', 'feature': (112, 2048, True, True, 2048)}, (True, ''), (True, '')],
(True, '')
),
(
'abc',
[{'status': False, 'info': '', 'result': None}, {'status': True, 'info': '', 'result': (112, 2048, True, True, 2048)}, (True, ''), (True, '')],
[{'status': False, 'info': '', 'result': None}, {'status': True, 'info': '', 'feature': (112, 2048, True, True, 2048)}, (True, ''), (True, '')],
(False, '')
),
(
'abc',
[{'status': True, 'info': '', 'result': ('a', 1, 1, 0, 'b', 0, 0, 0, 'a', 'b')}, {'status': False, 'info': '', 'result': None}, (True, ''), (True, '')],
[{'status': True, 'info': '', 'result': ('a', 1, 1, 0, 'b', 0, 0, 0, 'a', 'b')}, {'status': False, 'info': '', 'feature': None}, (True, ''), (True, '')],
(False, '')
),
(
'abc',
[{'status': True, 'info': '', 'result': ('a', 1, 1, 0, 'b', 0, 0, 0, 'a', 'b')}, {'status': True, 'info': '', 'result': (112, 2048, True, True, 2048)}, (False, ''), (True, '')],
[{'status': True, 'info': '', 'result': ('a', 1, 1, 0, 'b', 0, 0, 0, 'a', 'b')}, {'status': True, 'info': '', 'feature': (112, 2048, True, True, 2048)}, (False, ''), (True, '')],
(False, '')
),
])
Expand Down
8 changes: 4 additions & 4 deletions tests/sonic_xcvr/test_cmisCDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def test_query_cdb_status(self, mock_response, expected):
self.api.cdb1_chkstatus.return_value = mock_response[0]
self.api.read_cdb = MagicMock()
self.api.read_cdb.return_value = mock_response[1]
result = self.api.query_cdb_status()
result = self.api.query_cdb_status()['rpl']
assert result == expected

@pytest.mark.parametrize("mock_response, expected", [
Expand All @@ -102,7 +102,7 @@ def test_get_module_feature(self, mock_response, expected):
self.api.cdb1_chkstatus.return_value = mock_response[0]
self.api.read_cdb = MagicMock()
self.api.read_cdb.return_value = mock_response[1]
result = self.api.get_module_feature()
result = self.api.get_module_feature()['rpl']
assert result == expected

@pytest.mark.parametrize("mock_response, expected", [
Expand All @@ -115,7 +115,7 @@ def test_get_fw_management_features(self, mock_response, expected):
self.api.cdb1_chkstatus.return_value = mock_response[0]
self.api.read_cdb = MagicMock()
self.api.read_cdb.return_value = mock_response[1]
result = self.api.get_fw_management_features()
result = self.api.get_fw_management_features()['rpl']
assert result == expected

@pytest.mark.parametrize("mock_response, expected", [
Expand All @@ -128,7 +128,7 @@ def test_get_fw_info(self, mock_response, expected):
self.api.cdb1_chkstatus.return_value = mock_response[0]
self.api.read_cdb = MagicMock()
self.api.read_cdb.return_value = mock_response[1]
result = self.api.get_fw_info()
result = self.api.get_fw_info()['rpl']
assert result == expected

@pytest.mark.parametrize("input_param, mock_response, expected", [
Expand Down
Loading