From 9cb04ff7afc6033e1caf6fc2aa10fd08f13c4446 Mon Sep 17 00:00:00 2001 From: anitarua <anita@momentohq.com> Date: Fri, 20 Oct 2023 11:40:29 -0700 Subject: [PATCH 1/2] fix: use just one grpc connection for topics --- internal/grpcmanagers/topic_manager.go | 1 + momento/pubsub_client.go | 18 ++++-------------- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/internal/grpcmanagers/topic_manager.go b/internal/grpcmanagers/topic_manager.go index 5222c276..1b50314d 100644 --- a/internal/grpcmanagers/topic_manager.go +++ b/internal/grpcmanagers/topic_manager.go @@ -27,6 +27,7 @@ func NewStreamTopicGrpcManager(request *models.TopicStreamGrpcManagerRequest) (* endpoint, grpc.WithTransportCredentials(credentials.NewTLS(config)), grpc.WithChainStreamInterceptor(interceptor.AddStreamHeaderInterceptor(authToken)), + grpc.WithChainUnaryInterceptor(interceptor.AddHeadersInterceptor(authToken)), ) if err != nil { diff --git a/momento/pubsub_client.go b/momento/pubsub_client.go index f0b53ba9..c6dd1f57 100644 --- a/momento/pubsub_client.go +++ b/momento/pubsub_client.go @@ -9,14 +9,12 @@ import ( "github.com/momentohq/client-sdk-go/internal/models" "github.com/momentohq/client-sdk-go/internal/momentoerrors" pb "github.com/momentohq/client-sdk-go/internal/protos" - "github.com/momentohq/client-sdk-go/internal/retry" "google.golang.org/grpc" ) type pubSubClient struct { streamTopicManagers []*grpcmanagers.TopicGrpcManager - unaryDataManager *grpcmanagers.DataGrpcManager unaryGrpcClient pb.PubsubClient endpoint string } @@ -29,7 +27,8 @@ func newPubSubClient(request *models.PubSubClientRequest) (*pubSubClient, moment if numSubscriptions > 0 { // a single channel can support 100 streams, so we need to create enough // channels to handle the maximum number of subscriptions - numChannels = uint32(math.Ceil(numSubscriptions / 100.0)) + // plus one for the publishing channel + numChannels = uint32(math.Ceil((numSubscriptions + 1) / 100.0)) } else { numChannels = 1 } @@ -44,19 +43,11 @@ func newPubSubClient(request *models.PubSubClientRequest) (*pubSubClient, moment } streamTopicManagers = append(streamTopicManagers, streamTopicManager) } - - unaryDataManager, err := grpcmanagers.NewUnaryDataGrpcManager(&models.DataGrpcManagerRequest{ - CredentialProvider: request.CredentialProvider, - RetryStrategy: retry.NewNeverRetryStrategy(), - }) - if err != nil { - return nil, err - } + lastStreamTopicManager := streamTopicManagers[numChannels-1] return &pubSubClient{ streamTopicManagers: streamTopicManagers, - unaryDataManager: unaryDataManager, - unaryGrpcClient: pb.NewPubsubClient(unaryDataManager.Conn), + unaryGrpcClient: lastStreamTopicManager.StreamClient, endpoint: request.CredentialProvider.GetCacheEndpoint(), }, nil } @@ -113,5 +104,4 @@ func (client *pubSubClient) close() { for clientIndex := range client.streamTopicManagers { defer client.streamTopicManagers[clientIndex].Close() } - defer client.unaryDataManager.Close() } From 1d6fd8f50e2c4ad7dfac25568541d76df2441020 Mon Sep 17 00:00:00 2001 From: anitarua <anita@momentohq.com> Date: Fri, 20 Oct 2023 14:51:01 -0700 Subject: [PATCH 2/2] publish should use next topic manager in rotation instead of reserved channel --- momento/pubsub_client.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/momento/pubsub_client.go b/momento/pubsub_client.go index c6dd1f57..06a6fcb8 100644 --- a/momento/pubsub_client.go +++ b/momento/pubsub_client.go @@ -15,7 +15,6 @@ import ( type pubSubClient struct { streamTopicManagers []*grpcmanagers.TopicGrpcManager - unaryGrpcClient pb.PubsubClient endpoint string } @@ -43,11 +42,9 @@ func newPubSubClient(request *models.PubSubClientRequest) (*pubSubClient, moment } streamTopicManagers = append(streamTopicManagers, streamTopicManager) } - lastStreamTopicManager := streamTopicManagers[numChannels-1] return &pubSubClient{ streamTopicManagers: streamTopicManagers, - unaryGrpcClient: lastStreamTopicManager.StreamClient, endpoint: request.CredentialProvider.GetCacheEndpoint(), }, nil } @@ -69,9 +66,10 @@ func (client *pubSubClient) topicSubscribe(ctx context.Context, request *TopicSu } func (client *pubSubClient) topicPublish(ctx context.Context, request *TopicPublishRequest) error { + topicManager := client.getNextStreamTopicManager() switch value := request.Value.(type) { case String: - _, err := client.unaryGrpcClient.Publish(ctx, &pb.XPublishRequest{ + _, err := topicManager.StreamClient.Publish(ctx, &pb.XPublishRequest{ CacheName: request.CacheName, Topic: request.TopicName, Value: &pb.XTopicValue{ @@ -82,7 +80,7 @@ func (client *pubSubClient) topicPublish(ctx context.Context, request *TopicPubl }) return err case Bytes: - _, err := client.unaryGrpcClient.Publish(ctx, &pb.XPublishRequest{ + _, err := topicManager.StreamClient.Publish(ctx, &pb.XPublishRequest{ CacheName: request.CacheName, Topic: request.TopicName, Value: &pb.XTopicValue{