Skip to content

Commit

Permalink
Implement query pushdown for a subset of aggregations
Browse files Browse the repository at this point in the history
Certain aggregations can be executed safely on leaf nodes without
worrying about data duplication or overlap. One such example is the max
function which can be computed on local data by the leaves before it is
computed globally by the querier.

This commit implements local aggregation in the Prometheus sidecar for
all functions which are safe to execute locally. The feature can be
enabled by passing the -evaluate-queries flag to the sidecar.

Signed-off-by: fpetkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Dec 6, 2021
1 parent c0a3f14 commit f91d036
Show file tree
Hide file tree
Showing 14 changed files with 1,576 additions and 118 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan
We use *breaking :warning:* to mark changes that are not backward compatible (relates only to v0.y.z releases.)

## Unreleased
- [#4917](https://github.com/thanos-io/thanos/pull/4917) Sidecar: Add flag `--evaluate-queries` to Thanos sidecar to enable local execution of certain queries.

### Added

Expand Down
25 changes: 25 additions & 0 deletions cmd/thanos/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
package main

import (
"fmt"
"net/url"
"strings"
"time"

extflag "github.com/efficientgo/tools/extkingpin"
Expand Down Expand Up @@ -216,3 +218,26 @@ func (ac *alertMgrConfig) registerFlag(cmd extflag.FlagClause) *alertMgrConfig {
ac.alertRelabelConfigPath = extflag.RegisterPathOrContent(cmd, "alert.relabel-config", "YAML file that contains alert relabelling configuration.", extflag.WithEnvSubstitution())
return ac
}

type sidecarFeatures struct {
features []string
}

func (c *sidecarFeatures) isEnabled(feature string) bool {
for _, f := range c.features {
if f == feature {
return true
}
}

return false
}

func (c *sidecarFeatures) registerFlag(cmd extkingpin.FlagClause) {
description := fmt.Sprintf(
"Comma separated experimental feature names to enable. The current list of features is [%s].",
strings.Join([]string{sidecarEvaluateQueriesFeature}, ", "),
)
cmd.Flag("enable-feature", description).
Default("").StringsVar(&c.features)
}
9 changes: 7 additions & 2 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ import (
"github.com/thanos-io/thanos/pkg/tls"
)

const sidecarEvaluateQueriesFeature = "evaluate-queries"

func registerSidecar(app *extkingpin.App) {
cmd := app.Command(component.Sidecar.String(), "Sidecar for Prometheus server.")
conf := &sidecarConfig{}
Expand All @@ -70,7 +72,7 @@ func registerSidecar(app *extkingpin.App) {
RetryInterval: conf.reloader.retryInterval,
})

return runSidecar(g, logger, reg, tracer, rl, component.Sidecar, *conf, grpcLogOpts, tagOpts)
return runSidecar(g, logger, reg, tracer, rl, component.Sidecar, *conf, grpcLogOpts, tagOpts, conf.features.isEnabled(sidecarEvaluateQueriesFeature))
})
}

Expand All @@ -84,6 +86,7 @@ func runSidecar(
conf sidecarConfig,
grpcLogOpts []grpc_logging.Option,
tagOpts []tags.Option,
evaluateQueries bool,
) error {
httpConfContentYaml, err := conf.prometheus.httpClient.Content()
if err != nil {
Expand Down Expand Up @@ -244,7 +247,7 @@ func runSidecar(
{
c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version)
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version, evaluateQueries)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down Expand Up @@ -476,6 +479,7 @@ type sidecarConfig struct {
objStore extflag.PathOrContent
shipper shipperConfig
limitMinTime thanosmodel.TimeOrDurationValue
features sidecarFeatures
}

func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand All @@ -487,6 +491,7 @@ func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
sc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)
sc.objStore = *extkingpin.RegisterCommonObjStoreFlags(cmd, "", false)
sc.shipper.registerFlag(cmd)
sc.features.registerFlag(cmd)
cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime)
}
15 changes: 15 additions & 0 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,20 @@ func aggrsFromFunc(f string) []storepb.Aggr {
return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}
}

func storeHintsFromPromHints(hints *storage.SelectHints) storepb.QueryHints {
return storepb.QueryHints{
StepMillis: hints.Step,
Func: &storepb.Func{
Name: hints.Func,
},
Grouping: &storepb.Grouping{
By: hints.By,
Labels: hints.Grouping,
},
Range: &storepb.Range{Millis: hints.Range},
}
}

func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
if hints == nil {
hints = &storage.SelectHints{
Expand Down Expand Up @@ -274,6 +288,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
Matchers: sms,
MaxResolutionWindow: q.maxResolutionMillis,
Aggregates: aggrs,
QueryHints: storeHintsFromPromHints(hints),
PartialResponseDisabled: !q.partialResponse,
SkipChunks: q.skipChunks,
Step: hints.Step,
Expand Down
66 changes: 62 additions & 4 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"net/http"
"net/url"
"path"
"sort"
"strings"
"sync"
"time"

"github.com/blang/semver/v4"
"github.com/go-kit/log"
Expand Down Expand Up @@ -54,6 +56,8 @@ type PrometheusStore struct {
remoteReadAcceptableResponses []prompb.ReadRequest_ResponseType

framesRead prometheus.Histogram

evaluateQueries bool
}

// Label{Values,Names} call with matchers is supported for Prometheus versions >= 2.24.0.
Expand All @@ -74,6 +78,7 @@ func NewPrometheusStore(
externalLabelsFn func() labels.Labels,
timestamps func() (mint int64, maxt int64),
promVersion func() string,
evaluateQueries bool,
) (*PrometheusStore, error) {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -98,6 +103,7 @@ func NewPrometheusStore(
Buckets: prometheus.ExponentialBuckets(10, 10, 5),
},
),
evaluateQueries: evaluateQueries,
}
return p, nil
}
Expand Down Expand Up @@ -179,6 +185,10 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
return nil
}

