Skip to content

Commit

Permalink
Correctly handle auto downsampling in codec (#3073)
Browse files Browse the repository at this point in the history
* correctly handle auto downsampling in codec

Signed-off-by: Ben Ye <yb532204897@gmail.com>

* address comments

Signed-off-by: Ben Ye <yb532204897@gmail.com>
  • Loading branch information
yeya24 committed Aug 26, 2020
1 parent e9a9753 commit 1424f0c
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 44 deletions.
25 changes: 20 additions & 5 deletions pkg/queryfrontend/cache_splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,32 @@ import (
"time"

"github.com/cortexproject/cortex/pkg/querier/queryrange"

"github.com/thanos-io/thanos/pkg/compact/downsample"
)

// constSplitter is a utility for using a constant split interval when determining cache keys.
type constSplitter time.Duration
// thanosCacheKeyGenerator is a utility for using split interval when determining cache keys.
type thanosCacheKeyGenerator struct {
interval time.Duration
resolutions []int64
}

func newThanosCacheKeyGenerator(interval time.Duration) thanosCacheKeyGenerator {
return thanosCacheKeyGenerator{
interval: interval,
resolutions: []int64{downsample.ResLevel2, downsample.ResLevel1, downsample.ResLevel0},
}
}

// TODO(yeya24): Add other request params as request key.
// GenerateCacheKey generates a cache key based on the Request and interval.
func (t constSplitter) GenerateCacheKey(_ string, r queryrange.Request) string {
currentInterval := r.GetStart() / time.Duration(t).Milliseconds()
func (t thanosCacheKeyGenerator) GenerateCacheKey(_ string, r queryrange.Request) string {
currentInterval := r.GetStart() / t.interval.Milliseconds()
if tr, ok := r.(*ThanosRequest); ok {
return fmt.Sprintf("%s:%d:%d:%d", tr.Query, tr.Step, currentInterval, tr.MaxSourceResolution)
i := 0
for ; i < len(t.resolutions) && t.resolutions[i] > tr.MaxSourceResolution; i++ {
}
return fmt.Sprintf("%s:%d:%d:%d", tr.Query, tr.Step, currentInterval, i)
}
return fmt.Sprintf("%s:%d:%d", r.GetQuery(), r.GetStep(), currentInterval)
}
83 changes: 83 additions & 0 deletions pkg/queryfrontend/cache_splitter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package queryfrontend

import (
"testing"

"github.com/cortexproject/cortex/pkg/querier/queryrange"

"github.com/thanos-io/thanos/pkg/testutil"
)

func TestGenerateCacheKey(t *testing.T) {
splitter := newThanosCacheKeyGenerator(hour)

for _, tc := range []struct {
name string
req queryrange.Request
expected string
}{
{
name: "non thanos req",
req: &queryrange.PrometheusRequest{
Query: "up",
Start: 0,
Step: 60 * seconds,
},
expected: "up:60000:0",
},
{
name: "non downsampling resolution specified",
req: &ThanosRequest{
Query: "up",
Start: 0,
Step: 60 * seconds,
},
expected: "up:60000:0:2",
},
{
name: "10s step",
req: &ThanosRequest{
Query: "up",
Start: 0,
Step: 10 * seconds,
},
expected: "up:10000:0:2",
},
{
name: "1m downsampling resolution",
req: &ThanosRequest{
Query: "up",
Start: 0,
Step: 10 * seconds,
MaxSourceResolution: 60 * seconds,
},
expected: "up:10000:0:2",
},
{
name: "5m downsampling resolution, different cache key",
req: &ThanosRequest{
Query: "up",
Start: 0,
Step: 10 * seconds,
MaxSourceResolution: 300 * seconds,
},
expected: "up:10000:0:1",
},
{
name: "1h downsampling resolution, different cache key",
req: &ThanosRequest{
Query: "up",
Start: 0,
Step: 10 * seconds,
MaxSourceResolution: hour,
},
expected: "up:10000:0:0",
},
} {
key := splitter.GenerateCacheKey("", tc.req)
testutil.Equals(t, tc.expected, key)
}
}
19 changes: 13 additions & 6 deletions pkg/queryfrontend/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,14 @@ func (c codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Req
return nil, err
}

result.MaxSourceResolution, err = parseDownsamplingParamMillis(r.FormValue("max_source_resolution"))
if err != nil {
return nil, err
if r.FormValue("max_source_resolution") == "auto" {
result.AutoDownsampling = true
result.MaxSourceResolution = result.Step / 5
} else {
result.MaxSourceResolution, err = parseDownsamplingParamMillis(r.FormValue("max_source_resolution"))
if err != nil {
return nil, err
}
}

result.PartialResponse, err = parsePartialResponseParam(r.FormValue("partial_response"), c.partialResponse)
Expand Down Expand Up @@ -118,9 +123,11 @@ func (c codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.R
"replicaLabels[]": thanosReq.ReplicaLabels,
}

// Add this param only if it is set. Set to 0 will impact
// auto-downsampling in the querier.
if thanosReq.MaxSourceResolution != 0 {
if thanosReq.AutoDownsampling {
params["max_source_resolution"] = []string{"auto"}
} else if thanosReq.MaxSourceResolution != 0 {
// Add this param only if it is set. Set to 0 will impact
// auto-downsampling in the querier.
params["max_source_resolution"] = []string{encodeDurationMillis(thanosReq.MaxSourceResolution)}
}

Expand Down
14 changes: 14 additions & 0 deletions pkg/queryfrontend/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ func TestCodec_DecodeRequest(t *testing.T) {
partialResponse: false,
expectedError: httpgrpc.Errorf(http.StatusBadRequest, "negative max_source_resolution is not accepted. Try a positive integer"),
},
{
name: "auto downsampling enabled",
url: "/api/v1/query_range?start=123&end=456&step=10&max_source_resolution=auto",
expectedRequest: &ThanosRequest{
Path: "/api/v1/query_range",
Start: 123000,
End: 456000,
Step: 10000,
MaxSourceResolution: 2000,
AutoDownsampling: true,
Dedup: true,
StoreMatchers: [][]storepb.LabelMatcher{},
},
},
{
name: "cannot parse partial_response",
url: "/api/v1/query_range?start=123&end=456&step=1&partial_response=bar",
Expand Down
9 changes: 7 additions & 2 deletions pkg/queryfrontend/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type ThanosRequest struct {
Query string
Dedup bool
PartialResponse bool
AutoDownsampling bool
MaxSourceResolution int64
ReplicaLabels []string
StoreMatchers [][]storepb.LabelMatcher
Expand Down Expand Up @@ -64,7 +65,7 @@ func (r *ThanosRequest) WithQuery(query string) queryrange.Request {

// LogToSpan writes information about this request to an OpenTracing span.
func (r *ThanosRequest) LogToSpan(sp opentracing.Span) {
sp.LogFields(
fields := []otlog.Field{
otlog.String("query", r.GetQuery()),
otlog.String("start", timestamp.Time(r.GetStart()).String()),
otlog.String("end", timestamp.Time(r.GetEnd()).String()),
Expand All @@ -73,7 +74,11 @@ func (r *ThanosRequest) LogToSpan(sp opentracing.Span) {
otlog.Bool("partial_response", r.PartialResponse),
otlog.Object("replicaLabels", r.ReplicaLabels),
otlog.Object("storeMatchers", r.StoreMatchers),
)
otlog.Bool("auto-downsampling", r.AutoDownsampling),
otlog.Int64("max_source_resolution (ms)", r.MaxSourceResolution),
}

sp.LogFields(fields...)
}

// Reset implements proto.Message interface required by queryrange.Request,
Expand Down
2 changes: 1 addition & 1 deletion pkg/queryfrontend/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewTripperWare(
queryCacheMiddleware, _, err := queryrange.NewResultsCacheMiddleware(
logger,
*cacheConfig,
constSplitter(splitQueryInterval),
newThanosCacheKeyGenerator(splitQueryInterval),
limits,
codec,
cacheExtractor,
Expand Down
51 changes: 21 additions & 30 deletions pkg/queryfrontend/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestRoundTripRetryMiddleware(t *testing.T) {
Step: 10 * seconds,
},
handlerAndResult: counter,
// not go through tripperware so no error.
// Not go through tripperware so no error.
expectedError: false,
expected: 1,
},
Expand All @@ -117,7 +117,7 @@ func TestRoundTripRetryMiddleware(t *testing.T) {
Step: 10 * seconds,
},
handlerAndResult: counter,
// not go through tripperware so no error.
// Not go through tripperware so no error.
expectedError: false,
expected: 1,
},
Expand Down Expand Up @@ -282,16 +282,16 @@ func TestRoundTripCacheMiddleware(t *testing.T) {
MaxSourceResolution: 1 * seconds,
}

// non query range request, won't be cached.
// Non query range request, won't be cached.
testRequest2 := &ThanosRequest{
Path: "/api/v1/query",
Start: 0,
End: 2 * hour,
Step: 10 * seconds,
}

// same query params as testRequest, but with different maxSourceResolution,
// so won't be cached in this case.
// Same query params as testRequest, different maxSourceResolution
// but still in the same downsampling level, so it will be cached in this case.
testRequest3 := &ThanosRequest{
Path: "/api/v1/query_range",
Start: 0,
Expand All @@ -300,6 +300,16 @@ func TestRoundTripCacheMiddleware(t *testing.T) {
MaxSourceResolution: 10 * seconds,
}

// Same query params as testRequest, different maxSourceResolution
// and downsampling level so it won't be cached in this case.
testRequest4 := &ThanosRequest{
Path: "/api/v1/query_range",
Start: 0,
End: 2 * hour,
Step: 10 * seconds,
MaxSourceResolution: 1 * hour,
}

cacheConf := &queryrange.ResultsCacheConfig{
CacheConfig: cortexcache.Config{
EnableFifoCache: true,
Expand Down Expand Up @@ -329,31 +339,12 @@ func TestRoundTripCacheMiddleware(t *testing.T) {
handlerAndResult func() (*int, http.Handler)
expected int
}{
{
name: "first request",
req: testRequest,
expected: 1,
},
{
name: "same request as the first one, directly use cache",
req: testRequest,
expected: 1,
},
{
name: "non query range request won't be cached",
req: testRequest2,
expected: 2,
},
{
name: "different max source resolution won't be cached",
req: testRequest3,
expected: 3,
},
{
name: "do it again",
req: testRequest2,
expected: 4,
},
{name: "first request", req: testRequest, expected: 1},
{name: "same request as the first one, directly use cache", req: testRequest, expected: 1},
{name: "non query range request won't be cached", req: testRequest2, expected: 2},
{name: "do it again", req: testRequest2, expected: 3},
{name: "different max source resolution but still same level", req: testRequest3, expected: 3},
{name: "different max source resolution and different level", req: testRequest4, expected: 4},
{
name: "request but will be partitioned",
req: &ThanosRequest{
Expand Down

0 comments on commit 1424f0c

Please sign in to comment.