Skip to content

Commit

Permalink
feat: warn users if approaching grpc max concurrent streams limit, ad…
Browse files Browse the repository at this point in the history
…d method to close topics subscriptions (#362)

* fix: warn users if approaching grpc max concurrent streams limit

* revise checkNumConcurrentStreams function

* use logger instead of fmt.printf

* add method for closing a single subscription

* decrement upon subscription closing

* bump logged connection message from INFO to WARN

* missed another INFO to WARN

* fix lint errors
  • Loading branch information
anitarua authored Nov 2, 2023
1 parent cd6b46d commit 312c10d
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 8 deletions.
2 changes: 2 additions & 0 deletions internal/models/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package models
import (
"time"

"github.com/momentohq/client-sdk-go/config/logger"
"github.com/momentohq/client-sdk-go/internal/retry"

"github.com/momentohq/client-sdk-go/auth"
Expand Down Expand Up @@ -61,6 +62,7 @@ type DataClientRequest struct {
type PubSubClientRequest struct {
TopicsConfiguration config.TopicsConfiguration
CredentialProvider auth.CredentialProvider
Log logger.MomentoLogger
}

type PingClientRequest struct {
Expand Down
48 changes: 42 additions & 6 deletions momento/pubsub_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package momento

import (
"context"
"fmt"
"math"
"sync/atomic"

"github.com/momentohq/client-sdk-go/config/logger"
"github.com/momentohq/client-sdk-go/internal/grpcmanagers"
"github.com/momentohq/client-sdk-go/internal/models"
"github.com/momentohq/client-sdk-go/internal/momentoerrors"
Expand All @@ -16,12 +18,14 @@ import (
type pubSubClient struct {
streamTopicManagers []*grpcmanagers.TopicGrpcManager
endpoint string
log logger.MomentoLogger
}

var streamTopicManagerCount uint64
var numGrpcStreams int64
var numChannels uint32

func newPubSubClient(request *models.PubSubClientRequest) (*pubSubClient, momentoerrors.MomentoSvcErr) {
var numChannels uint32
numSubscriptions := float64(request.TopicsConfiguration.GetMaxSubscriptions())
numGrpcChannels := request.TopicsConfiguration.GetNumGrpcChannels()

Expand Down Expand Up @@ -56,29 +60,51 @@ func newPubSubClient(request *models.PubSubClientRequest) (*pubSubClient, moment
return &pubSubClient{
streamTopicManagers: streamTopicManagers,
endpoint: request.CredentialProvider.GetCacheEndpoint(),
log: request.Log,
}, nil
}

func (client *pubSubClient) getNextStreamTopicManager() *grpcmanagers.TopicGrpcManager {
nextMangerIndex := atomic.AddUint64(&streamTopicManagerCount, 1)
topicManager := client.streamTopicManagers[nextMangerIndex%uint64(len(client.streamTopicManagers))]
nextManagerIndex := atomic.AddUint64(&streamTopicManagerCount, 1)
topicManager := client.streamTopicManagers[nextManagerIndex%uint64(len(client.streamTopicManagers))]
return topicManager
}

func (client *pubSubClient) topicSubscribe(ctx context.Context, request *TopicSubscribeRequest) (*grpcmanagers.TopicGrpcManager, grpc.ClientStream, error) {
func (client *pubSubClient) topicSubscribe(ctx context.Context, request *TopicSubscribeRequest) (*grpcmanagers.TopicGrpcManager, grpc.ClientStream, context.Context, context.CancelFunc, error) {

checkNumConcurrentStreams(client.log)

// try withCancel on context
cancelContext, cancelFunction := context.WithCancel(ctx)

atomic.AddInt64(&numGrpcStreams, 1)
topicManager := client.getNextStreamTopicManager()
clientStream, err := topicManager.StreamClient.Subscribe(ctx, &pb.XSubscriptionRequest{
clientStream, err := topicManager.StreamClient.Subscribe(cancelContext, &pb.XSubscriptionRequest{
CacheName: request.CacheName,
Topic: request.TopicName,
ResumeAtTopicSequenceNumber: request.ResumeAtTopicSequenceNumber,
})
return topicManager, clientStream, err

if err != nil {
atomic.AddInt64(&numGrpcStreams, -1)
cancelFunction()
return nil, nil, nil, nil, err
}

if numGrpcStreams > 0 && (int64(numChannels*100)-numGrpcStreams < 10) {
client.log.Warn(fmt.Sprintf("WARNING: approaching grpc maximum concurrent stream limit, %d remaining of total %d streams\n", int64(numChannels*100)-numGrpcStreams, numChannels*100))
}

return topicManager, clientStream, cancelContext, cancelFunction, err
}

func (client *pubSubClient) topicPublish(ctx context.Context, request *TopicPublishRequest) error {
checkNumConcurrentStreams(client.log)

topicManager := client.getNextStreamTopicManager()
switch value := request.Value.(type) {
case String:
atomic.AddInt64(&numGrpcStreams, 1)
_, err := topicManager.StreamClient.Publish(ctx, &pb.XPublishRequest{
CacheName: request.CacheName,
Topic: request.TopicName,
Expand All @@ -88,8 +114,10 @@ func (client *pubSubClient) topicPublish(ctx context.Context, request *TopicPubl
},
},
})
atomic.AddInt64(&numGrpcStreams, -1)
return err
case Bytes:
atomic.AddInt64(&numGrpcStreams, 1)
_, err := topicManager.StreamClient.Publish(ctx, &pb.XPublishRequest{
CacheName: request.CacheName,
Topic: request.TopicName,
Expand All @@ -99,6 +127,7 @@ func (client *pubSubClient) topicPublish(ctx context.Context, request *TopicPubl
},
},
})
atomic.AddInt64(&numGrpcStreams, -1)
return err
default:
return momentoerrors.NewMomentoSvcErr(
Expand All @@ -109,7 +138,14 @@ func (client *pubSubClient) topicPublish(ctx context.Context, request *TopicPubl
}

func (client *pubSubClient) close() {
atomic.AddInt64(&numGrpcStreams, -numGrpcStreams)
for clientIndex := range client.streamTopicManagers {
defer client.streamTopicManagers[clientIndex].Close()
}
}

func checkNumConcurrentStreams(log logger.MomentoLogger) {
if numGrpcStreams > 0 && numGrpcStreams >= int64(numChannels*100) {
log.Warn("Already at maximum number of concurrent grpc streams, cannot make new publish or subscribe requests")
}
}
5 changes: 4 additions & 1 deletion momento/topic_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func NewTopicClient(topicsConfiguration config.TopicsConfiguration, credentialPr
pubSubClient, err := newPubSubClient(&models.PubSubClientRequest{
CredentialProvider: credentialProvider,
TopicsConfiguration: topicsConfiguration,
Log: client.log,
})
if err != nil {
return nil, convertMomentoSvcErrorToCustomerError(momentoerrors.ConvertSvcErr(err))
Expand All @@ -57,7 +58,7 @@ func (c defaultTopicClient) Subscribe(ctx context.Context, request *TopicSubscri
return nil, err
}

topicManager, clientStream, err := c.pubSubClient.topicSubscribe(ctx, &TopicSubscribeRequest{
topicManager, clientStream, cancelContext, cancelFunction, err := c.pubSubClient.topicSubscribe(ctx, &TopicSubscribeRequest{
CacheName: request.CacheName,
TopicName: request.TopicName,
})
Expand Down Expand Up @@ -89,6 +90,8 @@ func (c defaultTopicClient) Subscribe(ctx context.Context, request *TopicSubscri
cacheName: request.CacheName,
topicName: request.TopicName,
log: c.log,
cancelContext: cancelContext,
cancelFunction: cancelFunction,
}, nil
}

Expand Down
102 changes: 102 additions & 0 deletions momento/topic_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,106 @@ var _ = Describe("Pubsub", func() {
).Error().NotTo(HaveOccurred())
})
})

It("Can close individual topics subscriptions without closing the grpc channel", func() {
topic1 := fmt.Sprintf("golang-topics-test-%s", uuid.NewString())
topic2 := fmt.Sprintf("golang-topics-test-%s", uuid.NewString())

// subscribe to one topic
sub1, err := sharedContext.TopicClient.Subscribe(sharedContext.Ctx, &TopicSubscribeRequest{
CacheName: sharedContext.CacheName,
TopicName: topic1,
})
if err != nil {
panic(err)
}

// subscribe to another topic
sub2, err := sharedContext.TopicClient.Subscribe(sharedContext.Ctx, &TopicSubscribeRequest{
CacheName: sharedContext.CacheName,
TopicName: topic2,
})
if err != nil {
panic(err)
}

// publish messages to both
_, err = sharedContext.TopicClient.Publish(sharedContext.Ctx, &TopicPublishRequest{
CacheName: sharedContext.CacheName,
TopicName: topic1,
Value: String("hello-1"),
})
if err != nil {
panic(err)
}
_, err = sharedContext.TopicClient.Publish(sharedContext.Ctx, &TopicPublishRequest{
CacheName: sharedContext.CacheName,
TopicName: topic2,
Value: String("hello-2"),
})
if err != nil {
panic(err)
}

// expect two Item() successes
item, err := sub1.Item(sharedContext.Ctx)
if err != nil {
panic(err)
}
switch msg := item.(type) {
case String:
Expect(msg).To(Equal(String("hello-1")))
case Bytes:
Fail("Expected topic item to be a string")
}

item, err = sub2.Item(sharedContext.Ctx)
if err != nil {
panic(err)
}
switch msg := item.(type) {
case String:
Expect(msg).To(Equal(String("hello-2")))
case Bytes:
Fail("Expected topic item to be a string")
}

// close one subscription
sub1.Close()

// publish messages to both
_, err = sharedContext.TopicClient.Publish(sharedContext.Ctx, &TopicPublishRequest{
CacheName: sharedContext.CacheName,
TopicName: topic1,
Value: String("hello-again-1"),
})
if err != nil {
panic(err)
}

_, err = sharedContext.TopicClient.Publish(sharedContext.Ctx, &TopicPublishRequest{
CacheName: sharedContext.CacheName,
TopicName: topic2,
Value: String("hello-again-2"),
})
if err != nil {
panic(err)
}

// expect one Item() success and one failure
item, err = sub1.Item(sharedContext.Ctx)
Expect(item).To(BeNil())
Expect(err.Error()).To(Equal("context canceled"))

item, err = sub2.Item(sharedContext.Ctx)
if err != nil {
panic(err)
}
switch msg := item.(type) {
case String:
Expect(msg).To(Equal(String("hello-again-2")))
case Bytes:
Fail("Expected topic item to be a string")
}
})
})
16 changes: 15 additions & 1 deletion momento/topic_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package momento
import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/momentohq/client-sdk-go/config/logger"
Expand All @@ -14,6 +15,7 @@ import (

type TopicSubscription interface {
Item(ctx context.Context) (TopicValue, error)
Close()
}

type topicSubscription struct {
Expand All @@ -24,6 +26,8 @@ type topicSubscription struct {
topicName string
log logger.MomentoLogger
lastKnownSequenceNumber uint64
cancelContext context.Context
cancelFunction context.CancelFunc
}

func (s *topicSubscription) Item(ctx context.Context) (TopicValue, error) {
Expand All @@ -34,6 +38,9 @@ func (s *topicSubscription) Item(ctx context.Context) (TopicValue, error) {
case <-ctx.Done():
// Context has been canceled, return an error
return nil, ctx.Err()
case <-s.cancelContext.Done():
// Context has been canceled, return an error
return nil, s.cancelContext.Err()
default:
// Proceed as is
}
Expand Down Expand Up @@ -86,7 +93,7 @@ func (s *topicSubscription) attemptReconnect(ctx context.Context) {
for {
s.log.Debug("Attempting reconnecting to client stream")
time.Sleep(seconds)
newTopicManager, newStream, err := s.momentoTopicClient.topicSubscribe(ctx, &TopicSubscribeRequest{
newTopicManager, newStream, cancelContext, cancelFunction, err := s.momentoTopicClient.topicSubscribe(ctx, &TopicSubscribeRequest{
CacheName: s.cacheName,
TopicName: s.topicName,
ResumeAtTopicSequenceNumber: s.lastKnownSequenceNumber,
Expand All @@ -98,7 +105,14 @@ func (s *topicSubscription) attemptReconnect(ctx context.Context) {
s.log.Debug("successfully reconnected to subscription stream")
s.topicManager = newTopicManager
s.grpcClient = newStream
s.cancelContext = cancelContext
s.cancelFunction = cancelFunction
return
}
}
}

func (s *topicSubscription) Close() {
atomic.AddInt64(&numGrpcStreams, -1)
s.cancelFunction()
}

0 comments on commit 312c10d

Please sign in to comment.