Skip to content

Commit

Permalink
store/prometheus: ensure sidecar respects ext labels
Browse files Browse the repository at this point in the history
Bring back behaviour from pre-0.31.0:
https://github.com/thanos-io/thanos/pull/6127/files#diff-fe654aedc6e5df2ed2a3c493dbb5648358a65ba7c8b3c3029ea099d930b4f00aL333.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS committed Jul 27, 2023
1 parent 510c054 commit ef4fb7c
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 8 deletions.
19 changes: 11 additions & 8 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,13 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
// remote read.
contentType := httpResp.Header.Get("Content-Type")
if strings.HasPrefix(contentType, "application/x-protobuf") {
return p.handleSampledPrometheusResponse(s, httpResp, queryPrometheusSpan, enableChunkHashCalculation, extLsetToRemove)
return p.handleSampledPrometheusResponse(s, httpResp, queryPrometheusSpan, extLset, enableChunkHashCalculation, extLsetToRemove)
}

if !strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") {
return errors.Errorf("not supported remote read content type: %s", contentType)
}
return p.handleStreamedPrometheusResponse(s, shardMatcher, httpResp, queryPrometheusSpan, enableChunkHashCalculation, extLsetToRemove)
return p.handleStreamedPrometheusResponse(s, shardMatcher, httpResp, queryPrometheusSpan, extLset, enableChunkHashCalculation, extLsetToRemove)
}

func (p *PrometheusStore) queryPrometheus(
Expand Down Expand Up @@ -326,6 +326,7 @@ func (p *PrometheusStore) handleSampledPrometheusResponse(
s storepb.Store_SeriesServer,
httpResp *http.Response,
querySpan tracing.Span,
extLset labels.Labels,
calculateChecksums bool,
extLsetToRemove map[string]struct{},
) error {
Expand All @@ -342,9 +343,10 @@ func (p *PrometheusStore) handleSampledPrometheusResponse(
span.SetTag("series_count", len(resp.Results[0].Timeseries))

for _, e := range resp.Results[0].Timeseries {
// Sampled remote read handler already adds external labels for us:
// https://github.com/prometheus/prometheus/blob/3f6f5d3357e232abe53f1775f893fdf8f842712c/storage/remote/read_handler.go#L166.
lset := rmLabels(labelpb.ZLabelsToPromLabels(e.Labels), extLsetToRemove)
// https://github.com/prometheus/prometheus/blob/3f6f5d3357e232abe53f1775f893fdf8f842712c/storage/remote/read_handler.go#L166
// MergeLabels() prefers local labels over external labels but we prefer
// external labels hence we need to do this:
lset := rmLabels(labelpb.ExtendSortedLabels(labelpb.ZLabelsToPromLabels(e.Labels), extLset), extLsetToRemove)
if len(e.Samples) == 0 {
// As found in https://github.com/thanos-io/thanos/issues/381
// Prometheus can give us completely empty time series. Ignore these with log until we figure out that
Expand Down Expand Up @@ -379,6 +381,7 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(
shardMatcher *storepb.ShardMatcher,
httpResp *http.Response,
querySpan tracing.Span,
extLset labels.Labels,
calculateChecksums bool,
extLsetToRemove map[string]struct{},
) error {
Expand Down Expand Up @@ -419,10 +422,10 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(

framesNum++
for _, series := range res.ChunkedSeries {
// Streamed remote read handler already adds
// external labels:
// MergeLabels() prefers local labels over external labels but we prefer
// external labels hence we need to do this:
// https://github.com/prometheus/prometheus/blob/3f6f5d3357e232abe53f1775f893fdf8f842712c/storage/remote/codec.go#L210.
completeLabelset := rmLabels(labelpb.ZLabelsToPromLabels(series.Labels), extLsetToRemove)
completeLabelset := rmLabels(labelpb.ExtendSortedLabels(labelpb.ZLabelsToPromLabels(series.Labels), extLset), extLsetToRemove)
if !shardMatcher.MatchesLabels(completeLabelset) {
continue
}
Expand Down
50 changes: 50 additions & 0 deletions test/e2e/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2241,3 +2241,53 @@ func TestConnectedQueriesWithLazyProxy(t *testing.T) {
}, time.Now, promclient.QueryOptions{}, 0)

}

func TestSidecarPrefersExtLabels(t *testing.T) {
t.Parallel()

e, err := e2e.NewDockerEnvironment("sidecar-extlbls")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

promCfg := `global:
external_labels:
region: test`

prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "p1", promCfg, "", e2ethanos.DefaultPrometheusImage(), "", "remote-write-receiver")
testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar))

endpoints := []string{
sidecar.InternalEndpoint("grpc"),
}
querier := e2ethanos.
NewQuerierBuilder(e, "1", endpoints...).
Init()
testutil.Ok(t, e2e.StartAndWaitReady(querier))

now := time.Now()
ctx := context.Background()
m := model.Sample{
Metric: map[model.LabelName]model.LabelValue{
"__name__": "sidecar_test_metric",
"instance": model.LabelValue("test"),
"region": model.LabelValue("foo"),
},
Value: model.SampleValue(2),
Timestamp: model.TimeFromUnixNano(now.Add(time.Hour).UnixNano()),
}
testutil.Ok(t, synthesizeSamples(ctx, prom, []model.Sample{m}))

retv, _ := instantQuery(t, context.Background(), querier.Endpoint("http"), func() string {
return "sidecar_test_metric"
}, func() time.Time { return now.Add(time.Hour) }, promclient.QueryOptions{}, 1)

testutil.Equals(t, model.Vector{&model.Sample{
Metric: model.Metric{
"__name__": "sidecar_test_metric",
"instance": "test",
"region": "test",
},
Value: model.SampleValue(2),
Timestamp: model.TimeFromUnixNano(now.Add(time.Hour).UnixNano()),
}}, retv)
}

0 comments on commit ef4fb7c

Please sign in to comment.