From f5bc4823baf9b779352bb678a06dfd69a27492d0 Mon Sep 17 00:00:00 2001 From: fpetkovski Date: Tue, 30 Nov 2021 16:26:50 +0100 Subject: [PATCH] Implement query pushdown for a subset of aggregations Certain aggregations can be executed safely on leaf nodes without worrying about data duplication or overlap. One such example is the max function which can be computed on local data by the leaves before it is computed globally by the querier. This commit implements local aggregation in the Prometheus sidecar for all functions which are safe to execute locally. The feature can be enabled by passing the `--enable-feature evaluate-queries` flag to the sidecar. Signed-off-by: fpetkovski --- CHANGELOG.md | 1 + cmd/thanos/query.go | 11 +- docs/components/query.md | 3 +- pkg/api/query/v1.go | 13 +- pkg/query/querier.go | 28 +- pkg/query/querier_test.go | 12 +- pkg/query/query_test.go | 2 +- pkg/store/prometheus.go | 80 +- pkg/store/proxy.go | 2 + pkg/store/storepb/custom.go | 24 + pkg/store/storepb/custom_test.go | 142 ++++ pkg/store/storepb/prompb/samples.go | 20 + pkg/store/storepb/query_hints.go | 48 ++ pkg/store/storepb/rpc.pb.go | 1106 ++++++++++++++++++++++++--- pkg/store/storepb/rpc.proto | 64 +- test/e2e/e2ethanos/services.go | 7 +- test/e2e/query_test.go | 215 +++++- 17 files changed, 1646 insertions(+), 132 deletions(-) create mode 100644 pkg/store/storepb/prompb/samples.go create mode 100644 pkg/store/storepb/query_hints.go diff --git a/CHANGELOG.md b/CHANGELOG.md index abd540da48d..041b47b6e6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan We use *breaking :warning:* to mark changes that are not backward compatible (relates only to v0.y.z releases.) ## Unreleased +- [#4917](https://github.com/thanos-io/thanos/pull/4917) Sidecar: Add an `evaluate-queries` feature to Thanos sidecar to enable local execution of certain queries. ### Added diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 38fb1bb8fc1..25f7c611333 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -58,6 +58,7 @@ import ( const ( promqlNegativeOffset = "promql-negative-offset" promqlAtModifier = "promql-at-modifier" + queryPushdown = "query-pushdown" ) // registerQuery registers a query command. @@ -160,7 +161,7 @@ func registerQuery(app *extkingpin.App) { enableMetricMetadataPartialResponse := cmd.Flag("metric-metadata.partial-response", "Enable partial response for metric metadata endpoint. --no-metric-metadata.partial-response for disabling."). Hidden().Default("true").Bool() - featureList := cmd.Flag("enable-feature", "Comma separated experimental feature names to enable.The current list of features is "+promqlNegativeOffset+" and "+promqlAtModifier+".").Default("").Strings() + featureList := cmd.Flag("enable-feature", "Comma separated experimental feature names to enable.The current list of features is "+promqlNegativeOffset+", "+promqlAtModifier+" and "+queryPushdown+".").Default("").Strings() enableExemplarPartialResponse := cmd.Flag("exemplar.partial-response", "Enable partial response for exemplar endpoint. --no-exemplar.partial-response for disabling."). Hidden().Default("true").Bool() @@ -181,7 +182,7 @@ func registerQuery(app *extkingpin.App) { return errors.Wrap(err, "parse federation labels") } - var enableNegativeOffset, enableAtModifier bool + var enableNegativeOffset, enableAtModifier, enableQueryPushdown bool for _, feature := range *featureList { if feature == promqlNegativeOffset { enableNegativeOffset = true @@ -189,6 +190,9 @@ func registerQuery(app *extkingpin.App) { if feature == promqlAtModifier { enableAtModifier = true } + if feature == queryPushdown { + enableQueryPushdown = true + } } httpLogOpts, err := logging.ParseHTTPOptions(*reqLogDecision, reqLogConfig) @@ -278,6 +282,7 @@ func registerQuery(app *extkingpin.App) { *webDisableCORS, enableAtModifier, enableNegativeOffset, + enableQueryPushdown, *alertQueryURL, component.Query, ) @@ -346,6 +351,7 @@ func runQuery( disableCORS bool, enableAtModifier bool, enableNegativeOffset bool, + enableQueryPushdown bool, alertQueryURL string, comp component.Component, ) error { @@ -614,6 +620,7 @@ func runQuery( enableTargetPartialResponse, enableMetricMetadataPartialResponse, enableExemplarPartialResponse, + enableQueryPushdown, queryReplicaLabels, flagsMap, defaultRangeQueryStep, diff --git a/docs/components/query.md b/docs/components/query.md index 72d22a95322..d178e46e940 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -260,7 +260,8 @@ Flags: in all alerts 'Source' field. --enable-feature= ... Comma separated experimental feature names to enable.The current list of features is - promql-negative-offset and promql-at-modifier. + promql-negative-offset, promql-at-modifier and + query-pushdown. --endpoint= ... Addresses of statically configured Thanos API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index b1ca77b5617..53986cfce8d 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -92,6 +92,7 @@ type QueryAPI struct { enableTargetPartialResponse bool enableMetricMetadataPartialResponse bool enableExemplarPartialResponse bool + enableQueryPushdown bool disableCORS bool replicaLabels []string @@ -120,6 +121,7 @@ func NewQueryAPI( enableTargetPartialResponse bool, enableMetricMetadataPartialResponse bool, enableExemplarPartialResponse bool, + enableQueryPushdown bool, replicaLabels []string, flagsMap map[string]string, defaultRangeQueryStep time.Duration, @@ -146,6 +148,7 @@ func NewQueryAPI( enableTargetPartialResponse: enableTargetPartialResponse, enableMetricMetadataPartialResponse: enableMetricMetadataPartialResponse, enableExemplarPartialResponse: enableExemplarPartialResponse, + enableQueryPushdown: enableQueryPushdown, replicaLabels: replicaLabels, endpointStatus: endpointStatus, defaultRangeQueryStep: defaultRangeQueryStep, @@ -342,7 +345,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro span, ctx := tracing.StartSpan(ctx, "promql_instant_query") defer span.Finish() - qry, err := qe.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts) + qry, err := qe.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false), r.FormValue("query"), ts) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err} } @@ -459,7 +462,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap defer span.Finish() qry, err := qe.NewRangeQuery( - qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false), + qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false), r.FormValue("query"), start, end, @@ -532,7 +535,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A matcherSets = append(matcherSets, matchers) } - q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, true). + q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true). Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err} @@ -619,7 +622,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr return nil, nil, apiErr } - q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, math.MaxInt64, enablePartialResponse, true). + q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, math.MaxInt64, enablePartialResponse, qapi.enableQueryPushdown, true). Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err} @@ -669,7 +672,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap matcherSets = append(matcherSets, matchers) } - q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, true). + q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true). Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err} diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 0684efd2c8e..97eb39fb05d 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -33,7 +33,7 @@ import ( // replicaLabels at query time. // maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds). // partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behavior of proxy. -type QueryableCreator func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable +type QueryableCreator func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, enableQueryPushdown, skipChunks bool) storage.Queryable // NewQueryableCreator creates QueryableCreator. func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer, maxConcurrentSelects int, selectTimeout time.Duration) QueryableCreator { @@ -41,7 +41,7 @@ func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy sto extprom.WrapRegistererWithPrefix("concurrent_selects_", reg), ).NewHistogram(gate.DurationHistogramOpts) - return func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable { + return func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, enableQueryPushdown, skipChunks bool) storage.Queryable { return &queryable{ logger: logger, replicaLabels: replicaLabels, @@ -56,6 +56,7 @@ func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy sto }, maxConcurrentSelects: maxConcurrentSelects, selectTimeout: selectTimeout, + enableQueryPushdown: enableQueryPushdown, } } } @@ -72,11 +73,12 @@ type queryable struct { gateProviderFn func() gate.Gate maxConcurrentSelects int selectTimeout time.Duration + enableQueryPushdown bool } // Querier returns a new storage querier against the underlying proxy store API. func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateProviderFn(), q.selectTimeout), nil + return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.enableQueryPushdown, q.skipChunks, q.gateProviderFn(), q.selectTimeout), nil } type querier struct { @@ -90,6 +92,7 @@ type querier struct { deduplicate bool maxResolutionMillis int64 partialResponse bool + enableQueryPushdown bool skipChunks bool selectGate gate.Gate selectTimeout time.Duration @@ -106,7 +109,7 @@ func newQuerier( proxy storepb.StoreServer, deduplicate bool, maxResolutionMillis int64, - partialResponse, skipChunks bool, + partialResponse, enableQueryPushdown bool, skipChunks bool, selectGate gate.Gate, selectTimeout time.Duration, ) *querier { @@ -135,6 +138,7 @@ func newQuerier( maxResolutionMillis: maxResolutionMillis, partialResponse: partialResponse, skipChunks: skipChunks, + enableQueryPushdown: enableQueryPushdown, } } @@ -193,6 +197,20 @@ func aggrsFromFunc(f string) []storepb.Aggr { return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM} } +func storeHintsFromPromHints(hints *storage.SelectHints) storepb.QueryHints { + return storepb.QueryHints{ + StepMillis: hints.Step, + Func: &storepb.Func{ + Name: hints.Func, + }, + Grouping: &storepb.Grouping{ + By: hints.By, + Labels: hints.Grouping, + }, + Range: &storepb.Range{Millis: hints.Range}, + } +} + func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet { if hints == nil { hints = &storage.SelectHints{ @@ -274,8 +292,10 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . Matchers: sms, MaxResolutionWindow: q.maxResolutionMillis, Aggregates: aggrs, + QueryHints: storeHintsFromPromHints(hints), PartialResponseDisabled: !q.partialResponse, SkipChunks: q.skipChunks, + EnableQueryPushdown: q.enableQueryPushdown, Step: hints.Step, Range: hints.Range, }, resp); err != nil { diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index be5dbd7fddd..bbeb1540174 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -44,7 +44,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) { queryableCreator := NewQueryableCreator(nil, nil, testProxy, 2, 5*time.Second) oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond) - queryable := queryableCreator(false, nil, nil, oneHourMillis, false, false) + queryable := queryableCreator(false, nil, nil, oneHourMillis, false, false, false) q, err := queryable.Querier(context.Background(), 0, 42) testutil.Ok(t, err) @@ -71,7 +71,7 @@ func TestQuerier_DownsampledData(t *testing.T) { } timeout := 10 * time.Second - q := NewQueryableCreator(nil, nil, testProxy, 2, timeout)(false, nil, nil, 9999999, false, false) + q := NewQueryableCreator(nil, nil, testProxy, 2, timeout)(false, nil, nil, 9999999, false, false, false) engine := promql.NewEngine( promql.EngineOpts{ MaxSamples: math.MaxInt32, @@ -362,7 +362,7 @@ func TestQuerier_Select_AfterPromQL(t *testing.T) { g := gate.New(2) mq := &mockedQueryable{ Creator: func(mint, maxt int64) storage.Querier { - return newQuerier(context.Background(), nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout) + return newQuerier(context.Background(), nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout) }, } t.Cleanup(func() { @@ -606,7 +606,7 @@ func TestQuerier_Select(t *testing.T) { {dedup: true, expected: []series{tcase.expectedAfterDedup}}, } { g := gate.New(2) - q := newQuerier(context.Background(), nil, tcase.mint, tcase.maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout) + q := newQuerier(context.Background(), nil, tcase.mint, tcase.maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) t.Run(fmt.Sprintf("dedup=%v", sc.dedup), func(t *testing.T) { @@ -835,7 +835,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { timeout := 100 * time.Second g := gate.New(2) - q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, g, timeout) + q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, false, g, timeout) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) @@ -905,7 +905,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { timeout := 5 * time.Second g := gate.New(2) - q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, true, 0, true, false, g, timeout) + q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, true, 0, true, false, false, g, timeout) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) diff --git a/pkg/query/query_test.go b/pkg/query/query_test.go index b381e318e32..3b1d19d0049 100644 --- a/pkg/query/query_test.go +++ b/pkg/query/query_test.go @@ -54,7 +54,7 @@ func TestQuerier_Proxy(t *testing.T) { name: fmt.Sprintf("store number %v", i), }) } - return q(true, nil, nil, 0, false, false) + return q(true, nil, nil, 0, false, false, false) } for _, fn := range files { diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 69010107e76..7ca06146177 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -15,6 +15,9 @@ import ( "sort" "strings" "sync" + "time" + + "github.com/prometheus/common/model" "github.com/blang/semver/v4" "github.com/go-kit/log" @@ -27,9 +30,6 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/tsdb/chunkenc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/httpconfig" "github.com/thanos-io/thanos/pkg/promclient" @@ -38,6 +38,8 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" "github.com/thanos-io/thanos/pkg/tracing" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // PrometheusStore implements the store node API on top of the Prometheus remote read API. @@ -179,6 +181,10 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie return nil } + if r.EnableQueryPushdown && r.QueryHints.IsSafeToExecute() { + return p.queryPrometheus(s, r) + } + q := &prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime} for _, m := range matchers { pm := &prompb.LabelMatcher{Name: m.Name, Value: m.Value} @@ -220,18 +226,78 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie return p.handleStreamedPrometheusResponse(s, httpResp, queryPrometheusSpan, extLset) } -func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan tracing.Span, extLset labels.Labels) error { - ctx := s.Context() +func (p *PrometheusStore) queryPrometheus(s storepb.Store_SeriesServer, r *storepb.SeriesRequest) error { + var matrix model.Matrix + + opts := promclient.QueryOptions{} + step := r.QueryHints.StepMillis / 1000 + if step != 0 { + result, _, err := p.client.QueryRange(s.Context(), p.base, r.ToPromQL(), r.MinTime, r.MaxTime, step, opts) + if err != nil { + return err + } + matrix = result + } else { + vector, _, err := p.client.QueryInstant(s.Context(), p.base, r.ToPromQL(), time.Unix(r.MaxTime/1000, 0), opts) + if err != nil { + return err + } + + matrix = make(model.Matrix, 0, len(vector)) + for _, sample := range vector { + matrix = append(matrix, &model.SampleStream{ + Metric: sample.Metric, + Values: []model.SamplePair{ + { + Timestamp: sample.Timestamp, + Value: sample.Value, + }, + }, + }) + } + } + + externalLbls := p.externalLabelsFn() + for _, vector := range matrix { + lbls := make([]labels.Label, 0, len(externalLbls)+len(vector.Metric)) + // Attach labels from samples + for k, v := range vector.Metric { + lbls = append(lbls, labels.FromStrings(string(k), string(v))...) + } + // Attach external labels for compatibility with remote read + lbls = append(lbls, externalLbls...) + + series := &prompb.TimeSeries{ + Labels: labelpb.ZLabelsFromPromLabels(lbls), + Samples: prompb.SamplesFromPromSamplePairs(vector.Values), + } + + chks, err := p.chunkSamples(series, MaxSamplesPerChunk) + if err != nil { + return err + } + if err := s.Send(storepb.NewSeriesResponse(&storepb.Series{ + Labels: series.Labels, + Chunks: chks, + })); err != nil { + return err + } + } + + return nil +} + +func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan tracing.Span, extLset labels.Labels) error { level.Debug(p.logger).Log("msg", "started handling ReadRequest_SAMPLED response type.") - resp, err := p.fetchSampledResponse(ctx, httpResp) + resp, err := p.fetchSampledResponse(s.Context(), httpResp) querySpan.Finish() if err != nil { return err } - span, _ := tracing.StartSpan(ctx, "transform_and_respond") + span, _ := tracing.StartSpan(s.Context(), "transform_and_respond") defer span.Finish() span.SetTag("series_count", len(resp.Results[0].Timeseries)) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index c6b0c098d86..63d12004185 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -275,6 +275,8 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe Aggregates: r.Aggregates, MaxResolutionWindow: r.MaxResolutionWindow, SkipChunks: r.SkipChunks, + QueryHints: r.QueryHints, + EnableQueryPushdown: r.EnableQueryPushdown, PartialResponseDisabled: r.PartialResponseDisabled, } wg = &sync.WaitGroup{} diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index d62b82dc3f7..eaa96f1ede6 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -517,3 +517,27 @@ func (c *SeriesStatsCounter) Count(series *Series) { } } } + +func (m *SeriesRequest) ToPromQL() string { + return m.QueryHints.toPromQL(m.Matchers) +} + +// IsSafeToExecute returns true if the function or aggregation from the query hint +// can be safely executed by the underlying Prometheus instance without affecting the +// result of the query. +func (m *QueryHints) IsSafeToExecute() bool { + distributiveOperations := []string{ + "max", + "max_over_time", + "min", + "min_over_time", + "group", + } + for _, op := range distributiveOperations { + if m.Func.Name == op { + return true + } + } + + return false +} diff --git a/pkg/store/storepb/custom_test.go b/pkg/store/storepb/custom_test.go index 8aae81ab22d..6930b188fff 100644 --- a/pkg/store/storepb/custom_test.go +++ b/pkg/store/storepb/custom_test.go @@ -8,6 +8,7 @@ import ( "path/filepath" "sort" "testing" + "time" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" @@ -526,3 +527,144 @@ func TestMatchersToString_Translate(t *testing.T) { } } + +func TestSeriesRequestToPromQL(t *testing.T) { + ts := []struct { + name string + r *SeriesRequest + expected string + }{ + { + name: "Single matcher regular expression", + r: &SeriesRequest{ + Matchers: []LabelMatcher{ + { + Type: LabelMatcher_RE, + Name: "namespace", + Value: "kube-.+", + }, + }, + QueryHints: QueryHints{ + Func: &Func{ + Name: "max", + }, + }, + }, + expected: `max ({namespace=~"kube-.+"})`, + }, + { + name: "Single matcher regular expression with grouping", + r: &SeriesRequest{ + Matchers: []LabelMatcher{ + { + Type: LabelMatcher_RE, + Name: "namespace", + Value: "kube-.+", + }, + }, + QueryHints: QueryHints{ + Func: &Func{ + Name: "max", + }, + Grouping: &Grouping{ + By: false, + Labels: []string{"container", "pod"}, + }, + }, + }, + expected: `max without (container,pod) ({namespace=~"kube-.+"})`, + }, + { + name: "Multiple matchers with grouping", + r: &SeriesRequest{ + Matchers: []LabelMatcher{ + { + Type: LabelMatcher_EQ, + Name: "__name__", + Value: "kube_pod_info", + }, + { + Type: LabelMatcher_RE, + Name: "namespace", + Value: "kube-.+", + }, + }, + QueryHints: QueryHints{ + Func: &Func{ + Name: "max", + }, + Grouping: &Grouping{ + By: false, + Labels: []string{"container", "pod"}, + }, + }, + }, + expected: `max without (container,pod) ({__name__="kube_pod_info", namespace=~"kube-.+"})`, + }, + { + name: "Query with vector range selector", + r: &SeriesRequest{ + Matchers: []LabelMatcher{ + { + Type: LabelMatcher_EQ, + Name: "__name__", + Value: "kube_pod_info", + }, + { + Type: LabelMatcher_RE, + Name: "namespace", + Value: "kube-.+", + }, + }, + QueryHints: QueryHints{ + Func: &Func{ + Name: "max_over_time", + }, + Range: &Range{ + Millis: 10 * time.Minute.Milliseconds(), + }, + }, + }, + expected: `max_over_time ({__name__="kube_pod_info", namespace=~"kube-.+"}[600000ms])`, + }, + { + name: "Query with grouping and vector range selector", + r: &SeriesRequest{ + Matchers: []LabelMatcher{ + { + Type: LabelMatcher_EQ, + Name: "__name__", + Value: "kube_pod_info", + }, + { + Type: LabelMatcher_RE, + Name: "namespace", + Value: "kube-.+", + }, + }, + QueryHints: QueryHints{ + Func: &Func{ + Name: "max", + }, + Grouping: &Grouping{ + By: false, + Labels: []string{"container", "pod"}, + }, + Range: &Range{ + Millis: 10 * time.Minute.Milliseconds(), + }, + }, + }, + expected: `max without (container,pod) ({__name__="kube_pod_info", namespace=~"kube-.+"}[600000ms])`, + }, + } + + for _, tc := range ts { + t.Run(tc.name, func(t *testing.T) { + actual := tc.r.ToPromQL() + if tc.expected != actual { + t.Fatalf("invalid promql result, got %s, want %s", actual, tc.expected) + } + }) + } +} diff --git a/pkg/store/storepb/prompb/samples.go b/pkg/store/storepb/prompb/samples.go new file mode 100644 index 00000000000..675a137ec42 --- /dev/null +++ b/pkg/store/storepb/prompb/samples.go @@ -0,0 +1,20 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package prompb + +import "github.com/prometheus/common/model" + +// SamplesFromPromSamplePairs converts a slice of model.SamplePair +// to a slice of Sample. +func SamplesFromPromSamplePairs(samples []model.SamplePair) []Sample { + result := make([]Sample, 0, len(samples)) + for _, s := range samples { + result = append(result, Sample{ + Value: float64(s.Value), + Timestamp: int64(s.Timestamp), + }) + } + + return result +} diff --git a/pkg/store/storepb/query_hints.go b/pkg/store/storepb/query_hints.go new file mode 100644 index 00000000000..d01cb49a5fb --- /dev/null +++ b/pkg/store/storepb/query_hints.go @@ -0,0 +1,48 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storepb + +import ( + "fmt" + "strings" +) + +func (m *QueryHints) toPromQL(labelMatchers []LabelMatcher) string { + grouping := m.Grouping.toPromQL() + matchers := MatchersToString(labelMatchers...) + queryRange := m.Range.toPromQL() + + query := fmt.Sprintf("%s %s (%s%s)", m.Func.Name, grouping, matchers, queryRange) + // Remove double spaces if some expressions are missing + return strings.Join(strings.Fields(query), " ") +} + +func (m *Grouping) toPromQL() string { + if m == nil { + return "" + } + + if len(m.Labels) == 0 { + return "" + } + var op string + if m.By { + op = "by" + } else { + op = "without" + } + + return fmt.Sprintf("%s (%s)", op, strings.Join(m.Labels, ",")) +} + +func (m *Range) toPromQL() string { + if m == nil { + return "" + } + + if m.Millis == 0 { + return "" + } + return fmt.Sprintf("[%dms]", m.Millis) +} diff --git a/pkg/store/storepb/rpc.pb.go b/pkg/store/storepb/rpc.pb.go index 745686a208b..28836dc693c 100644 --- a/pkg/store/storepb/rpc.pb.go +++ b/pkg/store/storepb/rpc.pb.go @@ -281,9 +281,17 @@ type SeriesRequest struct { // implementation of a specific store. Hints *types.Any `protobuf:"bytes,9,opt,name=hints,proto3" json:"hints,omitempty"` // Query step size in milliseconds. + // Deprecated: Use query_hints instead. Step int64 `protobuf:"varint,10,opt,name=step,proto3" json:"step,omitempty"` // Range vector selector range in milliseconds. + // Deprecated: Use query_hints instead. Range int64 `protobuf:"varint,11,opt,name=range,proto3" json:"range,omitempty"` + // Indicates whether the store should evaluate queries + // before sending back samples. + EnableQueryPushdown bool `protobuf:"varint,12,opt,name=enableQueryPushdown,proto3" json:"enableQueryPushdown,omitempty"` + // query_hints are the hints coming from the PromQL engine when + // requesting a storage.SeriesSet for a given expression. + QueryHints QueryHints `protobuf:"bytes,13,opt,name=query_hints,json=queryHints,proto3" json:"query_hints"` } func (m *SeriesRequest) Reset() { *m = SeriesRequest{} } @@ -319,6 +327,166 @@ func (m *SeriesRequest) XXX_DiscardUnknown() { var xxx_messageInfo_SeriesRequest proto.InternalMessageInfo +// Analogous to storage.SelectHints. +type QueryHints struct { + // Query step size in milliseconds. + StepMillis int64 `protobuf:"varint,1,opt,name=step_millis,json=stepMillis,proto3" json:"step_millis,omitempty"` + // The surrounding function or aggregation. + Func *Func `protobuf:"bytes,2,opt,name=func,proto3" json:"func,omitempty"` + // The grouping expression + Grouping *Grouping `protobuf:"bytes,4,opt,name=grouping,proto3" json:"grouping,omitempty"` + // Range vector selector. + Range *Range `protobuf:"bytes,5,opt,name=range,proto3" json:"range,omitempty"` +} + +func (m *QueryHints) Reset() { *m = QueryHints{} } +func (m *QueryHints) String() string { return proto.CompactTextString(m) } +func (*QueryHints) ProtoMessage() {} +func (*QueryHints) Descriptor() ([]byte, []int) { + return fileDescriptor_a938d55a388af629, []int{5} +} +func (m *QueryHints) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *QueryHints) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_QueryHints.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *QueryHints) XXX_Merge(src proto.Message) { + xxx_messageInfo_QueryHints.Merge(m, src) +} +func (m *QueryHints) XXX_Size() int { + return m.Size() +} +func (m *QueryHints) XXX_DiscardUnknown() { + xxx_messageInfo_QueryHints.DiscardUnknown(m) +} + +var xxx_messageInfo_QueryHints proto.InternalMessageInfo + +type Func struct { + // The function or aggregation name + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` +} + +func (m *Func) Reset() { *m = Func{} } +func (m *Func) String() string { return proto.CompactTextString(m) } +func (*Func) ProtoMessage() {} +func (*Func) Descriptor() ([]byte, []int) { + return fileDescriptor_a938d55a388af629, []int{6} +} +func (m *Func) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Func) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Func.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Func) XXX_Merge(src proto.Message) { + xxx_messageInfo_Func.Merge(m, src) +} +func (m *Func) XXX_Size() int { + return m.Size() +} +func (m *Func) XXX_DiscardUnknown() { + xxx_messageInfo_Func.DiscardUnknown(m) +} + +var xxx_messageInfo_Func proto.InternalMessageInfo + +type Grouping struct { + // Indicate whether it is without or by. + By bool `protobuf:"varint,1,opt,name=by,proto3" json:"by,omitempty"` + // List of label names used in the grouping. + Labels []string `protobuf:"bytes,3,rep,name=labels,proto3" json:"labels,omitempty"` +} + +func (m *Grouping) Reset() { *m = Grouping{} } +func (m *Grouping) String() string { return proto.CompactTextString(m) } +func (*Grouping) ProtoMessage() {} +func (*Grouping) Descriptor() ([]byte, []int) { + return fileDescriptor_a938d55a388af629, []int{7} +} +func (m *Grouping) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Grouping) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Grouping.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Grouping) XXX_Merge(src proto.Message) { + xxx_messageInfo_Grouping.Merge(m, src) +} +func (m *Grouping) XXX_Size() int { + return m.Size() +} +func (m *Grouping) XXX_DiscardUnknown() { + xxx_messageInfo_Grouping.DiscardUnknown(m) +} + +var xxx_messageInfo_Grouping proto.InternalMessageInfo + +type Range struct { + Millis int64 `protobuf:"varint,1,opt,name=millis,proto3" json:"millis,omitempty"` +} + +func (m *Range) Reset() { *m = Range{} } +func (m *Range) String() string { return proto.CompactTextString(m) } +func (*Range) ProtoMessage() {} +func (*Range) Descriptor() ([]byte, []int) { + return fileDescriptor_a938d55a388af629, []int{8} +} +func (m *Range) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Range) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Range.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Range) XXX_Merge(src proto.Message) { + xxx_messageInfo_Range.Merge(m, src) +} +func (m *Range) XXX_Size() int { + return m.Size() +} +func (m *Range) XXX_DiscardUnknown() { + xxx_messageInfo_Range.DiscardUnknown(m) +} + +var xxx_messageInfo_Range proto.InternalMessageInfo + type SeriesResponse struct { // Types that are valid to be assigned to Result: // *SeriesResponse_Series @@ -331,7 +499,7 @@ func (m *SeriesResponse) Reset() { *m = SeriesResponse{} } func (m *SeriesResponse) String() string { return proto.CompactTextString(m) } func (*SeriesResponse) ProtoMessage() {} func (*SeriesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_a938d55a388af629, []int{5} + return fileDescriptor_a938d55a388af629, []int{9} } func (m *SeriesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -434,7 +602,7 @@ func (m *LabelNamesRequest) Reset() { *m = LabelNamesRequest{} } func (m *LabelNamesRequest) String() string { return proto.CompactTextString(m) } func (*LabelNamesRequest) ProtoMessage() {} func (*LabelNamesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_a938d55a388af629, []int{6} + return fileDescriptor_a938d55a388af629, []int{10} } func (m *LabelNamesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -476,7 +644,7 @@ func (m *LabelNamesResponse) Reset() { *m = LabelNamesResponse{} } func (m *LabelNamesResponse) String() string { return proto.CompactTextString(m) } func (*LabelNamesResponse) ProtoMessage() {} func (*LabelNamesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_a938d55a388af629, []int{7} + return fileDescriptor_a938d55a388af629, []int{11} } func (m *LabelNamesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -523,7 +691,7 @@ func (m *LabelValuesRequest) Reset() { *m = LabelValuesRequest{} } func (m *LabelValuesRequest) String() string { return proto.CompactTextString(m) } func (*LabelValuesRequest) ProtoMessage() {} func (*LabelValuesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_a938d55a388af629, []int{8} + return fileDescriptor_a938d55a388af629, []int{12} } func (m *LabelValuesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -565,7 +733,7 @@ func (m *LabelValuesResponse) Reset() { *m = LabelValuesResponse{} } func (m *LabelValuesResponse) String() string { return proto.CompactTextString(m) } func (*LabelValuesResponse) ProtoMessage() {} func (*LabelValuesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_a938d55a388af629, []int{9} + return fileDescriptor_a938d55a388af629, []int{13} } func (m *LabelValuesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -602,6 +770,10 @@ func init() { proto.RegisterType((*InfoRequest)(nil), "thanos.InfoRequest") proto.RegisterType((*InfoResponse)(nil), "thanos.InfoResponse") proto.RegisterType((*SeriesRequest)(nil), "thanos.SeriesRequest") + proto.RegisterType((*QueryHints)(nil), "thanos.QueryHints") + proto.RegisterType((*Func)(nil), "thanos.Func") + proto.RegisterType((*Grouping)(nil), "thanos.Grouping") + proto.RegisterType((*Range)(nil), "thanos.Range") proto.RegisterType((*SeriesResponse)(nil), "thanos.SeriesResponse") proto.RegisterType((*LabelNamesRequest)(nil), "thanos.LabelNamesRequest") proto.RegisterType((*LabelNamesResponse)(nil), "thanos.LabelNamesResponse") @@ -612,75 +784,86 @@ func init() { func init() { proto.RegisterFile("store/storepb/rpc.proto", fileDescriptor_a938d55a388af629) } var fileDescriptor_a938d55a388af629 = []byte{ - // 1083 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x56, 0xdf, 0x6e, 0x1a, 0xc7, - 0x17, 0x66, 0x59, 0x76, 0x81, 0x83, 0xed, 0xdf, 0x66, 0x8c, 0x9d, 0x35, 0x91, 0x30, 0x42, 0xfa, - 0x49, 0xc8, 0x4a, 0xa1, 0xa5, 0x55, 0xa4, 0x56, 0xb9, 0x01, 0x9b, 0xd6, 0x56, 0x63, 0xdc, 0x0e, - 0x26, 0x6e, 0x53, 0x55, 0x68, 0xc1, 0x93, 0x65, 0x65, 0xf6, 0x4f, 0x77, 0x86, 0xda, 0xdc, 0xb6, - 0xf7, 0x55, 0xd5, 0xa7, 0xe9, 0x23, 0xf8, 0xae, 0xb9, 0xac, 0x7a, 0x11, 0xb5, 0xf6, 0x03, 0xf4, - 0x15, 0xaa, 0x9d, 0x99, 0x05, 0xd6, 0x75, 0x92, 0x46, 0xce, 0x0d, 0x9a, 0x73, 0xbe, 0x73, 0xce, - 0x7c, 0x73, 0xbe, 0x39, 0xc3, 0xc2, 0x7d, 0xca, 0xfc, 0x90, 0x34, 0xf8, 0x6f, 0x30, 0x6c, 0x84, - 0xc1, 0xa8, 0x1e, 0x84, 0x3e, 0xf3, 0x91, 0xce, 0xc6, 0x96, 0xe7, 0xd3, 0xd2, 0x56, 0x32, 0x80, - 0xcd, 0x02, 0x42, 0x45, 0x48, 0xa9, 0x68, 0xfb, 0xb6, 0xcf, 0x97, 0x8d, 0x68, 0x25, 0xbd, 0x95, - 0x64, 0x42, 0x10, 0xfa, 0xee, 0x8d, 0x3c, 0x59, 0x72, 0x62, 0x0d, 0xc9, 0xe4, 0x26, 0x64, 0xfb, - 0xbe, 0x3d, 0x21, 0x0d, 0x6e, 0x0d, 0xa7, 0xcf, 0x1b, 0x96, 0x37, 0x13, 0x50, 0xf5, 0x7f, 0xb0, - 0x7a, 0x12, 0x3a, 0x8c, 0x60, 0x42, 0x03, 0xdf, 0xa3, 0xa4, 0xfa, 0xa3, 0x02, 0x2b, 0xd2, 0xf3, - 0xdd, 0x94, 0x50, 0x86, 0x5a, 0x00, 0xcc, 0x71, 0x09, 0x25, 0xa1, 0x43, 0xa8, 0xa9, 0x54, 0xd4, - 0x5a, 0xa1, 0xf9, 0x20, 0xca, 0x76, 0x09, 0x1b, 0x93, 0x29, 0x1d, 0x8c, 0xfc, 0x60, 0x56, 0x3f, - 0x76, 0x5c, 0xd2, 0xe3, 0x21, 0xed, 0xcc, 0xe5, 0xcb, 0xed, 0x14, 0x5e, 0x4a, 0x42, 0x9b, 0xa0, - 0x33, 0xe2, 0x59, 0x1e, 0x33, 0xd3, 0x15, 0xa5, 0x96, 0xc7, 0xd2, 0x42, 0x26, 0x64, 0x43, 0x12, - 0x4c, 0x9c, 0x91, 0x65, 0xaa, 0x15, 0xa5, 0xa6, 0xe2, 0xd8, 0xac, 0xae, 0x42, 0xe1, 0xc0, 0x7b, - 0xee, 0x4b, 0x0e, 0xd5, 0x5f, 0xd2, 0xb0, 0x22, 0x6c, 0xc1, 0x12, 0x8d, 0x40, 0xe7, 0x07, 0x8d, - 0x09, 0xad, 0xd6, 0x45, 0x63, 0xeb, 0x4f, 0x22, 0x6f, 0xfb, 0x71, 0x44, 0xe1, 0x8f, 0x97, 0xdb, - 0x1f, 0xd9, 0x0e, 0x1b, 0x4f, 0x87, 0xf5, 0x91, 0xef, 0x36, 0x44, 0xc0, 0x7b, 0x8e, 0x2f, 0x57, - 0x8d, 0xe0, 0xcc, 0x6e, 0x24, 0x7a, 0x56, 0x7f, 0xc6, 0xb3, 0xb1, 0x2c, 0x8d, 0xb6, 0x20, 0xe7, - 0x3a, 0xde, 0x20, 0x3a, 0x08, 0x27, 0xae, 0xe2, 0xac, 0xeb, 0x78, 0xd1, 0x49, 0x39, 0x64, 0x5d, - 0x08, 0x48, 0x52, 0x77, 0xad, 0x0b, 0x0e, 0x35, 0x20, 0xcf, 0xab, 0x1e, 0xcf, 0x02, 0x62, 0x66, - 0x2a, 0x4a, 0x6d, 0xad, 0x79, 0x2f, 0x66, 0xd7, 0x8b, 0x01, 0xbc, 0x88, 0x41, 0x8f, 0x00, 0xf8, - 0x86, 0x03, 0x4a, 0x18, 0x35, 0x35, 0x7e, 0x9e, 0x79, 0x86, 0xa0, 0xd4, 0x23, 0x4c, 0xb6, 0x35, - 0x3f, 0x91, 0x36, 0xad, 0xfe, 0xad, 0xc2, 0xaa, 0x68, 0x79, 0x2c, 0xd5, 0x32, 0x61, 0xe5, 0xd5, - 0x84, 0xd3, 0x49, 0xc2, 0x8f, 0x22, 0x88, 0x8d, 0xc6, 0x24, 0xa4, 0xa6, 0xca, 0x77, 0x2f, 0x26, - 0xba, 0x79, 0x28, 0x40, 0x49, 0x60, 0x1e, 0x8b, 0x9a, 0xb0, 0x11, 0x95, 0x0c, 0x09, 0xf5, 0x27, - 0x53, 0xe6, 0xf8, 0xde, 0xe0, 0xdc, 0xf1, 0x4e, 0xfd, 0x73, 0x7e, 0x68, 0x15, 0xaf, 0xbb, 0xd6, - 0x05, 0x9e, 0x63, 0x27, 0x1c, 0x42, 0x0f, 0x01, 0x2c, 0xdb, 0x0e, 0x89, 0x6d, 0x31, 0x22, 0xce, - 0xba, 0xd6, 0x5c, 0x89, 0x77, 0x6b, 0xd9, 0x76, 0x88, 0x97, 0x70, 0xf4, 0x09, 0x6c, 0x05, 0x56, - 0xc8, 0x1c, 0x6b, 0x12, 0xed, 0xc2, 0x95, 0x1f, 0x9c, 0x3a, 0xd4, 0x1a, 0x4e, 0xc8, 0xa9, 0xa9, - 0x57, 0x94, 0x5a, 0x0e, 0xdf, 0x97, 0x01, 0xf1, 0xcd, 0xd8, 0x93, 0x30, 0xfa, 0xe6, 0x96, 0x5c, - 0xca, 0x42, 0x8b, 0x11, 0x7b, 0x66, 0x66, 0xb9, 0x2c, 0xdb, 0xf1, 0xc6, 0x5f, 0x24, 0x6b, 0xf4, - 0x64, 0xd8, 0xbf, 0x8a, 0xc7, 0x00, 0xda, 0x86, 0x02, 0x3d, 0x73, 0x82, 0xc1, 0x68, 0x3c, 0xf5, - 0xce, 0xa8, 0x99, 0xe3, 0x54, 0x20, 0x72, 0xed, 0x72, 0x0f, 0xda, 0x01, 0x6d, 0xec, 0x78, 0x8c, - 0x9a, 0xf9, 0x8a, 0xc2, 0x1b, 0x2a, 0x26, 0xb0, 0x1e, 0x4f, 0x60, 0xbd, 0xe5, 0xcd, 0xb0, 0x08, - 0x41, 0x08, 0x32, 0x94, 0x91, 0xc0, 0x04, 0xde, 0x36, 0xbe, 0x46, 0x45, 0xd0, 0x42, 0xcb, 0xb3, - 0x89, 0x59, 0xe0, 0x4e, 0x61, 0x54, 0x7f, 0x52, 0x60, 0x2d, 0x56, 0x5c, 0x0e, 0x42, 0x0d, 0xf4, - 0xf9, 0x64, 0x46, 0x3b, 0xad, 0xcd, 0xaf, 0x1a, 0xf7, 0xee, 0xa7, 0xb0, 0xc4, 0x51, 0x09, 0xb2, - 0xe7, 0x56, 0xe8, 0x39, 0x9e, 0x2d, 0xa6, 0x70, 0x3f, 0x85, 0x63, 0x07, 0x7a, 0x18, 0xd3, 0x55, - 0x5f, 0x4d, 0x77, 0x3f, 0x25, 0x09, 0xb7, 0x73, 0xa0, 0x87, 0x84, 0x4e, 0x27, 0xac, 0xfa, 0x6b, - 0x1a, 0xee, 0xf1, 0x3b, 0xd2, 0xb5, 0xdc, 0xc5, 0x35, 0x7c, 0xad, 0x6c, 0xca, 0x1d, 0x64, 0x4b, - 0xdf, 0x51, 0xb6, 0x22, 0x68, 0x94, 0x59, 0x21, 0x93, 0x23, 0x2b, 0x0c, 0x64, 0x80, 0x4a, 0xbc, - 0x53, 0x79, 0x6b, 0xa3, 0xe5, 0x42, 0x3d, 0xed, 0xcd, 0xea, 0x2d, 0x4f, 0x8f, 0xfe, 0xdf, 0xa7, - 0xa7, 0x1a, 0x02, 0x5a, 0xee, 0x9c, 0x94, 0xb3, 0x08, 0x9a, 0x17, 0x39, 0xf8, 0xb3, 0x96, 0xc7, - 0xc2, 0x40, 0x25, 0xc8, 0x49, 0xa5, 0xa8, 0x99, 0xe6, 0xc0, 0xdc, 0x5e, 0x70, 0x55, 0xdf, 0xc8, - 0xb5, 0xfa, 0x5b, 0x5a, 0x6e, 0xfa, 0xd4, 0x9a, 0x4c, 0x17, 0x7a, 0x15, 0x41, 0xe3, 0xaf, 0x0a, - 0xd7, 0x26, 0x8f, 0x85, 0xf1, 0x7a, 0x15, 0xd3, 0x77, 0x50, 0x51, 0x7d, 0x57, 0x2a, 0x66, 0x6e, - 0x51, 0x51, 0xbb, 0x45, 0x45, 0xfd, 0xed, 0x54, 0xcc, 0xbe, 0x85, 0x8a, 0x53, 0x58, 0x4f, 0x34, - 0x54, 0xca, 0xb8, 0x09, 0xfa, 0xf7, 0xdc, 0x23, 0x75, 0x94, 0xd6, 0xbb, 0x12, 0x72, 0xe7, 0x5b, - 0xc8, 0xcf, 0xff, 0x4a, 0x50, 0x01, 0xb2, 0xfd, 0xee, 0xe7, 0xdd, 0xa3, 0x93, 0xae, 0x91, 0x42, - 0x79, 0xd0, 0xbe, 0xec, 0x77, 0xf0, 0xd7, 0x86, 0x82, 0x72, 0x90, 0xc1, 0xfd, 0x27, 0x1d, 0x23, - 0x1d, 0x45, 0xf4, 0x0e, 0xf6, 0x3a, 0xbb, 0x2d, 0x6c, 0xa8, 0x51, 0x44, 0xef, 0xf8, 0x08, 0x77, - 0x8c, 0x4c, 0xe4, 0xc7, 0x9d, 0xdd, 0xce, 0xc1, 0xd3, 0x8e, 0xa1, 0x45, 0xfe, 0xbd, 0x4e, 0xbb, - 0xff, 0x99, 0xa1, 0xef, 0xb4, 0x21, 0x13, 0xbd, 0xc5, 0x28, 0x0b, 0x2a, 0x6e, 0x9d, 0x88, 0xaa, - 0xbb, 0x47, 0xfd, 0xee, 0xb1, 0xa1, 0x44, 0xbe, 0x5e, 0xff, 0xd0, 0x48, 0x47, 0x8b, 0xc3, 0x83, - 0xae, 0xa1, 0xf2, 0x45, 0xeb, 0x2b, 0x51, 0x8e, 0x47, 0x75, 0xb0, 0xa1, 0x35, 0x7f, 0x48, 0x83, - 0xc6, 0x39, 0xa2, 0x0f, 0x20, 0x13, 0xfd, 0x77, 0xa3, 0xf5, 0xb8, 0xa3, 0x4b, 0xff, 0xec, 0xa5, - 0x62, 0xd2, 0x29, 0xfb, 0xf7, 0x31, 0xe8, 0xe2, 0xfd, 0x42, 0x1b, 0xc9, 0xf7, 0x2c, 0x4e, 0xdb, - 0xbc, 0xe9, 0x16, 0x89, 0xef, 0x2b, 0x68, 0x17, 0x60, 0x31, 0x57, 0x68, 0x2b, 0xa1, 0xe2, 0xf2, - 0x2b, 0x55, 0x2a, 0xdd, 0x06, 0xc9, 0xfd, 0x3f, 0x85, 0xc2, 0x92, 0xac, 0x28, 0x19, 0x9a, 0x18, - 0x9e, 0xd2, 0x83, 0x5b, 0x31, 0x51, 0xa7, 0xd9, 0x85, 0x35, 0xfe, 0x2d, 0x15, 0x4d, 0x85, 0x68, - 0xc6, 0x63, 0x28, 0x60, 0xe2, 0xfa, 0x8c, 0x70, 0x3f, 0x9a, 0x1f, 0x7f, 0xf9, 0x93, 0xab, 0xb4, - 0x71, 0xc3, 0x2b, 0x3f, 0xcd, 0x52, 0xed, 0xff, 0x5f, 0xfe, 0x55, 0x4e, 0x5d, 0x5e, 0x95, 0x95, - 0x17, 0x57, 0x65, 0xe5, 0xcf, 0xab, 0xb2, 0xf2, 0xf3, 0x75, 0x39, 0xf5, 0xe2, 0xba, 0x9c, 0xfa, - 0xfd, 0xba, 0x9c, 0x7a, 0x96, 0x95, 0x5f, 0x87, 0x43, 0x9d, 0xdf, 0x99, 0x0f, 0xff, 0x09, 0x00, - 0x00, 0xff, 0xff, 0xa4, 0xea, 0x02, 0x9d, 0x87, 0x0a, 0x00, 0x00, + // 1251 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x56, 0x5f, 0x6f, 0xd3, 0x56, + 0x14, 0x8f, 0xed, 0x38, 0x7f, 0x4e, 0xda, 0xce, 0xdc, 0x06, 0x70, 0x83, 0x94, 0x46, 0x9e, 0x26, + 0x55, 0x88, 0x25, 0x2c, 0x9b, 0x90, 0x98, 0x78, 0x69, 0x4b, 0xa0, 0xd5, 0x68, 0x80, 0x9b, 0x96, + 0x6e, 0x4c, 0x53, 0xe4, 0xa4, 0x17, 0xc7, 0x22, 0xb1, 0x8d, 0xef, 0xf5, 0x4a, 0x5e, 0xb7, 0xf7, + 0x69, 0xda, 0x47, 0xd8, 0xa7, 0xd8, 0x47, 0xe0, 0x6d, 0x3c, 0x4e, 0xd3, 0x84, 0x36, 0xf8, 0x22, + 0xd3, 0xfd, 0xe3, 0x24, 0xee, 0x02, 0x0c, 0xc1, 0x4b, 0x74, 0xcf, 0xef, 0x77, 0xee, 0xb9, 0xe7, + 0x7f, 0x0c, 0x17, 0x29, 0x0b, 0x63, 0xd2, 0x12, 0xbf, 0xd1, 0xa0, 0x15, 0x47, 0xc3, 0x66, 0x14, + 0x87, 0x2c, 0x44, 0x05, 0x36, 0x72, 0x83, 0x90, 0xd6, 0x36, 0xb2, 0x0a, 0x6c, 0x1a, 0x11, 0x2a, + 0x55, 0x6a, 0x55, 0x2f, 0xf4, 0x42, 0x71, 0x6c, 0xf1, 0x93, 0x42, 0x1b, 0xd9, 0x0b, 0x51, 0x1c, + 0x4e, 0xce, 0xdc, 0x53, 0x26, 0xc7, 0xee, 0x80, 0x8c, 0xcf, 0x52, 0x5e, 0x18, 0x7a, 0x63, 0xd2, + 0x12, 0xd2, 0x20, 0x79, 0xd4, 0x72, 0x83, 0xa9, 0xa4, 0x9c, 0x8f, 0x60, 0xf5, 0x38, 0xf6, 0x19, + 0xc1, 0x84, 0x46, 0x61, 0x40, 0x89, 0xf3, 0xa3, 0x06, 0x2b, 0x0a, 0x79, 0x92, 0x10, 0xca, 0xd0, + 0x36, 0x00, 0xf3, 0x27, 0x84, 0x92, 0xd8, 0x27, 0xd4, 0xd6, 0x1a, 0xc6, 0x56, 0xa5, 0x7d, 0x89, + 0xdf, 0x9e, 0x10, 0x36, 0x22, 0x09, 0xed, 0x0f, 0xc3, 0x68, 0xda, 0x3c, 0xf4, 0x27, 0xa4, 0x27, + 0x54, 0x76, 0xf2, 0xcf, 0x5e, 0x6c, 0xe6, 0xf0, 0xc2, 0x25, 0x74, 0x01, 0x0a, 0x8c, 0x04, 0x6e, + 0xc0, 0x6c, 0xbd, 0xa1, 0x6d, 0x95, 0xb1, 0x92, 0x90, 0x0d, 0xc5, 0x98, 0x44, 0x63, 0x7f, 0xe8, + 0xda, 0x46, 0x43, 0xdb, 0x32, 0x70, 0x2a, 0x3a, 0xab, 0x50, 0xd9, 0x0f, 0x1e, 0x85, 0xca, 0x07, + 0xe7, 0x17, 0x1d, 0x56, 0xa4, 0x2c, 0xbd, 0x44, 0x43, 0x28, 0x88, 0x40, 0x53, 0x87, 0x56, 0x9b, + 0x32, 0xb1, 0xcd, 0x3b, 0x1c, 0xdd, 0xb9, 0xc1, 0x5d, 0xf8, 0xf3, 0xc5, 0xe6, 0x17, 0x9e, 0xcf, + 0x46, 0xc9, 0xa0, 0x39, 0x0c, 0x27, 0x2d, 0xa9, 0xf0, 0xa9, 0x1f, 0xaa, 0x53, 0x2b, 0x7a, 0xec, + 0xb5, 0x32, 0x39, 0x6b, 0x3e, 0x14, 0xb7, 0xb1, 0x32, 0x8d, 0x36, 0xa0, 0x34, 0xf1, 0x83, 0x3e, + 0x0f, 0x44, 0x38, 0x6e, 0xe0, 0xe2, 0xc4, 0x0f, 0x78, 0xa4, 0x82, 0x72, 0x9f, 0x4a, 0x4a, 0xb9, + 0x3e, 0x71, 0x9f, 0x0a, 0xaa, 0x05, 0x65, 0x61, 0xf5, 0x70, 0x1a, 0x11, 0x3b, 0xdf, 0xd0, 0xb6, + 0xd6, 0xda, 0xe7, 0x52, 0xef, 0x7a, 0x29, 0x81, 0xe7, 0x3a, 0xe8, 0x1a, 0x80, 0x78, 0xb0, 0x4f, + 0x09, 0xa3, 0xb6, 0x29, 0xe2, 0x99, 0xdd, 0x90, 0x2e, 0xf5, 0x08, 0x53, 0x69, 0x2d, 0x8f, 0x95, + 0x4c, 0x9d, 0xbf, 0xf2, 0xb0, 0x2a, 0x53, 0x9e, 0x96, 0x6a, 0xd1, 0x61, 0xed, 0xf5, 0x0e, 0xeb, + 0x59, 0x87, 0xaf, 0x71, 0x8a, 0x0d, 0x47, 0x24, 0xa6, 0xb6, 0x21, 0x5e, 0xaf, 0x66, 0xb2, 0x79, + 0x20, 0x49, 0xe5, 0xc0, 0x4c, 0x17, 0xb5, 0xe1, 0x3c, 0x37, 0x19, 0x13, 0x1a, 0x8e, 0x13, 0xe6, + 0x87, 0x41, 0xff, 0xd4, 0x0f, 0x4e, 0xc2, 0x53, 0x11, 0xb4, 0x81, 0xd7, 0x27, 0xee, 0x53, 0x3c, + 0xe3, 0x8e, 0x05, 0x85, 0xae, 0x00, 0xb8, 0x9e, 0x17, 0x13, 0xcf, 0x65, 0x44, 0xc6, 0xba, 0xd6, + 0x5e, 0x49, 0x5f, 0xdb, 0xf6, 0xbc, 0x18, 0x2f, 0xf0, 0xe8, 0x4b, 0xd8, 0x88, 0xdc, 0x98, 0xf9, + 0xee, 0x98, 0xbf, 0x22, 0x2a, 0xdf, 0x3f, 0xf1, 0xa9, 0x3b, 0x18, 0x93, 0x13, 0xbb, 0xd0, 0xd0, + 0xb6, 0x4a, 0xf8, 0xa2, 0x52, 0x48, 0x3b, 0xe3, 0xa6, 0xa2, 0xd1, 0xb7, 0x4b, 0xee, 0x52, 0x16, + 0xbb, 0x8c, 0x78, 0x53, 0xbb, 0x28, 0xca, 0xb2, 0x99, 0x3e, 0x7c, 0x2f, 0x6b, 0xa3, 0xa7, 0xd4, + 0xfe, 0x63, 0x3c, 0x25, 0xd0, 0x26, 0x54, 0xe8, 0x63, 0x3f, 0xea, 0x0f, 0x47, 0x49, 0xf0, 0x98, + 0xda, 0x25, 0xe1, 0x0a, 0x70, 0x68, 0x57, 0x20, 0xe8, 0x32, 0x98, 0x23, 0x3f, 0x60, 0xd4, 0x2e, + 0x37, 0x34, 0x91, 0x50, 0x39, 0x81, 0xcd, 0x74, 0x02, 0x9b, 0xdb, 0xc1, 0x14, 0x4b, 0x15, 0x84, + 0x20, 0x4f, 0x19, 0x89, 0x6c, 0x10, 0x69, 0x13, 0x67, 0x54, 0x05, 0x33, 0x76, 0x03, 0x8f, 0xd8, + 0x15, 0x01, 0x4a, 0x01, 0x5d, 0x85, 0x75, 0x12, 0xf0, 0xf0, 0xee, 0x27, 0x24, 0x9e, 0xde, 0x4b, + 0xe8, 0xe8, 0x24, 0x3c, 0x0d, 0xec, 0x15, 0xf1, 0xfc, 0x32, 0x0a, 0x5d, 0x87, 0xca, 0x13, 0x0e, + 0xf4, 0xa5, 0x37, 0xab, 0xc2, 0x1b, 0x94, 0xc6, 0x2d, 0x74, 0xf7, 0x38, 0x93, 0x0e, 0xed, 0x93, + 0x19, 0xe2, 0xfc, 0xaa, 0x01, 0xcc, 0x15, 0x44, 0xc8, 0x8c, 0x44, 0xfd, 0x89, 0x3f, 0x1e, 0xfb, + 0x54, 0xb5, 0x17, 0x70, 0xe8, 0x40, 0x20, 0xa8, 0x01, 0xf9, 0x47, 0x49, 0x30, 0x14, 0xdd, 0x55, + 0x99, 0x17, 0xf5, 0x56, 0x12, 0x0c, 0xb1, 0x60, 0xd0, 0x15, 0x28, 0x79, 0x71, 0x98, 0x44, 0x7e, + 0xe0, 0x89, 0x1e, 0xa9, 0xb4, 0xad, 0x54, 0xeb, 0xb6, 0xc2, 0xf1, 0x4c, 0x03, 0x7d, 0x9c, 0xa6, + 0xc0, 0x14, 0xaa, 0xb3, 0x09, 0xc7, 0x1c, 0x54, 0x19, 0x71, 0x6a, 0x90, 0xe7, 0x0f, 0xf0, 0x1c, + 0x06, 0xae, 0xea, 0xfa, 0x32, 0x16, 0x67, 0xa7, 0x0d, 0xa5, 0xd4, 0x2c, 0x5a, 0x03, 0x7d, 0x30, + 0x15, 0x6c, 0x09, 0xeb, 0x83, 0x29, 0xdf, 0x48, 0x6a, 0x7f, 0xf0, 0x8e, 0x2f, 0xa7, 0x23, 0xef, + 0x6c, 0x82, 0x29, 0xec, 0x73, 0x85, 0x4c, 0xa4, 0x4a, 0x72, 0x7e, 0xd2, 0x60, 0x2d, 0x1d, 0x3a, + 0xb5, 0x8b, 0xb6, 0xa0, 0x30, 0x5b, 0x8e, 0xdc, 0xd3, 0xb5, 0xd9, 0xb4, 0x0b, 0x74, 0x2f, 0x87, + 0x15, 0x8f, 0x6a, 0x50, 0x3c, 0x75, 0xe3, 0x80, 0xc7, 0x2f, 0x16, 0xe1, 0x5e, 0x0e, 0xa7, 0x00, + 0xba, 0x92, 0x76, 0x8c, 0xf1, 0xfa, 0x8e, 0xd9, 0xcb, 0xa9, 0x9e, 0xd9, 0x29, 0x41, 0x21, 0x26, + 0x34, 0x19, 0x33, 0xe7, 0x37, 0x1d, 0xce, 0x89, 0x31, 0xed, 0xba, 0x93, 0xf9, 0x26, 0x78, 0xe3, + 0xe4, 0x68, 0xef, 0x31, 0x39, 0xfa, 0x7b, 0x4e, 0x4e, 0x15, 0x4c, 0xca, 0xdc, 0x98, 0xa9, 0xad, + 0x29, 0x05, 0x64, 0x81, 0x41, 0x82, 0x13, 0xb5, 0x38, 0xf8, 0x71, 0x3e, 0x40, 0xe6, 0xdb, 0x07, + 0x68, 0x71, 0x81, 0x15, 0xfe, 0xff, 0x02, 0x73, 0x62, 0x40, 0x8b, 0x99, 0x53, 0xe5, 0xac, 0x82, + 0xc9, 0xdb, 0x47, 0xfe, 0xb3, 0x94, 0xb1, 0x14, 0x50, 0x0d, 0x4a, 0xaa, 0x52, 0xd4, 0xd6, 0x05, + 0x31, 0x93, 0xe7, 0xbe, 0x1a, 0x6f, 0xf5, 0xd5, 0xf9, 0x5d, 0x57, 0x8f, 0x3e, 0x70, 0xc7, 0xc9, + 0xbc, 0x5e, 0x55, 0x30, 0x45, 0x07, 0xaa, 0x06, 0x96, 0xc2, 0x9b, 0xab, 0xa8, 0xbf, 0x47, 0x15, + 0x8d, 0x0f, 0x55, 0xc5, 0xfc, 0x92, 0x2a, 0x9a, 0x4b, 0xaa, 0x58, 0x78, 0xb7, 0x2a, 0x16, 0xdf, + 0xa1, 0x8a, 0x09, 0xac, 0x67, 0x12, 0xaa, 0xca, 0x78, 0x01, 0x0a, 0xdf, 0x0b, 0x44, 0xd5, 0x51, + 0x49, 0x1f, 0xaa, 0x90, 0x97, 0xbf, 0x83, 0xf2, 0xec, 0xdf, 0x1c, 0x55, 0xa0, 0x78, 0xd4, 0xfd, + 0xaa, 0x7b, 0xf7, 0xb8, 0x6b, 0xe5, 0x50, 0x19, 0xcc, 0xfb, 0x47, 0x1d, 0xfc, 0x8d, 0xa5, 0xa1, + 0x12, 0xe4, 0xf1, 0xd1, 0x9d, 0x8e, 0xa5, 0x73, 0x8d, 0xde, 0xfe, 0xcd, 0xce, 0xee, 0x36, 0xb6, + 0x0c, 0xae, 0xd1, 0x3b, 0xbc, 0x8b, 0x3b, 0x56, 0x9e, 0xe3, 0xb8, 0xb3, 0xdb, 0xd9, 0x7f, 0xd0, + 0xb1, 0x4c, 0x8e, 0xdf, 0xec, 0xec, 0x1c, 0xdd, 0xb6, 0x0a, 0x97, 0x77, 0x20, 0xcf, 0xff, 0x0e, + 0x51, 0x11, 0x0c, 0xbc, 0x7d, 0x2c, 0xad, 0xee, 0xde, 0x3d, 0xea, 0x1e, 0x5a, 0x1a, 0xc7, 0x7a, + 0x47, 0x07, 0x96, 0xce, 0x0f, 0x07, 0xfb, 0x5d, 0xcb, 0x10, 0x87, 0xed, 0xaf, 0xa5, 0x39, 0xa1, + 0xd5, 0xc1, 0x96, 0xd9, 0xfe, 0x41, 0x07, 0x53, 0xf8, 0x88, 0x3e, 0x83, 0x3c, 0xff, 0x7c, 0x42, + 0xeb, 0x69, 0x46, 0x17, 0x3e, 0xae, 0x6a, 0xd5, 0x2c, 0xa8, 0xf2, 0x77, 0x1d, 0x0a, 0x72, 0x7f, + 0xa1, 0xf3, 0xd9, 0x7d, 0x96, 0x5e, 0xbb, 0x70, 0x16, 0x96, 0x17, 0xaf, 0x6a, 0x68, 0x17, 0x60, + 0x3e, 0x57, 0x68, 0x23, 0x53, 0xc5, 0xc5, 0x2d, 0x55, 0xab, 0x2d, 0xa3, 0xd4, 0xfb, 0xb7, 0xa0, + 0xb2, 0x50, 0x56, 0x94, 0x55, 0xcd, 0x0c, 0x4f, 0xed, 0xd2, 0x52, 0x4e, 0xda, 0x69, 0x77, 0x61, + 0x4d, 0x7c, 0xce, 0xf2, 0xa9, 0x90, 0xc9, 0xb8, 0x01, 0x15, 0x4c, 0x26, 0x21, 0x23, 0x02, 0x47, + 0xb3, 0xf0, 0x17, 0xbf, 0x7a, 0x6b, 0xe7, 0xcf, 0xa0, 0xea, 0xeb, 0x38, 0xb7, 0xf3, 0xc9, 0xb3, + 0x7f, 0xea, 0xb9, 0x67, 0x2f, 0xeb, 0xda, 0xf3, 0x97, 0x75, 0xed, 0xef, 0x97, 0x75, 0xed, 0xe7, + 0x57, 0xf5, 0xdc, 0xf3, 0x57, 0xf5, 0xdc, 0x1f, 0xaf, 0xea, 0xb9, 0x87, 0x45, 0xf5, 0x81, 0x3e, + 0x28, 0x88, 0x9e, 0xf9, 0xfc, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xce, 0x35, 0xfd, 0xdd, 0x0a, + 0x0c, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -1180,6 +1363,26 @@ func (m *SeriesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + { + size, err := m.QueryHints.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x6a + if m.EnableQueryPushdown { + i-- + if m.EnableQueryPushdown { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x60 + } if m.Range != 0 { i = encodeVarintRpc(dAtA, i, uint64(m.Range)) i-- @@ -1228,20 +1431,20 @@ func (m *SeriesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { dAtA[i] = 0x30 } if len(m.Aggregates) > 0 { - dAtA3 := make([]byte, len(m.Aggregates)*10) - var j2 int + dAtA4 := make([]byte, len(m.Aggregates)*10) + var j3 int for _, num := range m.Aggregates { for num >= 1<<7 { - dAtA3[j2] = uint8(uint64(num)&0x7f | 0x80) + dAtA4[j3] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 - j2++ + j3++ } - dAtA3[j2] = uint8(num) - j2++ + dAtA4[j3] = uint8(num) + j3++ } - i -= j2 - copy(dAtA[i:], dAtA3[:j2]) - i = encodeVarintRpc(dAtA, i, uint64(j2)) + i -= j3 + copy(dAtA[i:], dAtA4[:j3]) + i = encodeVarintRpc(dAtA, i, uint64(j3)) i-- dAtA[i] = 0x2a } @@ -1277,6 +1480,170 @@ func (m *SeriesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *QueryHints) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *QueryHints) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *QueryHints) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Range != nil { + { + size, err := m.Range.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x2a + } + if m.Grouping != nil { + { + size, err := m.Grouping.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } + if m.Func != nil { + { + size, err := m.Func.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if m.StepMillis != 0 { + i = encodeVarintRpc(dAtA, i, uint64(m.StepMillis)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *Func) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Func) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Func) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Name) > 0 { + i -= len(m.Name) + copy(dAtA[i:], m.Name) + i = encodeVarintRpc(dAtA, i, uint64(len(m.Name))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *Grouping) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Grouping) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Grouping) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Labels) > 0 { + for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Labels[iNdEx]) + copy(dAtA[i:], m.Labels[iNdEx]) + i = encodeVarintRpc(dAtA, i, uint64(len(m.Labels[iNdEx]))) + i-- + dAtA[i] = 0x1a + } + } + if m.By { + i-- + if m.By { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *Range) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Range) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Range) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Millis != 0 { + i = encodeVarintRpc(dAtA, i, uint64(m.Millis)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func (m *SeriesResponse) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -1754,41 +2121,113 @@ func (m *SeriesRequest) Size() (n int) { if m.Range != 0 { n += 1 + sovRpc(uint64(m.Range)) } + if m.EnableQueryPushdown { + n += 2 + } + l = m.QueryHints.Size() + n += 1 + l + sovRpc(uint64(l)) return n } -func (m *SeriesResponse) Size() (n int) { +func (m *QueryHints) Size() (n int) { if m == nil { return 0 } var l int _ = l - if m.Result != nil { - n += m.Result.Size() + if m.StepMillis != 0 { + n += 1 + sovRpc(uint64(m.StepMillis)) + } + if m.Func != nil { + l = m.Func.Size() + n += 1 + l + sovRpc(uint64(l)) + } + if m.Grouping != nil { + l = m.Grouping.Size() + n += 1 + l + sovRpc(uint64(l)) + } + if m.Range != nil { + l = m.Range.Size() + n += 1 + l + sovRpc(uint64(l)) } return n } -func (m *SeriesResponse_Series) Size() (n int) { +func (m *Func) Size() (n int) { if m == nil { return 0 } var l int _ = l - if m.Series != nil { - l = m.Series.Size() + l = len(m.Name) + if l > 0 { n += 1 + l + sovRpc(uint64(l)) } return n } -func (m *SeriesResponse_Warning) Size() (n int) { + +func (m *Grouping) Size() (n int) { if m == nil { return 0 } var l int _ = l - l = len(m.Warning) - n += 1 + l + sovRpc(uint64(l)) + if m.By { + n += 2 + } + if len(m.Labels) > 0 { + for _, s := range m.Labels { + l = len(s) + n += 1 + l + sovRpc(uint64(l)) + } + } + return n +} + +func (m *Range) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Millis != 0 { + n += 1 + sovRpc(uint64(m.Millis)) + } + return n +} + +func (m *SeriesResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Result != nil { + n += m.Result.Size() + } + return n +} + +func (m *SeriesResponse_Series) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Series != nil { + l = m.Series.Size() + n += 1 + l + sovRpc(uint64(l)) + } + return n +} +func (m *SeriesResponse_Warning) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Warning) + n += 1 + l + sovRpc(uint64(l)) return n } func (m *SeriesResponse_Hints) Size() (n int) { @@ -2657,6 +3096,489 @@ func (m *SeriesRequest) Unmarshal(dAtA []byte) error { break } } + case 12: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EnableQueryPushdown", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.EnableQueryPushdown = bool(v != 0) + case 13: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field QueryHints", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.QueryHints.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *QueryHints) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: QueryHints: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: QueryHints: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StepMillis", wireType) + } + m.StepMillis = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.StepMillis |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Func", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Func == nil { + m.Func = &Func{} + } + if err := m.Func.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Grouping", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Grouping == nil { + m.Grouping = &Grouping{} + } + if err := m.Grouping.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Range", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Range == nil { + m.Range = &Range{} + } + if err := m.Range.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Func) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Func: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Func: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Grouping) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Grouping: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Grouping: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field By", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.By = bool(v != 0) + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Labels = append(m.Labels, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Range) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Range: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Range: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Millis", wireType) + } + m.Millis = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Millis |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) diff --git a/pkg/store/storepb/rpc.proto b/pkg/store/storepb/rpc.proto index 4a9240620d8..c3a55350f18 100644 --- a/pkg/store/storepb/rpc.proto +++ b/pkg/store/storepb/rpc.proto @@ -80,18 +80,18 @@ message InfoResponse { repeated Label labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/thanos-io/thanos/pkg/store/labelpb.ZLabel"]; int64 min_time = 2; int64 max_time = 3; - StoreType storeType = 4; + StoreType storeType = 4; // label_sets is an unsorted list of `ZLabelSet`s. repeated ZLabelSet label_sets = 5 [(gogoproto.nullable) = false]; } message SeriesRequest { - int64 min_time = 1; - int64 max_time = 2; + int64 min_time = 1; + int64 max_time = 2; repeated LabelMatcher matchers = 3 [(gogoproto.nullable) = false]; int64 max_resolution_window = 4; - repeated Aggr aggregates = 5; + repeated Aggr aggregates = 5; // Deprecated. Use partial_response_strategy instead. bool partial_response_disabled = 6; @@ -108,18 +108,60 @@ message SeriesRequest { google.protobuf.Any hints = 9; // Query step size in milliseconds. + // Deprecated: Use query_hints instead. int64 step = 10; // Range vector selector range in milliseconds. + // Deprecated: Use query_hints instead. int64 range = 11; + + // Indicates whether the store should evaluate queries + // before sending back samples. + bool enableQueryPushdown = 12; + + // query_hints are the hints coming from the PromQL engine when + // requesting a storage.SeriesSet for a given expression. + QueryHints query_hints = 13 [(gogoproto.nullable) = false]; +} + +// Analogous to storage.SelectHints. +message QueryHints { + // Query step size in milliseconds. + int64 step_millis = 1; + + // The surrounding function or aggregation. + Func func = 2; + + // The grouping expression + Grouping grouping = 4; + + // Range vector selector. + Range range = 5; +} + +message Func { + // The function or aggregation name + string name = 1; +} + +message Grouping { + // Indicate whether it is without or by. + bool by = 1; + + // List of label names used in the grouping. + repeated string labels = 3; +} + +message Range { + int64 millis = 1; } enum Aggr { - RAW = 0; - COUNT = 1; - SUM = 2; - MIN = 3; - MAX = 4; + RAW = 0; + COUNT = 1; + SUM = 2; + MIN = 3; + MAX = 4; COUNTER = 5; } @@ -160,7 +202,7 @@ message LabelNamesRequest { } message LabelNamesResponse { - repeated string names = 1; + repeated string names = 1; repeated string warnings = 2; /// hints is an opaque data structure that can be used to carry additional information from @@ -190,7 +232,7 @@ message LabelValuesRequest { } message LabelValuesResponse { - repeated string values = 1; + repeated string values = 1; repeated string warnings = 2; /// hints is an opaque data structure that can be used to carry additional information from diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 69667b48d64..930d6bb16ac 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -126,10 +126,10 @@ func NewPrometheus(e e2e.Environment, name, promConfig, webConfig, promImage str } func NewPrometheusWithSidecar(e e2e.Environment, name, promConfig, webConfig, promImage string, enableFeatures ...string) (*e2e.InstrumentedRunnable, *e2e.InstrumentedRunnable, error) { - return NewPrometheusWithSidecarCustomImage(e, name, promConfig, webConfig, promImage, DefaultImage(), enableFeatures...) + return NewPrometheusWithSidecarCustomImage(e, name, promConfig, webConfig, promImage, DefaultImage(), false, enableFeatures...) } -func NewPrometheusWithSidecarCustomImage(e e2e.Environment, name, promConfig, webConfig, promImage string, sidecarImage string, enableFeatures ...string) (*e2e.InstrumentedRunnable, *e2e.InstrumentedRunnable, error) { +func NewPrometheusWithSidecarCustomImage(e e2e.Environment, name, promConfig, webConfig, promImage string, sidecarImage string, sidecarEvaluateQueries bool, enableFeatures ...string) (*e2e.InstrumentedRunnable, *e2e.InstrumentedRunnable, error) { prom, dataDir, err := NewPrometheus(e, name, promConfig, webConfig, promImage, enableFeatures...) if err != nil { return nil, nil, err @@ -144,6 +144,9 @@ func NewPrometheusWithSidecarCustomImage(e e2e.Environment, name, promConfig, we "--tsdb.path": dataDir, "--log.level": infoLogLevel, } + if sidecarEvaluateQueries { + args["--enable-feature"] = "evaluate-queries" + } if len(webConfig) > 0 { args["--prometheus.http-client"] = defaultPromHttpConfig() } diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index cd61e991abf..795ee784919 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -15,6 +15,12 @@ import ( "testing" "time" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + config_util "github.com/prometheus/common/config" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage/remote" + "github.com/chromedp/cdproto/network" "github.com/chromedp/chromedp" "github.com/efficientgo/e2e" @@ -426,6 +432,7 @@ func TestQueryCompatibilityWithPreInfoAPI(t *testing.T) { "", e2ethanos.DefaultPrometheusImage(), tcase.sidecarImage, + false, e2ethanos.FeatureExemplarStorage, ) testutil.Ok(t, err) @@ -574,6 +581,159 @@ config: } } +type fakeMetricSample struct { + label string + value int64 +} + +func newSample(s fakeMetricSample) model.Sample { + return model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "my_fake_metric", + "instance": model.LabelValue(s.label), + }, + Value: model.SampleValue(s.value), + Timestamp: model.Now(), + } +} + +func TestSidecarQueryEvaluation(t *testing.T) { + t.Parallel() + + ts := []struct { + prom1Samples []fakeMetricSample + prom2Samples []fakeMetricSample + query string + result model.Vector + }{ + { + query: "max (my_fake_metric)", + prom1Samples: []fakeMetricSample{{"i1", 1}, {"i2", 5}, {"i3", 9}}, + prom2Samples: []fakeMetricSample{{"i1", 3}, {"i2", 4}, {"i3", 10}}, + result: []*model.Sample{ + { + Metric: map[model.LabelName]model.LabelValue{}, + Value: 10, + }, + }, + }, + { + query: "max by (instance) (my_fake_metric)", + prom1Samples: []fakeMetricSample{{"i1", 1}, {"i2", 5}, {"i3", 9}}, + prom2Samples: []fakeMetricSample{{"i1", 3}, {"i2", 4}, {"i3", 10}}, + result: []*model.Sample{ + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i1"}, + Value: 3, + }, + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i2"}, + Value: 5, + }, + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i3"}, + Value: 10, + }, + }, + }, + { + query: "group by (instance) (my_fake_metric)", + prom1Samples: []fakeMetricSample{{"i1", 1}, {"i2", 5}, {"i3", 9}}, + prom2Samples: []fakeMetricSample{{"i1", 3}, {"i2", 4}}, + result: []*model.Sample{ + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i1"}, + Value: 1, + }, + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i2"}, + Value: 1, + }, + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i3"}, + Value: 1, + }, + }, + }, + { + query: "max_over_time(my_fake_metric[10m])", + prom1Samples: []fakeMetricSample{{"i1", 1}, {"i2", 5}}, + prom2Samples: []fakeMetricSample{{"i1", 3}}, + result: []*model.Sample{ + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i1", "prometheus": "p1"}, + Value: 1, + }, + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i1", "prometheus": "p2"}, + Value: 3, + }, + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i2", "prometheus": "p1"}, + Value: 5, + }, + }, + }, + { + query: "min_over_time(my_fake_metric[10m])", + prom1Samples: []fakeMetricSample{{"i1", 1}, {"i2", 5}}, + prom2Samples: []fakeMetricSample{{"i1", 3}}, + result: []*model.Sample{ + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i1", "prometheus": "p1"}, + Value: 1, + }, + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i1", "prometheus": "p2"}, + Value: 3, + }, + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i2", "prometheus": "p1"}, + Value: 5, + }, + }, + }, + } + + for _, tc := range ts { + t.Run(tc.query, func(t *testing.T) { + e, err := e2e.NewDockerEnvironment("e2e_test_query_pushdown") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(e, "p1", defaultPromConfig("p1", 0, "", ""), "", e2ethanos.DefaultPrometheusImage(), "remote-write-receiver") + testutil.Ok(t, err) + testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1)) + + prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar(e, "p2", defaultPromConfig("p2", 0, "", ""), "", e2ethanos.DefaultPrometheusImage(), "remote-write-receiver") + testutil.Ok(t, err) + testutil.Ok(t, e2e.StartAndWaitReady(prom2, sidecar2)) + + endpoints := []string{ + sidecar1.InternalEndpoint("grpc"), + sidecar2.InternalEndpoint("grpc"), + } + q, err := e2ethanos. + NewQuerierBuilder(e, "1", endpoints...). + WithEnabledFeatures([]string{"query-pushdown"}). + Build() + testutil.Ok(t, err) + testutil.Ok(t, e2e.StartAndWaitReady(q)) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + t.Cleanup(cancel) + + testutil.Ok(t, synthesizeSamples(ctx, prom1, tc.prom1Samples)) + testutil.Ok(t, synthesizeSamples(ctx, prom2, tc.prom2Samples)) + + testQuery := func() string { return tc.query } + queryAndAssert(t, ctx, q.Endpoint("http"), testQuery, time.Now, promclient.QueryOptions{ + Deduplicate: true, + }, tc.result) + }) + } +} + func checkNetworkRequests(t *testing.T, addr string) { ctx, cancel := chromedp.NewContext(context.Background()) t.Cleanup(cancel) @@ -613,7 +773,7 @@ func mustURLParse(t testing.TB, addr string) *url.URL { return u } -func instantQuery(t *testing.T, ctx context.Context, addr string, q func() string, ts func() time.Time, opts promclient.QueryOptions, expectedSeriesLen int) model.Vector { +func instantQuery(t testing.TB, ctx context.Context, addr string, q func() string, ts func() time.Time, opts promclient.QueryOptions, expectedSeriesLen int) model.Vector { t.Helper() fmt.Println("queryAndAssert: Waiting for", expectedSeriesLen, "results for query", q()) @@ -759,3 +919,56 @@ func queryExemplars(t *testing.T, ctx context.Context, addr, q string, start, en return nil })) } + +func synthesizeSamples(ctx context.Context, prometheus *e2e.InstrumentedRunnable, testSamples []fakeMetricSample) error { + samples := make([]model.Sample, len(testSamples)) + for i, s := range testSamples { + samples[i] = newSample(s) + } + + remoteWriteURL, err := url.Parse("http://" + prometheus.Endpoint("http") + "/api/v1/write") + if err != nil { + return err + } + + client, err := remote.NewWriteClient("remote-write-client", &remote.ClientConfig{ + URL: &config_util.URL{URL: remoteWriteURL}, + Timeout: model.Duration(30 * time.Second), + }) + if err != nil { + return err + } + + samplespb := make([]prompb.TimeSeries, 0, len(samples)) + for _, sample := range samples { + labelspb := make([]prompb.Label, 0, len(sample.Metric)) + for labelKey, labelValue := range sample.Metric { + labelspb = append(labelspb, prompb.Label{ + Name: string(labelKey), + Value: string(labelValue), + }) + } + samplespb = append(samplespb, prompb.TimeSeries{ + Labels: labelspb, + Samples: []prompb.Sample{ + { + Value: float64(sample.Value), + Timestamp: sample.Timestamp.Time().Unix() * 1000, + }, + }, + }) + } + + sample := &prompb.WriteRequest{ + Timeseries: samplespb, + } + + var buf []byte + pBuf := proto.NewBuffer(nil) + if err := pBuf.Marshal(sample); err != nil { + return err + } + + compressed := snappy.Encode(buf, pBuf.Bytes()) + return client.Store(ctx, compressed) +}