Skip to content

Commit

Permalink
[SmartSwitch] Add implementation for the DPU chassis daemon.
Browse files Browse the repository at this point in the history
  • Loading branch information
oleksandrivantsiv committed Oct 25, 2024
1 parent ca812b0 commit dbdee66
Show file tree
Hide file tree
Showing 5 changed files with 350 additions and 33 deletions.
182 changes: 157 additions & 25 deletions sonic-chassisd/scripts/chassisd
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ try:
import sys
import threading
import time
from datetime import datetime

from sonic_py_common import daemon_base, logger, device_info
from sonic_py_common.task_base import ProcessTaskBase
Expand Down Expand Up @@ -81,8 +82,6 @@ CHASSIS_DB_CLEANUP_MODULE_DOWN_PERIOD = 30 # Minutes
CHASSIS_LOAD_ERROR = 1
CHASSIS_NOT_SUPPORTED = 2

platform_chassis = None

SELECT_TIMEOUT = 1000

NOT_AVAILABLE = 'N/A'
Expand Down Expand Up @@ -123,6 +122,14 @@ def try_get(callback, *args, **kwargs):

return ret

def get_chassis():
try:
import sonic_platform.platform
return sonic_platform.platform.Platform().get_chassis()
except Exception as e:
self.log_error("Failed to load chassis due to {}".format(repr(e)))
sys.exit(CHASSIS_LOAD_ERROR)

#
# Module Config Updater ========================================================
#
Expand Down Expand Up @@ -573,7 +580,7 @@ class ConfigManagerTask(ProcessTaskBase):
self.logger = logger.Logger(SYSLOG_IDENTIFIER)

def task_worker(self):
self.config_updater = ModuleConfigUpdater(SYSLOG_IDENTIFIER, platform_chassis)
self.config_updater = ModuleConfigUpdater(SYSLOG_IDENTIFIER, get_chassis())
config_db = daemon_base.db_connect("CONFIG_DB")

# Subscribe to CHASSIS_MODULE table notifications in the Config DB
Expand Down Expand Up @@ -605,58 +612,155 @@ class ConfigManagerTask(ProcessTaskBase):

self.config_updater.module_config_update(key, admin_state)


#
# State Manager task ========================================================
#

class DpuStateUpdater(logger.Logger):

DP_STATE = 'dpu_data_plane_state'
DP_UPDATE_TIME = 'dpu_data_plane_time'
CP_STATE = 'dpu_control_plane_state'
CP_UPDATE_TIME = 'dpu_control_plane_time'
CP_STATE = 'dpu_control_plane_state'

def __init__(self, log_identifier, chassis):
super(DpuStateUpdater, self).__init__(log_identifier)

self.chassis = chassis

self.state_db = daemon_base.db_connect('STATE_DB')
self.app_db = daemon_base.db_connect('APPL_DB')
self.chassis_state_db = daemon_base.db_connect('CHASSIS_STATE_DB')

self.config_db = swsscommon.ConfigDBConnector()
self.config_db.connect()

try:
self.chassis.get_dataplane_state()
except NotImplementedError:
self._get_dp_state = self._get_data_plane_state_common
else:
self._get_dp_state = self.chassis.get_dataplane_state

try:
self.chassis.get_controlplane_state()
except NotImplementedError:
self._get_cp_state = self._get_control_plane_state_common
else:
self._get_cp_state = self.chassis.get_controlplane_state

self.id = self.chassis.get_dpu_id()
self.name = f'DPU{self.id}'

self.dpu_state_table = swsscommon.Table(self.chassis_state_db, 'DPU_STATE')

def _get_data_plane_state_common(self):
port_table = swsscommon.Table(self.app_db, 'PORT_TABLE')

for port in self.config_db.get_table('PORT'):
status, oper_status = port_table.hget(port, 'oper_status')
if not status or oper_status.lower() != 'up':
return False

return True

def _get_control_plane_state_common(self):
sysready_table = swsscommon.Table(self.state_db,'SYSTEM_READY')

status, sysready_state = sysready_table.hget('SYSTEM_STATE', 'Status')
if not status or sysready_state.lower() != 'up':
return False

return True

def _time_now(self):
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

def _update_dp_dpu_state(self, state):
self.dpu_state_table.hset(self.name, self.DP_STATE, state)
self.dpu_state_table.hset(self.name, self.DP_UPDATE_TIME, self._time_now())

