Skip to content

Commit

Permalink
Remove package global variable for bloom page allocator
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum committed Jun 10, 2024
1 parent 3692235 commit 97ee3ff
Show file tree
Hide file tree
Showing 23 changed files with 164 additions and 119 deletions.
5 changes: 3 additions & 2 deletions pkg/bloombuild/builder/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/grafana/loki/v3/pkg/chunkenc"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/util/mempool"
)

func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBlooms, refs []bloomshipper.BlockRef) {
Expand Down Expand Up @@ -74,7 +75,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockRef: refs[i],
BlockQuerier: v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
BlockQuerier: v1.NewBlockQuerier(b, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
})
}

Expand Down Expand Up @@ -152,7 +153,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBlooms, 0, len(data))
for _, block := range outputBlocks {
bq := v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize).Iter()
bq := v1.NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize).Iter()
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/bloombuild/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/types"
"github.com/grafana/loki/v3/pkg/util/mempool"
)

var testDay = parseDayTime("2023-09-01")
Expand Down Expand Up @@ -410,7 +411,7 @@ func createPlanner(
reg := prometheus.NewPedanticRegistry()
metasCache := cache.NewNoopCache()
blocksCache := bloomshipper.NewFsBlocksCache(storageCfg.BloomShipperConfig.BlocksCache, reg, logger)
bloomStore, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, storage.ClientMetrics{}, metasCache, blocksCache, reg, logger)
bloomStore, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, storage.ClientMetrics{}, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
require.NoError(t, err)

planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, bloomStore, logger, reg)
Expand Down
3 changes: 2 additions & 1 deletion pkg/bloomcompactor/retention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
"github.com/grafana/loki/v3/pkg/storage/types"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/mempool"
lokiring "github.com/grafana/loki/v3/pkg/util/ring"
"github.com/grafana/loki/v3/pkg/validation"
)
Expand Down Expand Up @@ -822,7 +823,7 @@ func NewMockBloomStoreWithWorkDir(t *testing.T, workDir string) (*bloomshipper.B
metasCache := cache.NewMockCache()
blocksCache := bloomshipper.NewFsBlocksCache(storageConfig.BloomShipperConfig.BlocksCache, prometheus.NewPedanticRegistry(), logger)

store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageConfig, metrics, metasCache, blocksCache, reg, logger)
store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageConfig, metrics, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
if err == nil {
t.Cleanup(store.Stop)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/bloomcompactor/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/grafana/loki/v3/pkg/chunkenc"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/util/mempool"
)

func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBlooms, refs []bloomshipper.BlockRef) {
Expand Down Expand Up @@ -74,7 +75,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockRef: refs[i],
BlockQuerier: v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
BlockQuerier: v1.NewBlockQuerier(b, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
})
}

Expand Down Expand Up @@ -152,7 +153,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBlooms, 0, len(data))
for _, block := range outputBlocks {
bq := v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize).Iter()
bq := v1.NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize).Iter()
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
"github.com/grafana/loki/v3/pkg/storage/types"
"github.com/grafana/loki/v3/pkg/util/mempool"
"github.com/grafana/loki/v3/pkg/validation"
)

Expand Down Expand Up @@ -92,7 +93,7 @@ func setupBloomStore(t *testing.T) *bloomshipper.BloomStore {

reg := prometheus.NewRegistry()
blocksCache := bloomshipper.NewFsBlocksCache(storageCfg.BloomShipperConfig.BlocksCache, nil, logger)
store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, cm, nil, blocksCache, reg, logger)
store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, cm, nil, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
require.NoError(t, err)
t.Cleanup(store.Stop)

Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config.
// after iteration for performance (alloc reduction).
// This is safe to do here because we do not capture
// the underlying bloom []byte outside of iteration
bloomshipper.WithPool(true),
bloomshipper.WithPool(p.store.Allocator()),
)
duration = time.Since(startBlocks)
level.Debug(p.logger).Log("msg", "fetched blocks", "count", len(refs), "duration", duration, "err", err)
Expand Down
5 changes: 5 additions & 0 deletions pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/mempool"
)

var _ bloomshipper.Store = &dummyStore{}
Expand Down Expand Up @@ -73,6 +74,10 @@ func (s *dummyStore) Client(_ model.Time) (bloomshipper.Client, error) {
return nil, nil
}

func (s *dummyStore) Allocator() mempool.Allocator {
return nil
}

func (s *dummyStore) Stop() {
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/util/mempool"
)

