From cb90b9d79bfebbca20354a23fbb64a8b917065df Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 15 Jul 2024 10:25:37 +0200 Subject: [PATCH 1/4] chore: Use filesystem backed writer for blooms The `DirectoryBlockWriter` and `DirectoryBlockReader` are used to avoid OOMing of compactors/builders. The tradeoff is that the writer/reader needs to be cleaned up and that it is I/O bound. Signed-off-by: Christian Haudum --- integration/bloom_building_test.go | 2 +- pkg/bloombuild/builder/builder.go | 17 ++++++++++++----- pkg/bloomcompactor/controller.go | 11 ++++++----- pkg/storage/bloom/v1/block_writer.go | 19 +++++++++++++++++++ pkg/storage/bloom/v1/reader.go | 18 ++++++++++++++++++ 5 files changed, 56 insertions(+), 11 deletions(-) diff --git a/integration/bloom_building_test.go b/integration/bloom_building_test.go index 0a96ee5702ac..8e98a6dca235 100644 --- a/integration/bloom_building_test.go +++ b/integration/bloom_building_test.go @@ -170,7 +170,6 @@ func checkForTimestampMetric(t *testing.T, cliPlanner *client.Client, metricName func createBloomStore(t *testing.T, sharedPath string) *bloomshipper.BloomStore { logger := log.NewNopLogger() - //logger := log.NewLogfmtLogger(os.Stdout) schemaCfg := config.SchemaConfig{ Configs: []config.PeriodConfig{ @@ -223,6 +222,7 @@ func checkSeriesInBlooms( ) { for _, lbs := range series { seriesFP := model.Fingerprint(lbs.Hash()) + t.Log(seriesFP) metas, err := bloomStore.FetchMetas(context.Background(), bloomshipper.MetaSearchParams{ TenantID: tenantID, diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index 5957ad4925a8..f616b608dbe9 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -1,9 +1,9 @@ package builder import ( - "bytes" "context" "fmt" + "os" "sync" "time" @@ -371,6 +371,7 @@ func (b *Builder) processTask( built, err := bloomshipper.BlockFrom(tenant, task.Table.Addr(), blk) if err != nil { + _ = blk.Reader().Cleanup() level.Error(logger).Log("msg", "failed to build block", "err", err) return nil, fmt.Errorf("failed to build block: %w", err) } @@ -381,11 +382,16 @@ func (b *Builder) processTask( ctx, built, ); err != nil { + _ = blk.Reader().Cleanup() level.Error(logger).Log("msg", "failed to write block", "err", err) return nil, fmt.Errorf("failed to write block: %w", err) } b.metrics.blocksCreated.Inc() + if err := blk.Reader().Cleanup(); err != nil { + level.Error(logger).Log("msg", "failed to cleanup block directory", "err", err) + } + totalGapKeyspace := gap.Bounds.Max - gap.Bounds.Min progress := built.Bounds.Max - gap.Bounds.Min pct := float64(progress) / float64(totalGapKeyspace) * 100 @@ -477,9 +483,10 @@ func (b *Builder) loadWorkForGap( return seriesItr, blocksIter, nil } -// TODO(owen-d): pool, evaluate if memory-only is the best choice func (b *Builder) rwFn() (v1.BlockWriter, v1.BlockReader) { - indexBuf := bytes.NewBuffer(nil) - bloomsBuf := bytes.NewBuffer(nil) - return v1.NewMemoryBlockWriter(indexBuf, bloomsBuf), v1.NewByteReader(indexBuf, bloomsBuf) + dir, err := os.MkdirTemp("", "bloom-block-") + if err != nil { + panic(err) + } + return v1.NewDirectoryBlockWriter(dir), v1.NewDirectoryBlockReader(dir) } diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go index fffd67f7f2f4..81fc9af2efbe 100644 --- a/pkg/bloomcompactor/controller.go +++ b/pkg/bloomcompactor/controller.go @@ -1,10 +1,10 @@ package bloomcompactor import ( - "bytes" "context" "fmt" "math" + "os" "sort" "sync" @@ -49,11 +49,12 @@ func NewSimpleBloomController( } } -// TODO(owen-d): pool, evaluate if memory-only is the best choice func (s *SimpleBloomController) rwFn() (v1.BlockWriter, v1.BlockReader) { - indexBuf := bytes.NewBuffer(nil) - bloomsBuf := bytes.NewBuffer(nil) - return v1.NewMemoryBlockWriter(indexBuf, bloomsBuf), v1.NewByteReader(indexBuf, bloomsBuf) + dir, err := os.MkdirTemp("", "bloom-block-") + if err != nil { + panic(err) + } + return v1.NewDirectoryBlockWriter(dir), v1.NewDirectoryBlockReader(dir) } /* diff --git a/pkg/storage/bloom/v1/block_writer.go b/pkg/storage/bloom/v1/block_writer.go index 70ed868235a7..a50c2f81e4b8 100644 --- a/pkg/storage/bloom/v1/block_writer.go +++ b/pkg/storage/bloom/v1/block_writer.go @@ -8,6 +8,8 @@ import ( "github.com/pkg/errors" + "github.com/grafana/dskit/multierror" + "github.com/grafana/loki/v3/pkg/storage/chunk/client/util" ) @@ -22,6 +24,7 @@ type BlockWriter interface { Blooms() (io.WriteCloser, error) Size() (int, error) // byte size of accumualted index & blooms Full(maxSize uint64) (full bool, size int, err error) + Cleanup() error } // in memory impl @@ -39,6 +42,7 @@ func NewMemoryBlockWriter(index, blooms *bytes.Buffer) MemoryBlockWriter { func (b MemoryBlockWriter) Index() (io.WriteCloser, error) { return NewNoopCloser(b.index), nil } + func (b MemoryBlockWriter) Blooms() (io.WriteCloser, error) { return NewNoopCloser(b.blooms), nil } @@ -60,6 +64,12 @@ func (b MemoryBlockWriter) Full(maxSize uint64) (full bool, size int, err error) return uint64(size) >= maxSize, size, nil } +func (b MemoryBlockWriter) Cleanup() error { + b.index.Reset() + b.blooms.Reset() + return nil +} + // Directory based impl type DirectoryBlockWriter struct { dir string @@ -139,3 +149,12 @@ func (b *DirectoryBlockWriter) Full(maxSize uint64) (full bool, size int, err er return uint64(size) >= maxSize, size, nil } + +func (b *DirectoryBlockWriter) Cleanup() error { + b.initialized = false + err := multierror.New() + err.Add(os.Remove(b.index.Name())) + err.Add(os.Remove(b.blooms.Name())) + err.Add(os.RemoveAll(b.dir)) + return err.Err() +} diff --git a/pkg/storage/bloom/v1/reader.go b/pkg/storage/bloom/v1/reader.go index d402ee1fd971..6102d87421ce 100644 --- a/pkg/storage/bloom/v1/reader.go +++ b/pkg/storage/bloom/v1/reader.go @@ -8,6 +8,8 @@ import ( "github.com/pkg/errors" + "github.com/grafana/dskit/multierror" + iter "github.com/grafana/loki/v3/pkg/iter/v2" ) @@ -15,6 +17,7 @@ type BlockReader interface { Index() (io.ReadSeeker, error) Blooms() (io.ReadSeeker, error) TarEntries() (iter.Iterator[TarEntry], error) + Cleanup() error } // In memory reader @@ -61,6 +64,12 @@ func (r *ByteReader) TarEntries() (iter.Iterator[TarEntry], error) { return iter.NewSliceIter[TarEntry](entries), err } +func (r *ByteReader) Cleanup() error { + r.index.Reset() + r.blooms.Reset() + return nil +} + // File reader type DirectoryBlockReader struct { dir string @@ -144,3 +153,12 @@ func (r *DirectoryBlockReader) TarEntries() (iter.Iterator[TarEntry], error) { return iter.NewSliceIter[TarEntry](entries), nil } + +func (r *DirectoryBlockReader) Cleanup() error { + r.initialized = false + err := multierror.New() + err.Add(os.Remove(r.index.Name())) + err.Add(os.Remove(r.blooms.Name())) + err.Add(os.RemoveAll(r.dir)) + return err.Err() +} From 9c962663d5c53fdb35b0408381e1f69afeb44a70 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 15 Jul 2024 11:23:54 +0200 Subject: [PATCH 2/4] Unify function names Signed-off-by: Christian Haudum --- integration/bloom_building_test.go | 1 - pkg/bloombuild/builder/builder.go | 4 +-- pkg/bloombuild/builder/spec.go | 46 +++++++++++++++--------------- pkg/bloomcompactor/controller.go | 4 +-- pkg/bloomcompactor/spec.go | 46 +++++++++++++++--------------- pkg/storage/bloom/v1/reader.go | 13 ++++++++- 6 files changed, 62 insertions(+), 52 deletions(-) diff --git a/integration/bloom_building_test.go b/integration/bloom_building_test.go index 8e98a6dca235..46e8570c4771 100644 --- a/integration/bloom_building_test.go +++ b/integration/bloom_building_test.go @@ -222,7 +222,6 @@ func checkSeriesInBlooms( ) { for _, lbs := range series { seriesFP := model.Fingerprint(lbs.Hash()) - t.Log(seriesFP) metas, err := bloomStore.FetchMetas(context.Background(), bloomshipper.MetaSearchParams{ TenantID: tenantID, diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index f616b608dbe9..403765b8bceb 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -356,7 +356,7 @@ func (b *Builder) processTask( seriesItrWithCounter, b.chunkLoader, blocksIter, - b.rwFn, + b.writerReaderFunc, nil, // TODO(salvacorts): Pass reporter or remove when we address tracking b.bloomStore.BloomMetrics(), logger, @@ -483,7 +483,7 @@ func (b *Builder) loadWorkForGap( return seriesItr, blocksIter, nil } -func (b *Builder) rwFn() (v1.BlockWriter, v1.BlockReader) { +func (b *Builder) writerReaderFunc() (v1.BlockWriter, v1.BlockReader) { dir, err := os.MkdirTemp("", "bloom-block-") if err != nil { panic(err) diff --git a/pkg/bloombuild/builder/spec.go b/pkg/bloombuild/builder/spec.go index abb6cef1447f..3602b8c8397e 100644 --- a/pkg/bloombuild/builder/spec.go +++ b/pkg/bloombuild/builder/spec.go @@ -53,8 +53,8 @@ type SimpleBloomGenerator struct { metrics *v1.Metrics logger log.Logger - readWriterFn func() (v1.BlockWriter, v1.BlockReader) - reporter func(model.Fingerprint) + writerReaderFunc func() (v1.BlockWriter, v1.BlockReader) + reporter func(model.Fingerprint) tokenizer *v1.BloomTokenizer } @@ -69,7 +69,7 @@ func NewSimpleBloomGenerator( store iter.Iterator[*v1.Series], chunkLoader ChunkLoader, blocksIter iter.ResetIterator[*v1.SeriesWithBlooms], - readWriterFn func() (v1.BlockWriter, v1.BlockReader), + writerReaderFunc func() (v1.BlockWriter, v1.BlockReader), reporter func(model.Fingerprint), metrics *v1.Metrics, logger log.Logger, @@ -85,9 +85,9 @@ func NewSimpleBloomGenerator( "component", "bloom_generator", "org_id", userID, ), - readWriterFn: readWriterFn, - metrics: metrics, - reporter: reporter, + writerReaderFunc: writerReaderFunc, + metrics: metrics, + reporter: reporter, tokenizer: v1.NewBloomTokenizer( opts.Schema.NGramLen(), @@ -161,19 +161,19 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) *LazyBlockBuilderIt ) } - return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.readWriterFn, series, s.blocksIter) + return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.writerReaderFunc, series, s.blocksIter) } // LazyBlockBuilderIterator is a lazy iterator over blocks that builds // each block by adding series to them until they are full. type LazyBlockBuilderIterator struct { - ctx context.Context - opts v1.BlockOptions - metrics *v1.Metrics - populate v1.BloomPopulatorFunc - readWriterFn func() (v1.BlockWriter, v1.BlockReader) - series iter.PeekIterator[*v1.Series] - blocks iter.ResetIterator[*v1.SeriesWithBlooms] + ctx context.Context + opts v1.BlockOptions + metrics *v1.Metrics + populate v1.BloomPopulatorFunc + writerReaderFunc func() (v1.BlockWriter, v1.BlockReader) + series iter.PeekIterator[*v1.Series] + blocks iter.ResetIterator[*v1.SeriesWithBlooms] bytesAdded int curr *v1.Block @@ -185,18 +185,18 @@ func NewLazyBlockBuilderIterator( opts v1.BlockOptions, metrics *v1.Metrics, populate v1.BloomPopulatorFunc, - readWriterFn func() (v1.BlockWriter, v1.BlockReader), + writerReaderFunc func() (v1.BlockWriter, v1.BlockReader), series iter.PeekIterator[*v1.Series], blocks iter.ResetIterator[*v1.SeriesWithBlooms], ) *LazyBlockBuilderIterator { return &LazyBlockBuilderIterator{ - ctx: ctx, - opts: opts, - metrics: metrics, - populate: populate, - readWriterFn: readWriterFn, - series: series, - blocks: blocks, + ctx: ctx, + opts: opts, + metrics: metrics, + populate: populate, + writerReaderFunc: writerReaderFunc, + series: series, + blocks: blocks, } } @@ -221,7 +221,7 @@ func (b *LazyBlockBuilderIterator) Next() bool { } mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics) - writer, reader := b.readWriterFn() + writer, reader := b.writerReaderFunc() blockBuilder, err := v1.NewBlockBuilder(b.opts, writer) if err != nil { b.err = errors.Wrap(err, "failed to create bloom block builder") diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go index 81fc9af2efbe..b852896bfd27 100644 --- a/pkg/bloomcompactor/controller.go +++ b/pkg/bloomcompactor/controller.go @@ -49,7 +49,7 @@ func NewSimpleBloomController( } } -func (s *SimpleBloomController) rwFn() (v1.BlockWriter, v1.BlockReader) { +func (s *SimpleBloomController) writerReaderFunc() (v1.BlockWriter, v1.BlockReader) { dir, err := os.MkdirTemp("", "bloom-block-") if err != nil { panic(err) @@ -410,7 +410,7 @@ func (s *SimpleBloomController) buildGaps( seriesItrWithCounter, s.chunkLoader, blocksIter, - s.rwFn, + s.writerReaderFunc, reporter, s.metrics, logger, diff --git a/pkg/bloomcompactor/spec.go b/pkg/bloomcompactor/spec.go index 61cd8f1d06a4..696f192970b6 100644 --- a/pkg/bloomcompactor/spec.go +++ b/pkg/bloomcompactor/spec.go @@ -53,8 +53,8 @@ type SimpleBloomGenerator struct { metrics *Metrics logger log.Logger - readWriterFn func() (v1.BlockWriter, v1.BlockReader) - reporter func(model.Fingerprint) + writerReaderFunc func() (v1.BlockWriter, v1.BlockReader) + reporter func(model.Fingerprint) tokenizer *v1.BloomTokenizer } @@ -69,7 +69,7 @@ func NewSimpleBloomGenerator( store iter.Iterator[*v1.Series], chunkLoader ChunkLoader, blocksIter iter.ResetIterator[*v1.SeriesWithBlooms], - readWriterFn func() (v1.BlockWriter, v1.BlockReader), + writerReaderFunc func() (v1.BlockWriter, v1.BlockReader), reporter func(model.Fingerprint), metrics *Metrics, logger log.Logger, @@ -85,9 +85,9 @@ func NewSimpleBloomGenerator( "component", "bloom_generator", "org_id", userID, ), - readWriterFn: readWriterFn, - metrics: metrics, - reporter: reporter, + writerReaderFunc: writerReaderFunc, + metrics: metrics, + reporter: reporter, tokenizer: v1.NewBloomTokenizer( opts.Schema.NGramLen(), @@ -161,19 +161,19 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) *LazyBlockBuilderIt ) } - return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.readWriterFn, series, s.blocksIter) + return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.writerReaderFunc, series, s.blocksIter) } // LazyBlockBuilderIterator is a lazy iterator over blocks that builds // each block by adding series to them until they are full. type LazyBlockBuilderIterator struct { - ctx context.Context - opts v1.BlockOptions - metrics *Metrics - populate v1.BloomPopulatorFunc - readWriterFn func() (v1.BlockWriter, v1.BlockReader) - series iter.PeekIterator[*v1.Series] - blocks iter.ResetIterator[*v1.SeriesWithBlooms] + ctx context.Context + opts v1.BlockOptions + metrics *Metrics + populate v1.BloomPopulatorFunc + writerReaderFunc func() (v1.BlockWriter, v1.BlockReader) + series iter.PeekIterator[*v1.Series] + blocks iter.ResetIterator[*v1.SeriesWithBlooms] bytesAdded int curr *v1.Block @@ -185,18 +185,18 @@ func NewLazyBlockBuilderIterator( opts v1.BlockOptions, metrics *Metrics, populate v1.BloomPopulatorFunc, - readWriterFn func() (v1.BlockWriter, v1.BlockReader), + writerReaderFunc func() (v1.BlockWriter, v1.BlockReader), series iter.PeekIterator[*v1.Series], blocks iter.ResetIterator[*v1.SeriesWithBlooms], ) *LazyBlockBuilderIterator { return &LazyBlockBuilderIterator{ - ctx: ctx, - opts: opts, - metrics: metrics, - populate: populate, - readWriterFn: readWriterFn, - series: series, - blocks: blocks, + ctx: ctx, + opts: opts, + metrics: metrics, + populate: populate, + writerReaderFunc: writerReaderFunc, + series: series, + blocks: blocks, } } @@ -221,7 +221,7 @@ func (b *LazyBlockBuilderIterator) Next() bool { } mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics.bloomMetrics) - writer, reader := b.readWriterFn() + writer, reader := b.writerReaderFunc() blockBuilder, err := v1.NewBlockBuilder(b.opts, writer) if err != nil { b.err = errors.Wrap(err, "failed to create bloom block builder") diff --git a/pkg/storage/bloom/v1/reader.go b/pkg/storage/bloom/v1/reader.go index 6102d87421ce..d589aa19c492 100644 --- a/pkg/storage/bloom/v1/reader.go +++ b/pkg/storage/bloom/v1/reader.go @@ -122,17 +122,28 @@ func (r *DirectoryBlockReader) Blooms() (io.ReadSeeker, error) { } func (r *DirectoryBlockReader) TarEntries() (iter.Iterator[TarEntry], error) { + var err error if !r.initialized { - if err := r.Init(); err != nil { + if err = r.Init(); err != nil { return nil, err } } + _, err = r.index.Seek(0, io.SeekStart) + if err != nil { + return nil, errors.Wrap(err, "error seeking series file") + } + idxInfo, err := r.index.Stat() if err != nil { return nil, errors.Wrap(err, "error stat'ing series file") } + _, err = r.blooms.Seek(0, io.SeekStart) + if err != nil { + return nil, errors.Wrap(err, "error seeking bloom file") + } + bloomInfo, err := r.blooms.Stat() if err != nil { return nil, errors.Wrap(err, "error stat'ing bloom file") From 5dde9a39c61812b35fb65e2055ec8b406671c33a Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 15 Jul 2024 17:25:21 +0200 Subject: [PATCH 3/4] Log cleanup error Signed-off-by: Christian Haudum --- pkg/bloombuild/builder/builder.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index 403765b8bceb..d338584ef313 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -371,8 +371,10 @@ func (b *Builder) processTask( built, err := bloomshipper.BlockFrom(tenant, task.Table.Addr(), blk) if err != nil { - _ = blk.Reader().Cleanup() level.Error(logger).Log("msg", "failed to build block", "err", err) + if err = blk.Reader().Cleanup(); err != nil { + level.Error(logger).Log("msg", "failed to cleanup block directory", "err", err) + } return nil, fmt.Errorf("failed to build block: %w", err) } @@ -382,8 +384,10 @@ func (b *Builder) processTask( ctx, built, ); err != nil { - _ = blk.Reader().Cleanup() level.Error(logger).Log("msg", "failed to write block", "err", err) + if err = blk.Reader().Cleanup(); err != nil { + level.Error(logger).Log("msg", "failed to cleanup block directory", "err", err) + } return nil, fmt.Errorf("failed to write block: %w", err) } b.metrics.blocksCreated.Inc() From 75453c4e3948ee2287dca1b0dc012f70534ce049 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Tue, 16 Jul 2024 12:10:09 +0200 Subject: [PATCH 4/4] Add hidden config for bloom builder working directory Signed-off-by: Christian Haudum --- pkg/bloombuild/builder/builder.go | 2 +- pkg/bloombuild/builder/config.go | 2 ++ pkg/bloombuild/builder/spec.go | 2 ++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index d338584ef313..982d14388e28 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -488,7 +488,7 @@ func (b *Builder) loadWorkForGap( } func (b *Builder) writerReaderFunc() (v1.BlockWriter, v1.BlockReader) { - dir, err := os.MkdirTemp("", "bloom-block-") + dir, err := os.MkdirTemp(b.cfg.WorkingDir, "bloom-block-") if err != nil { panic(err) } diff --git a/pkg/bloombuild/builder/config.go b/pkg/bloombuild/builder/config.go index d0c553104b09..deeeb951465a 100644 --- a/pkg/bloombuild/builder/config.go +++ b/pkg/bloombuild/builder/config.go @@ -13,6 +13,7 @@ type Config struct { GrpcConfig grpcclient.Config `yaml:"grpc_config"` PlannerAddress string `yaml:"planner_address"` BackoffConfig backoff.Config `yaml:"backoff_config"` + WorkingDir string `yaml:"working_directory" doc:"hidden"` } // RegisterFlagsWithPrefix registers flags for the bloom-planner configuration. @@ -20,6 +21,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.PlannerAddress, prefix+".planner-address", "", "Hostname (and port) of the bloom planner") cfg.GrpcConfig.RegisterFlagsWithPrefix(prefix+".grpc", f) cfg.BackoffConfig.RegisterFlagsWithPrefix(prefix+".backoff", f) + f.StringVar(&cfg.WorkingDir, prefix+".working-directory", "", "Working directory to which blocks are temporarily written to. Empty string defaults to the operating system's temp directory.") } func (cfg *Config) Validate() error { diff --git a/pkg/bloombuild/builder/spec.go b/pkg/bloombuild/builder/spec.go index 3602b8c8397e..82457cf92b84 100644 --- a/pkg/bloombuild/builder/spec.go +++ b/pkg/bloombuild/builder/spec.go @@ -224,6 +224,7 @@ func (b *LazyBlockBuilderIterator) Next() bool { writer, reader := b.writerReaderFunc() blockBuilder, err := v1.NewBlockBuilder(b.opts, writer) if err != nil { + _ = writer.Cleanup() b.err = errors.Wrap(err, "failed to create bloom block builder") return false } @@ -231,6 +232,7 @@ func (b *LazyBlockBuilderIterator) Next() bool { b.bytesAdded += sourceBytes if err != nil { + _ = writer.Cleanup() b.err = errors.Wrap(err, "failed to build bloom block") return false }