Skip to content

Commit

Permalink
query: fix querying with interleaved data
Browse files Browse the repository at this point in the history
Fix querying when some data has been pushed down and some - hasn't.
Since `max_over_time` and `min_over_time` remove `__name__` from the
results either way, let's do that inside Select() to have a unified form
of data.

Add test to cover this case.

Without this code, the test fails:

```
level=error ts=2022-01-06T16:28:37.959956685Z msg="function failed. Retrying in next tick" err="read query instant response: expected 2xx response, got 422. Body: {\"status\":\"error\",\"errorType\":\"execution\",\"error\":\"vector cannot contain metrics with the same labelset\"}\n"
```

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS committed Jan 6, 2022
1 parent e03452e commit fac4e88
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 0 deletions.
17 changes: 17 additions & 0 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
promgate "github.com/prometheus/prometheus/util/gate"
Expand Down Expand Up @@ -310,6 +311,22 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
warns = append(warns, errors.New(w))
}

// Delete the metric's name from the result because that's what the
// PromQL does either way and we want our iterator to work with data
// that was either pushed down or not.
if q.enableQueryPushdown && (hints.Func == "max_over_time" || hints.Func == "min_over_time") {
for i := range resp.seriesSet {
lbls := resp.seriesSet[i].Labels
for j, lbl := range lbls {
if lbl.Name != model.MetricNameLabel {
continue
}
resp.seriesSet[i].Labels = append(resp.seriesSet[i].Labels[:j], resp.seriesSet[i].Labels[j+1:]...)
break
}
}
}

if !q.isDedupEnabled() {
// Return data without any deduplication.
return &promSeriesSet{
Expand Down
94 changes: 94 additions & 0 deletions test/e2e/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http/httptest"
"net/url"
"os"
"path"
"path/filepath"
"sort"
"strings"
Expand All @@ -25,21 +26,27 @@ import (
"github.com/chromedp/cdproto/network"
"github.com/chromedp/chromedp"
"github.com/efficientgo/e2e"
e2edb "github.com/efficientgo/e2e/db"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/rules"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
"github.com/thanos-io/thanos/pkg/metadata/metadatapb"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/objstore/s3"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/targets/targetspb"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
"github.com/thanos-io/thanos/test/e2e/e2ethanos"
)

Expand Down Expand Up @@ -631,6 +638,93 @@ func newSample(s fakeMetricSample) model.Sample {
}
}

// Regression test for https://github.com/thanos-io/thanos/issues/5033.
// Tests whether queries work with mixed sources w/ max_over_time and
// min_over_time.
func TestSidecarStorePushdown(t *testing.T) {
t.Parallel()

// Build up.
e, err := e2e.NewDockerEnvironment("e2e_sidecar_store_pushdown")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(e, "p1", defaultPromConfig("p1", 0, "", ""), "", e2ethanos.DefaultPrometheusImage(), "remote-write-receiver")
testutil.Ok(t, err)
testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1))

const bucket = "store_gateway_test"
m := e2ethanos.NewMinio(e, "thanos-minio", bucket)
testutil.Ok(t, e2e.StartAndWaitReady(m))

dir := filepath.Join(e.SharedDir(), "tmp")
testutil.Ok(t, os.MkdirAll(filepath.Join(e.SharedDir(), dir), os.ModePerm))

series := []labels.Labels{labels.FromStrings("__name__", "my_fake_metric", "instance", "foo")}
extLset := labels.FromStrings("prometheus", "p1", "replica", "0")

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
t.Cleanup(cancel)

now := time.Now()
id1, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset, 0, metadata.NoneFunc)
testutil.Ok(t, err)

l := log.NewLogfmtLogger(os.Stdout)
bkt, err := s3.NewBucketWithConfig(l, s3.Config{
Bucket: bucket,
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
Endpoint: m.Endpoint("http"),
Insecure: true,
}, "test")
testutil.Ok(t, err)

testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id1.String()), id1.String()))

s1, err := e2ethanos.NewStoreGW(
e,
"1",
client.BucketConfig{
Type: client.S3,
Config: s3.Config{
Bucket: bucket,
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
Endpoint: m.InternalEndpoint("http"),
Insecure: true,
},
},
"",
)
testutil.Ok(t, err)
testutil.Ok(t, e2e.StartAndWaitReady(s1))

q, err := e2ethanos.NewQuerierBuilder(e, "1", s1.InternalEndpoint("grpc"), sidecar1.InternalEndpoint("grpc")).WithEnabledFeatures([]string{"query-pushdown"}).Build()
testutil.Ok(t, err)

testutil.Ok(t, e2e.StartAndWaitReady(q))
testutil.Ok(t, s1.WaitSumMetrics(e2e.Equals(1), "thanos_blocks_meta_synced"))

testutil.Ok(t, synthesizeSamples(ctx, prom1, []fakeMetricSample{
{
label: "foo",
value: 123,
},
}))

queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string {
return "max_over_time(my_fake_metric[2h])"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, []model.Metric{
{
"instance": "foo",
"prometheus": "p1",
},
})
}

func TestSidecarQueryEvaluation(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit fac4e88

Please sign in to comment.