From fac4e8848ff036d9db5637f44dd95e5a73ca7afb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Thu, 6 Jan 2022 18:29:04 +0200
Subject: [PATCH 1/2] query: fix querying with interleaved data
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Fix querying when some data has been pushed down and some hasn't. Since
`max_over_time` and `min_over_time` remove `__name__` from the results
either way, let's do that inside Select() to have a unified form of
data.

Add a test to cover this case. Without this change, the test fails:

```
level=error ts=2022-01-06T16:28:37.959956685Z msg="function failed. Retrying in next tick" err="read query instant response: expected 2xx response, got 422. Body: {\"status\":\"error\",\"errorType\":\"execution\",\"error\":\"vector cannot contain metrics with the same labelset\"}\n"
```
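
For illustration only, the gist of the change as a minimal, self-contained
sketch (the `dropMetricName` helper and the plain `labels.Labels` slice are
simplifications, not the actual types; the real change operates on the series
set returned inside Select()):

```
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
)

// dropMetricName removes the __name__ label from every series so that
// pushed-down and non-pushed-down data end up in the same form.
func dropMetricName(series []labels.Labels) {
	for i := range series {
		for j, lbl := range series[i] {
			if lbl.Name != model.MetricNameLabel {
				continue
			}
			series[i] = append(series[i][:j], series[i][j+1:]...)
			break
		}
	}
}

func main() {
	series := []labels.Labels{
		labels.FromStrings("__name__", "my_fake_metric", "instance", "foo"),
	}
	dropMetricName(series)
	fmt.Println(series[0]) // {instance="foo"}
}
```

With the data in a unified form, deduplication can merge series from both
paths before PromQL strips `__name__` on its own, which is what otherwise
leads to the "vector cannot contain metrics with the same labelset" error
above.
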
Signed-off-by: Giedrius Statkevičius
---
 pkg/query/querier.go   | 17 ++++++++
 test/e2e/query_test.go | 94 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 111 insertions(+)

diff --git a/pkg/query/querier.go b/pkg/query/querier.go
index b8cc8cf48b..3470a7c40b 100644
--- a/pkg/query/querier.go
+++ b/pkg/query/querier.go
@@ -14,6 +14,7 @@ import (
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
 	promgate "github.com/prometheus/prometheus/util/gate"
@@ -310,6 +311,22 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
 		warns = append(warns, errors.New(w))
 	}
 
+	// Delete the metric's name from the result because that's what the
+	// PromQL does either way and we want our iterator to work with data
+	// that was either pushed down or not.
+	if q.enableQueryPushdown && (hints.Func == "max_over_time" || hints.Func == "min_over_time") {
+		for i := range resp.seriesSet {
+			lbls := resp.seriesSet[i].Labels
+			for j, lbl := range lbls {
+				if lbl.Name != model.MetricNameLabel {
+					continue
+				}
+				resp.seriesSet[i].Labels = append(resp.seriesSet[i].Labels[:j], resp.seriesSet[i].Labels[j+1:]...)
+				break
+			}
+		}
+	}
+
 	if !q.isDedupEnabled() {
 		// Return data without any deduplication.
 		return &promSeriesSet{
diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go
index 8eb58778a0..4e9004c775 100644
--- a/test/e2e/query_test.go
+++ b/test/e2e/query_test.go
@@ -10,6 +10,7 @@ import (
 	"net/http/httptest"
 	"net/url"
 	"os"
+	"path"
 	"path/filepath"
 	"sort"
 	"strings"
@@ -25,6 +26,7 @@ import (
 	"github.com/chromedp/cdproto/network"
 	"github.com/chromedp/chromedp"
 	"github.com/efficientgo/e2e"
+	e2edb "github.com/efficientgo/e2e/db"
 	"github.com/go-kit/log"
 	"github.com/pkg/errors"
 	"github.com/prometheus/common/model"
@@ -32,14 +34,19 @@ import (
 	"github.com/prometheus/prometheus/model/timestamp"
 	"github.com/prometheus/prometheus/rules"
 
+	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
 	"github.com/thanos-io/thanos/pkg/metadata/metadatapb"
+	"github.com/thanos-io/thanos/pkg/objstore"
+	"github.com/thanos-io/thanos/pkg/objstore/client"
+	"github.com/thanos-io/thanos/pkg/objstore/s3"
 	"github.com/thanos-io/thanos/pkg/promclient"
 	"github.com/thanos-io/thanos/pkg/rules/rulespb"
 	"github.com/thanos-io/thanos/pkg/runutil"
 	"github.com/thanos-io/thanos/pkg/store/labelpb"
 	"github.com/thanos-io/thanos/pkg/targets/targetspb"
 	"github.com/thanos-io/thanos/pkg/testutil"
+	"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
 	"github.com/thanos-io/thanos/test/e2e/e2ethanos"
 )
 
@@ -631,6 +638,93 @@ func newSample(s fakeMetricSample) model.Sample {
 	}
 }
 
+// Regression test for https://github.com/thanos-io/thanos/issues/5033.
+// Tests whether queries work with mixed sources w/ max_over_time and
+// min_over_time.
+func TestSidecarStorePushdown(t *testing.T) {
+	t.Parallel()
+
+	// Build up.
+	e, err := e2e.NewDockerEnvironment("e2e_sidecar_store_pushdown")
+	testutil.Ok(t, err)
+	t.Cleanup(e2ethanos.CleanScenario(t, e))
+
+	prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(e, "p1", defaultPromConfig("p1", 0, "", ""), "", e2ethanos.DefaultPrometheusImage(), "remote-write-receiver")
+	testutil.Ok(t, err)
+	testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1))
+
+	const bucket = "store_gateway_test"
+	m := e2ethanos.NewMinio(e, "thanos-minio", bucket)
+	testutil.Ok(t, e2e.StartAndWaitReady(m))
+
+	dir := filepath.Join(e.SharedDir(), "tmp")
+	testutil.Ok(t, os.MkdirAll(filepath.Join(e.SharedDir(), dir), os.ModePerm))
+
+	series := []labels.Labels{labels.FromStrings("__name__", "my_fake_metric", "instance", "foo")}
+	extLset := labels.FromStrings("prometheus", "p1", "replica", "0")
+
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
+	t.Cleanup(cancel)
+
+	now := time.Now()
+	id1, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset, 0, metadata.NoneFunc)
+	testutil.Ok(t, err)
+
+	l := log.NewLogfmtLogger(os.Stdout)
+	bkt, err := s3.NewBucketWithConfig(l, s3.Config{
+		Bucket:    bucket,
+		AccessKey: e2edb.MinioAccessKey,
+		SecretKey: e2edb.MinioSecretKey,
+		Endpoint:  m.Endpoint("http"),
+		Insecure:  true,
+	}, "test")
+	testutil.Ok(t, err)
+
+	testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id1.String()), id1.String()))
+
+	s1, err := e2ethanos.NewStoreGW(
+		e,
+		"1",
+		client.BucketConfig{
+			Type: client.S3,
+			Config: s3.Config{
+				Bucket:    bucket,
+				AccessKey: e2edb.MinioAccessKey,
+				SecretKey: e2edb.MinioSecretKey,
+				Endpoint:  m.InternalEndpoint("http"),
+				Insecure:  true,
+			},
+		},
+		"",
+	)
+	testutil.Ok(t, err)
+	testutil.Ok(t, e2e.StartAndWaitReady(s1))
+
+	q, err := e2ethanos.NewQuerierBuilder(e, "1", s1.InternalEndpoint("grpc"), sidecar1.InternalEndpoint("grpc")).WithEnabledFeatures([]string{"query-pushdown"}).Build()
+	testutil.Ok(t, err)
+
+	testutil.Ok(t, e2e.StartAndWaitReady(q))
+	testutil.Ok(t, s1.WaitSumMetrics(e2e.Equals(1), "thanos_blocks_meta_synced"))
+
+	testutil.Ok(t, synthesizeSamples(ctx, prom1, []fakeMetricSample{
+		{
+			label: "foo",
+			value: 123,
+		},
+	}))
+
+	queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string {
+		return "max_over_time(my_fake_metric[2h])"
+	}, time.Now, promclient.QueryOptions{
+		Deduplicate: true,
+	}, []model.Metric{
+		{
+			"instance":   "foo",
+			"prometheus": "p1",
+		},
+	})
+}
+
 func TestSidecarQueryEvaluation(t *testing.T) {
 	t.Parallel()
 

From 203f449843681d05157030bec3cfa094f52a9fe7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Fri, 7 Jan 2022 12:10:01 +0200
Subject: [PATCH 2/2] e2e: add more tricky interleaved data test cases
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Giedrius Statkevičius
---
 test/e2e/query_test.go | 50 ++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 48 insertions(+), 2 deletions(-)

diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go
index 4e9004c775..4f83d8fc4b 100644
--- a/test/e2e/query_test.go
+++ b/test/e2e/query_test.go
@@ -639,8 +639,9 @@ func newSample(s fakeMetricSample) model.Sample {
 }
 
 // Regression test for https://github.com/thanos-io/thanos/issues/5033.
-// Tests whether queries work with mixed sources w/ max_over_time and
-// min_over_time.
+// Tests whether queries work with mixed sources, and with functions
+// that we are pushing down: min, max, min_over_time, max_over_time,
+// group.
 func TestSidecarStorePushdown(t *testing.T) {
 	t.Parallel()
 
@@ -723,6 +724,51 @@ func TestSidecarStorePushdown(t *testing.T) {
 			"prometheus": "p1",
 		},
 	})
+
+	queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string {
+		return "max(my_fake_metric) by (__name__, instance)"
+	}, time.Now, promclient.QueryOptions{
+		Deduplicate: true,
+	}, []model.Metric{
+		{
+			"instance": "foo",
+			"__name__": "my_fake_metric",
+		},
+	})
+
+	queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string {
+		return "min_over_time(my_fake_metric[2h])"
+	}, time.Now, promclient.QueryOptions{
+		Deduplicate: true,
+	}, []model.Metric{
+		{
+			"instance":   "foo",
+			"prometheus": "p1",
+		},
+	})
+
+	queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string {
+		return "min(my_fake_metric) by (instance, __name__)"
+	}, time.Now, promclient.QueryOptions{
+		Deduplicate: true,
+	}, []model.Metric{
+		{
+			"instance": "foo",
+			"__name__": "my_fake_metric",
+		},
+	})
+
+	queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string {
+		return "group(my_fake_metric) by (__name__, instance)"
+	}, time.Now, promclient.QueryOptions{
+		Deduplicate: true,
+	}, []model.Metric{
+		{
+			"instance": "foo",
+			"__name__": "my_fake_metric",
+		},
+	})
+
 }
 
 func TestSidecarQueryEvaluation(t *testing.T) {