Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Use filesystem backed writer for blooms #13522

Merged
merged 4 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion integration/bloom_building_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ func checkForTimestampMetric(t *testing.T, cliPlanner *client.Client, metricName

func createBloomStore(t *testing.T, sharedPath string) *bloomshipper.BloomStore {
logger := log.NewNopLogger()
//logger := log.NewLogfmtLogger(os.Stdout)

schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{
Expand Down
25 changes: 18 additions & 7 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package builder

import (
"bytes"
"context"
"fmt"
"os"
"sync"
"time"

Expand Down Expand Up @@ -356,7 +356,7 @@ func (b *Builder) processTask(
seriesItrWithCounter,
b.chunkLoader,
blocksIter,
b.rwFn,
b.writerReaderFunc,
nil, // TODO(salvacorts): Pass reporter or remove when we address tracking
b.bloomStore.BloomMetrics(),
logger,
Expand All @@ -372,6 +372,9 @@ func (b *Builder) processTask(
built, err := bloomshipper.BlockFrom(tenant, task.Table.Addr(), blk)
if err != nil {
level.Error(logger).Log("msg", "failed to build block", "err", err)
// Clean up the on-disk block on failure. Use a separately scoped variable:
// assigning to err here would overwrite the build error, so the error
// returned below would wrap the cleanup result (nil on success) instead.
if cleanupErr := blk.Reader().Cleanup(); cleanupErr != nil {
	level.Error(logger).Log("msg", "failed to cleanup block directory", "err", cleanupErr)
}
return nil, fmt.Errorf("failed to build block: %w", err)
}

Expand All @@ -382,10 +385,17 @@ func (b *Builder) processTask(
built,
); err != nil {
level.Error(logger).Log("msg", "failed to write block", "err", err)
// Clean up the on-disk block on failure. Use a separately scoped variable:
// assigning to err here would overwrite the write error, so the error
// returned below would wrap the cleanup result (nil on success) instead.
if cleanupErr := blk.Reader().Cleanup(); cleanupErr != nil {
	level.Error(logger).Log("msg", "failed to cleanup block directory", "err", cleanupErr)
}
return nil, fmt.Errorf("failed to write block: %w", err)
}
b.metrics.blocksCreated.Inc()

if err := blk.Reader().Cleanup(); err != nil {
level.Error(logger).Log("msg", "failed to cleanup block directory", "err", err)
}

totalGapKeyspace := gap.Bounds.Max - gap.Bounds.Min
progress := built.Bounds.Max - gap.Bounds.Min
pct := float64(progress) / float64(totalGapKeyspace) * 100
Expand Down Expand Up @@ -477,9 +487,10 @@ func (b *Builder) loadWorkForGap(
return seriesItr, blocksIter, nil
}

// TODO(owen-d): pool, evaluate if memory-only is the best choice
func (b *Builder) rwFn() (v1.BlockWriter, v1.BlockReader) {
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
return v1.NewMemoryBlockWriter(indexBuf, bloomsBuf), v1.NewByteReader(indexBuf, bloomsBuf)
// writerReaderFunc creates a fresh temporary directory (prefix "bloom-block-")
// inside b.cfg.WorkingDir and returns a directory-backed block writer and
// reader that both operate on it. With an empty WorkingDir, os.MkdirTemp
// falls back to the operating system's default temp directory.
//
// The signature has no error return, so a failure to create the directory
// results in a panic. The directory is not removed here; callers are expected
// to call Cleanup on the returned writer or reader once they are done.
func (b *Builder) writerReaderFunc() (v1.BlockWriter, v1.BlockReader) {
	blockDir, mkErr := os.MkdirTemp(b.cfg.WorkingDir, "bloom-block-")
	if mkErr != nil {
		panic(mkErr)
	}

	writer := v1.NewDirectoryBlockWriter(blockDir)
	reader := v1.NewDirectoryBlockReader(blockDir)
	return writer, reader
}
2 changes: 2 additions & 0 deletions pkg/bloombuild/builder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ type Config struct {
GrpcConfig grpcclient.Config `yaml:"grpc_config"`
PlannerAddress string `yaml:"planner_address"`
BackoffConfig backoff.Config `yaml:"backoff_config"`
WorkingDir string `yaml:"working_directory" doc:"hidden"`
}

// RegisterFlagsWithPrefix registers the flags for the bloom builder
// configuration on f. Every flag name is prepended with prefix; the nested
// gRPC client and backoff configs are registered under prefix+".grpc" and
// prefix+".backoff" respectively.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	f.StringVar(&cfg.PlannerAddress, prefix+".planner-address", "", "Hostname (and port) of the bloom planner")
	cfg.GrpcConfig.RegisterFlagsWithPrefix(prefix+".grpc", f)
	cfg.BackoffConfig.RegisterFlagsWithPrefix(prefix+".backoff", f)
	f.StringVar(&cfg.WorkingDir, prefix+".working-directory", "", "Working directory to which blocks are temporarily written. Empty string defaults to the operating system's temp directory.")
}

func (cfg *Config) Validate() error {
Expand Down
48 changes: 25 additions & 23 deletions pkg/bloombuild/builder/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ type SimpleBloomGenerator struct {
metrics *v1.Metrics
logger log.Logger

readWriterFn func() (v1.BlockWriter, v1.BlockReader)
reporter func(model.Fingerprint)
writerReaderFunc func() (v1.BlockWriter, v1.BlockReader)
reporter func(model.Fingerprint)

tokenizer *v1.BloomTokenizer
}
Expand All @@ -69,7 +69,7 @@ func NewSimpleBloomGenerator(
store iter.Iterator[*v1.Series],
chunkLoader ChunkLoader,
blocksIter iter.ResetIterator[*v1.SeriesWithBlooms],
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
writerReaderFunc func() (v1.BlockWriter, v1.BlockReader),
reporter func(model.Fingerprint),
metrics *v1.Metrics,
logger log.Logger,
Expand All @@ -85,9 +85,9 @@ func NewSimpleBloomGenerator(
"component", "bloom_generator",
"org_id", userID,
),
readWriterFn: readWriterFn,
metrics: metrics,
reporter: reporter,
writerReaderFunc: writerReaderFunc,
metrics: metrics,
reporter: reporter,

tokenizer: v1.NewBloomTokenizer(
opts.Schema.NGramLen(),
Expand Down Expand Up @@ -161,19 +161,19 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) *LazyBlockBuilderIt
)
}

return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.readWriterFn, series, s.blocksIter)
return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.writerReaderFunc, series, s.blocksIter)
}

// LazyBlockBuilderIterator is a lazy iterator over blocks that builds
// each block by adding series to them until they are full.
type LazyBlockBuilderIterator struct {
ctx context.Context
opts v1.BlockOptions
metrics *v1.Metrics
populate v1.BloomPopulatorFunc
readWriterFn func() (v1.BlockWriter, v1.BlockReader)
series iter.PeekIterator[*v1.Series]
blocks iter.ResetIterator[*v1.SeriesWithBlooms]
ctx context.Context
opts v1.BlockOptions
metrics *v1.Metrics
populate v1.BloomPopulatorFunc
writerReaderFunc func() (v1.BlockWriter, v1.BlockReader)
series iter.PeekIterator[*v1.Series]
blocks iter.ResetIterator[*v1.SeriesWithBlooms]

bytesAdded int
curr *v1.Block
Expand All @@ -185,18 +185,18 @@ func NewLazyBlockBuilderIterator(
opts v1.BlockOptions,
metrics *v1.Metrics,
populate v1.BloomPopulatorFunc,
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
writerReaderFunc func() (v1.BlockWriter, v1.BlockReader),
series iter.PeekIterator[*v1.Series],
blocks iter.ResetIterator[*v1.SeriesWithBlooms],
) *LazyBlockBuilderIterator {
return &LazyBlockBuilderIterator{
ctx: ctx,
opts: opts,
metrics: metrics,
populate: populate,
readWriterFn: readWriterFn,
series: series,
blocks: blocks,
ctx: ctx,
opts: opts,
metrics: metrics,
populate: populate,
writerReaderFunc: writerReaderFunc,
series: series,
blocks: blocks,
}
}

Expand All @@ -221,16 +221,18 @@ func (b *LazyBlockBuilderIterator) Next() bool {
}

mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics)
writer, reader := b.readWriterFn()
writer, reader := b.writerReaderFunc()
blockBuilder, err := v1.NewBlockBuilder(b.opts, writer)
if err != nil {
_ = writer.Cleanup()
b.err = errors.Wrap(err, "failed to create bloom block builder")
return false
}
_, sourceBytes, err := mergeBuilder.Build(blockBuilder)
b.bytesAdded += sourceBytes

if err != nil {
_ = writer.Cleanup()
b.err = errors.Wrap(err, "failed to build bloom block")
return false
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/bloomcompactor/controller.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package bloomcompactor

import (
"bytes"
"context"
"fmt"
"math"
"os"
"sort"
"sync"

Expand Down Expand Up @@ -49,11 +49,12 @@ func NewSimpleBloomController(
}
}

// TODO(owen-d): pool, evaluate if memory-only is the best choice
func (s *SimpleBloomController) rwFn() (v1.BlockWriter, v1.BlockReader) {
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
return v1.NewMemoryBlockWriter(indexBuf, bloomsBuf), v1.NewByteReader(indexBuf, bloomsBuf)
func (s *SimpleBloomController) writerReaderFunc() (v1.BlockWriter, v1.BlockReader) {
chaudum marked this conversation as resolved.
Show resolved Hide resolved
dir, err := os.MkdirTemp("", "bloom-block-")
if err != nil {
panic(err)
}
return v1.NewDirectoryBlockWriter(dir), v1.NewDirectoryBlockReader(dir)
}

/*
Expand Down Expand Up @@ -409,7 +410,7 @@ func (s *SimpleBloomController) buildGaps(
seriesItrWithCounter,
s.chunkLoader,
blocksIter,
s.rwFn,
s.writerReaderFunc,
reporter,
s.metrics,
logger,
Expand Down
46 changes: 23 additions & 23 deletions pkg/bloomcompactor/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ type SimpleBloomGenerator struct {
metrics *Metrics
logger log.Logger

readWriterFn func() (v1.BlockWriter, v1.BlockReader)
reporter func(model.Fingerprint)
writerReaderFunc func() (v1.BlockWriter, v1.BlockReader)
reporter func(model.Fingerprint)

tokenizer *v1.BloomTokenizer
}
Expand All @@ -69,7 +69,7 @@ func NewSimpleBloomGenerator(
store iter.Iterator[*v1.Series],
chunkLoader ChunkLoader,
blocksIter iter.ResetIterator[*v1.SeriesWithBlooms],
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
writerReaderFunc func() (v1.BlockWriter, v1.BlockReader),
reporter func(model.Fingerprint),
metrics *Metrics,
logger log.Logger,
Expand All @@ -85,9 +85,9 @@ func NewSimpleBloomGenerator(
"component", "bloom_generator",
"org_id", userID,
),
readWriterFn: readWriterFn,
metrics: metrics,
reporter: reporter,
writerReaderFunc: writerReaderFunc,
metrics: metrics,
reporter: reporter,

tokenizer: v1.NewBloomTokenizer(
opts.Schema.NGramLen(),
Expand Down Expand Up @@ -161,19 +161,19 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) *LazyBlockBuilderIt
)
}

return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.readWriterFn, series, s.blocksIter)
return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.writerReaderFunc, series, s.blocksIter)
}

// LazyBlockBuilderIterator is a lazy iterator over blocks that builds
// each block by adding series to them until they are full.
type LazyBlockBuilderIterator struct {
ctx context.Context
opts v1.BlockOptions
metrics *Metrics
populate v1.BloomPopulatorFunc
readWriterFn func() (v1.BlockWriter, v1.BlockReader)
series iter.PeekIterator[*v1.Series]
blocks iter.ResetIterator[*v1.SeriesWithBlooms]
ctx context.Context
opts v1.BlockOptions
metrics *Metrics
populate v1.BloomPopulatorFunc
writerReaderFunc func() (v1.BlockWriter, v1.BlockReader)
series iter.PeekIterator[*v1.Series]
blocks iter.ResetIterator[*v1.SeriesWithBlooms]

bytesAdded int
curr *v1.Block
Expand All @@ -185,18 +185,18 @@ func NewLazyBlockBuilderIterator(
opts v1.BlockOptions,
metrics *Metrics,
populate v1.BloomPopulatorFunc,
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
writerReaderFunc func() (v1.BlockWriter, v1.BlockReader),
series iter.PeekIterator[*v1.Series],
blocks iter.ResetIterator[*v1.SeriesWithBlooms],
) *LazyBlockBuilderIterator {
return &LazyBlockBuilderIterator{
ctx: ctx,
opts: opts,
metrics: metrics,
populate: populate,
readWriterFn: readWriterFn,
series: series,
blocks: blocks,
ctx: ctx,
opts: opts,
metrics: metrics,
populate: populate,
writerReaderFunc: writerReaderFunc,
series: series,
blocks: blocks,
}
}

Expand All @@ -221,7 +221,7 @@ func (b *LazyBlockBuilderIterator) Next() bool {
}

mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics.bloomMetrics)
writer, reader := b.readWriterFn()
writer, reader := b.writerReaderFunc()
blockBuilder, err := v1.NewBlockBuilder(b.opts, writer)
if err != nil {
b.err = errors.Wrap(err, "failed to create bloom block builder")
Expand Down
19 changes: 19 additions & 0 deletions pkg/storage/bloom/v1/block_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"github.com/pkg/errors"

"github.com/grafana/dskit/multierror"

"github.com/grafana/loki/v3/pkg/storage/chunk/client/util"
)

Expand All @@ -22,6 +24,7 @@ type BlockWriter interface {
Blooms() (io.WriteCloser, error)
Size() (int, error) // byte size of accumulated index & blooms
Full(maxSize uint64) (full bool, size int, err error)
Cleanup() error
}

// in memory impl
Expand All @@ -39,6 +42,7 @@ func NewMemoryBlockWriter(index, blooms *bytes.Buffer) MemoryBlockWriter {
// Index returns a write handle for the in-memory index buffer. The buffer is
// wrapped with NewNoopCloser so it satisfies io.WriteCloser (presumably Close
// does nothing — confirm against NewNoopCloser). The error is always nil.
func (b MemoryBlockWriter) Index() (io.WriteCloser, error) {
	var indexWriter io.WriteCloser = NewNoopCloser(b.index)
	return indexWriter, nil
}

// Blooms returns a write handle for the in-memory blooms buffer. The buffer is
// wrapped with NewNoopCloser so it satisfies io.WriteCloser (presumably Close
// does nothing — confirm against NewNoopCloser). The error is always nil.
func (b MemoryBlockWriter) Blooms() (io.WriteCloser, error) {
	var bloomsWriter io.WriteCloser = NewNoopCloser(b.blooms)
	return bloomsWriter, nil
}
Expand All @@ -60,6 +64,12 @@ func (b MemoryBlockWriter) Full(maxSize uint64) (full bool, size int, err error)
return uint64(size) >= maxSize, size, nil
}

// Cleanup discards everything written so far by resetting both the index and
// the blooms buffer. It never fails and always returns nil.
func (b MemoryBlockWriter) Cleanup() error {
	indexBuf, bloomsBuf := b.index, b.blooms
	indexBuf.Reset()
	bloomsBuf.Reset()
	return nil
}

// Directory based impl
type DirectoryBlockWriter struct {
dir string
Expand Down Expand Up @@ -139,3 +149,12 @@ func (b *DirectoryBlockWriter) Full(maxSize uint64) (full bool, size int, err er

return uint64(size) >= maxSize, size, nil
}

// Cleanup marks the writer as uninitialized and removes the index file, the
// blooms file and finally the whole block directory. All removal errors are
// collected and returned together; nil is returned when every step succeeded.
func (b *DirectoryBlockWriter) Cleanup() error {
	b.initialized = false
	err := multierror.New()
	// Cleanup is also invoked on error paths where the writer may never have
	// opened its files. Guard against nil handles so that case neither
	// panics nor skips removal of the directory below.
	if b.index != nil {
		err.Add(os.Remove(b.index.Name()))
	}
	if b.blooms != nil {
		err.Add(os.Remove(b.blooms.Name()))
	}
	err.Add(os.RemoveAll(b.dir))
	return err.Err()
}
Loading
Loading