diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index a14a0ff556a..c504eafd16f 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -20,7 +20,6 @@ import ( "net/http" "strconv" "sync" - "sync/atomic" "time" "github.com/pingcap/errors" @@ -31,10 +30,9 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/schedule/checker" sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/diagnostic" "github.com/tikv/pd/pkg/schedule/hbstream" - "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/operator" - "github.com/tikv/pd/pkg/schedule/plan" "github.com/tikv/pd/pkg/schedule/scatter" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/schedule/splitter" @@ -51,7 +49,6 @@ const ( checkSuspectRangesInterval = 100 * time.Millisecond collectFactor = 0.9 collectTimeout = 5 * time.Minute - maxScheduleRetries = 10 maxLoadConfigRetries = 10 // pushOperatorTickInterval is the interval try to push the operator. pushOperatorTickInterval = 500 * time.Millisecond @@ -65,9 +62,8 @@ const ( var ( // WithLabelValues is a heavy operation, define variable to avoid call it every time. - waitingListGauge = regionListGauge.WithLabelValues("waiting_list") - priorityListGauge = regionListGauge.WithLabelValues("priority_list") - denySchedulersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("schedulers", "deny") + waitingListGauge = regionListGauge.WithLabelValues("waiting_list") + priorityListGauge = regionListGauge.WithLabelValues("priority_list") ) // Coordinator is used to manage all schedulers and checkers to decide if the region needs to be scheduled. @@ -82,18 +78,18 @@ type Coordinator struct { checkers *checker.Controller regionScatterer *scatter.RegionScatterer regionSplitter *splitter.RegionSplitter - schedulers map[string]*scheduleController + schedulers map[string]*schedulers.ScheduleController opController *operator.Controller hbStreams *hbstream.HeartbeatStreams pluginInterface *PluginInterface - diagnosticManager *diagnosticManager + diagnosticManager *diagnostic.Manager } // NewCoordinator creates a new Coordinator. 
func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *Coordinator { ctx, cancel := context.WithCancel(ctx) opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetPersistOptions(), hbStreams) - schedulers := make(map[string]*scheduleController) + schedulers := make(map[string]*schedulers.ScheduleController) c := &Coordinator{ ctx: ctx, cancel: cancel, @@ -107,7 +103,7 @@ func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams hbStreams: hbStreams, pluginInterface: NewPluginInterface(), } - c.diagnosticManager = newDiagnosticManager(c, cluster.GetPersistOptions()) + c.diagnosticManager = diagnostic.NewManager(schedulers, cluster.GetPersistOptions()) return c } @@ -679,7 +675,7 @@ func (c *Coordinator) AddScheduler(scheduler schedulers.Scheduler, args ...strin return errs.ErrSchedulerExisted.FastGenByArgs() } - s := NewScheduleController(c, scheduler) + s := schedulers.NewScheduleController(c.ctx, c.cluster, c.opController, scheduler) if err := s.Scheduler.Prepare(c.cluster); err != nil { return err } @@ -756,7 +752,7 @@ func (c *Coordinator) PauseOrResumeScheduler(name string, t int64) error { if c.cluster == nil { return errs.ErrNotBootstrapped.FastGenByArgs() } - var s []*scheduleController + var s []*schedulers.ScheduleController if name != "all" { sc, ok := c.schedulers[name] if !ok { @@ -775,8 +771,7 @@ func (c *Coordinator) PauseOrResumeScheduler(name string, t int64) error { delayAt = time.Now().Unix() delayUntil = delayAt + t } - atomic.StoreInt64(&sc.delayAt, delayAt) - atomic.StoreInt64(&sc.delayUntil, delayUntil) + sc.SetDelay(delayAt, delayUntil) } return err } @@ -844,7 +839,7 @@ func (c *Coordinator) IsSchedulerExisted(name string) (bool, error) { return true, nil } -func (c *Coordinator) runScheduler(s *scheduleController) { +func (c *Coordinator) runScheduler(s *schedulers.ScheduleController) { defer logutil.LogPanic() defer c.wg.Done() defer s.Scheduler.Cleanup(c.cluster) @@ -854,7 +849,7 @@ func (c *Coordinator) runScheduler(s *scheduleController) { for { select { case <-ticker.C: - diagnosable := s.diagnosticRecorder.isAllowed() + diagnosable := s.IsDiagnosticAllowed() if !s.AllowSchedule(diagnosable) { continue } @@ -948,8 +943,10 @@ func (c *Coordinator) GetCluster() sche.ClusterInformer { } // GetDiagnosticResult returns the diagnostic result. -func (c *Coordinator) GetDiagnosticResult(name string) (*DiagnosticResult, error) { - return c.diagnosticManager.getDiagnosticResult(name) +func (c *Coordinator) GetDiagnosticResult(name string) (*schedulers.DiagnosticResult, error) { + c.RLock() + defer c.RUnlock() + return c.diagnosticManager.GetDiagnosticResult(name) } // RecordOpStepWithTTL records OpStep with TTL @@ -957,148 +954,6 @@ func (c *Coordinator) RecordOpStepWithTTL(regionID uint64) { c.GetRuleChecker().RecordRegionPromoteToNonWitness(regionID) } -// scheduleController is used to manage a scheduler to schedulers. -type scheduleController struct { - schedulers.Scheduler - cluster sche.ScheduleCluster - opController *operator.Controller - nextInterval time.Duration - ctx context.Context - cancel context.CancelFunc - delayAt int64 - delayUntil int64 - diagnosticRecorder *diagnosticRecorder -} - -// NewScheduleController creates a new scheduleController. 
-func NewScheduleController(c *Coordinator, s schedulers.Scheduler) *scheduleController { - ctx, cancel := context.WithCancel(c.ctx) - return &scheduleController{ - Scheduler: s, - cluster: c.cluster, - opController: c.opController, - nextInterval: s.GetMinInterval(), - ctx: ctx, - cancel: cancel, - diagnosticRecorder: c.diagnosticManager.getRecorder(s.GetName()), - } -} - -func (s *scheduleController) Ctx() context.Context { - return s.ctx -} - -func (s *scheduleController) Stop() { - s.cancel() -} - -func (s *scheduleController) Schedule(diagnosable bool) []*operator.Operator { - for i := 0; i < maxScheduleRetries; i++ { - // no need to retry if schedule should stop to speed exit - select { - case <-s.ctx.Done(): - return nil - default: - } - cacheCluster := newCacheCluster(s.cluster) - // we need only process diagnostic once in the retry loop - diagnosable = diagnosable && i == 0 - ops, plans := s.Scheduler.Schedule(cacheCluster, diagnosable) - if diagnosable { - s.diagnosticRecorder.setResultFromPlans(ops, plans) - } - foundDisabled := false - for _, op := range ops { - if labelMgr := s.cluster.GetRegionLabeler(); labelMgr != nil { - region := s.cluster.GetRegion(op.RegionID()) - if region == nil { - continue - } - if labelMgr.ScheduleDisabled(region) { - denySchedulersByLabelerCounter.Inc() - foundDisabled = true - break - } - } - } - if len(ops) > 0 { - // If we have schedule, reset interval to the minimal interval. - s.nextInterval = s.Scheduler.GetMinInterval() - // try regenerating operators - if foundDisabled { - continue - } - return ops - } - } - s.nextInterval = s.Scheduler.GetNextInterval(s.nextInterval) - return nil -} - -func (s *scheduleController) DiagnoseDryRun() ([]*operator.Operator, []plan.Plan) { - cacheCluster := newCacheCluster(s.cluster) - return s.Scheduler.Schedule(cacheCluster, true) -} - -// GetInterval returns the interval of scheduling for a scheduler. -func (s *scheduleController) GetInterval() time.Duration { - return s.nextInterval -} - -// SetInterval sets the interval of scheduling for a scheduler. for test purpose. -func (s *scheduleController) SetInterval(interval time.Duration) { - s.nextInterval = interval -} - -// AllowSchedule returns if a scheduler is allowed to schedulers. -func (s *scheduleController) AllowSchedule(diagnosable bool) bool { - if !s.Scheduler.IsScheduleAllowed(s.cluster) { - if diagnosable { - s.diagnosticRecorder.setResultFromStatus(pending) - } - return false - } - if s.isSchedulingHalted() { - if diagnosable { - s.diagnosticRecorder.setResultFromStatus(halted) - } - return false - } - if s.IsPaused() { - if diagnosable { - s.diagnosticRecorder.setResultFromStatus(paused) - } - return false - } - return true -} - -func (s *scheduleController) isSchedulingHalted() bool { - return s.cluster.GetOpts().IsSchedulingHalted() -} - -// isPaused returns if a scheduler is paused. 
-func (s *scheduleController) IsPaused() bool { - delayUntil := atomic.LoadInt64(&s.delayUntil) - return time.Now().Unix() < delayUntil -} - -// getDelayAt returns paused timestamp of a paused scheduler -func (s *scheduleController) getDelayAt() int64 { - if s.IsPaused() { - return atomic.LoadInt64(&s.delayAt) - } - return 0 -} - -// getDelayUntil returns resume timestamp of a paused scheduler -func (s *scheduleController) getDelayUntil() int64 { - if s.IsPaused() { - return atomic.LoadInt64(&s.delayUntil) - } - return 0 -} - // GetPausedSchedulerDelayAt returns paused timestamp of a paused scheduler func (c *Coordinator) GetPausedSchedulerDelayAt(name string) (int64, error) { c.RLock() @@ -1110,7 +965,7 @@ func (c *Coordinator) GetPausedSchedulerDelayAt(name string) (int64, error) { if !ok { return -1, errs.ErrSchedulerNotFound.FastGenByArgs() } - return s.getDelayAt(), nil + return s.GetDelayAt(), nil } // GetPausedSchedulerDelayUntil returns the delay time until the scheduler is paused. @@ -1124,7 +979,7 @@ func (c *Coordinator) GetPausedSchedulerDelayUntil(name string) (int64, error) { if !ok { return -1, errs.ErrSchedulerNotFound.FastGenByArgs() } - return s.getDelayUntil(), nil + return s.GetDelayUntil(), nil } // CheckTransferWitnessLeader determines if transfer leader is required, then sends to the scheduler if needed @@ -1142,22 +997,3 @@ func (c *Coordinator) CheckTransferWitnessLeader(region *core.RegionInfo) { } } } - -// cacheCluster include cache info to improve the performance. -type cacheCluster struct { - sche.ScheduleCluster - stores []*core.StoreInfo -} - -// GetStores returns store infos from cache -func (c *cacheCluster) GetStores() []*core.StoreInfo { - return c.stores -} - -// newCacheCluster constructor for cache -func newCacheCluster(c sche.ScheduleCluster) *cacheCluster { - return &cacheCluster{ - ScheduleCluster: c, - stores: c.GetStores(), - } -} diff --git a/pkg/schedule/diagnostic/diagnostic_manager.go b/pkg/schedule/diagnostic/diagnostic_manager.go new file mode 100644 index 00000000000..6f865876fab --- /dev/null +++ b/pkg/schedule/diagnostic/diagnostic_manager.go @@ -0,0 +1,79 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package diagnostic + +import ( + "time" + + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/server/config" +) + +// Manager is used to manage the diagnostic result of schedulers for now. +type Manager struct { + config *config.PersistOptions + schedulerController map[string]*schedulers.ScheduleController +} + +// NewManager creates a new Manager. +func NewManager(schedulerController map[string]*schedulers.ScheduleController, config *config.PersistOptions) *Manager { + return &Manager{ + config: config, + schedulerController: schedulerController, + } +} + +// GetDiagnosticResult gets the diagnostic result of the scheduler. 
+func (d *Manager) GetDiagnosticResult(name string) (*schedulers.DiagnosticResult, error) {
+	if !d.config.IsDiagnosticAllowed() {
+		return nil, errs.ErrDiagnosticDisabled
+	}
+
+	scheduler, isSchedulerExisted := d.schedulerController[name]
+	if !isSchedulerExisted {
+		ts := uint64(time.Now().Unix())
+		res := &schedulers.DiagnosticResult{Name: name, Timestamp: ts, Status: schedulers.Disabled}
+		return res, nil
+	}
+	var isDisabled bool
+	t := scheduler.Scheduler.GetType()
+	scheduleConfig := d.config.GetScheduleConfig()
+	for _, s := range scheduleConfig.Schedulers {
+		if t == s.Type {
+			isDisabled = s.Disable
+			break
+		}
+	}
+	if isDisabled {
+		ts := uint64(time.Now().Unix())
+		res := &schedulers.DiagnosticResult{Name: name, Timestamp: ts, Status: schedulers.Disabled}
+		return res, nil
+	}
+
+	recorder := d.getSchedulerRecorder(name)
+	if recorder == nil {
+		return nil, errs.ErrSchedulerUndiagnosable.FastGenByArgs(name)
+	}
+	result := recorder.GetLastResult()
+	if result == nil {
+		return nil, errs.ErrNoDiagnosticResult.FastGenByArgs(name)
+	}
+	return result, nil
+}
+
+func (d *Manager) getSchedulerRecorder(name string) *schedulers.DiagnosticRecorder {
+	return d.schedulerController[name].GetDiagnosticRecorder()
+}
diff --git a/pkg/schedule/diagnostic_manager.go b/pkg/schedule/schedulers/diagnostic_recorder.go
similarity index 57%
rename from pkg/schedule/diagnostic_manager.go
rename to pkg/schedule/schedulers/diagnostic_recorder.go
index 3f83d13baad..49c775c2b08 100644
--- a/pkg/schedule/diagnostic_manager.go
+++ b/pkg/schedule/schedulers/diagnostic_recorder.go
@@ -1,4 +1,4 @@
-// Copyright 2022 TiKV Project Authors.
+// Copyright 2023 TiKV Project Authors.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -12,112 +12,60 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package schedule
+package schedulers
 
 import (
 	"fmt"
 	"time"
 
-	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/cache"
-	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/pkg/movingaverage"
 	sc "github.com/tikv/pd/pkg/schedule/config"
 	"github.com/tikv/pd/pkg/schedule/operator"
 	"github.com/tikv/pd/pkg/schedule/plan"
-	"github.com/tikv/pd/pkg/schedule/schedulers"
-	"go.uber.org/zap"
 )
 
 const (
-	// disabled means the current scheduler is unavailable or removed
-	disabled = "disabled"
-	// paused means the current scheduler is paused
-	paused = "paused"
-	// halted means the current scheduler is halted
-	halted = "halted"
-	// scheduling means the current scheduler is generating.
-	scheduling = "scheduling"
-	// pending means the current scheduler cannot generate scheduling operator
-	pending = "pending"
-	// normal means that there is no need to create operators since everything is fine.
-	normal = "normal"
+	maxDiagnosticResultNum = 10
 )
 
 const (
-	maxDiagnosticResultNum = 10
+	// Disabled means the current scheduler is unavailable or removed
+	Disabled = "disabled"
+	// Paused means the current scheduler is paused
+	Paused = "paused"
+	// Halted means the current scheduler is halted
+	Halted = "halted"
+	// Scheduling means the current scheduler is generating scheduling operators.
+	Scheduling = "scheduling"
+	// Pending means the current scheduler cannot generate any scheduling operators
+	Pending = "pending"
+	// Normal means that there is no need to create operators since everything is fine.
+	Normal = "normal"
 )
 
 // DiagnosableSummaryFunc includes all implementations of plan.Summary.
// And it also includes all schedulers which pd support to diagnose. var DiagnosableSummaryFunc = map[string]plan.Summary{ - schedulers.BalanceRegionName: plan.BalancePlanSummary, - schedulers.BalanceLeaderName: plan.BalancePlanSummary, -} - -type diagnosticManager struct { - coordinator *Coordinator - config sc.Config - recorders map[string]*diagnosticRecorder -} - -func newDiagnosticManager(coordinator *Coordinator, config sc.Config) *diagnosticManager { - recorders := make(map[string]*diagnosticRecorder) - for name := range DiagnosableSummaryFunc { - recorders[name] = newDiagnosticRecorder(name, coordinator, config) - } - return &diagnosticManager{ - coordinator: coordinator, - config: config, - recorders: recorders, - } -} - -func (d *diagnosticManager) getDiagnosticResult(name string) (*DiagnosticResult, error) { - if !d.config.IsDiagnosticAllowed() { - return nil, errs.ErrDiagnosticDisabled - } - - isSchedulerExisted, _ := d.coordinator.IsSchedulerExisted(name) - isDisabled, _ := d.coordinator.IsSchedulerDisabled(name) - if !isSchedulerExisted || isDisabled { - ts := uint64(time.Now().Unix()) - res := &DiagnosticResult{Name: name, Timestamp: ts, Status: disabled} - return res, nil - } - - recorder := d.getRecorder(name) - if recorder == nil { - return nil, errs.ErrSchedulerUndiagnosable.FastGenByArgs(name) - } - result := recorder.getLastResult() - if result == nil { - return nil, errs.ErrNoDiagnosticResult.FastGenByArgs(name) - } - return result, nil -} - -func (d *diagnosticManager) getRecorder(name string) *diagnosticRecorder { - return d.recorders[name] + BalanceRegionName: plan.BalancePlanSummary, + BalanceLeaderName: plan.BalancePlanSummary, } -// diagnosticRecorder is used to manage diagnostic for one scheduler. -type diagnosticRecorder struct { +// DiagnosticRecorder is used to manage diagnostic for one scheduler. +type DiagnosticRecorder struct { schedulerName string - coordinator *Coordinator config sc.Config summaryFunc plan.Summary results *cache.FIFO } -func newDiagnosticRecorder(name string, coordinator *Coordinator, config sc.Config) *diagnosticRecorder { +// NewDiagnosticRecorder creates a new DiagnosticRecorder. +func NewDiagnosticRecorder(name string, config sc.Config) *DiagnosticRecorder { summaryFunc, ok := DiagnosableSummaryFunc[name] if !ok { - log.Error("can't find summary function", zap.String("scheduler-name", name)) return nil } - return &diagnosticRecorder{ - coordinator: coordinator, + return &DiagnosticRecorder{ schedulerName: name, config: config, summaryFunc: summaryFunc, @@ -125,14 +73,16 @@ func newDiagnosticRecorder(name string, coordinator *Coordinator, config sc.Conf } } -func (d *diagnosticRecorder) isAllowed() bool { +// IsAllowed is used to check whether the diagnostic is allowed. +func (d *DiagnosticRecorder) IsAllowed() bool { if d == nil { return false } return d.config.IsDiagnosticAllowed() } -func (d *diagnosticRecorder) getLastResult() *DiagnosticResult { +// GetLastResult is used to get the last diagnostic result. 
+func (d *DiagnosticRecorder) GetLastResult() *DiagnosticResult { if d.results.Len() == 0 { return nil } @@ -150,7 +100,7 @@ func (d *diagnosticRecorder) getLastResult() *DiagnosticResult { var resStr string firstStatus := items[0].Value.(*DiagnosticResult).Status - if firstStatus == pending || firstStatus == normal { + if firstStatus == Pending || firstStatus == Normal { wa := movingaverage.NewWeightAllocator(length, 3) counter := make(map[uint64]map[plan.Status]float64) for i := 0; i < length; i++ { @@ -179,7 +129,7 @@ func (d *diagnosticRecorder) getLastResult() *DiagnosticResult { for k, v := range statusCounter { resStr += fmt.Sprintf("%d store(s) %s; ", v, k.String()) } - } else if firstStatus == pending { + } else if firstStatus == Pending { // This is used to handle pending status because of reach limit in `IsScheduleAllowed` resStr = fmt.Sprintf("%s reach limit", d.schedulerName) } @@ -192,7 +142,8 @@ func (d *diagnosticRecorder) getLastResult() *DiagnosticResult { } } -func (d *diagnosticRecorder) setResultFromStatus(status string) { +// SetResultFromStatus is used to set result from status. +func (d *DiagnosticRecorder) SetResultFromStatus(status string) { if d == nil { return } @@ -200,7 +151,8 @@ func (d *diagnosticRecorder) setResultFromStatus(status string) { d.results.Put(result.Timestamp, result) } -func (d *diagnosticRecorder) setResultFromPlans(ops []*operator.Operator, plans []plan.Plan) { +// SetResultFromPlans is used to set result from plans. +func (d *DiagnosticRecorder) SetResultFromPlans(ops []*operator.Operator, plans []plan.Plan) { if d == nil { return } @@ -208,22 +160,22 @@ func (d *diagnosticRecorder) setResultFromPlans(ops []*operator.Operator, plans d.results.Put(result.Timestamp, result) } -func (d *diagnosticRecorder) analyze(ops []*operator.Operator, plans []plan.Plan, ts uint64) *DiagnosticResult { - res := &DiagnosticResult{Name: d.schedulerName, Timestamp: ts, Status: normal} +func (d *DiagnosticRecorder) analyze(ops []*operator.Operator, plans []plan.Plan, ts uint64) *DiagnosticResult { + res := &DiagnosticResult{Name: d.schedulerName, Timestamp: ts, Status: Normal} name := d.schedulerName // TODO: support more schedulers and checkers switch name { - case schedulers.BalanceRegionName, schedulers.BalanceLeaderName: + case BalanceRegionName, BalanceLeaderName: if len(ops) != 0 { - res.Status = scheduling + res.Status = Scheduling return res } - res.Status = pending + res.Status = Pending if d.summaryFunc != nil { isAllNormal := false res.StoreStatus, isAllNormal, _ = d.summaryFunc(plans) if isAllNormal { - res.Status = normal + res.Status = Normal } } return res diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go new file mode 100644 index 00000000000..6c9db5888bb --- /dev/null +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -0,0 +1,212 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package schedulers
+
+import (
+	"context"
+	"sync/atomic"
+	"time"
+
+	"github.com/tikv/pd/pkg/core"
+	sche "github.com/tikv/pd/pkg/schedule/core"
+	"github.com/tikv/pd/pkg/schedule/labeler"
+	"github.com/tikv/pd/pkg/schedule/operator"
+	"github.com/tikv/pd/pkg/schedule/plan"
+)
+
+const maxScheduleRetries = 10
+
+var denySchedulersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("schedulers", "deny")
+
+// ScheduleController is used to manage a scheduler.
+type ScheduleController struct {
+	Scheduler
+	cluster            sche.ScheduleCluster
+	opController       *operator.Controller
+	nextInterval       time.Duration
+	ctx                context.Context
+	cancel             context.CancelFunc
+	delayAt            int64
+	delayUntil         int64
+	diagnosticRecorder *DiagnosticRecorder
+}
+
+// NewScheduleController creates a new ScheduleController.
+func NewScheduleController(ctx context.Context, cluster sche.ScheduleCluster, opController *operator.Controller, s Scheduler) *ScheduleController {
+	ctx, cancel := context.WithCancel(ctx)
+	return &ScheduleController{
+		Scheduler:          s,
+		cluster:            cluster,
+		opController:       opController,
+		nextInterval:       s.GetMinInterval(),
+		ctx:                ctx,
+		cancel:             cancel,
+		diagnosticRecorder: NewDiagnosticRecorder(s.GetName(), cluster.GetOpts()),
+	}
+}
+
+// Ctx returns the context of ScheduleController.
+func (s *ScheduleController) Ctx() context.Context {
+	return s.ctx
+}
+
+// Stop stops the ScheduleController.
+func (s *ScheduleController) Stop() {
+	s.cancel()
+}
+
+// Schedule tries to create some operators.
+func (s *ScheduleController) Schedule(diagnosable bool) []*operator.Operator {
+	for i := 0; i < maxScheduleRetries; i++ {
+		// no need to retry if the schedule should stop, to speed up exit
+		select {
+		case <-s.ctx.Done():
+			return nil
+		default:
+		}
+		cacheCluster := newCacheCluster(s.cluster)
+		// we only need to process the diagnostic once in the retry loop
+		diagnosable = diagnosable && i == 0
+		ops, plans := s.Scheduler.Schedule(cacheCluster, diagnosable)
+		if diagnosable {
+			s.diagnosticRecorder.SetResultFromPlans(ops, plans)
+		}
+		foundDisabled := false
+		for _, op := range ops {
+			if labelMgr := s.cluster.GetRegionLabeler(); labelMgr != nil {
+				region := s.cluster.GetRegion(op.RegionID())
+				if region == nil {
+					continue
+				}
+				if labelMgr.ScheduleDisabled(region) {
+					denySchedulersByLabelerCounter.Inc()
+					foundDisabled = true
+					break
+				}
+			}
+		}
+		if len(ops) > 0 {
+			// If we have generated operators, reset the interval to the minimal interval.
+			s.nextInterval = s.Scheduler.GetMinInterval()
+			// try regenerating operators
+			if foundDisabled {
+				continue
+			}
+			return ops
+		}
+	}
+	s.nextInterval = s.Scheduler.GetNextInterval(s.nextInterval)
+	return nil
+}
+
+// DiagnoseDryRun returns the operators and plans of a scheduler.
+func (s *ScheduleController) DiagnoseDryRun() ([]*operator.Operator, []plan.Plan) {
+	cacheCluster := newCacheCluster(s.cluster)
+	return s.Scheduler.Schedule(cacheCluster, true)
+}
+
+// GetInterval returns the interval of scheduling for a scheduler.
+func (s *ScheduleController) GetInterval() time.Duration {
+	return s.nextInterval
+}
+
+// SetInterval sets the interval of scheduling for a scheduler. For test purposes only.
+func (s *ScheduleController) SetInterval(interval time.Duration) {
+	s.nextInterval = interval
+}
+
+// AllowSchedule returns if a scheduler is allowed to schedule.
+func (s *ScheduleController) AllowSchedule(diagnosable bool) bool {
+	if !s.Scheduler.IsScheduleAllowed(s.cluster) {
+		if diagnosable {
+			s.diagnosticRecorder.SetResultFromStatus(Pending)
+		}
+		return false
+	}
+	if s.isSchedulingHalted() {
+		if diagnosable {
+			s.diagnosticRecorder.SetResultFromStatus(Halted)
+		}
+		return false
+	}
+	if s.IsPaused() {
+		if diagnosable {
+			s.diagnosticRecorder.SetResultFromStatus(Paused)
+		}
+		return false
+	}
+	return true
+}
+
+func (s *ScheduleController) isSchedulingHalted() bool {
+	return s.cluster.GetOpts().IsSchedulingHalted()
+}
+
+// IsPaused returns if a scheduler is paused.
+func (s *ScheduleController) IsPaused() bool {
+	delayUntil := atomic.LoadInt64(&s.delayUntil)
+	return time.Now().Unix() < delayUntil
+}
+
+// GetDelayAt returns the paused timestamp of a paused scheduler.
+func (s *ScheduleController) GetDelayAt() int64 {
+	if s.IsPaused() {
+		return atomic.LoadInt64(&s.delayAt)
+	}
+	return 0
+}
+
+// GetDelayUntil returns the resume timestamp of a paused scheduler.
+func (s *ScheduleController) GetDelayUntil() int64 {
+	if s.IsPaused() {
+		return atomic.LoadInt64(&s.delayUntil)
+	}
+	return 0
+}
+
+// SetDelay sets the delay of a scheduler.
+func (s *ScheduleController) SetDelay(delayAt, delayUntil int64) {
+	atomic.StoreInt64(&s.delayAt, delayAt)
+	atomic.StoreInt64(&s.delayUntil, delayUntil)
+}
+
+// GetDiagnosticRecorder returns the diagnostic recorder of a scheduler.
+func (s *ScheduleController) GetDiagnosticRecorder() *DiagnosticRecorder {
+	return s.diagnosticRecorder
+}
+
+// IsDiagnosticAllowed returns if a scheduler is allowed to do diagnostics.
+func (s *ScheduleController) IsDiagnosticAllowed() bool {
+	return s.diagnosticRecorder.IsAllowed()
+}
+
+// cacheCluster includes cache info to improve performance.
+type cacheCluster struct { + sche.ScheduleCluster + stores []*core.StoreInfo +} + +// GetStores returns store infos from cache +func (c *cacheCluster) GetStores() []*core.StoreInfo { + return c.stores +} + +// newCacheCluster constructor for cache +func newCacheCluster(c sche.ScheduleCluster) *cacheCluster { + return &cacheCluster{ + ScheduleCluster: c, + stores: c.GetStores(), + } +} diff --git a/server/api/diagnostic.go b/server/api/diagnostic.go index 7de60b82b10..f83f9c83efb 100644 --- a/server/api/diagnostic.go +++ b/server/api/diagnostic.go @@ -19,7 +19,7 @@ import ( "github.com/gorilla/mux" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/schedule" + "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/server" "github.com/unrolled/render" ) @@ -38,7 +38,7 @@ func newDiagnosticHandler(svr *server.Server, rd *render.Render) *diagnosticHand func (h *diagnosticHandler) GetDiagnosticResult(w http.ResponseWriter, r *http.Request) { name := mux.Vars(r)["name"] - if _, ok := schedule.DiagnosableSummaryFunc[name]; !ok { + if _, ok := schedulers.DiagnosableSummaryFunc[name]; !ok { h.rd.JSON(w, http.StatusBadRequest, errs.ErrSchedulerUndiagnosable.FastGenByArgs(name).Error()) return } diff --git a/server/api/diagnostic_test.go b/server/api/diagnostic_test.go index fb5946f5f94..8a39b2e0007 100644 --- a/server/api/diagnostic_test.go +++ b/server/api/diagnostic_test.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/utils/apiutil" tu "github.com/tikv/pd/pkg/utils/testutil" @@ -66,7 +65,7 @@ func (suite *diagnosticTestSuite) TearDownSuite() { func (suite *diagnosticTestSuite) checkStatus(status string, url string) { re := suite.Require() suite.Eventually(func() bool { - result := &schedule.DiagnosticResult{} + result := &schedulers.DiagnosticResult{} err := tu.ReadGetJSON(re, testDialClient, url, result) suite.NoError(err) return result.Status == status @@ -95,7 +94,7 @@ func (suite *diagnosticTestSuite) TestSchedulerDiagnosticAPI() { suite.True(cfg.Schedule.EnableDiagnostic) balanceRegionURL := suite.urlPrefix + "/" + schedulers.BalanceRegionName - result := &schedule.DiagnosticResult{} + result := &schedulers.DiagnosticResult{} err = tu.ReadGetJSON(re, testDialClient, balanceRegionURL, result) suite.NoError(err) suite.Equal("disabled", result.Status) diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index dea913f3d47..e0cab979859 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -3397,7 +3397,7 @@ func TestController(t *testing.T) { kind: operator.OpLeader, } - sc := schedule.NewScheduleController(co, lb) + sc := schedulers.NewScheduleController(tc.ctx, co.GetCluster(), co.GetOperatorController(), lb) for i := schedulers.MinScheduleInterval; sc.GetInterval() != schedulers.MaxScheduleInterval; i = sc.GetNextInterval(i) { re.Equal(i, sc.GetInterval()) @@ -3472,12 +3472,12 @@ func TestController(t *testing.T) { func TestInterval(t *testing.T) { re := require.New(t) - _, co, cleanup := prepare(nil, nil, nil, re) + tc, co, cleanup := prepare(nil, nil, nil, re) defer cleanup() lb, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, co.GetOperatorController(), storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""})) re.NoError(err) - sc := 
schedule.NewScheduleController(co, lb) + sc := schedulers.NewScheduleController(tc.ctx, co.GetCluster(), co.GetOperatorController(), lb) // If no operator for x seconds, the next check should be in x/2 seconds. idleSeconds := []int{5, 10, 20, 30, 60} diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index db5ea2efe72..651eb11dbb2 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -117,7 +117,6 @@ func TestScheduler(t *testing.T) { mightExec([]string{"-u", pdAddr, "scheduler", "describe", schedulerName}, &result) return len(result) != 0 }, testutil.WithTickInterval(50*time.Millisecond)) - re.Equal(expectedStatus, result["status"]) re.Equal(expectedSummary, result["summary"]) }
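
To make the new surface easier to review, here is a minimal sketch of how the extracted pieces wire together after this change. It is illustrative only, not part of the patch: wireSchedulers, cluster, and hbStreams are hypothetical names supplied by the caller, and the calls mirror the APIs used in coordinator.go and the tests above.

// Sketch only: assumes a caller that already owns a cluster informer and
// heartbeat streams, as the coordinator does.
func wireSchedulers(ctx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) error {
	opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetPersistOptions(), hbStreams)

	// Schedulers are now tracked as schedulers.ScheduleController values,
	// built from a context, cluster, and operator controller instead of
	// from a *Coordinator.
	controllers := make(map[string]*schedulers.ScheduleController)
	lb, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, opController,
		storage.NewStorageWithMemoryBackend(),
		schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""}))
	if err != nil {
		return err
	}
	controllers[lb.GetName()] = schedulers.NewScheduleController(ctx, cluster, opController, lb)

	// The diagnostic manager reads the shared controller map directly rather
	// than reaching back into the coordinator, which is why
	// Coordinator.GetDiagnosticResult now takes the coordinator's read lock.
	diag := diagnostic.NewManager(controllers, cluster.GetPersistOptions())
	result, err := diag.GetDiagnosticResult(schedulers.BalanceLeaderName)
	if err != nil {
		return err
	}
	fmt.Println(result.Name, result.Status)
	return nil
}

The design point: ScheduleController and DiagnosticRecorder now depend only on schedule-level interfaces (sche.ScheduleCluster, operator.Controller), so they can live in pkg/schedule/schedulers without referencing the coordinator.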