
feat: New bloom planning using chunk size TSDB stats #14547

Merged
merged 14 commits on Oct 23, 2024
docs/sources/shared/configuration.md — 5 additions & 1 deletion
@@ -3765,7 +3765,7 @@ shard_streams:
[bloom_creation_enabled: <boolean> | default = false]

# Experimental. Bloom planning strategy to use in bloom creation. Can be one of:
-# 'split_keyspace_by_factor'
+# 'split_keyspace_by_factor', 'split_by_series_chunks_size'
# CLI flag: -bloom-build.planning-strategy
[bloom_planning_strategy: <string> | default = "split_keyspace_by_factor"]

@@ -3775,6 +3775,10 @@ shard_streams:
# CLI flag: -bloom-build.split-keyspace-by
[bloom_split_series_keyspace_by: <int> | default = 256]

+# Experimental. Target chunk size in bytes for bloom tasks. Default is 20GB.
+# CLI flag: -bloom-build.split-target-series-chunk-size
+[bloom_task_target_series_chunk_size: <int> | default = 20GB]

# Experimental. Compression algorithm for bloom block pages.
# CLI flag: -bloom-build.block-encoding
[bloom_block_encoding: <string> | default = "none"]
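For reference, a tenant could opt into the new strategy with an override along these lines (a sketch assembled from the settings documented above; the `limits_config` placement follows Loki's usual per-tenant limits convention):

```yaml
limits_config:
  bloom_creation_enabled: true
  # Use the new chunk-size-based planning instead of the default keyspace split.
  bloom_planning_strategy: split_by_series_chunks_size
  # Target roughly 20GB of chunk data per bloom build task.
  bloom_task_target_series_chunk_size: 20GB
```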
integration/bloom_building_test.go — 2 additions & 0 deletions
@@ -96,6 +96,8 @@ func TestBloomBuilding(t *testing.T) {
"-bloom-build.planner.interval=15s",
"-bloom-build.planner.min-table-offset=0", // Disable table offset so we process today's data.
"-bloom.cache-list-ops=0", // Disable cache list operations to avoid caching issues.
"-bloom-build.planning-strategy=split_by_series_chunks_size",
"-bloom-build.split-target-series-chunk-size=1KB",
)
require.NoError(t, clu.Run())

pkg/bloombuild/planner/planner.go — 3 additions & 4 deletions
@@ -281,8 +281,6 @@ func (p *Planner) runOne(ctx context.Context) error {
continue
}

-level.Debug(logger).Log("msg", "computed tasks", "tasks", len(tasks), "existingMetas", len(existingMetas))

var tenantTableEnqueuedTasks int
resultsCh := make(chan *protos.TaskResult, len(tasks))

@@ -377,13 +375,13 @@ func (p *Planner) computeTasks(
table config.DayTable,
tenant string,
) ([]*protos.Task, []bloomshipper.Meta, error) {
-logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant)

strategy, err := strategies.NewStrategy(tenant, p.limits, p.logger)
if err != nil {
return nil, nil, fmt.Errorf("error creating strategy: %w", err)
}

+logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant, "strategy", strategy.Name())

// Fetch source metas to be used in both build and cleanup of out-of-date metas+blooms
metas, err := p.bloomStore.FetchMetas(
ctx,
@@ -432,6 +430,7 @@ func (p *Planner) computeTasks(
return nil, nil, fmt.Errorf("failed to plan tasks: %w", err)
}

+level.Debug(logger).Log("msg", "computed tasks", "tasks", len(tasks), "existingMetas", len(metas))
return tasks, metas, nil
}

pkg/bloombuild/planner/planner_test.go — 9 additions & 0 deletions
@@ -18,6 +18,7 @@ import (
"google.golang.org/grpc"

"github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
@@ -750,3 +751,11 @@ func (f *fakeLimits) BloomBuildMaxBuilders(_ string) int {
func (f *fakeLimits) BloomTaskMaxRetries(_ string) int {
return f.maxRetries
}

+func (f *fakeLimits) BloomPlanningStrategy(_ string) string {
+return strategies.SplitBySeriesChunkSizeStrategyName
+}
+
+func (f *fakeLimits) BloomTaskTargetSeriesChunksSizeBytes(_ string) uint64 {
+return 1 << 20 // 1MB
+}
pkg/bloombuild/planner/plannertest/utils.go — 6 additions & 2 deletions
@@ -88,8 +88,12 @@ func GenBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
}

func GenSeries(bounds v1.FingerprintBounds) []*v1.Series {
-series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1))
-for i := bounds.Min; i <= bounds.Max; i++ {
+return GenSeriesWithStep(bounds, 1)
+}
+
+func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series {
+series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1)/step)
+for i := bounds.Min; i <= bounds.Max; i += model.Fingerprint(step) {
series = append(series, &v1.Series{
Fingerprint: i,
Chunks: v1.ChunkRefs{
pkg/bloombuild/planner/strategies/chunksize.go — 286 additions & 0 deletions (new file)
@@ -0,0 +1,286 @@
package strategies

import (
"context"
"fmt"
"math"
"sort"

"github.com/dustin/go-humanize"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/bloombuild/protos"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

type ChunkSizeStrategyLimits interface {
BloomTaskTargetSeriesChunksSizeBytes(tenantID string) uint64
}

type ChunkSizeStrategy struct {
limits ChunkSizeStrategyLimits
logger log.Logger
}

func NewChunkSizeStrategy(
limits ChunkSizeStrategyLimits,
logger log.Logger,
) (*ChunkSizeStrategy, error) {
return &ChunkSizeStrategy{
limits: limits,
logger: logger,
}, nil
}

func (s *ChunkSizeStrategy) Name() string {
return SplitBySeriesChunkSizeStrategyName
}
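As a side note, the planner selects the strategy per tenant from the `BloomPlanningStrategy` limit (see `strategies.NewStrategy` in `planner.go` above). A minimal sketch of what that dispatch could look like; the `PlanningStrategy` and `plannerLimits` shapes here are assumptions for illustration, not the actual code:

```go
// Sketch only: the real constructor is strategies.NewStrategy. The
// interface shapes below are assumed for illustration.
type PlanningStrategy interface {
	Name() string
}

type plannerLimits interface {
	ChunkSizeStrategyLimits
	BloomPlanningStrategy(tenantID string) string
}

func newStrategySketch(tenant string, limits plannerLimits, logger log.Logger) (PlanningStrategy, error) {
	switch name := limits.BloomPlanningStrategy(tenant); name {
	case SplitBySeriesChunkSizeStrategyName: // "split_by_series_chunks_size"
		return NewChunkSizeStrategy(limits, logger)
	default:
		return nil, fmt.Errorf("unknown bloom planning strategy: %s", name)
	}
}
```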

func (s *ChunkSizeStrategy) Plan(
ctx context.Context,
table config.DayTable,
tenant string,
tsdbs TSDBSet,
metas []bloomshipper.Meta,
) ([]*protos.Task, error) {
targetTaskSize := s.limits.BloomTaskTargetSeriesChunksSizeBytes(tenant)

logger := log.With(s.logger, "table", table.Addr(), "tenant", tenant)
level.Debug(s.logger).Log("msg", "loading work for tenant", "target task size", humanize.Bytes(targetTaskSize))

// Determine which TSDBs have gaps and need to be processed.
tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(v1.NewBounds(0, math.MaxUint64), tsdbs, metas)
if err != nil {
level.Error(logger).Log("msg", "failed to find gaps", "err", err)
return nil, fmt.Errorf("failed to find gaps: %w", err)
}

if len(tsdbsWithGaps) == 0 {
level.Debug(logger).Log("msg", "blooms exist for all tsdbs")
return nil, nil
}

sizedIter, iterSize, err := s.sizedSeriesIter(ctx, tenant, tsdbsWithGaps, targetTaskSize)
if err != nil {
return nil, fmt.Errorf("failed to get sized series iter: %w", err)
}

tasks := make([]*protos.Task, 0, iterSize)
for sizedIter.Next() {
series := sizedIter.At()
if series.Len() == 0 {
// This should never happen, but just in case.
level.Warn(logger).Log("msg", "got empty series batch", "tsdb", series.TSDB().Name())
continue
}

bounds := series.Bounds()

blocks, err := getBlocksMatchingBounds(metas, bounds)
if err != nil {
return nil, fmt.Errorf("failed to get blocks matching bounds: %w", err)
}

planGap := protos.Gap{
Bounds: bounds,
Series: series.V1Series(),
Blocks: blocks,
}

tasks = append(tasks, protos.NewTask(table, tenant, bounds, series.TSDB(), []protos.Gap{planGap}))
}
if err := sizedIter.Err(); err != nil {
return nil, fmt.Errorf("failed to iterate over sized series: %w", err)
}

return tasks, nil
}

func getBlocksMatchingBounds(metas []bloomshipper.Meta, bounds v1.FingerprintBounds) ([]bloomshipper.BlockRef, error) {
blocks := make([]bloomshipper.BlockRef, 0, 10)

for _, meta := range metas {
if meta.Bounds.Intersection(bounds) == nil {
// this meta doesn't overlap the gap, skip
continue
}

for _, block := range meta.Blocks {
if block.Bounds.Intersection(bounds) == nil {
// this block doesn't overlap the gap, skip
continue
}
// this block overlaps the gap, add it to the plan
// for this gap
blocks = append(blocks, block)
}
}

// ensure we sort blocks so deduping iterator works as expected
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Bounds.Less(blocks[j].Bounds)
})

peekingBlocks := iter.NewPeekIter(
iter.NewSliceIter(
blocks,
),
)

// dedupe blocks which could be in multiple metas
itr := iter.NewDedupingIter(
func(a, b bloomshipper.BlockRef) bool {
return a == b
},
iter.Identity[bloomshipper.BlockRef],
func(a, _ bloomshipper.BlockRef) bloomshipper.BlockRef {
return a
},
peekingBlocks,
)

deduped, err := iter.Collect(itr)
if err != nil {
return nil, fmt.Errorf("failed to dedupe blocks: %w", err)
}

return deduped, nil
}
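The sort above matters because the deduping iterator only collapses adjacent duplicates: the same `BlockRef` can arrive from several metas, and sorting guarantees identical refs end up next to each other. A standalone illustration of that pattern on plain ints (not the `iter` package API):

```go
package main

import (
	"fmt"
	"sort"
)

// dedupeSorted collapses adjacent duplicates; it only removes all
// duplicates when the input is sorted first, which is the invariant the
// planner establishes before handing blocks to its deduping iterator.
func dedupeSorted(xs []int) []int {
	out := make([]int, 0, len(xs))
	for i, x := range xs {
		if i == 0 || x != xs[i-1] {
			out = append(out, x)
		}
	}
	return out
}

func main() {
	// The same block may be referenced by several metas.
	blocks := []int{3, 1, 3, 2, 1}
	sort.Ints(blocks)                 // [1 1 2 3 3]
	fmt.Println(dedupeSorted(blocks)) // [1 2 3]
}
```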

type seriesWithChunks struct {
tsdb tsdb.SingleTenantTSDBIdentifier
fp model.Fingerprint
chunks []index.ChunkMeta
}

type seriesBatch struct {
series []seriesWithChunks
size uint64
}

func newSeriesBatch() seriesBatch {
return seriesBatch{
series: make([]seriesWithChunks, 0, 100),
}
}

func (b *seriesBatch) Bounds() v1.FingerprintBounds {
if len(b.series) == 0 {
return v1.NewBounds(0, 0)
}

// We assume that the series are sorted by fingerprint.
// This is guaranteed since series are iterated in order by the TSDB.
return v1.NewBounds(b.series[0].fp, b.series[len(b.series)-1].fp)
}

func (b *seriesBatch) V1Series() []*v1.Series {
series := make([]*v1.Series, 0, len(b.series))
for _, s := range b.series {
res := &v1.Series{
Fingerprint: s.fp,
Chunks: make(v1.ChunkRefs, 0, len(s.chunks)),
}
for _, chk := range s.chunks {
res.Chunks = append(res.Chunks, v1.ChunkRef{
From: model.Time(chk.MinTime),
Through: model.Time(chk.MaxTime),
Checksum: chk.Checksum,
})
}

series = append(series, res)
}

return series
}

func (b *seriesBatch) Append(s seriesWithChunks, size uint64) {
b.series = append(b.series, s)
b.size += size
}

func (b *seriesBatch) Len() int {
return len(b.series)
}

func (b *seriesBatch) Size() uint64 {
return b.size
}

func (b *seriesBatch) TSDB() tsdb.SingleTenantTSDBIdentifier {
if len(b.series) == 0 {
return tsdb.SingleTenantTSDBIdentifier{}
}
return b.series[0].tsdb
}

func (s *ChunkSizeStrategy) sizedSeriesIter(
ctx context.Context,
tenant string,
tsdbsWithGaps []tsdbGaps,
targetTaskSizeBytes uint64,
) (iter.Iterator[seriesBatch], int, error) {
batches := make([]seriesBatch, 0, 100)
currentBatch := newSeriesBatch()

for _, idx := range tsdbsWithGaps {
for _, gap := range idx.gaps {
if err := idx.tsdb.ForSeries(
ctx,
tenant,
gap,
0, math.MaxInt64,
func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) {
select {
case <-ctx.Done():
return true
default:
var seriesSize uint64
for _, chk := range chks {
seriesSize += uint64(chk.KB * 1024)
}

// Cut a new batch IF the current batch is not empty (so we add at least one series to the batch)
// AND adding this series to the batch would exceed the target task size.
if currentBatch.Len() > 0 && currentBatch.Size()+seriesSize > targetTaskSizeBytes {
batches = append(batches, currentBatch)
currentBatch = newSeriesBatch()
}

currentBatch.Append(seriesWithChunks{
tsdb: idx.tsdbIdentifier,
fp: fp,
chunks: chks,
}, seriesSize)
return false
}
},
labels.MustNewMatcher(labels.MatchEqual, "", ""),
); err != nil {
return nil, 0, err
}

// Add the last batch for this TSDB if it's not empty.
if currentBatch.Len() > 0 {
batches = append(batches, currentBatch)
currentBatch = newSeriesBatch()
}
}
}

select {
case <-ctx.Done():
return iter.NewEmptyIter[seriesBatch](), 0, ctx.Err()
default:
return iter.NewCancelableIter[seriesBatch](ctx, iter.NewSliceIter[seriesBatch](batches)), len(batches), nil
}
}
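To make the cut rule above concrete: a batch always receives at least one series (so a single series larger than the target still becomes its own task), and a new batch starts as soon as the next series would push the running size past the target. A standalone sketch of the same greedy rule (illustrative names; the real code also cuts at TSDB boundaries, which this omits):

```go
package main

import "fmt"

// cutBatches groups series sizes greedily: never split a series, always
// place at least one series per batch, and cut before any series that
// would push the running total past the target.
func cutBatches(sizes []uint64, target uint64) [][]uint64 {
	var batches [][]uint64
	var cur []uint64
	var curSize uint64
	for _, s := range sizes {
		if len(cur) > 0 && curSize+s > target {
			batches = append(batches, cur)
			cur, curSize = nil, 0
		}
		cur = append(cur, s)
		curSize += s
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches
}

func main() {
	// With a target of 10 bytes: [4 5] (9), [7 3] (10), [3].
	fmt.Println(cutBatches([]uint64{4, 5, 7, 3, 3}, 10))
}
```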