Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix retry code to handle gRPC status codes; update newer stats retries to be wrapped with spans #13592

Merged
merged 1 commit into from
Jul 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions pkg/querier/queryrange/queryrangebase/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/grafana/dskit/grpcutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc/codes"

"github.com/grafana/loki/v3/pkg/util"
util_log "github.com/grafana/loki/v3/pkg/util/log"
Expand Down Expand Up @@ -97,8 +96,13 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) {
return nil, ctx.Err()
}

// Retry if we get a HTTP 500 or an unknown error.
if code := grpcutil.ErrorToStatusCode(err); code == codes.Unknown || code/100 == 5 {
code := grpcutil.ErrorToStatusCode(err)
// Error handling is tricky: there are many places where we wrap an error and set an HTTP-style status code,
// but there are also places where we return an existing gRPC object, which will use gRPC status codes.
// If the code is < 100 it's a gRPC status code; currently we retry all of these, even codes.Canceled,
// because when our pools close connections they do so with a cancel and we want to retry these.
// If it's >= 100, it's an HTTP code and we only retry 5xx.
if code < 100 || code/100 == 5 {
lastErr = err
level.Error(util_log.WithContext(ctx, r.log)).Log(
"msg", "error processing request",
Expand All @@ -112,10 +116,13 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) {
"end_delta", time.Since(end),
"length", end.Sub(start),
"retry_in", bk.NextDelay(),
"code", code,
"err", err,
)
bk.Wait()
continue
} else {
level.Warn(util_log.WithContext(ctx, r.log)).Log("msg", "received an error but not a retryable code, this is possibly a bug.", "code", code, "err", err)
}

return nil, err
Expand Down
23 changes: 14 additions & 9 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,9 @@ func NewDetectedLabelsTripperware(cfg Config, opts logql.EngineOpts, logger log.
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := mw.Wrap(next)
if cfg.MaxRetries > 0 {
tr := base.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics)
rm := base.NewRetryMiddleware(logger, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, namespace)
statsHandler = rm.Wrap(statsHandler)
statsHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(statsHandler)
}
splitter := newDefaultSplitter(limits, iqo)

Expand Down Expand Up @@ -559,9 +560,10 @@ func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Lo
statsHandler := indexStatsTripperware.Wrap(next)
retryNextHandler := next
if cfg.MaxRetries > 0 {
tr := base.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics)
rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
statsHandler = rm.Wrap(statsHandler)
retryNextHandler = rm.Wrap(next)
statsHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(statsHandler)
retryNextHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(next)
}

queryRangeMiddleware := []base.Middleware{
Expand Down Expand Up @@ -631,9 +633,10 @@ func NewLimitedTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logg
statsHandler := indexStatsTripperware.Wrap(next)
retryNextHandler := next
if cfg.MaxRetries > 0 {
tr := base.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics)
rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
statsHandler = rm.Wrap(statsHandler)
retryNextHandler = rm.Wrap(next)
statsHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(statsHandler)
retryNextHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(next)
}

queryRangeMiddleware := []base.Middleware{
Expand Down Expand Up @@ -874,9 +877,10 @@ func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logge
statsHandler := indexStatsTripperware.Wrap(next)
retryNextHandler := next
if cfg.MaxRetries > 0 {
tr := base.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics)
rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
statsHandler = rm.Wrap(statsHandler)
retryNextHandler = rm.Wrap(next)
statsHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(statsHandler)
retryNextHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(next)
}

queryRangeMiddleware := []base.Middleware{
Expand Down Expand Up @@ -1003,9 +1007,10 @@ func NewInstantMetricTripperware(
statsHandler := indexStatsTripperware.Wrap(next)
retryNextHandler := next
if cfg.MaxRetries > 0 {
tr := base.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics)
rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
statsHandler = rm.Wrap(statsHandler)
retryNextHandler = rm.Wrap(next)
statsHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(statsHandler)
retryNextHandler = queryrangebase.MergeMiddlewares(tr, rm).Wrap(next)
}

queryRangeMiddleware := []base.Middleware{
Expand Down
Loading