feat: Extract task computing into a strategy interface #13690

Merged on Oct 17, 2024 (3 commits). Changes shown below are from 2 commits.
11 changes: 8 additions & 3 deletions docs/sources/shared/configuration.md
@@ -3764,9 +3764,14 @@ shard_streams:
# CLI flag: -bloom-build.enable
[bloom_creation_enabled: <boolean> | default = false]

# Experimental. Number of splits to create for the series keyspace when building
# blooms. The series keyspace is split into this many parts to parallelize bloom
# creation.
# Experimental. Bloom planning strategy to use in bloom creation. Can be one of:
# 'split'
# CLI flag: -bloom-build.planning-strategy
[bloom_planning_strategy: <string> | default = "split"]
Contributor:

nit, but IMO "split" is too generic for a planning strategy.

I'd rather call it "split_by_series_count", "split_by_series_keyspace", "split_by_series_volume", etc., or shorter without the "split_by_" prefix.

Contributor (Author):

Renamed to split_keyspace_by_factor.

# Experimental. Only if `bloom-build.planning-strategy` is 'split'. Number of
# splits to create for the series keyspace when building blooms. The series
# keyspace is split into this many parts to parallelize bloom creation.
# CLI flag: -bloom-build.split-keyspace-by
[bloom_split_series_keyspace_by: <int> | default = 256]
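
Put together, an illustrative override using the two options documented above — assuming, as the surrounding shard_streams block suggests, that they sit under limits_config with the other bloom-build limits; values are examples only:

```yaml
limits_config:
  # Opt the tenant into bloom building.
  bloom_creation_enabled: true
  # Experimental planning strategy; only 'split' exists at this commit
  # (renamed to 'split_keyspace_by_factor' in a later commit, see the thread above).
  bloom_planning_strategy: "split"
  # Only consulted by the keyspace-splitting strategy.
  bloom_split_series_keyspace_by: 256
```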

4 changes: 3 additions & 1 deletion pkg/bloombuild/planner/config.go
@@ -4,6 +4,8 @@ import (
"flag"
"fmt"
"time"

"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
)

// Config configures the bloom-planner component.
@@ -44,8 +46,8 @@ func (cfg *Config) Validate() error {

type Limits interface {
RetentionLimits
strategies.Limits
BloomCreationEnabled(tenantID string) bool
BloomSplitSeriesKeyspaceBy(tenantID string) int
BloomBuildMaxBuilders(tenantID string) int
BuilderResponseTimeout(tenantID string) time.Duration
BloomTaskMaxRetries(tenantID string) int
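
The planner's Limits interface now embeds strategies.Limits and drops the direct BloomSplitSeriesKeyspaceBy method. A minimal sketch of what that embedded interface presumably declares, inferred from the two CLI flags documented above — the method names here are assumptions, not the verbatim definition:

```go
package strategies

// Limits is a sketch of the per-tenant limits the planning strategies need.
// Method names are assumed from -bloom-build.planning-strategy and
// -bloom-build.split-keyspace-by; the real interface may differ.
type Limits interface {
	// Which planning strategy to use for the tenant ("split" at this commit).
	BloomPlanningStrategy(tenantID string) string
	// Split factor consumed by the keyspace-splitting strategy.
	BloomSplitSeriesKeyspaceBy(tenantID string) int
}
```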
239 changes: 26 additions & 213 deletions pkg/bloombuild/planner/planner.go
@@ -17,6 +17,7 @@ import (
"go.uber.org/atomic"

"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/queue"
@@ -254,7 +255,7 @@ func (p *Planner) runOne(ctx context.Context) error {
tables := p.tables(time.Now())
level.Debug(p.logger).Log("msg", "loaded tables", "tables", tables.TotalDays())

work, err := p.loadTenantWork(ctx, tables)
tenantTables, err := p.loadTenantTables(ctx, tables)
if err != nil {
return fmt.Errorf("error loading work: %w", err)
}
@@ -265,19 +266,21 @@
tasksResultForTenantTable := make(map[tenantTable]tenantTableTaskResults)
var totalTasks int

for table, tenants := range work {
for tenant, ownershipRanges := range tenants {
for table, tenants := range tenantTables {
for _, tenant := range tenants {
logger := log.With(p.logger, "tenant", tenant, "table", table.Addr())

tt := tenantTable{
tenant: tenant,
table: table,
}

tasks, existingMetas, err := p.computeTasks(ctx, table, tenant, ownershipRanges)
tasks, existingMetas, err := p.computeTasks(ctx, table, tenant)
if err != nil {
level.Error(logger).Log("msg", "error computing tasks", "err", err)
level.Error(logger).Log("msg", "failed to compute tasks", "err", err)
continue
}

level.Debug(logger).Log("msg", "computed tasks", "tasks", len(tasks), "existingMetas", len(existingMetas))

var tenantTableEnqueuedTasks int
@@ -367,17 +370,20 @@ func (p *Planner) runOne(ctx context.Context) error {
return nil
}

// computeTasks computes the tasks for a given table and tenant and ownership range.
// It returns the tasks to be executed and the metas that are existing relevant for the ownership range.
// computeTasks computes the tasks for a given table and tenant.
// It returns the tasks to be executed and the existing metas.
func (p *Planner) computeTasks(
ctx context.Context,
table config.DayTable,
tenant string,
ownershipRanges []v1.FingerprintBounds,
) ([]*protos.Task, []bloomshipper.Meta, error) {
var tasks []*protos.Task
logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant)

strategy, err := strategies.NewStrategy(tenant, p.limits, p.logger)
if err != nil {
return nil, nil, fmt.Errorf("error creating strategy: %w", err)
}

// Fetch source metas to be used in both build and cleanup of out-of-date metas+blooms
metas, err := p.bloomStore.FetchMetas(
ctx,
@@ -421,22 +427,9 @@
}
}()

for _, ownershipRange := range ownershipRanges {
logger := log.With(logger, "ownership", ownershipRange.String())

// Filter only the metas that overlap in the ownership range
metasInBounds := bloomshipper.FilterMetasOverlappingBounds(metas, ownershipRange)

// Find gaps in the TSDBs for this tenant/table
gaps, err := p.findOutdatedGaps(ctx, tenant, openTSDBs, ownershipRange, metasInBounds, logger)
if err != nil {
level.Error(logger).Log("msg", "failed to find outdated gaps", "err", err)
continue
}

for _, gap := range gaps {
tasks = append(tasks, protos.NewTask(table, tenant, ownershipRange, gap.tsdb, gap.gaps))
}
tasks, err := strategy.Plan(ctx, table, tenant, openTSDBs, metas)
if err != nil {
return nil, nil, fmt.Errorf("failed to plan tasks: %w", err)
}

return tasks, metas, nil
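
For readers tracing the strategies.NewStrategy and strategy.Plan calls above, this is a hedged sketch of the interface the PR presumably extracts. The signature is inferred from the arguments passed at this call site and the return value used below, so treat the names as assumptions rather than the verbatim definition:

```go
package strategies

import (
	"context"

	"github.com/grafana/loki/v3/pkg/bloombuild/common"
	"github.com/grafana/loki/v3/pkg/bloombuild/protos"
	"github.com/grafana/loki/v3/pkg/storage/config"
	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
)

// PlanningStrategy turns a tenant's TSDBs and the existing metas into bloom build tasks.
// The argument list mirrors the strategy.Plan call in computeTasks above.
type PlanningStrategy interface {
	Plan(
		ctx context.Context,
		table config.DayTable,
		tenant string,
		tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
		metas []bloomshipper.Meta,
	) ([]*protos.Task, error)
}
```

NewStrategy(tenant, p.limits, p.logger) then presumably switches on the tenant's bloom_planning_strategy limit ("split" at this commit, "split_keyspace_by_factor" after the rename discussed above) and returns the matching implementation.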
@@ -649,15 +642,12 @@ func (p *Planner) tables(ts time.Time) *dayRangeIterator {
return newDayRangeIterator(fromDay, throughDay, p.schemaCfg)
}

type work map[config.DayTable]map[string][]v1.FingerprintBounds

// loadTenantWork loads the work for each tenant and table tuple.
// work is the list of fingerprint ranges that need to be indexed in bloom filters.
func (p *Planner) loadTenantWork(
// loadTenantTables loads all tenants with bloom build enabled for each table.
func (p *Planner) loadTenantTables(
ctx context.Context,
tables *dayRangeIterator,
) (work, error) {
tenantTableWork := make(map[config.DayTable]map[string][]v1.FingerprintBounds, tables.TotalDays())
) (map[config.DayTable][]string, error) {
tenantTables := make(map[config.DayTable][]string, tables.TotalDays())

for tables.Next() && tables.Err() == nil && ctx.Err() == nil {
table := tables.At()
@@ -670,8 +660,8 @@ func (p *Planner) loadTenantWork(
level.Debug(p.logger).Log("msg", "loaded tenants", "table", table, "tenants", tenants.Remaining())

// If this is the first time we see this table, initialize the map
if tenantTableWork[table] == nil {
tenantTableWork[table] = make(map[string][]v1.FingerprintBounds, tenants.Remaining())
if tenantTables[table] == nil {
tenantTables[table] = make([]string, 0, tenants.Remaining())
}

for tenants.Next() && tenants.Err() == nil && ctx.Err() == nil {
@@ -683,19 +673,14 @@
continue
}

splitFactor := p.limits.BloomSplitSeriesKeyspaceBy(tenant)
bounds := SplitFingerprintKeyspaceByFactor(splitFactor)

tenantTableWork[table][tenant] = bounds

// Reset progress tracking metrics for this tenant
// NOTE(salvacorts): We will reset them multiple times for the same tenant, for each table, but it's not a big deal.
// Alternatively, we can use a Counter instead of a Gauge, but I think a Gauge is easier to reason about.
p.metrics.tenantTasksPlanned.WithLabelValues(tenant).Set(0)
p.metrics.tenantTasksCompleted.WithLabelValues(tenant, statusSuccess).Set(0)
p.metrics.tenantTasksCompleted.WithLabelValues(tenant, statusFailure).Set(0)

level.Debug(p.logger).Log("msg", "loading work for tenant", "table", table, "tenant", tenant, "splitFactor", splitFactor)
tenantTables[table] = append(tenantTables[table], tenant)
}
if err := tenants.Err(); err != nil {
level.Error(p.logger).Log("msg", "error iterating tenants", "err", err)
@@ -708,7 +693,7 @@
return nil, fmt.Errorf("error iterating tables: %w", err)
}

return tenantTableWork, ctx.Err()
return tenantTables, ctx.Err()
}

func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.SliceIter[string], error) {
@@ -720,178 +705,6 @@ func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.SliceIter[string], error) {
return iter.NewSliceIter(tenants), nil
}

// blockPlan is a plan for all the work needed to build a meta.json
// It includes:
// - the tsdb (source of truth) which contains all the series+chunks
// we need to ensure are indexed in bloom blocks
// - a list of gaps that are out of date and need to be checked+built
// - within each gap, a list of block refs which overlap the gap are included
// so we can use them to accelerate bloom generation. They likely contain many
// of the same chunks we need to ensure are indexed, just from previous tsdb iterations.
// This is a performance optimization to avoid expensive re-reindexing
type blockPlan struct {
tsdb tsdb.SingleTenantTSDBIdentifier
gaps []protos.Gap
}

func (p *Planner) findOutdatedGaps(
ctx context.Context,
tenant string,
tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
ownershipRange v1.FingerprintBounds,
metas []bloomshipper.Meta,
logger log.Logger,
) ([]blockPlan, error) {
// Determine which TSDBs have gaps in the ownership range and need to
// be processed.
tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(ownershipRange, tsdbs, metas)
if err != nil {
level.Error(logger).Log("msg", "failed to find gaps", "err", err)
return nil, fmt.Errorf("failed to find gaps: %w", err)
}

if len(tsdbsWithGaps) == 0 {
level.Debug(logger).Log("msg", "blooms exist for all tsdbs")
return nil, nil
}

work, err := blockPlansForGaps(ctx, tenant, tsdbsWithGaps, metas)
if err != nil {
level.Error(logger).Log("msg", "failed to create plan", "err", err)
return nil, fmt.Errorf("failed to create plan: %w", err)
}

return work, nil
}

// Used to signal the gaps that need to be populated for a tsdb
type tsdbGaps struct {
tsdbIdentifier tsdb.SingleTenantTSDBIdentifier
tsdb common.ClosableForSeries
gaps []v1.FingerprintBounds
}

// gapsBetweenTSDBsAndMetas returns if the metas are up-to-date with the TSDBs. This is determined by asserting
// that for each TSDB, there are metas covering the entire ownership range which were generated from that specific TSDB.
func gapsBetweenTSDBsAndMetas(
ownershipRange v1.FingerprintBounds,
tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
metas []bloomshipper.Meta,
) (res []tsdbGaps, err error) {
for db, tsdb := range tsdbs {
id := db.Name()

relevantMetas := make([]v1.FingerprintBounds, 0, len(metas))
for _, meta := range metas {
for _, s := range meta.Sources {
if s.Name() == id {
relevantMetas = append(relevantMetas, meta.Bounds)
}
}
}

gaps, err := FindGapsInFingerprintBounds(ownershipRange, relevantMetas)
if err != nil {
return nil, err
}

if len(gaps) > 0 {
res = append(res, tsdbGaps{
tsdbIdentifier: db,
tsdb: tsdb,
gaps: gaps,
})
}
}

return res, err
}
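
The comment above describes the check abstractly; as a concrete illustration of what a "gap" means here, the toy snippet below computes uncovered sub-ranges with plain integers. It is illustrative only and does not use the real v1.FingerprintBounds or FindGapsInFingerprintBounds types:

```go
package main

import "fmt"

type bounds struct{ min, max uint64 } // inclusive; stand-in for v1.FingerprintBounds

// findGaps returns the sub-ranges of own not covered by any of covered.
// covered must be sorted by min and non-overlapping for this simple sketch.
func findGaps(own bounds, covered []bounds) []bounds {
	var gaps []bounds
	next := own.min
	for _, c := range covered {
		if c.min > next {
			gaps = append(gaps, bounds{next, c.min - 1})
		}
		if c.max+1 > next {
			next = c.max + 1
		}
	}
	if next <= own.max {
		gaps = append(gaps, bounds{next, own.max})
	}
	return gaps
}

func main() {
	// Ownership range 0..1000; metas built from this TSDB cover 0..399 and 700..1000,
	// so the planner still needs blooms for 400..699.
	fmt.Println(findGaps(bounds{0, 1000}, []bounds{{0, 399}, {700, 1000}})) // [{400 699}]
}
```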

// blockPlansForGaps groups tsdb gaps we wish to fill with overlapping but out of date blocks.
// This allows us to expedite bloom generation by using existing blocks to fill in the gaps
// since many will contain the same chunks.
func blockPlansForGaps(
ctx context.Context,
tenant string,
tsdbs []tsdbGaps,
metas []bloomshipper.Meta,
) ([]blockPlan, error) {
plans := make([]blockPlan, 0, len(tsdbs))

for _, idx := range tsdbs {
plan := blockPlan{
tsdb: idx.tsdbIdentifier,
gaps: make([]protos.Gap, 0, len(idx.gaps)),
}

for _, gap := range idx.gaps {
planGap := protos.Gap{
Bounds: gap,
}

seriesItr, err := common.NewTSDBSeriesIter(ctx, tenant, idx.tsdb, gap)
if err != nil {
return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.String(), err)
}
planGap.Series, err = iter.Collect(seriesItr)
if err != nil {
return nil, fmt.Errorf("failed to collect series: %w", err)
}

for _, meta := range metas {
if meta.Bounds.Intersection(gap) == nil {
// this meta doesn't overlap the gap, skip
continue
}

for _, block := range meta.Blocks {
if block.Bounds.Intersection(gap) == nil {
// this block doesn't overlap the gap, skip
continue
}
// this block overlaps the gap, add it to the plan
// for this gap
planGap.Blocks = append(planGap.Blocks, block)
}
}

// ensure we sort blocks so deduping iterator works as expected
sort.Slice(planGap.Blocks, func(i, j int) bool {
return planGap.Blocks[i].Bounds.Less(planGap.Blocks[j].Bounds)
})

peekingBlocks := iter.NewPeekIter[bloomshipper.BlockRef](
iter.NewSliceIter[bloomshipper.BlockRef](
planGap.Blocks,
),
)
// dedupe blocks which could be in multiple metas
itr := iter.NewDedupingIter[bloomshipper.BlockRef, bloomshipper.BlockRef](
func(a, b bloomshipper.BlockRef) bool {
return a == b
},
iter.Identity[bloomshipper.BlockRef],
func(a, _ bloomshipper.BlockRef) bloomshipper.BlockRef {
return a
},
peekingBlocks,
)

deduped, err := iter.Collect[bloomshipper.BlockRef](itr)
if err != nil {
return nil, fmt.Errorf("failed to dedupe blocks: %w", err)
}
planGap.Blocks = deduped

plan.gaps = append(plan.gaps, planGap)
}

plans = append(plans, plan)
}

return plans, nil
}
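
The helpers deleted above (findOutdatedGaps, gapsBetweenTSDBsAndMetas, blockPlansForGaps, plus SplitFingerprintKeyspaceByFactor from loadTenantWork) presumably move into the strategies package largely unchanged. Below is a hedged sketch of how the keyspace-splitting strategy's Plan could stitch them back together, mirroring the deleted computeTasks loop; the receiver type, its fields, and the moved helper names are assumptions, and imports are omitted since they match the removed code above:

```go
// Sketch only: a reconstruction of the deleted planner loop as a strategy method,
// not the actual contents of pkg/bloombuild/planner/strategies.
type splitKeyspaceStrategy struct {
	limits Limits
	logger log.Logger
}

func (s *splitKeyspaceStrategy) Plan(
	ctx context.Context,
	table config.DayTable,
	tenant string,
	tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
	metas []bloomshipper.Meta,
) ([]*protos.Task, error) {
	// Split the full fingerprint keyspace into N ownership ranges, as the planner
	// used to do in loadTenantWork via BloomSplitSeriesKeyspaceBy.
	splitFactor := s.limits.BloomSplitSeriesKeyspaceBy(tenant)
	ownershipRanges := SplitFingerprintKeyspaceByFactor(splitFactor)

	var tasks []*protos.Task
	for _, ownershipRange := range ownershipRanges {
		logger := log.With(s.logger, "ownership", ownershipRange.String())

		// Only the metas overlapping this ownership range are relevant.
		metasInBounds := bloomshipper.FilterMetasOverlappingBounds(metas, ownershipRange)

		// Same gap detection as before, now owned by the strategy.
		gaps, err := s.findOutdatedGaps(ctx, tenant, tsdbs, ownershipRange, metasInBounds, logger)
		if err != nil {
			level.Error(logger).Log("msg", "failed to find outdated gaps", "err", err)
			continue
		}

		for _, gap := range gaps {
			tasks = append(tasks, protos.NewTask(table, tenant, ownershipRange, gap.tsdb, gap.gaps))
		}
	}

	return tasks, nil
}
```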

func (p *Planner) addPendingTask(task *QueueTask) {
p.pendingTasks.Store(task.ID, task)
}