diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 14feb35231..907dd56a8c 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -819,10 +819,11 @@ type blockSeriesClient struct { i int postings []storage.SeriesRef chkMetas []chunks.Meta + lset labels.Labels symbolizedLset []symbolizedLabel entries []seriesEntry - batch []*storepb.SeriesResponse hasMorePostings bool + batchSize int } func newBlockSeriesClient( @@ -856,8 +857,8 @@ func newBlockSeriesClient( loadAggregates: req.Aggregates, shardMatcher: shardMatcher, calculateChunkHash: calculateChunkHash, - entries: make([]seriesEntry, 0, batchSize), hasMorePostings: true, + batchSize: batchSize, } } @@ -895,24 +896,31 @@ func (b *blockSeriesClient) ExpandPostings( } b.postings = ps + if b.batchSize > len(ps) { + b.batchSize = len(ps) + } + b.entries = make([]seriesEntry, 0, b.batchSize) return nil } func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { - for len(b.batch) == 0 && b.hasMorePostings { + for len(b.entries) == 0 && b.hasMorePostings { if err := b.nextBatch(); err != nil { return nil, err } } - if len(b.batch) == 0 { + if len(b.entries) == 0 { return nil, io.EOF } - next := b.batch[0] - b.batch = b.batch[1:] + next := b.entries[0] + b.entries = b.entries[1:] - return next, nil + return storepb.NewSeriesResponse(&storepb.Series{ + Labels: labelpb.ZLabelsFromPromLabels(next.lset), + Chunks: next.chks, + }), nil } func (b *blockSeriesClient) nextBatch() error { @@ -929,9 +937,6 @@ func (b *blockSeriesClient) nextBatch() error { return nil } - b.entries = b.entries[:0] - b.batch = b.batch[:0] - b.indexr.reset() if !b.skipChunks { b.chunkr.reset() @@ -941,6 +946,7 @@ func (b *blockSeriesClient) nextBatch() error { return errors.Wrap(err, "preload series") } + b.entries = b.entries[:0] for i := 0; i < len(postingsBatch); i++ { ok, err := b.indexr.LoadSeriesForTime(postingsBatch[i], &b.symbolizedLset, &b.chkMetas, b.skipChunks, b.mint, b.maxt) if err != nil { @@ -950,26 +956,22 @@ func (b *blockSeriesClient) nextBatch() error { continue } - var lset labels.Labels - if err := b.indexr.LookupLabelsSymbols(b.symbolizedLset, &lset); err != nil { + if err := b.indexr.LookupLabelsSymbols(b.symbolizedLset, &b.lset); err != nil { return errors.Wrap(err, "Lookup labels symbols") } - completeLabelset := labelpb.ExtendSortedLabels(lset, b.extLset) + completeLabelset := labelpb.ExtendSortedLabels(b.lset, b.extLset) if !b.shardMatcher.MatchesLabels(completeLabelset) { continue } + s := seriesEntry{lset: completeLabelset} if b.skipChunks { - b.batch = append(b.batch, storepb.NewSeriesResponse(&storepb.Series{ - Labels: labelpb.ZLabelsFromPromLabels(completeLabelset), - })) + b.entries = append(b.entries, s) continue } - s := seriesEntry{} // Schedule loading chunks. - s.lset = completeLabelset s.refs = make([]chunks.ChunkRef, 0, len(b.chkMetas)) s.chks = make([]storepb.AggrChunk, 0, len(b.chkMetas)) @@ -998,13 +1000,6 @@ func (b *blockSeriesClient) nextBatch() error { } } - for _, entry := range b.entries { - b.batch = append(b.batch, storepb.NewSeriesResponse(&storepb.Series{ - Labels: labelpb.ZLabelsFromPromLabels(entry.lset), - Chunks: entry.chks, - })) - } - return nil }