Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query: fix querying with interleaved data #5035

Merged
merged 2 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
promgate "github.com/prometheus/prometheus/util/gate"
Expand Down Expand Up @@ -310,6 +311,22 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
warns = append(warns, errors.New(w))
}

// Delete the metric's name from the result because that's what
// PromQL does anyway, and we want our iterator to work with data
// that was either pushed down or not.
if q.enableQueryPushdown && (hints.Func == "max_over_time" || hints.Func == "min_over_time") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious why promql does this. Isn't it a bug?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, it is a feature. Think of these functions like they are calculating the maximum over all input, not just for each unique labelset. I think this is where it happens: https://github.com/prometheus/prometheus/blob/d677aa4b29a8a0cf9f61af04bbf5bfdce893cf23/promql/engine.go#L1304-L1310.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once more functions are being pushed down, we will need to improve the logic here. Probably we will need to refactor this into a separate struct? I don't know yet

for i := range resp.seriesSet {
lbls := resp.seriesSet[i].Labels
for j, lbl := range lbls {
if lbl.Name != model.MetricNameLabel {
continue
}
resp.seriesSet[i].Labels = append(resp.seriesSet[i].Labels[:j], resp.seriesSet[i].Labels[j+1:]...)
break
}
}
}

if !q.isDedupEnabled() {
// Return data without any deduplication.
return &promSeriesSet{
Expand Down
140 changes: 140 additions & 0 deletions test/e2e/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http/httptest"
"net/url"
"os"
"path"
"path/filepath"
"sort"
"strings"
Expand All @@ -25,21 +26,27 @@ import (
"github.com/chromedp/cdproto/network"
"github.com/chromedp/chromedp"
"github.com/efficientgo/e2e"
e2edb "github.com/efficientgo/e2e/db"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/rules"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
"github.com/thanos-io/thanos/pkg/metadata/metadatapb"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/objstore/s3"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/targets/targetspb"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
"github.com/thanos-io/thanos/test/e2e/e2ethanos"
)

Expand Down Expand Up @@ -631,6 +638,139 @@ func newSample(s fakeMetricSample) model.Sample {
}
}

// Regression test for https://github.com/thanos-io/thanos/issues/5033.
// Tests whether queries work with mixed sources, and with functions
// that we are pushing down: min, max, min_over_time, max_over_time,
// group.
func TestSidecarStorePushdown(t *testing.T) {
t.Parallel()

// Build up.
e, err := e2e.NewDockerEnvironment("e2e_sidecar_store_pushdown")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(e, "p1", defaultPromConfig("p1", 0, "", ""), "", e2ethanos.DefaultPrometheusImage(), "remote-write-receiver")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it intended to enable remote write receiver?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test synthesizes samples by remote writing them, which is why the receiver feature is needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @fpetkovski. I thought it was using data from the block only. Didn't notice the remote write part.

testutil.Ok(t, err)
testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1))

const bucket = "store_gateway_test"
m := e2ethanos.NewMinio(e, "thanos-minio", bucket)
testutil.Ok(t, e2e.StartAndWaitReady(m))

dir := filepath.Join(e.SharedDir(), "tmp")
testutil.Ok(t, os.MkdirAll(filepath.Join(e.SharedDir(), dir), os.ModePerm))

series := []labels.Labels{labels.FromStrings("__name__", "my_fake_metric", "instance", "foo")}
extLset := labels.FromStrings("prometheus", "p1", "replica", "0")

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
t.Cleanup(cancel)

now := time.Now()
id1, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset, 0, metadata.NoneFunc)
testutil.Ok(t, err)

l := log.NewLogfmtLogger(os.Stdout)
bkt, err := s3.NewBucketWithConfig(l, s3.Config{
Bucket: bucket,
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
Endpoint: m.Endpoint("http"),
Insecure: true,
}, "test")
testutil.Ok(t, err)

testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id1.String()), id1.String()))

s1, err := e2ethanos.NewStoreGW(
e,
"1",
client.BucketConfig{
Type: client.S3,
Config: s3.Config{
Bucket: bucket,
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
Endpoint: m.InternalEndpoint("http"),
Insecure: true,
},
},
"",
)
testutil.Ok(t, err)
testutil.Ok(t, e2e.StartAndWaitReady(s1))

q, err := e2ethanos.NewQuerierBuilder(e, "1", s1.InternalEndpoint("grpc"), sidecar1.InternalEndpoint("grpc")).WithEnabledFeatures([]string{"query-pushdown"}).Build()
testutil.Ok(t, err)

testutil.Ok(t, e2e.StartAndWaitReady(q))
testutil.Ok(t, s1.WaitSumMetrics(e2e.Equals(1), "thanos_blocks_meta_synced"))

testutil.Ok(t, synthesizeSamples(ctx, prom1, []fakeMetricSample{
{
label: "foo",
value: 123,
},
}))

queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string {
return "max_over_time(my_fake_metric[2h])"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, []model.Metric{
{
"instance": "foo",
"prometheus": "p1",
},
})
yeya24 marked this conversation as resolved.
Show resolved Hide resolved

queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string {
return "max(my_fake_metric) by (__name__, instance)"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, []model.Metric{
{
"instance": "foo",
"__name__": "my_fake_metric",
},
})

queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string {
return "min_over_time(my_fake_metric[2h])"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, []model.Metric{
{
"instance": "foo",
"prometheus": "p1",
},
})

queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string {
return "min(my_fake_metric) by (instance, __name__)"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, []model.Metric{
{
"instance": "foo",
"__name__": "my_fake_metric",
},
})

queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string {
return "group(my_fake_metric) by (__name__, instance)"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, []model.Metric{
{
"instance": "foo",
"__name__": "my_fake_metric",
},
})

}

func TestSidecarQueryEvaluation(t *testing.T) {
t.Parallel()

Expand Down