From 5ac922bec3844b54ddb12ae9824d8c07e679c9f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 15 Feb 2023 12:08:38 +0200 Subject: [PATCH] sidecar: implement without replica labels capability MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement the new without replica labels capability in Sidecar to have more efficient querying. Signed-off-by: Giedrius Statkevičius --- cmd/thanos/sidecar.go | 2 +- pkg/store/prometheus.go | 25 ++++++++++++++++++------- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 0ae99314cc..1f629f1af7 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -271,7 +271,7 @@ func runSidecar( MinTime: mint, MaxTime: maxt, SupportsSharding: true, - SupportsWithoutReplicaLabels: false, // TODO(bwplotka): Add support for efficiency. + SupportsWithoutReplicaLabels: true, } } return nil diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index b4546c7d05..f61c8096a7 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -198,8 +198,13 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie shardMatcher := r.ShardInfo.Matcher(&p.buffers) defer shardMatcher.Close() + extLsetToRemove := map[string]struct{}{} + for _, lbl := range r.WithoutReplicaLabels { + extLsetToRemove[lbl] = struct{}{} + } + if r.QueryHints != nil && r.QueryHints.IsSafeToExecute() && !shardMatcher.IsSharded() { - return p.queryPrometheus(s, r) + return p.queryPrometheus(s, r, extLsetToRemove) } q := &prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime} @@ -234,16 +239,20 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie // remote read. contentType := httpResp.Header.Get("Content-Type") if strings.HasPrefix(contentType, "application/x-protobuf") { - return p.handleSampledPrometheusResponse(s, httpResp, queryPrometheusSpan, extLset, enableChunkHashCalculation) + return p.handleSampledPrometheusResponse(s, httpResp, queryPrometheusSpan, extLset, enableChunkHashCalculation, extLsetToRemove) } if !strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") { return errors.Errorf("not supported remote read content type: %s", contentType) } - return p.handleStreamedPrometheusResponse(s, shardMatcher, httpResp, queryPrometheusSpan, extLset, enableChunkHashCalculation) + return p.handleStreamedPrometheusResponse(s, shardMatcher, httpResp, queryPrometheusSpan, extLset, enableChunkHashCalculation, extLsetToRemove) } -func (p *PrometheusStore) queryPrometheus(s storepb.Store_SeriesServer, r *storepb.SeriesRequest) error { +func (p *PrometheusStore) queryPrometheus( + s storepb.Store_SeriesServer, + r *storepb.SeriesRequest, + extLsetToRemove map[string]struct{}, +) error { var matrix model.Matrix opts := promclient.QueryOptions{} @@ -274,7 +283,7 @@ func (p *PrometheusStore) queryPrometheus(s storepb.Store_SeriesServer, r *store } } - externalLbls := p.externalLabelsFn() + externalLbls := rmLabels(p.externalLabelsFn().Copy(), extLsetToRemove) for _, vector := range matrix { seriesLbls := labels.Labels(make([]labels.Label, 0, len(vector.Metric))) @@ -316,6 +325,7 @@ func (p *PrometheusStore) handleSampledPrometheusResponse( querySpan tracing.Span, extLset labels.Labels, calculateChecksums bool, + extLsetToRemove map[string]struct{}, ) error { level.Debug(p.logger).Log("msg", "started handling ReadRequest_SAMPLED response type.") @@ -330,7 +340,7 @@ func (p *PrometheusStore) handleSampledPrometheusResponse( span.SetTag("series_count", len(resp.Results[0].Timeseries)) for _, e := range resp.Results[0].Timeseries { - lset := labelpb.ExtendSortedLabels(labelpb.ZLabelsToPromLabels(e.Labels), extLset) + lset := labelpb.ExtendSortedLabels(labelpb.ZLabelsToPromLabels(e.Labels), rmLabels(extLset.Copy(), extLsetToRemove)) if len(e.Samples) == 0 { // As found in https://github.com/thanos-io/thanos/issues/381 // Prometheus can give us completely empty time series. Ignore these with log until we figure out that @@ -367,6 +377,7 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse( querySpan tracing.Span, extLset labels.Labels, calculateChecksums bool, + extLsetToRemove map[string]struct{}, ) error { level.Debug(p.logger).Log("msg", "started handling ReadRequest_STREAMED_XOR_CHUNKS streamed read response.") @@ -405,7 +416,7 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse( framesNum++ for _, series := range res.ChunkedSeries { - completeLabelset := labelpb.ExtendSortedLabels(labelpb.ZLabelsToPromLabels(series.Labels), extLset) + completeLabelset := labelpb.ExtendSortedLabels(labelpb.ZLabelsToPromLabels(series.Labels), rmLabels(extLset.Copy(), extLsetToRemove)) if !shardMatcher.MatchesLabels(completeLabelset) { continue }