diff --git a/CHANGELOG.md b/CHANGELOG.md index 9cb5b88033..d3293028d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed - [#6692](https://github.com/thanos-io/thanos/pull/6692) Store: Fix matching bug when using empty alternative in regex matcher, for example (a||b). +- [#6679](https://github.com/thanos-io/thanos/pull/6697) Store: fix block deduplication ### Added diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 8a2664eeb4..027122447e 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1255,7 +1255,6 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill // Series implements the storepb.StoreServer interface. func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) { srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), req.WithoutReplicaLabels) - if s.queryGate != nil { tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) { err = s.queryGate.Start(srv.Context()) @@ -1376,21 +1375,47 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store return errors.Wrapf(err, "fetch postings for block %s", blk.meta.ULID) } - part := newLazyRespSet( - srv.Context(), - span, - 10*time.Minute, - blk.meta.ULID.String(), - []labels.Labels{blk.extLset}, - onClose, - blockClient, - shardMatcher, - false, - s.metrics.emptyPostingCount, - ) + // If we have inner replica labels we need to resort. + s.mtx.Lock() + needsEagerRetrival := len(req.WithoutReplicaLabels) > 0 && s.labelNamesSet.HasAny(req.WithoutReplicaLabels) + s.mtx.Unlock() + + var resp respSet + if needsEagerRetrival { + labelsToRemove := make(map[string]struct{}) + for _, replicaLabel := range req.WithoutReplicaLabels { + labelsToRemove[replicaLabel] = struct{}{} + } + resp = newEagerRespSet( + srv.Context(), + span, + 10*time.Minute, + blk.meta.ULID.String(), + []labels.Labels{blk.extLset}, + onClose, + blockClient, + shardMatcher, + false, + s.metrics.emptyPostingCount, + labelsToRemove, + ) + } else { + resp = newLazyRespSet( + srv.Context(), + span, + 10*time.Minute, + blk.meta.ULID.String(), + []labels.Labels{blk.extLset}, + onClose, + blockClient, + shardMatcher, + false, + s.metrics.emptyPostingCount, + ) + } mtx.Lock() - respSets = append(respSets, part) + respSets = append(respSets, resp) mtx.Unlock() return nil diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 8d75ed6b04..e13afd6518 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -2017,8 +2017,6 @@ func TestSeries_SeriesSortedWithoutReplicaLabels(t *testing.T) { replicaLabels: []string{"replica"}, expectedSeries: []labels.Labels{ labels.FromStrings("a", "1", "ext1", "0", "z", "1"), - labels.FromStrings("a", "1", "ext1", "0", "z", "1"), - labels.FromStrings("a", "1", "ext1", "0", "z", "2"), labels.FromStrings("a", "1", "ext1", "0", "z", "2"), labels.FromStrings("a", "1", "ext1", "1", "z", "1"), labels.FromStrings("a", "1", "ext1", "1", "z", "2"), @@ -3344,8 +3342,6 @@ func TestBucketIndexReader_decodeCachedPostingsErrors(t *testing.T) { } func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) { - t.Skip("Known Issue, Added for debugging in followup PR.") - logger := log.NewNopLogger() tmpDir := t.TempDir() bktDir := filepath.Join(tmpDir, "bkt") diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go index d5cc940637..7ea18b134d 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_heap.go @@ -605,7 +605,8 @@ func newAsyncRespSet( seriesCtx, span, frameTimeout, - st, + st.String(), + st.LabelSets(), closeSeries, cl, shardMatcher, @@ -639,12 +640,14 @@ type eagerRespSet struct { ctx context.Context closeSeries context.CancelFunc - st Client frameTimeout time.Duration shardMatcher *storepb.ShardMatcher removeLabels map[string]struct{} - storeLabels map[string]struct{} + + storeName string + storeLabels map[string]struct{} + storeLabelSets []labels.Labels // Internal bookkeeping. bufferedResponses []*storepb.SeriesResponse @@ -656,7 +659,8 @@ func newEagerRespSet( ctx context.Context, span opentracing.Span, frameTimeout time.Duration, - st Client, + storeName string, + storeLabelSets []labels.Labels, closeSeries context.CancelFunc, cl storepb.Store_SeriesClient, shardMatcher *storepb.ShardMatcher, @@ -666,7 +670,6 @@ func newEagerRespSet( ) respSet { ret := &eagerRespSet{ span: span, - st: st, closeSeries: closeSeries, cl: cl, frameTimeout: frameTimeout, @@ -675,9 +678,11 @@ func newEagerRespSet( wg: &sync.WaitGroup{}, shardMatcher: shardMatcher, removeLabels: removeLabels, + storeName: storeName, + storeLabelSets: storeLabelSets, } ret.storeLabels = make(map[string]struct{}) - for _, ls := range st.LabelSets() { + for _, ls := range storeLabelSets { for _, l := range ls { ret.storeLabels[l.Name] = struct{}{} } @@ -686,7 +691,7 @@ func newEagerRespSet( ret.wg.Add(1) // Start a goroutine and immediately buffer everything. - go func(st Client, l *eagerRespSet) { + go func(l *eagerRespSet) { seriesStats := &storepb.SeriesStatsCounter{} bytesProcessed := 0 @@ -715,7 +720,7 @@ func newEagerRespSet( select { case <-l.ctx.Done(): - err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st.String()) + err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", storeName) l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(err)) l.span.SetTag("err", err.Error()) return false @@ -731,9 +736,9 @@ func newEagerRespSet( // Most likely the per-Recv timeout has been reached. // There's a small race between canceling and the Recv() // but this is most likely true. - rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, st.String()) + rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, storeName) } else { - rerr = errors.Wrapf(err, "receive series from %s", st.String()) + rerr = errors.Wrapf(err, "receive series from %s", storeName) } l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr)) l.span.SetTag("err", rerr.Error()) @@ -773,7 +778,7 @@ func newEagerRespSet( sortWithoutLabels(l.bufferedResponses, l.removeLabels) } - }(st, ret) + }(ret) return ret } @@ -845,6 +850,7 @@ func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string] } func (l *eagerRespSet) Close() { + l.closeSeries() l.shardMatcher.Close() } @@ -873,11 +879,11 @@ func (l *eagerRespSet) Empty() bool { } func (l *eagerRespSet) StoreID() string { - return l.st.String() + return l.storeName } func (l *eagerRespSet) Labelset() string { - return labelpb.PromLabelSetsToString(l.st.LabelSets()) + return labelpb.PromLabelSetsToString(l.storeLabelSets) } func (l *eagerRespSet) StoreLabels() map[string]struct{} {