diff --git a/pkg/experiment/metastore/compaction/compaction.go b/pkg/experiment/metastore/compaction/compaction.go
index 3240b5bb08..626acc7792 100644
--- a/pkg/experiment/metastore/compaction/compaction.go
+++ b/pkg/experiment/metastore/compaction/compaction.go
@@ -55,5 +55,6 @@ type Schedule interface {
 	// AssignJob is called on behalf of the worker to request a new job.
 	AssignJob() (*raft_log.AssignedCompactionJob, error)
 	// AddJob is called on behalf of the planner to add a new job to the schedule.
+	// The scheduler may decline the job by returning a nil state.
 	AddJob(*raft_log.CompactionJobPlan) *raft_log.CompactionJobState
 }
diff --git a/pkg/experiment/metastore/compaction/compactor/compactor_strategy.go b/pkg/experiment/metastore/compaction/compactor/compactor_strategy.go
index 551b523bbf..2f3f951399 100644
--- a/pkg/experiment/metastore/compaction/compactor/compactor_strategy.go
+++ b/pkg/experiment/metastore/compaction/compactor/compactor_strategy.go
@@ -30,7 +30,7 @@ func DefaultStrategy() Strategy {
 		MaxBlocksPerLevel:  []uint{20, 10, 10},
 		MaxBlocksDefault:   10,
 		MaxLevel:           3,
-		MaxBatchAge:        3 * time.Minute.Nanoseconds(), //defaultMaxBlockBatchAge,
+		MaxBatchAge:        defaultMaxBlockBatchAge,
 		CleanupBatchSize:   2,
 		CleanupDelay:       15 * time.Minute,
 		CleanupJobMaxLevel: 1,
diff --git a/pkg/experiment/metastore/compaction/scheduler/schedule.go b/pkg/experiment/metastore/compaction/scheduler/schedule.go
index 8773d6a469..e96e21c53a 100644
--- a/pkg/experiment/metastore/compaction/scheduler/schedule.go
+++ b/pkg/experiment/metastore/compaction/scheduler/schedule.go
@@ -21,7 +21,8 @@ type schedule struct {
 	// Read-only.
 	scheduler *Scheduler
 	// Uncommitted schedule updates.
-	updates map[string]*raft_log.CompactionJobState
+	updates   map[string]*raft_log.CompactionJobState
+	addedJobs int
 	// Modified copy of the job queue.
 	copied []priorityJobQueue
 	level  int
@@ -90,10 +91,29 @@ func (p *schedule) newStateForStatusReport(status *raft_log.CompactionJobStatusU
 	return nil
 }
 
-// AddJob creates a state for the new plan. The method must be called
-// after the last AssignJob and UpdateJob calls.
+// AddJob creates a state for the newly planned job.
+//
+// The method must be called after the last AssignJob and UpdateJob calls.
+// It returns a nil state if the queue size limit is reached.
+//
+// TODO(kolesnikovae): Implement displacement policy.
+// When the scheduler queue is full, no new jobs can be added. Currently,
+// it's possible that all jobs fail and can't be retried, and consequently,
+// can't leave the queue, blocking the entire compaction process until the
+// failure or queue limit is increased. Additionally, it's possible for a
+// job to never be completed and thus remain in the queue indefinitely.
+//
+// One way to implement this is to evict the job with the highest number of
+// failures (exceeding a configurable threshold, in addition to MaxFailures).
+// This way, we can easily remove the job least likely to succeed.
+// However, this needs to be handled explicitly in UpdateSchedule; at this
+// point, we can only identify candidates for eviction.
 func (p *schedule) AddJob(plan *raft_log.CompactionJobPlan) *raft_log.CompactionJobState {
-	// TODO(kolesnikovae): Job queue size limit.
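+	// The effective size accounts for the jobs added within this
+	// schedule update, which are not yet in the committed queue.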
+	if limit := p.scheduler.config.MaxQueueSize; limit > 0 {
+		if size := uint64(p.addedJobs + p.scheduler.queue.size()); size >= limit {
+			return nil
+		}
+	}
 	state := &raft_log.CompactionJobState{
 		Name:            plan.Name,
 		CompactionLevel: plan.CompactionLevel,
@@ -102,6 +122,7 @@ func (p *schedule) AddJob(plan *raft_log.CompactionJobPlan) *raft_log.Compaction
 		Token: p.token,
 	}
 	p.updates[state.Name] = state
+	p.addedJobs++
 	return state
 }
 
@@ -109,6 +130,11 @@ func (p *schedule) nextAssignment() *raft_log.CompactionJobState {
 	// We don't need to check the job ownership here: the worker asks
 	// for a job assigment (new ownership).
 	for p.level < len(p.scheduler.queue.levels) {
+		// We evict the job from our copy of the queue: each job is only
+		// accessible once. When we reach the bottom of the queue (the first
+		// failed job, or the last job in the queue), we move to the next
+		// level. Note that we check all in-progress jobs if there are not
+		// enough unassigned jobs in the queue.
 		pq := p.queueLevelCopy(p.level)
 		if pq.Len() == 0 {
 			p.level++
@@ -130,15 +156,17 @@ func (p *schedule) nextAssignment() *raft_log.CompactionJobState {
 			return p.assignJob(job)
 
 		case metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS:
-			if p.shouldReassign(job) {
+			if p.isFailed(job) {
+				// We reached the bottom of the queue: only failed jobs left.
+				p.level++
+				continue
+			}
+			if p.isAbandoned(job) {
 				state := p.assignJob(job)
 				state.Failures++
 				return state
 			}
 		}
-
-		// If no jobs can be assigned at this level.
-		p.level++
 	}
 
 	return nil
@@ -156,11 +184,13 @@ func (p *schedule) assignJob(e *jobEntry) *raft_log.CompactionJobState {
 	return job
 }
 
-func (p *schedule) shouldReassign(job *jobEntry) bool {
-	abandoned := p.now.UnixNano() > job.LeaseExpiresAt
+func (p *schedule) isAbandoned(job *jobEntry) bool {
+	return p.now.UnixNano() > job.LeaseExpiresAt
+}
+
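+// isFailed reports whether the job has reached the failure limit
+// and is not eligible for reassignment; a zero limit disables the check.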
+func (p *schedule) isFailed(job *jobEntry) bool {
 	limit := p.scheduler.config.MaxFailures
-	faulty := limit > 0 && uint64(job.Failures) >= limit
-	return abandoned && !faulty
+	return limit > 0 && uint64(job.Failures) >= limit
 }
 
 // The queue must not be modified by the assigner. Therefore, we're copying the
diff --git a/pkg/experiment/metastore/compaction/scheduler/schedule_test.go b/pkg/experiment/metastore/compaction/scheduler/schedule_test.go
index 6d24bca6e1..235e1e87d7 100644
--- a/pkg/experiment/metastore/compaction/scheduler/schedule_test.go
+++ b/pkg/experiment/metastore/compaction/scheduler/schedule_test.go
@@ -123,10 +123,12 @@ func TestSchedule_Assign(t *testing.T) {
 	}
 
 	scheduler := NewScheduler(config, store, nil)
+	// The job plans are accessed when the jobs are assigned.
+	// Their content is not important for the test.
 	plans := []*raft_log.CompactionJobPlan{
-		{Name: "2", Tenant: "A", Shard: 1, CompactionLevel: 0, SourceBlocks: []string{"d", "e", "f"}},
-		{Name: "3", Tenant: "A", Shard: 1, CompactionLevel: 0, SourceBlocks: []string{"j", "h", "i"}},
-		{Name: "1", Tenant: "A", Shard: 1, CompactionLevel: 1, SourceBlocks: []string{"a", "b", "c"}},
+		{Name: "2", CompactionLevel: 0},
+		{Name: "3", CompactionLevel: 0},
+		{Name: "1", CompactionLevel: 1},
 	}
 	for _, p := range plans {
 		store.On("GetJobPlan", mock.Anything, p.Name).Return(p, nil)
@@ -169,37 +171,55 @@ func TestSchedule_ReAssign(t *testing.T) {
 	scheduler := NewScheduler(config, store, nil)
 
 	plans := []*raft_log.CompactionJobPlan{
-		{Name: "1", Tenant: "A", Shard: 1, SourceBlocks: []string{"a", "b", "c"}},
-		{Name: "2", Tenant: "A", Shard: 1, SourceBlocks: []string{"d", "e", "f"}},
-		{Name: "3", Tenant: "A", Shard: 1, SourceBlocks: []string{"j", "h", "i"}},
+		{Name: "1"},
+		{Name: "2"},
+		{Name: "3"},
+		{Name: "4"},
+		{Name: "5"},
+		{Name: "6"},
 	}
 	for _, p := range plans {
 		store.On("GetJobPlan", mock.Anything, p.Name).Return(p, nil)
 	}
 
+	now := int64(5)
 	states := []*raft_log.CompactionJobState{
-		{Name: "1", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0},
-		{Name: "2", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0},
-		{Name: "3", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0},
+		// Jobs with expired leases (now > LeaseExpiresAt).
+		{Name: "1", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 1},
+		{Name: "2", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 1},
+		{Name: "3", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 1},
+		// This job can't be reassigned as its lease is still valid.
+		{Name: "4", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 10},
+		// The job has already failed in the past.
+		{Name: "5", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 1, Failures: 1},
+		// The job has already failed in the past and exceeded the error threshold.
+		{Name: "6", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 1, Failures: 3},
 	}
 	for _, s := range states {
 		scheduler.queue.put(s)
 	}
 
+	lease := now + int64(config.LeaseDuration)
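+	// Jobs "4" and "6" are not expected to be reassigned: the lease of
+	// job "4" is still valid, and job "6" has exceeded MaxFailures.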
+ {Name: "6", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 1, Failures: 3}, } for _, s := range states { scheduler.queue.put(s) } + lease := now + int64(config.LeaseDuration) + expected := []*raft_log.CompactionJobState{ + {Name: "1", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 2, LeaseExpiresAt: lease, Failures: 1}, + {Name: "2", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 2, LeaseExpiresAt: lease, Failures: 1}, + {Name: "3", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 2, LeaseExpiresAt: lease, Failures: 1}, + {Name: "5", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 2, LeaseExpiresAt: lease, Failures: 2}, + } + test.AssertIdempotent(t, func(t *testing.T) { - s := scheduler.NewSchedule(nil, &raft.Log{Index: 2, AppendedAt: time.Unix(0, 1)}) - for j := range plans { + s := scheduler.NewSchedule(nil, &raft.Log{Index: 2, AppendedAt: time.Unix(0, now)}) + assigned := make([]*raft_log.CompactionJobState, 0, len(expected)) + for { update, err := s.AssignJob() require.NoError(t, err) - assert.Equal(t, plans[j], update.Plan) - assert.Equal(t, metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, update.State.Status) - assert.Equal(t, int64(config.LeaseDuration)+1, update.State.LeaseExpiresAt) - assert.Equal(t, uint64(2), update.State.Token) + if update == nil { + break + } + assigned = append(assigned, update.State) } - update, err := s.AssignJob() - require.NoError(t, err) - assert.Nil(t, update) + assert.Equal(t, expected, assigned) }) } @@ -212,9 +232,9 @@ func TestSchedule_UpdateAssign(t *testing.T) { scheduler := NewScheduler(config, store, nil) plans := []*raft_log.CompactionJobPlan{ - {Name: "1", Tenant: "A", Shard: 1, SourceBlocks: []string{"a", "b", "c"}}, - {Name: "2", Tenant: "A", Shard: 1, SourceBlocks: []string{"d", "e", "f"}}, - {Name: "3", Tenant: "A", Shard: 1, SourceBlocks: []string{"j", "h", "i"}}, + {Name: "1"}, + {Name: "2"}, + {Name: "3"}, } for _, p := range plans { store.On("GetJobPlan", mock.Anything, p.Name).Return(p, nil) @@ -301,9 +321,9 @@ func TestSchedule_Add(t *testing.T) { scheduler := NewScheduler(config, store, nil) plans := []*raft_log.CompactionJobPlan{ - {Name: "1", Tenant: "A", Shard: 1, SourceBlocks: []string{"a", "b", "c"}}, - {Name: "2", Tenant: "A", Shard: 1, SourceBlocks: []string{"d", "e", "f"}}, - {Name: "3", Tenant: "A", Shard: 1, SourceBlocks: []string{"j", "h", "i"}}, + {Name: "1"}, + {Name: "2"}, + {Name: "3"}, } states := []*raft_log.CompactionJobState{ @@ -319,3 +339,31 @@ func TestSchedule_Add(t *testing.T) { } }) } + +func TestSchedule_QueueSizeLimit(t *testing.T) { + store := new(mockscheduler.MockJobStore) + config := Config{ + MaxFailures: 3, + LeaseDuration: 10 * time.Second, + MaxQueueSize: 2, + } + + scheduler := NewScheduler(config, store, nil) + plans := []*raft_log.CompactionJobPlan{ + {Name: "1"}, + {Name: "2"}, + {Name: "3"}, + } + + states := []*raft_log.CompactionJobState{ + {Name: "1", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_UNSPECIFIED, AddedAt: 1, Token: 1}, + {Name: "2", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_UNSPECIFIED, AddedAt: 1, Token: 1}, + } + + test.AssertIdempotent(t, func(t *testing.T) { + s := scheduler.NewSchedule(nil, &raft.Log{Index: 1, AppendedAt: time.Unix(0, 1)}) + assert.Equal(t, states[0], s.AddJob(plans[0])) + assert.Equal(t, states[1], s.AddJob(plans[1])) + assert.Nil(t, 
+	test.AssertIdempotent(t, func(t *testing.T) {
+		s := scheduler.NewSchedule(nil, &raft.Log{Index: 1, AppendedAt: time.Unix(0, 1)})
+		assert.Equal(t, states[0], s.AddJob(plans[0]))
+		assert.Equal(t, states[1], s.AddJob(plans[1]))
+		assert.Nil(t, s.AddJob(plans[2]))
+	})
+}
diff --git a/pkg/experiment/metastore/compaction/scheduler/scheduler.go b/pkg/experiment/metastore/compaction/scheduler/scheduler.go
index ee5c175146..e1206ef58d 100644
--- a/pkg/experiment/metastore/compaction/scheduler/scheduler.go
+++ b/pkg/experiment/metastore/compaction/scheduler/scheduler.go
@@ -45,11 +45,13 @@ type JobStore interface {
 type Config struct {
 	MaxFailures   uint64        `yaml:"compaction_max_failures" doc:""`
 	LeaseDuration time.Duration `yaml:"compaction_job_lease_duration" doc:""`
+	MaxQueueSize  uint64        `yaml:"compaction_max_job_queue_size" doc:""`
 }
 
 func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	f.Uint64Var(&c.MaxFailures, prefix+"compaction-max-failures", 3, "")
 	f.DurationVar(&c.LeaseDuration, prefix+"compaction-job-lease-duration", 15*time.Second, "")
+	f.Uint64Var(&c.MaxQueueSize, prefix+"compaction-max-job-queue-size", 2000, "")
 }
 
 type Scheduler struct {
diff --git a/pkg/experiment/metastore/compaction/scheduler/scheduler_queue.go b/pkg/experiment/metastore/compaction/scheduler/scheduler_queue.go
index a3cf294cc1..21981a4542 100644
--- a/pkg/experiment/metastore/compaction/scheduler/scheduler_queue.go
+++ b/pkg/experiment/metastore/compaction/scheduler/scheduler_queue.go
@@ -45,6 +45,16 @@ func (q *schedulerQueue) delete(name string) *raft_log.CompactionJobState {
 	return nil
 }
 
+func (q *schedulerQueue) size() int {
+	var size int
+	for _, level := range q.levels {
+		if level != nil {
+			size += level.jobs.Len()
+		}
+	}
+	return size
+}
+
 func (q *schedulerQueue) level(x uint32) *jobQueue {
 	s := x + 1 // Levels are 0-based.
 	if s >= uint32(len(q.levels)) {
@@ -136,7 +146,9 @@ func compareJobs(a, b *jobEntry) int {
 	if a.Status != b.Status {
 		return int(a.Status) - int(b.Status)
 	}
-	// Faulty jobs should wait.
+	// Faulty jobs should wait. Our aim is to put them at the
+	// end of the queue, after all the jobs we may consider
+	// for assignment.
 	if a.Failures != b.Failures {
 		return int(a.Failures) - int(b.Failures)
 	}
diff --git a/pkg/experiment/metastore/compaction_raft_handler.go b/pkg/experiment/metastore/compaction_raft_handler.go
index 927206f9e9..bd995b0903 100644
--- a/pkg/experiment/metastore/compaction_raft_handler.go
+++ b/pkg/experiment/metastore/compaction_raft_handler.go
@@ -117,8 +117,14 @@ func (h *CompactionCommandHandler) GetCompactionPlanUpdate(
 			// No more jobs to create.
 			break
 		}
+		state := scheduler.AddJob(plan)
+		if state == nil {
+			// The scheduler declined the job. Currently, this can only
+			// happen when the scheduler queue is full.
+			break
+		}
 		p.NewJobs = append(p.NewJobs, &raft_log.NewCompactionJob{
-			State: scheduler.AddJob(plan),
+			State: state,
 			Plan:  plan,
 		})
 	}