Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement query pushdown for a subset of aggregations #4917

Merged
merged 4 commits into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4903](https://github.com/thanos-io/thanos/pull/4903) Compactor: Added tracing support for compaction.
- [#4909](https://github.com/thanos-io/thanos/pull/4909) Compactor: Add flag --max-time / --min-time to filter blocks that are ready to be compacted.
- [#4942](https://github.com/thanos-io/thanos/pull/4942) Tracing: add `traceid_128bit` support for jaeger.
- [#4917](https://github.com/thanos-io/thanos/pull/4917) Query: add initial query pushdown for a subset of aggregations. Can be enabled with `--enable-feature=query-pushdown` on Thanos Query.
- [#4888](https://github.com/thanos-io/thanos/pull/4888) Cache: support redis cache backend.

### Fixed
Expand Down
11 changes: 9 additions & 2 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
const (
promqlNegativeOffset = "promql-negative-offset"
promqlAtModifier = "promql-at-modifier"
queryPushdown = "query-pushdown"
)

// registerQuery registers a query command.
Expand Down Expand Up @@ -160,7 +161,7 @@ func registerQuery(app *extkingpin.App) {
enableMetricMetadataPartialResponse := cmd.Flag("metric-metadata.partial-response", "Enable partial response for metric metadata endpoint. --no-metric-metadata.partial-response for disabling.").
Hidden().Default("true").Bool()

featureList := cmd.Flag("enable-feature", "Comma separated experimental feature names to enable.The current list of features is "+promqlNegativeOffset+" and "+promqlAtModifier+".").Default("").Strings()
featureList := cmd.Flag("enable-feature", "Comma separated experimental feature names to enable.The current list of features is "+promqlNegativeOffset+", "+promqlAtModifier+" and "+queryPushdown+".").Default("").Strings()

enableExemplarPartialResponse := cmd.Flag("exemplar.partial-response", "Enable partial response for exemplar endpoint. --no-exemplar.partial-response for disabling.").
Hidden().Default("true").Bool()
Expand All @@ -181,14 +182,17 @@ func registerQuery(app *extkingpin.App) {
return errors.Wrap(err, "parse federation labels")
}

var enableNegativeOffset, enableAtModifier bool
var enableNegativeOffset, enableAtModifier, enableQueryPushdown bool
for _, feature := range *featureList {
if feature == promqlNegativeOffset {
enableNegativeOffset = true
}
if feature == promqlAtModifier {
enableAtModifier = true
}
if feature == queryPushdown {
enableQueryPushdown = true
}
}

httpLogOpts, err := logging.ParseHTTPOptions(*reqLogDecision, reqLogConfig)
Expand Down Expand Up @@ -278,6 +282,7 @@ func registerQuery(app *extkingpin.App) {
*webDisableCORS,
enableAtModifier,
enableNegativeOffset,
enableQueryPushdown,
*alertQueryURL,
component.Query,
)
Expand Down Expand Up @@ -346,6 +351,7 @@ func runQuery(
disableCORS bool,
enableAtModifier bool,
enableNegativeOffset bool,
enableQueryPushdown bool,
alertQueryURL string,
comp component.Component,
) error {
Expand Down Expand Up @@ -614,6 +620,7 @@ func runQuery(
enableTargetPartialResponse,
enableMetricMetadataPartialResponse,
enableExemplarPartialResponse,
enableQueryPushdown,
queryReplicaLabels,
flagsMap,
defaultRangeQueryStep,
Expand Down
3 changes: 2 additions & 1 deletion docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ Flags:
in all alerts 'Source' field.
--enable-feature= ... Comma separated experimental feature names to
enable.The current list of features is
promql-negative-offset and promql-at-modifier.
promql-negative-offset, promql-at-modifier and
query-pushdown.
--endpoint=<endpoint> ... Addresses of statically configured Thanos API
servers (repeatable). The scheme may be
prefixed with 'dns+' or 'dnssrv+' to detect
Expand Down
13 changes: 8 additions & 5 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type QueryAPI struct {
enableTargetPartialResponse bool
enableMetricMetadataPartialResponse bool
enableExemplarPartialResponse bool
enableQueryPushdown bool
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
disableCORS bool

replicaLabels []string
Expand Down Expand Up @@ -120,6 +121,7 @@ func NewQueryAPI(
enableTargetPartialResponse bool,
enableMetricMetadataPartialResponse bool,
enableExemplarPartialResponse bool,
enableQueryPushdown bool,
replicaLabels []string,
flagsMap map[string]string,
defaultRangeQueryStep time.Duration,
Expand All @@ -146,6 +148,7 @@ func NewQueryAPI(
enableTargetPartialResponse: enableTargetPartialResponse,
enableMetricMetadataPartialResponse: enableMetricMetadataPartialResponse,
enableExemplarPartialResponse: enableExemplarPartialResponse,
enableQueryPushdown: enableQueryPushdown,
replicaLabels: replicaLabels,
endpointStatus: endpointStatus,
defaultRangeQueryStep: defaultRangeQueryStep,
Expand Down Expand Up @@ -342,7 +345,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
span, ctx := tracing.StartSpan(ctx, "promql_instant_query")
defer span.Finish()

qry, err := qe.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts)
qry, err := qe.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false), r.FormValue("query"), ts)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
Expand Down Expand Up @@ -459,7 +462,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
defer span.Finish()

qry, err := qe.NewRangeQuery(
qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false),
qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false),
r.FormValue("query"),
start,
end,
Expand Down Expand Up @@ -532,7 +535,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
matcherSets = append(matcherSets, matchers)
}

q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, true).
q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true).
Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
Expand Down Expand Up @@ -619,7 +622,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, math.MaxInt64, enablePartialResponse, true).
q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, math.MaxInt64, enablePartialResponse, qapi.enableQueryPushdown, true).
Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
Expand Down Expand Up @@ -669,7 +672,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
matcherSets = append(matcherSets, matchers)
}

q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, true).
q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true).
Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
Expand Down
31 changes: 27 additions & 4 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ import (
// replicaLabels at query time.
// maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds).
// partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behavior of proxy.
type QueryableCreator func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable
type QueryableCreator func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, enableQueryPushdown, skipChunks bool) storage.Queryable

