From 4229dd6bd47a04b62e5489ea76729c31afc5db26 Mon Sep 17 00:00:00 2001 From: vaibhav-dahiya Date: Fri, 2 Sep 2022 00:35:26 +0000 Subject: [PATCH] [ycable] cleanup logic for creating grpc future ready Signed-off-by: vaibhav-dahiya --- .../ycable/ycable_utilities/y_cable_helper.py | 71 ++++++++++++------- 1 file changed, 45 insertions(+), 26 deletions(-) diff --git a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py index c037c22dc..fa5061112 100644 --- a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py +++ b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py @@ -332,7 +332,6 @@ def wrapper(*args, **kwargs): return wrapper - def retry_setup_grpc_channel_for_port(port, asic_index): global grpc_port_stubs @@ -409,35 +408,60 @@ def get_grpc_credentials(type, kvp): return credential -def create_channel(type,level, kvp, soc_ip): +def connect_channel(channel, stub, port): + channel_ready = grpc.channel_ready_future(channel) retries = 3 + for _ in range(retries): + try: + channel_ready.result(timeout=2) + except grpc.FutureTimeoutError: + helper_logger.log_warning("gRPC port {} state changed to SHUTDOWN".format(port)) + else: + break - if type == "secure": - credential = get_grpc_credentials(level, kvp) - target_name = kvp.get("grpc_ssl_credential", None) - if credential is None or target_name is None: - return (None, None) +def create_channel(type,level, kvp, soc_ip, port): + + channel, stub = None, None + def wait_for_state_change(channel_connectivity): + if channel_connectivity == grpc.ChannelConnectivity.TRANSIENT_FAILURE: + helper_logger.log_notice("gRPC port {} state changed to TRANSIENT_FAILURE".format(port)) + if channel_connectivity == grpc.ChannelConnectivity.CONNECTING: + helper_logger.log_notice("gRPC port {} state changed to CONNECTING".format(port)) + if channel_connectivity == grpc.ChannelConnectivity.READY: + helper_logger.log_notice("gRPC port {} state changed to READY".format(port)) + if channel_connectivity == grpc.ChannelConnectivity.IDLE: + helper_logger.log_notice("gRPC port {} state changed to IDLE".format(port)) + if channel_connectivity == grpc.ChannelConnectivity.SHUTDOWN: + helper_logger.log_notice("gRPC port {} state changed to SHUTDOWN".format(port)) + + + if type == "secure": + credential = get_grpc_credentials(level, kvp) + target_name = kvp.get("grpc_ssl_credential", None) + if credential is None or target_name is None: + return (None, None) - GRPC_CLIENT_OPTIONS.append(('grpc.ssl_target_name_override', '{}'.format(target_name))) + GRPC_CLIENT_OPTIONS.append(('grpc.ssl_target_name_override', '{}'.format(target_name))) - channel = grpc.secure_channel("{}:{}".format(soc_ip, GRPC_PORT), credential, options=GRPC_CLIENT_OPTIONS) - else: - channel = grpc.insecure_channel("{}:{}".format(soc_ip, GRPC_PORT), options=GRPC_CLIENT_OPTIONS) + channel = grpc.secure_channel("{}:{}".format(soc_ip, GRPC_PORT), credential, options=GRPC_CLIENT_OPTIONS) + else: + channel = grpc.insecure_channel("{}:{}".format(soc_ip, GRPC_PORT), options=GRPC_CLIENT_OPTIONS) - stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel) - channel_ready = grpc.channel_ready_future(channel) + stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel) - try: - channel_ready.result(timeout=2) - except grpc.FutureTimeoutError: - channel = None - stub = None - else: - break + if channel is not None: + channel.subscribe(wait_for_state_change) + + connect_channel(channel, stub, port) + if channel is None: + helper_logger.log_notice("gRPC port {} channel is None".format(port)) + if stub is None: + helper_logger.log_notice("gRPC port {} stub is None".format(port)) + return channel, stub def setup_grpc_channel_for_port(port, soc_ip): @@ -490,12 +514,7 @@ def setup_grpc_channel_for_port(port, soc_ip): kvp = dict(fvs) - channel, stub = create_channel(type, level, kvp, soc_ip) - - if stub is None: - helper_logger.log_warning("stub was not setup for gRPC soc ip {} port {}, no gRPC soc server running ?".format(soc_ip, port)) - if channel is None: - helper_logger.log_warning("channel was not setup for gRPC soc ip {} port {}, no gRPC server running ?".format(soc_ip, port)) + channel, stub = create_channel(type, level, kvp, soc_ip, port) return channel, stub