Skip to content

Commit

Permalink
perf: Introduce fixed size memory pool for bloom querier
Browse files Browse the repository at this point in the history
---

Revert "fix(regression):  reverts #13039 to prevent use-after-free corruptions (#13162)"

This reverts commit 41c5ee2.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum committed Jun 19, 2024
1 parent f5a9905 commit ad79a80
Show file tree
Hide file tree
Showing 21 changed files with 691 additions and 64 deletions.
4 changes: 2 additions & 2 deletions pkg/bloombuild/builder/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockRef: refs[i],
BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize),
BlockQuerier: v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
})
}

Expand Down Expand Up @@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBlooms, 0, len(data))
for _, block := range outputBlocks {
bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize).Iter()
bq := v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize).Iter()
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockRef: refs[i],
BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize),
BlockQuerier: v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
})
}

Expand Down Expand Up @@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBlooms, 0, len(data))
for _, block := range outputBlocks {
bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize).Iter()
bq := v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize).Iter()
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
Expand Down
1 change: 1 addition & 0 deletions pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
// t.Log(i, j, string(keys[i][j]))
// }
// }

blocks = append(blocks, block)
metas = append(metas, meta)
blockRefs = append(blockRefs, blockRef)
Expand Down
15 changes: 15 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"github.com/grafana/loki/v3/pkg/bloomcompactor"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/types"

