feat(blockbuilder): consolidate on record counting planner #15247

Merged

14 changes: 9 additions & 5 deletions docs/sources/shared/configuration.md
@@ -197,17 +197,21 @@ block_scheduler:
# CLI flag: -block-scheduler.interval
[interval: <duration> | default = 5m]

# Period used by the planner to calculate the start and end offset such that
# each job consumes records spanning the target period.
# CLI flag: -block-scheduler.target-records-spanning-period
[target_records_spanning_period: <duration> | default = 1h]

# Lookback period in milliseconds used by the scheduler to plan jobs when the
# consumer group has no commits. -1 consumes from the latest offset. -2
# consumes from the start of the partition.
# CLI flag: -block-scheduler.lookback-period
[lookback_period: <int> | default = -2]

# Strategy used by the planner to plan jobs. One of record-count
# CLI flag: -block-scheduler.strategy
[strategy: <string> | default = "record-count"]

# Target record count used by the planner to plan jobs. Only used when
# strategy is record-count
# CLI flag: -block-scheduler.target-record-count
[target_record_count: <int> | default = 1000]

pattern_ingester:
# Whether the pattern ingester is enabled.
# CLI flag: -pattern-ingester.enabled
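
The documented defaults above map one-to-one to the flag registrations in `scheduler.go` below. As a minimal, hypothetical same-package sketch (the `block-scheduler.` prefix is inferred from the documented CLI flag names and is passed explicitly here), parsing an empty flag set yields exactly the defaults listed in the reference:

```go
package scheduler

import "flag"

// defaultConfig is a hypothetical helper: parsing no flags leaves the documented
// defaults in place — strategy "record-count", target_record_count 1000,
// interval 5m, lookback_period -2.
func defaultConfig() Config {
	var cfg Config
	fs := flag.NewFlagSet("example", flag.PanicOnError)
	cfg.RegisterFlagsWithPrefix("block-scheduler.", fs)
	if err := fs.Parse(nil); err != nil {
		panic(err)
	}
	return cfg
}
```
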
44 changes: 37 additions & 7 deletions pkg/blockbuilder/scheduler/scheduler.go
@@ -4,7 +4,9 @@ import (
"context"
"errors"
"flag"
"fmt"
"strconv"
"strings"
"time"

"github.com/go-kit/log"
@@ -22,17 +24,36 @@ var (
)

type Config struct {
ConsumerGroup string `yaml:"consumer_group"`
Interval time.Duration `yaml:"interval"`
TargetRecordConsumptionPeriod time.Duration `yaml:"target_records_spanning_period"`
LookbackPeriod int64 `yaml:"lookback_period"`
ConsumerGroup string `yaml:"consumer_group"`
Interval time.Duration `yaml:"interval"`
LookbackPeriod int64 `yaml:"lookback_period"`
Strategy string `yaml:"strategy"`
planner Planner `yaml:"-"` // validated planner
TargetRecordCount int64 `yaml:"target_record_count"`
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.Interval, prefix+"interval", 5*time.Minute, "How often the scheduler should plan jobs.")
f.DurationVar(&cfg.TargetRecordConsumptionPeriod, prefix+"target-records-spanning-period", time.Hour, "Period used by the planner to calculate the start and end offset such that each job consumes records spanning the target period.")
f.StringVar(&cfg.ConsumerGroup, prefix+"consumer-group", "block-scheduler", "Consumer group used by block scheduler to track the last consumed offset.")
f.Int64Var(&cfg.LookbackPeriod, prefix+"lookback-period", -2, "Lookback period in milliseconds used by the scheduler to plan jobs when the consumer group has no commits. -1 consumes from the latest offset. -2 consumes from the start of the partition.")
f.StringVar(
&cfg.Strategy,
prefix+"strategy",
RecordCountStrategy,
fmt.Sprintf(
"Strategy used by the planner to plan jobs. One of %s",
strings.Join(validStrategies, ", "),
),
)
f.Int64Var(
&cfg.TargetRecordCount,
prefix+"target-record-count",
1000,
fmt.Sprintf(
"Target record count used by the planner to plan jobs. Only used when strategy is %s",
RecordCountStrategy,
),
)
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -48,6 +69,16 @@ func (cfg *Config) Validate() error {
return errors.New("only -1(latest) and -2(earliest) are valid as negative values for lookback_period")
}

switch cfg.Strategy {
case RecordCountStrategy:
if cfg.TargetRecordCount <= 0 {
return errors.New("target record count must be a non-zero value")
}
cfg.planner = NewRecordCountPlanner(cfg.TargetRecordCount)
default:
return fmt.Errorf("invalid strategy: %s", cfg.Strategy)
}

return nil
}
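
For illustration, a hypothetical same-package sketch of the new flow: `Validate` now both checks the strategy and target count and stores the resulting planner on the config, which `NewScheduler` then picks up (names beyond those visible in the diff are assumptions):

```go
package scheduler

import "time"

// exampleValidate is a hypothetical helper: it builds a config using the new
// fields, validates it, and returns the planner that Validate wired in.
func exampleValidate() (Planner, error) {
	cfg := Config{
		Interval:          5 * time.Minute,
		LookbackPeriod:    -2,                  // consume from the start of each partition
		Strategy:          RecordCountStrategy, // "record-count", currently the only valid strategy
		TargetRecordCount: 1000,
	}
	if err := cfg.Validate(); err != nil {
		// e.g. "invalid strategy: ..." or "target record count must be a non-zero value"
		return nil, err
	}
	// cfg.planner is now a RecordCountPlanner; NewScheduler reads it from the
	// config instead of constructing a TimeRangePlanner itself.
	return cfg.planner, nil
}
```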

@@ -66,10 +97,9 @@ type BlockScheduler struct {

// NewScheduler creates a new scheduler instance
func NewScheduler(cfg Config, queue *JobQueue, offsetReader OffsetReader, logger log.Logger, r prometheus.Registerer) *BlockScheduler {
planner := NewTimeRangePlanner(cfg.TargetRecordConsumptionPeriod, offsetReader, func() time.Time { return time.Now().UTC() }, logger)
s := &BlockScheduler{
cfg: cfg,
planner: planner,
planner: cfg.planner,
offsetReader: offsetReader,
logger: logger,
metrics: NewMetrics(r),
101 changes: 101 additions & 0 deletions pkg/blockbuilder/scheduler/scheduler_test.go
@@ -149,3 +149,104 @@ func TestMultipleBuilders(t *testing.T) {
t.Error("builder1 got unexpected second job")
}
}

func TestConfig_Validate(t *testing.T) {
tests := []struct {
name string
cfg Config
wantErr string
}{
{
name: "valid config with record count strategy",
cfg: Config{
Interval: time.Minute,
LookbackPeriod: -1,
Strategy: RecordCountStrategy,
TargetRecordCount: 1000,
},
},
{
name: "zero interval",
cfg: Config{
Interval: 0,
LookbackPeriod: -1,
Strategy: RecordCountStrategy,
TargetRecordCount: 1000,
},
wantErr: "interval must be a non-zero value",
},
{
name: "negative interval",
cfg: Config{
Interval: -time.Minute,
LookbackPeriod: -1,
Strategy: RecordCountStrategy,
TargetRecordCount: 1000,
},
wantErr: "interval must be a non-zero value",
},
{
name: "invalid lookback period",
cfg: Config{
Interval: time.Minute,
LookbackPeriod: -3,
Strategy: RecordCountStrategy,
TargetRecordCount: 1000,
},
wantErr: "only -1(latest) and -2(earliest) are valid as negative values for lookback_period",
},
{
name: "invalid strategy",
cfg: Config{
Interval: time.Minute,
LookbackPeriod: -1,
Strategy: "invalid",
TargetRecordCount: 1000,
},
wantErr: "invalid strategy: invalid",
},
{
name: "zero target record count",
cfg: Config{
Interval: time.Minute,
LookbackPeriod: -1,
Strategy: RecordCountStrategy,
TargetRecordCount: 0,
},
wantErr: "target record count must be a non-zero value",
},
{
name: "negative target record count",
cfg: Config{
Interval: time.Minute,
LookbackPeriod: -1,
Strategy: RecordCountStrategy,
TargetRecordCount: -1000,
},
wantErr: "target record count must be a non-zero value",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.cfg.Validate()
if tt.wantErr != "" {
if err == nil {
t.Errorf("Validate() error = nil, wantErr %v", tt.wantErr)
return
}
if err.Error() != tt.wantErr {
t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr)
}
return
}
if err != nil {
t.Errorf("Validate() error = %v, wantErr nil", err)
}
// Check that planner is set for valid configs
if tt.cfg.planner == nil {
t.Error("Validate() did not set planner for valid config")
}
})
}
}
112 changes: 26 additions & 86 deletions pkg/blockbuilder/scheduler/strategy.go
@@ -3,7 +3,6 @@ package scheduler
import (
"context"
"sort"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
@@ -14,7 +13,6 @@ import (

// OffsetReader is an interface to list offsets for all partitions of a topic from Kafka.
type OffsetReader interface {
ListOffsetsAfterMilli(context.Context, int64) (map[int32]kadm.ListedOffset, error)
GroupLag(context.Context) (map[int32]kadm.GroupMemberLag, error)
}

@@ -24,10 +22,13 @@ type Planner interface {
}

const (
RecordCountStrategy = "record_count"
TimeRangeStrategy = "time_range"
RecordCountStrategy = "record-count"
)

var validStrategies = []string{
RecordCountStrategy,
}

// tries to consume up to targetRecordCount records per partition
type RecordCountPlanner struct {
targetRecordCount int64
@@ -52,101 +53,40 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithPriority[int],
return nil, err
}

var jobs []*JobWithPriority[int]
jobs := make([]*JobWithPriority[int], 0, len(offsets))
for _, partitionOffset := range offsets {
// kadm.GroupMemberLag contains valid Commit.At even when consumer group never committed any offset.
// no additional validation is needed here
startOffset := partitionOffset.Commit.At + 1
endOffset := min(startOffset+p.targetRecordCount, partitionOffset.End.Offset)

job := NewJobWithPriority(
types.NewJob(int(partitionOffset.Partition), types.Offsets{
Min: startOffset,
Max: endOffset,
}), int(partitionOffset.End.Offset-startOffset),
)

jobs = append(jobs, job)
}

// Sort jobs by partition number to ensure consistent ordering
sort.Slice(jobs, func(i, j int) bool {
return jobs[i].Job.Partition < jobs[j].Job.Partition
})

return jobs, nil
}

// Targets consuming records spanning a configured period.
// This is a stateless planner, it is upto the caller to deduplicate or update jobs that are already in queue or progress.
type TimeRangePlanner struct {
offsetReader OffsetReader

buffer time.Duration
targetPeriod time.Duration
now func() time.Time

logger log.Logger
}

func NewTimeRangePlanner(interval time.Duration, offsetReader OffsetReader, now func() time.Time, logger log.Logger) *TimeRangePlanner {
return &TimeRangePlanner{
targetPeriod: interval,
buffer: interval,
offsetReader: offsetReader,
now: now,
logger: logger,
}
}

func (p *TimeRangePlanner) Name() string {
return TimeRangeStrategy
}

func (p *TimeRangePlanner) Plan(ctx context.Context) ([]*JobWithPriority[int], error) {
// truncate to the nearest Interval
consumeUptoTS := p.now().Add(-p.buffer).Truncate(p.targetPeriod)

// this will return the latest offset in the partition if no records are produced after this ts.
consumeUptoOffsets, err := p.offsetReader.ListOffsetsAfterMilli(ctx, consumeUptoTS.UnixMilli())
if err != nil {
level.Error(p.logger).Log("msg", "failed to list offsets after timestamp", "err", err)
return nil, err
}

offsets, err := p.offsetReader.GroupLag(ctx)
if err != nil {
level.Error(p.logger).Log("msg", "failed to get group lag", "err", err)
return nil, err
}

var jobs []*JobWithPriority[int]
for _, partitionOffset := range offsets {
startOffset := partitionOffset.Commit.At + 1
// TODO: we could further break down the work into Interval sized chunks if this partition has pending records spanning a long time range
// or have the builder consume in chunks and commit the job status back to scheduler.
endOffset := consumeUptoOffsets[partitionOffset.Partition].Offset
endOffset := partitionOffset.End.Offset

// Skip if there's no lag
if startOffset >= endOffset {
level.Info(p.logger).Log("msg", "no pending records to process", "partition", partitionOffset.Partition,
"commitOffset", partitionOffset.Commit.At,
"consumeUptoOffset", consumeUptoOffsets[partitionOffset.Partition].Offset)
continue
}

job := NewJobWithPriority(
types.NewJob(int(partitionOffset.Partition), types.Offsets{
Min: startOffset,
Max: endOffset,
}), int(endOffset-startOffset),
)
// Create jobs of size targetRecordCount until we reach endOffset
for currentStart := startOffset; currentStart < endOffset; {
currentEnd := min(currentStart+p.targetRecordCount, endOffset)

job := NewJobWithPriority(
types.NewJob(int(partitionOffset.Partition), types.Offsets{
Min: currentStart,
Max: currentEnd,
}), int(endOffset-currentStart), // priority is remaining records to process
)
jobs = append(jobs, job)

jobs = append(jobs, job)
currentStart = currentEnd
}
}

// Sort jobs by partition number to ensure consistent ordering
// Sort jobs by partition then priority
sort.Slice(jobs, func(i, j int) bool {
return jobs[i].Job.Partition < jobs[j].Job.Partition
if jobs[i].Job.Partition != jobs[j].Job.Partition {
return jobs[i].Job.Partition < jobs[j].Job.Partition
}
return jobs[i].Priority > jobs[j].Priority
})

return jobs, nil
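
The chunking loop above is the core behavioural change: instead of one job per partition bounded by a time range, the planner now emits jobs of at most `targetRecordCount` records until the partition's lag is covered. A standalone sketch of that arithmetic (hypothetical helper, not part of the PR; uses the built-in `min`, so Go 1.21+):

```go
package main

import "fmt"

type offsetRange struct{ min, max int64 }

// splitIntoJobs mirrors the loop in RecordCountPlanner.Plan: the uncommitted
// range [start, end) is split into chunks of at most target records each.
func splitIntoJobs(start, end, target int64) []offsetRange {
	var out []offsetRange
	for cur := start; cur < end; {
		next := min(cur+target, end)
		out = append(out, offsetRange{min: cur, max: next})
		cur = next
	}
	return out
}

func main() {
	// Commit.At = 99 -> startOffset = 100; partition end offset = 2350; target = 1000.
	fmt.Println(splitIntoJobs(100, 2350, 1000))
	// Output: [{100 1100} {1100 2100} {2100 2350}]
}
```

In the actual planner each chunk additionally carries a priority equal to the records remaining in the partition at the chunk's start offset, and the resulting jobs are sorted by partition and then by priority, as shown in the updated sort above.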