if p.evaluateQueries && r.QueryHints.IsSafeToExecute() {
return p.queryPrometheus(s, r)
}

q := &prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime}
for _, m := range matchers {
pm := &prompb.LabelMatcher{Name: m.Name, Value: m.Value}
Expand Down Expand Up @@ -220,18 +230,66 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
return p.handleStreamedPrometheusResponse(s, httpResp, queryPrometheusSpan, extLset)
}

func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan tracing.Span, extLset labels.Labels) error {
ctx := s.Context()
func (p *PrometheusStore) queryPrometheus(s storepb.Store_SeriesServer, r *storepb.SeriesRequest) error {
step := math.Max(float64(r.QueryHints.StepMillis/1000), 30)
minTime := math.Max(float64(r.MinTime), float64(r.MaxTime-5*time.Minute.Milliseconds()))
opts := promclient.QueryOptions{}
matrix, _, err := p.client.QueryRange(s.Context(), p.base, r.ToPromQL(), int64(minTime), r.MaxTime, int64(step), opts)
if err != nil {
return err
}

for _, vector := range matrix {
var lbls []labels.Label
for k, v := range vector.Metric {
lbls = append(lbls, labels.Label{
Name: string(k),
Value: string(v),
})
}
// Attach external labels for compatibility with remote read
for _, lbl := range p.externalLabelsFn() {
lbls = append(lbls, lbl)
}

series := &prompb.TimeSeries{
Labels: labelpb.ZLabelsFromPromLabels(lbls),
Samples: make([]prompb.Sample, len(vector.Values)),
}

for i, sample := range vector.Values {
series.Samples[i] = prompb.Sample{
Value: float64(sample.Value),
Timestamp: int64(sample.Timestamp),
}
}

chks, err := p.chunkSamples(series, MaxSamplesPerChunk)
if err != nil {
return err
}

if err := s.Send(storepb.NewSeriesResponse(&storepb.Series{
Labels: series.Labels,
Chunks: chks,
})); err != nil {
return err
}
}

return nil
}

func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan tracing.Span, extLset labels.Labels) error {
level.Debug(p.logger).Log("msg", "started handling ReadRequest_SAMPLED response type.")

resp, err := p.fetchSampledResponse(ctx, httpResp)
resp, err := p.fetchSampledResponse(s.Context(), httpResp)
querySpan.Finish()
if err != nil {
return err
}

span, _ := tracing.StartSpan(ctx, "transform_and_respond")
span, _ := tracing.StartSpan(s.Context(), "transform_and_respond")
defer span.Finish()
span.SetTag("series_count", len(resp.Results[0].Timeseries))

Expand Down
12 changes: 6 additions & 6 deletions pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
limitMinT := int64(0)
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return limitMinT, -1 }, nil) // Maxt does not matter.
func() (int64, int64) { return limitMinT, -1 }, nil, false) // Maxt does not matter.
testutil.Ok(t, err)

// Query all three samples except for the first one. Since we round up queried data
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) {

promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, nil)
func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, nil, false)
testutil.Ok(t, err)

for _, tcase := range []struct {
Expand Down Expand Up @@ -361,7 +361,7 @@ func TestPrometheusStore_LabelAPIs(t *testing.T) {

promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels {
return extLset
}, nil, func() string { return version })
}, nil, func() string { return version }, false)
testutil.Ok(t, err)

return promStore
Expand Down Expand Up @@ -396,7 +396,7 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) {

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 }, nil)
func() (int64, int64) { return 0, math.MaxInt64 }, nil, false)
testutil.Ok(t, err)
srv := newStoreSeriesServer(ctx)

Expand Down Expand Up @@ -438,7 +438,7 @@ func TestPrometheusStore_Info(t *testing.T) {

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), nil, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 123, 456 }, nil)
func() (int64, int64) { return 123, 456 }, nil, false)
testutil.Ok(t, err)

resp, err := proxy.Info(ctx, &storepb.InfoRequest{})
Expand Down Expand Up @@ -516,7 +516,7 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testin

proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 }, nil)
func() (int64, int64) { return 0, math.MaxInt64 }, nil, false)
testutil.Ok(t, err)

// We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only.
Expand Down
1 change: 1 addition & 0 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
Aggregates: r.Aggregates,
MaxResolutionWindow: r.MaxResolutionWindow,
SkipChunks: r.SkipChunks,
QueryHints: r.QueryHints,
PartialResponseDisabled: r.PartialResponseDisabled,
}
wg = &sync.WaitGroup{}
Expand Down
24 changes: 24 additions & 0 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,3 +517,27 @@ func (c *SeriesStatsCounter) Count(series *Series) {
}
}
}

func (m *SeriesRequest) ToPromQL() string {
return m.QueryHints.toPromQL(m.Matchers)
}

// IsSafeToExecute returns true if the function or aggregation from the query hint
// can be safely executed by the underlying Prometheus instance without affecting the
// result of the query.
func (m *QueryHints) IsSafeToExecute() bool {
distributiveOperations := []string{
"max",
"max_over_time",
"min",
"min_over_time",
"group",
}
for _, op := range distributiveOperations {
if m.Func.Name == op {
return true
}
}

return false
}
Loading

0 comments on commit f91d036

Please sign in to comment.