def _update_cp_dpu_state(self, state):
self.dpu_state_table.hset(self.name, self.CP_STATE, state)
self.dpu_state_table.hset(self.name, self.CP_UPDATE_TIME, self._time_now())

def get_dp_state(self):
return 'up' if self._get_dp_state() else 'down'

def get_cp_state(self):
return 'up' if self._get_cp_state() else 'down'

def update_state(self):

dp_current_state = self.get_dp_state()
_, dp_prev_state = self.dpu_state_table.hget(self.name, self.DP_STATE)

if dp_current_state != dp_prev_state:
self._update_dp_dpu_state(dp_current_state)

cp_current_state = self.get_cp_state()
_, cp_prev_state = self.dpu_state_table.hget(self.name, self.CP_STATE)

if cp_current_state != cp_prev_state:
self._update_cp_dpu_state(cp_current_state)

def deinit(self):
self._update_dp_dpu_state('down')
self._update_cp_dpu_state('down')


#
# Daemon =======================================================================
#


class ChassisdDaemon(daemon_base.DaemonBase):
def __init__(self, log_identifier):

FATAL_SIGNALS = [signal.SIGINT, signal.SIGTERM]
NONFATAL_SIGNALS = [signal.SIGHUP]

def __init__(self, log_identifier, chassis):
super(ChassisdDaemon, self).__init__(log_identifier)

self.stop = threading.Event()

self.platform_chassis = chassis

for signum in self.FATAL_SIGNALS + self.NONFATAL_SIGNALS:
try:
signal.signal(signum, self.signal_handler)
print(f"Registered handler for {signum}")
except Exception as e:
self.log_error(f"Cannot register handler for {signum}: {e}")

# Override signal handler from DaemonBase
def signal_handler(self, sig, frame):
FATAL_SIGNALS = [signal.SIGINT, signal.SIGTERM]
NONFATAL_SIGNALS = [signal.SIGHUP]

global exit_code

if sig in FATAL_SIGNALS:
if sig in self.FATAL_SIGNALS:
exit_code = 128 + sig # Make sure we exit with a non-zero code so that supervisor will try to restart us
self.log_info("Caught {} signal '{}' - exiting...".format(exit_code,SIGNALS_TO_NAMES_DICT[sig]))
self.stop.set()
elif sig in NONFATAL_SIGNALS:
elif sig in self.NONFATAL_SIGNALS:
self.log_info("Caught signal '{}' - ignoring...".format(SIGNALS_TO_NAMES_DICT[sig]))
else:
self.log_warning("Caught unhandled signal '{}' - ignoring...".format(SIGNALS_TO_NAMES_DICT[sig]))

# Run daemon
def run(self):
global platform_chassis

self.log_info("Starting up...")

# Load new platform api class
try:
import sonic_platform.platform
platform_chassis = sonic_platform.platform.Platform().get_chassis()
except Exception as e:
self.log_error("Failed to load chassis due to {}".format(repr(e)))
sys.exit(CHASSIS_LOAD_ERROR)

