Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/backoff: add a helper to run a function with backoff #6661

Merged
merged 3 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions internal/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package backoff

import (
"context"
"time"

grpcbackoff "google.golang.org/grpc/backoff"
Expand Down Expand Up @@ -71,3 +72,32 @@ func (bc Exponential) Backoff(retries int) time.Duration {
}
return time.Duration(backoff)
}

// RunF provides a convenient way to run a function f with a caller provided
// backoff. It runs f repeatedly, backing off when f returns false and resetting
// the backoff state when f returns true, until the context expires or f returns
// a non-nil error.
//
// The first call to f happens without delay. After each call that returns
// (false, nil), RunF waits for backoff(attempt) before the next call, where
// attempt starts at 0 and increases by one per consecutive false result. After
// a call that returns (true, nil), attempt goes back to 0 and f is called again
// immediately. The error returned by f is used only as a stop signal; it is not
// reported to the caller.
func RunF(ctx context.Context, f func() (bool, error), backoff func(int) time.Duration) {
	attempt := 0
	// A zero-duration timer fires right away so the first call is not delayed.
	timer := time.NewTimer(0)
	// Stop the timer on every exit path, including the case where the loop
	// condition observes a canceled context while a backoff wait is pending.
	defer timer.Stop()

	for ctx.Err() == nil {
		select {
		case <-timer.C:
		case <-ctx.Done():
			return
		}

		reset, err := f()
		if err != nil {
			return
		}
		if reset {
			// Success: retry immediately and start the backoff sequence over.
			// The timer channel was drained by the select above, so Reset is
			// safe here.
			timer.Reset(0)
			attempt = 0
			continue
		}
		timer.Reset(backoff(attempt))
		attempt++
	}
}
28 changes: 8 additions & 20 deletions orca/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/orca/internal"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -169,48 +170,35 @@ func (p *producer) updateRunLocked() {
func (p *producer) run(ctx context.Context, done chan struct{}, interval time.Duration) {
defer close(done)

backoffAttempt := 0
backoffTimer := time.NewTimer(0)
for ctx.Err() == nil {
select {
case <-backoffTimer.C:
case <-ctx.Done():
return
}

runStream := func() (bool, error) {
resetBackoff, err := p.runStream(ctx, interval)

if resetBackoff {
backoffTimer.Reset(0)
backoffAttempt = 0
} else {
backoffTimer.Reset(p.backoff(backoffAttempt))
backoffAttempt++
}

switch {
case err == nil:
// No error was encountered; restart the stream.
return true, nil
case ctx.Err() != nil:
// Producer was stopped; exit immediately and without logging an
// error.
return
return false, ctx.Err()
case status.Code(err) == codes.Unimplemented:
// Unimplemented; do not retry.
logger.Error("Server doesn't support ORCA OOB load reporting protocol; not listening for load reports.")
return
return false, err
case status.Code(err) == codes.Unavailable, status.Code(err) == codes.Canceled:
// TODO: these codes should ideally log an error, too, but for now
// we receive them when shutting down the ClientConn (Unavailable
// if the stream hasn't started yet, and Canceled if it happens
// mid-stream). Once we can determine the state or ensure the
// producer is stopped before the stream ends, we can log an error
// when it's not a natural shutdown.
return resetBackoff, nil
default:
// Log all other errors.
logger.Error("Received unexpected stream error:", err)
return resetBackoff, nil
}
}
backoff.RunF(ctx, runStream, p.backoff)
}

// runStream runs a single stream on the subchannel and returns the resulting
Expand Down
69 changes: 26 additions & 43 deletions xds/internal/xdsclient/transport/loadreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/xds/internal"
Expand Down Expand Up @@ -100,54 +101,36 @@ func (t *Transport) lrsRunner(ctx context.Context) {
node := proto.Clone(t.nodeProto).(*v3corepb.Node)
node.ClientFeatures = append(node.ClientFeatures, "envoy.lrs.supports_send_all_clusters")

backoffAttempt := 0
backoffTimer := time.NewTimer(0)
for ctx.Err() == nil {
select {
case <-backoffTimer.C:
case <-ctx.Done():
backoffTimer.Stop()
return
runLoadReportStream := func() (bool, error) {
// streamCtx is created and canceled in case we terminate the stream
// early for any reason, to avoid gRPC-Go leaking the RPC's monitoring
// goroutine.
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := v3lrsgrpc.NewLoadReportingServiceClient(t.cc).StreamLoadStats(streamCtx)
if err != nil {
t.logger.Warningf("Creating LRS stream to server %q failed: %v", t.serverURI, err)
return false, nil
}
t.logger.Infof("Created LRS stream to server %q", t.serverURI)

// We reset backoff state when we successfully receive at least one
// message from the server.
resetBackoff := func() bool {
// streamCtx is created and canceled in case we terminate the stream
// early for any reason, to avoid gRPC-Go leaking the RPC's monitoring
// goroutine.
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := v3lrsgrpc.NewLoadReportingServiceClient(t.cc).StreamLoadStats(streamCtx)
if err != nil {
t.logger.Warningf("Creating LRS stream to server %q failed: %v", t.serverURI, err)
return false
}
t.logger.Infof("Created LRS stream to server %q", t.serverURI)

if err := t.sendFirstLoadStatsRequest(stream, node); err != nil {
t.logger.Warningf("Sending first LRS request failed: %v", err)
return false
}

clusters, interval, err := t.recvFirstLoadStatsResponse(stream)
if err != nil {
t.logger.Warningf("Reading from LRS stream failed: %v", err)
return false
}

t.sendLoads(streamCtx, stream, clusters, interval)
return true
}()
if err := t.sendFirstLoadStatsRequest(stream, node); err != nil {
t.logger.Warningf("Sending first LRS request failed: %v", err)
return false, nil
}

if resetBackoff {
backoffTimer.Reset(0)
backoffAttempt = 0
} else {
backoffTimer.Reset(t.backoff(backoffAttempt))
backoffAttempt++
clusters, interval, err := t.recvFirstLoadStatsResponse(stream)
if err != nil {
t.logger.Warningf("Reading from LRS stream failed: %v", err)
return false, nil
}

// We reset backoff state when we successfully receive at least one
// message from the server.
t.sendLoads(streamCtx, stream, clusters, interval)
return true, nil
}
backoff.RunF(ctx, runLoadReportStream, t.backoff)
}

func (t *Transport) sendLoads(ctx context.Context, stream lrsStream, clusterNames []string, interval time.Duration) {
Expand Down
48 changes: 15 additions & 33 deletions xds/internal/xdsclient/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,43 +325,25 @@ func (t *Transport) adsRunner(ctx context.Context) {

go t.send(ctx)

backoffAttempt := 0
backoffTimer := time.NewTimer(0)
for ctx.Err() == nil {
select {
case <-backoffTimer.C:
case <-ctx.Done():
backoffTimer.Stop()
return
// We reset backoff state when we successfully receive at least one
// message from the server.
runStreamWithBackoff := func() (bool, error) {
stream, err := t.newAggregatedDiscoveryServiceStream(ctx, t.cc)
if err != nil {
t.onErrorHandler(err)
t.logger.Warningf("Creating new ADS stream failed: %v", err)
return false, nil
}
t.logger.Infof("ADS stream created")

// We reset backoff state when we successfully receive at least one
// message from the server.
resetBackoff := func() bool {
stream, err := t.newAggregatedDiscoveryServiceStream(ctx, t.cc)
if err != nil {
t.onErrorHandler(err)
t.logger.Warningf("Creating new ADS stream failed: %v", err)
return false
}
t.logger.Infof("ADS stream created")

select {
case <-t.adsStreamCh:
default:
}
t.adsStreamCh <- stream
return t.recv(stream)
}()

if resetBackoff {
backoffTimer.Reset(0)
backoffAttempt = 0
} else {
backoffTimer.Reset(t.backoff(backoffAttempt))
backoffAttempt++
select {
case <-t.adsStreamCh:
default:
}
t.adsStreamCh <- stream
return t.recv(stream), nil
}
backoff.RunF(ctx, runStreamWithBackoff, t.backoff)
}

// send is a separate goroutine for sending resource requests on the ADS stream.
Expand Down