// NewQueryableCreator creates QueryableCreator.
func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer, maxConcurrentSelects int, selectTimeout time.Duration) QueryableCreator {
duration := promauto.With(
extprom.WrapRegistererWithPrefix("concurrent_selects_", reg),
).NewHistogram(gate.DurationHistogramOpts)

return func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable {
return func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, enableQueryPushdown, skipChunks bool) storage.Queryable {
return &queryable{
logger: logger,
replicaLabels: replicaLabels,
Expand All @@ -56,6 +56,7 @@ func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy sto
},
maxConcurrentSelects: maxConcurrentSelects,
selectTimeout: selectTimeout,
enableQueryPushdown: enableQueryPushdown,
}
}
}
Expand All @@ -72,11 +73,12 @@ type queryable struct {
gateProviderFn func() gate.Gate
maxConcurrentSelects int
selectTimeout time.Duration
enableQueryPushdown bool
}

// Querier returns a new storage querier against the underlying proxy store API.
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateProviderFn(), q.selectTimeout), nil
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.enableQueryPushdown, q.skipChunks, q.gateProviderFn(), q.selectTimeout), nil
}

type querier struct {
Expand All @@ -90,6 +92,7 @@ type querier struct {
deduplicate bool
maxResolutionMillis int64
partialResponse bool
enableQueryPushdown bool
skipChunks bool
selectGate gate.Gate
selectTimeout time.Duration
Expand All @@ -106,7 +109,7 @@ func newQuerier(
proxy storepb.StoreServer,
deduplicate bool,
maxResolutionMillis int64,
partialResponse, skipChunks bool,
partialResponse, enableQueryPushdown bool, skipChunks bool,
selectGate gate.Gate,
selectTimeout time.Duration,
) *querier {
Expand Down Expand Up @@ -135,6 +138,7 @@ func newQuerier(
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
skipChunks: skipChunks,
enableQueryPushdown: enableQueryPushdown,
}
}

Expand Down Expand Up @@ -193,6 +197,20 @@ func aggrsFromFunc(f string) []storepb.Aggr {
return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}
}

func storeHintsFromPromHints(hints *storage.SelectHints) *storepb.QueryHints {
return &storepb.QueryHints{
StepMillis: hints.Step,
Func: &storepb.Func{
Name: hints.Func,
},
Grouping: &storepb.Grouping{
By: hints.By,
Labels: hints.Grouping,
},
Range: &storepb.Range{Millis: hints.Range},
}
}

func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
if hints == nil {
hints = &storage.SelectHints{
Expand Down Expand Up @@ -268,12 +286,17 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .

// TODO(bwplotka): Use inprocess gRPC.
resp := &seriesServer{ctx: ctx}
var queryHints *storepb.QueryHints
if q.enableQueryPushdown {
queryHints = storeHintsFromPromHints(hints)
}
if err := q.proxy.Series(&storepb.SeriesRequest{
MinTime: hints.Start,
MaxTime: hints.End,
Matchers: sms,
MaxResolutionWindow: q.maxResolutionMillis,
Aggregates: aggrs,
QueryHints: queryHints,
PartialResponseDisabled: !q.partialResponse,
SkipChunks: q.skipChunks,
Step: hints.Step,
Expand Down
12 changes: 6 additions & 6 deletions pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) {
queryableCreator := NewQueryableCreator(nil, nil, testProxy, 2, 5*time.Second)

oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond)
queryable := queryableCreator(false, nil, nil, oneHourMillis, false, false)
queryable := queryableCreator(false, nil, nil, oneHourMillis, false, false, false)

q, err := queryable.Querier(context.Background(), 0, 42)
testutil.Ok(t, err)
Expand All @@ -71,7 +71,7 @@ func TestQuerier_DownsampledData(t *testing.T) {
}

timeout := 10 * time.Second
q := NewQueryableCreator(nil, nil, testProxy, 2, timeout)(false, nil, nil, 9999999, false, false)
q := NewQueryableCreator(nil, nil, testProxy, 2, timeout)(false, nil, nil, 9999999, false, false, false)
engine := promql.NewEngine(
promql.EngineOpts{
MaxSamples: math.MaxInt32,
Expand Down Expand Up @@ -362,7 +362,7 @@ func TestQuerier_Select_AfterPromQL(t *testing.T) {
g := gate.New(2)
mq := &mockedQueryable{
Creator: func(mint, maxt int64) storage.Querier {
return newQuerier(context.Background(), nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout)
return newQuerier(context.Background(), nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout)
},
}
t.Cleanup(func() {
Expand Down Expand Up @@ -606,7 +606,7 @@ func TestQuerier_Select(t *testing.T) {
{dedup: true, expected: []series{tcase.expectedAfterDedup}},
} {
g := gate.New(2)
q := newQuerier(context.Background(), nil, tcase.mint, tcase.maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout)
q := newQuerier(context.Background(), nil, tcase.mint, tcase.maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout)
t.Cleanup(func() { testutil.Ok(t, q.Close()) })

t.Run(fmt.Sprintf("dedup=%v", sc.dedup), func(t *testing.T) {
Expand Down Expand Up @@ -835,7 +835,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {

timeout := 100 * time.Second
g := gate.New(2)
q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, g, timeout)
q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, false, g, timeout)
t.Cleanup(func() {
testutil.Ok(t, q.Close())
})
Expand Down Expand Up @@ -905,7 +905,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {

timeout := 5 * time.Second
g := gate.New(2)
q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, true, 0, true, false, g, timeout)
q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, true, 0, true, false, false, g, timeout)
t.Cleanup(func() {
testutil.Ok(t, q.Close())
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestQuerier_Proxy(t *testing.T) {
name: fmt.Sprintf("store number %v", i),
})
}
return q(true, nil, nil, 0, false, false)
return q(true, nil, nil, 0, false, false, false)
}

for _, fn := range files {
Expand Down
Loading