Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds retry interceptor #223

Merged
merged 11 commits into from
Mar 3, 2023
41 changes: 33 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,47 @@ package config
import (
"time"

"github.com/momentohq/client-sdk-go/internal/retry"

"github.com/momentohq/client-sdk-go/config/logger"
)

type ConfigurationProps struct {
LoggerFactory logger.MomentoLoggerFactory
TransportStrategy TransportStrategy
RetryStrategy retry.Strategy
}

type Configuration interface {
// GetLoggerFactory Returns the current configuration options for logging verbosity and format
GetLoggerFactory() logger.MomentoLoggerFactory

// GetRetryStrategy Returns the current configuration options for wire interactions with the Momento service
GetRetryStrategy() retry.Strategy

// GetTransportStrategy Returns the current configuration options for wire interactions with the Momento service
GetTransportStrategy() TransportStrategy

// WithTransportStrategy Copy constructor for overriding TransportStrategy returns a new Configuration object
//with the specified momento.TransportStrategy
WithTransportStrategy(transportStrategy TransportStrategy) Configuration

// GetClientSideTimeout Returns the current configuration options for client side timeout with the Momento service
GetClientSideTimeout() time.Duration

// WithRetryStrategy Copy constructor for overriding TransportStrategy returns a new Configuration object
// with the specified momento.TransportStrategy
WithRetryStrategy(retryStrategy retry.Strategy) Configuration

// WithClientTimeout Copy constructor for overriding TransportStrategy client side timeout. Returns a new
//Configuration object with the specified momento.TransportStrategy using passed client side timeout.
WithClientTimeout(clientTimeout time.Duration) Configuration

// WithTransportStrategy Copy constructor for overriding TransportStrategy returns a new Configuration object
// with the specified momento.TransportStrategy
WithTransportStrategy(transportStrategy TransportStrategy) Configuration
}

type cacheConfiguration struct {
loggerFactory logger.MomentoLoggerFactory
transportStrategy TransportStrategy
retryStrategy retry.Strategy
}

func (s *cacheConfiguration) GetLoggerFactory() logger.MomentoLoggerFactory {
Expand All @@ -47,23 +58,37 @@ func NewCacheConfiguration(props *ConfigurationProps) Configuration {
return &cacheConfiguration{
loggerFactory: props.LoggerFactory,
transportStrategy: props.TransportStrategy,
retryStrategy: props.RetryStrategy,
}
}

func (s *cacheConfiguration) GetTransportStrategy() TransportStrategy {
return s.transportStrategy
}

func (s *cacheConfiguration) WithTransportStrategy(transportStrategy TransportStrategy) Configuration {
func (s *cacheConfiguration) GetRetryStrategy() retry.Strategy {
return s.retryStrategy
}

func (s *cacheConfiguration) WithClientTimeout(clientTimeout time.Duration) Configuration {
return &cacheConfiguration{
loggerFactory: s.loggerFactory,
transportStrategy: transportStrategy,
transportStrategy: s.transportStrategy.WithClientTimeout(clientTimeout),
retryStrategy: s.retryStrategy,
}
}

func (s *cacheConfiguration) WithClientTimeout(clientTimeout time.Duration) Configuration {
func (s *cacheConfiguration) WithRetryStrategy(strategy retry.Strategy) Configuration {
return &cacheConfiguration{
loggerFactory: s.loggerFactory,
transportStrategy: s.transportStrategy.WithClientTimeout(clientTimeout),
transportStrategy: s.transportStrategy,
retryStrategy: strategy,
}
}
func (s *cacheConfiguration) WithTransportStrategy(transportStrategy TransportStrategy) Configuration {
return &cacheConfiguration{
loggerFactory: s.loggerFactory,
transportStrategy: transportStrategy,
retryStrategy: s.retryStrategy,
}
}
20 changes: 11 additions & 9 deletions config/configurations.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package config
import (
"time"

"github.com/momentohq/client-sdk-go/internal/retry"

"github.com/momentohq/client-sdk-go/config/logger"
)

Expand All @@ -18,6 +20,7 @@ func LaptopLatest(loggerFactory ...logger.MomentoLoggerFactory) Configuration {
deadline: 5 * time.Second,
}),
}),
RetryStrategy: retry.NewFixedCountRetryStrategy(defaultLoggerFactory),
})
}