# Check for valid slot numbers
my_slot = try_get(platform_chassis.get_my_slot,
my_slot = try_get(self.platform_chassis.get_my_slot,
default=INVALID_SLOT)
supervisor_slot = try_get(platform_chassis.get_supervisor_slot,
supervisor_slot = try_get(self.platform_chassis.get_supervisor_slot,
default=INVALID_SLOT)

# Check if module list is populated
self.module_updater = ModuleUpdater(SYSLOG_IDENTIFIER, platform_chassis, my_slot, supervisor_slot)
self.module_updater = ModuleUpdater(SYSLOG_IDENTIFIER, self.platform_chassis, my_slot, supervisor_slot)
self.module_updater.modules_num_update()


if ((self.module_updater.my_slot == INVALID_SLOT) or
(self.module_updater.supervisor_slot == INVALID_SLOT)):
self.log_error("Chassisd not supported for this platform")
Expand Down Expand Up @@ -687,14 +791,42 @@ class ChassisdDaemon(daemon_base.DaemonBase):

self.log_info("Shutting down...")


class DpuChassisdDaemon(ChassisdDaemon):

def run(self):
self.log_info("Starting up...")

dpu_updater = DpuStateUpdater(SYSLOG_IDENTIFIER, self.platform_chassis)

# Start main loop
self.log_info("Start daemon main loop")

while not self.stop.wait(CHASSIS_INFO_UPDATE_PERIOD_SECS):
dpu_updater.update_state()

self.log_info("Stop daemon main loop")

dpu_updater.deinit()

self.log_info("Shutting down...")


#
# Main =========================================================================
#


def main():
global exit_code
chassisd = ChassisdDaemon(SYSLOG_IDENTIFIER)

chassis = get_chassis()

if chassis.is_smartswitch() and chassis.is_dpu():
chassisd = DpuChassisdDaemon(SYSLOG_IDENTIFIER, chassis)
else:
chassisd = ChassisdDaemon(SYSLOG_IDENTIFIER, chassis)

chassisd.run()

sys.exit(exit_code)
Expand Down
17 changes: 17 additions & 0 deletions sonic-chassisd/tests/mock_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,20 @@ def get_model(self):

def get_revision(self):
return "Rev C"

def get_dataplane_state(self):
raise NotImplementedError

def get_controlplane_state(self):
raise NotImplementedError

class MockDpuChassis:

def get_dpu_id(self):
return 0

def get_dataplane_state(self):
raise NotImplementedError

def get_controlplane_state(self):
raise NotImplementedError
26 changes: 26 additions & 0 deletions sonic-chassisd/tests/mock_swsscommon.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,24 @@ def get(self, key):
return rv
return None

def hget(self, key, field):
if key not in self.mock_dict or field not in self.mock_dict[key]:
return [False, None]

return [True, self.mock_dict[key][field]]

def hset(self, key, field, value):
if key not in self.mock_dict:
self.mock_dict[key] = {}

self.mock_dict[key][field] = value

def hdel(self, key, field):
if key not in self.mock_dict or field not in self.mock_dict:
return

del self.mock_dict[key][field]

def getKeys(self):
return list(self.mock_dict)

Expand Down Expand Up @@ -58,3 +76,11 @@ def loadRedisScript(self, script):
self.script = script
self.script_mock_sha = 'd79033d1cab85249929e8c069f6784474d71cc43'
return self.script_mock_sha

class ConfigDBConnector:

def connect(*args, **kwargs):
pass

def get_table(*args, **kwargs):
pass
27 changes: 19 additions & 8 deletions sonic-chassisd/tests/test_chassisd.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,8 @@ def verify_fabric_asic(asic_name, asic_pci_address, module_name, asic_id_in_modu

def test_signal_handler():
exit_code = 0
daemon_chassisd = ChassisdDaemon(SYSLOG_IDENTIFIER)
chassis = MockChassis()
daemon_chassisd = ChassisdDaemon(SYSLOG_IDENTIFIER, chassis)
daemon_chassisd.stop.set = MagicMock()
daemon_chassisd.log_info = MagicMock()
daemon_chassisd.log_warning = MagicMock()
Expand Down Expand Up @@ -686,21 +687,31 @@ def test_signal_handler():

def test_daemon_run_supervisor():
# Test the chassisd run
daemon_chassisd = ChassisdDaemon(SYSLOG_IDENTIFIER)
chassis = MockChassis()

chassis.get_supervisor_slot = Mock()
chassis.get_supervisor_slot.return_value = 0
chassis.get_my_slot = Mock()
chassis.get_my_slot.return_value = 0

daemon_chassisd = ChassisdDaemon(SYSLOG_IDENTIFIER, chassis)
daemon_chassisd.stop = MagicMock()
daemon_chassisd.stop.wait.return_value = True
daemon_chassisd.run()

def test_daemon_run_linecard():
# Test the chassisd run
daemon_chassisd = ChassisdDaemon(SYSLOG_IDENTIFIER)
chassis = MockChassis()

chassis.get_supervisor_slot = Mock()
chassis.get_supervisor_slot.return_value = 0
chassis.get_my_slot = Mock()
chassis.get_my_slot.return_value = 1

daemon_chassisd = ChassisdDaemon(SYSLOG_IDENTIFIER, chassis)
daemon_chassisd.stop = MagicMock()
daemon_chassisd.stop.wait.return_value = True

import sonic_platform.platform
with patch.object(sonic_platform.platform.Chassis, 'get_my_slot') as mock:
mock.return_value = sonic_platform.platform.Platform().get_chassis().get_supervisor_slot() + 1
daemon_chassisd.run()
daemon_chassisd.run()

def test_chassis_db_cleanup():
chassis = MockChassis()
Expand Down
Loading

0 comments on commit dbdee66

Please sign in to comment.