Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ycable] cleanup logic for creating grpc future ready #289

Merged
merged 7 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions sonic-ycabled/tests/test_y_cable_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4962,10 +4962,18 @@ def test_setup_grpc_channel_for_port(self):
with patch('ycable.ycable_utilities.y_cable_helper.y_cable_platform_sfputil') as patched_util:

patched_util.get_asic_id_for_logical_port.return_value = 0
rc = setup_grpc_channel_for_port("Ethernet0", "192.168.0.1")
(channel, stub) = setup_grpc_channel_for_port("Ethernet0", "192.168.0.1")

assert(rc == (None, None))
assert(stub == True)
assert(channel != None)

def test_connect_channel(self):

with patch('grpc.channel_ready_future') as patched_util:

patched_util.result.return_value = 0
rc = connect_channel(patched_util, None, None)
assert(rc == None)

def test_setup_grpc_channels(self):

Expand Down
70 changes: 48 additions & 22 deletions sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ def wrapper(*args, **kwargs):

return wrapper


def retry_setup_grpc_channel_for_port(port, asic_index):

global grpc_port_stubs
Expand Down Expand Up @@ -409,34 +408,61 @@ def get_grpc_credentials(type, kvp):

return credential

def create_channel(type,level, kvp, soc_ip):
def connect_channel(channel, stub, port):

channel_ready = grpc.channel_ready_future(channel)
retries = 3

for _ in range(retries):
try:
channel_ready.result(timeout=2)
except grpc.FutureTimeoutError:
helper_logger.log_warning("gRPC port {} state changed to SHUTDOWN".format(port))
else:
break

if type == "secure":
credential = get_grpc_credentials(level, kvp)
target_name = kvp.get("grpc_ssl_credential", None)
if credential is None or target_name is None:
return (None, None)
def create_channel(type, level, kvp, soc_ip, port):

GRPC_CLIENT_OPTIONS.append(('grpc.ssl_target_name_override', '{}'.format(target_name)))

channel = grpc.secure_channel("{}:{}".format(soc_ip, GRPC_PORT), credential, options=GRPC_CLIENT_OPTIONS)
else:
channel = grpc.insecure_channel("{}:{}".format(soc_ip, GRPC_PORT), options=GRPC_CLIENT_OPTIONS)
#Helper callback to get an channel connectivity state
def wait_for_state_change(channel_connectivity):
if channel_connectivity == grpc.ChannelConnectivity.TRANSIENT_FAILURE:
helper_logger.log_notice("gRPC port {} state changed to TRANSIENT_FAILURE".format(port))
if channel_connectivity == grpc.ChannelConnectivity.CONNECTING:
helper_logger.log_notice("gRPC port {} state changed to CONNECTING".format(port))
if channel_connectivity == grpc.ChannelConnectivity.READY:
helper_logger.log_notice("gRPC port {} state changed to READY".format(port))
if channel_connectivity == grpc.ChannelConnectivity.IDLE:
helper_logger.log_notice("gRPC port {} state changed to IDLE".format(port))
if channel_connectivity == grpc.ChannelConnectivity.SHUTDOWN:
helper_logger.log_notice("gRPC port {} state changed to SHUTDOWN".format(port))

stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel)

channel_ready = grpc.channel_ready_future(channel)
if type == "secure":
credential = get_grpc_credentials(level, kvp)
target_name = kvp.get("grpc_ssl_credential", None)
if credential is None or target_name is None:
return (None, None)

try:
channel_ready.result(timeout=2)
except grpc.FutureTimeoutError:
channel = None
stub = None
else:
break
GRPC_CLIENT_OPTIONS.append(('grpc.ssl_target_name_override', '{}'.format(target_name)))

channel = grpc.secure_channel("{}:{}".format(soc_ip, GRPC_PORT), credential, options=GRPC_CLIENT_OPTIONS)
else:
channel = grpc.insecure_channel("{}:{}".format(soc_ip, GRPC_PORT), options=GRPC_CLIENT_OPTIONS)


stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel)


if channel is not None:
channel.subscribe(wait_for_state_change)

#connect_channel(channel, stub, port)
"""
Comment the connect channel call for now, since it is not required for normal gRPC I/O
and all use cases work without it.
TODO: check if this subroutine call can be ommitted for all use cases in future enhancements
"""

return channel, stub

Expand Down Expand Up @@ -490,12 +516,12 @@ def setup_grpc_channel_for_port(port, soc_ip):
kvp = dict(fvs)


channel, stub = create_channel(type, level, kvp, soc_ip)
channel, stub = create_channel(type, level, kvp, soc_ip, port)

if stub is None:
helper_logger.log_warning("stub was not setup for gRPC soc ip {} port {}, no gRPC soc server running ?".format(soc_ip, port))
if channel is None:
helper_logger.log_warning("channel was not setup for gRPC soc ip {} port {}, no gRPC server running ?".format(soc_ip, port))
helper_logger.log_warning("channel was not setup for gRPC soc ip {} port {}, no gRPC soc server running ?".format(soc_ip, port))

return channel, stub

Expand Down