Skip to content

Commit

Permalink
Avoid overallocating for small responses
Browse files Browse the repository at this point in the history
Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Nov 6, 2022
1 parent 31874e2 commit f4550c7
Showing 1 changed file with 20 additions and 25 deletions.
45 changes: 20 additions & 25 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,10 +819,11 @@ type blockSeriesClient struct {
i int
postings []storage.SeriesRef
chkMetas []chunks.Meta
lset labels.Labels
symbolizedLset []symbolizedLabel
entries []seriesEntry
batch []*storepb.SeriesResponse
hasMorePostings bool
batchSize int
}

func newBlockSeriesClient(
Expand Down Expand Up @@ -856,8 +857,8 @@ func newBlockSeriesClient(
loadAggregates: req.Aggregates,
shardMatcher: shardMatcher,
calculateChunkHash: calculateChunkHash,
entries: make([]seriesEntry, 0, batchSize),
hasMorePostings: true,
batchSize: batchSize,
}
}

Expand Down Expand Up @@ -895,24 +896,31 @@ func (b *blockSeriesClient) ExpandPostings(
}

b.postings = ps
if b.batchSize > len(ps) {
b.batchSize = len(ps)
}
b.entries = make([]seriesEntry, 0, b.batchSize)
return nil
}

func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) {
for len(b.batch) == 0 && b.hasMorePostings {
for len(b.entries) == 0 && b.hasMorePostings {
if err := b.nextBatch(); err != nil {
return nil, err
}
}

if len(b.batch) == 0 {
if len(b.entries) == 0 {
return nil, io.EOF
}

next := b.batch[0]
b.batch = b.batch[1:]
next := b.entries[0]
b.entries = b.entries[1:]

return next, nil
return storepb.NewSeriesResponse(&storepb.Series{
Labels: labelpb.ZLabelsFromPromLabels(next.lset),
Chunks: next.chks,
}), nil
}

func (b *blockSeriesClient) nextBatch() error {
Expand All @@ -929,9 +937,6 @@ func (b *blockSeriesClient) nextBatch() error {
return nil
}

b.entries = b.entries[:0]
b.batch = b.batch[:0]

b.indexr.reset()
if !b.skipChunks {
b.chunkr.reset()
Expand All @@ -941,6 +946,7 @@ func (b *blockSeriesClient) nextBatch() error {
return errors.Wrap(err, "preload series")
}

b.entries = b.entries[:0]
for i := 0; i < len(postingsBatch); i++ {
ok, err := b.indexr.LoadSeriesForTime(postingsBatch[i], &b.symbolizedLset, &b.chkMetas, b.skipChunks, b.mint, b.maxt)
if err != nil {
Expand All @@ -950,26 +956,22 @@ func (b *blockSeriesClient) nextBatch() error {
continue
}

var lset labels.Labels
if err := b.indexr.LookupLabelsSymbols(b.symbolizedLset, &lset); err != nil {
if err := b.indexr.LookupLabelsSymbols(b.symbolizedLset, &b.lset); err != nil {
return errors.Wrap(err, "Lookup labels symbols")
}

completeLabelset := labelpb.ExtendSortedLabels(lset, b.extLset)
completeLabelset := labelpb.ExtendSortedLabels(b.lset, b.extLset)
if !b.shardMatcher.MatchesLabels(completeLabelset) {
continue
}

s := seriesEntry{lset: completeLabelset}
if b.skipChunks {
b.batch = append(b.batch, storepb.NewSeriesResponse(&storepb.Series{
Labels: labelpb.ZLabelsFromPromLabels(completeLabelset),
}))
b.entries = append(b.entries, s)
continue
}

s := seriesEntry{}
// Schedule loading chunks.
s.lset = completeLabelset
s.refs = make([]chunks.ChunkRef, 0, len(b.chkMetas))
s.chks = make([]storepb.AggrChunk, 0, len(b.chkMetas))

Expand Down Expand Up @@ -998,13 +1000,6 @@ func (b *blockSeriesClient) nextBatch() error {
}
}

for _, entry := range b.entries {
b.batch = append(b.batch, storepb.NewSeriesResponse(&storepb.Series{
Labels: labelpb.ZLabelsFromPromLabels(entry.lset),
Chunks: entry.chks,
}))
}

return nil
}

Expand Down

0 comments on commit f4550c7

Please sign in to comment.