"github.com/grafana/loki/v3/pkg/analytics"
Expand Down Expand Up @@ -79,6 +80,7 @@ import (
"github.com/grafana/loki/v3/pkg/util/httpreq"
"github.com/grafana/loki/v3/pkg/util/limiter"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/mempool"
"github.com/grafana/loki/v3/pkg/util/querylimits"
lokiring "github.com/grafana/loki/v3/pkg/util/ring"
serverutil "github.com/grafana/loki/v3/pkg/util/server"
Expand Down Expand Up @@ -730,6 +732,19 @@ func (t *Loki) initBloomStore() (services.Service, error) {
reg := prometheus.DefaultRegisterer
bsCfg := t.Cfg.StorageConfig.BloomShipperConfig

// Set global BloomPageAllocator variable
switch bsCfg.MemoryManagement.BloomPageAllocationType {
case "simple":
bloomshipper.BloomPageAllocator = &v1.SimpleHeapAllocator{}
case "dynamic":
bloomshipper.BloomPageAllocator = v1.BloomPagePool
case "fixed":
bloomshipper.BloomPageAllocator = mempool.New("bloom-page-pool", bsCfg.MemoryManagement.BloomPageMemPoolBuckets, reg)
default:
// do nothing
bloomshipper.BloomPageAllocator = nil
}

var metasCache cache.Cache
if t.Cfg.isTarget(IndexGateway) && cache.IsCacheConfigured(bsCfg.MetasCache) {
metasCache, err = cache.New(bsCfg.MetasCache, reg, logger, stats.BloomMetasCache, constants.Loki)
Expand Down
21 changes: 13 additions & 8 deletions pkg/storage/bloom/v1/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,18 @@ type BlockQuerier struct {
}

// NewBlockQuerier returns a new BlockQuerier for the given block.
// WARNING: If noCapture is true, the underlying byte slice of the bloom page
// will be returned to the pool for efficiency. This can only safely be used
// when the underlying bloom bytes don't escape the decoder, i.e.
// when loading blooms for querying (bloom-gw) but not for writing (bloom-compactor).
// When usePool is true, the bloom MUST NOT be captured by the caller. Rather,
// it should be discarded before another call to Next().
func NewBlockQuerier(b *Block, usePool bool, maxPageSize int) *BlockQuerier {
// WARNING: You can pass an implementation of Allocator that is responsibe for
// whether the underlying byte slice of the bloom page will be returned to the
// pool for efficiency or not. Returning to the pool can only safely be used
// when the underlying bloom bytes don't escape the decoder, i.e. when loading
// blooms for querying (bloom-gateway), but not for writing (bloom-compactor).
// Therefore, when calling NewBlockQuerier on the write path, you should always
// pass the SimpleHeapAllocator implementation of the Allocator interface.
func NewBlockQuerier(b *Block, alloc Allocator, maxPageSize int) *BlockQuerier {
return &BlockQuerier{
block: b,
LazySeriesIter: NewLazySeriesIter(b),
blooms: NewLazyBloomIter(b, usePool, maxPageSize),
blooms: NewLazyBloomIter(b, alloc, maxPageSize),
}
}

Expand All @@ -144,6 +145,10 @@ func (bq *BlockQuerier) Err() error {
return bq.blooms.Err()
}

func (bq *BlockQuerier) Close() {
bq.blooms.Close()
}

type BlockQuerierIter struct {
*BlockQuerier
}
Expand Down
43 changes: 28 additions & 15 deletions pkg/storage/bloom/v1/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Bloom struct {

func (b *Bloom) Encode(enc *encoding.Encbuf) error {
// divide by 8 b/c bloom capacity is measured in bits, but we want bytes
buf := bytes.NewBuffer(BloomPagePool.Get(int(b.Capacity() / 8)))
buf := bytes.NewBuffer(make([]byte, 0, int(b.Capacity()/8)))

// TODO(owen-d): have encoder implement writer directly so we don't need
// to indirect via a buffer
Expand All @@ -36,7 +36,6 @@ func (b *Bloom) Encode(enc *encoding.Encbuf) error {
data := buf.Bytes()
enc.PutUvarint(len(data)) // length of bloom filter
enc.PutBytes(data)
BloomPagePool.Put(data[:0]) // release to pool
return nil
}

Expand Down Expand Up @@ -64,11 +63,14 @@ func (b *Bloom) Decode(dec *encoding.Decbuf) error {
return nil
}

func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) {
data := BloomPagePool.Get(page.Len)[:page.Len]
defer BloomPagePool.Put(data)
func LazyDecodeBloomPage(r io.Reader, alloc Allocator, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) {
data, err := alloc.Get(page.Len)
if err != nil {
return nil, errors.Wrap(err, "allocating buffer")
}
defer alloc.Put(data)

_, err := io.ReadFull(r, data)
_, err = io.ReadFull(r, data)
if err != nil {
return nil, errors.Wrap(err, "reading bloom page")
}
Expand All @@ -84,7 +86,10 @@ func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHe
}
defer pool.PutReader(decompressor)

b := BloomPagePool.Get(page.DecompressedLen)[:page.DecompressedLen]
b, err := alloc.Get(page.DecompressedLen)
if err != nil {
return nil, errors.Wrap(err, "allocating buffer")
}

if _, err = io.ReadFull(decompressor, b); err != nil {
return nil, errors.Wrap(err, "decompressing bloom page")
Expand All @@ -96,14 +101,18 @@ func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHe
}

// shortcut to skip allocations when we know the page is not compressed
func LazyDecodeBloomPageNoCompression(r io.Reader, page BloomPageHeader) (*BloomPageDecoder, error) {
func LazyDecodeBloomPageNoCompression(r io.Reader, alloc Allocator, page BloomPageHeader) (*BloomPageDecoder, error) {
// data + checksum
if page.Len != page.DecompressedLen+4 {
return nil, errors.New("the Len and DecompressedLen of the page do not match")
}
data := BloomPagePool.Get(page.Len)[:page.Len]

_, err := io.ReadFull(r, data)
data, err := alloc.Get(page.Len)
if err != nil {
return nil, errors.Wrap(err, "allocating buffer")
}

_, err = io.ReadFull(r, data)
if err != nil {
return nil, errors.Wrap(err, "reading bloom page")
}
Expand Down Expand Up @@ -158,12 +167,16 @@ type BloomPageDecoder struct {
// This can only safely be used when the underlying bloom
// bytes don't escape the decoder:
// on reads in the bloom-gw but not in the bloom-compactor
func (d *BloomPageDecoder) Relinquish() {
func (d *BloomPageDecoder) Relinquish(alloc Allocator) {
if d == nil {
return
}

data := d.data
d.data = nil

if cap(data) > 0 {
BloomPagePool.Put(data)
_ = alloc.Put(data)
}
}

Expand Down Expand Up @@ -271,7 +284,7 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) {
// BloomPageDecoder returns a decoder for the given page index.
// It may skip the page if it's too large.
// NB(owen-d): if `skip` is true, err _must_ be nil.
func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) {
func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, alloc Allocator, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) {
if pageIdx < 0 || pageIdx >= len(b.pageHeaders) {
metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Inc()
metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Add(float64(b.pageHeaders[pageIdx].DecompressedLen))
Expand All @@ -294,9 +307,9 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize
}

if b.schema.encoding == chunkenc.EncNone {
res, err = LazyDecodeBloomPageNoCompression(r, page)
res, err = LazyDecodeBloomPageNoCompression(r, alloc, page)
} else {
res, err = LazyDecodeBloomPage(r, b.schema.DecompressorPool(), page)
res, err = LazyDecodeBloomPage(r, alloc, b.schema.DecompressorPool(), page)
}

if err != nil {
Expand Down
30 changes: 15 additions & 15 deletions pkg/storage/bloom/v1/bloom_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ type BloomQuerier interface {
}

type LazyBloomIter struct {
usePool bool

b *Block
m int // max page size in bytes

alloc Allocator

// state
initialized bool
err error
Expand All @@ -24,11 +24,11 @@ type LazyBloomIter struct {
// will be returned to the pool for efficiency.
// This can only safely be used when the underlying bloom
// bytes don't escape the decoder.
func NewLazyBloomIter(b *Block, pool bool, maxSize int) *LazyBloomIter {
func NewLazyBloomIter(b *Block, alloc Allocator, maxSize int) *LazyBloomIter {
return &LazyBloomIter{
usePool: pool,
b: b,
m: maxSize,
b: b,
m: maxSize,
alloc: alloc,
}
}

Expand All @@ -53,16 +53,14 @@ func (it *LazyBloomIter) LoadOffset(offset BloomOffset) (skip bool) {

// drop the current page if it exists and
// we're using the pool
if it.curPage != nil && it.usePool {
it.curPage.Relinquish()
}
it.curPage.Relinquish(it.alloc)

r, err := it.b.reader.Blooms()
if err != nil {
it.err = errors.Wrap(err, "getting blooms reader")
return false
}
decoder, skip, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics)
decoder, skip, err := it.b.blooms.BloomPageDecoder(r, it.alloc, offset.Page, it.m, it.b.metrics)
if err != nil {
it.err = errors.Wrap(err, "loading bloom page")
return false
Expand Down Expand Up @@ -106,6 +104,7 @@ func (it *LazyBloomIter) next() bool {
var skip bool
it.curPage, skip, err = it.b.blooms.BloomPageDecoder(
r,
it.alloc,
it.curPageIndex,
it.m,
it.b.metrics,
Expand All @@ -130,11 +129,8 @@ func (it *LazyBloomIter) next() bool {

// we've exhausted the current page, progress to next
it.curPageIndex++
// drop the current page if it exists and
// we're using the pool
if it.usePool {
it.curPage.Relinquish()
}
// drop the current page if it exists
it.curPage.Relinquish(it.alloc)
it.curPage = nil
continue
}
Expand All @@ -161,3 +157,7 @@ func (it *LazyBloomIter) Err() error {
return nil
}
}

func (it *LazyBloomIter) Close() {
it.curPage.Relinquish(it.alloc)
}
14 changes: 7 additions & 7 deletions pkg/storage/bloom/v1/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestBlockBuilder_RoundTrip(t *testing.T) {
}

block := NewBlock(tc.reader, NewMetrics(nil))
querier := NewBlockQuerier(block, false, DefaultMaxPageSize).Iter()
querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()

err = block.LoadHeaders()
require.Nil(t, err)
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestMergeBuilder(t *testing.T) {
itr := NewSliceIter[SeriesWithBlooms](data[min:max])
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
blocks = append(blocks, NewPeekingIter[*SeriesWithBlooms](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false, DefaultMaxPageSize).Iter()))
blocks = append(blocks, NewPeekingIter[*SeriesWithBlooms](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()))
}

// We're not testing the ability to extend a bloom in this test
Expand Down Expand Up @@ -280,7 +280,7 @@ func TestMergeBuilder(t *testing.T) {
require.Nil(t, err)

block := NewBlock(reader, NewMetrics(nil))
querier := NewBlockQuerier(block, false, DefaultMaxPageSize)
querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize)

EqualIterators[*SeriesWithBlooms](
t,
Expand Down Expand Up @@ -372,7 +372,7 @@ func TestMergeBuilderFingerprintCollision(t *testing.T) {
require.Nil(t, err)

block := NewBlock(reader, NewMetrics(nil))
querier := NewBlockQuerier(block, false, DefaultMaxPageSize)
querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize)

require.True(t, querier.Next())
require.Equal(t,
Expand Down Expand Up @@ -417,7 +417,7 @@ func TestBlockReset(t *testing.T) {
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
block := NewBlock(reader, NewMetrics(nil))
querier := NewBlockQuerier(block, false, DefaultMaxPageSize)
querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize)

rounds := make([][]model.Fingerprint, 2)

Expand Down Expand Up @@ -482,7 +482,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
block := NewBlock(reader, NewMetrics(nil))
querier := NewBlockQuerier(block, false, DefaultMaxPageSize).Iter()
querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()

// rather than use the block querier directly, collect it's data
// so we can use it in a few places later
Expand Down Expand Up @@ -552,7 +552,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {

// ensure the new block contains one copy of all the data
// by comparing it against an iterator over the source data
mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false, DefaultMaxPageSize)
mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &SimpleHeapAllocator{}, DefaultMaxPageSize)
sourceItr := NewSliceIter[*SeriesWithBlooms](PointerSlice[SeriesWithBlooms](xs))

EqualIterators[*SeriesWithBlooms](
Expand Down
Loading

0 comments on commit ad79a80

Please sign in to comment.