Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query-frontend: Performance Improvements and Refactor #3888

Merged
merged 11 commits into from
Jul 23, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* [ENHANCEMENT] Add vParquet4 support to the tempo-cli analyse blocks command [#3868](https://github.com/grafana/tempo/pull/3868) (@stoewer)
* [ENHANCEMENT] Improve trace id lookup from Tempo Vulture by selecting a date range [#3874](https://github.com/grafana/tempo/pull/3874) (@javiermolinar)
* [ENHANCEMENT] Add native histograms for internal metrics [#3870](https://github.com/grafana/tempo/pull/3870) (@zalegrala)
* [ENHANCEMENT] Reduce memory consumption of query-frontend [#3888](https://github.com/grafana/tempo/pull/3888) (@joe-elliott)
* [BUGFIX] Fix panic in certain metrics queries using `rate()` with `by` [#3847](https://github.com/grafana/tempo/pull/3847) (@stoewer)
* [BUGFIX] Fix metrics queries when grouping by attributes that may not exist [#3734](https://github.com/grafana/tempo/pull/3734) (@mdisibio)
* [BUGFIX] Fix frontend parsing error on cached responses [#3759](https://github.com/grafana/tempo/pull/3759) (@mdisibio)
Expand Down
27 changes: 16 additions & 11 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"path"
"strings"

Expand All @@ -23,6 +22,13 @@ import (
"github.com/grafana/tempo/tempodb"
)

// RoundTripperFunc is an adapter that allows an ordinary function to be used
// as an http.RoundTripper, mirroring the http.HandlerFunc pattern.
type RoundTripperFunc func(*http.Request) (*http.Response, error)

// RoundTrip implements http.RoundTripper by delegating to the wrapped function.
func (fn RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	return fn(req)
}

// these handler funcs could likely be removed and the code written directly into the respective
// gRPC functions
type (
Expand Down Expand Up @@ -202,7 +208,7 @@ func (q *QueryFrontend) MetricsQueryInstant(req *tempopb.QueryInstantRequest, sr

// newMetricsSummaryHandler creates a new frontend handler to serve metrics summary requests.
func newMetricsSummaryHandler(next pipeline.AsyncRoundTripper[combiner.PipelineResponse], logger log.Logger) http.RoundTripper {
return pipeline.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
return RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
tenant, err := user.ExtractOrgID(req.Context())
if err != nil {
level.Error(logger).Log("msg", "metrics summary: failed to extract tenant id", "err", err)
Expand All @@ -212,14 +218,14 @@ func newMetricsSummaryHandler(next pipeline.AsyncRoundTripper[combiner.PipelineR
Body: io.NopCloser(strings.NewReader(err.Error())),
}, nil
}
prepareRequestForQueriers(req, tenant, req.RequestURI, nil)
prepareRequestForQueriers(req, tenant)

level.Info(logger).Log(
"msg", "metrics summary request",
"tenant", tenant,
"path", req.URL.Path)

resps, err := next.RoundTrip(req)
resps, err := next.RoundTrip(pipeline.NewHTTPRequest(req))
if err != nil {
return nil, err
}
Expand All @@ -239,20 +245,19 @@ func newMetricsSummaryHandler(next pipeline.AsyncRoundTripper[combiner.PipelineR
// prepareRequestForQueriers modifies the request so they will be farmed correctly to the queriers
// - adds the tenant header
// - sets the requesturi (see below for details)
func prepareRequestForQueriers(req *http.Request, tenant string, originalURI string, params url.Values) {
func prepareRequestForQueriers(req *http.Request, tenant string) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this function pretty obscure. Why don't we return a new http.Request based on the original one?

func getQuerierHTTPRequest(origin_req *http.Request, tenant string) *http.Request{
}

That way we will reduce the cognitive overload reading the code

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer modifying in place to reduce allocs

// set the tenant header
req.Header.Set(user.OrgIDHeaderName, tenant)

// build and set the request uri
// copy the url (which is correct) to the RequestURI
// we do this because dskit/common uses the RequestURI field to translate from http.Request to httpgrpc.Request
// https://github.com/grafana/dskit/blob/740f56bd293423c5147773ce97264519f9fddc58/httpgrpc/server/server.go#L59
// https://github.com/grafana/dskit/blob/f5bd38371e1cfae5479b2c23b3893c1a97868bdf/httpgrpc/httpgrpc.go#L53
const queryDelimiter = "?"

uri := path.Join(api.PathPrefixQuerier, originalURI)
if len(params) > 0 {
uri += queryDelimiter + params.Encode()
uri := path.Join(api.PathPrefixQuerier, req.URL.Path)
if len(req.URL.RawQuery) > 0 {
uri += queryDelimiter + req.URL.RawQuery
}

req.RequestURI = uri
}

Expand Down
2 changes: 1 addition & 1 deletion modules/frontend/metrics_query_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func newQueryInstantStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTri
func newMetricsQueryInstantHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], logger log.Logger) http.RoundTripper {
postSLOHook := metricsSLOPostHook(cfg.Metrics.SLO)

return pipeline.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
return RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
tenant, _ := user.ExtractOrgID(req.Context())
start := time.Now()

Expand Down
2 changes: 1 addition & 1 deletion modules/frontend/metrics_query_range_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func newQueryRangeStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripp
func newMetricsQueryRangeHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], logger log.Logger) http.RoundTripper {
postSLOHook := metricsSLOPostHook(cfg.Metrics.SLO)

return pipeline.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
return RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
tenant, _ := user.ExtractOrgID(req.Context())
start := time.Now()

Expand Down
32 changes: 19 additions & 13 deletions modules/frontend/metrics_query_range_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func newAsyncQueryRangeSharder(reader tempodb.Reader, o overrides.Interface, cfg
})
}

func (s queryRangeSharder) RoundTrip(r *http.Request) (pipeline.Responses[combiner.PipelineResponse], error) {
func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline.Responses[combiner.PipelineResponse], error) {
r := pipelineRequest.HTTPRequest()

span, ctx := opentracing.StartSpanFromContext(r.Context(), "frontend.QueryRangeSharder")
defer span.Finish()

Expand Down Expand Up @@ -112,10 +114,10 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (pipeline.Responses[combin
}

generatorReq := s.generatorRequest(*req, r, tenantID, cutoff)
reqCh := make(chan *http.Request, 2) // buffer of 2 allows us to insert generatorReq and metrics
reqCh := make(chan pipeline.Request, 2) // buffer of 2 allows us to insert generatorReq and metrics

if generatorReq != nil {
reqCh <- generatorReq
reqCh <- pipeline.NewHTTPRequest(generatorReq)
}

var (
Expand Down Expand Up @@ -171,7 +173,7 @@ func (s *queryRangeSharder) blockMetas(start, end int64, tenantID string) []*bac
return metas
}

func (s *queryRangeSharder) shardedBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan *http.Request, _ func(error)) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) {
func (s *queryRangeSharder) shardedBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan pipeline.Request, _ func(error)) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) {
// request without start or end, search only in generator
if searchReq.Start == 0 || searchReq.End == 0 {
close(reqCh)
Expand Down Expand Up @@ -244,7 +246,7 @@ func (s *queryRangeSharder) shardedBackendRequests(ctx context.Context, tenantID
return
}

func (s *queryRangeSharder) buildShardedBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan *http.Request) {
func (s *queryRangeSharder) buildShardedBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan pipeline.Request) {
defer close(reqCh)

var (
Expand Down Expand Up @@ -281,18 +283,20 @@ func (s *queryRangeSharder) buildShardedBackendRequests(ctx context.Context, ten
shardR.ShardID = i
shardR.ShardCount = shards
httpReq := s.toUpstreamRequest(ctx, shardR, parent, tenantID)

pipelineR := pipeline.NewHTTPRequest(httpReq)
if samplingRate != 1.0 {
shardR.ShardID *= uint32(1.0 / samplingRate)
shardR.ShardCount *= uint32(1.0 / samplingRate)

// Set final sampling rate after integer rounding
samplingRate = float64(shards) / float64(shardR.ShardCount)

httpReq = pipeline.ContextAddResponseDataForResponse(samplingRate, httpReq)
pipelineR.SetResponseData(samplingRate)
}

select {
case reqCh <- httpReq:
case reqCh <- pipelineR:
case <-ctx.Done():
return
}
Expand All @@ -302,7 +306,7 @@ func (s *queryRangeSharder) buildShardedBackendRequests(ctx context.Context, ten
}
}

func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time, _ float64, targetBytesPerRequest int, _ time.Duration, reqCh chan *http.Request) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) {
func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time, _ float64, targetBytesPerRequest int, _ time.Duration, reqCh chan pipeline.Request) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) {
// request without start or end, search only in generator
if searchReq.Start == 0 || searchReq.End == 0 {
close(reqCh)
Expand Down Expand Up @@ -348,7 +352,7 @@ func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string
return
}

func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, metas []*backend.BlockMeta, targetBytesPerRequest int, reqCh chan<- *http.Request) {
func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, metas []*backend.BlockMeta, targetBytesPerRequest int, reqCh chan<- pipeline.Request) {
defer close(reqCh)

queryHash := hashForQueryRangeRequest(&searchReq)
Expand Down Expand Up @@ -404,15 +408,17 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
subR = api.BuildQueryRangeRequest(subR, queryRangeReq)
subR.Header.Set(api.HeaderAccept, api.HeaderAcceptProtobuf)

prepareRequestForQueriers(subR, tenantID, subR.URL.Path, subR.URL.Query())
prepareRequestForQueriers(subR, tenantID)
pipelineR := pipeline.NewHTTPRequest(subR)

// TODO: Handle sampling rate
key := queryRangeCacheKey(tenantID, queryHash, int64(queryRangeReq.Start), int64(queryRangeReq.End), m, int(queryRangeReq.StartPage), int(queryRangeReq.PagesToSearch))
if len(key) > 0 {
subR = pipeline.ContextAddCacheKey(key, subR)
pipelineR.SetCacheKey(key)
}

select {
case reqCh <- subR:
case reqCh <- pipelineR:
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -440,7 +446,7 @@ func (s *queryRangeSharder) toUpstreamRequest(ctx context.Context, req tempopb.Q
subR := parent.Clone(ctx)
subR = api.BuildQueryRangeRequest(subR, &req)

prepareRequestForQueriers(subR, tenantID, parent.URL.Path, subR.URL.Query())
prepareRequestForQueriers(subR, tenantID)
return subR
}

Expand Down
17 changes: 9 additions & 8 deletions modules/frontend/pipeline/async_handler_multitenant.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package pipeline

import (
"context"
"errors"
"net/http"
"strings"

"github.com/go-kit/log"
Expand Down Expand Up @@ -34,7 +32,7 @@ func NewMultiTenantMiddleware(logger log.Logger) AsyncMiddleware[combiner.Pipeli
})
}

func (t *tenantRoundTripper) RoundTrip(req *http.Request) (Responses[combiner.PipelineResponse], error) {
func (t *tenantRoundTripper) RoundTrip(req Request) (Responses[combiner.PipelineResponse], error) {
// extract tenant ids, this will normalize and de-duplicate tenant ids
tenants, err := t.resolver.TenantIDs(req.Context())
if err != nil {
Expand All @@ -51,21 +49,24 @@ func (t *tenantRoundTripper) RoundTrip(req *http.Request) (Responses[combiner.Pi
// join tenants for logger because list value type is unsupported.
_ = level.Debug(t.logger).Log("msg", "handling multi-tenant query", "tenants", strings.Join(tenants, ","))

return NewAsyncSharderFunc(req.Context(), 0, len(tenants), func(tenantIdx int) *http.Request {
return NewAsyncSharderFunc(req.Context(), 0, len(tenants), func(tenantIdx int) Request {
if tenantIdx >= len(tenants) {
return nil
}
return requestForTenant(req.Context(), req, tenants[tenantIdx])
return requestForTenant(req, tenants[tenantIdx])
}, t.next), nil
}

// requestForTenant makes a copy of request and injects the tenant id into context and Header.
// this allows us to keep all multi-tenant logic in query frontend and keep other components single tenant
func requestForTenant(ctx context.Context, r *http.Request, tenant string) *http.Request {
// requestForTenant clones the underlying HTTP request for a single tenant,
// injecting the tenant id into both the context (for downstream components)
// and the org-id header (for the queriers). This keeps all multi-tenant logic
// in the query frontend while other components remain single tenant.
func requestForTenant(req Request, tenant string) Request {
	httpReq := req.HTTPRequest()

	// inject the tenant into a derived context and clone the request with it
	tenantCtx := user.InjectOrgID(httpReq.Context(), tenant)
	clone := httpReq.Clone(tenantCtx)
	clone.Header.Set(user.OrgIDHeaderName, tenant)

	return NewHTTPRequest(clone)
}

type unsupportedRoundTripper struct {
Expand All @@ -85,7 +86,7 @@ func NewMultiTenantUnsupportedMiddleware(logger log.Logger) AsyncMiddleware[comb
})
}

func (t *unsupportedRoundTripper) RoundTrip(req *http.Request) (Responses[combiner.PipelineResponse], error) {
func (t *unsupportedRoundTripper) RoundTrip(req Request) (Responses[combiner.PipelineResponse], error) {
// extract tenant ids
tenants, err := t.resolver.TenantIDs(req.Context())
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions modules/frontend/pipeline/async_handler_multitenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestMultiTenant(t *testing.T) {
trace := test.MakeTrace(10, traceID)

once := sync.Once{}
next := AsyncRoundTripperFunc[combiner.PipelineResponse](func(req *http.Request) (Responses[combiner.PipelineResponse], error) {
next := AsyncRoundTripperFunc[combiner.PipelineResponse](func(req Request) (Responses[combiner.PipelineResponse], error) {
reqCount.Inc() // Count the number of requests.

// Check if the tenant is in the list of tenants.
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestMultiTenant(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), tc.tenants)
req = req.WithContext(ctx)

resps, err := rt.RoundTrip(req)
resps, err := rt.RoundTrip(NewHTTPRequest(req))
require.NoError(t, err)

for {
Expand Down Expand Up @@ -153,12 +153,12 @@ func TestMultiTenantNotSupported(t *testing.T) {
}

test := NewMultiTenantUnsupportedMiddleware(log.NewNopLogger())
next := AsyncRoundTripperFunc[combiner.PipelineResponse](func(req *http.Request) (Responses[combiner.PipelineResponse], error) {
next := AsyncRoundTripperFunc[combiner.PipelineResponse](func(_ Request) (Responses[combiner.PipelineResponse], error) {
return NewSuccessfulResponse("foo"), nil
})

rt := test.Wrap(next)
resps, err := rt.RoundTrip(req)
resps, err := rt.RoundTrip(NewHTTPRequest(req))
require.NoError(t, err) // no error expected. tenant unsupported should be passed back as a bad request. errors bubble up as 5xx

r, done, err := resps.Next(context.Background())
Expand Down
4 changes: 1 addition & 3 deletions modules/frontend/pipeline/async_handler_noop.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package pipeline

import (
"net/http"

"github.com/grafana/tempo/modules/frontend/combiner"
)

// NewNoopMiddleware returns a middleware that is a passthrough only
func NewNoopMiddleware() AsyncMiddleware[combiner.PipelineResponse] {
return AsyncMiddlewareFunc[combiner.PipelineResponse](func(next AsyncRoundTripper[combiner.PipelineResponse]) AsyncRoundTripper[combiner.PipelineResponse] {
return AsyncRoundTripperFunc[combiner.PipelineResponse](func(req *http.Request) (Responses[combiner.PipelineResponse], error) {
return AsyncRoundTripperFunc[combiner.PipelineResponse](func(req Request) (Responses[combiner.PipelineResponse], error) {
return next.RoundTrip(req)
})
})
Expand Down
7 changes: 3 additions & 4 deletions modules/frontend/pipeline/async_sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pipeline

import (
"context"
"net/http"
"sync"

"github.com/grafana/tempo/modules/frontend/combiner"
Expand All @@ -17,7 +16,7 @@ type waitGroup interface {

// NewAsyncSharderFunc creates a new AsyncResponse that shards requests to the next AsyncRoundTripper[combiner.PipelineResponse]. It creates one
// goroutine per concurrent request.
func NewAsyncSharderFunc(ctx context.Context, concurrentReqs, totalReqs int, reqFn func(i int) *http.Request, next AsyncRoundTripper[combiner.PipelineResponse]) Responses[combiner.PipelineResponse] {
func NewAsyncSharderFunc(ctx context.Context, concurrentReqs, totalReqs int, reqFn func(i int) Request, next AsyncRoundTripper[combiner.PipelineResponse]) Responses[combiner.PipelineResponse] {
var wg waitGroup
if concurrentReqs <= 0 {
wg = &sync.WaitGroup{}
Expand All @@ -43,7 +42,7 @@ func NewAsyncSharderFunc(ctx context.Context, concurrentReqs, totalReqs int, req
}

wg.Add(1)
go func(r *http.Request) {
go func(r Request) {
defer wg.Done()

resp, err := next.RoundTrip(r)
Expand All @@ -63,7 +62,7 @@ func NewAsyncSharderFunc(ctx context.Context, concurrentReqs, totalReqs int, req
}

// NewAsyncSharderChan creates a new AsyncResponse that shards requests to the next AsyncRoundTripper[combiner.PipelineResponse] using a limited number of goroutines.
func NewAsyncSharderChan(ctx context.Context, concurrentReqs int, reqs <-chan *http.Request, resps Responses[combiner.PipelineResponse], next AsyncRoundTripper[combiner.PipelineResponse]) Responses[combiner.PipelineResponse] {
func NewAsyncSharderChan(ctx context.Context, concurrentReqs int, reqs <-chan Request, resps Responses[combiner.PipelineResponse], next AsyncRoundTripper[combiner.PipelineResponse]) Responses[combiner.PipelineResponse] {
if concurrentReqs == 0 {
panic("NewAsyncSharderChan: concurrentReqs must be greater than 0")
}
Expand Down
Loading
Loading