diff --git a/CHANGELOG.md b/CHANGELOG.md index f7e9ab494f6..6cac2e9ab03 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan We use *breaking :warning:* to mark changes that are not backward compatible (relates only to v0.y.z releases.) ## Unreleased +- [#4917](https://github.com/thanos-io/thanos/pull/4917) Sidecar: Add an `evaluate-queries` feature to Thanos sidecar to enable local execution of certain queries. ### Added diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 38fb1bb8fc1..25f7c611333 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -58,6 +58,7 @@ import ( const ( promqlNegativeOffset = "promql-negative-offset" promqlAtModifier = "promql-at-modifier" + queryPushdown = "query-pushdown" ) // registerQuery registers a query command. @@ -160,7 +161,7 @@ func registerQuery(app *extkingpin.App) { enableMetricMetadataPartialResponse := cmd.Flag("metric-metadata.partial-response", "Enable partial response for metric metadata endpoint. --no-metric-metadata.partial-response for disabling."). Hidden().Default("true").Bool() - featureList := cmd.Flag("enable-feature", "Comma separated experimental feature names to enable.The current list of features is "+promqlNegativeOffset+" and "+promqlAtModifier+".").Default("").Strings() + featureList := cmd.Flag("enable-feature", "Comma separated experimental feature names to enable.The current list of features is "+promqlNegativeOffset+", "+promqlAtModifier+" and "+queryPushdown+".").Default("").Strings() enableExemplarPartialResponse := cmd.Flag("exemplar.partial-response", "Enable partial response for exemplar endpoint. --no-exemplar.partial-response for disabling."). Hidden().Default("true").Bool() @@ -181,7 +182,7 @@ func registerQuery(app *extkingpin.App) { return errors.Wrap(err, "parse federation labels") } - var enableNegativeOffset, enableAtModifier bool + var enableNegativeOffset, enableAtModifier, enableQueryPushdown bool for _, feature := range *featureList { if feature == promqlNegativeOffset { enableNegativeOffset = true @@ -189,6 +190,9 @@ func registerQuery(app *extkingpin.App) { if feature == promqlAtModifier { enableAtModifier = true } + if feature == queryPushdown { + enableQueryPushdown = true + } } httpLogOpts, err := logging.ParseHTTPOptions(*reqLogDecision, reqLogConfig) @@ -278,6 +282,7 @@ func registerQuery(app *extkingpin.App) { *webDisableCORS, enableAtModifier, enableNegativeOffset, + enableQueryPushdown, *alertQueryURL, component.Query, ) @@ -346,6 +351,7 @@ func runQuery( disableCORS bool, enableAtModifier bool, enableNegativeOffset bool, + enableQueryPushdown bool, alertQueryURL string, comp component.Component, ) error { @@ -614,6 +620,7 @@ func runQuery( enableTargetPartialResponse, enableMetricMetadataPartialResponse, enableExemplarPartialResponse, + enableQueryPushdown, queryReplicaLabels, flagsMap, defaultRangeQueryStep, diff --git a/docs/components/query.md b/docs/components/query.md index 72d22a95322..d178e46e940 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -260,7 +260,8 @@ Flags: in all alerts 'Source' field. --enable-feature= ... Comma separated experimental feature names to enable.The current list of features is - promql-negative-offset and promql-at-modifier. + promql-negative-offset, promql-at-modifier and + query-pushdown. --endpoint= ... Addresses of statically configured Thanos API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index b1ca77b5617..53986cfce8d 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -92,6 +92,7 @@ type QueryAPI struct { enableTargetPartialResponse bool enableMetricMetadataPartialResponse bool enableExemplarPartialResponse bool + enableQueryPushdown bool disableCORS bool replicaLabels []string @@ -120,6 +121,7 @@ func NewQueryAPI( enableTargetPartialResponse bool, enableMetricMetadataPartialResponse bool, enableExemplarPartialResponse bool, + enableQueryPushdown bool, replicaLabels []string, flagsMap map[string]string, defaultRangeQueryStep time.Duration, @@ -146,6 +148,7 @@ func NewQueryAPI( enableTargetPartialResponse: enableTargetPartialResponse, enableMetricMetadataPartialResponse: enableMetricMetadataPartialResponse, enableExemplarPartialResponse: enableExemplarPartialResponse, + enableQueryPushdown: enableQueryPushdown, replicaLabels: replicaLabels, endpointStatus: endpointStatus, defaultRangeQueryStep: defaultRangeQueryStep, @@ -342,7 +345,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro span, ctx := tracing.StartSpan(ctx, "promql_instant_query") defer span.Finish() - qry, err := qe.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts) + qry, err := qe.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false), r.FormValue("query"), ts) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err} } @@ -459,7 +462,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap defer span.Finish() qry, err := qe.NewRangeQuery( - qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false), + qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false), r.FormValue("query"), start, end, @@ -532,7 +535,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A matcherSets = append(matcherSets, matchers) } - q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, true). + q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true). Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err} @@ -619,7 +622,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr return nil, nil, apiErr } - q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, math.MaxInt64, enablePartialResponse, true). + q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, math.MaxInt64, enablePartialResponse, qapi.enableQueryPushdown, true). Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err} @@ -669,7 +672,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap matcherSets = append(matcherSets, matchers) } - q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, true). + q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true). Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err} diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 0684efd2c8e..5de0f90a822 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -33,7 +33,7 @@ import ( // replicaLabels at query time. // maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds). // partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behavior of proxy. -type QueryableCreator func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable +type QueryableCreator func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, enableQueryPushdown, skipChunks bool) storage.Queryable // NewQueryableCreator creates QueryableCreator. func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer, maxConcurrentSelects int, selectTimeout time.Duration) QueryableCreator { @@ -41,7 +41,7 @@ func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy sto extprom.WrapRegistererWithPrefix("concurrent_selects_", reg), ).NewHistogram(gate.DurationHistogramOpts) - return func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable { + return func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, enableQueryPushdown, skipChunks bool) storage.Queryable { return &queryable{ logger: logger, replicaLabels: replicaLabels, @@ -56,6 +56,7 @@ func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy sto }, maxConcurrentSelects: maxConcurrentSelects, selectTimeout: selectTimeout, + enableQueryPushdown: enableQueryPushdown, } } } @@ -72,11 +73,12 @@ type queryable struct { gateProviderFn func() gate.Gate maxConcurrentSelects int selectTimeout time.Duration + enableQueryPushdown bool } // Querier returns a new storage querier against the underlying proxy store API. func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateProviderFn(), q.selectTimeout), nil + return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.enableQueryPushdown, q.skipChunks, q.gateProviderFn(), q.selectTimeout), nil } type querier struct { @@ -90,6 +92,7 @@ type querier struct { deduplicate bool maxResolutionMillis int64 partialResponse bool + enableQueryPushdown bool skipChunks bool selectGate gate.Gate selectTimeout time.Duration @@ -106,7 +109,7 @@ func newQuerier( proxy storepb.StoreServer, deduplicate bool, maxResolutionMillis int64, - partialResponse, skipChunks bool, + partialResponse, enableQueryPushdown bool, skipChunks bool, selectGate gate.Gate, selectTimeout time.Duration, ) *querier { @@ -135,6 +138,7 @@ func newQuerier( maxResolutionMillis: maxResolutionMillis, partialResponse: partialResponse, skipChunks: skipChunks, + enableQueryPushdown: enableQueryPushdown, } } @@ -193,6 +197,20 @@ func aggrsFromFunc(f string) []storepb.Aggr { return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM} } +func storeHintsFromPromHints(hints *storage.SelectHints) storepb.QueryHints { + return storepb.QueryHints{ + StepMillis: hints.Step, + Func: &storepb.Func{ + Name: hints.Func, + }, + Grouping: &storepb.Grouping{ + By: hints.By, + Labels: hints.Grouping, + }, + Range: &storepb.Range{Millis: hints.Range}, + } +} + func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet { if hints == nil { hints = &storage.SelectHints{ @@ -268,12 +286,17 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . // TODO(bwplotka): Use inprocess gRPC. resp := &seriesServer{ctx: ctx} + var queryHints storepb.QueryHints + if q.enableQueryPushdown { + queryHints = storeHintsFromPromHints(hints) + } if err := q.proxy.Series(&storepb.SeriesRequest{ MinTime: hints.Start, MaxTime: hints.End, Matchers: sms, MaxResolutionWindow: q.maxResolutionMillis, Aggregates: aggrs, + QueryHints: &queryHints, PartialResponseDisabled: !q.partialResponse, SkipChunks: q.skipChunks, Step: hints.Step, diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index be5dbd7fddd..bbeb1540174 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -44,7 +44,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) { queryableCreator := NewQueryableCreator(nil, nil, testProxy, 2, 5*time.Second) oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond) - queryable := queryableCreator(false, nil, nil, oneHourMillis, false, false) + queryable := queryableCreator(false, nil, nil, oneHourMillis, false, false, false) q, err := queryable.Querier(context.Background(), 0, 42) testutil.Ok(t, err) @@ -71,7 +71,7 @@ func TestQuerier_DownsampledData(t *testing.T) { } timeout := 10 * time.Second - q := NewQueryableCreator(nil, nil, testProxy, 2, timeout)(false, nil, nil, 9999999, false, false) + q := NewQueryableCreator(nil, nil, testProxy, 2, timeout)(false, nil, nil, 9999999, false, false, false) engine := promql.NewEngine( promql.EngineOpts{ MaxSamples: math.MaxInt32, @@ -362,7 +362,7 @@ func TestQuerier_Select_AfterPromQL(t *testing.T) { g := gate.New(2) mq := &mockedQueryable{ Creator: func(mint, maxt int64) storage.Querier { - return newQuerier(context.Background(), nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout) + return newQuerier(context.Background(), nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout) }, } t.Cleanup(func() { @@ -606,7 +606,7 @@ func TestQuerier_Select(t *testing.T) { {dedup: true, expected: []series{tcase.expectedAfterDedup}}, } { g := gate.New(2) - q := newQuerier(context.Background(), nil, tcase.mint, tcase.maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout) + q := newQuerier(context.Background(), nil, tcase.mint, tcase.maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) t.Run(fmt.Sprintf("dedup=%v", sc.dedup), func(t *testing.T) { @@ -835,7 +835,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { timeout := 100 * time.Second g := gate.New(2) - q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, g, timeout) + q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, false, g, timeout) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) @@ -905,7 +905,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { timeout := 5 * time.Second g := gate.New(2) - q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, true, 0, true, false, g, timeout) + q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, true, 0, true, false, false, g, timeout) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) diff --git a/pkg/query/query_test.go b/pkg/query/query_test.go index b381e318e32..3b1d19d0049 100644 --- a/pkg/query/query_test.go +++ b/pkg/query/query_test.go @@ -54,7 +54,7 @@ func TestQuerier_Proxy(t *testing.T) { name: fmt.Sprintf("store number %v", i), }) } - return q(true, nil, nil, 0, false, false) + return q(true, nil, nil, 0, false, false, false) } for _, fn := range files { diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 69010107e76..cb7219a1f8f 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -7,6 +7,8 @@ import ( "bytes" "context" "fmt" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/timestamp" "io" "io/ioutil" "net/http" @@ -27,9 +29,6 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/tsdb/chunkenc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/httpconfig" "github.com/thanos-io/thanos/pkg/promclient" @@ -38,6 +37,8 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" "github.com/thanos-io/thanos/pkg/tracing" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // PrometheusStore implements the store node API on top of the Prometheus remote read API. @@ -179,6 +180,10 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie return nil } + if r.QueryHints != nil && r.QueryHints.IsSafeToExecute() { + return p.queryPrometheus(s, r) + } + q := &prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime} for _, m := range matchers { pm := &prompb.LabelMatcher{Name: m.Name, Value: m.Value} @@ -220,18 +225,78 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie return p.handleStreamedPrometheusResponse(s, httpResp, queryPrometheusSpan, extLset) } -func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan tracing.Span, extLset labels.Labels) error { - ctx := s.Context() +func (p *PrometheusStore) queryPrometheus(s storepb.Store_SeriesServer, r *storepb.SeriesRequest) error { + var matrix model.Matrix + + opts := promclient.QueryOptions{} + step := r.QueryHints.StepMillis / 1000 + if step != 0 { + result, _, err := p.client.QueryRange(s.Context(), p.base, r.ToPromQL(), r.MinTime, r.MaxTime, step, opts) + if err != nil { + return err + } + matrix = result + } else { + vector, _, err := p.client.QueryInstant(s.Context(), p.base, r.ToPromQL(), timestamp.Time(r.MaxTime), opts) + if err != nil { + return err + } + matrix = make(model.Matrix, 0, len(vector)) + for _, sample := range vector { + matrix = append(matrix, &model.SampleStream{ + Metric: sample.Metric, + Values: []model.SamplePair{ + { + Timestamp: sample.Timestamp, + Value: sample.Value, + }, + }, + }) + } + } + + externalLbls := p.externalLabelsFn() + for _, vector := range matrix { + lbls := make([]labels.Label, 0, len(externalLbls)+len(vector.Metric)) + // Attach labels from samples. + for k, v := range vector.Metric { + lbls = append(lbls, labels.FromStrings(string(k), string(v))...) + } + // Attach external labels for compatibility with remote read. + lbls = append(lbls, externalLbls...) + + series := &prompb.TimeSeries{ + Labels: labelpb.ZLabelsFromPromLabels(lbls), + Samples: prompb.SamplesFromSamplePairs(vector.Values), + } + + chks, err := p.chunkSamples(series, MaxSamplesPerChunk) + if err != nil { + return err + } + + if err := s.Send(storepb.NewSeriesResponse(&storepb.Series{ + Labels: series.Labels, + Chunks: chks, + })); err != nil { + return err + } + } + + return nil +} + +func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan tracing.Span, extLset labels.Labels) error { level.Debug(p.logger).Log("msg", "started handling ReadRequest_SAMPLED response type.") - resp, err := p.fetchSampledResponse(ctx, httpResp) + resp, err := p.fetchSampledResponse(s.Context(), httpResp) querySpan.Finish() if err != nil { return err } - span, _ := tracing.StartSpan(ctx, "transform_and_respond") + span, _ := tracing.StartSpan(s.Context(), "transform_and_respond") defer span.Finish() span.SetTag("series_count", len(resp.Results[0].Timeseries)) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index c6b0c098d86..330cab85b6c 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -275,6 +275,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe Aggregates: r.Aggregates, MaxResolutionWindow: r.MaxResolutionWindow, SkipChunks: r.SkipChunks, + QueryHints: r.QueryHints, PartialResponseDisabled: r.PartialResponseDisabled, } wg = &sync.WaitGroup{} diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index d62b82dc3f7..eaa96f1ede6 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -517,3 +517,27 @@ func (c *SeriesStatsCounter) Count(series *Series) { } } } + +func (m *SeriesRequest) ToPromQL() string { + return m.QueryHints.toPromQL(m.Matchers) +} + +// IsSafeToExecute returns true if the function or aggregation from the query hint +// can be safely executed by the underlying Prometheus instance without affecting the +// result of the query. +func (m *QueryHints) IsSafeToExecute() bool { + distributiveOperations := []string{ + "max", + "max_over_time", + "min", + "min_over_time", + "group", + } + for _, op := range distributiveOperations { + if m.Func.Name == op { + return true + } + } + + return false +} diff --git a/pkg/store/storepb/custom_test.go b/pkg/store/storepb/custom_test.go index 8aae81ab22d..6930b188fff 100644 --- a/pkg/store/storepb/custom_test.go +++ b/pkg/store/storepb/custom_test.go @@ -8,6 +8,7 @@ import ( "path/filepath" "sort" "testing" + "time" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" @@ -526,3 +527,144 @@ func TestMatchersToString_Translate(t *testing.T) { } } + +func TestSeriesRequestToPromQL(t *testing.T) { + ts := []struct { + name string + r *SeriesRequest + expected string + }{ + { + name: "Single matcher regular expression", + r: &SeriesRequest{ + Matchers: []LabelMatcher{ + { + Type: LabelMatcher_RE, + Name: "namespace", + Value: "kube-.+", + }, + }, + QueryHints: QueryHints{ + Func: &Func{ + Name: "max", + }, + }, + }, + expected: `max ({namespace=~"kube-.+"})`, + }, + { + name: "Single matcher regular expression with grouping", + r: &SeriesRequest{ + Matchers: []LabelMatcher{ + { + Type: LabelMatcher_RE, + Name: "namespace", + Value: "kube-.+", + }, + }, + QueryHints: QueryHints{ + Func: &Func{ + Name: "max", + }, + Grouping: &Grouping{ + By: false, + Labels: []string{"container", "pod"}, + }, + }, + }, + expected: `max without (container,pod) ({namespace=~"kube-.+"})`, + }, + { + name: "Multiple matchers with grouping", + r: &SeriesRequest{ + Matchers: []LabelMatcher{ + { + Type: LabelMatcher_EQ, + Name: "__name__", + Value: "kube_pod_info", + }, + { + Type: LabelMatcher_RE, + Name: "namespace", + Value: "kube-.+", + }, + }, + QueryHints: QueryHints{ + Func: &Func{ + Name: "max", + }, + Grouping: &Grouping{ + By: false, + Labels: []string{"container", "pod"}, + }, + }, + }, + expected: `max without (container,pod) ({__name__="kube_pod_info", namespace=~"kube-.+"})`, + }, + { + name: "Query with vector range selector", + r: &SeriesRequest{ + Matchers: []LabelMatcher{ + { + Type: LabelMatcher_EQ, + Name: "__name__", + Value: "kube_pod_info", + }, + { + Type: LabelMatcher_RE, + Name: "namespace", + Value: "kube-.+", + }, + }, + QueryHints: QueryHints{ + Func: &Func{ + Name: "max_over_time", + }, + Range: &Range{ + Millis: 10 * time.Minute.Milliseconds(), + }, + }, + }, + expected: `max_over_time ({__name__="kube_pod_info", namespace=~"kube-.+"}[600000ms])`, + }, + { + name: "Query with grouping and vector range selector", + r: &SeriesRequest{ + Matchers: []LabelMatcher{ + { + Type: LabelMatcher_EQ, + Name: "__name__", + Value: "kube_pod_info", + }, + { + Type: LabelMatcher_RE, + Name: "namespace", + Value: "kube-.+", + }, + }, + QueryHints: QueryHints{ + Func: &Func{ + Name: "max", + }, + Grouping: &Grouping{ + By: false, + Labels: []string{"container", "pod"}, + }, + Range: &Range{ + Millis: 10 * time.Minute.Milliseconds(), + }, + }, + }, + expected: `max without (container,pod) ({__name__="kube_pod_info", namespace=~"kube-.+"}[600000ms])`, + }, + } + + for _, tc := range ts { + t.Run(tc.name, func(t *testing.T) { + actual := tc.r.ToPromQL() + if tc.expected != actual { + t.Fatalf("invalid promql result, got %s, want %s", actual, tc.expected) + } + }) + } +} diff --git a/pkg/store/storepb/prompb/samples.go b/pkg/store/storepb/prompb/samples.go new file mode 100644 index 00000000000..4ae1da98f09 --- /dev/null +++ b/pkg/store/storepb/prompb/samples.go @@ -0,0 +1,20 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package prompb + +import "github.com/prometheus/common/model" + +// SamplesFromSamplePairs converts a slice of model.SamplePair +// to a slice of Sample. +func SamplesFromSamplePairs(samples []model.SamplePair) []Sample { + result := make([]Sample, 0, len(samples)) + for _, s := range samples { + result = append(result, Sample{ + Value: float64(s.Value), + Timestamp: int64(s.Timestamp), + }) + } + + return result +} diff --git a/pkg/store/storepb/query_hints.go b/pkg/store/storepb/query_hints.go new file mode 100644 index 00000000000..956481a584a --- /dev/null +++ b/pkg/store/storepb/query_hints.go @@ -0,0 +1,48 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storepb + +import ( + "fmt" + "strings" +) + +func (m *QueryHints) toPromQL(labelMatchers []LabelMatcher) string { + grouping := m.Grouping.toPromQL() + matchers := MatchersToString(labelMatchers...) + queryRange := m.Range.toPromQL() + + query := fmt.Sprintf("%s %s (%s%s)", m.Func.Name, grouping, matchers, queryRange) + // Remove double spaces if some expressions are missing. + return strings.Join(strings.Fields(query), " ") +} + +func (m *Grouping) toPromQL() string { + if m == nil { + return "" + } + + if len(m.Labels) == 0 { + return "" + } + var op string + if m.By { + op = "by" + } else { + op = "without" + } + + return fmt.Sprintf("%s (%s)", op, strings.Join(m.Labels, ",")) +} + +func (m *Range) toPromQL() string { + if m == nil { + return "" + } + + if m.Millis == 0 { + return "" + } + return fmt.Sprintf("[%dms]", m.Millis) +} diff --git a/pkg/store/storepb/rpc.pb.go b/pkg/store/storepb/rpc.pb.go index 745686a208b..bc2e98b2725 100644 --- a/pkg/store/storepb/rpc.pb.go +++ b/pkg/store/storepb/rpc.pb.go @@ -281,9 +281,14 @@ type SeriesRequest struct { // implementation of a specific store. Hints *types.Any `protobuf:"bytes,9,opt,name=hints,proto3" json:"hints,omitempty"` // Query step size in milliseconds. + // Deprecated: Use query_hints instead. Step int64 `protobuf:"varint,10,opt,name=step,proto3" json:"step,omitempty"` // Range vector selector range in milliseconds. + // Deprecated: Use query_hints instead. Range int64 `protobuf:"varint,11,opt,name=range,proto3" json:"range,omitempty"` + // query_hints are the hints coming from the PromQL engine when + // requesting a storage.SeriesSet for a given expression. + QueryHints *QueryHints `protobuf:"bytes,12,opt,name=query_hints,json=queryHints,proto3" json:"query_hints,omitempty"` } func (m *SeriesRequest) Reset() { *m = SeriesRequest{} } @@ -319,6 +324,166 @@ func (m *SeriesRequest) XXX_DiscardUnknown() { var xxx_messageInfo_SeriesRequest proto.InternalMessageInfo +// Analogous to storage.SelectHints. +type QueryHints struct { + // Query step size in milliseconds. + StepMillis int64 `protobuf:"varint,1,opt,name=step_millis,json=stepMillis,proto3" json:"step_millis,omitempty"` + // The surrounding function or aggregation. + Func *Func `protobuf:"bytes,2,opt,name=func,proto3" json:"func,omitempty"` + // The grouping expression + Grouping *Grouping `protobuf:"bytes,4,opt,name=grouping,proto3" json:"grouping,omitempty"` + // Range vector selector. + Range *Range `protobuf:"bytes,5,opt,name=range,proto3" json:"range,omitempty"` +} + +func (m *QueryHints) Reset() { *m = QueryHints{} } +func (m *QueryHints) String() string { return proto.CompactTextString(m) } +func (*QueryHints) ProtoMessage() {} +func (*QueryHints) Descriptor() ([]byte, []int) { + return fileDescriptor_a938d55a388af629, []int{5} +} +func (m *QueryHints) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *QueryHints) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_QueryHints.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *QueryHints) XXX_Merge(src proto.Message) { + xxx_messageInfo_QueryHints.Merge(m, src) +} +func (m *QueryHints) XXX_Size() int { + return m.Size() +} +func (m *QueryHints) XXX_DiscardUnknown() { + xxx_messageInfo_QueryHints.DiscardUnknown(m) +} + +var xxx_messageInfo_QueryHints proto.InternalMessageInfo + +type Func struct { + // The function or aggregation name + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` +} + +func (m *Func) Reset() { *m = Func{} } +func (m *Func) String() string { return proto.CompactTextString(m) } +func (*Func) ProtoMessage() {} +func (*Func) Descriptor() ([]byte, []int) { + return fileDescriptor_a938d55a388af629, []int{6} +} +func (m *Func) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Func) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Func.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Func) XXX_Merge(src proto.Message) { + xxx_messageInfo_Func.Merge(m, src) +} +func (m *Func) XXX_Size() int { + return m.Size() +} +func (m *Func) XXX_DiscardUnknown() { + xxx_messageInfo_Func.DiscardUnknown(m) +} + +var xxx_messageInfo_Func proto.InternalMessageInfo + +type Grouping struct { + // Indicate whether it is without or by. + By bool `protobuf:"varint,1,opt,name=by,proto3" json:"by,omitempty"` + // List of label names used in the grouping. + Labels []string `protobuf:"bytes,3,rep,name=labels,proto3" json:"labels,omitempty"` +} + +func (m *Grouping) Reset() { *m = Grouping{} } +func (m *Grouping) String() string { return proto.CompactTextString(m) } +func (*Grouping) ProtoMessage() {} +func (*Grouping) Descriptor() ([]byte, []int) { + return fileDescriptor_a938d55a388af629, []int{7} +} +func (m *Grouping) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Grouping) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Grouping.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Grouping) XXX_Merge(src proto.Message) { + xxx_messageInfo_Grouping.Merge(m, src) +} +func (m *Grouping) XXX_Size() int { + return m.Size() +} +func (m *Grouping) XXX_DiscardUnknown() { + xxx_messageInfo_Grouping.DiscardUnknown(m) +} + +var xxx_messageInfo_Grouping proto.InternalMessageInfo + +type Range struct { + Millis int64 `protobuf:"varint,1,opt,name=millis,proto3" json:"millis,omitempty"` +} + +func (m *Range) Reset() { *m = Range{} } +func (m *Range) String() string { return proto.CompactTextString(m) } +func (*Range) ProtoMessage() {} +func (*Range) Descriptor() ([]byte, []int) { + return fileDescriptor_a938d55a388af629, []int{8} +} +func (m *Range) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Range) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Range.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Range) XXX_Merge(src proto.Message) { + xxx_messageInfo_Range.Merge(m, src) +} +func (m *Range) XXX_Size() int { + return m.Size() +} +func (m *Range) XXX_DiscardUnknown() { + xxx_messageInfo_Range.DiscardUnknown(m) +} + +var xxx_messageInfo_Range proto.InternalMessageInfo + type SeriesResponse struct { // Types that are valid to be assigned to Result: // *SeriesResponse_Series @@ -331,7 +496,7 @@ func (m *SeriesResponse) Reset() { *m = SeriesResponse{} } func (m *SeriesResponse) String() string { return proto.CompactTextString(m) } func (*SeriesResponse) ProtoMessage() {} func (*SeriesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_a938d55a388af629, []int{5} + return fileDescriptor_a938d55a388af629, []int{9} } func (m *SeriesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -434,7 +599,7 @@ func (m *LabelNamesRequest) Reset() { *m = LabelNamesRequest{} } func (m *LabelNamesRequest) String() string { return proto.CompactTextString(m) } func (*LabelNamesRequest) ProtoMessage() {} func (*LabelNamesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_a938d55a388af629, []int{6} + return fileDescriptor_a938d55a388af629, []int{10} } func (m *LabelNamesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -476,7 +641,7 @@ func (m *LabelNamesResponse) Reset() { *m = LabelNamesResponse{} } func (m *LabelNamesResponse) String() string { return proto.CompactTextString(m) } func (*LabelNamesResponse) ProtoMessage() {} func (*LabelNamesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_a938d55a388af629, []int{7} + return fileDescriptor_a938d55a388af629, []int{11} } func (m *LabelNamesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -523,7 +688,7 @@ func (m *LabelValuesRequest) Reset() { *m = LabelValuesRequest{} } func (m *LabelValuesRequest) String() string { return proto.CompactTextString(m) } func (*LabelValuesRequest) ProtoMessage() {} func (*LabelValuesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_a938d55a388af629, []int{8} + return fileDescriptor_a938d55a388af629, []int{12} } func (m *LabelValuesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -565,7 +730,7 @@ func (m *LabelValuesResponse) Reset() { *m = LabelValuesResponse{} } func (m *LabelValuesResponse) String() string { return proto.CompactTextString(m) } func (*LabelValuesResponse) ProtoMessage() {} func (*LabelValuesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_a938d55a388af629, []int{9} + return fileDescriptor_a938d55a388af629, []int{13} } func (m *LabelValuesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -602,6 +767,10 @@ func init() { proto.RegisterType((*InfoRequest)(nil), "thanos.InfoRequest") proto.RegisterType((*InfoResponse)(nil), "thanos.InfoResponse") proto.RegisterType((*SeriesRequest)(nil), "thanos.SeriesRequest") + proto.RegisterType((*QueryHints)(nil), "thanos.QueryHints") + proto.RegisterType((*Func)(nil), "thanos.Func") + proto.RegisterType((*Grouping)(nil), "thanos.Grouping") + proto.RegisterType((*Range)(nil), "thanos.Range") proto.RegisterType((*SeriesResponse)(nil), "thanos.SeriesResponse") proto.RegisterType((*LabelNamesRequest)(nil), "thanos.LabelNamesRequest") proto.RegisterType((*LabelNamesResponse)(nil), "thanos.LabelNamesResponse") @@ -612,75 +781,84 @@ func init() { func init() { proto.RegisterFile("store/storepb/rpc.proto", fileDescriptor_a938d55a388af629) } var fileDescriptor_a938d55a388af629 = []byte{ - // 1083 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x56, 0xdf, 0x6e, 0x1a, 0xc7, - 0x17, 0x66, 0x59, 0x76, 0x81, 0x83, 0xed, 0xdf, 0x66, 0x8c, 0x9d, 0x35, 0x91, 0x30, 0x42, 0xfa, - 0x49, 0xc8, 0x4a, 0xa1, 0xa5, 0x55, 0xa4, 0x56, 0xb9, 0x01, 0x9b, 0xd6, 0x56, 0x63, 0xdc, 0x0e, - 0x26, 0x6e, 0x53, 0x55, 0x68, 0xc1, 0x93, 0x65, 0x65, 0xf6, 0x4f, 0x77, 0x86, 0xda, 0xdc, 0xb6, - 0xf7, 0x55, 0xd5, 0xa7, 0xe9, 0x23, 0xf8, 0xae, 0xb9, 0xac, 0x7a, 0x11, 0xb5, 0xf6, 0x03, 0xf4, - 0x15, 0xaa, 0x9d, 0x99, 0x05, 0xd6, 0x75, 0x92, 0x46, 0xce, 0x0d, 0x9a, 0x73, 0xbe, 0x73, 0xce, - 0x7c, 0x73, 0xbe, 0x39, 0xc3, 0xc2, 0x7d, 0xca, 0xfc, 0x90, 0x34, 0xf8, 0x6f, 0x30, 0x6c, 0x84, - 0xc1, 0xa8, 0x1e, 0x84, 0x3e, 0xf3, 0x91, 0xce, 0xc6, 0x96, 0xe7, 0xd3, 0xd2, 0x56, 0x32, 0x80, - 0xcd, 0x02, 0x42, 0x45, 0x48, 0xa9, 0x68, 0xfb, 0xb6, 0xcf, 0x97, 0x8d, 0x68, 0x25, 0xbd, 0x95, - 0x64, 0x42, 0x10, 0xfa, 0xee, 0x8d, 0x3c, 0x59, 0x72, 0x62, 0x0d, 0xc9, 0xe4, 0x26, 0x64, 0xfb, - 0xbe, 0x3d, 0x21, 0x0d, 0x6e, 0x0d, 0xa7, 0xcf, 0x1b, 0x96, 0x37, 0x13, 0x50, 0xf5, 0x7f, 0xb0, - 0x7a, 0x12, 0x3a, 0x8c, 0x60, 0x42, 0x03, 0xdf, 0xa3, 0xa4, 0xfa, 0xa3, 0x02, 0x2b, 0xd2, 0xf3, - 0xdd, 0x94, 0x50, 0x86, 0x5a, 0x00, 0xcc, 0x71, 0x09, 0x25, 0xa1, 0x43, 0xa8, 0xa9, 0x54, 0xd4, - 0x5a, 0xa1, 0xf9, 0x20, 0xca, 0x76, 0x09, 0x1b, 0x93, 0x29, 0x1d, 0x8c, 0xfc, 0x60, 0x56, 0x3f, - 0x76, 0x5c, 0xd2, 0xe3, 0x21, 0xed, 0xcc, 0xe5, 0xcb, 0xed, 0x14, 0x5e, 0x4a, 0x42, 0x9b, 0xa0, - 0x33, 0xe2, 0x59, 0x1e, 0x33, 0xd3, 0x15, 0xa5, 0x96, 0xc7, 0xd2, 0x42, 0x26, 0x64, 0x43, 0x12, - 0x4c, 0x9c, 0x91, 0x65, 0xaa, 0x15, 0xa5, 0xa6, 0xe2, 0xd8, 0xac, 0xae, 0x42, 0xe1, 0xc0, 0x7b, - 0xee, 0x4b, 0x0e, 0xd5, 0x5f, 0xd2, 0xb0, 0x22, 0x6c, 0xc1, 0x12, 0x8d, 0x40, 0xe7, 0x07, 0x8d, - 0x09, 0xad, 0xd6, 0x45, 0x63, 0xeb, 0x4f, 0x22, 0x6f, 0xfb, 0x71, 0x44, 0xe1, 0x8f, 0x97, 0xdb, - 0x1f, 0xd9, 0x0e, 0x1b, 0x4f, 0x87, 0xf5, 0x91, 0xef, 0x36, 0x44, 0xc0, 0x7b, 0x8e, 0x2f, 0x57, - 0x8d, 0xe0, 0xcc, 0x6e, 0x24, 0x7a, 0x56, 0x7f, 0xc6, 0xb3, 0xb1, 0x2c, 0x8d, 0xb6, 0x20, 0xe7, - 0x3a, 0xde, 0x20, 0x3a, 0x08, 0x27, 0xae, 0xe2, 0xac, 0xeb, 0x78, 0xd1, 0x49, 0x39, 0x64, 0x5d, - 0x08, 0x48, 0x52, 0x77, 0xad, 0x0b, 0x0e, 0x35, 0x20, 0xcf, 0xab, 0x1e, 0xcf, 0x02, 0x62, 0x66, - 0x2a, 0x4a, 0x6d, 0xad, 0x79, 0x2f, 0x66, 0xd7, 0x8b, 0x01, 0xbc, 0x88, 0x41, 0x8f, 0x00, 0xf8, - 0x86, 0x03, 0x4a, 0x18, 0x35, 0x35, 0x7e, 0x9e, 0x79, 0x86, 0xa0, 0xd4, 0x23, 0x4c, 0xb6, 0x35, - 0x3f, 0x91, 0x36, 0xad, 0xfe, 0xad, 0xc2, 0xaa, 0x68, 0x79, 0x2c, 0xd5, 0x32, 0x61, 0xe5, 0xd5, - 0x84, 0xd3, 0x49, 0xc2, 0x8f, 0x22, 0x88, 0x8d, 0xc6, 0x24, 0xa4, 0xa6, 0xca, 0x77, 0x2f, 0x26, - 0xba, 0x79, 0x28, 0x40, 0x49, 0x60, 0x1e, 0x8b, 0x9a, 0xb0, 0x11, 0x95, 0x0c, 0x09, 0xf5, 0x27, - 0x53, 0xe6, 0xf8, 0xde, 0xe0, 0xdc, 0xf1, 0x4e, 0xfd, 0x73, 0x7e, 0x68, 0x15, 0xaf, 0xbb, 0xd6, - 0x05, 0x9e, 0x63, 0x27, 0x1c, 0x42, 0x0f, 0x01, 0x2c, 0xdb, 0x0e, 0x89, 0x6d, 0x31, 0x22, 0xce, - 0xba, 0xd6, 0x5c, 0x89, 0x77, 0x6b, 0xd9, 0x76, 0x88, 0x97, 0x70, 0xf4, 0x09, 0x6c, 0x05, 0x56, - 0xc8, 0x1c, 0x6b, 0x12, 0xed, 0xc2, 0x95, 0x1f, 0x9c, 0x3a, 0xd4, 0x1a, 0x4e, 0xc8, 0xa9, 0xa9, - 0x57, 0x94, 0x5a, 0x0e, 0xdf, 0x97, 0x01, 0xf1, 0xcd, 0xd8, 0x93, 0x30, 0xfa, 0xe6, 0x96, 0x5c, - 0xca, 0x42, 0x8b, 0x11, 0x7b, 0x66, 0x66, 0xb9, 0x2c, 0xdb, 0xf1, 0xc6, 0x5f, 0x24, 0x6b, 0xf4, - 0x64, 0xd8, 0xbf, 0x8a, 0xc7, 0x00, 0xda, 0x86, 0x02, 0x3d, 0x73, 0x82, 0xc1, 0x68, 0x3c, 0xf5, - 0xce, 0xa8, 0x99, 0xe3, 0x54, 0x20, 0x72, 0xed, 0x72, 0x0f, 0xda, 0x01, 0x6d, 0xec, 0x78, 0x8c, - 0x9a, 0xf9, 0x8a, 0xc2, 0x1b, 0x2a, 0x26, 0xb0, 0x1e, 0x4f, 0x60, 0xbd, 0xe5, 0xcd, 0xb0, 0x08, - 0x41, 0x08, 0x32, 0x94, 0x91, 0xc0, 0x04, 0xde, 0x36, 0xbe, 0x46, 0x45, 0xd0, 0x42, 0xcb, 0xb3, - 0x89, 0x59, 0xe0, 0x4e, 0x61, 0x54, 0x7f, 0x52, 0x60, 0x2d, 0x56, 0x5c, 0x0e, 0x42, 0x0d, 0xf4, - 0xf9, 0x64, 0x46, 0x3b, 0xad, 0xcd, 0xaf, 0x1a, 0xf7, 0xee, 0xa7, 0xb0, 0xc4, 0x51, 0x09, 0xb2, - 0xe7, 0x56, 0xe8, 0x39, 0x9e, 0x2d, 0xa6, 0x70, 0x3f, 0x85, 0x63, 0x07, 0x7a, 0x18, 0xd3, 0x55, - 0x5f, 0x4d, 0x77, 0x3f, 0x25, 0x09, 0xb7, 0x73, 0xa0, 0x87, 0x84, 0x4e, 0x27, 0xac, 0xfa, 0x6b, - 0x1a, 0xee, 0xf1, 0x3b, 0xd2, 0xb5, 0xdc, 0xc5, 0x35, 0x7c, 0xad, 0x6c, 0xca, 0x1d, 0x64, 0x4b, - 0xdf, 0x51, 0xb6, 0x22, 0x68, 0x94, 0x59, 0x21, 0x93, 0x23, 0x2b, 0x0c, 0x64, 0x80, 0x4a, 0xbc, - 0x53, 0x79, 0x6b, 0xa3, 0xe5, 0x42, 0x3d, 0xed, 0xcd, 0xea, 0x2d, 0x4f, 0x8f, 0xfe, 0xdf, 0xa7, - 0xa7, 0x1a, 0x02, 0x5a, 0xee, 0x9c, 0x94, 0xb3, 0x08, 0x9a, 0x17, 0x39, 0xf8, 0xb3, 0x96, 0xc7, - 0xc2, 0x40, 0x25, 0xc8, 0x49, 0xa5, 0xa8, 0x99, 0xe6, 0xc0, 0xdc, 0x5e, 0x70, 0x55, 0xdf, 0xc8, - 0xb5, 0xfa, 0x5b, 0x5a, 0x6e, 0xfa, 0xd4, 0x9a, 0x4c, 0x17, 0x7a, 0x15, 0x41, 0xe3, 0xaf, 0x0a, - 0xd7, 0x26, 0x8f, 0x85, 0xf1, 0x7a, 0x15, 0xd3, 0x77, 0x50, 0x51, 0x7d, 0x57, 0x2a, 0x66, 0x6e, - 0x51, 0x51, 0xbb, 0x45, 0x45, 0xfd, 0xed, 0x54, 0xcc, 0xbe, 0x85, 0x8a, 0x53, 0x58, 0x4f, 0x34, - 0x54, 0xca, 0xb8, 0x09, 0xfa, 0xf7, 0xdc, 0x23, 0x75, 0x94, 0xd6, 0xbb, 0x12, 0x72, 0xe7, 0x5b, - 0xc8, 0xcf, 0xff, 0x4a, 0x50, 0x01, 0xb2, 0xfd, 0xee, 0xe7, 0xdd, 0xa3, 0x93, 0xae, 0x91, 0x42, - 0x79, 0xd0, 0xbe, 0xec, 0x77, 0xf0, 0xd7, 0x86, 0x82, 0x72, 0x90, 0xc1, 0xfd, 0x27, 0x1d, 0x23, - 0x1d, 0x45, 0xf4, 0x0e, 0xf6, 0x3a, 0xbb, 0x2d, 0x6c, 0xa8, 0x51, 0x44, 0xef, 0xf8, 0x08, 0x77, - 0x8c, 0x4c, 0xe4, 0xc7, 0x9d, 0xdd, 0xce, 0xc1, 0xd3, 0x8e, 0xa1, 0x45, 0xfe, 0xbd, 0x4e, 0xbb, - 0xff, 0x99, 0xa1, 0xef, 0xb4, 0x21, 0x13, 0xbd, 0xc5, 0x28, 0x0b, 0x2a, 0x6e, 0x9d, 0x88, 0xaa, - 0xbb, 0x47, 0xfd, 0xee, 0xb1, 0xa1, 0x44, 0xbe, 0x5e, 0xff, 0xd0, 0x48, 0x47, 0x8b, 0xc3, 0x83, - 0xae, 0xa1, 0xf2, 0x45, 0xeb, 0x2b, 0x51, 0x8e, 0x47, 0x75, 0xb0, 0xa1, 0x35, 0x7f, 0x48, 0x83, - 0xc6, 0x39, 0xa2, 0x0f, 0x20, 0x13, 0xfd, 0x77, 0xa3, 0xf5, 0xb8, 0xa3, 0x4b, 0xff, 0xec, 0xa5, - 0x62, 0xd2, 0x29, 0xfb, 0xf7, 0x31, 0xe8, 0xe2, 0xfd, 0x42, 0x1b, 0xc9, 0xf7, 0x2c, 0x4e, 0xdb, - 0xbc, 0xe9, 0x16, 0x89, 0xef, 0x2b, 0x68, 0x17, 0x60, 0x31, 0x57, 0x68, 0x2b, 0xa1, 0xe2, 0xf2, - 0x2b, 0x55, 0x2a, 0xdd, 0x06, 0xc9, 0xfd, 0x3f, 0x85, 0xc2, 0x92, 0xac, 0x28, 0x19, 0x9a, 0x18, - 0x9e, 0xd2, 0x83, 0x5b, 0x31, 0x51, 0xa7, 0xd9, 0x85, 0x35, 0xfe, 0x2d, 0x15, 0x4d, 0x85, 0x68, - 0xc6, 0x63, 0x28, 0x60, 0xe2, 0xfa, 0x8c, 0x70, 0x3f, 0x9a, 0x1f, 0x7f, 0xf9, 0x93, 0xab, 0xb4, - 0x71, 0xc3, 0x2b, 0x3f, 0xcd, 0x52, 0xed, 0xff, 0x5f, 0xfe, 0x55, 0x4e, 0x5d, 0x5e, 0x95, 0x95, - 0x17, 0x57, 0x65, 0xe5, 0xcf, 0xab, 0xb2, 0xf2, 0xf3, 0x75, 0x39, 0xf5, 0xe2, 0xba, 0x9c, 0xfa, - 0xfd, 0xba, 0x9c, 0x7a, 0x96, 0x95, 0x5f, 0x87, 0x43, 0x9d, 0xdf, 0x99, 0x0f, 0xff, 0x09, 0x00, - 0x00, 0xff, 0xff, 0xa4, 0xea, 0x02, 0x9d, 0x87, 0x0a, 0x00, 0x00, + // 1230 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x56, 0x5f, 0x6f, 0x13, 0x47, + 0x10, 0xf7, 0xdd, 0xf9, 0xfc, 0x67, 0x9c, 0xa4, 0xc7, 0x62, 0xe0, 0x62, 0x24, 0xc7, 0xba, 0xaa, + 0x52, 0x84, 0xa8, 0xdd, 0x9a, 0x0a, 0xa9, 0x15, 0x2f, 0x49, 0x30, 0x24, 0x2a, 0x31, 0x65, 0x9d, + 0x90, 0x96, 0xaa, 0xb2, 0xce, 0xce, 0x72, 0x3e, 0xe1, 0xfb, 0xc3, 0xed, 0x5e, 0xc1, 0xaf, 0xed, + 0x7b, 0x55, 0xf5, 0x23, 0x54, 0xfd, 0x10, 0xfd, 0x08, 0xbc, 0x95, 0xc7, 0xaa, 0x0f, 0xa8, 0x85, + 0x2f, 0x52, 0xed, 0x9f, 0xb3, 0x7d, 0x69, 0x80, 0xa2, 0xf0, 0x72, 0xda, 0x99, 0xdf, 0xec, 0xec, + 0xcc, 0xfc, 0x76, 0xe6, 0x16, 0x2e, 0x51, 0x16, 0x25, 0xa4, 0x23, 0xbe, 0xf1, 0xa8, 0x93, 0xc4, + 0xe3, 0x76, 0x9c, 0x44, 0x2c, 0x42, 0x25, 0x36, 0x71, 0xc3, 0x88, 0x36, 0xd6, 0xf3, 0x06, 0x6c, + 0x16, 0x13, 0x2a, 0x4d, 0x1a, 0x75, 0x2f, 0xf2, 0x22, 0xb1, 0xec, 0xf0, 0x95, 0xd2, 0xb6, 0xf2, + 0x1b, 0xe2, 0x24, 0x0a, 0x4e, 0xec, 0x53, 0x2e, 0xa7, 0xee, 0x88, 0x4c, 0x4f, 0x42, 0x5e, 0x14, + 0x79, 0x53, 0xd2, 0x11, 0xd2, 0x28, 0x7d, 0xd8, 0x71, 0xc3, 0x99, 0x84, 0x9c, 0x0f, 0x60, 0xf5, + 0x28, 0xf1, 0x19, 0xc1, 0x84, 0xc6, 0x51, 0x48, 0x89, 0xf3, 0xa3, 0x06, 0x2b, 0x4a, 0xf3, 0x38, + 0x25, 0x94, 0xa1, 0x2d, 0x00, 0xe6, 0x07, 0x84, 0x92, 0xc4, 0x27, 0xd4, 0xd6, 0x5a, 0xc6, 0x66, + 0xad, 0x7b, 0x99, 0xef, 0x0e, 0x08, 0x9b, 0x90, 0x94, 0x0e, 0xc7, 0x51, 0x3c, 0x6b, 0x1f, 0xf8, + 0x01, 0x19, 0x08, 0x93, 0xed, 0xe2, 0xb3, 0x17, 0x1b, 0x05, 0xbc, 0xb4, 0x09, 0x5d, 0x84, 0x12, + 0x23, 0xa1, 0x1b, 0x32, 0x5b, 0x6f, 0x69, 0x9b, 0x55, 0xac, 0x24, 0x64, 0x43, 0x39, 0x21, 0xf1, + 0xd4, 0x1f, 0xbb, 0xb6, 0xd1, 0xd2, 0x36, 0x0d, 0x9c, 0x89, 0xce, 0x2a, 0xd4, 0xf6, 0xc2, 0x87, + 0x91, 0x8a, 0xc1, 0xf9, 0x45, 0x87, 0x15, 0x29, 0xcb, 0x28, 0xd1, 0x18, 0x4a, 0x22, 0xd1, 0x2c, + 0xa0, 0xd5, 0xb6, 0x2c, 0x6c, 0xfb, 0x0e, 0xd7, 0x6e, 0xdf, 0xe0, 0x21, 0xfc, 0xf5, 0x62, 0xe3, + 0x33, 0xcf, 0x67, 0x93, 0x74, 0xd4, 0x1e, 0x47, 0x41, 0x47, 0x1a, 0x7c, 0xec, 0x47, 0x6a, 0xd5, + 0x89, 0x1f, 0x79, 0x9d, 0x5c, 0xcd, 0xda, 0x0f, 0xc4, 0x6e, 0xac, 0x5c, 0xa3, 0x75, 0xa8, 0x04, + 0x7e, 0x38, 0xe4, 0x89, 0x88, 0xc0, 0x0d, 0x5c, 0x0e, 0xfc, 0x90, 0x67, 0x2a, 0x20, 0xf7, 0xa9, + 0x84, 0x54, 0xe8, 0x81, 0xfb, 0x54, 0x40, 0x1d, 0xa8, 0x0a, 0xaf, 0x07, 0xb3, 0x98, 0xd8, 0xc5, + 0x96, 0xb6, 0xb9, 0xd6, 0x3d, 0x97, 0x45, 0x37, 0xc8, 0x00, 0xbc, 0xb0, 0x41, 0xd7, 0x01, 0xc4, + 0x81, 0x43, 0x4a, 0x18, 0xb5, 0x4d, 0x91, 0xcf, 0x7c, 0x87, 0x0c, 0x69, 0x40, 0x98, 0x2a, 0x6b, + 0x75, 0xaa, 0x64, 0xea, 0xfc, 0x56, 0x84, 0x55, 0x59, 0xf2, 0x8c, 0xaa, 0xe5, 0x80, 0xb5, 0xd7, + 0x07, 0xac, 0xe7, 0x03, 0xbe, 0xce, 0x21, 0x36, 0x9e, 0x90, 0x84, 0xda, 0x86, 0x38, 0xbd, 0x9e, + 0xab, 0xe6, 0xbe, 0x04, 0x55, 0x00, 0x73, 0x5b, 0xd4, 0x85, 0x0b, 0xdc, 0x65, 0x42, 0x68, 0x34, + 0x4d, 0x99, 0x1f, 0x85, 0xc3, 0x27, 0x7e, 0x78, 0x1c, 0x3d, 0x11, 0x49, 0x1b, 0xf8, 0x7c, 0xe0, + 0x3e, 0xc5, 0x73, 0xec, 0x48, 0x40, 0xe8, 0x2a, 0x80, 0xeb, 0x79, 0x09, 0xf1, 0x5c, 0x46, 0x64, + 0xae, 0x6b, 0xdd, 0x95, 0xec, 0xb4, 0x2d, 0xcf, 0x4b, 0xf0, 0x12, 0x8e, 0xbe, 0x80, 0xf5, 0xd8, + 0x4d, 0x98, 0xef, 0x4e, 0xf9, 0x29, 0x82, 0xf9, 0xe1, 0xb1, 0x4f, 0xdd, 0xd1, 0x94, 0x1c, 0xdb, + 0xa5, 0x96, 0xb6, 0x59, 0xc1, 0x97, 0x94, 0x41, 0x76, 0x33, 0x6e, 0x2a, 0x18, 0x7d, 0x7b, 0xca, + 0x5e, 0xca, 0x12, 0x97, 0x11, 0x6f, 0x66, 0x97, 0x05, 0x2d, 0x1b, 0xd9, 0xc1, 0x5f, 0xe5, 0x7d, + 0x0c, 0x94, 0xd9, 0x7f, 0x9c, 0x67, 0x00, 0xda, 0x80, 0x1a, 0x7d, 0xe4, 0xc7, 0xc3, 0xf1, 0x24, + 0x0d, 0x1f, 0x51, 0xbb, 0x22, 0x42, 0x01, 0xae, 0xda, 0x11, 0x1a, 0x74, 0x05, 0xcc, 0x89, 0x1f, + 0x32, 0x6a, 0x57, 0x5b, 0x9a, 0x28, 0xa8, 0xec, 0xc0, 0x76, 0xd6, 0x81, 0xed, 0xad, 0x70, 0x86, + 0xa5, 0x09, 0x42, 0x50, 0xa4, 0x8c, 0xc4, 0x36, 0x88, 0xb2, 0x89, 0x35, 0xaa, 0x83, 0x99, 0xb8, + 0xa1, 0x47, 0xec, 0x9a, 0x50, 0x4a, 0x01, 0x5d, 0x83, 0xda, 0xe3, 0x94, 0x24, 0xb3, 0xa1, 0xf4, + 0xbd, 0x22, 0x7c, 0xa3, 0x2c, 0x8b, 0x7b, 0x1c, 0xda, 0xe5, 0x08, 0x86, 0xc7, 0xf3, 0xb5, 0xf3, + 0xab, 0x06, 0xb0, 0x80, 0x44, 0xe8, 0x8c, 0xc4, 0xc3, 0xc0, 0x9f, 0x4e, 0x7d, 0xaa, 0xae, 0x09, + 0x70, 0xd5, 0xbe, 0xd0, 0xa0, 0x16, 0x14, 0x1f, 0xa6, 0xe1, 0x58, 0xdc, 0x92, 0xda, 0x82, 0x9c, + 0x5b, 0x69, 0x38, 0xc6, 0x02, 0x41, 0x57, 0xa1, 0xe2, 0x25, 0x51, 0x1a, 0xfb, 0xa1, 0x27, 0xb8, + 0xae, 0x75, 0xad, 0xcc, 0xea, 0xb6, 0xd2, 0xe3, 0xb9, 0x05, 0xfa, 0x30, 0x4b, 0xc5, 0x14, 0xa6, + 0xf3, 0x4e, 0xc5, 0x5c, 0xa9, 0x32, 0x73, 0x1a, 0x50, 0xe4, 0x07, 0xf0, 0x5a, 0x84, 0xae, 0xba, + 0xbd, 0x55, 0x2c, 0xd6, 0x4e, 0x17, 0x2a, 0x99, 0x5b, 0xb4, 0x06, 0xfa, 0x68, 0x26, 0xd0, 0x0a, + 0xd6, 0x47, 0x33, 0x3e, 0x59, 0xd4, 0x1c, 0xe0, 0x37, 0xb7, 0x9a, 0xb5, 0xae, 0xb3, 0x01, 0xa6, + 0xf0, 0xcf, 0x0d, 0x72, 0x99, 0x2a, 0xc9, 0xf9, 0x49, 0x83, 0xb5, 0xac, 0x79, 0xd4, 0x4c, 0xd9, + 0x84, 0xd2, 0x7c, 0xc8, 0xf1, 0x48, 0xd7, 0xe6, 0x5d, 0x2b, 0xb4, 0xbb, 0x05, 0xac, 0x70, 0xd4, + 0x80, 0xf2, 0x13, 0x37, 0x09, 0x79, 0xfe, 0x62, 0xa0, 0xed, 0x16, 0x70, 0xa6, 0x40, 0x57, 0x33, + 0xe6, 0x8d, 0xd7, 0x33, 0xbf, 0x5b, 0x50, 0xdc, 0x6f, 0x57, 0xa0, 0x94, 0x10, 0x9a, 0x4e, 0x99, + 0xf3, 0xbb, 0x0e, 0xe7, 0x44, 0xbb, 0xf5, 0xdd, 0x60, 0xd1, 0xd1, 0x6f, 0xec, 0x00, 0xed, 0x0c, + 0x1d, 0xa0, 0x9f, 0xb1, 0x03, 0xea, 0x60, 0x52, 0xe6, 0x26, 0x4c, 0x4d, 0x3f, 0x29, 0x20, 0x0b, + 0x0c, 0x12, 0x1e, 0xab, 0x01, 0xc0, 0x97, 0x8b, 0x46, 0x30, 0xdf, 0xde, 0x08, 0xcb, 0x83, 0xa8, + 0xf4, 0xff, 0x07, 0x91, 0x93, 0x00, 0x5a, 0xae, 0x9c, 0xa2, 0xb3, 0x0e, 0x26, 0xbf, 0x3e, 0xf2, + 0x0f, 0x51, 0xc5, 0x52, 0x40, 0x0d, 0xa8, 0x28, 0xa6, 0xa8, 0xad, 0x0b, 0x60, 0x2e, 0x2f, 0x62, + 0x35, 0xde, 0x1a, 0xab, 0xf3, 0x87, 0xae, 0x0e, 0xbd, 0xef, 0x4e, 0xd3, 0x05, 0x5f, 0x75, 0x30, + 0xc5, 0x0d, 0x54, 0x17, 0x58, 0x0a, 0x6f, 0x66, 0x51, 0x3f, 0x03, 0x8b, 0xc6, 0xfb, 0x62, 0xb1, + 0x78, 0x0a, 0x8b, 0xe6, 0x29, 0x2c, 0x96, 0xde, 0x8d, 0xc5, 0xf2, 0x3b, 0xb0, 0x98, 0xc2, 0xf9, + 0x5c, 0x41, 0x15, 0x8d, 0x17, 0xa1, 0xf4, 0xbd, 0xd0, 0x28, 0x1e, 0x95, 0xf4, 0xbe, 0x88, 0xbc, + 0xf2, 0x1d, 0x54, 0xe7, 0x7f, 0x65, 0x54, 0x83, 0xf2, 0x61, 0xff, 0xcb, 0xfe, 0xdd, 0xa3, 0xbe, + 0x55, 0x40, 0x55, 0x30, 0xef, 0x1d, 0xf6, 0xf0, 0x37, 0x96, 0x86, 0x2a, 0x50, 0xc4, 0x87, 0x77, + 0x7a, 0x96, 0xce, 0x2d, 0x06, 0x7b, 0x37, 0x7b, 0x3b, 0x5b, 0xd8, 0x32, 0xb8, 0xc5, 0xe0, 0xe0, + 0x2e, 0xee, 0x59, 0x45, 0xae, 0xc7, 0xbd, 0x9d, 0xde, 0xde, 0xfd, 0x9e, 0x65, 0x72, 0xfd, 0xcd, + 0xde, 0xf6, 0xe1, 0x6d, 0xab, 0x74, 0x65, 0x1b, 0x8a, 0xfc, 0xb7, 0x86, 0xca, 0x60, 0xe0, 0xad, + 0x23, 0xe9, 0x75, 0xe7, 0xee, 0x61, 0xff, 0xc0, 0xd2, 0xb8, 0x6e, 0x70, 0xb8, 0x6f, 0xe9, 0x7c, + 0xb1, 0xbf, 0xd7, 0xb7, 0x0c, 0xb1, 0xd8, 0xfa, 0x5a, 0xba, 0x13, 0x56, 0x3d, 0x6c, 0x99, 0xdd, + 0x1f, 0x74, 0x30, 0x45, 0x8c, 0xe8, 0x53, 0x28, 0xf2, 0x67, 0x10, 0x3a, 0x9f, 0x55, 0x74, 0xe9, + 0x91, 0xd4, 0xa8, 0xe7, 0x95, 0xaa, 0x7e, 0x9f, 0x43, 0x49, 0xce, 0x2f, 0x74, 0x21, 0x3f, 0xcf, + 0xb2, 0x6d, 0x17, 0x4f, 0xaa, 0xe5, 0xc6, 0x4f, 0x34, 0xb4, 0x03, 0xb0, 0xe8, 0x2b, 0xb4, 0x9e, + 0x63, 0x71, 0x79, 0x4a, 0x35, 0x1a, 0xa7, 0x41, 0xea, 0xfc, 0x5b, 0x50, 0x5b, 0xa2, 0x15, 0xe5, + 0x4d, 0x73, 0xcd, 0xd3, 0xb8, 0x7c, 0x2a, 0x26, 0xfd, 0x74, 0xfb, 0xb0, 0x26, 0x9e, 0xa5, 0xbc, + 0x2b, 0x64, 0x31, 0x6e, 0x40, 0x0d, 0x93, 0x20, 0x62, 0x44, 0xe8, 0xd1, 0x3c, 0xfd, 0xe5, 0xd7, + 0x6b, 0xe3, 0xc2, 0x09, 0xad, 0x7a, 0xe5, 0x16, 0xb6, 0x3f, 0x7a, 0xf6, 0x4f, 0xb3, 0xf0, 0xec, + 0x65, 0x53, 0x7b, 0xfe, 0xb2, 0xa9, 0xfd, 0xfd, 0xb2, 0xa9, 0xfd, 0xfc, 0xaa, 0x59, 0x78, 0xfe, + 0xaa, 0x59, 0xf8, 0xf3, 0x55, 0xb3, 0xf0, 0xa0, 0xac, 0x1e, 0xda, 0xa3, 0x92, 0xb8, 0x33, 0xd7, + 0xfe, 0x0d, 0x00, 0x00, 0xff, 0xff, 0x5a, 0xb7, 0x5e, 0x64, 0xd2, 0x0b, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -1180,6 +1358,18 @@ func (m *SeriesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.QueryHints != nil { + { + size, err := m.QueryHints.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x62 + } if m.Range != 0 { i = encodeVarintRpc(dAtA, i, uint64(m.Range)) i-- @@ -1228,20 +1418,20 @@ func (m *SeriesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { dAtA[i] = 0x30 } if len(m.Aggregates) > 0 { - dAtA3 := make([]byte, len(m.Aggregates)*10) - var j2 int + dAtA4 := make([]byte, len(m.Aggregates)*10) + var j3 int for _, num := range m.Aggregates { for num >= 1<<7 { - dAtA3[j2] = uint8(uint64(num)&0x7f | 0x80) + dAtA4[j3] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 - j2++ + j3++ } - dAtA3[j2] = uint8(num) - j2++ + dAtA4[j3] = uint8(num) + j3++ } - i -= j2 - copy(dAtA[i:], dAtA3[:j2]) - i = encodeVarintRpc(dAtA, i, uint64(j2)) + i -= j3 + copy(dAtA[i:], dAtA4[:j3]) + i = encodeVarintRpc(dAtA, i, uint64(j3)) i-- dAtA[i] = 0x2a } @@ -1277,6 +1467,170 @@ func (m *SeriesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *QueryHints) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *QueryHints) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *QueryHints) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Range != nil { + { + size, err := m.Range.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x2a + } + if m.Grouping != nil { + { + size, err := m.Grouping.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } + if m.Func != nil { + { + size, err := m.Func.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if m.StepMillis != 0 { + i = encodeVarintRpc(dAtA, i, uint64(m.StepMillis)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *Func) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Func) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Func) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Name) > 0 { + i -= len(m.Name) + copy(dAtA[i:], m.Name) + i = encodeVarintRpc(dAtA, i, uint64(len(m.Name))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *Grouping) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Grouping) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Grouping) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Labels) > 0 { + for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Labels[iNdEx]) + copy(dAtA[i:], m.Labels[iNdEx]) + i = encodeVarintRpc(dAtA, i, uint64(len(m.Labels[iNdEx]))) + i-- + dAtA[i] = 0x1a + } + } + if m.By { + i-- + if m.By { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *Range) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Range) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Range) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Millis != 0 { + i = encodeVarintRpc(dAtA, i, uint64(m.Millis)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func (m *SeriesResponse) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -1754,44 +2108,115 @@ func (m *SeriesRequest) Size() (n int) { if m.Range != 0 { n += 1 + sovRpc(uint64(m.Range)) } + if m.QueryHints != nil { + l = m.QueryHints.Size() + n += 1 + l + sovRpc(uint64(l)) + } return n } -func (m *SeriesResponse) Size() (n int) { +func (m *QueryHints) Size() (n int) { if m == nil { return 0 } var l int _ = l - if m.Result != nil { - n += m.Result.Size() + if m.StepMillis != 0 { + n += 1 + sovRpc(uint64(m.StepMillis)) + } + if m.Func != nil { + l = m.Func.Size() + n += 1 + l + sovRpc(uint64(l)) + } + if m.Grouping != nil { + l = m.Grouping.Size() + n += 1 + l + sovRpc(uint64(l)) + } + if m.Range != nil { + l = m.Range.Size() + n += 1 + l + sovRpc(uint64(l)) } return n } -func (m *SeriesResponse_Series) Size() (n int) { +func (m *Func) Size() (n int) { if m == nil { return 0 } var l int _ = l - if m.Series != nil { - l = m.Series.Size() + l = len(m.Name) + if l > 0 { n += 1 + l + sovRpc(uint64(l)) } return n } -func (m *SeriesResponse_Warning) Size() (n int) { + +func (m *Grouping) Size() (n int) { if m == nil { return 0 } var l int _ = l - l = len(m.Warning) - n += 1 + l + sovRpc(uint64(l)) - return n -} -func (m *SeriesResponse_Hints) Size() (n int) { + if m.By { + n += 2 + } + if len(m.Labels) > 0 { + for _, s := range m.Labels { + l = len(s) + n += 1 + l + sovRpc(uint64(l)) + } + } + return n +} + +func (m *Range) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Millis != 0 { + n += 1 + sovRpc(uint64(m.Millis)) + } + return n +} + +func (m *SeriesResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Result != nil { + n += m.Result.Size() + } + return n +} + +func (m *SeriesResponse_Series) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Series != nil { + l = m.Series.Size() + n += 1 + l + sovRpc(uint64(l)) + } + return n +} +func (m *SeriesResponse_Warning) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Warning) + n += 1 + l + sovRpc(uint64(l)) + return n +} +func (m *SeriesResponse_Hints) Size() (n int) { if m == nil { return 0 } @@ -2657,6 +3082,472 @@ func (m *SeriesRequest) Unmarshal(dAtA []byte) error { break } } + case 12: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field QueryHints", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.QueryHints == nil { + m.QueryHints = &QueryHints{} + } + if err := m.QueryHints.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *QueryHints) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: QueryHints: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: QueryHints: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StepMillis", wireType) + } + m.StepMillis = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.StepMillis |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Func", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Func == nil { + m.Func = &Func{} + } + if err := m.Func.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Grouping", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Grouping == nil { + m.Grouping = &Grouping{} + } + if err := m.Grouping.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Range", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Range == nil { + m.Range = &Range{} + } + if err := m.Range.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Func) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Func: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Func: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Grouping) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Grouping: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Grouping: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field By", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.By = bool(v != 0) + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Labels = append(m.Labels, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Range) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Range: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Range: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Millis", wireType) + } + m.Millis = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Millis |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) diff --git a/pkg/store/storepb/rpc.proto b/pkg/store/storepb/rpc.proto index 4a9240620d8..71f004463b3 100644 --- a/pkg/store/storepb/rpc.proto +++ b/pkg/store/storepb/rpc.proto @@ -80,18 +80,18 @@ message InfoResponse { repeated Label labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/thanos-io/thanos/pkg/store/labelpb.ZLabel"]; int64 min_time = 2; int64 max_time = 3; - StoreType storeType = 4; + StoreType storeType = 4; // label_sets is an unsorted list of `ZLabelSet`s. repeated ZLabelSet label_sets = 5 [(gogoproto.nullable) = false]; } message SeriesRequest { - int64 min_time = 1; - int64 max_time = 2; + int64 min_time = 1; + int64 max_time = 2; repeated LabelMatcher matchers = 3 [(gogoproto.nullable) = false]; int64 max_resolution_window = 4; - repeated Aggr aggregates = 5; + repeated Aggr aggregates = 5; // Deprecated. Use partial_response_strategy instead. bool partial_response_disabled = 6; @@ -108,18 +108,56 @@ message SeriesRequest { google.protobuf.Any hints = 9; // Query step size in milliseconds. + // Deprecated: Use query_hints instead. int64 step = 10; // Range vector selector range in milliseconds. + // Deprecated: Use query_hints instead. int64 range = 11; + + // query_hints are the hints coming from the PromQL engine when + // requesting a storage.SeriesSet for a given expression. + QueryHints query_hints = 12; +} + +// Analogous to storage.SelectHints. +message QueryHints { + // Query step size in milliseconds. + int64 step_millis = 1; + + // The surrounding function or aggregation. + Func func = 2; + + // The grouping expression + Grouping grouping = 4; + + // Range vector selector. + Range range = 5; +} + +message Func { + // The function or aggregation name + string name = 1; +} + +message Grouping { + // Indicate whether it is without or by. + bool by = 1; + + // List of label names used in the grouping. + repeated string labels = 3; +} + +message Range { + int64 millis = 1; } enum Aggr { - RAW = 0; - COUNT = 1; - SUM = 2; - MIN = 3; - MAX = 4; + RAW = 0; + COUNT = 1; + SUM = 2; + MIN = 3; + MAX = 4; COUNTER = 5; } @@ -160,7 +198,7 @@ message LabelNamesRequest { } message LabelNamesResponse { - repeated string names = 1; + repeated string names = 1; repeated string warnings = 2; /// hints is an opaque data structure that can be used to carry additional information from @@ -190,7 +228,7 @@ message LabelValuesRequest { } message LabelValuesResponse { - repeated string values = 1; + repeated string values = 1; repeated string warnings = 2; /// hints is an opaque data structure that can be used to carry additional information from diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 7f5e4b0f37b..8eb58778a08 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -16,6 +16,12 @@ import ( "testing" "time" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + config_util "github.com/prometheus/common/config" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage/remote" + "github.com/chromedp/cdproto/network" "github.com/chromedp/chromedp" "github.com/efficientgo/e2e" @@ -609,6 +615,159 @@ config: } } +type fakeMetricSample struct { + label string + value int64 +} + +func newSample(s fakeMetricSample) model.Sample { + return model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + "__name__": "my_fake_metric", + "instance": model.LabelValue(s.label), + }, + Value: model.SampleValue(s.value), + Timestamp: model.Now(), + } +} + +func TestSidecarQueryEvaluation(t *testing.T) { + t.Parallel() + + ts := []struct { + prom1Samples []fakeMetricSample + prom2Samples []fakeMetricSample + query string + result model.Vector + }{ + { + query: "max (my_fake_metric)", + prom1Samples: []fakeMetricSample{{"i1", 1}, {"i2", 5}, {"i3", 9}}, + prom2Samples: []fakeMetricSample{{"i1", 3}, {"i2", 4}, {"i3", 10}}, + result: []*model.Sample{ + { + Metric: map[model.LabelName]model.LabelValue{}, + Value: 10, + }, + }, + }, + { + query: "max by (instance) (my_fake_metric)", + prom1Samples: []fakeMetricSample{{"i1", 1}, {"i2", 5}, {"i3", 9}}, + prom2Samples: []fakeMetricSample{{"i1", 3}, {"i2", 4}, {"i3", 10}}, + result: []*model.Sample{ + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i1"}, + Value: 3, + }, + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i2"}, + Value: 5, + }, + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i3"}, + Value: 10, + }, + }, + }, + { + query: "group by (instance) (my_fake_metric)", + prom1Samples: []fakeMetricSample{{"i1", 1}, {"i2", 5}, {"i3", 9}}, + prom2Samples: []fakeMetricSample{{"i1", 3}, {"i2", 4}}, + result: []*model.Sample{ + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i1"}, + Value: 1, + }, + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i2"}, + Value: 1, + }, + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i3"}, + Value: 1, + }, + }, + }, + { + query: "max_over_time(my_fake_metric[10m])", + prom1Samples: []fakeMetricSample{{"i1", 1}, {"i2", 5}}, + prom2Samples: []fakeMetricSample{{"i1", 3}}, + result: []*model.Sample{ + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i1", "prometheus": "p1"}, + Value: 1, + }, + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i1", "prometheus": "p2"}, + Value: 3, + }, + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i2", "prometheus": "p1"}, + Value: 5, + }, + }, + }, + { + query: "min_over_time(my_fake_metric[10m])", + prom1Samples: []fakeMetricSample{{"i1", 1}, {"i2", 5}}, + prom2Samples: []fakeMetricSample{{"i1", 3}}, + result: []*model.Sample{ + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i1", "prometheus": "p1"}, + Value: 1, + }, + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i1", "prometheus": "p2"}, + Value: 3, + }, + { + Metric: map[model.LabelName]model.LabelValue{"instance": "i2", "prometheus": "p1"}, + Value: 5, + }, + }, + }, + } + + for _, tc := range ts { + t.Run(tc.query, func(t *testing.T) { + e, err := e2e.NewDockerEnvironment("e2e_test_query_pushdown") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(e, "p1", defaultPromConfig("p1", 0, "", ""), "", e2ethanos.DefaultPrometheusImage(), "remote-write-receiver") + testutil.Ok(t, err) + testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1)) + + prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar(e, "p2", defaultPromConfig("p2", 0, "", ""), "", e2ethanos.DefaultPrometheusImage(), "remote-write-receiver") + testutil.Ok(t, err) + testutil.Ok(t, e2e.StartAndWaitReady(prom2, sidecar2)) + + endpoints := []string{ + sidecar1.InternalEndpoint("grpc"), + sidecar2.InternalEndpoint("grpc"), + } + q, err := e2ethanos. + NewQuerierBuilder(e, "1", endpoints...). + WithEnabledFeatures([]string{"query-pushdown"}). + Build() + testutil.Ok(t, err) + testutil.Ok(t, e2e.StartAndWaitReady(q)) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + t.Cleanup(cancel) + + testutil.Ok(t, synthesizeSamples(ctx, prom1, tc.prom1Samples)) + testutil.Ok(t, synthesizeSamples(ctx, prom2, tc.prom2Samples)) + + testQuery := func() string { return tc.query } + queryAndAssert(t, ctx, q.Endpoint("http"), testQuery, time.Now, promclient.QueryOptions{ + Deduplicate: true, + }, tc.result) + }) + } +} + func checkNetworkRequests(t *testing.T, addr string) { ctx, cancel := chromedp.NewContext(context.Background()) t.Cleanup(cancel) @@ -648,7 +807,7 @@ func mustURLParse(t testing.TB, addr string) *url.URL { return u } -func instantQuery(t *testing.T, ctx context.Context, addr string, q func() string, ts func() time.Time, opts promclient.QueryOptions, expectedSeriesLen int) model.Vector { +func instantQuery(t testing.TB, ctx context.Context, addr string, q func() string, ts func() time.Time, opts promclient.QueryOptions, expectedSeriesLen int) model.Vector { t.Helper() fmt.Println("queryAndAssert: Waiting for", expectedSeriesLen, "results for query", q()) @@ -794,3 +953,56 @@ func queryExemplars(t *testing.T, ctx context.Context, addr, q string, start, en return nil })) } + +func synthesizeSamples(ctx context.Context, prometheus *e2e.InstrumentedRunnable, testSamples []fakeMetricSample) error { + samples := make([]model.Sample, len(testSamples)) + for i, s := range testSamples { + samples[i] = newSample(s) + } + + remoteWriteURL, err := url.Parse("http://" + prometheus.Endpoint("http") + "/api/v1/write") + if err != nil { + return err + } + + client, err := remote.NewWriteClient("remote-write-client", &remote.ClientConfig{ + URL: &config_util.URL{URL: remoteWriteURL}, + Timeout: model.Duration(30 * time.Second), + }) + if err != nil { + return err + } + + samplespb := make([]prompb.TimeSeries, 0, len(samples)) + for _, sample := range samples { + labelspb := make([]prompb.Label, 0, len(sample.Metric)) + for labelKey, labelValue := range sample.Metric { + labelspb = append(labelspb, prompb.Label{ + Name: string(labelKey), + Value: string(labelValue), + }) + } + samplespb = append(samplespb, prompb.TimeSeries{ + Labels: labelspb, + Samples: []prompb.Sample{ + { + Value: float64(sample.Value), + Timestamp: sample.Timestamp.Time().Unix() * 1000, + }, + }, + }) + } + + sample := &prompb.WriteRequest{ + Timeseries: samplespb, + } + + var buf []byte + pBuf := proto.NewBuffer(nil) + if err := pBuf.Marshal(sample); err != nil { + return err + } + + compressed := snappy.Encode(buf, pBuf.Bytes()) + return client.Store(ctx, compressed) +}