Skip to content

Commit

Permalink
Unify OTLP exporter retry logic (#2095)
Browse files Browse the repository at this point in the history
* Add retry internal package

* Use retry package in connection

* Use retry package for otlpconfig

* Use the retry package in otlptracegrpc

* Use the retry package in otlptracehttp

* Add changes to CHANGELOG

* Lint internal

* Update otlptracehttp tests

* Update dependencies

* Add retry tests to otlptracehttp

* Remove TestRetry from otlptracehttp
  • Loading branch information
MrAlias authored Jul 22, 2021
1 parent abe2243 commit cb607b0
Show file tree
Hide file tree
Showing 16 changed files with 757 additions and 816 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Added `WithOSDescription` resource configuration option to set OS (Operating System) description resource attribute (`os.description`). (#1840)
- Added `WithOS` resource configuration option to set all OS (Operating System) resource attributes at once. (#1840)
- Added the `WithRetry` option to the `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp` package.
This option is a replacement for the removed `WithMaxAttempts` and `WithBackoff` options. (#2095)
- Added API `LinkFromContext` to return Link which encapsulates SpanContext from provided context and also encapsulates attributes. (#2115)

### Changed

- The `SpanModels` function is now exported from the `go.opentelemetry.io/otel/exporters/zipkin` package to convert OpenTelemetry spans into Zipkin model spans. (#2027)
- Rename the `"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc".RetrySettings` to `RetryConfig`. (#2095)
- Rename the `"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp".RetrySettings` to `RetryConfig`. (#2095)

### Deprecated

Expand All @@ -28,6 +32,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Removed the deprecated package `go.opentelemetry.io/otel/exporters/trace/zipkin`. (#2020)
- Removed the `"go.opentelemetry.io/otel/sdk/resource".WithBuiltinDetectors` function.
The explicit `With*` options for every built-in detector should be used instead. (#2026 #2097)
- Removed the `WithMaxAttempts` and `WithBackoff` options from the `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp` package.
The retry logic of the package has been updated to match the `otlptracegrpc` package and accordingly a `WithRetry` option is added that should be used instead. (#2095)
- Removed metrics test package `go.opentelemetry.io/otel/sdk/export/metric/metrictest`. (#2105)

### Fixed
Expand Down
145 changes: 24 additions & 121 deletions exporters/otlp/otlptrace/internal/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,20 @@ package connection

import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/cenkalti/backoff/v4"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"google.golang.org/grpc/encoding/gzip"

"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
Expand All @@ -48,6 +47,7 @@ type Connection struct {
// these fields are read-only after constructor is finished
cfg otlpconfig.Config
SCfg otlpconfig.SignalConfig
requestFunc retry.RequestFunc
metadata metadata.MD
newConnectionHandler func(cc *grpc.ClientConn)

Expand All @@ -66,6 +66,7 @@ func NewConnection(cfg otlpconfig.Config, sCfg otlpconfig.SignalConfig, handler
c := new(Connection)
c.newConnectionHandler = handler
c.cfg = cfg
c.requestFunc = cfg.RetryConfig.RequestFunc(evaluate)
c.SCfg = sCfg
if len(c.SCfg.Headers) > 0 {
c.metadata = metadata.New(c.SCfg.Headers)
Expand Down Expand Up @@ -287,143 +288,45 @@ func (c *Connection) ContextWithStop(ctx context.Context) (context.Context, cont
}

func (c *Connection) DoRequest(ctx context.Context, fn func(context.Context) error) error {
expBackoff := newExponentialBackoff(c.cfg.RetrySettings)

for {
ctx, cancel := c.ContextWithStop(ctx)
defer cancel()
return c.requestFunc(ctx, func(ctx context.Context) error {
err := fn(ctx)
if err == nil {
// request succeeded.
return nil
}

if !c.cfg.RetrySettings.Enabled {
return err
}

// We have an error, check gRPC status code.
st := status.Convert(err)
if st.Code() == codes.OK {
// Not really an error, still success.
return nil
}

// Now, this is this a real error.

if !shouldRetry(st.Code()) {
// It is not a retryable error, we should not retry.
return err
}

// Need to retry.

throttle := getThrottleDuration(st)

backoffDelay := expBackoff.NextBackOff()
if backoffDelay == backoff.Stop {
// throw away the batch
err = fmt.Errorf("max elapsed time expired: %w", err)
return err
}

var delay time.Duration

if backoffDelay > throttle {
delay = backoffDelay
} else {
if expBackoff.GetElapsedTime()+throttle > expBackoff.MaxElapsedTime {
err = fmt.Errorf("max elapsed time expired when respecting server throttle: %w", err)
return err
}

// Respect server throttling.
delay = throttle
}

// back-off, but get interrupted when shutting down or request is cancelled or timed out.
err = func() error {
dt := time.NewTimer(delay)
defer dt.Stop()

select {
case <-ctx.Done():
return ctx.Err()
case <-c.stopCh:
return fmt.Errorf("interrupted due to shutdown: %w", err)
case <-dt.C:
}

// nil is converted to OK.
if status.Code(err) == codes.OK {
// Success.
return nil
}()

if err != nil {
return err
}

}
return err
})
}

func shouldRetry(code codes.Code) bool {
switch code {
case codes.OK:
// Success. This function should not be called for this code, the best we
// can do is tell the caller not to retry.
return false

// evaluate returns if err is retry-able and a duration to wait for if an
// explicit throttle time is included in err.
func evaluate(err error) (bool, time.Duration) {
s := status.Convert(err)
switch s.Code() {
case codes.Canceled,
codes.DeadlineExceeded,
codes.ResourceExhausted,
codes.Aborted,
codes.OutOfRange,
codes.Unavailable,
codes.DataLoss:
// These are retryable errors.
return true

case codes.Unknown,
codes.InvalidArgument,
codes.Unauthenticated,
codes.PermissionDenied,
codes.NotFound,
codes.AlreadyExists,
codes.FailedPrecondition,
codes.Unimplemented,
codes.Internal:
// These are fatal errors, don't retry.
return false

default:
// Don't retry on unknown codes.
return false
return true, throttleDelay(s)
}

// Not a retry-able error.
return false, 0
}

func getThrottleDuration(status *status.Status) time.Duration {
// See if throttling information is available.
// throttleDelay returns a duration to wait for if an explicit throttle time
// is included in the response status.
func throttleDelay(status *status.Status) time.Duration {
for _, detail := range status.Details() {
if t, ok := detail.(*errdetails.RetryInfo); ok {
if t.RetryDelay.Seconds > 0 || t.RetryDelay.Nanos > 0 {
// We are throttled. Wait before retrying as requested by the server.
return time.Duration(t.RetryDelay.Seconds)*time.Second + time.Duration(t.RetryDelay.Nanos)*time.Nanosecond
}
return 0
return t.RetryDelay.AsDuration()
}
}
return 0
}

func newExponentialBackoff(rs otlpconfig.RetrySettings) *backoff.ExponentialBackOff {
// Do not use NewExponentialBackOff since it calls Reset and the code here must
// call Reset after changing the InitialInterval (this saves an unnecessary call to Now).
expBackoff := &backoff.ExponentialBackOff{
InitialInterval: rs.InitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: rs.MaxInterval,
MaxElapsedTime: rs.MaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
expBackoff.Reset()

return expBackoff
}
Loading

0 comments on commit cb607b0

Please sign in to comment.