Skip to content

Commit

Permalink
fix(blooms): Copy chunks from ForSeries (#14863)
Browse files: browse the repository at this point in the history
  • Loading branch information
salvacorts authored Nov 11, 2024
1 parent fbd3292 commit bfc2890
Showing 1 changed file with 31 additions and 42 deletions.
73 changes: 31 additions & 42 deletions pkg/bloombuild/planner/strategies/chunksize.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -155,20 +155,16 @@ func getBlocksMatchingBounds(metas []bloomshipper.Meta, bounds v1.FingerprintBou
return deduped, nil
}

type seriesWithChunks struct {
tsdb tsdb.SingleTenantTSDBIdentifier
fp model.Fingerprint
chunks []index.ChunkMeta
}

type seriesBatch struct {
series []seriesWithChunks
tsdb tsdb.SingleTenantTSDBIdentifier
series []*v1.Series
size uint64
}

func newSeriesBatch() seriesBatch {
func newSeriesBatch(tsdb tsdb.SingleTenantTSDBIdentifier) seriesBatch {
return seriesBatch{
series: make([]seriesWithChunks, 0, 100),
tsdb: tsdb,
series: make([]*v1.Series, 0, 100),
}
}

Expand All @@ -179,31 +175,14 @@ func (b *seriesBatch) Bounds() v1.FingerprintBounds {

// We assume that the series are sorted by fingerprint.
// This is guaranteed since series are iterated in order by the TSDB.
return v1.NewBounds(b.series[0].fp, b.series[len(b.series)-1].fp)
return v1.NewBounds(b.series[0].Fingerprint, b.series[len(b.series)-1].Fingerprint)
}

func (b *seriesBatch) V1Series() []*v1.Series {
series := make([]*v1.Series, 0, len(b.series))
for _, s := range b.series {
res := &v1.Series{
Fingerprint: s.fp,
Chunks: make(v1.ChunkRefs, 0, len(s.chunks)),
}
for _, chk := range s.chunks {
res.Chunks = append(res.Chunks, v1.ChunkRef{
From: model.Time(chk.MinTime),
Through: model.Time(chk.MaxTime),
Checksum: chk.Checksum,
})
}

series = append(series, res)
}

return series
return b.series
}

func (b *seriesBatch) Append(s seriesWithChunks, size uint64) {
func (b *seriesBatch) Append(s *v1.Series, size uint64) {
b.series = append(b.series, s)
b.size += size
}
Expand All @@ -217,10 +196,7 @@ func (b *seriesBatch) Size() uint64 {
}

func (b *seriesBatch) TSDB() tsdb.SingleTenantTSDBIdentifier {
if len(b.series) == 0 {
return tsdb.SingleTenantTSDBIdentifier{}
}
return b.series[0].tsdb
return b.tsdb
}

func (s *ChunkSizeStrategy) sizedSeriesIter(
Expand All @@ -230,9 +206,14 @@ func (s *ChunkSizeStrategy) sizedSeriesIter(
targetTaskSizeBytes uint64,
) (iter.Iterator[seriesBatch], int, error) {
batches := make([]seriesBatch, 0, 100)
currentBatch := newSeriesBatch()
var currentBatch seriesBatch

for _, idx := range tsdbsWithGaps {
if currentBatch.Len() > 0 {
batches = append(batches, currentBatch)
}
currentBatch = newSeriesBatch(idx.tsdbIdentifier)

for _, gap := range idx.gaps {
if err := idx.tsdb.ForSeries(
ctx,
Expand All @@ -253,14 +234,22 @@ func (s *ChunkSizeStrategy) sizedSeriesIter(
// AND Adding this series to the batch would exceed the target task size.
if currentBatch.Len() > 0 && currentBatch.Size()+seriesSize > targetTaskSizeBytes {
batches = append(batches, currentBatch)
currentBatch = newSeriesBatch()
currentBatch = newSeriesBatch(idx.tsdbIdentifier)
}

res := &v1.Series{
Fingerprint: fp,
Chunks: make(v1.ChunkRefs, 0, len(chks)),
}
for _, chk := range chks {
res.Chunks = append(res.Chunks, v1.ChunkRef{
From: model.Time(chk.MinTime),
Through: model.Time(chk.MaxTime),
Checksum: chk.Checksum,
})
}

currentBatch.Append(seriesWithChunks{
tsdb: idx.tsdbIdentifier,
fp: fp,
chunks: chks,
}, seriesSize)
currentBatch.Append(res, seriesSize)
return false
}
},
Expand All @@ -269,10 +258,10 @@ func (s *ChunkSizeStrategy) sizedSeriesIter(
return nil, 0, err
}

// Add the last batch for this TSDB if it's not empty.
// Add the last batch for this gap if it's not empty.
if currentBatch.Len() > 0 {
batches = append(batches, currentBatch)
currentBatch = newSeriesBatch()
currentBatch = newSeriesBatch(idx.tsdbIdentifier)
}
}
}
Expand Down

0 comments on commit bfc2890

Please sign in to comment.