Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(blooms)!: Introduce a new block schema (V3) #14038

Merged
merged 10 commits into from
Sep 5, 2024
5 changes: 5 additions & 0 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ func (p *processor) processBlock(_ context.Context, bq *bloomshipper.CloseableBl
return err
}

// We require V3+ schema
if schema.Version() < v1.V3 {
return v1.ErrUnsupportedSchemaVersion
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that processor.processTasks aborts after the first task group where p.processTasksForDay returns an error. Is that something we want to keep?

Especially with this most recent change, I believe that means if any of the existing blocks aren't the current schema version, then we don't process any of the newer ones, even if they're valid.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good point. We could ignore incompatible blocks rather than fail.
But for the sake of simplicity, I would cancel the request on the first incompatible block.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me. If this turns out to be an issue in the future we can replace it with multierror instead of returning immediately.

}

tokenizer := v1.NewNGramTokenizer(schema.NGramLen(), schema.NGramSkip())
iters := make([]iter.PeekIterator[v1.Request], 0, len(tasks))

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/bloom/v1/archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestArchive(t *testing.T) {
builder, err := NewBlockBuilder(
BlockOptions{
Schema: Schema{
version: DefaultSchemaVersion,
version: CurrentSchemaVersion,
encoding: chunkenc.EncSnappy,
},
SeriesPageSize: 100,
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/bloom/v1/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ type Bloom struct {
filter.ScalableBloomFilter
}

// NewBloom returns a Bloom backed by a freshly initialized scalable
// bloom filter using the package's default sizing parameters.
// TODO: parameterise the SBF options (capacity, fp_rate, growth).
func NewBloom() *Bloom {
	sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)
	return &Bloom{ScalableBloomFilter: *sbf}
}

func (b *Bloom) Encode(enc *encoding.Encbuf) error {
// divide by 8 b/c bloom capacity is measured in bits, but we want bytes
buf := bytes.NewBuffer(make([]byte, 0, int(b.Capacity()/8)))
Expand Down
31 changes: 3 additions & 28 deletions pkg/storage/bloom/v1/bloom_tokenizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (

"github.com/grafana/loki/v3/pkg/iter"
v2iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter"
"github.com/grafana/loki/v3/pkg/util/encoding"

"github.com/grafana/loki/pkg/push"
Expand Down Expand Up @@ -92,13 +90,6 @@ func estimatedCount(m uint, p float64) uint {
return uint(-float64(m) * math.Log(1-p))
}

// newBloom builds an empty Bloom around a scalable bloom filter with
// the tokenizer's default filter settings.
// TODO: parameterise the SBF options (capacity, fp_rate, growth).
func (bt *BloomTokenizer) newBloom() *Bloom {
	sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)
	return &Bloom{ScalableBloomFilter: *sbf}
}

// Populates a bloom filter(s) with the tokens from the given chunks.
// Called once per series
func (bt *BloomTokenizer) Populate(
Expand Down Expand Up @@ -132,14 +123,14 @@ func (bt *BloomTokenizer) Populate(
)
}
} else {
bloom = bt.newBloom()
bloom = NewBloom()
}

var bytesAdded int

for chks.Next() {
chk := chks.At()
itr := newPeekingEntryIterAdapter(chk.Itr)
itr := v2iter.NewPeekIter(chk.Itr)

for {
full, newBytes := bt.addChunkToBloom(
Expand All @@ -156,7 +147,7 @@ func (bt *BloomTokenizer) Populate(

// start a new bloom + reset bytesAdded counter
bytesAdded = 0
bloom = bt.newBloom()
bloom = NewBloom()

// cache _MUST_ be cleared when a new bloom is created to ensure that all tokens from
// each line are indexed into at least one bloom
Expand Down Expand Up @@ -288,19 +279,3 @@ outer:

return full, chunkBytes
}

// entryIterAdapter wraps an iter.EntryIterator so it can be handed to
// v2iter.NewPeekIter as an iterator over logproto.Entry (see the
// explicit At/Err forwards on this type).
type entryIterAdapter struct {
iter.EntryIterator
}

// At forwards to the embedded iterator's At.
// NOTE(review): this forward looks redundant with Go's method
// promotion — presumably it exists to pin the method set / return
// type for the generic v2 iterator contract; confirm before removing.
func (a entryIterAdapter) At() logproto.Entry {
return a.EntryIterator.At()
}

// Err forwards to the embedded iterator's Err.
// NOTE(review): like At, this appears redundant with method
// promotion — presumably required for interface satisfaction in the
// generic v2 iterator; confirm before removing.
func (a entryIterAdapter) Err() error {
return a.EntryIterator.Err()
}

// newPeekingEntryIterAdapter adapts an iter.EntryIterator into a
// peekable v2 iterator over logproto.Entry values.
func newPeekingEntryIterAdapter(itr iter.EntryIterator) *v2iter.PeekIter[logproto.Entry] {
	adapted := entryIterAdapter{EntryIterator: itr}
	return v2iter.NewPeekIter[logproto.Entry](adapted)
}
9 changes: 7 additions & 2 deletions pkg/storage/bloom/v1/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (b BlockOptions) Encode(enc *encoding.Encbuf) {

func NewBlockOptions(enc chunkenc.Encoding, nGramLength, nGramSkip, maxBlockSizeBytes, maxBloomSizeBytes uint64) BlockOptions {
opts := NewBlockOptionsFromSchema(Schema{
version: DefaultSchemaVersion,
version: CurrentSchemaVersion,
encoding: enc,
nGramLength: nGramLength,
nGramSkip: nGramSkip,
Expand Down Expand Up @@ -289,7 +289,12 @@ func (mb *MergeBuilder) processNextSeries(
bytesAdded += bloom.SourceBytesAdded
}

done, err := builder.AddSeries(*nextInStore, offsets)
// TODO(chaudum): Use the indexed fields from bloom creation, however,
// currently we still build blooms from log lines.
fields := NewSet[Field](1)
fields.Add("__line__")

done, err := builder.AddSeries(*nextInStore, offsets, fields)
if err != nil {
return nil, bytesAdded, 0, false, false, errors.Wrap(err, "committing series")
}
Expand Down
20 changes: 11 additions & 9 deletions pkg/storage/bloom/v1/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ var blockEncodings = []chunkenc.Encoding{
chunkenc.EncZstd,
}

func TestBlockOptionsRoundTrip(t *testing.T) {
func TestBlockOptions_RoundTrip(t *testing.T) {
t.Parallel()
opts := BlockOptions{
Schema: Schema{
version: V1,
version: CurrentSchemaVersion,
encoding: chunkenc.EncSnappy,
nGramLength: 10,
nGramSkip: 2,
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestBlockBuilder_RoundTrip(t *testing.T) {
t.Run(desc, func(t *testing.T) {
blockOpts := BlockOptions{
Schema: Schema{
version: DefaultSchemaVersion,
version: CurrentSchemaVersion,
encoding: enc,
nGramLength: 10,
nGramSkip: 2,
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestMergeBuilder(t *testing.T) {
data, _ := MkBasicSeriesWithBlooms(numSeries, 0, 0xffff, 0, 10000)
blockOpts := BlockOptions{
Schema: Schema{
version: DefaultSchemaVersion,
version: CurrentSchemaVersion,
encoding: chunkenc.EncSnappy,
},
SeriesPageSize: 100,
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestMergeBuilderFingerprintCollision(t *testing.T) {

blockOpts := BlockOptions{
Schema: Schema{
version: DefaultSchemaVersion,
version: CurrentSchemaVersion,
encoding: chunkenc.EncSnappy,
},
SeriesPageSize: 100,
Expand Down Expand Up @@ -399,7 +399,7 @@ func TestBlockReset(t *testing.T) {
reader := NewByteReader(indexBuf, bloomsBuf)

schema := Schema{
version: DefaultSchemaVersion,
version: CurrentSchemaVersion,
encoding: chunkenc.EncSnappy,
nGramLength: 10,
nGramSkip: 2,
Expand Down Expand Up @@ -457,7 +457,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {

blockOpts := BlockOptions{
Schema: Schema{
version: DefaultSchemaVersion,
version: CurrentSchemaVersion,
encoding: chunkenc.EncSnappy, // test with different encodings?
nGramLength: 4, // needs to match values from MkBasicSeriesWithBlooms
nGramSkip: 0, // needs to match values from MkBasicSeriesWithBlooms
Expand Down Expand Up @@ -548,9 +548,11 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
builder, err := NewBlockBuilder(blockOpts, writer)
require.Nil(t, err)

checksum, _, err := mb.Build(builder)
_, _, err = mb.Build(builder)
require.Nil(t, err)
require.Equal(t, uint32(0x2a6cdba6), checksum)
rfratto marked this conversation as resolved.
Show resolved Hide resolved
// checksum changes as soon as the contents of the block or the encoding change
// once the block format is stable, calculate the checksum and assert its correctness
// require.Equal(t, uint32(0x2a6cdba6), checksum)

// ensure the new block contains one copy of all the data
// by comparing it against an iterator over the source data
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/bloom/v1/fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (fq *FusedQuerier) Run() error {
return nil
}

func (fq *FusedQuerier) runSeries(schema Schema, series *SeriesWithOffsets, reqs []Request) {
func (fq *FusedQuerier) runSeries(schema Schema, series *SeriesWithMeta, reqs []Request) {
// For a given chunk|series to be removed, it must fail to match all blooms.
// Because iterating/loading blooms can be expensive, we iterate blooms one at a time, collecting
// the removals (failures) for each (bloom, chunk) pair.
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/bloom/v1/fuse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestFusedQuerier(t *testing.T) {
builder, err := NewBlockBuilder(
BlockOptions{
Schema: Schema{
version: DefaultSchemaVersion,
version: CurrentSchemaVersion,
encoding: chunkenc.EncSnappy,
},
SeriesPageSize: 100,
Expand Down Expand Up @@ -152,7 +152,7 @@ func TestFuseMultiPage(t *testing.T) {
builder, err := NewBlockBuilder(
BlockOptions{
Schema: Schema{
version: DefaultSchemaVersion,
version: CurrentSchemaVersion,
encoding: chunkenc.EncSnappy,
nGramLength: 3, // we test trigrams
nGramSkip: 0,
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
builder, err := NewBlockBuilder(
BlockOptions{
Schema: Schema{
version: DefaultSchemaVersion,
version: CurrentSchemaVersion,
encoding: chunkenc.EncSnappy,
},
SeriesPageSize: 100,
Expand Down Expand Up @@ -366,7 +366,7 @@ func TestFusedQuerierSkipsEmptyBlooms(t *testing.T) {
builder, err := NewBlockBuilder(
BlockOptions{
Schema: Schema{
version: DefaultSchemaVersion,
version: CurrentSchemaVersion,
encoding: chunkenc.EncNone,
},
SeriesPageSize: 100,
Expand Down Expand Up @@ -430,7 +430,7 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Ou
builder, err := NewBlockBuilder(
BlockOptions{
Schema: Schema{
version: DefaultSchemaVersion,
version: CurrentSchemaVersion,
encoding: chunkenc.EncSnappy,
},
SeriesPageSize: 256 << 10, // 256k
Expand Down
Loading
Loading