Skip to content

Commit

Permalink
Support custom lookback delta from request for query api (thanos-io#5607
Browse files Browse the repository at this point in the history
)

* Update prometheus version to v0.38.0

Signed-off-by: Oron Sharabi <oron.sh@coralogix.com>

* Add custom lookbackDelta from request support for query api

Signed-off-by: Oron Sharabi <oron.sh@coralogix.com>

* Add change to CHANGELOG

Signed-off-by: Oron Sharabi <oron.sh@coralogix.com>

* Fix tests

Signed-off-by: Oron Sharabi <oron.sh@coralogix.com>

* Fix lints

Signed-off-by: Oron Sharabi <oron.sh@coralogix.com>

* Replace engineFactory with lookbackDeltaFactory
Add custom lookback delta support in query grpc api

Signed-off-by: Oron Sharabi <oron.sh@coralogix.com>

* Replace TestEngineFactory with TestLookbackDeltaFactory test

Signed-off-by: Oron Sharabi <oron.sh@coralogix.com>

* Run formatter

Signed-off-by: Oron Sharabi <oron.sh@coralogix.com>

* Fix manager_test.go

Signed-off-by: Oron Sharabi <oron.sh@coralogix.com>

* Remove engineFactory and adjust comment

Signed-off-by: Oron Sharabi <oron.sh@coralogix.com>

* Go fmt query.go

Signed-off-by: Oron Sharabi <oron.sh@coralogix.com>

* Fix test and lints

Signed-off-by: Oron Sharabi <oron.sh@coralogix.com>

* Remove self-assignment of engineOpts.ActiveQueryTracker

Signed-off-by: Oron Sharabi <oron.sh@coralogix.com>

* Fix query.proto backwards compatibility

Signed-off-by: Oron Sharabi <oron.sh@coralogix.com>

* Fix comment indentation

Signed-off-by: Oron Sharabi <oron.sh@coralogix.com>

* Multiply maxResolution by 1000 in grpc api

Signed-off-by: Oron Sharabi <oron.sh@coralogix.com>

Signed-off-by: Oron Sharabi <oron.sh@coralogix.com>
Signed-off-by: Prakul Jain <prakul.jain@udaan.com>
  • Loading branch information
oronsh authored and prajain12 committed Sep 6, 2022
1 parent e27e60c commit 97cd4a3
Show file tree
Hide file tree
Showing 16 changed files with 385 additions and 294 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan
We use *breaking :warning:* to mark changes that are not backward compatible (relates only to v0.y.z releases.)

## Unreleased
- [#5607](https://github.com/thanos-io/thanos/pull/5607) Query: Support custom lookback delta from request in query api.
- [#5453](https://github.com/thanos-io/thanos/pull/5453) Compact: Skip erroneous empty non `*AggrChunk` chunks during 1h downsampling of 5m resolution blocks.

### Fixed
Expand Down
79 changes: 21 additions & 58 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"fmt"
"math"
"net/http"
"path/filepath"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -586,8 +584,14 @@ func runQuery(
grpcProbe,
prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)),
)
engineCreator := engineFactory(promql.NewEngine, engineOpts, dynamicLookbackDelta, activeQueryDir,
maxConcurrentQueries, logger)

// An active query tracker will be added only if the user specifies a non-default path.
// Otherwise, the nil active query tracker from existing engine options will be used.
if activeQueryDir != "" {
engineOpts.ActiveQueryTracker = promql.NewActiveQueryTracker(activeQueryDir, maxConcurrentQueries, logger)
}
engine := promql.NewEngine(engineOpts)
lookbackDeltaCreator := LookbackDeltaFactory(engineOpts, dynamicLookbackDelta)