Expand All @@ -26,14 +29,13 @@ func InRegionLatest(loggerFactory ...logger.MomentoLoggerFactory) Configuration
if len(loggerFactory) != 0 {
defaultLoggerFactory = loggerFactory[0]
}
return NewCacheConfiguration(
&ConfigurationProps{
LoggerFactory: defaultLoggerFactory,
TransportStrategy: NewStaticTransportStrategy(&TransportStrategyProps{
GrpcConfiguration: NewStaticGrpcConfiguration(&GrpcConfigurationProps{
deadline: 1100 * time.Millisecond,
}),
return NewCacheConfiguration(&ConfigurationProps{
LoggerFactory: defaultLoggerFactory,
TransportStrategy: NewStaticTransportStrategy(&TransportStrategyProps{
GrpcConfiguration: NewStaticGrpcConfiguration(&GrpcConfigurationProps{
deadline: 1100 * time.Millisecond,
}),
},
)
}),
RetryStrategy: retry.NewFixedCountRetryStrategy(defaultLoggerFactory),
})
}
Binary file removed images/gopher.png
Binary file not shown.
6 changes: 5 additions & 1 deletion internal/grpcmanagers/control_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ func NewScsControlGrpcManager(request *models.ControlGrpcManagerRequest) (*ScsCo
}
authToken := request.CredentialProvider.GetAuthToken()
endpoint := fmt.Sprint(request.CredentialProvider.GetControlEndpoint(), ControlPort)
conn, err := grpc.Dial(endpoint, grpc.WithTransportCredentials(credentials.NewTLS(config)), grpc.WithDisableRetry(), grpc.WithUnaryInterceptor(interceptor.AddHeadersInterceptor(authToken)))
conn, err := grpc.Dial(
endpoint,
grpc.WithTransportCredentials(credentials.NewTLS(config)),
grpc.WithUnaryInterceptor(interceptor.AddHeadersInterceptor(authToken)),
)
if err != nil {
return nil, momentoerrors.ConvertSvcErr(err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/grpcmanagers/data_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func NewUnaryDataGrpcManager(request *models.DataGrpcManagerRequest) (*DataGrpcM
conn, err := grpc.Dial(
endpoint,
grpc.WithTransportCredentials(credentials.NewTLS(config)),
grpc.WithDisableRetry(),
grpc.WithUnaryInterceptor(interceptor.AddUnaryRetryInterceptor(request.RetryStrategy)),
grpc.WithUnaryInterceptor(interceptor.AddHeadersInterceptor(authToken)),
)
if err != nil {
Expand All @@ -37,7 +37,7 @@ func NewUnaryDataGrpcManager(request *models.DataGrpcManagerRequest) (*DataGrpcM
return &DataGrpcManager{Conn: conn}, nil
}

func NewStreamDataGrpcManager(request *models.DataGrpcManagerRequest) (*DataGrpcManager, momentoerrors.MomentoSvcErr) {
func NewStreamDataGrpcManager(request *models.DataStreamGrpcManagerRequest) (*DataGrpcManager, momentoerrors.MomentoSvcErr) {
config := &tls.Config{
InsecureSkipVerify: false,
}
Expand Down
44 changes: 44 additions & 0 deletions internal/interceptor/retry_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package interceptor

import (
"context"
"time"

"github.com/momentohq/client-sdk-go/internal/retry"

"google.golang.org/grpc"
"google.golang.org/grpc/status"
)

func AddUnaryRetryInterceptor(s retry.Strategy) func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
attempt := 1
for {

// Execute api call
lastErr := invoker(ctx, method, req, reply, cc, opts...)
if lastErr == nil {
// Success no error returned stop interceptor
return nil
}

// Check retry eligibility based off last error received
retryBackoffTime := s.DetermineWhenToRetry(retry.StrategyProps{
GrpcStatusCode: status.Code(lastErr),
GrpcMethod: method,
AttemptNumber: attempt,
})

if retryBackoffTime == nil {
// If nil backoff time don't retry just return last error received
return lastErr
}

// Sleep for recommended time interval and increment attempts before trying again
if *retryBackoffTime > 0 {
time.Sleep(time.Duration(*retryBackoffTime) * time.Second)
}
attempt++
}
}
}
8 changes: 8 additions & 0 deletions internal/models/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,24 @@ package models
import (
"time"

"github.com/momentohq/client-sdk-go/internal/retry"

"github.com/momentohq/client-sdk-go/auth"
"github.com/momentohq/client-sdk-go/config"
)

type ControlGrpcManagerRequest struct {
CredentialProvider auth.CredentialProvider
RetryStrategy retry.Strategy
}

type DataGrpcManagerRequest struct {
CredentialProvider auth.CredentialProvider
RetryStrategy retry.Strategy
}

type DataStreamGrpcManagerRequest struct {
CredentialProvider auth.CredentialProvider
}

type LocalDataGrpcManagerRequest struct {
Expand Down
56 changes: 56 additions & 0 deletions internal/retry/eligibility_strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package retry

import "google.golang.org/grpc/codes"

type EligibilityStrategy interface {
// IsEligibleForRetry Determines whether a grpc call is able to be retried. The determination is based on the result
// of the last invocation of the call and whether the call is idempotent.
IsEligibleForRetry(props StrategyProps) bool
}

var retryableStatusCodes = map[codes.Code]bool{
codes.Internal: true,
codes.Unavailable: true,
}

var retryableRequestMethods = map[string]bool{
"/cache_client.Scs/Set": true,
"/cache_client.Scs/Get": true,
"/cache_client.Scs/Delete": true,
// not idempotent "/cache_client.Scs/Increment"
"/cache_client.Scs/DictionarySet": true,
// not idempotent: "/cache_client.Scs/DictionaryIncrement",
"/cache_client.Scs/DictionaryGet": true,
"/cache_client.Scs/DictionaryFetch": true,
"/cache_client.Scs/DictionaryDelete": true,
"/cache_client.Scs/SetUnion": true,
"/cache_client.Scs/SetDifference": true,
"/cache_client.Scs/SetFetch": true,
// not idempotent: "/cache_client.Scs/SetIfNotExists"
// not idempotent: "/cache_client.Scs/ListPushFront",
// not idempotent: "/cache_client.Scs/ListPushBack",
// not idempotent: "/cache_client.Scs/ListPopFront",
// not idempotent: "/cache_client.Scs/ListPopBack",
"/cache_client.Scs/ListFetch": true,
// Warning: in the future, this may not be idempotent
// Currently it supports removing all occurrences of a value.
// In the future, we may also add "the first/last N occurrences of a value".
// In the latter case it is not idempotent.
"/cache_client.Scs/ListRemove": true,
"/cache_client.Scs/ListLength": true,
// not idempotent: "/cache_client.Scs/ListConcatenateFront",
// not idempotent: "/cache_client.Scs/ListConcatenateBack"
}

type DefaultEligibilityStrategy struct{}

func (s DefaultEligibilityStrategy) IsEligibleForRetry(props StrategyProps) bool {
if !retryableStatusCodes[props.GrpcStatusCode] {
return false
}

if !retryableRequestMethods[props.GrpcMethod] {
return false
}
return true
}
79 changes: 79 additions & 0 deletions internal/retry/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package retry

import (
"strconv"

"github.com/momentohq/client-sdk-go/config/logger"
"google.golang.org/grpc/codes"
)

type StrategyProps struct {
GrpcStatusCode codes.Code
GrpcMethod string
AttemptNumber int
}
type Strategy interface {
// DetermineWhenToRetry Determines whether a grpc call can be retried and how long to wait before that retry.
//
// RetryableProps - Information about the grpc call, its last invocation, and how many times the call
// has been made.
//
// Returns The time in seconds before the next retry should occur or nil if no retry should be attempted.
DetermineWhenToRetry(props StrategyProps) *int
}

type fixedCountRetryStrategy struct {
eligibilityStrategy EligibilityStrategy
maxAttempts int
log logger.MomentoLogger
}

func NewFixedCountRetryStrategy(logFactory logger.MomentoLoggerFactory) Strategy {
return fixedCountRetryStrategy{
eligibilityStrategy: DefaultEligibilityStrategy{},
maxAttempts: 3,
log: logFactory.GetLogger("fixed-count-retry-strategy"),
}
}

func (r fixedCountRetryStrategy) WithMaxAttempts(attempts int) Strategy {
r.maxAttempts = attempts
return r
}

func (r fixedCountRetryStrategy) WithEligibilityStrategy(s EligibilityStrategy) Strategy {
r.eligibilityStrategy = s
return r
}

func (r fixedCountRetryStrategy) DetermineWhenToRetry(props StrategyProps) *int {
if !r.eligibilityStrategy.IsEligibleForRetry(props) {
r.log.Debug(
"Request is not retryable",
"method", props.GrpcMethod,
"status", props.GrpcStatusCode.String(),
)
return nil
}

if props.AttemptNumber > r.maxAttempts {
r.log.Debug(
"Exceeded max retry attempts not retrying",
"method", props.GrpcMethod,
"status", props.GrpcStatusCode.String(),
"attempt_count", strconv.Itoa(props.AttemptNumber),
"max_attempts", strconv.Itoa(r.maxAttempts),
)
return nil
}

r.log.Debug(
"Determined request is retryable retrying now",
"method", props.GrpcMethod,
"status", props.GrpcStatusCode.String(),
"attempt_count", strconv.Itoa(props.AttemptNumber),
"max_attempts", strconv.Itoa(r.maxAttempts),
)
timeTilNextRetry := 0
return &timeTilNextRetry
}
1 change: 1 addition & 0 deletions internal/services/scs_control_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type ScsControlClient struct {
func NewScsControlClient(request *models.ControlClientRequest) (*ScsControlClient, momentoerrors.MomentoSvcErr) {
controlManager, err := grpcmanagers.NewScsControlGrpcManager(&models.ControlGrpcManagerRequest{
CredentialProvider: request.CredentialProvider,
RetryStrategy: request.Configuration.GetRetryStrategy(),
})
if err != nil {
return nil, momentoerrors.ConvertSvcErr(err)
Expand Down
3 changes: 2 additions & 1 deletion momento/pubsub_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ type pubSubClient struct {
}

func newPubSubClient(request *models.PubSubClientRequest) (*pubSubClient, momentoerrors.MomentoSvcErr) {
streamDataManager, err := grpcmanagers.NewStreamDataGrpcManager(&models.DataGrpcManagerRequest{
streamDataManager, err := grpcmanagers.NewStreamDataGrpcManager(&models.DataStreamGrpcManagerRequest{
CredentialProvider: request.CredentialProvider,
})
if err != nil {
return nil, err
}
unaryDataManager, err := grpcmanagers.NewUnaryDataGrpcManager(&models.DataGrpcManagerRequest{
CredentialProvider: request.CredentialProvider,
RetryStrategy: request.Configuration.GetRetryStrategy(),
})
if err != nil {
return nil, err
Expand Down
Loading