Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix(blooms): max block size #14615

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/storage/bloom/v1/bloom_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func NewBloomBlockBuilder(opts BlockOptions, writer io.WriteCloser) *BloomBlockB
}
}

// UnflushedSize returns the length of the builder's scratch buffer plus the
// unflushed size reported by its page writer.
func (b *BloomBlockBuilder) UnflushedSize() int {
	pending := b.scratch.Len()
	pending += b.page.UnflushedSize()
	return pending
}

func (b *BloomBlockBuilder) Append(bloom *Bloom) (BloomOffset, error) {
if !b.writtenSchema {
if err := b.writeSchema(); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/bloom/v1/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ func (w *PageWriter) Reset() {
w.n = 0
}

// UnflushedSize returns the current length of the page writer's encoder
// buffer.
func (w *PageWriter) UnflushedSize() int {
	buffered := w.enc.Len()
	return buffered
}

func (w *PageWriter) SpaceFor(numBytes int) bool {
// if a single bloom exceeds the target size, still accept it
// otherwise only accept it if adding it would not exceed the target size
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/bloom/v1/index_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func NewIndexBuilder(opts BlockOptions, writer io.WriteCloser) *IndexBuilder {
}
}

// UnflushedSize returns the length of the index builder's scratch buffer plus
// the unflushed size reported by its page writer.
func (b *IndexBuilder) UnflushedSize() int {
	pending := b.scratch.Len()
	pending += b.page.UnflushedSize()
	return pending
}

func (b *IndexBuilder) WriteOpts() error {
b.scratch.Reset()
b.opts.Encode(b.scratch)
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/bloom/v1/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,11 @@ func CompareIterators[A, B any](
a iter.Iterator[A],
b iter.Iterator[B],
) {
var i int
for a.Next() {
require.True(t, b.Next())
require.Truef(t, b.Next(), "'a' has %dth element but 'b' does not'", i)
f(t, a.At(), b.At())
i++
}
require.False(t, b.Next())
require.NoError(t, a.Err())
Expand Down
27 changes: 26 additions & 1 deletion pkg/storage/bloom/v1/versioned_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,35 @@ func (b *V3Builder) AddSeries(series Series, offsets []BloomOffset, fields Set[F
return false, errors.Wrapf(err, "writing index for series %v", series.Fingerprint)
}

full, _, err := b.writer.Full(b.opts.BlockSize)
full, err := b.full()
if err != nil {
return false, errors.Wrap(err, "checking if block is full")
}

return full, nil
}

// full reports whether the block should be considered full with respect to
// the configured maximum block size (b.opts.BlockSize).
//
// A block size of 0 disables the limit, so the block is never full. Otherwise
// the block is full when the block writer already reports being full, or when
// the size written so far plus the unflushed index and bloom data would
// exceed the limit.
//
// An error is returned only when querying the block writer fails.
func (b *V3Builder) full() (bool, error) {
	limit := b.opts.BlockSize
	if limit == 0 {
		// Unlimited block size
		return false, nil
	}

	writerFull, written, err := b.writer.Full(limit)
	switch {
	case err != nil:
		return false, errors.Wrap(err, "checking if block writer is full")
	case writerFull:
		return true, nil
	}

	// The writer itself still has room, but the index and bloom builders may
	// be holding data that has not been flushed to it yet. Account for that
	// data as if it were already written.
	pending := b.index.UnflushedSize() + b.blooms.UnflushedSize()
	return uint64(written+pending) > limit, nil
}
103 changes: 102 additions & 1 deletion pkg/storage/bloom/v1/versioned_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"testing"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/compression"
Expand All @@ -17,7 +18,7 @@ import (
func smallBlockOpts(v Version, enc compression.Codec) BlockOptions {
return BlockOptions{
Schema: NewSchema(v, enc),
SeriesPageSize: 100,
SeriesPageSize: 4 << 10,
BloomPageSize: 2 << 10,
BlockSize: 0, // unlimited
}
Expand Down Expand Up @@ -78,3 +79,103 @@ func TestV3Roundtrip(t *testing.T) {
querier,
)
}

// seriesWithBlooms is a test helper that generates nSeries series via
// MkBasicSeriesWithBlooms for the given fingerprint range, using the fixed
// arguments 0 and 10000 for its remaining parameters. Only the first return
// value of MkBasicSeriesWithBlooms is kept.
func seriesWithBlooms(nSeries int, fromFp, throughFp model.Fingerprint) []SeriesWithBlooms {
	generated, _ := MkBasicSeriesWithBlooms(nSeries, fromFp, throughFp, 0, 10000)
	return generated
}

// seriesWithoutBlooms is a test helper that generates series exactly like
// seriesWithBlooms and then replaces each series' bloom iterator with an
// iterator over an empty slice.
func seriesWithoutBlooms(nSeries int, fromFp, throughFp model.Fingerprint) []SeriesWithBlooms {
	stripped := seriesWithBlooms(nSeries, fromFp, throughFp)

	// Swap every bloom iterator for an empty one; the rest of each series is
	// left untouched.
	for idx := 0; idx < len(stripped); idx++ {
		stripped[idx].Blooms = v2.NewSliceIter([]*Bloom{})
	}

	return stripped
}
// TestFullBlock builds V3 blocks from a list of input series under different
// maximum block sizes, reads each block back, and checks that the series
// found in the block match the expected list (fingerprint, chunks and number
// of blooms).
func TestFullBlock(t *testing.T) {
	opts := smallBlockOpts(V3, compression.None)
	minBlockSize := opts.SeriesPageSize // 1 index page, 4KB
	const maxEmptySeriesPerBlock = 47

	type testCase struct {
		name         string
		maxBlockSize uint64
		series       []SeriesWithBlooms
		expected     []SeriesWithBlooms
	}

	cases := []testCase{
		{
			name:         "only series without blooms",
			maxBlockSize: minBlockSize,
			// +1 so we test adding the last series that fills the block
			series:   seriesWithoutBlooms(maxEmptySeriesPerBlock+1, 0, 0xffff),
			expected: seriesWithoutBlooms(maxEmptySeriesPerBlock+1, 0, 0xffff),
		},
		{
			name:         "series without blooms and one with blooms",
			maxBlockSize: minBlockSize,
			series: append(
				seriesWithoutBlooms(maxEmptySeriesPerBlock, 0, 0x7fff),
				seriesWithBlooms(50, 0x8000, 0xffff)...,
			),
			expected: append(
				seriesWithoutBlooms(maxEmptySeriesPerBlock, 0, 0x7fff),
				seriesWithBlooms(1, 0x8000, 0x8001)...,
			),
		},
		{
			name:         "only one series with bloom",
			maxBlockSize: minBlockSize,
			series:       seriesWithBlooms(10, 0, 0xffff),
			expected:     seriesWithBlooms(1, 0, 1),
		},
		{
			name:         "one huge series with bloom and then series without",
			maxBlockSize: minBlockSize,
			series: append(
				seriesWithBlooms(1, 0, 1),
				seriesWithoutBlooms(100, 1, 0xffff)...,
			),
			expected: seriesWithBlooms(1, 0, 1),
		},
		{
			name:         "big block",
			maxBlockSize: 1 << 20, // 1MB
			series:       seriesWithBlooms(100, 0, 0xffff),
			expected:     seriesWithBlooms(100, 0, 0xffff),
		},
	}

	// assertSameSeries compares an expected series against one read back from
	// the block: fingerprints must be equal, chunks must match regardless of
	// order, and both must yield the same number of blooms.
	assertSameSeries := func(t *testing.T, want SeriesWithBlooms, got *SeriesWithBlooms) {
		require.Equal(t, want.Series.Fingerprint, got.Series.Fingerprint)
		require.ElementsMatch(t, want.Series.Chunks, got.Series.Chunks)

		wantBlooms, err := v2.Collect(want.Blooms)
		require.NoError(t, err)
		gotBlooms, err := v2.Collect(got.Blooms)
		require.NoError(t, err)
		require.Equal(t, len(gotBlooms), len(wantBlooms))
	}

	for _, tt := range cases {
		t.Run(tt.name, func(t *testing.T) {
			// The writer and the reader share the same in-memory buffers, so
			// whatever the builder writes can be read back below.
			indexBuf := new(bytes.Buffer)
			bloomsBuf := new(bytes.Buffer)
			writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
			reader := NewByteReader(indexBuf, bloomsBuf)
			opts.BlockSize = tt.maxBlockSize

			builder, err := NewBlockBuilderV3(opts, writer)
			require.NoError(t, err)

			_, err = builder.BuildFrom(v2.NewSliceIter(tt.series))
			require.NoError(t, err)

			block := NewBlock(reader, NewMetrics(nil))
			querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()

			CompareIterators(
				t,
				assertSameSeries,
				v2.NewSliceIter(tt.expected),
				querier,
			)
		})
	}
}
Loading