Skip to content

Commit

Permalink
add connection retry logic
Browse files Browse the repository at this point in the history
Signed-off-by: Kavindu Dodanduwa <kavindudodanduwa@gmail.com>
  • Loading branch information
Kavindu-Dodan committed Feb 8, 2023
1 parent b1f827a commit 0d5a588
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 22 deletions.
105 changes: 88 additions & 17 deletions pkg/sync/grpc/grpc_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ import (
"context"
"fmt"
"io"
"math"
"strings"
"time"

"google.golang.org/grpc/credentials/insecure"

"buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc"
v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1"
Expand All @@ -13,12 +17,21 @@ import (
"github.com/open-feature/flagd/pkg/sync"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

// Prefix for GRPC URL inputs. GRPC does not define a prefix through standard. This prefix helps to differentiate
// remote URLs for REST APIs (i.e - HTTP) from GRPC endpoints.
const Prefix = "grpc://"
const (
// Prefix for GRPC URL inputs. GRPC does not define a prefix through standard. This prefix helps to differentiate
// remote URLs for REST APIs (i.e - HTTP) from GRPC endpoints.
Prefix = "grpc://"

// Connection retry constants
// Backoff period is calculated with backOffBase ^ #retry-iteration. However, when backoffLimit is reached, fallback
// to constantBackoffDelay

backoffLimit = 3
backOffBase = 4
constantBackoffDelay = 60
)

type Sync struct {
Target string
Expand All @@ -27,29 +40,88 @@ type Sync struct {
}

func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
dial, err := grpc.Dial(g.Target, grpc.WithTransportCredentials(insecure.NewCredentials()))
options := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}

// initial dial and connection. Failure here must result in a startup failure
dial, err := grpc.DialContext(ctx, g.Target, options...)
if err != nil {
g.Logger.Error(fmt.Sprintf("Error establishing connection: %s", err.Error()))
g.Logger.Error(fmt.Sprintf("Error establishing grpc connection: %s", err.Error()))
return err
}

return g.streamListener(ctx, dial, dataSync)
serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial)
syncClient, err := serviceClient.SyncFlags(context.Background(), &v1.SyncFlagsRequest{ProviderId: g.ProviderID})
if err != nil {
g.Logger.Error(fmt.Sprintf("Error calling streaming operation: %s", err.Error()))
return err
}

// initial stream listening
err = g.streamListener(ctx, syncClient, dataSync)
if err != nil {
g.Logger.Warn(fmt.Sprintf("Error with stream listener: %s", err.Error()))
}

g.Logger.Warn(fmt.Sprintf("Connection re-establishment attempt in-progress for grpc target: %s", g.Target))

// retry connection establishment
for {
syncClient = g.connectWithRetry(ctx, options...)

err = g.streamListener(ctx, syncClient, dataSync)
if err != nil {
g.Logger.Warn(fmt.Sprintf("Error with stream listener: %s", err.Error()))
continue
}
}
}

// streamListener performs the grpc listening on provided client connection and push updates through dataSync channel
func (g *Sync) streamListener(ctx context.Context, dial *grpc.ClientConn, dataSync chan<- sync.DataSync) error {
group, localContext := errgroup.WithContext(ctx)
// connectWithRetry is a helper to perform exponential backoff till provided configurations and then retry connection
// periodically till a successful connection get established.
func (g *Sync) connectWithRetry(
ctx context.Context, options ...grpc.DialOption,
) syncv1grpc.FlagSyncService_SyncFlagsClient {
var i int

group.Go(func() error {
serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial)
for {
var sleep time.Duration

if i >= backoffLimit {
sleep = constantBackoffDelay
} else {
i++
sleep = time.Duration(math.Pow(backOffBase, float64(i)))
}

time.Sleep(sleep * time.Second)

dial, err := grpc.DialContext(ctx, g.Target, options...)
if err != nil {
g.Logger.Debug(fmt.Sprintf("Error dialing target: %s", err.Error()))
continue
}

serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial)
syncClient, err := serviceClient.SyncFlags(context.Background(), &v1.SyncFlagsRequest{ProviderId: g.ProviderID})
if err != nil {
g.Logger.Error(fmt.Sprintf("Error calling streaming operation: %s", err.Error()))
return err
g.Logger.Debug(fmt.Sprintf("Error openning service client: %s", err.Error()))
continue
}

return g.handleFlagSync(syncClient, dataSync)
g.Logger.Info(fmt.Sprintf("Connection re-established with grpc target: %s", g.Target))
return syncClient
}
}

// streamListener wraps the grpc listening on provided stream and push updates through dataSync channel
func (g *Sync) streamListener(
ctx context.Context, stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync chan<- sync.DataSync,
) error {
group, localContext := errgroup.WithContext(ctx)
group.Go(func() error {
return g.handleFlagSync(stream, dataSync)
})

<-localContext.Done()
Expand All @@ -67,7 +139,6 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient,
for {
data, err := stream.Recv()
if err != nil {
g.Logger.Warn(fmt.Sprintf("Error with stream response: %s", err.Error()))
return err
}

Expand Down Expand Up @@ -107,7 +178,7 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient,
case v1.SyncState_SYNC_STATE_PING:
g.Logger.Debug("received server ping")
default:
g.Logger.Warn(fmt.Sprintf("receivied unknown state: %s", data.State.String()))
g.Logger.Debug(fmt.Sprintf("receivied unknown state: %s", data.State.String()))
}
}
}
Expand Down
16 changes: 11 additions & 5 deletions pkg/sync/grpc/grpc_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ func Test_StreamListener(t *testing.T) {
// start server
go serve(&bufServer)

grpcSync := Sync{
Target: target,
ProviderID: "",
Logger: logger.NewLogger(nil, false),
}

// initialize client
dial, err := grpc.Dial(target,
grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) {
Expand All @@ -237,17 +243,17 @@ func Test_StreamListener(t *testing.T) {
t.Errorf("Error setting up client connection: %s", err.Error())
}

grpcSync := Sync{
Target: target,
ProviderID: "",
Logger: logger.NewLogger(nil, false),
serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial)
syncClient, err := serviceClient.SyncFlags(context.Background(), &v1.SyncFlagsRequest{ProviderId: grpcSync.ProviderID})
if err != nil {
t.Errorf("Error opening client stream: %s", err.Error())
}

syncChan := make(chan sync.DataSync, 1)

// listen to stream
go func() {
err := grpcSync.streamListener(context.Background(), dial, syncChan)
err := grpcSync.streamListener(context.Background(), syncClient, syncChan)
if err != nil {
// must ignore EOF as this is returned for stream end
if err != io.EOF {
Expand Down

0 comments on commit 0d5a588

Please sign in to comment.