diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index a7774c34c3ce6..fcf12832dbb47 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -3796,9 +3796,14 @@ shard_streams: # CLI flag: -bloom-build.enable [bloom_creation_enabled: | default = false] -# Experimental. Number of splits to create for the series keyspace when building -# blooms. The series keyspace is split into this many parts to parallelize bloom -# creation. +# Experimental. Bloom planning strategy to use in bloom creation. Can be one of: +# 'split' +# CLI flag: -bloom-build.planning-strategy +[bloom_planning_strategy: | default = "split"] + +# Experimental. Only if `bloom-build.planning-strategy` is 'split'. Number of +# splits to create for the series keyspace when building blooms. The series +# keyspace is split into this many parts to parallelize bloom creation. # CLI flag: -bloom-build.split-keyspace-by [bloom_split_series_keyspace_by: | default = 256] diff --git a/pkg/bloombuild/planner/config.go b/pkg/bloombuild/planner/config.go index 40ec5707ef715..cfbccd84322d1 100644 --- a/pkg/bloombuild/planner/config.go +++ b/pkg/bloombuild/planner/config.go @@ -4,6 +4,8 @@ import ( "flag" "fmt" "time" + + "github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies" ) // Config configures the bloom-planner component. @@ -44,8 +46,8 @@ func (cfg *Config) Validate() error { type Limits interface { RetentionLimits + strategies.Limits BloomCreationEnabled(tenantID string) bool - BloomSplitSeriesKeyspaceBy(tenantID string) int BloomBuildMaxBuilders(tenantID string) int BuilderResponseTimeout(tenantID string) time.Duration BloomTaskMaxRetries(tenantID string) int diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go index f65fdf59c9acb..ab9904369a204 100644 --- a/pkg/bloombuild/planner/planner.go +++ b/pkg/bloombuild/planner/planner.go @@ -17,6 +17,7 @@ import ( "go.uber.org/atomic" "github.com/grafana/loki/v3/pkg/bloombuild/common" + "github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies" "github.com/grafana/loki/v3/pkg/bloombuild/protos" iter "github.com/grafana/loki/v3/pkg/iter/v2" "github.com/grafana/loki/v3/pkg/queue" @@ -221,7 +222,7 @@ func (p *Planner) runOne(ctx context.Context) error { tables := p.tables(time.Now()) level.Debug(p.logger).Log("msg", "loaded tables", "tables", tables.TotalDays()) - work, err := p.loadTenantWork(ctx, tables) + tenantTables, err := p.loadTenantTables(ctx, tables) if err != nil { return fmt.Errorf("error loading work: %w", err) } @@ -232,19 +233,21 @@ func (p *Planner) runOne(ctx context.Context) error { tasksResultForTenantTable := make(map[tenantTable]tenantTableTaskResults) var totalTasks int - for table, tenants := range work { - for tenant, ownershipRanges := range tenants { + for table, tenants := range tenantTables { + for _, tenant := range tenants { logger := log.With(p.logger, "tenant", tenant, "table", table.Addr()) + tt := tenantTable{ tenant: tenant, table: table, } - tasks, existingMetas, err := p.computeTasks(ctx, table, tenant, ownershipRanges) + tasks, existingMetas, err := p.computeTasks(ctx, table, tenant) if err != nil { - level.Error(logger).Log("msg", "error computing tasks", "err", err) + level.Error(logger).Log("msg", "failed to compute tasks", "err", err) continue } + level.Debug(logger).Log("msg", "computed tasks", "tasks", len(tasks), "existingMetas", len(existingMetas)) var tenantTableEnqueuedTasks int @@ -334,17 +337,20 @@ func (p *Planner) runOne(ctx context.Context) error { return nil } -// computeTasks computes the tasks for a given table and tenant and ownership range. -// It returns the tasks to be executed and the metas that are existing relevant for the ownership range. +// computeTasks computes the tasks for a given table and tenant. +// It returns the tasks to be executed and the existing metas. func (p *Planner) computeTasks( ctx context.Context, table config.DayTable, tenant string, - ownershipRanges []v1.FingerprintBounds, ) ([]*protos.Task, []bloomshipper.Meta, error) { - var tasks []*protos.Task logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant) + strategy, err := strategies.NewStrategy(tenant, p.limits, p.logger) + if err != nil { + return nil, nil, fmt.Errorf("error creating strategy: %w", err) + } + // Fetch source metas to be used in both build and cleanup of out-of-date metas+blooms metas, err := p.bloomStore.FetchMetas( ctx, @@ -388,22 +394,9 @@ func (p *Planner) computeTasks( } }() - for _, ownershipRange := range ownershipRanges { - logger := log.With(logger, "ownership", ownershipRange.String()) - - // Filter only the metas that overlap in the ownership range - metasInBounds := bloomshipper.FilterMetasOverlappingBounds(metas, ownershipRange) - - // Find gaps in the TSDBs for this tenant/table - gaps, err := p.findOutdatedGaps(ctx, tenant, openTSDBs, ownershipRange, metasInBounds, logger) - if err != nil { - level.Error(logger).Log("msg", "failed to find outdated gaps", "err", err) - continue - } - - for _, gap := range gaps { - tasks = append(tasks, protos.NewTask(table, tenant, ownershipRange, gap.tsdb, gap.gaps)) - } + tasks, err := strategy.Plan(ctx, table, tenant, openTSDBs, metas) + if err != nil { + return nil, nil, fmt.Errorf("failed to plan tasks: %w", err) } return tasks, metas, nil @@ -616,15 +609,12 @@ func (p *Planner) tables(ts time.Time) *dayRangeIterator { return newDayRangeIterator(fromDay, throughDay, p.schemaCfg) } -type work map[config.DayTable]map[string][]v1.FingerprintBounds - -// loadTenantWork loads the work for each tenant and table tuple. -// work is the list of fingerprint ranges that need to be indexed in bloom filters. -func (p *Planner) loadTenantWork( +// loadTenantTables loads all tenants with bloom build enabled for each table. +func (p *Planner) loadTenantTables( ctx context.Context, tables *dayRangeIterator, -) (work, error) { - tenantTableWork := make(map[config.DayTable]map[string][]v1.FingerprintBounds, tables.TotalDays()) +) (map[config.DayTable][]string, error) { + tenantTables := make(map[config.DayTable][]string, tables.TotalDays()) for tables.Next() && tables.Err() == nil && ctx.Err() == nil { table := tables.At() @@ -637,8 +627,8 @@ func (p *Planner) loadTenantWork( level.Debug(p.logger).Log("msg", "loaded tenants", "table", table, "tenants", tenants.Remaining()) // If this is the first this we see this table, initialize the map - if tenantTableWork[table] == nil { - tenantTableWork[table] = make(map[string][]v1.FingerprintBounds, tenants.Remaining()) + if tenantTables[table] == nil { + tenantTables[table] = make([]string, tenants.Remaining()) } for tenants.Next() && tenants.Err() == nil && ctx.Err() == nil { @@ -650,11 +640,6 @@ func (p *Planner) loadTenantWork( continue } - splitFactor := p.limits.BloomSplitSeriesKeyspaceBy(tenant) - bounds := SplitFingerprintKeyspaceByFactor(splitFactor) - - tenantTableWork[table][tenant] = bounds - // Reset progress tracking metrics for this tenant // NOTE(salvacorts): We will reset them multiple times for the same tenant, for each table, but it's not a big deal. // Alternatively, we can use a Counter instead of a Gauge, but I think a Gauge is easier to reason about. @@ -662,7 +647,7 @@ func (p *Planner) loadTenantWork( p.metrics.tenantTasksCompleted.WithLabelValues(tenant, statusSuccess).Set(0) p.metrics.tenantTasksCompleted.WithLabelValues(tenant, statusFailure).Set(0) - level.Debug(p.logger).Log("msg", "loading work for tenant", "table", table, "tenant", tenant, "splitFactor", splitFactor) + tenantTables[table] = append(tenantTables[table], tenant) } if err := tenants.Err(); err != nil { level.Error(p.logger).Log("msg", "error iterating tenants", "err", err) @@ -675,7 +660,7 @@ func (p *Planner) loadTenantWork( return nil, fmt.Errorf("error iterating tables: %w", err) } - return tenantTableWork, ctx.Err() + return tenantTables, ctx.Err() } func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.SliceIter[string], error) { @@ -687,178 +672,6 @@ func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.Sli return iter.NewSliceIter(tenants), nil } -// blockPlan is a plan for all the work needed to build a meta.json -// It includes: -// - the tsdb (source of truth) which contains all the series+chunks -// we need to ensure are indexed in bloom blocks -// - a list of gaps that are out of date and need to be checked+built -// - within each gap, a list of block refs which overlap the gap are included -// so we can use them to accelerate bloom generation. They likely contain many -// of the same chunks we need to ensure are indexed, just from previous tsdb iterations. -// This is a performance optimization to avoid expensive re-reindexing -type blockPlan struct { - tsdb tsdb.SingleTenantTSDBIdentifier - gaps []protos.Gap -} - -func (p *Planner) findOutdatedGaps( - ctx context.Context, - tenant string, - tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, - ownershipRange v1.FingerprintBounds, - metas []bloomshipper.Meta, - logger log.Logger, -) ([]blockPlan, error) { - // Determine which TSDBs have gaps in the ownership range and need to - // be processed. - tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(ownershipRange, tsdbs, metas) - if err != nil { - level.Error(logger).Log("msg", "failed to find gaps", "err", err) - return nil, fmt.Errorf("failed to find gaps: %w", err) - } - - if len(tsdbsWithGaps) == 0 { - level.Debug(logger).Log("msg", "blooms exist for all tsdbs") - return nil, nil - } - - work, err := blockPlansForGaps(ctx, tenant, tsdbsWithGaps, metas) - if err != nil { - level.Error(logger).Log("msg", "failed to create plan", "err", err) - return nil, fmt.Errorf("failed to create plan: %w", err) - } - - return work, nil -} - -// Used to signal the gaps that need to be populated for a tsdb -type tsdbGaps struct { - tsdbIdentifier tsdb.SingleTenantTSDBIdentifier - tsdb common.ClosableForSeries - gaps []v1.FingerprintBounds -} - -// gapsBetweenTSDBsAndMetas returns if the metas are up-to-date with the TSDBs. This is determined by asserting -// that for each TSDB, there are metas covering the entire ownership range which were generated from that specific TSDB. -func gapsBetweenTSDBsAndMetas( - ownershipRange v1.FingerprintBounds, - tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, - metas []bloomshipper.Meta, -) (res []tsdbGaps, err error) { - for db, tsdb := range tsdbs { - id := db.Name() - - relevantMetas := make([]v1.FingerprintBounds, 0, len(metas)) - for _, meta := range metas { - for _, s := range meta.Sources { - if s.Name() == id { - relevantMetas = append(relevantMetas, meta.Bounds) - } - } - } - - gaps, err := FindGapsInFingerprintBounds(ownershipRange, relevantMetas) - if err != nil { - return nil, err - } - - if len(gaps) > 0 { - res = append(res, tsdbGaps{ - tsdbIdentifier: db, - tsdb: tsdb, - gaps: gaps, - }) - } - } - - return res, err -} - -// blockPlansForGaps groups tsdb gaps we wish to fill with overlapping but out of date blocks. -// This allows us to expedite bloom generation by using existing blocks to fill in the gaps -// since many will contain the same chunks. -func blockPlansForGaps( - ctx context.Context, - tenant string, - tsdbs []tsdbGaps, - metas []bloomshipper.Meta, -) ([]blockPlan, error) { - plans := make([]blockPlan, 0, len(tsdbs)) - - for _, idx := range tsdbs { - plan := blockPlan{ - tsdb: idx.tsdbIdentifier, - gaps: make([]protos.Gap, 0, len(idx.gaps)), - } - - for _, gap := range idx.gaps { - planGap := protos.Gap{ - Bounds: gap, - } - - seriesItr, err := common.NewTSDBSeriesIter(ctx, tenant, idx.tsdb, gap) - if err != nil { - return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.String(), err) - } - planGap.Series, err = iter.Collect(seriesItr) - if err != nil { - return nil, fmt.Errorf("failed to collect series: %w", err) - } - - for _, meta := range metas { - if meta.Bounds.Intersection(gap) == nil { - // this meta doesn't overlap the gap, skip - continue - } - - for _, block := range meta.Blocks { - if block.Bounds.Intersection(gap) == nil { - // this block doesn't overlap the gap, skip - continue - } - // this block overlaps the gap, add it to the plan - // for this gap - planGap.Blocks = append(planGap.Blocks, block) - } - } - - // ensure we sort blocks so deduping iterator works as expected - sort.Slice(planGap.Blocks, func(i, j int) bool { - return planGap.Blocks[i].Bounds.Less(planGap.Blocks[j].Bounds) - }) - - peekingBlocks := iter.NewPeekIter[bloomshipper.BlockRef]( - iter.NewSliceIter[bloomshipper.BlockRef]( - planGap.Blocks, - ), - ) - // dedupe blocks which could be in multiple metas - itr := iter.NewDedupingIter[bloomshipper.BlockRef, bloomshipper.BlockRef]( - func(a, b bloomshipper.BlockRef) bool { - return a == b - }, - iter.Identity[bloomshipper.BlockRef], - func(a, _ bloomshipper.BlockRef) bloomshipper.BlockRef { - return a - }, - peekingBlocks, - ) - - deduped, err := iter.Collect[bloomshipper.BlockRef](itr) - if err != nil { - return nil, fmt.Errorf("failed to dedupe blocks: %w", err) - } - planGap.Blocks = deduped - - plan.gaps = append(plan.gaps, planGap) - } - - plans = append(plans, plan) - } - - return plans, nil -} - func (p *Planner) addPendingTask(task *QueueTask) { p.pendingTasks.Store(task.ID, task) } diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go index 88e45c725917e..df02f8650385e 100644 --- a/pkg/bloombuild/planner/planner_test.go +++ b/pkg/bloombuild/planner/planner_test.go @@ -16,12 +16,10 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "go.uber.org/atomic" "google.golang.org/grpc" - "github.com/grafana/loki/v3/pkg/bloombuild/common" "github.com/grafana/loki/v3/pkg/bloombuild/protos" "github.com/grafana/loki/v3/pkg/chunkenc" iter "github.com/grafana/loki/v3/pkg/iter/v2" @@ -33,7 +31,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" - "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" "github.com/grafana/loki/v3/pkg/storage/types" "github.com/grafana/loki/v3/pkg/util/mempool" ) @@ -64,110 +61,6 @@ func genMeta(min, max model.Fingerprint, sources []int, blocks []bloomshipper.Bl return m } -func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { - - for _, tc := range []struct { - desc string - err bool - exp []tsdbGaps - ownershipRange v1.FingerprintBounds - tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries - metas []bloomshipper.Meta - }{ - { - desc: "non-overlapping tsdbs and metas", - err: true, - ownershipRange: v1.NewBounds(0, 10), - tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{ - tsdbID(0): nil, - }, - metas: []bloomshipper.Meta{ - genMeta(11, 20, []int{0}, nil), - }, - }, - { - desc: "single tsdb", - ownershipRange: v1.NewBounds(0, 10), - tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{ - tsdbID(0): nil, - }, - metas: []bloomshipper.Meta{ - genMeta(4, 8, []int{0}, nil), - }, - exp: []tsdbGaps{ - { - tsdbIdentifier: tsdbID(0), - gaps: []v1.FingerprintBounds{ - v1.NewBounds(0, 3), - v1.NewBounds(9, 10), - }, - }, - }, - }, - { - desc: "multiple tsdbs with separate blocks", - ownershipRange: v1.NewBounds(0, 10), - tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{ - tsdbID(0): nil, - tsdbID(1): nil, - }, - metas: []bloomshipper.Meta{ - genMeta(0, 5, []int{0}, nil), - genMeta(6, 10, []int{1}, nil), - }, - exp: []tsdbGaps{ - { - tsdbIdentifier: tsdbID(0), - gaps: []v1.FingerprintBounds{ - v1.NewBounds(6, 10), - }, - }, - { - tsdbIdentifier: tsdbID(1), - gaps: []v1.FingerprintBounds{ - v1.NewBounds(0, 5), - }, - }, - }, - }, - { - desc: "multiple tsdbs with the same blocks", - ownershipRange: v1.NewBounds(0, 10), - tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{ - tsdbID(0): nil, - tsdbID(1): nil, - }, - metas: []bloomshipper.Meta{ - genMeta(0, 5, []int{0, 1}, nil), - genMeta(6, 8, []int{1}, nil), - }, - exp: []tsdbGaps{ - { - tsdbIdentifier: tsdbID(0), - gaps: []v1.FingerprintBounds{ - v1.NewBounds(6, 10), - }, - }, - { - tsdbIdentifier: tsdbID(1), - gaps: []v1.FingerprintBounds{ - v1.NewBounds(9, 10), - }, - }, - }, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - gaps, err := gapsBetweenTSDBsAndMetas(tc.ownershipRange, tc.tsdbs, tc.metas) - if tc.err { - require.Error(t, err) - return - } - require.Equal(t, tc.exp, gaps) - }) - } -} - func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef { startTS, endTS := testDay.Bounds() return bloomshipper.BlockRef{ @@ -214,265 +107,6 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) { }, nil } -func Test_blockPlansForGaps(t *testing.T) { - for _, tc := range []struct { - desc string - ownershipRange v1.FingerprintBounds - tsdbs []tsdb.SingleTenantTSDBIdentifier - metas []bloomshipper.Meta - err bool - exp []blockPlan - }{ - { - desc: "single overlapping meta+no overlapping block", - ownershipRange: v1.NewBounds(0, 10), - tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, - metas: []bloomshipper.Meta{ - genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(11, 20)}), - }, - exp: []blockPlan{ - { - tsdb: tsdbID(0), - gaps: []protos.Gap{ - { - Bounds: v1.NewBounds(0, 10), - Series: genSeries(v1.NewBounds(0, 10)), - }, - }, - }, - }, - }, - { - desc: "single overlapping meta+one overlapping block", - ownershipRange: v1.NewBounds(0, 10), - tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, - metas: []bloomshipper.Meta{ - genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), - }, - exp: []blockPlan{ - { - tsdb: tsdbID(0), - gaps: []protos.Gap{ - { - Bounds: v1.NewBounds(0, 10), - Series: genSeries(v1.NewBounds(0, 10)), - Blocks: []bloomshipper.BlockRef{genBlockRef(9, 20)}, - }, - }, - }, - }, - }, - { - // the range which needs to be generated doesn't overlap with existing blocks - // from other tsdb versions since theres an up to date tsdb version block, - // but we can trim the range needing generation - desc: "trims up to date area", - ownershipRange: v1.NewBounds(0, 10), - tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, - metas: []bloomshipper.Meta{ - genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb - genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for different tsdb - }, - exp: []blockPlan{ - { - tsdb: tsdbID(0), - gaps: []protos.Gap{ - { - Bounds: v1.NewBounds(0, 8), - Series: genSeries(v1.NewBounds(0, 8)), - }, - }, - }, - }, - }, - { - desc: "uses old block for overlapping range", - ownershipRange: v1.NewBounds(0, 10), - tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, - metas: []bloomshipper.Meta{ - genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb - genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(5, 20)}), // block for different tsdb - }, - exp: []blockPlan{ - { - tsdb: tsdbID(0), - gaps: []protos.Gap{ - { - Bounds: v1.NewBounds(0, 8), - Series: genSeries(v1.NewBounds(0, 8)), - Blocks: []bloomshipper.BlockRef{genBlockRef(5, 20)}, - }, - }, - }, - }, - }, - { - desc: "multi case", - ownershipRange: v1.NewBounds(0, 10), - tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0), tsdbID(1)}, // generate for both tsdbs - metas: []bloomshipper.Meta{ - genMeta(0, 2, []int{0}, []bloomshipper.BlockRef{ - genBlockRef(0, 1), - genBlockRef(1, 2), - }), // tsdb_0 - genMeta(6, 8, []int{0}, []bloomshipper.BlockRef{genBlockRef(6, 8)}), // tsdb_0 - - genMeta(3, 5, []int{1}, []bloomshipper.BlockRef{genBlockRef(3, 5)}), // tsdb_1 - genMeta(8, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(8, 10)}), // tsdb_1 - }, - exp: []blockPlan{ - { - tsdb: tsdbID(0), - gaps: []protos.Gap{ - // tsdb (id=0) can source chunks from the blocks built from tsdb (id=1) - { - Bounds: v1.NewBounds(3, 5), - Series: genSeries(v1.NewBounds(3, 5)), - Blocks: []bloomshipper.BlockRef{genBlockRef(3, 5)}, - }, - { - Bounds: v1.NewBounds(9, 10), - Series: genSeries(v1.NewBounds(9, 10)), - Blocks: []bloomshipper.BlockRef{genBlockRef(8, 10)}, - }, - }, - }, - // tsdb (id=1) can source chunks from the blocks built from tsdb (id=0) - { - tsdb: tsdbID(1), - gaps: []protos.Gap{ - { - Bounds: v1.NewBounds(0, 2), - Series: genSeries(v1.NewBounds(0, 2)), - Blocks: []bloomshipper.BlockRef{ - genBlockRef(0, 1), - genBlockRef(1, 2), - }, - }, - { - Bounds: v1.NewBounds(6, 7), - Series: genSeries(v1.NewBounds(6, 7)), - Blocks: []bloomshipper.BlockRef{genBlockRef(6, 8)}, - }, - }, - }, - }, - }, - { - desc: "dedupes block refs", - ownershipRange: v1.NewBounds(0, 10), - tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, - metas: []bloomshipper.Meta{ - genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{ - genBlockRef(1, 4), - genBlockRef(9, 20), - }), // blocks for first diff tsdb - genMeta(5, 20, []int{2}, []bloomshipper.BlockRef{ - genBlockRef(5, 10), - genBlockRef(9, 20), // same block references in prior meta (will be deduped) - }), // block for second diff tsdb - }, - exp: []blockPlan{ - { - tsdb: tsdbID(0), - gaps: []protos.Gap{ - { - Bounds: v1.NewBounds(0, 10), - Series: genSeries(v1.NewBounds(0, 10)), - Blocks: []bloomshipper.BlockRef{ - genBlockRef(1, 4), - genBlockRef(5, 10), - genBlockRef(9, 20), - }, - }, - }, - }, - }, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - // We add series spanning the whole FP ownership range - tsdbs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries) - for _, id := range tc.tsdbs { - tsdbs[id] = newFakeForSeries(genSeries(tc.ownershipRange)) - } - - // we reuse the gapsBetweenTSDBsAndMetas function to generate the gaps as this function is tested - // separately and it's used to generate input in our regular code path (easier to write tests this way). - gaps, err := gapsBetweenTSDBsAndMetas(tc.ownershipRange, tsdbs, tc.metas) - require.NoError(t, err) - - plans, err := blockPlansForGaps( - context.Background(), - "fakeTenant", - gaps, - tc.metas, - ) - if tc.err { - require.Error(t, err) - return - } - require.Equal(t, tc.exp, plans) - }) - } -} - -func genSeries(bounds v1.FingerprintBounds) []*v1.Series { - series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1)) - for i := bounds.Min; i <= bounds.Max; i++ { - series = append(series, &v1.Series{ - Fingerprint: i, - Chunks: v1.ChunkRefs{ - { - From: 0, - Through: 1, - Checksum: 1, - }, - }, - }) - } - return series -} - -type fakeForSeries struct { - series []*v1.Series -} - -func newFakeForSeries(series []*v1.Series) *fakeForSeries { - return &fakeForSeries{ - series: series, - } -} - -func (f fakeForSeries) ForSeries(_ context.Context, _ string, ff index.FingerprintFilter, _ model.Time, _ model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) (stop bool), _ ...*labels.Matcher) error { - overlapping := make([]*v1.Series, 0, len(f.series)) - for _, s := range f.series { - if ff.Match(s.Fingerprint) { - overlapping = append(overlapping, s) - } - } - - for _, s := range overlapping { - chunks := make([]index.ChunkMeta, 0, len(s.Chunks)) - for _, c := range s.Chunks { - chunks = append(chunks, index.ChunkMeta{ - MinTime: int64(c.From), - MaxTime: int64(c.Through), - Checksum: c.Checksum, - }) - } - - if fn(labels.EmptyLabels(), s.Fingerprint, chunks) { - break - } - } - return nil -} - -func (f fakeForSeries) Close() error { - return nil -} - func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask { tasks := make([]*QueueTask, 0, n) // Enqueue tasks diff --git a/pkg/bloombuild/planner/strategies/factory.go b/pkg/bloombuild/planner/strategies/factory.go new file mode 100644 index 0000000000000..81211fbf7a837 --- /dev/null +++ b/pkg/bloombuild/planner/strategies/factory.go @@ -0,0 +1,50 @@ +package strategies + +import ( + "context" + "fmt" + + "github.com/go-kit/log" + + "github.com/grafana/loki/v3/pkg/bloombuild/common" + "github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies/splitkeyspace" + "github.com/grafana/loki/v3/pkg/bloombuild/protos" + "github.com/grafana/loki/v3/pkg/storage/config" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" +) + +const ( + SplitKeyspaceStrategy = "split" +) + +type Limits interface { + splitkeyspace.Limits + BloomPlanningStrategy(tenantID string) string +} + +type PlanningStrategy interface { + // Plan returns a set of tasks for a given tenant-table tuple and TSDBs. + Plan( + ctx context.Context, + table config.DayTable, + tenant string, + tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, + metas []bloomshipper.Meta, + ) ([]*protos.Task, error) +} + +func NewStrategy( + tenantID string, + limits Limits, + logger log.Logger, +) (PlanningStrategy, error) { + strategy := limits.BloomPlanningStrategy(tenantID) + + switch strategy { + case SplitKeyspaceStrategy: + return splitkeyspace.NewSplitKeyspaceStrategy(limits, logger) + default: + return nil, fmt.Errorf("unknown bloom planning strategy (%s)", strategy) + } +} diff --git a/pkg/bloombuild/planner/strategies/splitKeyspace/strategy.go b/pkg/bloombuild/planner/strategies/splitKeyspace/strategy.go new file mode 100644 index 0000000000000..560442dcf0aca --- /dev/null +++ b/pkg/bloombuild/planner/strategies/splitKeyspace/strategy.go @@ -0,0 +1,245 @@ +package splitkeyspace + +import ( + "context" + "fmt" + "sort" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + + "github.com/grafana/loki/v3/pkg/bloombuild/common" + "github.com/grafana/loki/v3/pkg/bloombuild/protos" + iter "github.com/grafana/loki/v3/pkg/iter/v2" + v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" + "github.com/grafana/loki/v3/pkg/storage/config" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" +) + +type Limits interface { + BloomSplitSeriesKeyspaceBy(tenantID string) int +} + +type Strategy struct { + limits Limits + + logger log.Logger +} + +func NewSplitKeyspaceStrategy( + limits Limits, + logger log.Logger, +) (*Strategy, error) { + return &Strategy{ + limits: limits, + logger: logger, + }, nil +} + +func (s *Strategy) Plan( + ctx context.Context, + table config.DayTable, + tenant string, + tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, + metas []bloomshipper.Meta, +) ([]*protos.Task, error) { + splitFactor := s.limits.BloomSplitSeriesKeyspaceBy(tenant) + ownershipRanges := SplitFingerprintKeyspaceByFactor(splitFactor) + + logger := log.With(s.logger, "table", table.Addr(), "tenant", tenant) + level.Debug(s.logger).Log("msg", "loading work for tenant", "splitFactor", splitFactor) + + var tasks []*protos.Task + for _, ownershipRange := range ownershipRanges { + logger := log.With(logger, "ownership", ownershipRange.String()) + + // Filter only the metas that overlap in the ownership range + metasInBounds := bloomshipper.FilterMetasOverlappingBounds(metas, ownershipRange) + + // Find gaps in the TSDBs for this tenant/table + gaps, err := s.findOutdatedGaps(ctx, tenant, tsdbs, ownershipRange, metasInBounds, logger) + if err != nil { + level.Error(logger).Log("msg", "failed to find outdated gaps", "err", err) + continue + } + + for _, gap := range gaps { + tasks = append(tasks, protos.NewTask(table, tenant, ownershipRange, gap.tsdb, gap.gaps)) + } + } + + return tasks, nil +} + +// blockPlan is a plan for all the work needed to build a meta.json +// It includes: +// - the tsdb (source of truth) which contains all the series+chunks +// we need to ensure are indexed in bloom blocks +// - a list of gaps that are out of date and need to be checked+built +// - within each gap, a list of block refs which overlap the gap are included +// so we can use them to accelerate bloom generation. They likely contain many +// of the same chunks we need to ensure are indexed, just from previous tsdb iterations. +// This is a performance optimization to avoid expensive re-reindexing +type blockPlan struct { + tsdb tsdb.SingleTenantTSDBIdentifier + gaps []protos.Gap +} + +func (s *Strategy) findOutdatedGaps( + ctx context.Context, + tenant string, + tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, + ownershipRange v1.FingerprintBounds, + metas []bloomshipper.Meta, + logger log.Logger, +) ([]blockPlan, error) { + // Determine which TSDBs have gaps in the ownership range and need to + // be processed. + tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(ownershipRange, tsdbs, metas) + if err != nil { + level.Error(logger).Log("msg", "failed to find gaps", "err", err) + return nil, fmt.Errorf("failed to find gaps: %w", err) + } + + if len(tsdbsWithGaps) == 0 { + level.Debug(logger).Log("msg", "blooms exist for all tsdbs") + return nil, nil + } + + work, err := blockPlansForGaps(ctx, tenant, tsdbsWithGaps, metas) + if err != nil { + level.Error(logger).Log("msg", "failed to create plan", "err", err) + return nil, fmt.Errorf("failed to create plan: %w", err) + } + + return work, nil +} + +// Used to signal the gaps that need to be populated for a tsdb +type tsdbGaps struct { + tsdbIdentifier tsdb.SingleTenantTSDBIdentifier + tsdb common.ClosableForSeries + gaps []v1.FingerprintBounds +} + +// gapsBetweenTSDBsAndMetas returns if the metas are up-to-date with the TSDBs. This is determined by asserting +// that for each TSDB, there are metas covering the entire ownership range which were generated from that specific TSDB. +func gapsBetweenTSDBsAndMetas( + ownershipRange v1.FingerprintBounds, + tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, + metas []bloomshipper.Meta, +) (res []tsdbGaps, err error) { + for db, tsdb := range tsdbs { + id := db.Name() + + relevantMetas := make([]v1.FingerprintBounds, 0, len(metas)) + for _, meta := range metas { + for _, s := range meta.Sources { + if s.Name() == id { + relevantMetas = append(relevantMetas, meta.Bounds) + } + } + } + + gaps, err := FindGapsInFingerprintBounds(ownershipRange, relevantMetas) + if err != nil { + return nil, err + } + + if len(gaps) > 0 { + res = append(res, tsdbGaps{ + tsdbIdentifier: db, + tsdb: tsdb, + gaps: gaps, + }) + } + } + + return res, err +} + +// blockPlansForGaps groups tsdb gaps we wish to fill with overlapping but out of date blocks. +// This allows us to expedite bloom generation by using existing blocks to fill in the gaps +// since many will contain the same chunks. +func blockPlansForGaps( + ctx context.Context, + tenant string, + tsdbs []tsdbGaps, + metas []bloomshipper.Meta, +) ([]blockPlan, error) { + plans := make([]blockPlan, 0, len(tsdbs)) + + for _, idx := range tsdbs { + plan := blockPlan{ + tsdb: idx.tsdbIdentifier, + gaps: make([]protos.Gap, 0, len(idx.gaps)), + } + + for _, gap := range idx.gaps { + planGap := protos.Gap{ + Bounds: gap, + } + + seriesItr, err := common.NewTSDBSeriesIter(ctx, tenant, idx.tsdb, gap) + if err != nil { + return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.String(), err) + } + planGap.Series, err = iter.Collect(seriesItr) + if err != nil { + return nil, fmt.Errorf("failed to collect series: %w", err) + } + + for _, meta := range metas { + if meta.Bounds.Intersection(gap) == nil { + // this meta doesn't overlap the gap, skip + continue + } + + for _, block := range meta.Blocks { + if block.Bounds.Intersection(gap) == nil { + // this block doesn't overlap the gap, skip + continue + } + // this block overlaps the gap, add it to the plan + // for this gap + planGap.Blocks = append(planGap.Blocks, block) + } + } + + // ensure we sort blocks so deduping iterator works as expected + sort.Slice(planGap.Blocks, func(i, j int) bool { + return planGap.Blocks[i].Bounds.Less(planGap.Blocks[j].Bounds) + }) + + peekingBlocks := iter.NewPeekIter[bloomshipper.BlockRef]( + iter.NewSliceIter[bloomshipper.BlockRef]( + planGap.Blocks, + ), + ) + // dedupe blocks which could be in multiple metas + itr := iter.NewDedupingIter[bloomshipper.BlockRef, bloomshipper.BlockRef]( + func(a, b bloomshipper.BlockRef) bool { + return a == b + }, + iter.Identity[bloomshipper.BlockRef], + func(a, _ bloomshipper.BlockRef) bloomshipper.BlockRef { + return a + }, + peekingBlocks, + ) + + deduped, err := iter.Collect[bloomshipper.BlockRef](itr) + if err != nil { + return nil, fmt.Errorf("failed to dedupe blocks: %w", err) + } + planGap.Blocks = deduped + + plan.gaps = append(plan.gaps, planGap) + } + + plans = append(plans, plan) + } + + return plans, nil +} diff --git a/pkg/bloombuild/planner/strategies/splitKeyspace/strategy_test.go b/pkg/bloombuild/planner/strategies/splitKeyspace/strategy_test.go new file mode 100644 index 0000000000000..8d576d08626d0 --- /dev/null +++ b/pkg/bloombuild/planner/strategies/splitKeyspace/strategy_test.go @@ -0,0 +1,433 @@ +package splitkeyspace + +import ( + "context" + "testing" + "time" + + "github.com/grafana/loki/v3/pkg/bloombuild/protos" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" + "github.com/prometheus/prometheus/model/labels" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/bloombuild/common" + v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" + "github.com/grafana/loki/v3/pkg/storage/config" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" +) + +var testDay = parseDayTime("2023-09-01") +var testTable = config.NewDayTable(testDay, "index_") + +func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { + + for _, tc := range []struct { + desc string + err bool + exp []tsdbGaps + ownershipRange v1.FingerprintBounds + tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries + metas []bloomshipper.Meta + }{ + { + desc: "non-overlapping tsdbs and metas", + err: true, + ownershipRange: v1.NewBounds(0, 10), + tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{ + tsdbID(0): nil, + }, + metas: []bloomshipper.Meta{ + genMeta(11, 20, []int{0}, nil), + }, + }, + { + desc: "single tsdb", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{ + tsdbID(0): nil, + }, + metas: []bloomshipper.Meta{ + genMeta(4, 8, []int{0}, nil), + }, + exp: []tsdbGaps{ + { + tsdbIdentifier: tsdbID(0), + gaps: []v1.FingerprintBounds{ + v1.NewBounds(0, 3), + v1.NewBounds(9, 10), + }, + }, + }, + }, + { + desc: "multiple tsdbs with separate blocks", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{ + tsdbID(0): nil, + tsdbID(1): nil, + }, + metas: []bloomshipper.Meta{ + genMeta(0, 5, []int{0}, nil), + genMeta(6, 10, []int{1}, nil), + }, + exp: []tsdbGaps{ + { + tsdbIdentifier: tsdbID(0), + gaps: []v1.FingerprintBounds{ + v1.NewBounds(6, 10), + }, + }, + { + tsdbIdentifier: tsdbID(1), + gaps: []v1.FingerprintBounds{ + v1.NewBounds(0, 5), + }, + }, + }, + }, + { + desc: "multiple tsdbs with the same blocks", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{ + tsdbID(0): nil, + tsdbID(1): nil, + }, + metas: []bloomshipper.Meta{ + genMeta(0, 5, []int{0, 1}, nil), + genMeta(6, 8, []int{1}, nil), + }, + exp: []tsdbGaps{ + { + tsdbIdentifier: tsdbID(0), + gaps: []v1.FingerprintBounds{ + v1.NewBounds(6, 10), + }, + }, + { + tsdbIdentifier: tsdbID(1), + gaps: []v1.FingerprintBounds{ + v1.NewBounds(9, 10), + }, + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + gaps, err := gapsBetweenTSDBsAndMetas(tc.ownershipRange, tc.tsdbs, tc.metas) + if tc.err { + require.Error(t, err) + return + } + require.Equal(t, tc.exp, gaps) + }) + } +} + +func Test_blockPlansForGaps(t *testing.T) { + for _, tc := range []struct { + desc string + ownershipRange v1.FingerprintBounds + tsdbs []tsdb.SingleTenantTSDBIdentifier + metas []bloomshipper.Meta + err bool + exp []blockPlan + }{ + { + desc: "single overlapping meta+no overlapping block", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, + metas: []bloomshipper.Meta{ + genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(11, 20)}), + }, + exp: []blockPlan{ + { + tsdb: tsdbID(0), + gaps: []protos.Gap{ + { + Bounds: v1.NewBounds(0, 10), + Series: genSeries(v1.NewBounds(0, 10)), + }, + }, + }, + }, + }, + { + desc: "single overlapping meta+one overlapping block", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, + metas: []bloomshipper.Meta{ + genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), + }, + exp: []blockPlan{ + { + tsdb: tsdbID(0), + gaps: []protos.Gap{ + { + Bounds: v1.NewBounds(0, 10), + Series: genSeries(v1.NewBounds(0, 10)), + Blocks: []bloomshipper.BlockRef{genBlockRef(9, 20)}, + }, + }, + }, + }, + }, + { + // the range which needs to be generated doesn't overlap with existing blocks + // from other tsdb versions since theres an up to date tsdb version block, + // but we can trim the range needing generation + desc: "trims up to date area", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, + metas: []bloomshipper.Meta{ + genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb + genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for different tsdb + }, + exp: []blockPlan{ + { + tsdb: tsdbID(0), + gaps: []protos.Gap{ + { + Bounds: v1.NewBounds(0, 8), + Series: genSeries(v1.NewBounds(0, 8)), + }, + }, + }, + }, + }, + { + desc: "uses old block for overlapping range", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, + metas: []bloomshipper.Meta{ + genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb + genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(5, 20)}), // block for different tsdb + }, + exp: []blockPlan{ + { + tsdb: tsdbID(0), + gaps: []protos.Gap{ + { + Bounds: v1.NewBounds(0, 8), + Series: genSeries(v1.NewBounds(0, 8)), + Blocks: []bloomshipper.BlockRef{genBlockRef(5, 20)}, + }, + }, + }, + }, + }, + { + desc: "multi case", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0), tsdbID(1)}, // generate for both tsdbs + metas: []bloomshipper.Meta{ + genMeta(0, 2, []int{0}, []bloomshipper.BlockRef{ + genBlockRef(0, 1), + genBlockRef(1, 2), + }), // tsdb_0 + genMeta(6, 8, []int{0}, []bloomshipper.BlockRef{genBlockRef(6, 8)}), // tsdb_0 + + genMeta(3, 5, []int{1}, []bloomshipper.BlockRef{genBlockRef(3, 5)}), // tsdb_1 + genMeta(8, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(8, 10)}), // tsdb_1 + }, + exp: []blockPlan{ + { + tsdb: tsdbID(0), + gaps: []protos.Gap{ + // tsdb (id=0) can source chunks from the blocks built from tsdb (id=1) + { + Bounds: v1.NewBounds(3, 5), + Series: genSeries(v1.NewBounds(3, 5)), + Blocks: []bloomshipper.BlockRef{genBlockRef(3, 5)}, + }, + { + Bounds: v1.NewBounds(9, 10), + Series: genSeries(v1.NewBounds(9, 10)), + Blocks: []bloomshipper.BlockRef{genBlockRef(8, 10)}, + }, + }, + }, + // tsdb (id=1) can source chunks from the blocks built from tsdb (id=0) + { + tsdb: tsdbID(1), + gaps: []protos.Gap{ + { + Bounds: v1.NewBounds(0, 2), + Series: genSeries(v1.NewBounds(0, 2)), + Blocks: []bloomshipper.BlockRef{ + genBlockRef(0, 1), + genBlockRef(1, 2), + }, + }, + { + Bounds: v1.NewBounds(6, 7), + Series: genSeries(v1.NewBounds(6, 7)), + Blocks: []bloomshipper.BlockRef{genBlockRef(6, 8)}, + }, + }, + }, + }, + }, + { + desc: "dedupes block refs", + ownershipRange: v1.NewBounds(0, 10), + tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, + metas: []bloomshipper.Meta{ + genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{ + genBlockRef(1, 4), + genBlockRef(9, 20), + }), // blocks for first diff tsdb + genMeta(5, 20, []int{2}, []bloomshipper.BlockRef{ + genBlockRef(5, 10), + genBlockRef(9, 20), // same block references in prior meta (will be deduped) + }), // block for second diff tsdb + }, + exp: []blockPlan{ + { + tsdb: tsdbID(0), + gaps: []protos.Gap{ + { + Bounds: v1.NewBounds(0, 10), + Series: genSeries(v1.NewBounds(0, 10)), + Blocks: []bloomshipper.BlockRef{ + genBlockRef(1, 4), + genBlockRef(5, 10), + genBlockRef(9, 20), + }, + }, + }, + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + // We add series spanning the whole FP ownership range + tsdbs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries) + for _, id := range tc.tsdbs { + tsdbs[id] = newFakeForSeries(genSeries(tc.ownershipRange)) + } + + // we reuse the gapsBetweenTSDBsAndMetas function to generate the gaps as this function is tested + // separately and it's used to generate input in our regular code path (easier to write tests this way). + gaps, err := gapsBetweenTSDBsAndMetas(tc.ownershipRange, tsdbs, tc.metas) + require.NoError(t, err) + + plans, err := blockPlansForGaps( + context.Background(), + "fakeTenant", + gaps, + tc.metas, + ) + if tc.err { + require.Error(t, err) + return + } + require.Equal(t, tc.exp, plans) + }) + } +} + +func genSeries(bounds v1.FingerprintBounds) []*v1.Series { + series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1)) + for i := bounds.Min; i <= bounds.Max; i++ { + series = append(series, &v1.Series{ + Fingerprint: i, + Chunks: v1.ChunkRefs{ + { + From: 0, + Through: 1, + Checksum: 1, + }, + }, + }) + } + return series +} + +func genMeta(min, max model.Fingerprint, sources []int, blocks []bloomshipper.BlockRef) bloomshipper.Meta { + m := bloomshipper.Meta{ + MetaRef: bloomshipper.MetaRef{ + Ref: bloomshipper.Ref{ + TenantID: "fakeTenant", + TableName: testTable.Addr(), + Bounds: v1.NewBounds(min, max), + }, + }, + Blocks: blocks, + } + for _, source := range sources { + m.Sources = append(m.Sources, tsdbID(source)) + } + return m +} + +func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef { + startTS, endTS := testDay.Bounds() + return bloomshipper.BlockRef{ + Ref: bloomshipper.Ref{ + TenantID: "fakeTenant", + TableName: testTable.Addr(), + Bounds: v1.NewBounds(min, max), + StartTimestamp: startTS, + EndTimestamp: endTS, + Checksum: 0, + }, + } +} + +func tsdbID(n int) tsdb.SingleTenantTSDBIdentifier { + return tsdb.SingleTenantTSDBIdentifier{ + TS: time.Unix(int64(n), 0), + } +} + +func parseDayTime(s string) config.DayTime { + t, err := time.Parse("2006-01-02", s) + if err != nil { + panic(err) + } + return config.DayTime{ + Time: model.TimeFromUnix(t.Unix()), + } +} + +type fakeForSeries struct { + series []*v1.Series +} + +func newFakeForSeries(series []*v1.Series) *fakeForSeries { + return &fakeForSeries{ + series: series, + } +} + +func (f fakeForSeries) ForSeries(_ context.Context, _ string, ff index.FingerprintFilter, _ model.Time, _ model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) (stop bool), _ ...*labels.Matcher) error { + overlapping := make([]*v1.Series, 0, len(f.series)) + for _, s := range f.series { + if ff.Match(s.Fingerprint) { + overlapping = append(overlapping, s) + } + } + + for _, s := range overlapping { + chunks := make([]index.ChunkMeta, 0, len(s.Chunks)) + for _, c := range s.Chunks { + chunks = append(chunks, index.ChunkMeta{ + MinTime: int64(c.From), + MaxTime: int64(c.Through), + Checksum: c.Checksum, + }) + } + + if fn(labels.EmptyLabels(), s.Fingerprint, chunks) { + break + } + } + return nil +} + +func (f fakeForSeries) Close() error { + return nil +} diff --git a/pkg/bloombuild/planner/strategies/splitKeyspace/util.go b/pkg/bloombuild/planner/strategies/splitKeyspace/util.go new file mode 100644 index 0000000000000..91ae8fcdc16a7 --- /dev/null +++ b/pkg/bloombuild/planner/strategies/splitKeyspace/util.go @@ -0,0 +1,125 @@ +package splitkeyspace + +import ( + "fmt" + "math" + + "github.com/prometheus/common/model" + + v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" +) + +// SplitFingerprintKeyspaceByFactor splits the keyspace covered by model.Fingerprint into contiguous non-overlapping ranges. +func SplitFingerprintKeyspaceByFactor(factor int) []v1.FingerprintBounds { + if factor <= 0 { + return nil + } + + bounds := make([]v1.FingerprintBounds, 0, factor) + + // The keyspace of a Fingerprint is from 0 to max uint64. + keyspaceSize := uint64(math.MaxUint64) + + // Calculate the size of each range. + rangeSize := keyspaceSize / uint64(factor) + + for i := 0; i < factor; i++ { + // Calculate the start and end of the range. + start := uint64(i) * rangeSize + end := start + rangeSize - 1 + + // For the last range, make sure it ends at the end of the keyspace. + if i == factor-1 { + end = keyspaceSize + } + + // Create a FingerprintBounds for the range and add it to the slice. + bounds = append(bounds, v1.FingerprintBounds{ + Min: model.Fingerprint(start), + Max: model.Fingerprint(end), + }) + } + + return bounds +} + +func FindGapsInFingerprintBounds(ownershipRange v1.FingerprintBounds, metas []v1.FingerprintBounds) (gaps []v1.FingerprintBounds, err error) { + if len(metas) == 0 { + return []v1.FingerprintBounds{ownershipRange}, nil + } + + // turn the available metas into a list of non-overlapping metas + // for easier processing + var nonOverlapping []v1.FingerprintBounds + // First, we reduce the metas into a smaller set by combining overlaps. They must be sorted. + var cur *v1.FingerprintBounds + for i := 0; i < len(metas); i++ { + j := i + 1 + + // first iteration (i == 0), set the current meta + if cur == nil { + cur = &metas[i] + } + + if j >= len(metas) { + // We've reached the end of the list. Add the last meta to the non-overlapping set. + nonOverlapping = append(nonOverlapping, *cur) + break + } + + combined := cur.Union(metas[j]) + if len(combined) == 1 { + // There was an overlap between the two tested ranges. Combine them and keep going. + cur = &combined[0] + continue + } + + // There was no overlap between the two tested ranges. Add the first to the non-overlapping set. + // and keep the second for the next iteration. + nonOverlapping = append(nonOverlapping, combined[0]) + cur = &combined[1] + } + + // Now, detect gaps between the non-overlapping metas and the ownership range. + // The left bound of the ownership range will be adjusted as we go. + leftBound := ownershipRange.Min + for _, meta := range nonOverlapping { + + clippedMeta := meta.Intersection(ownershipRange) + // should never happen as long as we are only combining metas + // that intersect with the ownership range + if clippedMeta == nil { + return nil, fmt.Errorf("meta is not within ownership range: %v", meta) + } + + searchRange := ownershipRange.Slice(leftBound, clippedMeta.Max) + // update the left bound for the next iteration + // We do the max to prevent the max bound to overflow from MaxUInt64 to 0 + leftBound = min( + max(clippedMeta.Max+1, clippedMeta.Max), + max(ownershipRange.Max+1, ownershipRange.Max), + ) + + // since we've already ensured that the meta is within the ownership range, + // we know the xor will be of length zero (when the meta is equal to the ownership range) + // or 1 (when the meta is a subset of the ownership range) + xors := searchRange.Unless(*clippedMeta) + if len(xors) == 0 { + // meta is equal to the ownership range. This means the meta + // covers this entire section of the ownership range. + continue + } + + gaps = append(gaps, xors[0]) + } + + // If the leftBound is less than the ownership range max, and it's smaller than MaxUInt64, + // There is a gap between the last meta and the end of the ownership range. + // Note: we check `leftBound < math.MaxUint64` since in the loop above we clamp the + // leftBound to MaxUint64 to prevent an overflow to 0: `max(clippedMeta.Max+1, clippedMeta.Max)` + if leftBound < math.MaxUint64 && leftBound <= ownershipRange.Max { + gaps = append(gaps, v1.NewBounds(leftBound, ownershipRange.Max)) + } + + return gaps, nil +} diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index efeb7d2cb5ae3..7f00b68fe5fbc 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -207,6 +207,7 @@ type Limits struct { BloomCompactorMaxBloomSize flagext.ByteSize `yaml:"bloom_compactor_max_bloom_size" json:"bloom_compactor_max_bloom_size" category:"experimental"` BloomCreationEnabled bool `yaml:"bloom_creation_enabled" json:"bloom_creation_enabled" category:"experimental"` + BloomPlanningStrategy string `yaml:"bloom_planning_strategy" json:"bloom_planning_strategy" category:"experimental"` BloomSplitSeriesKeyspaceBy int `yaml:"bloom_split_series_keyspace_by" json:"bloom_split_series_keyspace_by" category:"experimental"` BloomBuildMaxBuilders int `yaml:"bloom_build_max_builders" json:"bloom_build_max_builders" category:"experimental"` BuilderResponseTimeout time.Duration `yaml:"bloom_build_builder_response_timeout" json:"bloom_build_builder_response_timeout" category:"experimental"` @@ -389,7 +390,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { ) f.BoolVar(&l.BloomCreationEnabled, "bloom-build.enable", false, "Experimental. Whether to create blooms for the tenant.") - f.IntVar(&l.BloomSplitSeriesKeyspaceBy, "bloom-build.split-keyspace-by", 256, "Experimental. Number of splits to create for the series keyspace when building blooms. The series keyspace is split into this many parts to parallelize bloom creation.") + f.StringVar(&l.BloomPlanningStrategy, "bloom-build.planning-strategy", "split", "Experimental. Bloom planning strategy to use in bloom creation. Can be one of: 'split'") + f.IntVar(&l.BloomSplitSeriesKeyspaceBy, "bloom-build.split-keyspace-by", 256, "Experimental. Only if `bloom-build.planning-strategy` is 'split'. Number of splits to create for the series keyspace when building blooms. The series keyspace is split into this many parts to parallelize bloom creation.") f.IntVar(&l.BloomBuildMaxBuilders, "bloom-build.max-builders", 0, "Experimental. Maximum number of builders to use when building blooms. 0 allows unlimited builders.") f.DurationVar(&l.BuilderResponseTimeout, "bloom-build.builder-response-timeout", 0, "Experimental. Timeout for a builder to finish a task. If a builder does not respond within this time, it is considered failed and the task will be requeued. 0 disables the timeout.") f.IntVar(&l.BloomTaskMaxRetries, "bloom-build.task-max-retries", 3, "Experimental. Maximum number of retries for a failed task. If a task fails more than this number of times, it is considered failed and will not be retried. A value of 0 disables this limit.") @@ -995,6 +997,10 @@ func (o *Overrides) BloomCreationEnabled(userID string) bool { return o.getOverridesForUser(userID).BloomCreationEnabled } +func (o *Overrides) BloomPlanningStrategy(userID string) string { + return o.getOverridesForUser(userID).BloomPlanningStrategy +} + func (o *Overrides) BloomSplitSeriesKeyspaceBy(userID string) int { return o.getOverridesForUser(userID).BloomSplitSeriesKeyspaceBy }