From 4117b6ca981b2852a15b102be2394bffce37b3e2 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 20 Jun 2024 12:05:47 +0200 Subject: [PATCH] perf: Re-introduce fixed size memory pool for bloom querier (#13172) This PR re-introduces the fixed size memory pool that was originally introduced with #13039 but reverted with #13162 Additionally, it removes the package-global variable to store the type of allocator that should be used. Instead, the allocator type is passed during the bloom store creation. Signed-off-by: Christian Haudum --- pkg/bloombuild/builder/builder.go | 4 +- pkg/bloombuild/builder/spec_test.go | 5 +- pkg/bloombuild/planner/planner.go | 4 +- pkg/bloombuild/planner/planner_test.go | 3 +- pkg/bloomcompactor/bloomcompactor.go | 4 +- pkg/bloomcompactor/controller.go | 4 +- pkg/bloomcompactor/retention.go | 4 +- pkg/bloomcompactor/retention_test.go | 3 +- pkg/bloomcompactor/spec_test.go | 5 +- pkg/bloomgateway/bloomgateway.go | 4 +- pkg/bloomgateway/bloomgateway_test.go | 3 +- pkg/bloomgateway/processor.go | 2 +- pkg/bloomgateway/processor_test.go | 21 ++- pkg/bloomgateway/resolver.go | 4 +- pkg/bloomgateway/util_test.go | 1 + pkg/loki/loki.go | 2 +- pkg/loki/modules.go | 20 ++- pkg/storage/bloom/v1/block.go | 23 +-- pkg/storage/bloom/v1/bloom.go | 44 ++++-- pkg/storage/bloom/v1/bloom_querier.go | 36 ++--- pkg/storage/bloom/v1/builder_test.go | 15 +- pkg/storage/bloom/v1/fuse_test.go | 16 ++- pkg/storage/bloom/v1/index.go | 2 +- pkg/storage/bloom/v1/util.go | 31 +--- .../bloom/v1/versioned_builder_test.go | 5 +- .../stores/shipper/bloomshipper/cache.go | 7 +- .../shipper/bloomshipper/config/config.go | 67 +++++++++ .../stores/shipper/bloomshipper/fetcher.go | 7 +- .../stores/shipper/bloomshipper/shipper.go | 4 +- .../stores/shipper/bloomshipper/store.go | 35 +++-- .../stores/shipper/bloomshipper/store_test.go | 3 +- pkg/util/flagext/csv.go | 62 ++++++++ pkg/util/flagext/csv_test.go | 79 ++++++++++ pkg/util/mempool/allocator.go | 49 +++++++ pkg/util/mempool/bucket.go | 51 +++++++ pkg/util/mempool/metrics.go | 32 +++++ pkg/util/mempool/pool.go | 135 ++++++++++++++++++ pkg/util/mempool/pool_test.go | 133 +++++++++++++++++ tools/bloom/inspector/main.go | 3 +- 39 files changed, 797 insertions(+), 135 deletions(-) create mode 100644 pkg/util/flagext/csv.go create mode 100644 pkg/util/flagext/csv_test.go create mode 100644 pkg/util/mempool/allocator.go create mode 100644 pkg/util/mempool/bucket.go create mode 100644 pkg/util/mempool/metrics.go create mode 100644 pkg/util/mempool/pool.go create mode 100644 pkg/util/mempool/pool_test.go diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index cbbd737a8319..52ef9e023f4f 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -38,7 +38,7 @@ type Builder struct { logger log.Logger tsdbStore common.TSDBStore - bloomStore bloomshipper.Store + bloomStore bloomshipper.StoreBase chunkLoader ChunkLoader client protos.PlannerForBuilderClient @@ -51,7 +51,7 @@ func New( storeCfg storage.Config, storageMetrics storage.ClientMetrics, fetcherProvider stores.ChunkFetcherProvider, - bloomStore bloomshipper.Store, + bloomStore bloomshipper.StoreBase, logger log.Logger, r prometheus.Registerer, ) (*Builder, error) { diff --git a/pkg/bloombuild/builder/spec_test.go b/pkg/bloombuild/builder/spec_test.go index e6b47b1442a6..77bb76f7ecaf 100644 --- a/pkg/bloombuild/builder/spec_test.go +++ b/pkg/bloombuild/builder/spec_test.go @@ -13,6 +13,7 @@ import ( 
"github.com/grafana/loki/v3/pkg/chunkenc" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" + "github.com/grafana/loki/v3/pkg/util/mempool" ) func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBlooms, refs []bloomshipper.BlockRef) { @@ -74,7 +75,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser for i, b := range blocks { bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{ BlockRef: refs[i], - BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize), + BlockQuerier: v1.NewBlockQuerier(b, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize), }) } @@ -152,7 +153,7 @@ func TestSimpleBloomGenerator(t *testing.T) { expectedRefs := v1.PointerSlice(data) outputRefs := make([]*v1.SeriesWithBlooms, 0, len(data)) for _, block := range outputBlocks { - bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize).Iter() + bq := v1.NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize).Iter() for bq.Next() { outputRefs = append(outputRefs, bq.At()) } diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go index 8234dde9c54a..dd44c545ff36 100644 --- a/pkg/bloombuild/planner/planner.go +++ b/pkg/bloombuild/planner/planner.go @@ -40,7 +40,7 @@ type Planner struct { schemaCfg config.SchemaConfig tsdbStore common.TSDBStore - bloomStore bloomshipper.Store + bloomStore bloomshipper.StoreBase tasksQueue *queue.RequestQueue activeUsers *util.ActiveUsersCleanupService @@ -57,7 +57,7 @@ func New( schemaCfg config.SchemaConfig, storeCfg storage.Config, storageMetrics storage.ClientMetrics, - bloomStore bloomshipper.Store, + bloomStore bloomshipper.StoreBase, logger log.Logger, r prometheus.Registerer, ) (*Planner, error) { diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go index 64c6ef086dac..433c978fa0f2 100644 --- a/pkg/bloombuild/planner/planner_test.go +++ b/pkg/bloombuild/planner/planner_test.go @@ -29,6 +29,7 @@ import ( bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" "github.com/grafana/loki/v3/pkg/storage/types" + "github.com/grafana/loki/v3/pkg/util/mempool" ) var testDay = parseDayTime("2023-09-01") @@ -411,7 +412,7 @@ func createPlanner( reg := prometheus.NewPedanticRegistry() metasCache := cache.NewNoopCache() blocksCache := bloomshipper.NewFsBlocksCache(storageCfg.BloomShipperConfig.BlocksCache, reg, logger) - bloomStore, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, storage.ClientMetrics{}, metasCache, blocksCache, reg, logger) + bloomStore, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, storage.ClientMetrics{}, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger) require.NoError(t, err) planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, bloomStore, logger, reg) diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index acfb5ba01f35..8eed0823314a 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -53,7 +53,7 @@ type Compactor struct { retentionManager *RetentionManager // temporary workaround until bloomStore has implemented read/write shipper interface - bloomStore bloomshipper.Store + bloomStore bloomshipper.StoreBase sharding util_ring.TenantSharding @@ 
-69,7 +69,7 @@ func New( ring ring.ReadRing, ringLifeCycler *ring.BasicLifecycler, limits Limits, - store bloomshipper.StoreWithMetrics, + store bloomshipper.Store, logger log.Logger, r prometheus.Registerer, ) (*Compactor, error) { diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go index 277d040d688b..3929f2da3f80 100644 --- a/pkg/bloomcompactor/controller.go +++ b/pkg/bloomcompactor/controller.go @@ -22,7 +22,7 @@ import ( type SimpleBloomController struct { tsdbStore TSDBStore - bloomStore bloomshipper.Store + bloomStore bloomshipper.StoreBase chunkLoader ChunkLoader metrics *Metrics limits Limits @@ -32,7 +32,7 @@ type SimpleBloomController struct { func NewSimpleBloomController( tsdbStore TSDBStore, - blockStore bloomshipper.Store, + blockStore bloomshipper.StoreBase, chunkLoader ChunkLoader, limits Limits, metrics *Metrics, diff --git a/pkg/bloomcompactor/retention.go b/pkg/bloomcompactor/retention.go index 7dd30dece9e8..caaf80ffb9c3 100644 --- a/pkg/bloomcompactor/retention.go +++ b/pkg/bloomcompactor/retention.go @@ -95,7 +95,7 @@ type RetentionLimits interface { type RetentionManager struct { cfg RetentionConfig limits RetentionLimits - bloomStore bloomshipper.Store + bloomStore bloomshipper.StoreBase sharding retentionSharding metrics *Metrics logger log.Logger @@ -108,7 +108,7 @@ type RetentionManager struct { func NewRetentionManager( cfg RetentionConfig, limits RetentionLimits, - bloomStore bloomshipper.Store, + bloomStore bloomshipper.StoreBase, sharding retentionSharding, metrics *Metrics, logger log.Logger, diff --git a/pkg/bloomcompactor/retention_test.go b/pkg/bloomcompactor/retention_test.go index b8e855b0d4e9..e610ab5b02e0 100644 --- a/pkg/bloomcompactor/retention_test.go +++ b/pkg/bloomcompactor/retention_test.go @@ -24,6 +24,7 @@ import ( "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config" "github.com/grafana/loki/v3/pkg/storage/types" util_log "github.com/grafana/loki/v3/pkg/util/log" + "github.com/grafana/loki/v3/pkg/util/mempool" lokiring "github.com/grafana/loki/v3/pkg/util/ring" "github.com/grafana/loki/v3/pkg/validation" ) @@ -822,7 +823,7 @@ func NewMockBloomStoreWithWorkDir(t *testing.T, workDir string) (*bloomshipper.B metasCache := cache.NewMockCache() blocksCache := bloomshipper.NewFsBlocksCache(storageConfig.BloomShipperConfig.BlocksCache, prometheus.NewPedanticRegistry(), logger) - store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageConfig, metrics, metasCache, blocksCache, reg, logger) + store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageConfig, metrics, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger) if err == nil { t.Cleanup(store.Stop) } diff --git a/pkg/bloomcompactor/spec_test.go b/pkg/bloomcompactor/spec_test.go index f887d3205322..e08cafb68cab 100644 --- a/pkg/bloomcompactor/spec_test.go +++ b/pkg/bloomcompactor/spec_test.go @@ -13,6 +13,7 @@ import ( "github.com/grafana/loki/v3/pkg/chunkenc" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" + "github.com/grafana/loki/v3/pkg/util/mempool" ) func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBlooms, refs []bloomshipper.BlockRef) { @@ -74,7 +75,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser for i, b := range blocks { bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{ BlockRef: refs[i], - BlockQuerier: v1.NewBlockQuerier(b, 
false, v1.DefaultMaxPageSize), + BlockQuerier: v1.NewBlockQuerier(b, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize), }) } @@ -152,7 +153,7 @@ func TestSimpleBloomGenerator(t *testing.T) { expectedRefs := v1.PointerSlice(data) outputRefs := make([]*v1.SeriesWithBlooms, 0, len(data)) for _, block := range outputBlocks { - bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize).Iter() + bq := v1.NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize).Iter() for bq.Next() { outputRefs = append(outputRefs, bq.At()) } diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index 5747f6e7993e..603d41c2c437 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -50,7 +50,7 @@ type Gateway struct { queue *queue.RequestQueue activeUsers *util.ActiveUsersCleanupService - bloomStore bloomshipper.StoreWithMetrics + bloomStore bloomshipper.Store pendingTasks *atomic.Int64 @@ -72,7 +72,7 @@ func (l *fixedQueueLimits) MaxConsumers(_ string, _ int) int { } // New returns a new instance of the Bloom Gateway. -func New(cfg Config, store bloomshipper.StoreWithMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) { +func New(cfg Config, store bloomshipper.Store, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) { utillog.WarnExperimentalUse("Bloom Gateway", logger) g := &Gateway{ cfg: cfg, diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index 9250ec91ff86..67bb59e460ad 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -27,6 +27,7 @@ import ( "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config" "github.com/grafana/loki/v3/pkg/storage/types" + "github.com/grafana/loki/v3/pkg/util/mempool" "github.com/grafana/loki/v3/pkg/validation" ) @@ -92,7 +93,7 @@ func setupBloomStore(t *testing.T) *bloomshipper.BloomStore { reg := prometheus.NewRegistry() blocksCache := bloomshipper.NewFsBlocksCache(storageCfg.BloomShipperConfig.BlocksCache, nil, logger) - store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, cm, nil, blocksCache, reg, logger) + store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, cm, nil, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger) require.NoError(t, err) t.Cleanup(store.Stop) diff --git a/pkg/bloomgateway/processor.go b/pkg/bloomgateway/processor.go index 6973ad1f565b..e95953c94bf4 100644 --- a/pkg/bloomgateway/processor.go +++ b/pkg/bloomgateway/processor.go @@ -88,7 +88,7 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config. // after iteration for performance (alloc reduction). 
// This is safe to do here because we do not capture // the underlying bloom []byte outside of iteration - bloomshipper.WithPool(true), + bloomshipper.WithPool(p.store.Allocator()), ) duration = time.Since(startBlocks) level.Debug(p.logger).Log("msg", "fetched blocks", "count", len(refs), "duration", duration, "err", err) diff --git a/pkg/bloomgateway/processor_test.go b/pkg/bloomgateway/processor_test.go index 9d2d6c6d0642..0a2fd804ead7 100644 --- a/pkg/bloomgateway/processor_test.go +++ b/pkg/bloomgateway/processor_test.go @@ -20,16 +20,21 @@ import ( "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/v3/pkg/util/constants" + "github.com/grafana/loki/v3/pkg/util/mempool" ) -var _ bloomshipper.Store = &dummyStore{} +var _ bloomshipper.StoreBase = &dummyStore{} // refs and blocks must be in 1-1 correspondence. func newMockBloomStore(refs []bloomshipper.BlockRef, blocks []*v1.Block, metas []bloomshipper.Meta) *dummyStore { + allocator := mempool.New("bloompages", mempool.Buckets{ + {Size: 32, Capacity: 512 << 10}, + }, nil) return &dummyStore{ - refs: refs, - blocks: blocks, - metas: metas, + refs: refs, + blocks: blocks, + metas: metas, + allocator: allocator, } } @@ -38,6 +43,8 @@ type dummyStore struct { refs []bloomshipper.BlockRef blocks []*v1.Block + allocator mempool.Allocator + // mock how long it takes to serve block queriers delay time.Duration // mock response error when serving block queriers in ForEach @@ -76,6 +83,10 @@ func (s *dummyStore) Client(_ model.Time) (bloomshipper.Client, error) { return nil, nil } +func (s *dummyStore) Allocator() mempool.Allocator { + return s.allocator +} + func (s *dummyStore) Stop() { } @@ -92,7 +103,7 @@ func (s *dummyStore) FetchBlocks(_ context.Context, refs []bloomshipper.BlockRef if ref.Bounds.Equal(s.refs[i].Bounds) { blockCopy := *block bq := &bloomshipper.CloseableBlockQuerier{ - BlockQuerier: v1.NewBlockQuerier(&blockCopy, false, v1.DefaultMaxPageSize), + BlockQuerier: v1.NewBlockQuerier(&blockCopy, s.Allocator(), v1.DefaultMaxPageSize), BlockRef: s.refs[i], } result = append(result, bq) diff --git a/pkg/bloomgateway/resolver.go b/pkg/bloomgateway/resolver.go index 62ec5836cc13..0f6fe2762695 100644 --- a/pkg/bloomgateway/resolver.go +++ b/pkg/bloomgateway/resolver.go @@ -24,7 +24,7 @@ type blockWithSeries struct { } type defaultBlockResolver struct { - store bloomshipper.Store + store bloomshipper.StoreBase logger log.Logger } @@ -123,7 +123,7 @@ func unassignedSeries(mapped []blockWithSeries, series []*logproto.GroupedChunkR return skipped } -func NewBlockResolver(store bloomshipper.Store, logger log.Logger) BlockResolver { +func NewBlockResolver(store bloomshipper.StoreBase, logger log.Logger) BlockResolver { return &defaultBlockResolver{ store: store, logger: logger, diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go index ed47d46456d9..f6ae68cf2aa2 100644 --- a/pkg/bloomgateway/util_test.go +++ b/pkg/bloomgateway/util_test.go @@ -432,6 +432,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, // t.Log(i, j, string(keys[i][j])) // } // } + blocks = append(blocks, block) metas = append(metas, meta) blockRefs = append(blockRefs, blockRef) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index ce826f0752d4..68b210de4a77 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -337,7 +337,7 @@ type Loki struct { querierAPI *querier.QuerierAPI ingesterQuerier *querier.IngesterQuerier Store 
storage.Store
-	BloomStore              bloomshipper.StoreWithMetrics
+	BloomStore              bloomshipper.Store
 	tableManager            *index.TableManager
 	frontend                Frontend
 	ruler                   *base_ruler.Ruler
diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index 39c5df98b84c..e405f5d762f0 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -79,6 +79,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/util/httpreq"
 	"github.com/grafana/loki/v3/pkg/util/limiter"
 	util_log "github.com/grafana/loki/v3/pkg/util/log"
+	"github.com/grafana/loki/v3/pkg/util/mempool"
 	"github.com/grafana/loki/v3/pkg/util/querylimits"
 	lokiring "github.com/grafana/loki/v3/pkg/util/ring"
 	serverutil "github.com/grafana/loki/v3/pkg/util/server"
@@ -754,7 +755,24 @@ func (t *Loki) initBloomStore() (services.Service, error) {
 		level.Warn(logger).Log("msg", "failed to preload blocks cache", "err", err)
 	}
 
-	t.BloomStore, err = bloomshipper.NewBloomStore(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics, metasCache, blocksCache, reg, logger)
+	var pageAllocator mempool.Allocator
+
+	// Select the bloom page allocator implementation based on the configured type.
+	switch bsCfg.MemoryManagement.BloomPageAllocationType {
+	case "simple":
+		pageAllocator = &mempool.SimpleHeapAllocator{}
+	case "dynamic":
+		// sync buffer pool for bloom pages
+		// 128KB 256KB 512KB 1MB 2MB 4MB 8MB 16MB 32MB 64MB 128MB
+		pageAllocator = mempool.NewBytePoolAllocator(128<<10, 128<<20, 2)
+	case "fixed":
+		pageAllocator = mempool.New("bloom-page-pool", bsCfg.MemoryManagement.BloomPageMemPoolBuckets, reg)
+	default:
+		// should not happen as the type is validated upfront
+		return nil, fmt.Errorf("failed to create bloom store: invalid allocator type")
+	}
+
+	t.BloomStore, err = bloomshipper.NewBloomStore(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics, metasCache, blocksCache, pageAllocator, reg, logger)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create bloom store: %w", err)
 	}
diff --git a/pkg/storage/bloom/v1/block.go b/pkg/storage/bloom/v1/block.go
index b0b4e5ad9647..8aaf21d5e751 100644
--- a/pkg/storage/bloom/v1/block.go
+++ b/pkg/storage/bloom/v1/block.go
@@ -4,6 +4,8 @@ import (
 	"fmt"
 
 	"github.com/pkg/errors"
+
+	"github.com/grafana/loki/v3/pkg/util/mempool"
 )
 
 type BlockMetadata struct {
@@ -110,17 +112,18 @@ type BlockQuerier struct {
 }
 
 // NewBlockQuerier returns a new BlockQuerier for the given block.
-// WARNING: If noCapture is true, the underlying byte slice of the bloom page
-// will be returned to the pool for efficiency. This can only safely be used
-// when the underlying bloom bytes don't escape the decoder, i.e.
-// when loading blooms for querying (bloom-gw) but not for writing (bloom-compactor).
-// When usePool is true, the bloom MUST NOT be captured by the caller. Rather,
-// it should be discarded before another call to Next().
-func NewBlockQuerier(b *Block, usePool bool, maxPageSize int) *BlockQuerier {
+// WARNING: The Allocator implementation passed to this function determines
+// whether the underlying byte slice of a bloom page is returned to a pool.
+// Returning pages to a pool is only safe when the underlying bloom bytes do
+// not escape the decoder, i.e. when loading blooms for querying
+// (bloom-gateway), but not for writing (bloom-compactor). Therefore, on the
+// write path, always pass the SimpleHeapAllocator implementation of the
+// Allocator interface.
+func NewBlockQuerier(b *Block, alloc mempool.Allocator, maxPageSize int) *BlockQuerier { return &BlockQuerier{ block: b, LazySeriesIter: NewLazySeriesIter(b), - blooms: NewLazyBloomIter(b, usePool, maxPageSize), + blooms: NewLazyBloomIter(b, alloc, maxPageSize), } } @@ -144,6 +147,10 @@ func (bq *BlockQuerier) Err() error { return bq.blooms.Err() } +func (bq *BlockQuerier) Close() { + bq.blooms.Close() +} + type BlockQuerierIter struct { *BlockQuerier } diff --git a/pkg/storage/bloom/v1/bloom.go b/pkg/storage/bloom/v1/bloom.go index b9f4b0cdc6a9..dfd8b758c338 100644 --- a/pkg/storage/bloom/v1/bloom.go +++ b/pkg/storage/bloom/v1/bloom.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/loki/v3/pkg/chunkenc" "github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter" "github.com/grafana/loki/v3/pkg/util/encoding" + "github.com/grafana/loki/v3/pkg/util/mempool" ) // NB(chaudum): Some block pages are way bigger than others (400MiB and @@ -24,7 +25,7 @@ type Bloom struct { func (b *Bloom) Encode(enc *encoding.Encbuf) error { // divide by 8 b/c bloom capacity is measured in bits, but we want bytes - buf := bytes.NewBuffer(BloomPagePool.Get(int(b.Capacity() / 8))) + buf := bytes.NewBuffer(make([]byte, 0, int(b.Capacity()/8))) // TODO(owen-d): have encoder implement writer directly so we don't need // to indirect via a buffer @@ -36,7 +37,6 @@ func (b *Bloom) Encode(enc *encoding.Encbuf) error { data := buf.Bytes() enc.PutUvarint(len(data)) // length of bloom filter enc.PutBytes(data) - BloomPagePool.Put(data[:0]) // release to pool return nil } @@ -64,11 +64,14 @@ func (b *Bloom) Decode(dec *encoding.Decbuf) error { return nil } -func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) { - data := BloomPagePool.Get(page.Len)[:page.Len] - defer BloomPagePool.Put(data) +func LazyDecodeBloomPage(r io.Reader, alloc mempool.Allocator, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) { + data, err := alloc.Get(page.Len) + if err != nil { + return nil, errors.Wrap(err, "allocating buffer") + } + defer alloc.Put(data) - _, err := io.ReadFull(r, data) + _, err = io.ReadFull(r, data) if err != nil { return nil, errors.Wrap(err, "reading bloom page") } @@ -84,7 +87,10 @@ func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHe } defer pool.PutReader(decompressor) - b := BloomPagePool.Get(page.DecompressedLen)[:page.DecompressedLen] + b, err := alloc.Get(page.DecompressedLen) + if err != nil { + return nil, errors.Wrap(err, "allocating buffer") + } if _, err = io.ReadFull(decompressor, b); err != nil { return nil, errors.Wrap(err, "decompressing bloom page") @@ -96,14 +102,18 @@ func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHe } // shortcut to skip allocations when we know the page is not compressed -func LazyDecodeBloomPageNoCompression(r io.Reader, page BloomPageHeader) (*BloomPageDecoder, error) { +func LazyDecodeBloomPageNoCompression(r io.Reader, alloc mempool.Allocator, page BloomPageHeader) (*BloomPageDecoder, error) { // data + checksum if page.Len != page.DecompressedLen+4 { return nil, errors.New("the Len and DecompressedLen of the page do not match") } - data := BloomPagePool.Get(page.Len)[:page.Len] - _, err := io.ReadFull(r, data) + data, err := alloc.Get(page.Len) + if err != nil { + return nil, errors.Wrap(err, "allocating buffer") + } + + _, err = io.ReadFull(r, data) if err != nil { return nil, errors.Wrap(err, "reading bloom page") } @@ -158,12 +168,16 @@ 
type BloomPageDecoder struct { // This can only safely be used when the underlying bloom // bytes don't escape the decoder: // on reads in the bloom-gw but not in the bloom-compactor -func (d *BloomPageDecoder) Relinquish() { +func (d *BloomPageDecoder) Relinquish(alloc mempool.Allocator) { + if d == nil { + return + } + data := d.data d.data = nil if cap(data) > 0 { - BloomPagePool.Put(data) + _ = alloc.Put(data) } } @@ -271,7 +285,7 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) { // BloomPageDecoder returns a decoder for the given page index. // It may skip the page if it's too large. // NB(owen-d): if `skip` is true, err _must_ be nil. -func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) { +func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, alloc mempool.Allocator, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) { if pageIdx < 0 || pageIdx >= len(b.pageHeaders) { metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Inc() metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Add(float64(b.pageHeaders[pageIdx].DecompressedLen)) @@ -294,9 +308,9 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize } if b.schema.encoding == chunkenc.EncNone { - res, err = LazyDecodeBloomPageNoCompression(r, page) + res, err = LazyDecodeBloomPageNoCompression(r, alloc, page) } else { - res, err = LazyDecodeBloomPage(r, b.schema.DecompressorPool(), page) + res, err = LazyDecodeBloomPage(r, alloc, b.schema.DecompressorPool(), page) } if err != nil { diff --git a/pkg/storage/bloom/v1/bloom_querier.go b/pkg/storage/bloom/v1/bloom_querier.go index 8de9a33e713f..ab30b74f8a9e 100644 --- a/pkg/storage/bloom/v1/bloom_querier.go +++ b/pkg/storage/bloom/v1/bloom_querier.go @@ -1,17 +1,21 @@ package v1 -import "github.com/pkg/errors" +import ( + "github.com/pkg/errors" + + "github.com/grafana/loki/v3/pkg/util/mempool" +) type BloomQuerier interface { Seek(BloomOffset) (*Bloom, error) } type LazyBloomIter struct { - usePool bool - b *Block m int // max page size in bytes + alloc mempool.Allocator + // state initialized bool err error @@ -24,11 +28,11 @@ type LazyBloomIter struct { // will be returned to the pool for efficiency. // This can only safely be used when the underlying bloom // bytes don't escape the decoder. 
-func NewLazyBloomIter(b *Block, pool bool, maxSize int) *LazyBloomIter { +func NewLazyBloomIter(b *Block, alloc mempool.Allocator, maxSize int) *LazyBloomIter { return &LazyBloomIter{ - usePool: pool, - b: b, - m: maxSize, + b: b, + m: maxSize, + alloc: alloc, } } @@ -53,16 +57,14 @@ func (it *LazyBloomIter) LoadOffset(offset BloomOffset) (skip bool) { // drop the current page if it exists and // we're using the pool - if it.curPage != nil && it.usePool { - it.curPage.Relinquish() - } + it.curPage.Relinquish(it.alloc) r, err := it.b.reader.Blooms() if err != nil { it.err = errors.Wrap(err, "getting blooms reader") return false } - decoder, skip, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics) + decoder, skip, err := it.b.blooms.BloomPageDecoder(r, it.alloc, offset.Page, it.m, it.b.metrics) if err != nil { it.err = errors.Wrap(err, "loading bloom page") return false @@ -106,6 +108,7 @@ func (it *LazyBloomIter) next() bool { var skip bool it.curPage, skip, err = it.b.blooms.BloomPageDecoder( r, + it.alloc, it.curPageIndex, it.m, it.b.metrics, @@ -130,11 +133,8 @@ func (it *LazyBloomIter) next() bool { // we've exhausted the current page, progress to next it.curPageIndex++ - // drop the current page if it exists and - // we're using the pool - if it.usePool { - it.curPage.Relinquish() - } + // drop the current page if it exists + it.curPage.Relinquish(it.alloc) it.curPage = nil continue } @@ -161,3 +161,7 @@ func (it *LazyBloomIter) Err() error { return nil } } + +func (it *LazyBloomIter) Close() { + it.curPage.Relinquish(it.alloc) +} diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index ae1b440af09b..15f0de0842a9 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/loki/v3/pkg/chunkenc" "github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter" "github.com/grafana/loki/v3/pkg/util/encoding" + "github.com/grafana/loki/v3/pkg/util/mempool" ) var blockEncodings = []chunkenc.Encoding{ @@ -121,7 +122,7 @@ func TestBlockBuilder_RoundTrip(t *testing.T) { } block := NewBlock(tc.reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, false, DefaultMaxPageSize).Iter() + querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize).Iter() err = block.LoadHeaders() require.Nil(t, err) @@ -239,7 +240,7 @@ func TestMergeBuilder(t *testing.T) { itr := NewSliceIter[SeriesWithBlooms](data[min:max]) _, err = builder.BuildFrom(itr) require.Nil(t, err) - blocks = append(blocks, NewPeekingIter[*SeriesWithBlooms](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false, DefaultMaxPageSize).Iter())) + blocks = append(blocks, NewPeekingIter[*SeriesWithBlooms](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize).Iter())) } // We're not testing the ability to extend a bloom in this test @@ -280,7 +281,7 @@ func TestMergeBuilder(t *testing.T) { require.Nil(t, err) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, false, DefaultMaxPageSize) + querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize) EqualIterators[*SeriesWithBlooms]( t, @@ -372,7 +373,7 @@ func TestMergeBuilderFingerprintCollision(t *testing.T) { require.Nil(t, err) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, false, DefaultMaxPageSize) + querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize) 
require.True(t, querier.Next()) require.Equal(t, @@ -417,7 +418,7 @@ func TestBlockReset(t *testing.T) { _, err = builder.BuildFrom(itr) require.Nil(t, err) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, false, DefaultMaxPageSize) + querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize) rounds := make([][]model.Fingerprint, 2) @@ -482,7 +483,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { _, err = builder.BuildFrom(itr) require.Nil(t, err) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, false, DefaultMaxPageSize).Iter() + querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize).Iter() // rather than use the block querier directly, collect it's data // so we can use it in a few places later @@ -552,7 +553,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { // ensure the new block contains one copy of all the data // by comparing it against an iterator over the source data - mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false, DefaultMaxPageSize) + mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize) sourceItr := NewSliceIter[*SeriesWithBlooms](PointerSlice[SeriesWithBlooms](xs)) EqualIterators[*SeriesWithBlooms]( diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index 7f11eece4c23..745981965893 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -14,8 +14,15 @@ import ( "github.com/grafana/loki/v3/pkg/chunkenc" "github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter" + "github.com/grafana/loki/v3/pkg/util/mempool" ) +var BloomPagePool = mempool.New("test", []mempool.Bucket{ + {Size: 16, Capacity: 128 << 10}, + {Size: 16, Capacity: 256 << 10}, + {Size: 16, Capacity: 512 << 10}, +}, nil) + // TODO(owen-d): this is unhinged from the data it represents. I'm leaving this solely so I don't // have to refactor tests here in order to fix this elsewhere, but it can/should be fixed -- // the skip & n len are hardcoded based on data that's passed to it elsewhere. 
@@ -64,7 +71,7 @@ func TestFusedQuerier(t *testing.T) {
 	require.NoError(t, err)
 	require.False(t, itr.Next())
 	block := NewBlock(reader, NewMetrics(nil))
-	querier := NewBlockQuerier(block, true, DefaultMaxPageSize)
+	querier := NewBlockQuerier(block, BloomPagePool, DefaultMaxPageSize)
 
 	n := 500 // series per request
 	nReqs := numSeries / n
@@ -194,7 +201,7 @@ func TestFuseMultiPage(t *testing.T) {
 
 	block := NewBlock(reader, NewMetrics(nil))
 
-	querier := NewBlockQuerier(block, true, 100<<20) // 100MB too large to interfere
+	querier := NewBlockQuerier(block, BloomPagePool, 100<<20) // 100MB too large to interfere
 
 	keys := [][]byte{
 		key1, // found in the first bloom
@@ -315,8 +322,7 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
 	require.False(t, itr.Next())
 	block := NewBlock(reader, NewMetrics(nil))
 
-	smallMaxPageSize := 1000 // deliberately trigger page skipping for tests
-	querier := NewBlockQuerier(block, true, smallMaxPageSize)
+	querier := NewBlockQuerier(block, BloomPagePool, 1000) // small max page size to deliberately trigger page skipping
 
 	for fp := model.Fingerprint(0); fp < model.Fingerprint(numSeries); fp++ {
 		err := querier.Seek(fp)
@@ -373,7 +379,7 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Ou
 	_, err = builder.BuildFrom(itr)
 	require.Nil(b, err)
 	block := NewBlock(reader, NewMetrics(nil))
-	querier := NewBlockQuerier(block, true, DefaultMaxPageSize)
+	querier := NewBlockQuerier(block, BloomPagePool, DefaultMaxPageSize)
 
 	numRequestChains := 100
 	seriesPerRequest := 100
diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go
index b11927ed303b..8cae1d8a87f1 100644
--- a/pkg/storage/bloom/v1/index.go
+++ b/pkg/storage/bloom/v1/index.go
@@ -166,7 +166,7 @@ func (b *BlockIndex) NewSeriesPageDecoder(r io.ReadSeeker, header SeriesPageHead
 		return nil, errors.Wrap(err, "seeking to series page")
 	}
 
-	data := SeriesPagePool.Get(header.Len)[:header.Len]
+	data, _ := SeriesPagePool.Get(header.Len) // BytePool.Get never returns an error
 	defer SeriesPagePool.Put(data)
 	_, err = io.ReadFull(r, data)
 	if err != nil {
diff --git a/pkg/storage/bloom/v1/util.go b/pkg/storage/bloom/v1/util.go
index 85aa7baa7b81..ae0a70453098 100644
--- a/pkg/storage/bloom/v1/util.go
+++ b/pkg/storage/bloom/v1/util.go
@@ -8,7 +8,7 @@ import (
 	"io"
 	"sync"
 
-	"github.com/prometheus/prometheus/util/pool"
+	"github.com/grafana/loki/v3/pkg/util/mempool"
 )
 
 type Version byte
@@ -44,36 +44,9 @@ var (
 
 	// buffer pool for series pages
 	// 1KB 2KB 4KB 8KB 16KB 32KB 64KB 128KB
-	SeriesPagePool = BytePool{
-		pool: pool.New(
-			1<<10, 128<<10, 2,
-			func(size int) interface{} {
-				return make([]byte, size)
-			}),
-	}
-
-	// buffer pool for bloom pages
-	// 128KB 256KB 512KB 1MB 2MB 4MB 8MB 16MB 32MB 64MB 128MB
-	BloomPagePool = BytePool{
-		pool: pool.New(
-			128<<10, 128<<20, 2,
-			func(size int) interface{} {
-				return make([]byte, size)
-			}),
-	}
+	SeriesPagePool = mempool.NewBytePoolAllocator(1<<10, 128<<10, 2)
 )
 
-type BytePool struct {
-	pool *pool.Pool
-}
-
-func (p *BytePool) Get(size int) []byte {
-	return p.pool.Get(size).([]byte)[:0]
-}
-
-func (p *BytePool) Put(b []byte) {
-	p.pool.Put(b)
-}
-
 func newCRC32() hash.Hash32 {
 	return crc32.New(castagnoliTable)
 }
diff --git a/pkg/storage/bloom/v1/versioned_builder_test.go b/pkg/storage/bloom/v1/versioned_builder_test.go
index a88ed9396982..eca86ef7aaa1 100644
--- a/pkg/storage/bloom/v1/versioned_builder_test.go
+++ b/pkg/storage/bloom/v1/versioned_builder_test.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/grafana/loki/v3/pkg/chunkenc"
 	"github.com/grafana/loki/v3/pkg/util/encoding"
+	"github.com/grafana/loki/v3/pkg/util/mempool"
 )
 
 // 
smallBlockOpts returns a set of block options that are suitable for testing @@ -61,7 +62,7 @@ func TestV1RoundTrip(t *testing.T) { // Ensure Equality block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, false, DefaultMaxPageSize).Iter() + querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize).Iter() CompareIterators[SeriesWithLiteralBlooms, *SeriesWithBlooms]( t, @@ -118,7 +119,7 @@ func TestV2Roundtrip(t *testing.T) { // Ensure Equality block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, false, DefaultMaxPageSize).Iter() + querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize).Iter() CompareIterators[SeriesWithLiteralBlooms, *SeriesWithBlooms]( t, diff --git a/pkg/storage/stores/shipper/bloomshipper/cache.go b/pkg/storage/stores/shipper/bloomshipper/cache.go index 3c324b7b8b0e..8b7ba7d253a9 100644 --- a/pkg/storage/stores/shipper/bloomshipper/cache.go +++ b/pkg/storage/stores/shipper/bloomshipper/cache.go @@ -13,6 +13,7 @@ import ( v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" "github.com/grafana/loki/v3/pkg/util" + "github.com/grafana/loki/v3/pkg/util/mempool" ) type CloseableBlockQuerier struct { @@ -22,6 +23,7 @@ type CloseableBlockQuerier struct { } func (c *CloseableBlockQuerier) Close() error { + c.BlockQuerier.Close() if c.close != nil { return c.close() } @@ -157,15 +159,14 @@ func (b *BlockDirectory) resolveSize() error { // BlockQuerier returns a new block querier from the directory. // The passed function `close` is called when the the returned querier is closed. - func (b BlockDirectory) BlockQuerier( - usePool bool, + alloc mempool.Allocator, close func() error, maxPageSize int, metrics *v1.Metrics, ) *CloseableBlockQuerier { return &CloseableBlockQuerier{ - BlockQuerier: v1.NewBlockQuerier(b.Block(metrics), usePool, maxPageSize), + BlockQuerier: v1.NewBlockQuerier(b.Block(metrics), alloc, maxPageSize), BlockRef: b.BlockRef, close: close, } diff --git a/pkg/storage/stores/shipper/bloomshipper/config/config.go b/pkg/storage/stores/shipper/bloomshipper/config/config.go index 72d8f8557b09..6de144a3f84b 100644 --- a/pkg/storage/stores/shipper/bloomshipper/config/config.go +++ b/pkg/storage/stores/shipper/bloomshipper/config/config.go @@ -4,11 +4,16 @@ package config import ( "errors" "flag" + "fmt" + "slices" + "strings" "time" "github.com/grafana/dskit/flagext" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" + lokiflagext "github.com/grafana/loki/v3/pkg/util/flagext" + "github.com/grafana/loki/v3/pkg/util/mempool" ) type Config struct { @@ -18,6 +23,7 @@ type Config struct { BlocksCache BlocksCacheConfig `yaml:"blocks_cache"` MetasCache cache.Config `yaml:"metas_cache"` MetasLRUCache cache.EmbeddedCacheConfig `yaml:"metas_lru_cache"` + MemoryManagement MemoryManagementConfig `yaml:"memory_management" doc:"hidden"` // This will always be set to true when flags are registered. // In tests, where config is created as literal, it can be set manually. @@ -34,6 +40,7 @@ func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { c.BlocksCache.RegisterFlagsWithPrefixAndDefaults(prefix+"blocks-cache.", "Cache for bloom blocks. ", f, 24*time.Hour) c.MetasCache.RegisterFlagsWithPrefix(prefix+"metas-cache.", "Cache for bloom metas. ", f) c.MetasLRUCache.RegisterFlagsWithPrefix(prefix+"metas-lru-cache.", "In-memory LRU cache for bloom metas. 
", f) + c.MemoryManagement.RegisterFlagsWithPrefix(prefix+"memory-management.", f) // always cache LIST operations c.CacheListOps = true @@ -43,6 +50,9 @@ func (c *Config) Validate() error { if len(c.WorkingDirectory) == 0 { return errors.New("at least one working directory must be specified") } + if err := c.MemoryManagement.Validate(); err != nil { + return err + } return nil } @@ -81,3 +91,60 @@ func (cfg *BlocksCacheConfig) Validate() error { } return nil } + +var ( + // the default that describes a 4GiB memory pool + defaultMemPoolBuckets = mempool.Buckets{ + {Size: 128, Capacity: 64 << 10}, // 8MiB -- for tests + {Size: 512, Capacity: 2 << 20}, // 1024MiB + {Size: 128, Capacity: 8 << 20}, // 1024MiB + {Size: 32, Capacity: 32 << 20}, // 1024MiB + {Size: 8, Capacity: 128 << 20}, // 1024MiB + } + types = supportedAllocationTypes{ + "simple", "simple heap allocations using Go's make([]byte, n) and no re-cycling of buffers", + "dynamic", "a buffer pool with variable sized buckets and best effort re-cycling of buffers using Go's sync.Pool", + "fixed", "a fixed size memory pool with configurable slab sizes, see mem-pool-buckets", + } +) + +type MemoryManagementConfig struct { + BloomPageAllocationType string `yaml:"bloom_page_alloc_type"` + BloomPageMemPoolBuckets lokiflagext.CSV[mempool.Bucket] `yaml:"bloom_page_mem_pool_buckets"` +} + +func (cfg *MemoryManagementConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.StringVar(&cfg.BloomPageAllocationType, prefix+"alloc-type", "dynamic", fmt.Sprintf("One of: %s", strings.Join(types.descriptions(), ", "))) + + _ = cfg.BloomPageMemPoolBuckets.Set(defaultMemPoolBuckets.String()) + f.Var(&cfg.BloomPageMemPoolBuckets, prefix+"mem-pool-buckets", "Comma separated list of buckets in the format {size}x{bytes}") +} + +func (cfg *MemoryManagementConfig) Validate() error { + if !slices.Contains(types.names(), cfg.BloomPageAllocationType) { + msg := fmt.Sprintf("bloom_page_alloc_type must be one of: %s", strings.Join(types.descriptions(), ", ")) + return errors.New(msg) + } + if cfg.BloomPageAllocationType == "fixed" && len(cfg.BloomPageMemPoolBuckets) == 0 { + return errors.New("fixed memory pool requires at least one bucket") + } + return nil +} + +type supportedAllocationTypes []string + +func (t supportedAllocationTypes) names() []string { + names := make([]string, 0, len(t)/2) + for i := 0; i < len(t); i += 2 { + names = append(names, t[i]) + } + return names +} + +func (t supportedAllocationTypes) descriptions() []string { + names := make([]string, 0, len(t)/2) + for i := 0; i < len(t); i += 2 { + names = append(names, fmt.Sprintf("%s (%s)", t[i], t[i+1])) + } + return names +} diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher.go b/pkg/storage/stores/shipper/bloomshipper/fetcher.go index c2a2939a805b..42d8d116b64a 100644 --- a/pkg/storage/stores/shipper/bloomshipper/fetcher.go +++ b/pkg/storage/stores/shipper/bloomshipper/fetcher.go @@ -19,6 +19,7 @@ import ( v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" "github.com/grafana/loki/v3/pkg/util/constants" + "github.com/grafana/loki/v3/pkg/util/mempool" "github.com/grafana/loki/v3/pkg/util/spanlogger" ) @@ -30,7 +31,7 @@ type options struct { // return bloom blocks to pool after iteration; default=false // NB(owen-d): this can only be safely used when blooms are not captured outside // of iteration or it can introduce use-after-free bugs - usePool bool + usePool mempool.Allocator } func (o *options) apply(opts 
...FetchOption) { @@ -53,7 +54,7 @@ func WithFetchAsync(v bool) FetchOption { } } -func WithPool(v bool) FetchOption { +func WithPool(v mempool.Allocator) FetchOption { return func(opts *options) { opts.usePool = v } @@ -222,7 +223,7 @@ func (f *Fetcher) writeBackMetas(ctx context.Context, metas []Meta) error { // FetchBlocks implements fetcher func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef, opts ...FetchOption) ([]*CloseableBlockQuerier, error) { // apply fetch options - cfg := &options{ignoreNotFound: true, fetchAsync: false, usePool: false} + cfg := &options{ignoreNotFound: true, fetchAsync: false, usePool: &mempool.SimpleHeapAllocator{}} cfg.apply(opts...) // first, resolve blocks from cache and enqueue missing blocks to download queue diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper.go b/pkg/storage/stores/shipper/bloomshipper/shipper.go index edaa15596fff..8e58e1231d25 100644 --- a/pkg/storage/stores/shipper/bloomshipper/shipper.go +++ b/pkg/storage/stores/shipper/bloomshipper/shipper.go @@ -16,10 +16,10 @@ type Interface interface { } type Shipper struct { - store Store + store StoreBase } -func NewShipper(client Store) *Shipper { +func NewShipper(client StoreBase) *Shipper { return &Shipper{store: client} } diff --git a/pkg/storage/stores/shipper/bloomshipper/store.go b/pkg/storage/stores/shipper/bloomshipper/store.go index f2c77d7ac74e..363fb7806ece 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store.go +++ b/pkg/storage/stores/shipper/bloomshipper/store.go @@ -21,6 +21,7 @@ import ( "github.com/grafana/loki/v3/pkg/storage/chunk/client/util" "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/util/constants" + "github.com/grafana/loki/v3/pkg/util/mempool" "github.com/grafana/loki/v3/pkg/util/spanlogger" ) @@ -28,7 +29,7 @@ var ( errNoStore = errors.New("no store found for time") ) -type Store interface { +type StoreBase interface { ResolveMetas(ctx context.Context, params MetaSearchParams) ([][]MetaRef, []*Fetcher, error) FetchMetas(ctx context.Context, params MetaSearchParams) ([]Meta, error) FetchBlocks(ctx context.Context, refs []BlockRef, opts ...FetchOption) ([]*CloseableBlockQuerier, error) @@ -41,9 +42,10 @@ type Store interface { Stop() } -type StoreWithMetrics interface { - Store +type Store interface { + StoreBase BloomMetrics() *v1.Metrics + Allocator() mempool.Allocator } type bloomStoreConfig struct { @@ -53,7 +55,7 @@ type bloomStoreConfig struct { } // Compiler check to ensure bloomStoreEntry implements the Store interface -var _ Store = &bloomStoreEntry{} +var _ StoreBase = &bloomStoreEntry{} type bloomStoreEntry struct { start model.Time @@ -272,15 +274,15 @@ func (b bloomStoreEntry) Stop() { } // Compiler check to ensure BloomStore implements the Store interface -var _ StoreWithMetrics = &BloomStore{} +var _ Store = &BloomStore{} type BloomStore struct { - stores []*bloomStoreEntry - storageConfig storage.Config - metrics *storeMetrics - bloomMetrics *v1.Metrics - + stores []*bloomStoreEntry + storageConfig storage.Config + metrics *storeMetrics + bloomMetrics *v1.Metrics logger log.Logger + allocator mempool.Allocator defaultKeyResolver // TODO(owen-d): impl schema aware resolvers } @@ -290,6 +292,7 @@ func NewBloomStore( clientMetrics storage.ClientMetrics, metasCache cache.Cache, blocksCache Cache, + allocator mempool.Allocator, reg prometheus.Registerer, logger log.Logger, ) (*BloomStore, error) { @@ -297,6 +300,7 @@ func NewBloomStore( storageConfig: storageConfig, metrics: 
newStoreMetrics(reg, constants.Loki, "bloom_store"),
 		bloomMetrics:  v1.NewMetrics(reg),
+		allocator:     allocator,
 		logger:        logger,
 	}
 
@@ -404,7 +408,7 @@ func (b *BloomStore) TenantFilesForInterval(
 ) (map[string][]client.StorageObject, error) {
 	var allTenants map[string][]client.StorageObject
 
-	err := b.forStores(ctx, interval, func(innerCtx context.Context, interval Interval, store Store) error {
+	err := b.forStores(ctx, interval, func(innerCtx context.Context, interval Interval, store StoreBase) error {
 		tenants, err := store.TenantFilesForInterval(innerCtx, interval, filter)
 		if err != nil {
 			return err
@@ -441,12 +445,17 @@ func (b *BloomStore) Client(ts model.Time) (Client, error) {
 	return nil, errNoStore
 }
 
+// Allocator implements Store.
+func (b *BloomStore) Allocator() mempool.Allocator {
+	return b.allocator
+}
+
 // ResolveMetas implements Store.
 func (b *BloomStore) ResolveMetas(ctx context.Context, params MetaSearchParams) ([][]MetaRef, []*Fetcher, error) {
 	refs := make([][]MetaRef, 0, len(b.stores))
 	fetchers := make([]*Fetcher, 0, len(b.stores))
 
-	err := b.forStores(ctx, params.Interval, func(innerCtx context.Context, interval Interval, store Store) error {
+	err := b.forStores(ctx, params.Interval, func(innerCtx context.Context, interval Interval, store StoreBase) error {
 		newParams := params
 		newParams.Interval = interval
 		metas, fetcher, err := store.ResolveMetas(innerCtx, newParams)
@@ -580,7 +589,7 @@ func (b *BloomStore) storeDo(ts model.Time, f func(s *bloomStoreEntry) error) er
 	return fmt.Errorf("no store found for timestamp %s", ts.Time())
 }
 
-func (b *BloomStore) forStores(ctx context.Context, interval Interval, f func(innerCtx context.Context, interval Interval, store Store) error) error {
+func (b *BloomStore) forStores(ctx context.Context, interval Interval, f func(innerCtx context.Context, interval Interval, store StoreBase) error) error {
 	if len(b.stores) == 0 {
 		return nil
 	}
diff --git a/pkg/storage/stores/shipper/bloomshipper/store_test.go b/pkg/storage/stores/shipper/bloomshipper/store_test.go
index 093858444891..15568e8763bd 100644
--- a/pkg/storage/stores/shipper/bloomshipper/store_test.go
+++ b/pkg/storage/stores/shipper/bloomshipper/store_test.go
@@ -23,6 +23,7 @@ import (
 	storageconfig "github.com/grafana/loki/v3/pkg/storage/config"
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
 	"github.com/grafana/loki/v3/pkg/storage/types"
+	"github.com/grafana/loki/v3/pkg/util/mempool"
 )
 
 func newMockBloomStore(t *testing.T) (*BloomStore, string, error) {
@@ -77,7 +78,7 @@ func newMockBloomStoreWithWorkDir(t *testing.T, workDir, storeDir string) (*Bloo
 	metasCache := cache.NewMockCache()
 	blocksCache := NewFsBlocksCache(storageConfig.BloomShipperConfig.BlocksCache, prometheus.NewPedanticRegistry(), logger)
 
-	store, err := NewBloomStore(periodicConfigs, storageConfig, metrics, metasCache, blocksCache, reg, logger)
+	store, err := NewBloomStore(periodicConfigs, storageConfig, metrics, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
 	if err == nil {
 		t.Cleanup(store.Stop)
 	}
diff --git a/pkg/util/flagext/csv.go b/pkg/util/flagext/csv.go
new file mode 100644
index 000000000000..6ed5f9bad11a
--- /dev/null
+++ b/pkg/util/flagext/csv.go
@@ -0,0 +1,62 @@
+package flagext
+
+import (
+	"strings"
+)
+
+type ListValue interface {
+	String() string
+	Parse(s string) (any, error)
+}
+
+// CSV is a generic slice of ListValue elements that is parsed from a comma-separated string.
+// It implements flag.Value and the yaml Marshaler/Unmarshaler interfaces.
+type CSV[T ListValue] []T
+
+// String implements flag.Value
+func (v CSV[T]) String() string {
+	s := make([]string, 0, len(v))
+	for i := range v {
+		s = append(s, v[i].String())
+	}
+	return strings.Join(s, ",")
+}
+
+// Set implements flag.Value
+func (v *CSV[T]) Set(s string) error {
+	if len(s) == 0 {
+		*v = nil
+		return nil
+	}
+	var zero T
+	values := strings.Split(s, ",")
+	*v = make(CSV[T], 0, len(values))
+	for _, val := range values {
+		el, err := zero.Parse(val)
+		if err != nil {
+			return err
+		}
+		*v = append(*v, el.(T))
+	}
+	return nil
+}
+
+// Get implements flag.Getter
+func (v CSV[T]) Get() []T {
+	return v
+}
+
+// UnmarshalYAML implements yaml.Unmarshaler.
+func (v *CSV[T]) UnmarshalYAML(unmarshal func(interface{}) error) error {
+	var s string
+	if err := unmarshal(&s); err != nil {
+		return err
+	}
+
+	return v.Set(s)
+}
+
+// MarshalYAML implements yaml.Marshaler.
+func (v CSV[T]) MarshalYAML() (interface{}, error) {
+	return v.String(), nil
+}
diff --git a/pkg/util/flagext/csv_test.go b/pkg/util/flagext/csv_test.go
new file mode 100644
index 000000000000..aca4ea8a77ee
--- /dev/null
+++ b/pkg/util/flagext/csv_test.go
@@ -0,0 +1,79 @@
+package flagext
+
+import (
+	"strconv"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+type customType int
+
+// Parse implements ListValue.
+func (l customType) Parse(s string) (any, error) {
+	v, err := strconv.Atoi(s)
+	if err != nil {
+		return customType(0), err
+	}
+	return customType(v), nil
+}
+
+// String implements ListValue.
+func (l customType) String() string {
+	return strconv.Itoa(int(l))
+}
+
+var _ ListValue = customType(0)
+
+func Test_CSV(t *testing.T) {
+	for _, tc := range []struct {
+		in  string
+		err bool
+		out []customType
+	}{
+		{
+			in:  "",
+			err: false,
+			out: nil,
+		},
+		{
+			in:  ",",
+			err: true,
+			out: []customType{},
+		},
+		{
+			in:  "1",
+			err: false,
+			out: []customType{1},
+		},
+		{
+			in:  "1,2",
+			err: false,
+			out: []customType{1, 2},
+		},
+		{
+			in:  "1,",
+			err: true,
+			out: []customType{},
+		},
+		{
+			in:  ",1",
+			err: true,
+			out: []customType{},
+		},
+	} {
+		t.Run(tc.in, func(t *testing.T) {
+			var v CSV[customType]
+
+			err := v.Set(tc.in)
+			if tc.err {
+				require.NotNil(t, err)
+			} else {
+				require.Nil(t, err)
+				require.Equal(t, tc.out, v.Get())
+			}
+
+		})
+	}
+
+}
diff --git a/pkg/util/mempool/allocator.go b/pkg/util/mempool/allocator.go
new file mode 100644
index 000000000000..a27429b80692
--- /dev/null
+++ b/pkg/util/mempool/allocator.go
@@ -0,0 +1,49 @@
+package mempool
+
+import (
+	"github.com/prometheus/prometheus/util/pool"
+)
+
+// Allocator handles byte slices for bloom queriers.
+// It exists to reduce the cost of allocations and allows already allocated memory to be reused.
+type Allocator interface {
+	Get(size int) ([]byte, error)
+	Put([]byte) bool
+}
+
+// SimpleHeapAllocator allocates a new byte slice every time and does not recycle buffers.
+type SimpleHeapAllocator struct{}
+
+func (a *SimpleHeapAllocator) Get(size int) ([]byte, error) {
+	return make([]byte, size), nil
+}
+
+func (a *SimpleHeapAllocator) Put([]byte) bool {
+	return true
+}
+
+// BytePool uses a sync.Pool to recycle already allocated buffers.
+type BytePool struct {
+	pool *pool.Pool
+}
+
+func NewBytePoolAllocator(minSize, maxSize int, factor float64) *BytePool {
+	return &BytePool{
+		pool: pool.New(
+			minSize, maxSize, factor,
+			func(size int) interface{} {
+				return make([]byte, size)
+			}),
+	}
+}
+
+// Get implements Allocator
+func (p *BytePool) Get(size int) ([]byte, error) {
+	return p.pool.Get(size).([]byte)[:size], nil
+}
+
+// Put implements Allocator
+func (p *BytePool) Put(b []byte) bool {
+	p.pool.Put(b)
+	return true
+}
diff --git a/pkg/util/mempool/bucket.go b/pkg/util/mempool/bucket.go
new file mode 100644
index 000000000000..2a56608230d3
--- /dev/null
+++ b/pkg/util/mempool/bucket.go
@@ -0,0 +1,51 @@
+package mempool
+
+import (
+	"errors"
+	"fmt"
+	"strconv"
+	"strings"
+
+	"github.com/c2h5oh/datasize"
+)
+
+type Bucket struct {
+	Size     int    // Number of buffers
+	Capacity uint64 // Size of a buffer
+}
+
+func (b Bucket) Parse(s string) (any, error) {
+	parts := strings.Split(s, "x")
+	if len(parts) != 2 {
+		return nil, errors.New("bucket must be in format {count}x{bytes}")
+	}
+
+	size, err := strconv.Atoi(parts[0])
+	if err != nil {
+		return nil, err
+	}
+
+	capacity, err := datasize.ParseString(parts[1])
+	if err != nil {
+		return nil, err
+	}
+
+	return Bucket{
+		Size:     size,
+		Capacity: uint64(capacity),
+	}, nil
+}
+
+func (b Bucket) String() string {
+	return fmt.Sprintf("%dx%s", b.Size, datasize.ByteSize(b.Capacity).String())
+}
+
+type Buckets []Bucket
+
+func (b Buckets) String() string {
+	s := make([]string, 0, len(b))
+	for i := range b {
+		s = append(s, b[i].String())
+	}
+	return strings.Join(s, ",")
+}
diff --git a/pkg/util/mempool/metrics.go b/pkg/util/mempool/metrics.go
new file mode 100644
index 000000000000..f7d5a52eb0d9
--- /dev/null
+++ b/pkg/util/mempool/metrics.go
@@ -0,0 +1,32 @@
+package mempool
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+
+	"github.com/grafana/loki/v3/pkg/util/constants"
+)
+
+type metrics struct {
+	availableBuffersPerSlab *prometheus.CounterVec
+	errorsCounter           *prometheus.CounterVec
+}
+
+func newMetrics(r prometheus.Registerer, name string) *metrics {
+	return &metrics{
+		availableBuffersPerSlab: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+			Namespace:   constants.Loki,
+			Subsystem:   "mempool",
+			Name:        "available_buffers_per_slab",
+			Help:        "The amount of available buffers per slab.",
+			ConstLabels: prometheus.Labels{"pool": name},
+		}, []string{"slab"}),
+		errorsCounter: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+			Namespace:   constants.Loki,
+			Subsystem:   "mempool",
+			Name:        "errors_total",
+			Help:        "The total amount of errors returned from the pool.",
+			ConstLabels: prometheus.Labels{"pool": name},
+		}, []string{"slab", "reason"}),
+	}
+}
diff --git a/pkg/util/mempool/pool.go b/pkg/util/mempool/pool.go
new file mode 100644
index 000000000000..b42d8d923767
--- /dev/null
+++ b/pkg/util/mempool/pool.go
@@ -0,0 +1,135 @@
+package mempool
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+	"unsafe"
+
+	"github.com/dustin/go-humanize"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+var (
+	errSlabExhausted = errors.New("slab exhausted")
+
+	reasonSizeExceeded  = "size-exceeded"
+	reasonSlabExhausted = "slab-exhausted"
+)
+
+type slab struct {
+	buffer      chan unsafe.Pointer
+	size, count int
+	mtx         sync.Mutex
+	metrics     *metrics
+	name        string
+}
+
+func newSlab(bufferSize, bufferCount int, m *metrics) *slab {
+	name := humanize.Bytes(uint64(bufferSize))
+	m.availableBuffersPerSlab.WithLabelValues(name).Add(0) // initialize metric with value 0
+
+	return &slab{
+		size:    bufferSize,
+		count:   bufferCount,
+		metrics: m,
+		name:    name,
+	}
+}
+
+func (s *slab) init() {
+	s.buffer = make(chan unsafe.Pointer, s.count)
+	for i := 0; i < s.count; i++ {
+		buf := make([]byte, 0, s.size)
+		ptr := unsafe.Pointer(unsafe.SliceData(buf))
+		s.buffer <- ptr
+	}
+	s.metrics.availableBuffersPerSlab.WithLabelValues(s.name).Add(float64(s.count))
+}
+
+func (s *slab) get(size int) ([]byte, error) {
+	s.mtx.Lock()
+	if s.buffer == nil {
+		s.init()
+	}
+	defer s.mtx.Unlock()
+
+	// take an available buffer from the channel, or fail fast if the slab is exhausted
+	var buf []byte
+	select {
+	case ptr := <-s.buffer:
+		buf = unsafe.Slice((*byte)(ptr), s.size)
+	default:
+		s.metrics.errorsCounter.WithLabelValues(s.name, reasonSlabExhausted).Inc()
+		return nil, errSlabExhausted
+	}
+
+	// Taken from https://github.com/ortuman/nuke/blob/main/monotonic_arena.go#L37-L48
+	// This piece of code will be translated into a runtime.memclrNoHeapPointers
+	// invocation by the compiler, which is an assembler optimized implementation.
+	// Architecture specific code can be found at src/runtime/memclr_$GOARCH.s
+	// in Go source (since https://codereview.appspot.com/137880043).
+	for i := range buf {
+		buf[i] = 0
+	}
+
+	return buf[:size], nil
+}
+
+func (s *slab) put(buf []byte) {
+	if s.buffer == nil {
+		panic("slab is not initialized")
+	}
+
+	ptr := unsafe.Pointer(unsafe.SliceData(buf))
+	s.buffer <- ptr
+}
+
+// MemPool is an Allocator implementation that uses a fixed size memory pool
+// that is split into multiple slabs of different buffer sizes.
+// Buffers are recycled and must be returned to the pool, otherwise
+// the pool runs out of available buffers.
+type MemPool struct {
+	slabs   []*slab
+	metrics *metrics
+}
+
+func New(name string, buckets []Bucket, r prometheus.Registerer) *MemPool {
+	a := &MemPool{
+		slabs:   make([]*slab, 0, len(buckets)),
+		metrics: newMetrics(r, name),
+	}
+	for _, b := range buckets {
+		a.slabs = append(a.slabs, newSlab(int(b.Capacity), b.Size, a.metrics))
+	}
+	return a
+}
+
+// Get satisfies the Allocator interface.
+// Allocating a buffer from an exhausted slab, or requesting a buffer that
+// exceeds the largest slab size, returns an error.
+func (a *MemPool) Get(size int) ([]byte, error) {
+	for i := 0; i < len(a.slabs); i++ {
+		if a.slabs[i].size < size {
+			continue
+		}
+		return a.slabs[i].get(size)
+	}
+	a.metrics.errorsCounter.WithLabelValues("pool", reasonSizeExceeded).Inc()
+	return nil, fmt.Errorf("no slab found for size: %d", size)
+}
+
+// Put satisfies the Allocator interface.
+// Every buffer allocated with Get(size int) must be returned to the pool
+// using Put(buffer []byte) so it can be recycled.
+func (a *MemPool) Put(buffer []byte) bool { + size := cap(buffer) + for i := 0; i < len(a.slabs); i++ { + if a.slabs[i].size < size { + continue + } + a.slabs[i].put(buffer) + return true + } + return false +} diff --git a/pkg/util/mempool/pool_test.go b/pkg/util/mempool/pool_test.go new file mode 100644 index 000000000000..da0fc361dd4a --- /dev/null +++ b/pkg/util/mempool/pool_test.go @@ -0,0 +1,133 @@ +package mempool + +import ( + "math/rand" + "sync" + "testing" + "time" + "unsafe" + + "github.com/stretchr/testify/require" +) + +func TestMemPool(t *testing.T) { + + t.Run("empty pool", func(t *testing.T) { + pool := New("test", []Bucket{}, nil) + _, err := pool.Get(256) + require.Error(t, err) + }) + + t.Run("requested size too big", func(t *testing.T) { + pool := New("test", []Bucket{ + {Size: 1, Capacity: 128}, + }, nil) + _, err := pool.Get(256) + require.Error(t, err) + }) + + t.Run("requested size within bucket", func(t *testing.T) { + pool := New("test", []Bucket{ + {Size: 1, Capacity: 128}, + {Size: 1, Capacity: 256}, + {Size: 1, Capacity: 512}, + }, nil) + res, err := pool.Get(200) + require.NoError(t, err) + require.Equal(t, 200, len(res)) + require.Equal(t, 256, cap(res)) + + res, err = pool.Get(300) + require.NoError(t, err) + require.Equal(t, 300, len(res)) + require.Equal(t, 512, cap(res)) + }) + + t.Run("buffer is cleared when returned", func(t *testing.T) { + pool := New("test", []Bucket{ + {Size: 1, Capacity: 64}, + }, nil) + res, err := pool.Get(8) + require.NoError(t, err) + require.Equal(t, 8, len(res)) + source := []byte{0, 1, 2, 3, 4, 5, 6, 7} + copy(res, source) + + pool.Put(res) + + res, err = pool.Get(8) + require.NoError(t, err) + require.Equal(t, 8, len(res)) + require.Equal(t, make([]byte, 8), res) + }) + + t.Run("pool returns error when no buffer is available", func(t *testing.T) { + pool := New("test", []Bucket{ + {Size: 1, Capacity: 64}, + }, nil) + buf1, _ := pool.Get(32) + require.Equal(t, 32, len(buf1)) + + _, err := pool.Get(16) + require.ErrorContains(t, err, errSlabExhausted.Error()) + }) + + t.Run("test ring buffer returns same backing array", func(t *testing.T) { + pool := New("test", []Bucket{ + {Size: 2, Capacity: 128}, + }, nil) + res1, _ := pool.Get(32) + ptr1 := unsafe.Pointer(unsafe.SliceData(res1)) + + res2, _ := pool.Get(64) + ptr2 := unsafe.Pointer(unsafe.SliceData(res2)) + + pool.Put(res2) + pool.Put(res1) + + res3, _ := pool.Get(48) + ptr3 := unsafe.Pointer(unsafe.SliceData(res3)) + + res4, _ := pool.Get(96) + ptr4 := unsafe.Pointer(unsafe.SliceData(res4)) + + require.Equal(t, ptr1, ptr4) + require.Equal(t, ptr2, ptr3) + }) + + t.Run("concurrent access", func(t *testing.T) { + pool := New("test", []Bucket{ + {Size: 32, Capacity: 2 << 10}, + {Size: 16, Capacity: 4 << 10}, + {Size: 8, Capacity: 8 << 10}, + {Size: 4, Capacity: 16 << 10}, + {Size: 2, Capacity: 32 << 10}, + }, nil) + + var wg sync.WaitGroup + numWorkers := 256 + n := 10 + + for i := 0; i < numWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < n; i++ { + s := 2 << rand.Intn(5) + buf1, err1 := pool.Get(s) + buf2, err2 := pool.Get(s) + if err2 == nil { + pool.Put(buf2) + } + time.Sleep(time.Millisecond * time.Duration(rand.Intn(10))) + if err1 == nil { + pool.Put(buf1) + } + } + }() + } + + wg.Wait() + t.Log("finished") + }) +} diff --git a/tools/bloom/inspector/main.go b/tools/bloom/inspector/main.go index dfcc7c79cd86..8f60422cd648 100644 --- a/tools/bloom/inspector/main.go +++ b/tools/bloom/inspector/main.go @@ -5,6 +5,7 @@ import ( "os" v1 
"github.com/grafana/loki/v3/pkg/storage/bloom/v1" + "github.com/grafana/loki/v3/pkg/util/mempool" ) func main() { @@ -18,7 +19,7 @@ func main() { r := v1.NewDirectoryBlockReader(path) b := v1.NewBlock(r, v1.NewMetrics(nil)) - q := v1.NewBlockQuerier(b, true, v1.DefaultMaxPageSize) + q := v1.NewBlockQuerier(b, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize) md, err := q.Metadata() if err != nil {