
Commit

Extract task computing into a strategy interface
salvacorts committed Jul 29, 2024
1 parent 450bbce commit 82bda3c
Showing 9 changed files with 897 additions and 584 deletions.
11 changes: 8 additions & 3 deletions docs/sources/shared/configuration.md
@@ -3796,9 +3796,14 @@ shard_streams:
# CLI flag: -bloom-build.enable
[bloom_creation_enabled: <boolean> | default = false]

# Experimental. Number of splits to create for the series keyspace when building
# blooms. The series keyspace is split into this many parts to parallelize bloom
# creation.
# Experimental. Bloom planning strategy to use in bloom creation. Can be one of:
# 'split'
# CLI flag: -bloom-build.planning-strategy
[bloom_planning_strategy: <string> | default = "split"]

# Experimental. Only if `bloom-build.planning-strategy` is 'split'. Number of
# splits to create for the series keyspace when building blooms. The series
# keyspace is split into this many parts to parallelize bloom creation.
# CLI flag: -bloom-build.split-keyspace-by
[bloom_split_series_keyspace_by: <int> | default = 256]
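
For orientation, a minimal sketch of how these two documented settings might be wired up as per-tenant limits follows; only the flag names, defaults, and help text come from the configuration reference above, while the package, struct, and field names are illustrative assumptions.

```go
package limits

import "flag"

// BloomBuildLimits is a hypothetical container for the two documented settings;
// the field names are assumptions, the flag names and defaults are from the docs above.
type BloomBuildLimits struct {
	BloomPlanningStrategy      string
	BloomSplitSeriesKeyspaceBy int
}

// RegisterFlags registers both settings with their documented defaults.
func (l *BloomBuildLimits) RegisterFlags(f *flag.FlagSet) {
	f.StringVar(&l.BloomPlanningStrategy, "bloom-build.planning-strategy", "split",
		"Experimental. Bloom planning strategy to use in bloom creation. Can be one of: 'split'.")
	f.IntVar(&l.BloomSplitSeriesKeyspaceBy, "bloom-build.split-keyspace-by", 256,
		"Experimental. Only if bloom-build.planning-strategy is 'split'. Number of splits to create for the series keyspace when building blooms.")
}
```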

4 changes: 3 additions & 1 deletion pkg/bloombuild/planner/config.go
@@ -4,6 +4,8 @@ import (
"flag"
"fmt"
"time"

"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
)

// Config configures the bloom-planner component.
@@ -44,8 +46,8 @@ func (cfg *Config) Validate() error {

type Limits interface {
RetentionLimits
strategies.Limits
BloomCreationEnabled(tenantID string) bool
BloomSplitSeriesKeyspaceBy(tenantID string) int
BloomBuildMaxBuilders(tenantID string) int
BuilderResponseTimeout(tenantID string) time.Duration
BloomTaskMaxRetries(tenantID string) int
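
The planner's Limits now embeds strategies.Limits, which presumably carries the per-tenant planning knobs dropped from this interface. A hypothetical sketch of that embedded interface: BloomSplitSeriesKeyspaceBy is the method removed from planner.Limits above, while the BloomPlanningStrategy accessor name is an assumption based on the new -bloom-build.planning-strategy flag.

```go
package strategies

// Limits as it might be declared in pkg/bloombuild/planner/strategies; the actual
// definition is not part of the hunks shown in this commit and may differ.
type Limits interface {
	// BloomPlanningStrategy returns the tenant's planning strategy, e.g. "split" (assumed name).
	BloomPlanningStrategy(tenantID string) string
	// BloomSplitSeriesKeyspaceBy returns the tenant's keyspace split factor.
	BloomSplitSeriesKeyspaceBy(tenantID string) int
}
```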
239 changes: 26 additions & 213 deletions pkg/bloombuild/planner/planner.go
@@ -17,6 +17,7 @@ import (
"go.uber.org/atomic"

"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/queue"
@@ -221,7 +222,7 @@ func (p *Planner) runOne(ctx context.Context) error {
tables := p.tables(time.Now())
level.Debug(p.logger).Log("msg", "loaded tables", "tables", tables.TotalDays())

work, err := p.loadTenantWork(ctx, tables)
tenantTables, err := p.loadTenantTables(ctx, tables)
if err != nil {
return fmt.Errorf("error loading work: %w", err)
}
@@ -232,19 +233,21 @@ func (p *Planner) runOne(ctx context.Context) error {
tasksResultForTenantTable := make(map[tenantTable]tenantTableTaskResults)
var totalTasks int

for table, tenants := range work {
for tenant, ownershipRanges := range tenants {
for table, tenants := range tenantTables {
for _, tenant := range tenants {
logger := log.With(p.logger, "tenant", tenant, "table", table.Addr())

tt := tenantTable{
tenant: tenant,
table: table,
}

tasks, existingMetas, err := p.computeTasks(ctx, table, tenant, ownershipRanges)
tasks, existingMetas, err := p.computeTasks(ctx, table, tenant)
if err != nil {
level.Error(logger).Log("msg", "error computing tasks", "err", err)
level.Error(logger).Log("msg", "failed to compute tasks", "err", err)
continue
}

level.Debug(logger).Log("msg", "computed tasks", "tasks", len(tasks), "existingMetas", len(existingMetas))

var tenantTableEnqueuedTasks int
@@ -334,17 +337,20 @@ func (p *Planner) runOne(ctx context.Context) error {
return nil
}

// computeTasks computes the tasks for a given table and tenant and ownership range.
// It returns the tasks to be executed and the metas that are existing relevant for the ownership range.
// computeTasks computes the tasks for a given table and tenant.
// It returns the tasks to be executed and the existing metas.
func (p *Planner) computeTasks(
ctx context.Context,
table config.DayTable,
tenant string,
ownershipRanges []v1.FingerprintBounds,
) ([]*protos.Task, []bloomshipper.Meta, error) {
var tasks []*protos.Task
logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant)

strategy, err := strategies.NewStrategy(tenant, p.limits, p.logger)
if err != nil {
return nil, nil, fmt.Errorf("error creating strategy: %w", err)
}

// Fetch source metas to be used in both build and cleanup of out-of-date metas+blooms
metas, err := p.bloomStore.FetchMetas(
ctx,
@@ -388,22 +394,9 @@ func (p *Planner) computeTasks(
}
}()

for _, ownershipRange := range ownershipRanges {
logger := log.With(logger, "ownership", ownershipRange.String())

// Filter only the metas that overlap in the ownership range
metasInBounds := bloomshipper.FilterMetasOverlappingBounds(metas, ownershipRange)

// Find gaps in the TSDBs for this tenant/table
gaps, err := p.findOutdatedGaps(ctx, tenant, openTSDBs, ownershipRange, metasInBounds, logger)
if err != nil {
level.Error(logger).Log("msg", "failed to find outdated gaps", "err", err)
continue
}

for _, gap := range gaps {
tasks = append(tasks, protos.NewTask(table, tenant, ownershipRange, gap.tsdb, gap.gaps))
}
tasks, err := strategy.Plan(ctx, table, tenant, openTSDBs, metas)
if err != nil {
return nil, nil, fmt.Errorf("failed to plan tasks: %w", err)
}

return tasks, metas, nil
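
Based on the strategies.NewStrategy and strategy.Plan calls introduced above, the contract a planning strategy must satisfy presumably looks like the sketch below. Only the Plan argument and return shapes are taken from computeTasks; the interface name and the import paths (which follow the usual Loki package layout) are assumptions.

```go
package strategies

import (
	"context"

	"github.com/grafana/loki/v3/pkg/bloombuild/common"
	"github.com/grafana/loki/v3/pkg/bloombuild/protos"
	"github.com/grafana/loki/v3/pkg/storage/config"
	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
)

// PlanningStrategy (assumed name) is what each planning strategy implements:
// given the open TSDBs and the existing metas for a tenant/table, it returns
// the bloom build tasks that still need to be executed.
type PlanningStrategy interface {
	Plan(
		ctx context.Context,
		table config.DayTable,
		tenant string,
		tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
		metas []bloomshipper.Meta,
	) ([]*protos.Task, error)
}
```

strategies.NewStrategy(tenant, p.limits, p.logger) then selects an implementation per tenant; with this commit, the keyspace-splitting logic previously inlined below becomes the 'split' strategy.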
@@ -616,15 +609,12 @@ func (p *Planner) tables(ts time.Time) *dayRangeIterator {
return newDayRangeIterator(fromDay, throughDay, p.schemaCfg)
}

type work map[config.DayTable]map[string][]v1.FingerprintBounds

// loadTenantWork loads the work for each tenant and table tuple.
// work is the list of fingerprint ranges that need to be indexed in bloom filters.
func (p *Planner) loadTenantWork(
// loadTenantTables loads all tenants with bloom build enabled for each table.
func (p *Planner) loadTenantTables(
ctx context.Context,
tables *dayRangeIterator,
) (work, error) {
tenantTableWork := make(map[config.DayTable]map[string][]v1.FingerprintBounds, tables.TotalDays())
) (map[config.DayTable][]string, error) {
tenantTables := make(map[config.DayTable][]string, tables.TotalDays())

for tables.Next() && tables.Err() == nil && ctx.Err() == nil {
table := tables.At()
@@ -637,8 +627,8 @@ func (p *Planner) loadTenantWork(
level.Debug(p.logger).Log("msg", "loaded tenants", "table", table, "tenants", tenants.Remaining())

// If this is the first time we see this table, initialize the map
if tenantTableWork[table] == nil {
tenantTableWork[table] = make(map[string][]v1.FingerprintBounds, tenants.Remaining())
if tenantTables[table] == nil {
tenantTables[table] = make([]string, 0, tenants.Remaining())
}

for tenants.Next() && tenants.Err() == nil && ctx.Err() == nil {
@@ -650,19 +640,14 @@ func (p *Planner) loadTenantWork(
continue
}

splitFactor := p.limits.BloomSplitSeriesKeyspaceBy(tenant)
bounds := SplitFingerprintKeyspaceByFactor(splitFactor)

tenantTableWork[table][tenant] = bounds

// Reset progress tracking metrics for this tenant
// NOTE(salvacorts): We will reset them multiple times for the same tenant, for each table, but it's not a big deal.
// Alternatively, we can use a Counter instead of a Gauge, but I think a Gauge is easier to reason about.
p.metrics.tenantTasksPlanned.WithLabelValues(tenant).Set(0)
p.metrics.tenantTasksCompleted.WithLabelValues(tenant, statusSuccess).Set(0)
p.metrics.tenantTasksCompleted.WithLabelValues(tenant, statusFailure).Set(0)

level.Debug(p.logger).Log("msg", "loading work for tenant", "table", table, "tenant", tenant, "splitFactor", splitFactor)
tenantTables[table] = append(tenantTables[table], tenant)
}
if err := tenants.Err(); err != nil {
level.Error(p.logger).Log("msg", "error iterating tenants", "err", err)
@@ -675,7 +660,7 @@ func (p *Planner) loadTenantWork(
return nil, fmt.Errorf("error iterating tables: %w", err)
}

return tenantTableWork, ctx.Err()
return tenantTables, ctx.Err()
}

func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.SliceIter[string], error) {
@@ -687,178 +672,6 @@ func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.Sli
return iter.NewSliceIter(tenants), nil
}

// blockPlan is a plan for all the work needed to build a meta.json
// It includes:
// - the tsdb (source of truth) which contains all the series+chunks
// we need to ensure are indexed in bloom blocks
// - a list of gaps that are out of date and need to be checked+built
// - within each gap, a list of block refs which overlap the gap are included
// so we can use them to accelerate bloom generation. They likely contain many
// of the same chunks we need to ensure are indexed, just from previous tsdb iterations.
// This is a performance optimization to avoid expensive re-indexing
type blockPlan struct {
tsdb tsdb.SingleTenantTSDBIdentifier
gaps []protos.Gap
}

func (p *Planner) findOutdatedGaps(
ctx context.Context,
tenant string,
tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
ownershipRange v1.FingerprintBounds,
metas []bloomshipper.Meta,
logger log.Logger,
) ([]blockPlan, error) {
// Determine which TSDBs have gaps in the ownership range and need to
// be processed.
tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(ownershipRange, tsdbs, metas)
if err != nil {
level.Error(logger).Log("msg", "failed to find gaps", "err", err)
return nil, fmt.Errorf("failed to find gaps: %w", err)
}

if len(tsdbsWithGaps) == 0 {
level.Debug(logger).Log("msg", "blooms exist for all tsdbs")
return nil, nil
}

work, err := blockPlansForGaps(ctx, tenant, tsdbsWithGaps, metas)
if err != nil {
level.Error(logger).Log("msg", "failed to create plan", "err", err)
return nil, fmt.Errorf("failed to create plan: %w", err)
}

return work, nil
}

// Used to signal the gaps that need to be populated for a tsdb
type tsdbGaps struct {
tsdbIdentifier tsdb.SingleTenantTSDBIdentifier
tsdb common.ClosableForSeries
gaps []v1.FingerprintBounds
}

// gapsBetweenTSDBsAndMetas checks whether the metas are up-to-date with the TSDBs and returns any uncovered gaps.
// A TSDB is up-to-date when metas generated from that specific TSDB cover its entire ownership range.
func gapsBetweenTSDBsAndMetas(
ownershipRange v1.FingerprintBounds,
tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
metas []bloomshipper.Meta,
) (res []tsdbGaps, err error) {
for db, tsdb := range tsdbs {
id := db.Name()

relevantMetas := make([]v1.FingerprintBounds, 0, len(metas))
for _, meta := range metas {
for _, s := range meta.Sources {
if s.Name() == id {
relevantMetas = append(relevantMetas, meta.Bounds)
}
}
}

gaps, err := FindGapsInFingerprintBounds(ownershipRange, relevantMetas)
if err != nil {
return nil, err
}

if len(gaps) > 0 {
res = append(res, tsdbGaps{
tsdbIdentifier: db,
tsdb: tsdb,
gaps: gaps,
})
}
}

return res, err
}
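
To make the gap computation concrete, here is a toy, self-contained example of the primitive this function builds on. It assumes FindGapsInFingerprintBounds stays exported from the planner package (this commit may relocate it into strategies) and uses the v1.NewBounds helper; the expected output is an inference, not taken from the diff.

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/bloombuild/planner"
	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
)

func main() {
	// The tenant owns fingerprints 0-99 for this table, but the metas generated
	// from the TSDB at hand only cover 0-49, so blooms for 50-99 are still missing.
	ownership := v1.NewBounds(0, 99)
	covered := []v1.FingerprintBounds{v1.NewBounds(0, 49)}

	gaps, err := planner.FindGapsInFingerprintBounds(ownership, covered)
	if err != nil {
		panic(err)
	}
	fmt.Println(gaps) // expected: a single gap spanning fingerprints 50-99
}
```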

// blockPlansForGaps groups tsdb gaps we wish to fill with overlapping but out of date blocks.
// This allows us to expedite bloom generation by using existing blocks to fill in the gaps
// since many will contain the same chunks.
func blockPlansForGaps(
ctx context.Context,
tenant string,
tsdbs []tsdbGaps,
metas []bloomshipper.Meta,
) ([]blockPlan, error) {
plans := make([]blockPlan, 0, len(tsdbs))

for _, idx := range tsdbs {
plan := blockPlan{
tsdb: idx.tsdbIdentifier,
gaps: make([]protos.Gap, 0, len(idx.gaps)),
}

for _, gap := range idx.gaps {
planGap := protos.Gap{
Bounds: gap,
}

seriesItr, err := common.NewTSDBSeriesIter(ctx, tenant, idx.tsdb, gap)
if err != nil {
return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.String(), err)
}
planGap.Series, err = iter.Collect(seriesItr)
if err != nil {
return nil, fmt.Errorf("failed to collect series: %w", err)
}

for _, meta := range metas {
if meta.Bounds.Intersection(gap) == nil {
// this meta doesn't overlap the gap, skip
continue
}

for _, block := range meta.Blocks {
if block.Bounds.Intersection(gap) == nil {
// this block doesn't overlap the gap, skip
continue
}
// this block overlaps the gap, add it to the plan
// for this gap
planGap.Blocks = append(planGap.Blocks, block)
}
}

// ensure we sort blocks so deduping iterator works as expected
sort.Slice(planGap.Blocks, func(i, j int) bool {
return planGap.Blocks[i].Bounds.Less(planGap.Blocks[j].Bounds)
})

peekingBlocks := iter.NewPeekIter[bloomshipper.BlockRef](
iter.NewSliceIter[bloomshipper.BlockRef](
planGap.Blocks,
),
)
// dedupe blocks which could be in multiple metas
itr := iter.NewDedupingIter[bloomshipper.BlockRef, bloomshipper.BlockRef](
func(a, b bloomshipper.BlockRef) bool {
return a == b
},
iter.Identity[bloomshipper.BlockRef],
func(a, _ bloomshipper.BlockRef) bloomshipper.BlockRef {
return a
},
peekingBlocks,
)

deduped, err := iter.Collect[bloomshipper.BlockRef](itr)
if err != nil {
return nil, fmt.Errorf("failed to dedupe blocks: %w", err)
}
planGap.Blocks = deduped

plan.gaps = append(plan.gaps, planGap)
}

plans = append(plans, plan)
}

return plans, nil
}
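
The sort-then-dedupe step above can be shown in isolation. A toy sketch, assuming the iter v2 helpers keep the signatures used in blockPlansForGaps; ints stand in for the sorted bloomshipper.BlockRef values.

```go
package main

import (
	"fmt"

	iter "github.com/grafana/loki/v3/pkg/iter/v2"
)

func main() {
	// Blocks gathered from several metas can repeat; once sorted, adjacent
	// duplicates are collapsed the same way blockPlansForGaps does above.
	refs := []int{1, 1, 2, 3, 3, 3}

	itr := iter.NewDedupingIter[int, int](
		func(a, b int) bool { return a == b }, // same block?
		iter.Identity[int],                    // no transformation needed
		func(a, _ int) int { return a },       // on duplicates, keep the first
		iter.NewPeekIter[int](iter.NewSliceIter[int](refs)),
	)

	deduped, err := iter.Collect[int](itr)
	if err != nil {
		panic(err)
	}
	fmt.Println(deduped) // [1 2 3]
}
```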

func (p *Planner) addPendingTask(task *QueueTask) {
p.pendingTasks.Store(task.ID, task)
}
(Diffs for the remaining 6 changed files, including the new pkg/bloombuild/planner/strategies package, are not shown here.)
