Skip to content

Commit

Permalink
added chunks batches iterator to download chunks in batches instead o…
Browse files Browse the repository at this point in the history
…f downloading all of them at once

Signed-off-by: Vladyslav Diachenko <vlad.diachenko@grafana.com>
  • Loading branch information
vlad-diachenko committed Jan 11, 2024
1 parent 5559b26 commit 4dcf3c8
Show file tree
Hide file tree
Showing 13 changed files with 274 additions and 99 deletions.
4 changes: 4 additions & 0 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -3088,6 +3088,10 @@ shard_streams:
# CLI flag: -bloom-compactor.enable-compaction
[bloom_compactor_enable_compaction: <boolean> | default = false]

# The batch size of the chunks the bloom-compactor downloads at once.
# CLI flag: -bloom-compactor.chunks-batch-size
[bloom_compactor_chunks_batch_size: <int> | default = 100]

# Length of the n-grams created when computing blooms from log lines.
# CLI flag: -bloom-compactor.ngram-length
[bloom_ngram_length: <int> | default = 4]
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
}

fpRate := c.limits.BloomFalsePositiveRate(job.tenantID)
resultingBlock, err = compactNewChunks(ctx, logger, job, fpRate, bt, storeClient.chunk, builder)
resultingBlock, err = compactNewChunks(ctx, logger, job, fpRate, bt, storeClient.chunk, builder, c.limits)
if err != nil {
return level.Error(logger).Log("msg", "failed compacting new chunks", "err", err)
}
Expand All @@ -537,7 +537,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
// When already compacted metas exist, we need to merge all blocks, amending blooms with new series
level.Info(logger).Log("msg", "already compacted metas exists, use mergeBlockBuilder")

var populate = createPopulateFunc(ctx, logger, job, storeClient, bt)
var populate = createPopulateFunc(ctx, job, storeClient, bt, c.limits)

seriesIter := makeSeriesIterFromSeriesMeta(job)

Expand Down
52 changes: 26 additions & 26 deletions pkg/bloomcompactor/chunkcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

type compactorTokenizer interface {
PopulateSeriesWithBloom(bloom *v1.SeriesWithBloom, chunks []chunk.Chunk) error
PopulateSeriesWithBloom(bloom *v1.SeriesWithBloom, chunkBatchesIterator v1.Iterator[[]chunk.Chunk]) error
}

