Skip to content

Commit

Permalink
sidecar: implement without replica labels capability
Browse files Browse the repository at this point in the history
Implement the new without replica labels capability in Sidecar to have
more efficient querying.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS committed Feb 15, 2023
1 parent 6e28411 commit 5ac922b
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 8 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func runSidecar(
MinTime: mint,
MaxTime: maxt,
SupportsSharding: true,
SupportsWithoutReplicaLabels: false, // TODO(bwplotka): Add support for efficiency.
SupportsWithoutReplicaLabels: true,
}
}
return nil
Expand Down
25 changes: 18 additions & 7 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,13 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
shardMatcher := r.ShardInfo.Matcher(&p.buffers)
defer shardMatcher.Close()

extLsetToRemove := map[string]struct{}{}
for _, lbl := range r.WithoutReplicaLabels {
extLsetToRemove[lbl] = struct{}{}
}

if r.QueryHints != nil && r.QueryHints.IsSafeToExecute() && !shardMatcher.IsSharded() {
return p.queryPrometheus(s, r)
return p.queryPrometheus(s, r, extLsetToRemove)
}

q := &prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime}
Expand Down Expand Up @@ -234,16 +239,20 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
// remote read.
contentType := httpResp.Header.Get("Content-Type")
if strings.HasPrefix(contentType, "application/x-protobuf") {
return p.handleSampledPrometheusResponse(s, httpResp, queryPrometheusSpan, extLset, enableChunkHashCalculation)
return p.handleSampledPrometheusResponse(s, httpResp, queryPrometheusSpan, extLset, enableChunkHashCalculation, extLsetToRemove)
}

if !strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") {
return errors.Errorf("not supported remote read content type: %s", contentType)
}
return p.handleStreamedPrometheusResponse(s, shardMatcher, httpResp, queryPrometheusSpan, extLset, enableChunkHashCalculation)
return p.handleStreamedPrometheusResponse(s, shardMatcher, httpResp, queryPrometheusSpan, extLset, enableChunkHashCalculation, extLsetToRemove)
}

func (p *PrometheusStore) queryPrometheus(s storepb.Store_SeriesServer, r *storepb.SeriesRequest) error {
func (p *PrometheusStore) queryPrometheus(
s storepb.Store_SeriesServer,
r *storepb.SeriesRequest,
extLsetToRemove map[string]struct{},
) error {
var matrix model.Matrix

opts := promclient.QueryOptions{}
Expand Down Expand Up @@ -274,7 +283,7 @@ func (p *PrometheusStore) queryPrometheus(s storepb.Store_SeriesServer, r *store
}
}

externalLbls := p.externalLabelsFn()
externalLbls := rmLabels(p.externalLabelsFn().Copy(), extLsetToRemove)
for _, vector := range matrix {
seriesLbls := labels.Labels(make([]labels.Label, 0, len(vector.Metric)))

Expand Down Expand Up @@ -316,6 +325,7 @@ func (p *PrometheusStore) handleSampledPrometheusResponse(
querySpan tracing.Span,
extLset labels.Labels,
calculateChecksums bool,
extLsetToRemove map[string]struct{},
) error {
level.Debug(p.logger).Log("msg", "started handling ReadRequest_SAMPLED response type.")

Expand All @@ -330,7 +340,7 @@ func (p *PrometheusStore) handleSampledPrometheusResponse(
span.SetTag("series_count", len(resp.Results[0].Timeseries))

for _, e := range resp.Results[0].Timeseries {
lset := labelpb.ExtendSortedLabels(labelpb.ZLabelsToPromLabels(e.Labels), extLset)
lset := labelpb.ExtendSortedLabels(labelpb.ZLabelsToPromLabels(e.Labels), rmLabels(extLset.Copy(), extLsetToRemove))
if len(e.Samples) == 0 {
// As found in https://github.com/thanos-io/thanos/issues/381
// Prometheus can give us completely empty time series. Ignore these with log until we figure out that
Expand Down Expand Up @@ -367,6 +377,7 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(
querySpan tracing.Span,
extLset labels.Labels,
calculateChecksums bool,
extLsetToRemove map[string]struct{},
) error {
level.Debug(p.logger).Log("msg", "started handling ReadRequest_STREAMED_XOR_CHUNKS streamed read response.")

Expand Down Expand Up @@ -405,7 +416,7 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(

framesNum++
for _, series := range res.ChunkedSeries {
completeLabelset := labelpb.ExtendSortedLabels(labelpb.ZLabelsToPromLabels(series.Labels), extLset)
completeLabelset := labelpb.ExtendSortedLabels(labelpb.ZLabelsToPromLabels(series.Labels), rmLabels(extLset.Copy(), extLsetToRemove))
if !shardMatcher.MatchesLabels(completeLabelset) {
continue
}
Expand Down

0 comments on commit 5ac922b

Please sign in to comment.