func parseDayTime(s string) config.DayTime {
Expand Down Expand Up @@ -433,7 +434,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
// }
// }
querier := &bloomshipper.CloseableBlockQuerier{
BlockQuerier: v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
BlockQuerier: v1.NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
BlockRef: blockRef,
}
queriers = append(queriers, querier)
Expand Down
37 changes: 22 additions & 15 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (

"github.com/grafana/loki/v3/pkg/bloomcompactor"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/types"

"github.com/grafana/loki/v3/pkg/analytics"
Expand Down Expand Up @@ -732,19 +731,6 @@ func (t *Loki) initBloomStore() (services.Service, error) {
reg := prometheus.DefaultRegisterer
bsCfg := t.Cfg.StorageConfig.BloomShipperConfig

// Set global BloomPageAllocator variable
switch bsCfg.MemoryManagement.BloomPageAllocationType {
case "simple":
bloomshipper.BloomPageAllocator = &v1.SimpleHeapAllocator{}
case "dynamic":
bloomshipper.BloomPageAllocator = v1.BloomPagePool
case "fixed":
bloomshipper.BloomPageAllocator = mempool.New("bloom-page-pool", bsCfg.MemoryManagement.BloomPageMemPoolBuckets, reg)
default:
// do nothing
bloomshipper.BloomPageAllocator = nil
}

var metasCache cache.Cache
if t.Cfg.isTarget(IndexGateway) && cache.IsCacheConfigured(bsCfg.MetasCache) {
metasCache, err = cache.New(bsCfg.MetasCache, reg, logger, stats.BloomMetasCache, constants.Loki)
Expand All @@ -768,7 +754,28 @@ func (t *Loki) initBloomStore() (services.Service, error) {
level.Warn(logger).Log("msg", "failed to preload blocks cache", "err", err)
}

t.BloomStore, err = bloomshipper.NewBloomStore(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics, metasCache, blocksCache, reg, logger)
var pageAllocator mempool.Allocator

// Set global BloomPageAllocator variable
switch bsCfg.MemoryManagement.BloomPageAllocationType {
case "simple":
pageAllocator = &mempool.SimpleHeapAllocator{}
case "dynamic":
// sync buffer pool for bloom pages
// 128KB 256KB 512KB 1MB 2MB 4MB 8MB 16MB 32MB 64MB 128MB
pageAllocator = mempool.NewBytePoolAllocator(
128<<10, 128<<20, 2,
func(size int) interface{} {
return make([]byte, size)
})
case "fixed":
pageAllocator = mempool.New("bloom-page-pool", bsCfg.MemoryManagement.BloomPageMemPoolBuckets, reg)
default:
// should not happen as the type is validated upfront
return nil, fmt.Errorf("failed to create bloom store: invalid allocator type")
}

t.BloomStore, err = bloomshipper.NewBloomStore(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics, metasCache, blocksCache, pageAllocator, reg, logger)
if err != nil {
return nil, fmt.Errorf("failed to create bloom store: %w", err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/bloom/v1/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"

"github.com/pkg/errors"

"github.com/grafana/loki/v3/pkg/util/mempool"
)

type BlockMetadata struct {
Expand Down Expand Up @@ -117,7 +119,7 @@ type BlockQuerier struct {
// blooms for querying (bloom-gateway), but not for writing (bloom-compactor).
// Therefore, when calling NewBlockQuerier on the write path, you should always
// pass the SimpleHeapAllocator implementation of the Allocator interface.
func NewBlockQuerier(b *Block, alloc Allocator, maxPageSize int) *BlockQuerier {
func NewBlockQuerier(b *Block, alloc mempool.Allocator, maxPageSize int) *BlockQuerier {
return &BlockQuerier{
block: b,
LazySeriesIter: NewLazySeriesIter(b),
Expand Down
9 changes: 5 additions & 4 deletions pkg/storage/bloom/v1/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter"
"github.com/grafana/loki/v3/pkg/util/encoding"
"github.com/grafana/loki/v3/pkg/util/mempool"
)

// NB(chaudum): Some block pages are way bigger than others (400MiB and
Expand Down Expand Up @@ -63,7 +64,7 @@ func (b *Bloom) Decode(dec *encoding.Decbuf) error {
return nil
}

func LazyDecodeBloomPage(r io.Reader, alloc Allocator, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) {
func LazyDecodeBloomPage(r io.Reader, alloc mempool.Allocator, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) {
data, err := alloc.Get(page.Len)
if err != nil {
return nil, errors.Wrap(err, "allocating buffer")
Expand Down Expand Up @@ -101,7 +102,7 @@ func LazyDecodeBloomPage(r io.Reader, alloc Allocator, pool chunkenc.ReaderPool,
}

// shortcut to skip allocations when we know the page is not compressed
func LazyDecodeBloomPageNoCompression(r io.Reader, alloc Allocator, page BloomPageHeader) (*BloomPageDecoder, error) {
func LazyDecodeBloomPageNoCompression(r io.Reader, alloc mempool.Allocator, page BloomPageHeader) (*BloomPageDecoder, error) {
// data + checksum
if page.Len != page.DecompressedLen+4 {
return nil, errors.New("the Len and DecompressedLen of the page do not match")
Expand Down Expand Up @@ -167,7 +168,7 @@ type BloomPageDecoder struct {
// This can only safely be used when the underlying bloom
// bytes don't escape the decoder:
// on reads in the bloom-gw but not in the bloom-compactor
func (d *BloomPageDecoder) Relinquish(alloc Allocator) {
func (d *BloomPageDecoder) Relinquish(alloc mempool.Allocator) {
if d == nil {
return
}
Expand Down Expand Up @@ -284,7 +285,7 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) {
// BloomPageDecoder returns a decoder for the given page index.
// It may skip the page if it's too large.
// NB(owen-d): if `skip` is true, err _must_ be nil.
func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, alloc Allocator, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) {
func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, alloc mempool.Allocator, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) {
if pageIdx < 0 || pageIdx >= len(b.pageHeaders) {
metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Inc()
metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Add(float64(b.pageHeaders[pageIdx].DecompressedLen))
Expand Down
10 changes: 7 additions & 3 deletions pkg/storage/bloom/v1/bloom_querier.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package v1

import "github.com/pkg/errors"
import (
"github.com/pkg/errors"

"github.com/grafana/loki/v3/pkg/util/mempool"
)

type BloomQuerier interface {
Seek(BloomOffset) (*Bloom, error)
Expand All @@ -10,7 +14,7 @@ type LazyBloomIter struct {
b *Block
m int // max page size in bytes

alloc Allocator
alloc mempool.Allocator

// state
initialized bool
Expand All @@ -24,7 +28,7 @@ type LazyBloomIter struct {
// will be returned to the pool for efficiency.
// This can only safely be used when the underlying bloom
// bytes don't escape the decoder.
func NewLazyBloomIter(b *Block, alloc Allocator, maxSize int) *LazyBloomIter {
func NewLazyBloomIter(b *Block, alloc mempool.Allocator, maxSize int) *LazyBloomIter {
return &LazyBloomIter{
b: b,
m: maxSize,
Expand Down
15 changes: 8 additions & 7 deletions pkg/storage/bloom/v1/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter"
"github.com/grafana/loki/v3/pkg/util/encoding"
"github.com/grafana/loki/v3/pkg/util/mempool"
)

var blockEncodings = []chunkenc.Encoding{
Expand Down Expand Up @@ -121,7 +122,7 @@ func TestBlockBuilder_RoundTrip(t *testing.T) {
}

block := NewBlock(tc.reader, NewMetrics(nil))
querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()
querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()

err = block.LoadHeaders()
require.Nil(t, err)
Expand Down Expand Up @@ -239,7 +240,7 @@ func TestMergeBuilder(t *testing.T) {
itr := NewSliceIter[SeriesWithBlooms](data[min:max])
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
blocks = append(blocks, NewPeekingIter[*SeriesWithBlooms](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()))
blocks = append(blocks, NewPeekingIter[*SeriesWithBlooms](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()))
}

// We're not testing the ability to extend a bloom in this test
Expand Down Expand Up @@ -280,7 +281,7 @@ func TestMergeBuilder(t *testing.T) {
require.Nil(t, err)

block := NewBlock(reader, NewMetrics(nil))
querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize)
querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize)

EqualIterators[*SeriesWithBlooms](
t,
Expand Down Expand Up @@ -372,7 +373,7 @@ func TestMergeBuilderFingerprintCollision(t *testing.T) {
require.Nil(t, err)

block := NewBlock(reader, NewMetrics(nil))
querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize)
querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize)

require.True(t, querier.Next())
require.Equal(t,
Expand Down Expand Up @@ -417,7 +418,7 @@ func TestBlockReset(t *testing.T) {
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
block := NewBlock(reader, NewMetrics(nil))
querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize)
querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize)

rounds := make([][]model.Fingerprint, 2)

Expand Down Expand Up @@ -482,7 +483,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
block := NewBlock(reader, NewMetrics(nil))
querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()
querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()

// rather than use the block querier directly, collect it's data
// so we can use it in a few places later
Expand Down Expand Up @@ -552,7 +553,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {

// ensure the new block contains one copy of all the data
// by comparing it against an iterator over the source data
mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &SimpleHeapAllocator{}, DefaultMaxPageSize)
mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize)
sourceItr := NewSliceIter[*SeriesWithBlooms](PointerSlice[SeriesWithBlooms](xs))

EqualIterators[*SeriesWithBlooms](
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/bloom/v1/fuse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@ import (

"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter"
"github.com/grafana/loki/v3/pkg/util/mempool"
)

var BloomPagePool = mempool.New("test", []mempool.Bucket{
{Size: 16, Capacity: 128 << 10},
{Size: 16, Capacity: 256 << 10},
{Size: 16, Capacity: 512 << 10},
}, nil)

// TODO(owen-d): this is unhinged from the data it represents. I'm leaving this solely so I don't
// have to refactor tests here in order to fix this elsewhere, but it can/should be fixed --
// the skip & n len are hardcoded based on data that's passed to it elsewhere.
Expand Down
Loading

0 comments on commit 97ee3ff

Please sign in to comment.