type chunkClient interface {
Expand Down Expand Up @@ -86,7 +86,7 @@ func makeChunkRefs(chksMetas []tsdbindex.ChunkMeta, tenant string, fp model.Fing
return chunkRefs
}

func buildBloomFromSeries(seriesMeta seriesMeta, fpRate float64, tokenizer compactorTokenizer, chunks []chunk.Chunk) (v1.SeriesWithBloom, error) {
func buildBloomFromSeries(seriesMeta seriesMeta, fpRate float64, tokenizer compactorTokenizer, chunks v1.Iterator[[]chunk.Chunk]) (v1.SeriesWithBloom, error) {
// Create a bloom for this series
bloomForChks := v1.SeriesWithBloom{
Series: &v1.Series{
Expand Down Expand Up @@ -155,21 +155,21 @@ func createLocalDirName(workingDir string, job Job) string {
}

// Compacts given list of chunks into a bloom block, uploads it to storage and returns the resulting block
func compactNewChunks(
ctx context.Context,
func compactNewChunks(ctx context.Context,
logger log.Logger,
job Job,
fpRate float64,
bt compactorTokenizer,
storeClient chunkClient,
builder blockBuilder,
limits Limits,
) (bloomshipper.Block, error) {
// Ensure the context has not been canceled (ie. compactor shutdown has been triggered).
if err := ctx.Err(); err != nil {
return bloomshipper.Block{}, err
}

bloomIter := newLazyBloomBuilder(ctx, job, storeClient, bt, fpRate, logger)
bloomIter := newLazyBloomBuilder(ctx, job, storeClient, bt, fpRate, logger, limits)

// Build and upload bloomBlock to storage
block, err := buildBlockFromBlooms(ctx, logger, builder, bloomIter, job)
Expand All @@ -182,13 +182,14 @@ func compactNewChunks(
}

type lazyBloomBuilder struct {
ctx context.Context
metas v1.Iterator[seriesMeta]
tenant string
client chunkClient
bt compactorTokenizer
fpRate float64
logger log.Logger
ctx context.Context
metas v1.Iterator[seriesMeta]
tenant string
client chunkClient
bt compactorTokenizer
fpRate float64
logger log.Logger
chunksBatchSize uint

cur v1.SeriesWithBloom // retured by At()
err error // returned by Err()
Expand All @@ -198,15 +199,16 @@ type lazyBloomBuilder struct {
// which are used by the blockBuilder to write a bloom block.
// We use an iterator to avoid loading all blooms into memory first, before
// building the block.
func newLazyBloomBuilder(ctx context.Context, job Job, client chunkClient, bt compactorTokenizer, fpRate float64, logger log.Logger) *lazyBloomBuilder {
func newLazyBloomBuilder(ctx context.Context, job Job, client chunkClient, bt compactorTokenizer, fpRate float64, logger log.Logger, limits Limits) *lazyBloomBuilder {
return &lazyBloomBuilder{
ctx: ctx,
metas: v1.NewSliceIter(job.seriesMetas),
client: client,
tenant: job.tenantID,
bt: bt,
fpRate: fpRate,
logger: logger,
ctx: ctx,
metas: v1.NewSliceIter(job.seriesMetas),
client: client,
tenant: job.tenantID,
bt: bt,
fpRate: fpRate,
logger: logger,
chunksBatchSize: limits.BloomCompactorChunksBatchSize(job.tenantID),
}
}

Expand All @@ -218,20 +220,18 @@ func (it *lazyBloomBuilder) Next() bool {
}
meta := it.metas.At()

// Get chunks data from list of chunkRefs
chks, err := it.client.GetChunks(it.ctx, makeChunkRefs(meta.chunkRefs, it.tenant, meta.seriesFP))
batchesIterator, err := newChunkBatchesIterator(it.ctx, it.client, makeChunkRefs(meta.chunkRefs, it.tenant, meta.seriesFP), it.chunksBatchSize)
if err != nil {
it.err = err
it.cur = v1.SeriesWithBloom{}
level.Debug(it.logger).Log("err in getChunks", err)
level.Debug(it.logger).Log("msg", "err creating chunks batches iterator", "err", err)
return false
}

it.cur, err = buildBloomFromSeries(meta, it.fpRate, it.bt, chks)
it.cur, err = buildBloomFromSeries(meta, it.fpRate, it.bt, batchesIterator)
if err != nil {
it.err = err
it.cur = v1.SeriesWithBloom{}
level.Debug(it.logger).Log("err in buildBloomFromSeries", err)
level.Debug(it.logger).Log("msg", "err in buildBloomFromSeries", "err", err)
return false
}
return true
Expand Down
12 changes: 7 additions & 5 deletions pkg/bloomcompactor/chunkcompactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestChunkCompactor_BuildBloomFromSeries(t *testing.T) {
chunks := []chunk.Chunk{createTestChunk(fp, label)}

mbt := mockBloomTokenizer{}
bloom, err := buildBloomFromSeries(seriesMeta, fpRate, &mbt, chunks)
bloom, err := buildBloomFromSeries(seriesMeta, fpRate, &mbt, v1.NewSliceIter([][]chunk.Chunk{chunks}))
require.NoError(t, err)
require.Equal(t, seriesMeta.seriesFP, bloom.Series.Fingerprint)
require.Equal(t, chunks, mbt.chunks)
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestChunkCompactor_CompactNewChunks(t *testing.T) {
pbb := mockPersistentBlockBuilder{}

// Run Compaction
compactedBlock, err := compactNewChunks(context.Background(), logger, job, fpRate, &mbt, &mcc, &pbb)
compactedBlock, err := compactNewChunks(context.Background(), logger, job, fpRate, &mbt, &mcc, &pbb, mockLimits{})

// Validate Compaction Succeeds
require.NoError(t, err)
Expand Down Expand Up @@ -169,7 +169,7 @@ func TestLazyBloomBuilder(t *testing.T) {
mbt := &mockBloomTokenizer{}
mcc := &mockChunkClient{}

it := newLazyBloomBuilder(context.Background(), job, mcc, mbt, fpRate, logger)
it := newLazyBloomBuilder(context.Background(), job, mcc, mbt, fpRate, logger, mockLimits{chunksDownloadingBatchSize: 10})

// first seriesMeta has 1 chunk
require.True(t, it.Next())
Expand Down Expand Up @@ -199,8 +199,10 @@ type mockBloomTokenizer struct {
chunks []chunk.Chunk
}

func (mbt *mockBloomTokenizer) PopulateSeriesWithBloom(_ *v1.SeriesWithBloom, c []chunk.Chunk) error {
mbt.chunks = append(mbt.chunks, c...)
func (mbt *mockBloomTokenizer) PopulateSeriesWithBloom(_ *v1.SeriesWithBloom, c v1.Iterator[[]chunk.Chunk]) error {
for c.Next() {
mbt.chunks = append(mbt.chunks, c.At()...)
}
return nil
}

Expand Down
48 changes: 48 additions & 0 deletions pkg/bloomcompactor/chunksbatchesiterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package bloomcompactor

import (
"context"
"errors"

"github.com/grafana/loki/pkg/storage/chunk"
)

// chunksBatchesIterator iterates over a list of chunks, fetching them from
// the chunk client one batch at a time instead of all at once.
type chunksBatchesIterator struct {
// context is passed to every GetChunks call made by Next.
context context.Context
client chunkClient
// chunksToDownload holds the chunks that have not been requested yet; Next
// removes each batch from the front of this slice.
chunksToDownload []chunk.Chunk
// batchSize is the maximum number of chunks requested per Next call.
batchSize uint

// currentBatch and err hold the result of the most recent GetChunks call;
// they are exposed through At and Err.
currentBatch []chunk.Chunk
err error
}

// newChunkBatchesIterator returns an iterator that fetches chunksToDownload
// from client in batches of at most batchSize chunks, one batch per call to
// Next. Nothing is requested from the client until Next is called.
//
// It returns an error when batchSize is 0, because a zero-sized batch would
// never consume any of the chunks to download.
func newChunkBatchesIterator(ctx context.Context, client chunkClient, chunksToDownload []chunk.Chunk, batchSize uint) (*chunksBatchesIterator, error) {
	if batchSize == 0 {
		return nil, errors.New("batchSize must be greater than 0")
	}
	// The parameter is named ctx (not context) so it does not shadow the
	// imported context package inside this function.
	return &chunksBatchesIterator{
		context:          ctx,
		client:           client,
		chunksToDownload: chunksToDownload,
		batchSize:        batchSize,
	}, nil
}

// Next downloads the next batch of at most batchSize chunks and makes it
// available through At. It returns false when there is nothing left to
// download or when a download failed; callers must check Err once Next
// returns false to tell the two cases apart.
func (c *chunksBatchesIterator) Next() bool {
	// A previous failure is sticky: never keep downloading after an error.
	if c.err != nil || len(c.chunksToDownload) == 0 {
		return false
	}

	// The final batch may be smaller than the configured batch size.
	batchSize := c.batchSize
	if remaining := uint(len(c.chunksToDownload)); remaining < batchSize {
		batchSize = remaining
	}
	toDownload := c.chunksToDownload[:batchSize]
	c.chunksToDownload = c.chunksToDownload[batchSize:]

	batch, err := c.client.GetChunks(c.context, toDownload)
	if err != nil {
		// Stop the iteration instead of reporting success: a consumer that
		// loops on Next would otherwise process a failed batch and go on to
		// request the remaining ones.
		c.err = err
		c.currentBatch = nil
		return false
	}
	c.currentBatch = batch
	return true
}

// Err reports the error stored by the most recent download attempt, or nil
// if that attempt succeeded or no download has been attempted yet.
func (c *chunksBatchesIterator) Err() error {
	return c.err
}

// At returns the batch of chunks stored by the most recent call to Next.
func (c *chunksBatchesIterator) At() []chunk.Chunk {
	return c.currentBatch
}
96 changes: 96 additions & 0 deletions pkg/bloomcompactor/chunksbatchesiterator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package bloomcompactor

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/chunk"
tsdbindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

// Test_chunksBatchesIterator checks that the iterator rejects a zero batch
// size, issues one client call per batch, never returns a batch larger than
// the configured size, and yields every requested chunk in order.
func Test_chunksBatchesIterator(t *testing.T) {
	tests := map[string]struct {
		batchSize        uint
		chunksToDownload []chunk.Chunk
		constructorError error

		hadNextCount int
	}{
		"expected error if batch size is set to 0": {
			batchSize:        0,
			constructorError: errors.New("batchSize must be greater than 0"),
		},
		"expected no error if there are no chunks": {
			hadNextCount: 0,
			batchSize:    10,
		},
		"expected 1 call to the client": {
			chunksToDownload: createFakeChunks(10),
			hadNextCount:     1,
			batchSize:        20,
		},
		"expected 1 call to the client(2)": {
			chunksToDownload: createFakeChunks(10),
			hadNextCount:     1,
			batchSize:        10,
		},
		"expected 2 calls to the client": {
			chunksToDownload: createFakeChunks(10),
			hadNextCount:     2,
			batchSize:        6,
		},
		"expected 10 calls to the client": {
			chunksToDownload: createFakeChunks(10),
			hadNextCount:     10,
			batchSize:        1,
		},
	}
	for name, data := range tests {
		t.Run(name, func(t *testing.T) {
			client := &fakeClient{}
			iterator, err := newChunkBatchesIterator(context.Background(), client, data.chunksToDownload, data.batchSize)
			if data.constructorError != nil {
				// require.Equal takes (expected, actual); keeping that order
				// makes failure messages read correctly.
				require.Equal(t, data.constructorError, err)
				return
			}
			// Every remaining case expects the constructor to succeed.
			require.NoError(t, err)

			hadNextCount := 0
			var downloadedChunks []chunk.Chunk
			for iterator.Next() {
				require.NoError(t, iterator.Err())
				hadNextCount++
				downloaded := iterator.At()
				downloadedChunks = append(downloadedChunks, downloaded...)
				require.LessOrEqual(t, uint(len(downloaded)), data.batchSize)
			}
			// The loop also ends when a download fails, so make sure it
			// stopped because the chunks were exhausted.
			require.NoError(t, iterator.Err())
			require.Equal(t, data.chunksToDownload, downloadedChunks)
			require.Equal(t, data.hadNextCount, client.callsCount)
			require.Equal(t, data.hadNextCount, hadNextCount)
		})
	}
}

// createFakeChunks builds count chunk refs for tenant "fake" and fingerprint
// 0xFFFF. Each chunk's meta is derived from its index so the chunks are
// distinguishable from one another.
func createFakeChunks(count int) []chunk.Chunk {
	metas := make([]tsdbindex.ChunkMeta, count)
	for idx := range metas {
		metas[idx] = tsdbindex.ChunkMeta{
			Checksum: uint32(idx),
			MinTime:  int64(idx),
			MaxTime:  int64(idx + 100),
			KB:       uint32(idx * 100),
			Entries:  uint32(idx * 10),
		}
	}
	return makeChunkRefs(metas, "fake", 0xFFFF)
}

// fakeClient is a chunk client stub that counts how many times GetChunks has
// been called.
type fakeClient struct {
	callsCount int
}

// GetChunks records the call and hands the requested chunks back unchanged,
// never returning an error.
func (f *fakeClient) GetChunks(_ context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
	f.callsCount += 1
	return chunks, nil
}
1 change: 1 addition & 0 deletions pkg/bloomcompactor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
type Limits interface {
downloads.Limits
BloomCompactorShardSize(tenantID string) int
BloomCompactorChunksBatchSize(userID string) uint
BloomCompactorMaxTableAge(tenantID string) time.Duration
BloomCompactorEnabled(tenantID string) bool
BloomNGramLength(tenantID string) int
Expand Down
10 changes: 5 additions & 5 deletions pkg/bloomcompactor/mergecompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bloomcompactor

import (
"context"
"fmt"

"github.com/grafana/dskit/concurrency"

Expand Down Expand Up @@ -74,7 +75,7 @@ func makeBlockIterFromBlocks(ctx context.Context, logger log.Logger,
return blockIters, blockPaths, nil
}

func createPopulateFunc(ctx context.Context, logger log.Logger, job Job, storeClient storeClient, bt *v1.BloomTokenizer) func(series *v1.Series, bloom *v1.Bloom) error {
func createPopulateFunc(ctx context.Context, job Job, storeClient storeClient, bt *v1.BloomTokenizer, limits Limits) func(series *v1.Series, bloom *v1.Bloom) error {
return func(series *v1.Series, bloom *v1.Bloom) error {
bloomForChks := v1.SeriesWithBloom{
Series: series,
Expand All @@ -95,12 +96,11 @@ func createPopulateFunc(ctx context.Context, logger log.Logger, job Job, storeCl
}
}

chks, err := storeClient.chunk.GetChunks(ctx, chunkRefs)
batchesIterator, err := newChunkBatchesIterator(ctx, storeClient.chunk, chunkRefs, limits.BloomCompactorChunksBatchSize(job.tenantID))
if err != nil {
level.Error(logger).Log("msg", "failed downloading chunks", "err", err)
return err
return fmt.Errorf("error creating chunks batches iterator: %w", err)
}
err = bt.PopulateSeriesWithBloom(&bloomForChks, chks)
err = bt.PopulateSeriesWithBloom(&bloomForChks, batchesIterator)
if err != nil {
return err
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/bloomcompactor/sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,17 @@ func TestShuffleSharding(t *testing.T) {

type mockLimits struct {
*validation.Overrides
bloomCompactorShardSize int
bloomCompactorShardSize int
chunksDownloadingBatchSize uint
}

// BloomCompactorShardSize returns the shard size stored on the mock; the
// tenant argument is ignored.
func (m mockLimits) BloomCompactorShardSize(_ string) int {
	return m.bloomCompactorShardSize
}

// BloomCompactorChunksBatchSize returns the batch size stored on the mock,
// falling back to 1 when the field was left at its zero value. The tenant
// argument is ignored.
func (m mockLimits) BloomCompactorChunksBatchSize(_ string) uint {
	if size := m.chunksDownloadingBatchSize; size != 0 {
		return size
	}
	return 1
}
Loading

0 comments on commit 4dcf3c8

Please sign in to comment.