Skip to content

Commit

Permalink
add async notification support in active-active topo; refactor code f…
Browse files Browse the repository at this point in the history
…or ycable tasks for change events (#327)

Signed-off-by: vaibhav-dahiya vdahiya@microsoft.com
This PR adds an enhancement to ycabled when if and whenever a notification arrives from SoC or server that it is going to be serviced. The notification can come at anytime and hence it must be listened to or awaited by the client at all times. For this we use asyncio lib and avail the async functionality in python.

the added code defines a GracefulRestartClient class that can be used to send requests to a gRPC server for graceful restart. The class has three async methods: send_request, receive_response, and notify_graceful_restart_start.

send_request method retrieves tor from request_queue, creates a request with the ToR, and sends it to the server using the stub's NotifyGracefulRestartStart method. It then reads the response from the server and prints the details of the response.

receive_response method retrieves response from response_queue, prints/puts the the response in DB, sleeps for the period mentioned in the response, and then puts the tor of the response back in the request_queue

We follow the existing ycabled pattern and instantiate the task_worker which contains all these tasks in a seperate thread

Description
Motivation and Context
How Has This Been Tested?
UT and using this server to validate
  • Loading branch information
vdahiya12 authored May 26, 2023
1 parent 6202a95 commit d6b2a02
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 1 deletion.
32 changes: 32 additions & 0 deletions sonic-ycabled/proto/proto_out/linkmgr_grpc_driver.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,35 @@ message ServerVersionReply {
}


service GracefulRestart {

rpc NotifyGracefulRestartStart(GracefulAdminRequest) returns (stream GracefulAdminResponse) {}

}

enum ToRSide {
LOWER_TOR = 0;
UPPER_TOR =1;
}

message GracefulAdminRequest {
ToRSide tor = 1;
}

enum GracefulRestartMsgType {
SERVICE_BEGIN = 0;
SERVICE_END = 1;// send this always from SoC Side even if not paired with Begin
}

enum GracefulRestartNotifyType {
CONTROL_PLANE = 0;// need proper definitions
DATA_PLANE = 1;
BOTH = 2;
}

message GracefulAdminResponse {
GracefulRestartMsgType msgtype = 1;
GracefulRestartNotifyType notifytype = 2;
string guid = 3;
int32 period = 4; // in seconds
}
17 changes: 16 additions & 1 deletion sonic-ycabled/tests/test_y_cable_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2676,7 +2676,6 @@ def test_get_muxcable_static_info_read_side_peer_exceptions(self, platform_sfput

status = True
fvs = [('state', "auto"), ('read_side', 2)]

y_cable_tbl[asic_index] = swsscommon.Table(
test_db[asic_index], "Y_CABLE_TABLE")
y_cable_tbl[asic_index].get.return_value = (status, fvs)
Expand Down Expand Up @@ -7120,3 +7119,19 @@ def test_get_muxcable_info_for_active_active(self):



@patch("grpc.aio.secure_channel")
@patch("proto_out.linkmgr_grpc_driver_pb2_grpc.GracefulRestartStub")
def test_ycable_graceful_client(self, channel, stub):


mock_channel = MagicMock()
channel.return_value = mock_channel

mock_stub = MagicMock()
mock_stub.NotifyGracefulRestartStart = MagicMock(return_value=[4, 5])
stub.return_value = mock_stub


read_side = 1
Y_cable_restart_client = GracefulRestartClient("Ethernet48", None, read_side)

32 changes: 32 additions & 0 deletions sonic-ycabled/tests/test_ycable.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,4 +367,36 @@ def test_ycable_helper_class_run_loop_with_exception(self):
assert("sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py" in str(trace))
assert("swsscommon.Select" in str(trace))

class TestYcableAsyncScript(object):

@patch("swsscommon.swsscommon.Select", MagicMock(side_effect=NotImplementedError))
@patch("swsscommon.swsscommon.Select.addSelectable", MagicMock(side_effect=NotImplementedError))
@patch("swsscommon.swsscommon.Select.select", MagicMock(side_effect=NotImplementedError))
@patch("swsscommon.swsscommon.Table.get", MagicMock(
return_value=[(True, (('state', "auto"), ("soc_ipv4", "192.168.0.1/32"))), (True, (('index', 2), ))]))
@patch("ycable.ycable_utilities.y_cable_helper.setup_grpc_channel_for_port", MagicMock(side_effect=NotImplementedError))
@patch("ycable.ycable_utilities.y_cable_helper.y_cable_platform_sfputil")
def test_ycable_helper_async_client_run_loop_with_exception(self, sfputil):


sfputil.logical = ["Ethernet0", "Ethernet4"]
sfputil.get_asic_id_for_logical_port = MagicMock(return_value=0)
Y_cable_async_task = YCableAsyncNotificationTask()
expected_exception_start = None
expected_exception_join = None
trace = None
try:
Y_cable_async_task.start()
Y_cable_async_task.task_worker()
Y_cable_async_task.join()
except Exception as e1:
expected_exception_start = e1
trace = traceback.format_exc()




assert("NotImplementedError" in str(trace) and "effect" in str(trace))
assert("sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py" in str(trace))
assert("setup_grpc_channel_for_port" in str(trace))

3 changes: 3 additions & 0 deletions sonic-ycabled/ycable/ycable.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,9 @@ def run(self):
y_cable_cli_worker_update = y_cable_helper.YCableCliUpdateTask()
y_cable_cli_worker_update.start()
self.threads.append(y_cable_cli_worker_update)
y_cable_async_noti_worker = y_cable_helper.YCableAsyncNotificationTask()
y_cable_async_noti_worker.start()
self.threads.append(y_cable_async_noti_worker)

# Start main loop
self.log_info("Start daemon main loop")
Expand Down
117 changes: 117 additions & 0 deletions sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4013,3 +4013,120 @@ def join(self):

raise self.exc

class GracefulRestartClient:
def __init__(self, port, channel: grpc.aio.secure_channel, read_side):
self.port = port
self.stub = linkmgr_grpc_driver_pb2_grpc.GracefulRestartStub(channel)
self.request_queue = asyncio.Queue()
self.response_queue = asyncio.Queue()
self.read_side = read_side

async def send_request_and_get_response(self):
while True:
tor = await self.request_queue.get()
request = linkmgr_grpc_driver_pb2.GracefulAdminRequest(tor=tor)
response = None
try:
response_stream = self.stub.NotifyGracefulRestartStart(request)
index = 0
async for response in response_stream:
helper_logger.log_notice("Async client received from direct read period port = {}: period = {} index = {} guid = {} notifytype {} msgtype = {}".format(self.port, response.period, index, response.guid, response.notifytype, response.msgtype))
helper_logger.log_debug("Async Debug only :{} {}".format(dir(response_stream), dir(response)))
index = index+1
if response == grpc.aio.EOF:
break
helper_logger.log_notice("Async client finished loop from direct read period port:{} ".format(self.port))
index = index+1
except grpc.RpcError as e:
helper_logger.log_notice("Async client port = {} exception occured because of {} ".format(self.port, e.code()))

await self.response_queue.put(response)

async def process_response(self):
while True:
response = await self.response_queue.get()
helper_logger.log_debug("Async recieved a response from {} {}".format(self.port, response))
# do something with response
if response is not None:
await asyncio.sleep(response.period)
else:
await asyncio.sleep(20)

if self.read_side == 0:
tor_side = linkmgr_grpc_driver_pb2.ToRSide.UPPER_TOR
else:
tor_side = linkmgr_grpc_driver_pb2.ToRSide.LOWER_TOR
await self.request_queue.put(tor_side)

async def notify_graceful_restart_start(self, tor: linkmgr_grpc_driver_pb2.ToRSide):
await self.request_queue.put(tor)


class YCableAsyncNotificationTask(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)

self.exc = None
self.task_stopping_event = threading.Event()
self.table_helper = y_cable_table_helper.YcableAsyncNotificationTableHelper()
self.read_side = process_loopback_interface_and_get_read_side(self.table_helper.loopback_keys)

async def task_worker(self):

# Create tasks for all ports
logical_port_list = y_cable_platform_sfputil.logical
tasks = []
for logical_port_name in logical_port_list:
if self.task_stopping_event.is_set():
break

# Get the asic to which this port belongs
asic_index = y_cable_platform_sfputil.get_asic_id_for_logical_port(logical_port_name)
(status, fvs) = self.table_helper.get_port_tbl()[asic_index].get(logical_port_name)
if status is False:
helper_logger.log_warning(
"Could not retreive fieldvalue pairs for {}, inside config_db table {}".format(logical_port_name, self.table_helper.get_port_tbl()[asic_index].getTableName()))
continue

else:
# Convert list of tuples to a dictionary
mux_table_dict = dict(fvs)
if "state" in mux_table_dict and "soc_ipv4" in mux_table_dict:

soc_ipv4_full = mux_table_dict.get("soc_ipv4", None)
if soc_ipv4_full is not None:
soc_ipv4 = soc_ipv4_full.split('/')[0]

channel, stub = setup_grpc_channel_for_port(logical_port_name, soc_ipv4, asic_index, self.table_helper.get_grpc_config_tbl(), self.table_helper.get_fwd_state_response_tbl(), True)

client = GracefulRestartClient(logical_port_name, channel, read_side)
tasks.append(asyncio.create_task(client.send_request_and_get_response()))
tasks.append(asyncio.create_task(client.process_response()))

if self.read_side == 0:
tor_side = linkmgr_grpc_driver_pb2.ToRSide.UPPER_TOR
else:
tor_side = linkmgr_grpc_driver_pb2.ToRSide.LOWER_TOR

tasks.append(asyncio.create_task(client.notify_graceful_restart_start(tor_side)))

await asyncio.gather(*tasks)

def run(self):
if self.task_stopping_event.is_set():
return

try:
asyncio.run(self.task_worker())
except Exception as e:
helper_logger.log_error("Exception occured at child thread YcableCliUpdateTask due to {} {}".format(repr(e), traceback.format_exc()))
self.exc = e

def join(self):

threading.Thread.join(self)

helper_logger.log_info("stopped all thread")
if self.exc is not None:

raise self.exc
58 changes: 58 additions & 0 deletions sonic-ycabled/ycable/ycable_utilities/y_cable_table_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,3 +471,61 @@ def get_appl_db(self):
return self.appl_db


class YcableAsyncNotificationTableHelper(object):
def __init__(self):

self.state_db = {}
self.config_db = {}
self.appl_db = {}
self.port_tbl = {}
self.status_tbl = {}
self.y_cable_tbl = {}
self.mux_tbl = {}
self.grpc_config_tbl = {}
self.fwd_state_response_tbl = {}
self.loopback_tbl= {}
self.loopback_keys = {}

# Get the namespaces in the platform
namespaces = multi_asic.get_front_end_namespaces()
for namespace in namespaces:
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
self.state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace)
self.appl_db[asic_id] = daemon_base.db_connect("APPL_DB", namespace)
self.config_db[asic_id] = daemon_base.db_connect("CONFIG_DB", namespace)
self.port_tbl[asic_id] = swsscommon.Table(self.config_db[asic_id], "MUX_CABLE")
self.status_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_STATUS_TABLE)
self.y_cable_tbl[asic_id] = swsscommon.Table(
self.state_db[asic_id], swsscommon.STATE_HW_MUX_CABLE_TABLE_NAME)
self.mux_tbl[asic_id] = swsscommon.Table(
self.state_db[asic_id], MUX_CABLE_INFO_TABLE)
self.grpc_config_tbl[asic_id] = swsscommon.Table(self.config_db[asic_id], "GRPCCLIENT")
self.fwd_state_response_tbl[asic_id] = swsscommon.Table(
self.appl_db[asic_id], "FORWARDING_STATE_RESPONSE")
self.loopback_tbl[asic_id] = swsscommon.Table(
self.config_db[asic_id], "LOOPBACK_INTERFACE")
self.loopback_keys[asic_id] = self.loopback_tbl[asic_id].getKeys()

def get_state_db(self):
return self.state_db

def get_config_db(self):
return self.config_db

def get_port_tbl(self):
return self.port_tbl

def get_status_tbl(self):
return self.status_tbl

def get_y_cable_tbl(self):
return self.y_cable_tbl

def get_mux_tbl(self):
return self.mux_tbl

def get_grpc_config_tbl(self):
return self.grpc_config_tbl

def get_fwd_state_response_tbl(self):
return self.fwd_state_response_tbl

0 comments on commit d6b2a02

Please sign in to comment.