From facf2dc097c62b03a20d08f425c81ef69c352d6a Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 25 Sep 2023 16:46:12 +0000 Subject: [PATCH 1/3] internal/backoff: add a helper to run a function with backoff --- internal/backoff/backoff.go | 30 ++++++++ orca/producer.go | 28 +++----- .../xdsclient/transport/loadreport.go | 69 +++++++------------ xds/internal/xdsclient/transport/transport.go | 48 ++++--------- 4 files changed, 79 insertions(+), 96 deletions(-) diff --git a/internal/backoff/backoff.go b/internal/backoff/backoff.go index 5fc0ee3da53b..484b50959c60 100644 --- a/internal/backoff/backoff.go +++ b/internal/backoff/backoff.go @@ -23,6 +23,7 @@ package backoff import ( + "context" "time" grpcbackoff "google.golang.org/grpc/backoff" @@ -71,3 +72,32 @@ func (bc Exponential) Backoff(retries int) time.Duration { } return time.Duration(backoff) } + +// RunF provides a convenient way to run a function f with a caller provided +// backoff. It runs f repeatedly, backing off when f returns false and resetting +// the backoff state when f returns true, until the context expires or f returns +// a non-nil error. +func RunF(ctx context.Context, f func() (bool, error), backoff func(int) time.Duration) { + attempt := 0 + timer := time.NewTimer(0) + for ctx.Err() == nil { + select { + case <-timer.C: + case <-ctx.Done(): + timer.Stop() + return + } + + reset, err := f() + if err != nil { + return + } + if reset { + timer.Reset(0) + attempt = 0 + } else { + timer.Reset(backoff(attempt)) + attempt++ + } + } +} diff --git a/orca/producer.go b/orca/producer.go index 2d58725547fc..119e9cb86b8c 100644 --- a/orca/producer.go +++ b/orca/producer.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" + "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/orca/internal" "google.golang.org/grpc/status" @@ -169,36 +170,20 @@ func (p *producer) updateRunLocked() { func (p *producer) run(ctx context.Context, done chan struct{}, interval time.Duration) { defer close(done) - backoffAttempt := 0 - backoffTimer := time.NewTimer(0) - for ctx.Err() == nil { - select { - case <-backoffTimer.C: - case <-ctx.Done(): - return - } - + runStream := func() (bool, error) { resetBackoff, err := p.runStream(ctx, interval) - - if resetBackoff { - backoffTimer.Reset(0) - backoffAttempt = 0 - } else { - backoffTimer.Reset(p.backoff(backoffAttempt)) - backoffAttempt++ - } - switch { case err == nil: // No error was encountered; restart the stream. + return true, nil case ctx.Err() != nil: // Producer was stopped; exit immediately and without logging an // error. - return + return false, ctx.Err() case status.Code(err) == codes.Unimplemented: // Unimplemented; do not retry. logger.Error("Server doesn't support ORCA OOB load reporting protocol; not listening for load reports.") - return + return false, err case status.Code(err) == codes.Unavailable, status.Code(err) == codes.Canceled: // TODO: these codes should ideally log an error, too, but for now // we receive them when shutting down the ClientConn (Unavailable @@ -206,11 +191,14 @@ func (p *producer) run(ctx context.Context, done chan struct{}, interval time.Du // mid-stream). Once we can determine the state or ensure the // producer is stopped before the stream ends, we can log an error // when it's not a natural shutdown. + return resetBackoff, nil default: // Log all other errors. 
logger.Error("Received unexpected stream error:", err) + return resetBackoff, nil } } + backoff.RunF(ctx, runStream, p.backoff) } // runStream runs a single stream on the subchannel and returns the resulting diff --git a/xds/internal/xdsclient/transport/loadreport.go b/xds/internal/xdsclient/transport/loadreport.go index 89ffc4fcec66..2989d881ac5f 100644 --- a/xds/internal/xdsclient/transport/loadreport.go +++ b/xds/internal/xdsclient/transport/loadreport.go @@ -25,6 +25,7 @@ import ( "time" "github.com/golang/protobuf/ptypes" + "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/xds/internal" @@ -100,54 +101,36 @@ func (t *Transport) lrsRunner(ctx context.Context) { node := proto.Clone(t.nodeProto).(*v3corepb.Node) node.ClientFeatures = append(node.ClientFeatures, "envoy.lrs.supports_send_all_clusters") - backoffAttempt := 0 - backoffTimer := time.NewTimer(0) - for ctx.Err() == nil { - select { - case <-backoffTimer.C: - case <-ctx.Done(): - backoffTimer.Stop() - return + runLoadReportStream := func() (bool, error) { + // streamCtx is created and canceled in case we terminate the stream + // early for any reason, to avoid gRPC-Go leaking the RPC's monitoring + // goroutine. + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + stream, err := v3lrsgrpc.NewLoadReportingServiceClient(t.cc).StreamLoadStats(streamCtx) + if err != nil { + t.logger.Warningf("Creating LRS stream to server %q failed: %v", t.serverURI, err) + return false, nil } + t.logger.Infof("Created LRS stream to server %q", t.serverURI) - // We reset backoff state when we successfully receive at least one - // message from the server. - resetBackoff := func() bool { - // streamCtx is created and canceled in case we terminate the stream - // early for any reason, to avoid gRPC-Go leaking the RPC's monitoring - // goroutine. - streamCtx, cancel := context.WithCancel(ctx) - defer cancel() - stream, err := v3lrsgrpc.NewLoadReportingServiceClient(t.cc).StreamLoadStats(streamCtx) - if err != nil { - t.logger.Warningf("Creating LRS stream to server %q failed: %v", t.serverURI, err) - return false - } - t.logger.Infof("Created LRS stream to server %q", t.serverURI) - - if err := t.sendFirstLoadStatsRequest(stream, node); err != nil { - t.logger.Warningf("Sending first LRS request failed: %v", err) - return false - } - - clusters, interval, err := t.recvFirstLoadStatsResponse(stream) - if err != nil { - t.logger.Warningf("Reading from LRS stream failed: %v", err) - return false - } - - t.sendLoads(streamCtx, stream, clusters, interval) - return true - }() + if err := t.sendFirstLoadStatsRequest(stream, node); err != nil { + t.logger.Warningf("Sending first LRS request failed: %v", err) + return false, nil + } - if resetBackoff { - backoffTimer.Reset(0) - backoffAttempt = 0 - } else { - backoffTimer.Reset(t.backoff(backoffAttempt)) - backoffAttempt++ + clusters, interval, err := t.recvFirstLoadStatsResponse(stream) + if err != nil { + t.logger.Warningf("Reading from LRS stream failed: %v", err) + return false, nil } + + // We reset backoff state when we successfully receive at least one + // message from the server. 
+ t.sendLoads(streamCtx, stream, clusters, interval) + return true, nil } + backoff.RunF(ctx, runLoadReportStream, t.backoff) } func (t *Transport) sendLoads(ctx context.Context, stream lrsStream, clusterNames []string, interval time.Duration) { diff --git a/xds/internal/xdsclient/transport/transport.go b/xds/internal/xdsclient/transport/transport.go index 86803588a7cc..085293ee273e 100644 --- a/xds/internal/xdsclient/transport/transport.go +++ b/xds/internal/xdsclient/transport/transport.go @@ -325,43 +325,25 @@ func (t *Transport) adsRunner(ctx context.Context) { go t.send(ctx) - backoffAttempt := 0 - backoffTimer := time.NewTimer(0) - for ctx.Err() == nil { - select { - case <-backoffTimer.C: - case <-ctx.Done(): - backoffTimer.Stop() - return + // We reset backoff state when we successfully receive at least one + // message from the server. + runStreamWithBackoff := func() (bool, error) { + stream, err := t.newAggregatedDiscoveryServiceStream(ctx, t.cc) + if err != nil { + t.onErrorHandler(err) + t.logger.Warningf("Creating new ADS stream failed: %v", err) + return false, nil } + t.logger.Infof("ADS stream created") - // We reset backoff state when we successfully receive at least one - // message from the server. - resetBackoff := func() bool { - stream, err := t.newAggregatedDiscoveryServiceStream(ctx, t.cc) - if err != nil { - t.onErrorHandler(err) - t.logger.Warningf("Creating new ADS stream failed: %v", err) - return false - } - t.logger.Infof("ADS stream created") - - select { - case <-t.adsStreamCh: - default: - } - t.adsStreamCh <- stream - return t.recv(stream) - }() - - if resetBackoff { - backoffTimer.Reset(0) - backoffAttempt = 0 - } else { - backoffTimer.Reset(t.backoff(backoffAttempt)) - backoffAttempt++ + select { + case <-t.adsStreamCh: + default: } + t.adsStreamCh <- stream + return t.recv(stream), nil } + backoff.RunF(ctx, runStreamWithBackoff, t.backoff) } // send is a separate goroutine for sending resource requests on the ADS stream. From 30a11acf5942176b1e1e942a71d1eb6cf2ed9b2a Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 25 Sep 2023 20:34:29 +0000 Subject: [PATCH 2/3] change the signature of f --- internal/backoff/backoff.go | 32 +++++++++++-------- orca/producer.go | 31 ++++++++---------- .../xdsclient/transport/loadreport.go | 10 +++--- xds/internal/xdsclient/transport/transport.go | 10 ++++-- 4 files changed, 45 insertions(+), 38 deletions(-) diff --git a/internal/backoff/backoff.go b/internal/backoff/backoff.go index 484b50959c60..fed1c011a325 100644 --- a/internal/backoff/backoff.go +++ b/internal/backoff/backoff.go @@ -24,6 +24,7 @@ package backoff import ( "context" + "errors" "time" grpcbackoff "google.golang.org/grpc/backoff" @@ -73,11 +74,16 @@ func (bc Exponential) Backoff(retries int) time.Duration { return time.Duration(backoff) } -// RunF provides a convenient way to run a function f with a caller provided -// backoff. It runs f repeatedly, backing off when f returns false and resetting -// the backoff state when f returns true, until the context expires or f returns -// a non-nil error. -func RunF(ctx context.Context, f func() (bool, error), backoff func(int) time.Duration) { +// ErrResetBackoff is the error to be returned by the function executed by RunF, +// to instruct the latter to reset its backoff state. +var ErrResetBackoff = errors.New("reset backoff state") + +// RunF provides a convenient way to run a function f repeatedly until the +// context expires or f returns a non-nil error that is not ErrResetBackoff. 
+// When f returns ErrResetBackoff, RunF continues to run f, but resets its +// backoff state before doing so. backoff accepts an integer representing the +// number of retries, and returns the amount of time to backoff. +func RunF(ctx context.Context, f func() error, backoff func(int) time.Duration) { attempt := 0 timer := time.NewTimer(0) for ctx.Err() == nil { @@ -88,16 +94,16 @@ func RunF(ctx context.Context, f func() (bool, error), backoff func(int) time.Du return } - reset, err := f() - if err != nil { - return - } - if reset { + err := f() + if errors.Is(err, ErrResetBackoff) { timer.Reset(0) attempt = 0 - } else { - timer.Reset(backoff(attempt)) - attempt++ + continue + } + if err != nil { + return } + timer.Reset(backoff(attempt)) + attempt++ } } diff --git a/orca/producer.go b/orca/producer.go index 119e9cb86b8c..d19e79f2127f 100644 --- a/orca/producer.go +++ b/orca/producer.go @@ -170,33 +170,30 @@ func (p *producer) updateRunLocked() { func (p *producer) run(ctx context.Context, done chan struct{}, interval time.Duration) { defer close(done) - runStream := func() (bool, error) { + runStream := func() error { resetBackoff, err := p.runStream(ctx, interval) - switch { - case err == nil: - // No error was encountered; restart the stream. - return true, nil - case ctx.Err() != nil: - // Producer was stopped; exit immediately and without logging an - // error. - return false, ctx.Err() - case status.Code(err) == codes.Unimplemented: + if status.Code(err) == codes.Unimplemented { // Unimplemented; do not retry. logger.Error("Server doesn't support ORCA OOB load reporting protocol; not listening for load reports.") - return false, err - case status.Code(err) == codes.Unavailable, status.Code(err) == codes.Canceled: + return err + } + // Retry for all other errors. + var retErr error + if resetBackoff { + retErr = backoff.ErrResetBackoff + } + if code := status.Code(err); code == codes.Unavailable || code == codes.Canceled { // TODO: these codes should ideally log an error, too, but for now // we receive them when shutting down the ClientConn (Unavailable // if the stream hasn't started yet, and Canceled if it happens // mid-stream). Once we can determine the state or ensure the // producer is stopped before the stream ends, we can log an error // when it's not a natural shutdown. - return resetBackoff, nil - default: - // Log all other errors. - logger.Error("Received unexpected stream error:", err) - return resetBackoff, nil + return retErr } + // Log all other errors. + logger.Error("Received unexpected stream error:", err) + return retErr } backoff.RunF(ctx, runStream, p.backoff) } diff --git a/xds/internal/xdsclient/transport/loadreport.go b/xds/internal/xdsclient/transport/loadreport.go index 2989d881ac5f..4b8ca29ce93f 100644 --- a/xds/internal/xdsclient/transport/loadreport.go +++ b/xds/internal/xdsclient/transport/loadreport.go @@ -101,7 +101,7 @@ func (t *Transport) lrsRunner(ctx context.Context) { node := proto.Clone(t.nodeProto).(*v3corepb.Node) node.ClientFeatures = append(node.ClientFeatures, "envoy.lrs.supports_send_all_clusters") - runLoadReportStream := func() (bool, error) { + runLoadReportStream := func() error { // streamCtx is created and canceled in case we terminate the stream // early for any reason, to avoid gRPC-Go leaking the RPC's monitoring // goroutine. 
@@ -110,25 +110,25 @@ func (t *Transport) lrsRunner(ctx context.Context) { stream, err := v3lrsgrpc.NewLoadReportingServiceClient(t.cc).StreamLoadStats(streamCtx) if err != nil { t.logger.Warningf("Creating LRS stream to server %q failed: %v", t.serverURI, err) - return false, nil + return nil } t.logger.Infof("Created LRS stream to server %q", t.serverURI) if err := t.sendFirstLoadStatsRequest(stream, node); err != nil { t.logger.Warningf("Sending first LRS request failed: %v", err) - return false, nil + return nil } clusters, interval, err := t.recvFirstLoadStatsResponse(stream) if err != nil { t.logger.Warningf("Reading from LRS stream failed: %v", err) - return false, nil + return nil } // We reset backoff state when we successfully receive at least one // message from the server. t.sendLoads(streamCtx, stream, clusters, interval) - return true, nil + return backoff.ErrResetBackoff } backoff.RunF(ctx, runLoadReportStream, t.backoff) } diff --git a/xds/internal/xdsclient/transport/transport.go b/xds/internal/xdsclient/transport/transport.go index 085293ee273e..001552d7b479 100644 --- a/xds/internal/xdsclient/transport/transport.go +++ b/xds/internal/xdsclient/transport/transport.go @@ -327,12 +327,12 @@ func (t *Transport) adsRunner(ctx context.Context) { // We reset backoff state when we successfully receive at least one // message from the server. - runStreamWithBackoff := func() (bool, error) { + runStreamWithBackoff := func() error { stream, err := t.newAggregatedDiscoveryServiceStream(ctx, t.cc) if err != nil { t.onErrorHandler(err) t.logger.Warningf("Creating new ADS stream failed: %v", err) - return false, nil + return nil } t.logger.Infof("ADS stream created") @@ -341,7 +341,11 @@ func (t *Transport) adsRunner(ctx context.Context) { default: } t.adsStreamCh <- stream - return t.recv(stream), nil + msgReceived := t.recv(stream) + if msgReceived { + return backoff.ErrResetBackoff + } + return nil } backoff.RunF(ctx, runStreamWithBackoff, t.backoff) } From 5a452cd0ca2621d9a72a9c6a5b807527deafc291 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 26 Sep 2023 17:54:04 +0000 Subject: [PATCH 3/3] minor code simplification --- orca/producer.go | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/orca/producer.go b/orca/producer.go index d19e79f2127f..04edae6de66f 100644 --- a/orca/producer.go +++ b/orca/producer.go @@ -178,22 +178,19 @@ func (p *producer) run(ctx context.Context, done chan struct{}, interval time.Du return err } // Retry for all other errors. - var retErr error - if resetBackoff { - retErr = backoff.ErrResetBackoff + if code := status.Code(err); code != codes.Unavailable && code != codes.Canceled { + // TODO: Unavailable and Canceled should also ideally log an error, + // but for now we receive them when shutting down the ClientConn + // (Unavailable if the stream hasn't started yet, and Canceled if it + // happens mid-stream). Once we can determine the state or ensure + // the producer is stopped before the stream ends, we can log an + // error when it's not a natural shutdown. + logger.Error("Received unexpected stream error:", err) } - if code := status.Code(err); code == codes.Unavailable || code == codes.Canceled { - // TODO: these codes should ideally log an error, too, but for now - // we receive them when shutting down the ClientConn (Unavailable - // if the stream hasn't started yet, and Canceled if it happens - // mid-stream). 
Once we can determine the state or ensure the - // producer is stopped before the stream ends, we can log an error - // when it's not a natural shutdown. - return retErr + if resetBackoff { + return backoff.ErrResetBackoff } - // Log all other errors. - logger.Error("Received unexpected stream error:", err) - return retErr + return nil } backoff.RunF(ctx, runStream, p.backoff) }
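
For reference, a minimal usage sketch of the helper as it stands after PATCH 2/3: f returning nil means "back off, then retry", backoff.ErrResetBackoff means "reset the backoff state and retry immediately", and any other non-nil error (or expiry of ctx) ends the loop. The ExampleRunF name, the attempt counter, and the "giving up" error below are invented purely for illustration; RunF and ErrResetBackoff come from these patches, while DefaultExponential is the package's pre-existing default exponential strategy (not shown in the diff context above). Since internal/backoff is module-internal, a snippet like this only builds as a test inside grpc-go (e.g. an external test package under internal/backoff).

package backoff_test

import (
	"context"
	"errors"
	"fmt"
	"time"

	"google.golang.org/grpc/internal/backoff"
)

// ExampleRunF drives RunF with a toy function: nil means "back off and retry",
// ErrResetBackoff means "retry immediately and reset the backoff state", and
// any other error (or context expiry) ends the loop.
func ExampleRunF() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	attempt := 0
	work := func() error {
		attempt++
		switch {
		case attempt == 3:
			// Pretend progress was made on this attempt.
			return backoff.ErrResetBackoff
		case attempt >= 5:
			// A non-nil error other than ErrResetBackoff stops RunF.
			return errors.New("giving up")
		default:
			// nil asks RunF to back off and then call us again.
			return nil
		}
	}

	// DefaultExponential applies the default gRPC connection backoff
	// parameters; its Backoff method maps the retry count to a wait duration.
	backoff.RunF(ctx, work, backoff.DefaultExponential.Backoff)
	fmt.Println("attempts made:", attempt)
}

Compared to the PATCH 1/3 shape, where f returned (bool, error), the sentinel error lets the ORCA producer and the xDS ADS/LRS call sites express both "retry immediately" and "retry after backoff" through a single error return value.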