// Start query API + UI HTTP server.
{
Expand Down Expand Up @@ -617,7 +621,8 @@ func runQuery(
api := apiv1.NewQueryAPI(
logger,
endpoints.GetEndpointStatus,
engineCreator,
engine,
lookbackDeltaCreator,
queryableCreator,
// NOTE: Will share the same replica label as the query for now.
rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
Expand Down Expand Up @@ -692,7 +697,7 @@ func runQuery(
info.WithQueryAPIInfoFunc(),
)

grpcAPI := apiv1.NewGRPCAPI(time.Now, queryReplicaLabels, queryableCreator, engineCreator, instantDefaultMaxSourceResolution)
grpcAPI := apiv1.NewGRPCAPI(time.Now, queryReplicaLabels, queryableCreator, engine, lookbackDeltaCreator, instantDefaultMaxSourceResolution)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)),
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
Expand Down Expand Up @@ -737,82 +742,40 @@ func removeDuplicateEndpointSpecs(logger log.Logger, duplicatedStores prometheus
return deduplicated
}

// engineFactory creates from 1 to 3 promql.Engines depending on
// LookbackDeltaFactory creates from 1 to 3 lookback deltas depending on
// dynamicLookbackDelta and eo.LookbackDelta and returns a function
// that returns appropriate engine for given maxSourceResolutionMillis.
//
// TODO: it seems like a good idea to tweak Prometheus itself
// instead of creating several Engines here.
func engineFactory(
newEngine func(promql.EngineOpts) *promql.Engine,
// that returns appropriate lookback delta for given maxSourceResolutionMillis.
func LookbackDeltaFactory(
eo promql.EngineOpts,
dynamicLookbackDelta bool,
activeQueryDir string,
maxConcurrentQueries int,
logger log.Logger,
) func(int64) *promql.Engine {
) func(int64) time.Duration {
resolutions := []int64{downsample.ResLevel0}
if dynamicLookbackDelta {
resolutions = []int64{downsample.ResLevel0, downsample.ResLevel1, downsample.ResLevel2}
}
var (
engines = make([]*promql.Engine, len(resolutions))
ld = eo.LookbackDelta.Milliseconds()
lds = make([]time.Duration, len(resolutions))
ld = eo.LookbackDelta.Milliseconds()
)
wrapReg := func(engineNum int) prometheus.Registerer {
return extprom.WrapRegistererWith(map[string]string{"engine": strconv.Itoa(engineNum)}, eo.Reg)
}

lookbackDelta := eo.LookbackDelta
for i, r := range resolutions {
if ld < r {
lookbackDelta = time.Duration(r) * time.Millisecond
}

newEngineOpts := promql.EngineOpts{
Logger: eo.Logger,
Reg: wrapReg(i),
MaxSamples: eo.MaxSamples,
Timeout: eo.Timeout,
LookbackDelta: lookbackDelta,
NoStepSubqueryIntervalFn: eo.NoStepSubqueryIntervalFn,
EnableAtModifier: eo.EnableAtModifier,
EnableNegativeOffset: eo.EnableNegativeOffset,
}
// An active query tracker will be added only if the user specifies a non-default path.
// Otherwise, the nil active query tracker from existing engine options will be used.
if activeQueryDir != "" {
resActiveQueryDir := filepath.Join(activeQueryDir, getActiveQueryDirBasedOnResolution(r))
newEngineOpts.ActiveQueryTracker = promql.NewActiveQueryTracker(resActiveQueryDir, maxConcurrentQueries, logger)
} else {
newEngineOpts.ActiveQueryTracker = eo.ActiveQueryTracker
}

engines[i] = newEngine(newEngineOpts)
lds[i] = lookbackDelta
}
return func(maxSourceResolutionMillis int64) *promql.Engine {
return func(maxSourceResolutionMillis int64) time.Duration {
for i := len(resolutions) - 1; i >= 1; i-- {
left := resolutions[i-1]
if resolutions[i-1] < ld {
left = ld
}
if left < maxSourceResolutionMillis {
return engines[i]
return lds[i]
}
}
return engines[0]
}
}

func getActiveQueryDirBasedOnResolution(resolution int64) string {
if resolution == downsample.ResLevel0 {
return "raw"
}
if resolution == downsample.ResLevel1 {
return "5m"
}
if resolution == downsample.ResLevel2 {
return "1h"
return lds[0]
}
return ""
}
79 changes: 31 additions & 48 deletions cmd/thanos/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,15 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/prometheus/promql"

"github.com/thanos-io/thanos/pkg/testutil"
)

func TestEngineFactory(t *testing.T) {
var (
engineRaw = promql.NewEngine(promql.EngineOpts{})
engine5m = promql.NewEngine(promql.EngineOpts{LookbackDelta: 5 * time.Minute})
engine1h = promql.NewEngine(promql.EngineOpts{LookbackDelta: 1 * time.Hour})
)
mockNewEngine := func(opts promql.EngineOpts) *promql.Engine {
switch opts.LookbackDelta {
case 1 * time.Hour:
return engine1h
case 5 * time.Minute:
return engine5m
default:
return engineRaw
}
}
func TestLookbackDeltaFactory(t *testing.T) {
type testCase struct {
stepMillis int64
expect *promql.Engine
expect time.Duration
}
var (
minute = time.Minute.Milliseconds()
Expand All @@ -46,67 +30,66 @@ func TestEngineFactory(t *testing.T) {
lookbackDelta: 0,
dynamicLookbackDelta: false,
tcs: []testCase{
{0, engineRaw},
{5 * minute, engineRaw},
{1 * hour, engineRaw},
{0, time.Duration(0)},
{5 * minute, time.Duration(0)},
{1 * hour, time.Duration(0)},
},
},
{

lookbackDelta: 3 * time.Minute,
dynamicLookbackDelta: true,
tcs: []testCase{
{2 * minute, engineRaw},
{3 * minute, engineRaw},
{4 * minute, engine5m},
{5 * minute, engine5m},
{6 * minute, engine1h},
{1 * hour, engine1h},
{2 * hour, engine1h},
{2 * minute, time.Duration(3) * time.Minute},
{3 * minute, time.Duration(3) * time.Minute},
{4 * minute, time.Duration(5) * time.Minute},
{5 * minute, time.Duration(5) * time.Minute},
{6 * minute, time.Duration(1) * time.Hour},
{1 * hour, time.Duration(1) * time.Hour},
{2 * hour, time.Duration(1) * time.Hour},
},
},
{
lookbackDelta: 5 * time.Minute,
dynamicLookbackDelta: true,
tcs: []testCase{
{0, engine5m},
{5 * minute, engine5m},
{6 * minute, engine1h},
{59 * minute, engine1h},
{1 * hour, engine1h},
{2 * hour, engine1h},
{0, time.Duration(5) * time.Minute},
{5 * minute, time.Duration(5) * time.Minute},
{6 * minute, time.Duration(1) * time.Hour},
{59 * minute, time.Duration(1) * time.Hour},
{1 * hour, time.Duration(1) * time.Hour},
{2 * hour, time.Duration(1) * time.Hour},
},
},
{
lookbackDelta: 30 * time.Minute,
dynamicLookbackDelta: true,
tcs: []testCase{
{0, engineRaw},
{5 * minute, engineRaw},
{30 * minute, engineRaw},
{31 * minute, engine1h},
{59 * minute, engine1h},
{1 * hour, engine1h},
{2 * hour, engine1h},
{0, time.Duration(30) * time.Minute},
{5 * minute, time.Duration(30) * time.Minute},
{30 * minute, time.Duration(30) * time.Minute},
{31 * minute, time.Duration(1) * time.Hour},
{59 * minute, time.Duration(1) * time.Hour},
{1 * hour, time.Duration(1) * time.Hour},
{2 * hour, time.Duration(1) * time.Hour},
},
},
{
lookbackDelta: 1 * time.Hour,
dynamicLookbackDelta: true,
tcs: []testCase{
{0, engine1h},
{5 * minute, engine1h},
{1 * hour, engine1h},
{2 * hour, engine1h},
{0, time.Duration(1) * time.Hour},
{5 * minute, time.Duration(1) * time.Hour},
{1 * hour, time.Duration(1) * time.Hour},
{2 * hour, time.Duration(1) * time.Hour},
},
},
}
)
for _, td := range tData {
e := engineFactory(mockNewEngine, promql.EngineOpts{LookbackDelta: td.lookbackDelta}, td.dynamicLookbackDelta,
"", 1, log.NewNopLogger())
lookbackCreate := LookbackDeltaFactory(promql.EngineOpts{LookbackDelta: td.lookbackDelta}, td.dynamicLookbackDelta)
for _, tc := range td.tcs {
got := e(tc.stepMillis)
got := lookbackCreate(tc.stepMillis)
testutil.Equals(t, tc.expect, got)
}
}
Expand Down
Loading

0 comments on commit 97cd4a3

Please sign in to comment.