Skip to content

Commit

Permalink
fix(v2): avoid starvation of failed jobs in the compaction scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae committed Nov 30, 2024
1 parent ee162aa commit 0736088
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 39 deletions.
1 change: 1 addition & 0 deletions pkg/experiment/metastore/compaction/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,6 @@ type Schedule interface {
// AssignJob is called on behalf of the worker to request a new job.
AssignJob() (*raft_log.AssignedCompactionJob, error)
// AddJob is called on behalf of the planner to add a new job to the schedule.
// The scheduler may decline the job by returning a nil state.
AddJob(*raft_log.CompactionJobPlan) *raft_log.CompactionJobState
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func DefaultStrategy() Strategy {
MaxBlocksPerLevel: []uint{20, 10, 10},
MaxBlocksDefault: 10,
MaxLevel: 3,
MaxBatchAge: 3 * time.Minute.Nanoseconds(), //defaultMaxBlockBatchAge,
MaxBatchAge: defaultMaxBlockBatchAge,
CleanupBatchSize: 2,
CleanupDelay: 15 * time.Minute,
CleanupJobMaxLevel: 1,
Expand Down
54 changes: 42 additions & 12 deletions pkg/experiment/metastore/compaction/scheduler/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ type schedule struct {
// Read-only.
scheduler *Scheduler
// Uncommitted schedule updates.
updates map[string]*raft_log.CompactionJobState
updates map[string]*raft_log.CompactionJobState
addedJobs int
// Modified copy of the job queue.
copied []priorityJobQueue
level int
Expand Down Expand Up @@ -90,10 +91,29 @@ func (p *schedule) newStateForStatusReport(status *raft_log.CompactionJobStatusU
return nil
}

// AddJob creates a state for the new plan. The method must be called
// after the last AssignJob and UpdateJob calls.
// AddJob creates a state for the newly planned job.
//
// The method must be called after the last AssignJob and UpdateJob calls.
// It returns an empty state if the queue size limit is reached.
//
// TODO(kolesnikovae): Implement displacement policy.
// When the scheduler queue is full, no new jobs can be added. Currently,
// it's possible that all jobs fail and can't be retried, and consequently,
// can't leave the queue, blocking the entire compaction process until the
// failure or queue limit is increased. Additionally, it's possible for a
// job to never be completed and thus remain in the queue indefinitely.
//
// One way to implement this is to evict the job with the highest number of
// failures (exceeding a configurable threshold, in addition to MaxFailures).
// This way, we can easily remove the job least likely to succeed.
// However, this needs to be handled explicitly in UpdateSchedule; at this
// point, we can only identify candidates for eviction.
func (p *schedule) AddJob(plan *raft_log.CompactionJobPlan) *raft_log.CompactionJobState {
// TODO(kolesnikovae): Job queue size limit.
if limit := p.scheduler.config.MaxQueueSize; limit > 0 {
if size := uint64(p.addedJobs + p.scheduler.queue.size()); size >= limit {
return nil
}
}
state := &raft_log.CompactionJobState{
Name: plan.Name,
CompactionLevel: plan.CompactionLevel,
Expand All @@ -102,13 +122,19 @@ func (p *schedule) AddJob(plan *raft_log.CompactionJobPlan) *raft_log.Compaction
Token: p.token,
}
p.updates[state.Name] = state
p.addedJobs++
return state
}

func (p *schedule) nextAssignment() *raft_log.CompactionJobState {
// We don't need to check the job ownership here: the worker asks
// for a job assigment (new ownership).
for p.level < len(p.scheduler.queue.levels) {
// We evict the job from our copy of the queue: each job is only
// accessible once. When we reach the bottom of the queue (the first
// failed job, or the last job in the queue), we move to the next
// level. Note that we check all in-progress jobs if there are not
// enough unassigned jobs in the queue.
pq := p.queueLevelCopy(p.level)
if pq.Len() == 0 {
p.level++
Expand All @@ -130,15 +156,17 @@ func (p *schedule) nextAssignment() *raft_log.CompactionJobState {
return p.assignJob(job)

case metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS:
if p.shouldReassign(job) {
if p.isFailed(job) {
// We reached the bottom of the queue: only failed jobs left.
p.level++
continue
}
if p.isAbandoned(job) {
state := p.assignJob(job)
state.Failures++
return state
}
}

// If no jobs can be assigned at this level.
p.level++
}

return nil
Expand All @@ -156,11 +184,13 @@ func (p *schedule) assignJob(e *jobEntry) *raft_log.CompactionJobState {
return job
}

func (p *schedule) shouldReassign(job *jobEntry) bool {
abandoned := p.now.UnixNano() > job.LeaseExpiresAt
func (p *schedule) isAbandoned(job *jobEntry) bool {
return p.now.UnixNano() > job.LeaseExpiresAt
}

func (p *schedule) isFailed(job *jobEntry) bool {
limit := p.scheduler.config.MaxFailures
faulty := limit > 0 && uint64(job.Failures) >= limit
return abandoned && !faulty
return limit > 0 && uint64(job.Failures) >= limit
}

// The queue must not be modified by the assigner. Therefore, we're copying the
Expand Down
96 changes: 72 additions & 24 deletions pkg/experiment/metastore/compaction/scheduler/schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,12 @@ func TestSchedule_Assign(t *testing.T) {
}

scheduler := NewScheduler(config, store, nil)
// The job plans are accessed when it's getting assigned.
// Their content is not important for the test.
plans := []*raft_log.CompactionJobPlan{
{Name: "2", Tenant: "A", Shard: 1, CompactionLevel: 0, SourceBlocks: []string{"d", "e", "f"}},
{Name: "3", Tenant: "A", Shard: 1, CompactionLevel: 0, SourceBlocks: []string{"j", "h", "i"}},
{Name: "1", Tenant: "A", Shard: 1, CompactionLevel: 1, SourceBlocks: []string{"a", "b", "c"}},
{Name: "2", CompactionLevel: 0},
{Name: "3", CompactionLevel: 0},
{Name: "1", CompactionLevel: 1},
}
for _, p := range plans {
store.On("GetJobPlan", mock.Anything, p.Name).Return(p, nil)
Expand Down Expand Up @@ -169,37 +171,55 @@ func TestSchedule_ReAssign(t *testing.T) {

scheduler := NewScheduler(config, store, nil)
plans := []*raft_log.CompactionJobPlan{
{Name: "1", Tenant: "A", Shard: 1, SourceBlocks: []string{"a", "b", "c"}},
{Name: "2", Tenant: "A", Shard: 1, SourceBlocks: []string{"d", "e", "f"}},
{Name: "3", Tenant: "A", Shard: 1, SourceBlocks: []string{"j", "h", "i"}},
{Name: "1"},
{Name: "2"},
{Name: "3"},
{Name: "4"},
{Name: "5"},
{Name: "6"},
}
for _, p := range plans {
store.On("GetJobPlan", mock.Anything, p.Name).Return(p, nil)
}

now := int64(5)
states := []*raft_log.CompactionJobState{
{Name: "1", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0},
{Name: "2", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0},
{Name: "3", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0},
// Jobs with expired leases (now > LeaseExpiresAt).
{Name: "1", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 1},
{Name: "2", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 1},
{Name: "3", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 1},
// This job can't be reassigned as its lease is still valid.
{Name: "4", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 10},
// The job has already failed in the past.
{Name: "5", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 1, Failures: 1},
// The job has already failed in the past and exceeded the error threshold.
{Name: "6", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 1, Failures: 3},
}
for _, s := range states {
scheduler.queue.put(s)
}

lease := now + int64(config.LeaseDuration)
expected := []*raft_log.CompactionJobState{
{Name: "1", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 2, LeaseExpiresAt: lease, Failures: 1},
{Name: "2", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 2, LeaseExpiresAt: lease, Failures: 1},
{Name: "3", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 2, LeaseExpiresAt: lease, Failures: 1},
{Name: "5", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 2, LeaseExpiresAt: lease, Failures: 2},
}

test.AssertIdempotent(t, func(t *testing.T) {
s := scheduler.NewSchedule(nil, &raft.Log{Index: 2, AppendedAt: time.Unix(0, 1)})
for j := range plans {
s := scheduler.NewSchedule(nil, &raft.Log{Index: 2, AppendedAt: time.Unix(0, now)})
assigned := make([]*raft_log.CompactionJobState, 0, len(expected))
for {
update, err := s.AssignJob()
require.NoError(t, err)
assert.Equal(t, plans[j], update.Plan)
assert.Equal(t, metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, update.State.Status)
assert.Equal(t, int64(config.LeaseDuration)+1, update.State.LeaseExpiresAt)
assert.Equal(t, uint64(2), update.State.Token)
if update == nil {
break
}
assigned = append(assigned, update.State)
}

update, err := s.AssignJob()
require.NoError(t, err)
assert.Nil(t, update)
assert.Equal(t, expected, assigned)
})
}

Expand All @@ -212,9 +232,9 @@ func TestSchedule_UpdateAssign(t *testing.T) {

scheduler := NewScheduler(config, store, nil)
plans := []*raft_log.CompactionJobPlan{
{Name: "1", Tenant: "A", Shard: 1, SourceBlocks: []string{"a", "b", "c"}},
{Name: "2", Tenant: "A", Shard: 1, SourceBlocks: []string{"d", "e", "f"}},
{Name: "3", Tenant: "A", Shard: 1, SourceBlocks: []string{"j", "h", "i"}},
{Name: "1"},
{Name: "2"},
{Name: "3"},
}
for _, p := range plans {
store.On("GetJobPlan", mock.Anything, p.Name).Return(p, nil)
Expand Down Expand Up @@ -301,9 +321,9 @@ func TestSchedule_Add(t *testing.T) {

scheduler := NewScheduler(config, store, nil)
plans := []*raft_log.CompactionJobPlan{
{Name: "1", Tenant: "A", Shard: 1, SourceBlocks: []string{"a", "b", "c"}},
{Name: "2", Tenant: "A", Shard: 1, SourceBlocks: []string{"d", "e", "f"}},
{Name: "3", Tenant: "A", Shard: 1, SourceBlocks: []string{"j", "h", "i"}},
{Name: "1"},
{Name: "2"},
{Name: "3"},
}

states := []*raft_log.CompactionJobState{
Expand All @@ -319,3 +339,31 @@ func TestSchedule_Add(t *testing.T) {
}
})
}

func TestSchedule_QueueSizeLimit(t *testing.T) {
store := new(mockscheduler.MockJobStore)
config := Config{
MaxFailures: 3,
LeaseDuration: 10 * time.Second,
MaxQueueSize: 2,
}

scheduler := NewScheduler(config, store, nil)
plans := []*raft_log.CompactionJobPlan{
{Name: "1"},
{Name: "2"},
{Name: "3"},
}

states := []*raft_log.CompactionJobState{
{Name: "1", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_UNSPECIFIED, AddedAt: 1, Token: 1},
{Name: "2", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_UNSPECIFIED, AddedAt: 1, Token: 1},
}

test.AssertIdempotent(t, func(t *testing.T) {
s := scheduler.NewSchedule(nil, &raft.Log{Index: 1, AppendedAt: time.Unix(0, 1)})
assert.Equal(t, states[0], s.AddJob(plans[0]))
assert.Equal(t, states[1], s.AddJob(plans[1]))
assert.Nil(t, s.AddJob(plans[2]))
})
}
2 changes: 2 additions & 0 deletions pkg/experiment/metastore/compaction/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ type JobStore interface {
type Config struct {
MaxFailures uint64 `yaml:"compaction_max_failures" doc:""`
LeaseDuration time.Duration `yaml:"compaction_job_lease_duration" doc:""`
MaxQueueSize uint64 `yaml:"compaction_max_job_queue_size" doc:""`
}

func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.Uint64Var(&c.MaxFailures, prefix+"compaction-max-failures", 3, "")
f.DurationVar(&c.LeaseDuration, prefix+"compaction-job-lease-duration", 15*time.Second, "")
f.Uint64Var(&c.MaxQueueSize, prefix+"compaction-max-job-queue-size", 2000, "")
}

type Scheduler struct {
Expand Down
14 changes: 13 additions & 1 deletion pkg/experiment/metastore/compaction/scheduler/scheduler_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ func (q *schedulerQueue) delete(name string) *raft_log.CompactionJobState {
return nil
}

func (q *schedulerQueue) size() int {
var size int
for _, level := range q.levels {
if level != nil {
size += level.jobs.Len()
}
}
return size
}

func (q *schedulerQueue) level(x uint32) *jobQueue {
s := x + 1 // Levels are 0-based.
if s >= uint32(len(q.levels)) {
Expand Down Expand Up @@ -136,7 +146,9 @@ func compareJobs(a, b *jobEntry) int {
if a.Status != b.Status {
return int(a.Status) - int(b.Status)
}
// Faulty jobs should wait.
// Faulty jobs should wait. Our aim is to put them at the
// end of the queue, after all the jobs we may consider
// for assigment.
if a.Failures != b.Failures {
return int(a.Failures) - int(b.Failures)
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/experiment/metastore/compaction_raft_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,14 @@ func (h *CompactionCommandHandler) GetCompactionPlanUpdate(
// No more jobs to create.
break
}
state := scheduler.AddJob(plan)
if state == nil {
// Scheduler declined the job. The only case when this may happen
// is when the scheduler queue is full.
break
}
p.NewJobs = append(p.NewJobs, &raft_log.NewCompactionJob{
State: scheduler.AddJob(plan),
State: state,
Plan: plan,
})
}
Expand Down

0 comments on commit 0736088

Please sign in to comment.