Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DellEMC Z9332f: Fix SFP issue #11819

Merged
merged 3 commits into from
Sep 7, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
SFP_DOM_OFFSET = 256

SFP_STATUS_CONTROL_OFFSET = 110
SFP_STATUS_CONTROL_WIDTH = 7
SFP_STATUS_CONTROL_WIDTH = 1
SFP_TX_DISABLE_HARD_BIT = 7
SFP_TX_DISABLE_SOFT_BIT = 6

Expand Down Expand Up @@ -912,15 +912,14 @@ def get_rx_los(self):
rx_los_list = []
try:
if self.sfp_type == 'QSFP_DD':
offset = 512
offset = 2176
rx_los_mask = [ 0x01, 0x02, 0x04, 0x08 ,0x10, 0x20, 0x40, 0x80 ]
dom_channel_monitor_raw = self._read_eeprom_bytes(self.eeprom_path,
offset + QSFP_DD_RXLOS_OFFSET, QSFP_DD_RXLOS_WIDTH)
offset + 147, 1)
if dom_channel_monitor_raw is not None:
rx_los_data = int(dom_channel_monitor_raw[0], 8)
for mask in rx_los_mask:
rx_los_list.append(rx_los_data & mask != 0)

rx_los_list.append(bool(rx_los_data & mask != 0))
elif self.sfp_type == 'QSFP':
rx_los_data = self._get_eeprom_data('rx_los')
# As the function expects a single boolean, if any one channel experience LOS,
Expand Down Expand Up @@ -962,13 +961,19 @@ def get_tx_disable(self):
tx_disable_list = []
try:
if self.sfp_type == 'QSFP_DD':
return False
dom_channel_monitor_raw = self._read_eeprom_bytes(self.eeprom_path,2048 + 130, 1)
if dom_channel_monitor_raw is not None:
tx_disable_data = int(dom_channel_monitor_raw[0], 16)
for i in range(8):
tx_disable_list.append(bool(tx_disable_data & (1 << i) != 0))

elif self.sfp_type == 'QSFP':
tx_disable_data = self._get_eeprom_data('tx_disable')
for tx_disable_id in ('Tx1Disable', 'Tx2Disable', 'Tx3Disable', 'Tx4Disable'):
tx_disable_list.append(tx_disable_data['data'][tx_disable_id]['value'] == 'On')
else:
tx_disable_data = self._read_eeprom_bytes(self.eeprom_path, SFP_STATUS_CONTROL_OFFSET, SFP_STATUS_CONTROL_WIDTH)
offset = 256
tx_disable_data = self._read_eeprom_bytes(self.eeprom_path, offset + SFP_STATUS_CONTROL_OFFSET, SFP_STATUS_CONTROL_WIDTH)
data = int(tx_disable_data[0], 16)
tx_disable_hard = (sffbase().test_bit(data, SFP_TX_DISABLE_HARD_BIT) != 0)
tx_disable_soft = (sffbase().test_bit(data, SFP_TX_DISABLE_SOFT_BIT) != 0)
Expand All @@ -977,19 +982,41 @@ def get_tx_disable(self):
return 'N/A'
return tx_disable_list

def tx_disable(self, tx_disable):
if self.sfp_type == 'QSFP_DD':
val = 0xff if tx_disable else 0x0
return self._write_eeprom_bytes(2048 + 130, 1, bytearray([val]))

elif self.sfp_type == 'QSFP':
val = 0xf if tx_disable else 0x0
return self._write_eeprom_bytes( 0 + 86, 1, bytearray([val]))

else:
offset = 256
if tx_disable is True:
val = 64
else:
val = 191
return self._write_eeprom_bytes(offset + SFP_STATUS_CONTROL_OFFSET, SFP_STATUS_CONTROL_WIDTH, bytearray([val]))

def get_tx_disable_channel(self):
"""
Retrieves the TX disabled channels in this SFP
"""
tx_disable_channel = 0
tx_disable_channel = None
try:
if self.sfp_type == 'QSFP_DD':
tx_disable_channel = 0
offset = 2048
dom_channel_monitor_raw = self._read_eeprom_bytes(self.eeprom_path, offset + 130, 1)
if dom_channel_monitor_raw is not None:
tx_disable_channel = int(dom_channel_monitor_raw[0], 16)
elif self.sfp_type == 'QSFP':
tx_disable_data = self._get_eeprom_data('tx_disable')
for tx_disable_id in ('Tx1Disable', 'Tx2Disable', 'Tx3Disable', 'Tx4Disable'):
tx_disable_channel <<= 1
tx_disable_channel |= (tx_disable_data['data']['Tx1Disable']['value'] == 'On')
else:
tx_disable_channel = int(self.get_tx_disable()[0])
except (TypeError, ValueError):
return 'N/A'
return tx_disable_channel
Expand Down Expand Up @@ -1091,7 +1118,7 @@ def get_tx_bias(self):
return None
if not self.qsfp_dd_txbias_supported:
for lane in range(0, 8):
tx_bias_list.append("N/A")
tx_bias_list.append(False)
return tx_bias_list

tx_bias_data_raw = self._read_eeprom_bytes(self.eeprom_path, offset + QSFP_DD_TXBIAS_OFFSET, QSFP_DD_TXBIAS_WIDTH)
Expand Down Expand Up @@ -1128,7 +1155,7 @@ def get_rx_power(self):
return None
if not self.qsfp_dd_rxpower_supported:
for lane in range(0, 8):
rx_power_list.append("N/A")
rx_power_list.append(False)
return rx_power_list

offset = QSFP_DD_PAGE17
Expand Down Expand Up @@ -1165,7 +1192,7 @@ def get_tx_power(self):
return None
if not self.qsfp_dd_txpower_supported:
for lane in range(0, 8):
tx_power_list.append("N/A")
tx_power_list.append(False)
return tx_power_list

offset = QSFP_DD_PAGE17
Expand Down Expand Up @@ -1293,17 +1320,45 @@ def get_intl_state(self):
except ValueError: pass
return intl_state

def tx_disable(self, tx_disable):
"""
Disable SFP TX for all channels
"""
return False

def tx_disable_channel(self, channel, disable):
"""
Sets the tx_disable for specified SFP channels
Enables/Disables TX channel for SFP based on disable flag
"""
return False
channel_state = self.get_tx_disable_channel()
if channel_state is None or channel_state == 'N/A':
return False
if self.sfp_type == 'QSFP_DD':
for i in range(8):
mask = (1 << i)
if not (channel & mask):
continue
if disable:
channel_state |= mask
else:
channel_state &= ~mask
return self._write_eeprom_bytes(2048 + 130, 1, bytearray([channel_state]))
elif self.sfp_type == 'QSFP':
for i in range(4):
mask = (1 << i)
if not (channel & mask):
continue
if disable:
channel_state |= mask
else:
channel_state &= ~mask
return self._write_eeprom_bytes(86, 1, bytearray([channel_state]))
else:
aravindmani-1 marked this conversation as resolved.
Show resolved Hide resolved
channel_state = self.get_tx_disable_channel()
if channel_state is None or channel_state == "N/A":
return False
offset = 256
mask = (1 << 0)
if disable:
channel_state |= mask
else:
channel_state &= ~mask

return self._write_eeprom_bytes(offset + SFP_STATUS_CONTROL_OFFSET, SFP_STATUS_CONTROL_WIDTH, bytearray([channel_state]))

def set_power_override(self, power_override, power_set):
"""
Expand Down