diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 0ae99314cc..1f629f1af7 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -271,7 +271,7 @@ func runSidecar( MinTime: mint, MaxTime: maxt, SupportsSharding: true, - SupportsWithoutReplicaLabels: false, // TODO(bwplotka): Add support for efficiency. + SupportsWithoutReplicaLabels: true, } } return nil diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index b4546c7d05..e4444c718f 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -174,17 +174,23 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie } } + extLsetToRemove := map[string]struct{}{} + for _, lbl := range r.WithoutReplicaLabels { + extLsetToRemove[lbl] = struct{}{} + } + if r.SkipChunks { + finalExtLset := rmLabels(extLset.Copy(), extLsetToRemove) labelMaps, err := p.client.SeriesInGRPC(s.Context(), p.base, matchers, r.MinTime, r.MaxTime) if err != nil { return err } for _, lbm := range labelMaps { - lset := make([]labelpb.ZLabel, 0, len(lbm)+len(extLset)) + lset := make([]labelpb.ZLabel, 0, len(lbm)+len(finalExtLset)) for k, v := range lbm { lset = append(lset, labelpb.ZLabel{Name: k, Value: v}) } - lset = append(lset, labelpb.ZLabelsFromPromLabels(extLset)...) + lset = append(lset, labelpb.ZLabelsFromPromLabels(finalExtLset)...) sort.Slice(lset, func(i, j int) bool { return lset[i].Name < lset[j].Name }) @@ -199,7 +205,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie defer shardMatcher.Close() if r.QueryHints != nil && r.QueryHints.IsSafeToExecute() && !shardMatcher.IsSharded() { - return p.queryPrometheus(s, r) + return p.queryPrometheus(s, r, extLsetToRemove) } q := &prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime} @@ -234,16 +240,20 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie // remote read. contentType := httpResp.Header.Get("Content-Type") if strings.HasPrefix(contentType, "application/x-protobuf") { - return p.handleSampledPrometheusResponse(s, httpResp, queryPrometheusSpan, extLset, enableChunkHashCalculation) + return p.handleSampledPrometheusResponse(s, httpResp, queryPrometheusSpan, enableChunkHashCalculation, extLsetToRemove) } if !strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") { return errors.Errorf("not supported remote read content type: %s", contentType) } - return p.handleStreamedPrometheusResponse(s, shardMatcher, httpResp, queryPrometheusSpan, extLset, enableChunkHashCalculation) + return p.handleStreamedPrometheusResponse(s, shardMatcher, httpResp, queryPrometheusSpan, enableChunkHashCalculation, extLsetToRemove) } -func (p *PrometheusStore) queryPrometheus(s storepb.Store_SeriesServer, r *storepb.SeriesRequest) error { +func (p *PrometheusStore) queryPrometheus( + s storepb.Store_SeriesServer, + r *storepb.SeriesRequest, + extLsetToRemove map[string]struct{}, +) error { var matrix model.Matrix opts := promclient.QueryOptions{} @@ -274,7 +284,7 @@ func (p *PrometheusStore) queryPrometheus(s storepb.Store_SeriesServer, r *store } } - externalLbls := p.externalLabelsFn() + externalLbls := rmLabels(p.externalLabelsFn().Copy(), extLsetToRemove) for _, vector := range matrix { seriesLbls := labels.Labels(make([]labels.Label, 0, len(vector.Metric))) @@ -314,8 +324,8 @@ func (p *PrometheusStore) handleSampledPrometheusResponse( s storepb.Store_SeriesServer, httpResp *http.Response, querySpan tracing.Span, - extLset labels.Labels, calculateChecksums bool, + extLsetToRemove map[string]struct{}, ) error { level.Debug(p.logger).Log("msg", "started handling ReadRequest_SAMPLED response type.") @@ -330,7 +340,9 @@ func (p *PrometheusStore) handleSampledPrometheusResponse( span.SetTag("series_count", len(resp.Results[0].Timeseries)) for _, e := range resp.Results[0].Timeseries { - lset := labelpb.ExtendSortedLabels(labelpb.ZLabelsToPromLabels(e.Labels), extLset) + // Sampled remote read handler already adds external labels for us: + // https://github.com/prometheus/prometheus/blob/3f6f5d3357e232abe53f1775f893fdf8f842712c/storage/remote/read_handler.go#L166. + lset := rmLabels(labelpb.ZLabelsToPromLabels(e.Labels), extLsetToRemove) if len(e.Samples) == 0 { // As found in https://github.com/thanos-io/thanos/issues/381 // Prometheus can give us completely empty time series. Ignore these with log until we figure out that @@ -365,8 +377,8 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse( shardMatcher *storepb.ShardMatcher, httpResp *http.Response, querySpan tracing.Span, - extLset labels.Labels, calculateChecksums bool, + extLsetToRemove map[string]struct{}, ) error { level.Debug(p.logger).Log("msg", "started handling ReadRequest_STREAMED_XOR_CHUNKS streamed read response.") @@ -405,7 +417,10 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse( framesNum++ for _, series := range res.ChunkedSeries { - completeLabelset := labelpb.ExtendSortedLabels(labelpb.ZLabelsToPromLabels(series.Labels), extLset) + // Streamed remote read handler already adds + // external labels: + // https://github.com/prometheus/prometheus/blob/3f6f5d3357e232abe53f1775f893fdf8f842712c/storage/remote/codec.go#L210. + completeLabelset := rmLabels(labelpb.ZLabelsToPromLabels(series.Labels), extLsetToRemove) if !shardMatcher.MatchesLabels(completeLabelset) { continue } diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index eb9d80ed15..cb3c500579 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -48,11 +48,11 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { baseT := timestamp.FromTime(time.Now()) / 1000 * 1000 a := p.Appender() - _, err = a.Append(0, labels.FromStrings("a", "b"), baseT+100, 1) + _, err = a.Append(0, labels.FromStrings("a", "b", "region", "eu-west"), baseT+100, 1) testutil.Ok(t, err) - _, err = a.Append(0, labels.FromStrings("a", "b"), baseT+200, 2) + _, err = a.Append(0, labels.FromStrings("a", "b", "region", "eu-west"), baseT+200, 2) testutil.Ok(t, err) - _, err = a.Append(0, labels.FromStrings("a", "b"), baseT+300, 3) + _, err = a.Append(0, labels.FromStrings("a", "b", "region", "eu-west"), baseT+300, 3) testutil.Ok(t, err) testutil.Ok(t, a.Commit()) @@ -381,11 +381,11 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) { baseT := timestamp.FromTime(time.Now()) / 1000 * 1000 a := p.Appender() - _, err = a.Append(0, labels.FromStrings("a", "b"), baseT+100, 1) + _, err = a.Append(0, labels.FromStrings("a", "b", "region", "eu-west"), baseT+100, 1) testutil.Ok(t, err) - _, err = a.Append(0, labels.FromStrings("a", "b"), baseT+200, 2) + _, err = a.Append(0, labels.FromStrings("a", "b", "region", "eu-west"), baseT+200, 2) testutil.Ok(t, err) - _, err = a.Append(0, labels.FromStrings("a", "b"), baseT+300, 3) + _, err = a.Append(0, labels.FromStrings("a", "b", "region", "eu-west"), baseT+300, 3) testutil.Ok(t, err) testutil.Ok(t, a.Commit()) @@ -507,7 +507,7 @@ func testSeries_SplitSamplesIntoChunksWithMaxSizeOf120(t *testing.T, appender st offset := int64(2*math.MaxUint16 + 5) for i := int64(0); i < offset; i++ { - _, err := appender.Append(0, labels.FromStrings("a", "b"), baseT+i, 1) + _, err := appender.Append(0, labels.FromStrings("a", "b", "region", "eu-west"), baseT+i, 1) testutil.Ok(t, err) } diff --git a/pkg/store/proxy_heap_test.go b/pkg/store/proxy_heap_test.go index 6616db4016..d37b4e4840 100644 --- a/pkg/store/proxy_heap_test.go +++ b/pkg/store/proxy_heap_test.go @@ -13,6 +13,15 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" ) +func TestRmLabelsCornerCases(t *testing.T) { + testutil.Equals(t, rmLabels(labelsFromStrings("aa", "bb"), map[string]struct{}{ + "aa": {}, + }), labels.Labels{}) + testutil.Equals(t, rmLabels(labelsFromStrings(), map[string]struct{}{ + "aa": {}, + }), labels.Labels{}) +} + func TestSortWithoutLabels(t *testing.T) { for _, tcase := range []struct { input []*storepb.SeriesResponse