Skip to content

Commit

Permalink
Add retry middleware in query-frontend
Browse files Browse the repository at this point in the history
Requests from the query-frontend to the querier might fail when a querier is starting or stopping. Simply retrying this request usually solves this.
  • Loading branch information
Koenraad Verheyden committed Jul 14, 2021
1 parent addfa70 commit c196775
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [ENHANCEMENT] Microservices jsonnet: resource requests and limits can be set in `$._config`. [#793](https://github.com/grafana/tempo/pull/793) (@kvrhdn)
* [ENHANCEMENT] Add `-config.expand-env` cli flag to support environment variables expansion in config file. [#796](https://github.com/grafana/tempo/pull/796) (@Ashmita152)
* [ENHANCEMENT] Emit traces for ingester flush operations. [#812](https://github.com/grafana/tempo/pull/812) (@bboreham)
* [ENHANCEMENT] Add retry middleware in query-frontend. [#814](https://github.com/grafana/tempo/pull/814) (@kvrhdn)

## v1.0.1

Expand Down
4 changes: 4 additions & 0 deletions docs/tempo/website/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ The Query Frontend is responsible for sharding incoming requests for faster proc
# Query Frontend configuration block
query_frontend:
# number of times to retry a request sent to a querier
# (default: 5)
[max_retries: <int>]
# number of shards to split the query into
# (default: 2)
[query_shards: <int>]
Expand Down
2 changes: 2 additions & 0 deletions modules/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

type Config struct {
Config frontend.CombinedFrontendConfig `yaml:",inline"`
MaxRetries int `yaml:"max_retries,omitempty"`
QueryShards int `yaml:"query_shards,omitempty"`
}

Expand All @@ -17,6 +18,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.Config.DownstreamURL = ""
cfg.Config.Handler.LogQueriesLongerThan = 0
cfg.Config.FrontendV1.MaxOutstandingPerTenant = 100
cfg.MaxRetries = 5
cfg.QueryShards = 2
}

Expand Down
5 changes: 4 additions & 1 deletion modules/frontend/deduper.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ type spanIDDeduper struct {
// Do implements Handler
func (s spanIDDeduper) Do(req *http.Request) (*http.Response, error) {
ctx := req.Context()
span, _ := opentracing.StartSpanFromContext(ctx, "frontend.DedupeSpanIDs")
span, ctx := opentracing.StartSpanFromContext(ctx, "frontend.DedupeSpanIDs")
defer span.Finish()

// context propagation
req = req.WithContext(ctx)

resp, err := s.next.Do(req)
if err != nil {
return nil, err
Expand Down
10 changes: 6 additions & 4 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ func NewTripperware(cfg Config, logger log.Logger, registerer prometheus.Registe
}, []string{"tenant"})

return func(next http.RoundTripper) http.RoundTripper {
// We're constructing middleware in this statement. There are two at the moment -
// - the rightmost one (executed first) is ShardingWare which helps to shard queries by splitting the block ID space
// - the leftmost one (executed last) is Deduper which dedupe Span IDs for Zipkin support
rt := NewRoundTripper(next, Deduper(logger), ShardingWare(cfg.QueryShards, logger))
// We're constructing middleware in this statement, each middleware wraps the next one from left-to-right
// - the Deduper dedupes Span IDs for Zipkin support
// - the ShardingWare shards queries by splitting the block ID space
// - the RetryWare retries requests that have failed (error or http status 500)
rt := NewRoundTripper(next, Deduper(logger), ShardingWare(cfg.QueryShards, logger), RetryWare(cfg.MaxRetries, logger))

return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
start := time.Now()
// tracing instrumentation
Expand Down
5 changes: 4 additions & 1 deletion modules/frontend/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@ type shardQuery struct {
// Do implements Handler
func (s shardQuery) Do(r *http.Request) (*http.Response, error) {
ctx := r.Context()
span, _ := opentracing.StartSpanFromContext(ctx, "frontend.ShardQuery")
span, ctx := opentracing.StartSpanFromContext(ctx, "frontend.ShardQuery")
defer span.Finish()

// context propagation
r = r.WithContext(ctx)

userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
Expand Down
57 changes: 57 additions & 0 deletions modules/frontend/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package frontend

import (
"net/http"

"github.com/go-kit/kit/log"
"github.com/opentracing/opentracing-go"
ot_log "github.com/opentracing/opentracing-go/log"
)

func RetryWare(maxRetries int, logger log.Logger) Middleware {
return MiddlewareFunc(func(next Handler) Handler {
return retryWare{
next: next,
logger: logger,
maxRetries: maxRetries,
}
})
}

type retryWare struct {
next Handler
logger log.Logger
maxRetries int
}

// Do implements Handler
func (r retryWare) Do(req *http.Request) (*http.Response, error) {
ctx := req.Context()
span, ctx := opentracing.StartSpanFromContext(ctx, "frontend.Retry")
defer span.Finish()

// context propagation
req = req.WithContext(ctx)

triesLeft := r.maxRetries

for {
if ctx.Err() != nil {
return nil, ctx.Err()
}

resp, err := r.next.Do(req)

triesLeft--
if triesLeft == 0 {
return resp, err
}

if err == nil && resp.StatusCode/100 != 5 {
return resp, nil
}

span.LogFields(ot_log.String("msg", "error processing request"), ot_log.Int("try", triesLeft), ot_log.Error(err))
continue
}
}
130 changes: 130 additions & 0 deletions modules/frontend/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package frontend

import (
"context"
"errors"
"net/http"
"net/http/httptest"
"testing"

"github.com/go-kit/kit/log"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

type HandlerFunc func(req *http.Request) (*http.Response, error)

// Wrap implements Handler.
func (q HandlerFunc) Do(req *http.Request) (*http.Response, error) {
return q(req)
}

func TestRetry(t *testing.T) {
var try atomic.Int32

for _, tc := range []struct {
name string
handler Handler
expectedTries int32
expectedRes *http.Response
expectedErr error
}{
{
name: "retry until success",
handler: HandlerFunc(func(req *http.Request) (*http.Response, error) {
if try.Inc() == 5 {
return &http.Response{StatusCode: 200}, nil
}
return nil, errors.New("this request failed")
}),
expectedTries: 5,
expectedRes: &http.Response{StatusCode: 200},
expectedErr: nil,
},
{
name: "don't retry 400's",
handler: HandlerFunc(func(req *http.Request) (*http.Response, error) {
try.Inc()
return &http.Response{StatusCode: 400}, nil
}),
expectedTries: 1,
expectedRes: &http.Response{StatusCode: 400},
expectedErr: nil,
},
{
name: "retry 500s",
handler: HandlerFunc(func(req *http.Request) (*http.Response, error) {
try.Inc()
return &http.Response{StatusCode: 500}, nil
}),
expectedTries: 5,
expectedRes: &http.Response{StatusCode: 500},
expectedErr: nil,
},
{
name: "return last error",
handler: HandlerFunc(func(req *http.Request) (*http.Response, error) {
if try.Inc() == 5 {
return nil, errors.New("request failed")
}
return nil, errors.New("not the last request")
}),
expectedTries: 5,
expectedRes: nil,
expectedErr: errors.New("request failed"),
},
} {
t.Run(tc.name, func(t *testing.T) {
try.Store(0)

retryWare := RetryWare(5, log.NewNopLogger())
handler := retryWare.Wrap(tc.handler)

req := httptest.NewRequest("GET", "http://example.com", nil)

res, err := handler.Do(req)

require.Equal(t, tc.expectedTries, try.Load())
require.Equal(t, tc.expectedErr, err)
require.Equal(t, tc.expectedRes, res)
})
}
}

func TestRetry_CancelledRequest(t *testing.T) {
var try atomic.Int32

// request is cancelled before first call
ctx, cancel := context.WithCancel(context.Background())
cancel()

req, err := http.NewRequestWithContext(ctx, "GET", "http://example.com", nil)
require.NoError(t, err)

_, err = RetryWare(5, log.NewNopLogger()).
Wrap(HandlerFunc(func(req *http.Request) (*http.Response, error) {
try.Inc()
return nil, ctx.Err()
})).
Do(req)

require.Equal(t, int32(0), try.Load())
require.Equal(t, ctx.Err(), err)

// request is cancelled after first call
ctx, cancel = context.WithCancel(context.Background())

req, err = http.NewRequestWithContext(ctx, "GET", "http://example.com", nil)
require.NoError(t, err)

_, err = RetryWare(5, log.NewNopLogger()).
Wrap(HandlerFunc(func(req *http.Request) (*http.Response, error) {
try.Inc()
cancel()
return nil, errors.New("this request failed")
})).
Do(req)

require.Equal(t, int32(1), try.Load())
require.Equal(t, ctx.Err(), err)
}

0 comments on commit c196775

Please sign in to comment.