diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go index eb2240efbaf..c678c46eb2f 100644 --- a/pkg/schedule/checker/checker_controller.go +++ b/pkg/schedule/checker/checker_controller.go @@ -40,7 +40,7 @@ var denyCheckersByLabelerCounter = schedule.LabelerEventCounter.WithLabelValues( type Controller struct { cluster sche.ClusterInformer conf config.Config - opController *schedule.OperatorController + opController *operator.Controller learnerChecker *LearnerChecker replicaChecker *ReplicaChecker ruleChecker *RuleChecker @@ -54,7 +54,7 @@ type Controller struct { } // NewController create a new Controller. -func NewController(ctx context.Context, cluster sche.ClusterInformer, conf config.Config, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *schedule.OperatorController) *Controller { +func NewController(ctx context.Context, cluster sche.ClusterInformer, conf config.Config, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller { regionWaitingList := cache.NewDefaultCache(DefaultCacheSize) return &Controller{ cluster: cluster, diff --git a/pkg/schedule/checker/merge_checker_test.go b/pkg/schedule/checker/merge_checker_test.go index 28dd203d502..0da950cc77f 100644 --- a/pkg/schedule/checker/merge_checker_test.go +++ b/pkg/schedule/checker/merge_checker_test.go @@ -26,7 +26,6 @@ import ( "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/mock/mockcluster" "github.com/tikv/pd/pkg/mock/mockconfig" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/hbstream" "github.com/tikv/pd/pkg/schedule/labeler" @@ -464,7 +463,7 @@ func (suite *mergeCheckerTestSuite) TestStoreLimitWithMerge() { mc := NewMergeChecker(suite.ctx, tc, tc.GetOpts()) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */) - oc := schedule.NewOperatorController(suite.ctx, tc, stream) + oc := operator.NewController(suite.ctx, tc, stream) regions[2] = regions[2].Clone( core.SetPeers([]*metapb.Peer{ diff --git a/pkg/schedule/metrics.go b/pkg/schedule/metrics.go index 2e0cabadec6..74f368daf77 100644 --- a/pkg/schedule/metrics.go +++ b/pkg/schedule/metrics.go @@ -17,58 +17,6 @@ package schedule import "github.com/prometheus/client_golang/prometheus" var ( - // TODO: pre-allocate gauge metrics - operatorCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "pd", - Subsystem: "schedule", - Name: "operators_count", - Help: "Counter of schedule operators.", - }, []string{"type", "event"}) - - operatorDuration = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "pd", - Subsystem: "schedule", - Name: "finish_operators_duration_seconds", - Help: "Bucketed histogram of processing time (s) of finished operator.", - Buckets: []float64{0.5, 1, 2, 4, 8, 16, 20, 40, 60, 90, 120, 180, 240, 300, 480, 600, 720, 900, 1200, 1800, 3600}, - }, []string{"type"}) - - operatorSizeHist = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "pd", - Subsystem: "schedule", - Name: "operator_region_size", - Help: "Bucketed histogram of the operator region size.", - Buckets: prometheus.ExponentialBuckets(1, 2, 20), // 1MB~1TB - }, []string{"type"}) - - operatorWaitCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "pd", - Subsystem: "schedule", - Name: "operators_waiting_count", - Help: "Counter of schedule waiting operators.", - }, 
[]string{"type", "event"}) - - operatorWaitDuration = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "pd", - Subsystem: "schedule", - Name: "waiting_operators_duration_seconds", - Help: "Bucketed histogram of waiting time (s) of operator for being promoted.", - Buckets: prometheus.ExponentialBuckets(0.01, 2, 16), - }, []string{"type"}) - - storeLimitCostCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "pd", - Subsystem: "schedule", - Name: "store_limit_cost", - Help: "limit rate cost of store.", - }, []string{"store", "limit_type"}) - scatterCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "pd", @@ -96,13 +44,7 @@ var ( ) func init() { - prometheus.MustRegister(operatorCounter) - prometheus.MustRegister(operatorDuration) - prometheus.MustRegister(operatorWaitDuration) - prometheus.MustRegister(storeLimitCostCounter) - prometheus.MustRegister(operatorWaitCounter) prometheus.MustRegister(scatterCounter) prometheus.MustRegister(scatterDistributionCounter) - prometheus.MustRegister(operatorSizeHist) prometheus.MustRegister(LabelerEventCounter) } diff --git a/pkg/schedule/operator/metrics.go b/pkg/schedule/operator/metrics.go index 33c23e925cf..e3623286d59 100644 --- a/pkg/schedule/operator/metrics.go +++ b/pkg/schedule/operator/metrics.go @@ -35,6 +35,14 @@ var ( Help: "Counter of operator meeting limit", }, []string{"type", "name"}) + storeLimitCostCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "schedule", + Name: "store_limit_cost", + Help: "limit rate cost of store.", + }, []string{"store", "limit_type"}) + // OperatorExceededStoreLimitCounter exposes the counter when operator meet exceeded store limit. OperatorExceededStoreLimitCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -43,10 +51,60 @@ var ( Name: "operator_exceeded_store_limit", Help: "Counter of operator meeting store limit", }, []string{"desc"}) + + // TODO: pre-allocate gauge metrics + operatorCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "schedule", + Name: "operators_count", + Help: "Counter of schedule operators.", + }, []string{"type", "event"}) + + operatorDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "pd", + Subsystem: "schedule", + Name: "finish_operators_duration_seconds", + Help: "Bucketed histogram of processing time (s) of finished operator.", + Buckets: []float64{0.5, 1, 2, 4, 8, 16, 20, 40, 60, 90, 120, 180, 240, 300, 480, 600, 720, 900, 1200, 1800, 3600}, + }, []string{"type"}) + + operatorSizeHist = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "pd", + Subsystem: "schedule", + Name: "operator_region_size", + Help: "Bucketed histogram of the operator region size.", + Buckets: prometheus.ExponentialBuckets(1, 2, 20), // 1MB~1TB + }, []string{"type"}) + + operatorWaitCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "schedule", + Name: "operators_waiting_count", + Help: "Counter of schedule waiting operators.", + }, []string{"type", "event"}) + + operatorWaitDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "pd", + Subsystem: "schedule", + Name: "waiting_operators_duration_seconds", + Help: "Bucketed histogram of waiting time (s) of operator for being promoted.", + Buckets: prometheus.ExponentialBuckets(0.01, 2, 16), + }, []string{"type"}) ) func init() { prometheus.MustRegister(operatorStepDuration) 
prometheus.MustRegister(OperatorLimitCounter) prometheus.MustRegister(OperatorExceededStoreLimitCounter) + prometheus.MustRegister(operatorCounter) + prometheus.MustRegister(operatorDuration) + prometheus.MustRegister(operatorWaitDuration) + prometheus.MustRegister(operatorWaitCounter) + prometheus.MustRegister(operatorSizeHist) + prometheus.MustRegister(storeLimitCostCounter) } diff --git a/pkg/schedule/operator_controller.go b/pkg/schedule/operator/operator_controller.go similarity index 76% rename from pkg/schedule/operator_controller.go rename to pkg/schedule/operator/operator_controller.go index 25fa334422b..9561420f804 100644 --- a/pkg/schedule/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schedule +package operator import ( "container/heap" @@ -31,7 +31,6 @@ import ( "github.com/tikv/pd/pkg/errs" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/hbstream" - "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/versioninfo" "go.uber.org/zap" @@ -47,7 +46,7 @@ const ( var ( slowNotifyInterval = 5 * time.Second fastNotifyInterval = 2 * time.Second - // PushOperatorTickInterval is the interval try to push the operator. + // PushOperatorTickInterval is the interval try to push the PushOperatorTickInterval = 500 * time.Millisecond // StoreBalanceBaseTime represents the base time of balance rate. StoreBalanceBaseTime float64 = 60 @@ -55,53 +54,58 @@ var ( FastOperatorFinishTime = 10 * time.Second ) -// OperatorController is used to limit the speed of scheduling. -type OperatorController struct { +// Controller is used to limit the speed of scheduling. +type Controller struct { syncutil.RWMutex ctx context.Context cluster sche.ClusterInformer - operators map[uint64]*operator.Operator + operators map[uint64]*Operator hbStreams *hbstream.HeartbeatStreams fastOperators *cache.TTLUint64 - counts map[operator.OpKind]uint64 - opRecords *OperatorRecords + counts map[OpKind]uint64 + records *records wop WaitingOperator - wopStatus *WaitingOperatorStatus + wopStatus *waitingOperatorStatus opNotifierQueue operatorQueue } -// NewOperatorController creates a OperatorController. -func NewOperatorController(ctx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *OperatorController { - return &OperatorController{ +// NewController creates a Controller. +func NewController(ctx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *Controller { + return &Controller{ ctx: ctx, cluster: cluster, - operators: make(map[uint64]*operator.Operator), + operators: make(map[uint64]*Operator), hbStreams: hbStreams, fastOperators: cache.NewIDTTL(ctx, time.Minute, FastOperatorFinishTime), - counts: make(map[operator.OpKind]uint64), - opRecords: NewOperatorRecords(ctx), - wop: NewRandBuckets(), - wopStatus: NewWaitingOperatorStatus(), + counts: make(map[OpKind]uint64), + records: newRecords(ctx), + wop: newRandBuckets(), + wopStatus: newWaitingOperatorStatus(), opNotifierQueue: make(operatorQueue, 0), } } // Ctx returns a context which will be canceled once RaftCluster is stopped. // For now, it is only used to control the lifetime of TTL cache in schedulers. 
-func (oc *OperatorController) Ctx() context.Context { +func (oc *Controller) Ctx() context.Context { return oc.ctx } // GetCluster exports cluster to evict-scheduler for check store status. -func (oc *OperatorController) GetCluster() sche.ClusterInformer { +func (oc *Controller) GetCluster() sche.ClusterInformer { oc.RLock() defer oc.RUnlock() return oc.cluster } +// GetHBStreams returns the heartbeat streams. +func (oc *Controller) GetHBStreams() *hbstream.HeartbeatStreams { + return oc.hbStreams +} + // Dispatch is used to dispatch the operator of a region. -func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) { - // Check existed operator. +func (oc *Controller) Dispatch(region *core.RegionInfo, source string) { + // Check existed operator. if op := oc.GetOperator(region.GetID()); op != nil { failpoint.Inject("concurrentRemoveOperator", func() { time.Sleep(500 * time.Millisecond) @@ -111,13 +115,13 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) { // Check will call CheckSuccess and CheckTimeout. step := op.Check(region) switch op.Status() { - case operator.STARTED: + case STARTED: operatorCounter.WithLabelValues(op.Desc(), "check").Inc() if source == DispatchFromHeartBeat && oc.checkStaleOperator(op, step, region) { return } oc.SendScheduleCommand(region, step, source) - case operator.SUCCESS: + case SUCCESS: if op.ContainNonWitnessStep() { oc.cluster.RecordOpStepWithTTL(op.RegionID()) } @@ -129,7 +133,7 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) { log.Debug("op finish duration less than 10s", zap.Uint64("region-id", op.RegionID())) oc.pushFastOperator(op) } - case operator.TIMEOUT: + case TIMEOUT: if oc.RemoveOperator(op) { operatorCounter.WithLabelValues(op.Desc(), "promote-timeout").Inc() oc.PromoteWaitingOperator() @@ -140,7 +144,7 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) { // CANCELED, REPLACED must remove before transition. 
log.Error("dispatching operator with unexpected status", zap.Uint64("region-id", op.RegionID()), - zap.String("status", operator.OpStatusToString(op.Status())), + zap.String("status", OpStatusToString(op.Status())), zap.Reflect("operator", op), errs.ZapError(errs.ErrUnexpectedOperatorStatus)) operatorWaitCounter.WithLabelValues(op.Desc(), "unexpected").Inc() failpoint.Inject("unexpectedOperator", func() { @@ -155,7 +159,7 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) { } } -func (oc *OperatorController) checkStaleOperator(op *operator.Operator, step operator.OpStep, region *core.RegionInfo) bool { +func (oc *Controller) checkStaleOperator(op *Operator, step OpStep, region *core.RegionInfo) bool { err := step.CheckInProgress(oc.cluster, region) if err != nil { if oc.RemoveOperator(op, zap.String("reason", err.Error())) { @@ -189,10 +193,10 @@ func (oc *OperatorController) checkStaleOperator(op *operator.Operator, step ope return false } -func (oc *OperatorController) getNextPushOperatorTime(step operator.OpStep, now time.Time) time.Time { +func (oc *Controller) getNextPushOperatorTime(step OpStep, now time.Time) time.Time { nextTime := slowNotifyInterval switch step.(type) { - case operator.TransferLeader, operator.PromoteLearner, operator.ChangePeerV2Enter, operator.ChangePeerV2Leave: + case TransferLeader, PromoteLearner, ChangePeerV2Enter, ChangePeerV2Leave: nextTime = fastNotifyInterval } return now.Add(nextTime) @@ -201,7 +205,7 @@ func (oc *OperatorController) getNextPushOperatorTime(step operator.OpStep, now // pollNeedDispatchRegion returns the region need to dispatch, // "next" is true to indicate that it may exist in next attempt, // and false is the end for the poll. -func (oc *OperatorController) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) { +func (oc *Controller) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) { oc.Lock() defer oc.Unlock() if oc.opNotifierQueue.Len() == 0 { @@ -242,7 +246,7 @@ func (oc *OperatorController) pollNeedDispatchRegion() (r *core.RegionInfo, next } // PushOperators periodically pushes the unfinished operator to the executor(TiKV). -func (oc *OperatorController) PushOperators() { +func (oc *Controller) PushOperators() { for { r, next := oc.pollNeedDispatchRegion() if !next { @@ -257,7 +261,7 @@ func (oc *OperatorController) PushOperators() { } // AddWaitingOperator adds operators to waiting operators. -func (oc *OperatorController) AddWaitingOperator(ops ...*operator.Operator) int { +func (oc *Controller) AddWaitingOperator(ops ...*Operator) int { oc.Lock() added := 0 needPromoted := 0 @@ -266,14 +270,14 @@ func (oc *OperatorController) AddWaitingOperator(ops ...*operator.Operator) int op := ops[i] desc := op.Desc() isMerge := false - if op.Kind()&operator.OpMerge != 0 { + if op.Kind()&OpMerge != 0 { if i+1 >= len(ops) { // should not be here forever log.Error("orphan merge operators found", zap.String("desc", desc), errs.ZapError(errs.ErrMergeOperator.FastGenByArgs("orphan operator found"))) oc.Unlock() return added } - if ops[i+1].Kind()&operator.OpMerge == 0 { + if ops[i+1].Kind()&OpMerge == 0 { log.Error("merge operator should be paired", zap.String("desc", ops[i+1].Desc()), errs.ZapError(errs.ErrMergeOperator.FastGenByArgs("operator should be paired"))) oc.Unlock() @@ -316,7 +320,7 @@ func (oc *OperatorController) AddWaitingOperator(ops ...*operator.Operator) int } // AddOperator adds operators to the running operators. 
-func (oc *OperatorController) AddOperator(ops ...*operator.Operator) bool { +func (oc *Controller) AddOperator(ops ...*Operator) bool { oc.Lock() defer oc.Unlock() @@ -339,10 +343,10 @@ func (oc *OperatorController) AddOperator(ops ...*operator.Operator) bool { } // PromoteWaitingOperator promotes operators from waiting operators. -func (oc *OperatorController) PromoteWaitingOperator() { +func (oc *Controller) PromoteWaitingOperator() { oc.Lock() defer oc.Unlock() - var ops []*operator.Operator + var ops []*Operator for { // GetOperator returns one operator or two merge operators ops = oc.wop.GetOperator() @@ -375,10 +379,10 @@ func (oc *OperatorController) PromoteWaitingOperator() { // There are several situations that cannot be added: // - There is no such region in the cluster // - The epoch of the operator and the epoch of the corresponding region are no longer consistent. -// - The region already has a higher priority or same priority operator. +// - The region already has a higher priority or same priority // - Exceed the max number of waiting operators // - At least one operator is expired. -func (oc *OperatorController) checkAddOperator(isPromoting bool, ops ...*operator.Operator) bool { +func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) bool { for _, op := range ops { region := oc.cluster.GetRegion(op.RegionID()) if region == nil { @@ -403,10 +407,10 @@ func (oc *OperatorController) checkAddOperator(isPromoting bool, ops ...*operato operatorWaitCounter.WithLabelValues(op.Desc(), "already-have").Inc() return false } - if op.Status() != operator.CREATED { + if op.Status() != CREATED { log.Error("trying to add operator with unexpected status", zap.Uint64("region-id", op.RegionID()), - zap.String("status", operator.OpStatusToString(op.Status())), + zap.String("status", OpStatusToString(op.Status())), zap.Reflect("operator", op), errs.ZapError(errs.ErrUnexpectedOperatorStatus)) failpoint.Inject("unexpectedOperator", func() { panic(op) @@ -420,7 +424,7 @@ func (oc *OperatorController) checkAddOperator(isPromoting bool, ops ...*operato return false } - if op.SchedulerKind() == operator.OpAdmin || op.IsLeaveJointStateOperator() { + if op.SchedulerKind() == OpAdmin || op.IsLeaveJointStateOperator() { continue } } @@ -434,11 +438,11 @@ func (oc *OperatorController) checkAddOperator(isPromoting bool, ops ...*operato return !expired } -func isHigherPriorityOperator(new, old *operator.Operator) bool { +func isHigherPriorityOperator(new, old *Operator) bool { return new.GetPriorityLevel() > old.GetPriorityLevel() } -func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool { +func (oc *Controller) addOperatorLocked(op *Operator) bool { regionID := op.RegionID() log.Info("add operator", zap.Uint64("region-id", regionID), @@ -456,7 +460,7 @@ func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool { if !op.Start() { log.Error("adding operator with unexpected status", zap.Uint64("region-id", regionID), - zap.String("status", operator.OpStatusToString(op.Status())), + zap.String("status", OpStatusToString(op.Status())), zap.Reflect("operator", op), errs.ZapError(errs.ErrUnexpectedOperatorStatus)) failpoint.Inject("unexpectedOperator", func() { panic(op) @@ -468,7 +472,7 @@ func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool { operatorCounter.WithLabelValues(op.Desc(), "start").Inc() operatorSizeHist.WithLabelValues(op.Desc()).Observe(float64(op.ApproximateSize)) 
operatorWaitDuration.WithLabelValues(op.Desc()).Observe(op.ElapsedTime().Seconds()) - opInfluence := NewTotalOpInfluence([]*operator.Operator{op}, oc.cluster) + opInfluence := NewTotalOpInfluence([]*Operator{op}, oc.cluster) for storeID := range opInfluence.StoresInfluence { store := oc.cluster.GetStore(storeID) if store == nil { @@ -487,7 +491,7 @@ func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool { } oc.updateCounts(oc.operators) - var step operator.OpStep + var step OpStep if region := oc.cluster.GetRegion(op.RegionID()); region != nil { if step = op.Check(region); step != nil { oc.SendScheduleCommand(region, step, DispatchFromCreate) @@ -502,8 +506,8 @@ func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool { return true } -func (oc *OperatorController) ack(op *operator.Operator) { - opInfluence := NewTotalOpInfluence([]*operator.Operator{op}, oc.cluster) +func (oc *Controller) ack(op *Operator) { + opInfluence := NewTotalOpInfluence([]*Operator{op}, oc.cluster) for storeID := range opInfluence.StoresInfluence { for _, v := range storelimit.TypeNameValue { limiter := oc.getOrCreateStoreLimit(storeID, v) @@ -517,7 +521,7 @@ func (oc *OperatorController) ack(op *operator.Operator) { } // RemoveOperator removes a operator from the running operators. -func (oc *OperatorController) RemoveOperator(op *operator.Operator, extraFields ...zap.Field) bool { +func (oc *Controller) RemoveOperator(op *Operator, extraFields ...zap.Field) bool { oc.Lock() removed := oc.removeOperatorLocked(op) oc.Unlock() @@ -533,13 +537,13 @@ func (oc *OperatorController) RemoveOperator(op *operator.Operator, extraFields return removed } -func (oc *OperatorController) removeOperatorWithoutBury(op *operator.Operator) bool { +func (oc *Controller) removeOperatorWithoutBury(op *Operator) bool { oc.Lock() defer oc.Unlock() return oc.removeOperatorLocked(op) } -func (oc *OperatorController) removeOperatorLocked(op *operator.Operator) bool { +func (oc *Controller) removeOperatorLocked(op *Operator) bool { regionID := op.RegionID() if cur := oc.operators[regionID]; cur == op { delete(oc.operators, regionID) @@ -551,13 +555,13 @@ func (oc *OperatorController) removeOperatorLocked(op *operator.Operator) bool { return false } -func (oc *OperatorController) buryOperator(op *operator.Operator, extraFields ...zap.Field) { +func (oc *Controller) buryOperator(op *Operator, extraFields ...zap.Field) { st := op.Status() - if !operator.IsEndStatus(st) { + if !IsEndStatus(st) { log.Error("burying operator with non-end status", zap.Uint64("region-id", op.RegionID()), - zap.String("status", operator.OpStatusToString(op.Status())), + zap.String("status", OpStatusToString(op.Status())), zap.Reflect("operator", op), errs.ZapError(errs.ErrUnexpectedOperatorStatus)) failpoint.Inject("unexpectedOperator", func() { panic(op) @@ -567,7 +571,7 @@ func (oc *OperatorController) buryOperator(op *operator.Operator, extraFields .. } switch st { - case operator.SUCCESS: + case SUCCESS: log.Info("operator finish", zap.Uint64("region-id", op.RegionID()), zap.Duration("takes", op.RunningTime()), @@ -578,27 +582,27 @@ func (oc *OperatorController) buryOperator(op *operator.Operator, extraFields .. 
for _, counter := range op.FinishedCounters { counter.Inc() } - case operator.REPLACED: + case REPLACED: log.Info("replace old operator", zap.Uint64("region-id", op.RegionID()), zap.Duration("takes", op.RunningTime()), zap.Reflect("operator", op), zap.String("additional-info", op.GetAdditionalInfo())) operatorCounter.WithLabelValues(op.Desc(), "replace").Inc() - case operator.EXPIRED: + case EXPIRED: log.Info("operator expired", zap.Uint64("region-id", op.RegionID()), zap.Duration("lives", op.ElapsedTime()), zap.Reflect("operator", op)) operatorCounter.WithLabelValues(op.Desc(), "expire").Inc() - case operator.TIMEOUT: + case TIMEOUT: log.Info("operator timeout", zap.Uint64("region-id", op.RegionID()), zap.Duration("takes", op.RunningTime()), zap.Reflect("operator", op), zap.String("additional-info", op.GetAdditionalInfo())) operatorCounter.WithLabelValues(op.Desc(), "timeout").Inc() - case operator.CANCELED: + case CANCELED: fields := []zap.Field{ zap.Uint64("region-id", op.RegionID()), zap.Duration("takes", op.RunningTime()), @@ -612,32 +616,32 @@ func (oc *OperatorController) buryOperator(op *operator.Operator, extraFields .. operatorCounter.WithLabelValues(op.Desc(), "cancel").Inc() } - oc.opRecords.Put(op) + oc.records.Put(op) } // GetOperatorStatus gets the operator and its status with the specify id. -func (oc *OperatorController) GetOperatorStatus(id uint64) *OperatorWithStatus { +func (oc *Controller) GetOperatorStatus(id uint64) *OpWithStatus { oc.Lock() defer oc.Unlock() if op, ok := oc.operators[id]; ok { - return NewOperatorWithStatus(op) + return NewOpWithStatus(op) } - return oc.opRecords.Get(id) + return oc.records.Get(id) } // GetOperator gets a operator from the given region. -func (oc *OperatorController) GetOperator(regionID uint64) *operator.Operator { +func (oc *Controller) GetOperator(regionID uint64) *Operator { oc.RLock() defer oc.RUnlock() return oc.operators[regionID] } // GetOperators gets operators from the running operators. -func (oc *OperatorController) GetOperators() []*operator.Operator { +func (oc *Controller) GetOperators() []*Operator { oc.RLock() defer oc.RUnlock() - operators := make([]*operator.Operator, 0, len(oc.operators)) + operators := make([]*Operator, 0, len(oc.operators)) for _, op := range oc.operators { operators = append(operators, op) } @@ -646,14 +650,14 @@ func (oc *OperatorController) GetOperators() []*operator.Operator { } // GetWaitingOperators gets operators from the waiting operators. -func (oc *OperatorController) GetWaitingOperators() []*operator.Operator { +func (oc *Controller) GetWaitingOperators() []*Operator { oc.RLock() defer oc.RUnlock() return oc.wop.ListOperator() } // SendScheduleCommand sends a command to the region. -func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step operator.OpStep, source string) { +func (oc *Controller) SendScheduleCommand(region *core.RegionInfo, step OpStep, source string) { log.Info("send schedule command", zap.Uint64("region-id", region.GetID()), zap.Stringer("step", step), @@ -667,15 +671,15 @@ func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step oc.hbStreams.SendMsg(region, cmd) } -func (oc *OperatorController) pushFastOperator(op *operator.Operator) { +func (oc *Controller) pushFastOperator(op *Operator) { oc.fastOperators.Put(op.RegionID(), op) } // GetRecords gets operators' records. 
-func (oc *OperatorController) GetRecords(from time.Time) []*operator.OpRecord { - records := make([]*operator.OpRecord, 0, oc.opRecords.ttl.Len()) - for _, id := range oc.opRecords.ttl.GetAllID() { - op := oc.opRecords.Get(id) +func (oc *Controller) GetRecords(from time.Time) []*OpRecord { + records := make([]*OpRecord, 0, oc.records.ttl.Len()) + for _, id := range oc.records.ttl.GetAllID() { + op := oc.records.Get(id) if op == nil || op.FinishTime.Before(from) { continue } @@ -685,10 +689,10 @@ func (oc *OperatorController) GetRecords(from time.Time) []*operator.OpRecord { } // GetHistory gets operators' history. -func (oc *OperatorController) GetHistory(start time.Time) []operator.OpHistory { - history := make([]operator.OpHistory, 0, oc.opRecords.ttl.Len()) - for _, id := range oc.opRecords.ttl.GetAllID() { - op := oc.opRecords.Get(id) +func (oc *Controller) GetHistory(start time.Time) []OpHistory { + history := make([]OpHistory, 0, oc.records.ttl.Len()) + for _, id := range oc.records.ttl.GetAllID() { + op := oc.records.Get(id) if op == nil || op.FinishTime.Before(start) { continue } @@ -698,7 +702,7 @@ func (oc *OperatorController) GetHistory(start time.Time) []operator.OpHistory { } // updateCounts updates resource counts using current pending operators. -func (oc *OperatorController) updateCounts(operators map[uint64]*operator.Operator) { +func (oc *Controller) updateCounts(operators map[uint64]*Operator) { for k := range oc.counts { delete(oc.counts, k) } @@ -709,16 +713,16 @@ func (oc *OperatorController) updateCounts(operators map[uint64]*operator.Operat // OperatorCount gets the count of operators filtered by kind. // kind only has one OpKind. -func (oc *OperatorController) OperatorCount(kind operator.OpKind) uint64 { +func (oc *Controller) OperatorCount(kind OpKind) uint64 { oc.RLock() defer oc.RUnlock() return oc.counts[kind] } // GetOpInfluence gets OpInfluence. -func (oc *OperatorController) GetOpInfluence(cluster sche.ClusterInformer) operator.OpInfluence { - influence := operator.OpInfluence{ - StoresInfluence: make(map[uint64]*operator.StoreInfluence), +func (oc *Controller) GetOpInfluence(cluster sche.ClusterInformer) OpInfluence { + influence := OpInfluence{ + StoresInfluence: make(map[uint64]*StoreInfluence), } oc.RLock() defer oc.RUnlock() @@ -734,13 +738,13 @@ func (oc *OperatorController) GetOpInfluence(cluster sche.ClusterInformer) opera } // GetFastOpInfluence get fast finish operator influence -func (oc *OperatorController) GetFastOpInfluence(cluster sche.ClusterInformer, influence operator.OpInfluence) { +func (oc *Controller) GetFastOpInfluence(cluster sche.ClusterInformer, influence OpInfluence) { for _, id := range oc.fastOperators.GetAllID() { value, ok := oc.fastOperators.Get(id) if !ok { continue } - op, ok := value.(*operator.Operator) + op, ok := value.(*Operator) if !ok { continue } @@ -749,14 +753,14 @@ func (oc *OperatorController) GetFastOpInfluence(cluster sche.ClusterInformer, i } // AddOpInfluence add operator influence for cluster -func AddOpInfluence(op *operator.Operator, influence operator.OpInfluence, cluster sche.ClusterInformer) { +func AddOpInfluence(op *Operator, influence OpInfluence, cluster sche.ClusterInformer) { region := cluster.GetRegion(op.RegionID()) op.TotalInfluence(influence, region) } // NewTotalOpInfluence creates a OpInfluence. 
-func NewTotalOpInfluence(operators []*operator.Operator, cluster sche.ClusterInformer) operator.OpInfluence { - influence := *operator.NewOpInfluence() +func NewTotalOpInfluence(operators []*Operator, cluster sche.ClusterInformer) OpInfluence { + influence := *NewOpInfluence() for _, op := range operators { AddOpInfluence(op, influence, cluster) @@ -766,73 +770,73 @@ func NewTotalOpInfluence(operators []*operator.Operator, cluster sche.ClusterInf } // SetOperator is only used for test. -func (oc *OperatorController) SetOperator(op *operator.Operator) { +func (oc *Controller) SetOperator(op *Operator) { oc.Lock() defer oc.Unlock() oc.operators[op.RegionID()] = op oc.updateCounts(oc.operators) } -// OperatorWithStatus records the operator and its status. -type OperatorWithStatus struct { - *operator.Operator +// OpWithStatus records the operator and its status. +type OpWithStatus struct { + *Operator Status pdpb.OperatorStatus FinishTime time.Time } -// NewOperatorWithStatus creates an OperatorStatus from an operator. -func NewOperatorWithStatus(op *operator.Operator) *OperatorWithStatus { - return &OperatorWithStatus{ +// NewOpWithStatus creates an OpWithStatus from an operator. +func NewOpWithStatus(op *Operator) *OpWithStatus { + return &OpWithStatus{ Operator: op, - Status: operator.OpStatusToPDPB(op.Status()), + Status: OpStatusToPDPB(op.Status()), FinishTime: time.Now(), } } // MarshalJSON returns the status of operator as a JSON string -func (o *OperatorWithStatus) MarshalJSON() ([]byte, error) { +func (o *OpWithStatus) MarshalJSON() ([]byte, error) { return []byte(`"` + fmt.Sprintf("status: %s, operator: %s", o.Status.String(), o.Operator.String()) + `"`), nil } -// OperatorRecords remains the operator and its status for a while. -type OperatorRecords struct { +// records remains the operator and its status for a while. +type records struct { ttl *cache.TTLUint64 } const operatorStatusRemainTime = 10 * time.Minute -// NewOperatorRecords returns a OperatorRecords. -func NewOperatorRecords(ctx context.Context) *OperatorRecords { - return &OperatorRecords{ +// newRecords returns a records. +func newRecords(ctx context.Context) *records { + return &records{ ttl: cache.NewIDTTL(ctx, time.Minute, operatorStatusRemainTime), } } // Get gets the operator and its status. -func (o *OperatorRecords) Get(id uint64) *OperatorWithStatus { +func (o *records) Get(id uint64) *OpWithStatus { v, exist := o.ttl.Get(id) if !exist { return nil } - return v.(*OperatorWithStatus) + return v.(*OpWithStatus) } // Put puts the operator and its status. -func (o *OperatorRecords) Put(op *operator.Operator) { +func (o *records) Put(op *Operator) { id := op.RegionID() - record := NewOperatorWithStatus(op) + record := NewOpWithStatus(op) o.ttl.Put(id, record) } -// ExceedStoreLimit returns true if the store exceeds the cost limit after adding the operator. Otherwise, returns false. -func (oc *OperatorController) ExceedStoreLimit(ops ...*operator.Operator) bool { +// ExceedStoreLimit returns true if the store exceeds the cost limit after adding the Otherwise, returns false. +func (oc *Controller) ExceedStoreLimit(ops ...*Operator) bool { oc.Lock() defer oc.Unlock() return oc.exceedStoreLimitLocked(ops...) } -// exceedStoreLimitLocked returns true if the store exceeds the cost limit after adding the operator. Otherwise, returns false. 
-func (oc *OperatorController) exceedStoreLimitLocked(ops ...*operator.Operator) bool { +// exceedStoreLimitLocked returns true if the store exceeds the cost limit after adding the Otherwise, returns false. +func (oc *Controller) exceedStoreLimitLocked(ops ...*Operator) bool { // The operator with Urgent priority, like admin operators, should ignore the store limit check. var desc string if len(ops) != 0 { @@ -853,7 +857,7 @@ func (oc *OperatorController) exceedStoreLimitLocked(ops ...*operator.Operator) return false } if !limiter.Available(stepCost, v, ops[0].GetPriorityLevel()) { - operator.OperatorExceededStoreLimitCounter.WithLabelValues(desc).Inc() + OperatorExceededStoreLimitCounter.WithLabelValues(desc).Inc() return true } } @@ -862,7 +866,7 @@ func (oc *OperatorController) exceedStoreLimitLocked(ops ...*operator.Operator) } // getOrCreateStoreLimit is used to get or create the limit of a store. -func (oc *OperatorController) getOrCreateStoreLimit(storeID uint64, limitType storelimit.Type) storelimit.StoreLimit { +func (oc *Controller) getOrCreateStoreLimit(storeID uint64, limitType storelimit.Type) storelimit.StoreLimit { ratePerSec := oc.cluster.GetOpts().GetStoreLimitByType(storeID, limitType) / StoreBalanceBaseTime s := oc.cluster.GetStore(storeID) if s == nil { diff --git a/pkg/schedule/operator_controller_test.go b/pkg/schedule/operator/operator_controller_test.go similarity index 74% rename from pkg/schedule/operator_controller_test.go rename to pkg/schedule/operator/operator_controller_test.go index f198529bcc1..655197f4a2d 100644 --- a/pkg/schedule/operator_controller_test.go +++ b/pkg/schedule/operator/operator_controller_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schedule +package operator import ( "container/heap" @@ -33,7 +33,6 @@ import ( "github.com/tikv/pd/pkg/mock/mockconfig" "github.com/tikv/pd/pkg/schedule/hbstream" "github.com/tikv/pd/pkg/schedule/labeler" - "github.com/tikv/pd/pkg/schedule/operator" ) type operatorControllerTestSuite struct { @@ -49,7 +48,7 @@ func TestOperatorControllerTestSuite(t *testing.T) { func (suite *operatorControllerTestSuite) SetupSuite() { suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/unexpectedOperator", "return(true)")) + suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/operator/unexpectedOperator", "return(true)")) } func (suite *operatorControllerTestSuite) TearDownSuite() { @@ -59,30 +58,30 @@ func (suite *operatorControllerTestSuite) TearDownSuite() { func (suite *operatorControllerTestSuite) TestCacheInfluence() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) - oc := NewOperatorController(suite.ctx, tc, nil) + oc := NewController(suite.ctx, tc, nil) tc.AddLeaderStore(2, 1) region := tc.AddLeaderRegion(1, 1, 2) - steps := []operator.OpStep{ - operator.RemovePeer{FromStore: 2}, + steps := []OpStep{ + RemovePeer{FromStore: 2}, } - op := operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, steps...) + op := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, steps...) oc.SetOperator(op) suite.True(op.Start()) - influence := operator.NewOpInfluence() + influence := NewOpInfluence() AddOpInfluence(op, *influence, tc) suite.Equal(int64(-96), influence.GetStoreInfluence(2).RegionSize) // case: influence is same even if the region size changed. 
region = region.Clone(core.SetApproximateSize(100)) tc.PutRegion(region) - influence1 := operator.NewOpInfluence() + influence1 := NewOpInfluence() AddOpInfluence(op, *influence1, tc) suite.Equal(int64(-96), influence1.GetStoreInfluence(2).RegionSize) // case: influence is valid even if the region is removed. tc.RemoveRegion(region) - influence2 := operator.NewOpInfluence() + influence2 := NewOpInfluence() AddOpInfluence(op, *influence2, tc) suite.Equal(int64(-96), influence2.GetStoreInfluence(2).RegionSize) } @@ -91,15 +90,15 @@ func (suite *operatorControllerTestSuite) TestCacheInfluence() { func (suite *operatorControllerTestSuite) TestGetOpInfluence() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) - oc := NewOperatorController(suite.ctx, tc, nil) + oc := NewController(suite.ctx, tc, nil) tc.AddLeaderStore(2, 1) tc.AddLeaderRegion(1, 1, 2) tc.AddLeaderRegion(2, 1, 2) - steps := []operator.OpStep{ - operator.RemovePeer{FromStore: 2}, + steps := []OpStep{ + RemovePeer{FromStore: 2}, } - op1 := operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, steps...) - op2 := operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, steps...) + op1 := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, steps...) + op2 := NewTestOperator(2, &metapb.RegionEpoch{}, OpRegion, steps...) suite.True(op1.Start()) oc.SetOperator(op1) suite.True(op2.Start()) @@ -134,26 +133,26 @@ func (suite *operatorControllerTestSuite) TestOperatorStatus() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */) - oc := NewOperatorController(suite.ctx, tc, stream) + oc := NewController(suite.ctx, tc, stream) tc.AddLeaderStore(1, 2) tc.AddLeaderStore(2, 0) tc.AddLeaderRegion(1, 1, 2) tc.AddLeaderRegion(2, 1, 2) - steps := []operator.OpStep{ - operator.RemovePeer{FromStore: 2}, - operator.AddPeer{ToStore: 2, PeerID: 4}, + steps := []OpStep{ + RemovePeer{FromStore: 2}, + AddPeer{ToStore: 2, PeerID: 4}, } region1 := tc.GetRegion(1) region2 := tc.GetRegion(2) - op1 := operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, steps...) - op2 := operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, steps...) + op1 := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, steps...) + op2 := NewTestOperator(2, &metapb.RegionEpoch{}, OpRegion, steps...) 
suite.True(op1.Start()) oc.SetOperator(op1) suite.True(op2.Start()) oc.SetOperator(op2) suite.Equal(pdpb.OperatorStatus_RUNNING, oc.GetOperatorStatus(1).Status) suite.Equal(pdpb.OperatorStatus_RUNNING, oc.GetOperatorStatus(2).Status) - op1.SetStatusReachTime(operator.STARTED, time.Now().Add(-operator.SlowStepWaitTime-operator.FastStepWaitTime)) + op1.SetStatusReachTime(STARTED, time.Now().Add(-SlowStepWaitTime-FastStepWaitTime)) region2 = ApplyOperatorStep(region2, op2) tc.PutRegion(region2) oc.Dispatch(region1, "test") @@ -169,17 +168,17 @@ func (suite *operatorControllerTestSuite) TestFastFailOperator() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */) - oc := NewOperatorController(suite.ctx, tc, stream) + oc := NewController(suite.ctx, tc, stream) tc.AddLeaderStore(1, 2) tc.AddLeaderStore(2, 0) tc.AddLeaderStore(3, 0) tc.AddLeaderRegion(1, 1, 2) - steps := []operator.OpStep{ - operator.RemovePeer{FromStore: 2}, - operator.AddPeer{ToStore: 3, PeerID: 4}, + steps := []OpStep{ + RemovePeer{FromStore: 2}, + AddPeer{ToStore: 3, PeerID: 4}, } region := tc.GetRegion(1) - op := operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, steps...) + op := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, steps...) suite.True(op.Start()) oc.SetOperator(op) oc.Dispatch(region, "test") @@ -187,14 +186,14 @@ func (suite *operatorControllerTestSuite) TestFastFailOperator() { // change the leader region = region.Clone(core.WithLeader(region.GetPeer(2))) oc.Dispatch(region, DispatchFromHeartBeat) - suite.Equal(operator.CANCELED, op.Status()) + suite.Equal(CANCELED, op.Status()) suite.Nil(oc.GetOperator(region.GetID())) // transfer leader to an illegal store. - op = operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, operator.TransferLeader{ToStore: 5}) + op = NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 5}) oc.SetOperator(op) oc.Dispatch(region, DispatchFromHeartBeat) - suite.Equal(operator.CANCELED, op.Status()) + suite.Equal(CANCELED, op.Status()) suite.Nil(oc.GetOperator(region.GetID())) } @@ -203,14 +202,14 @@ func (suite *operatorControllerTestSuite) TestFastFailWithUnhealthyStore() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */) - oc := NewOperatorController(suite.ctx, tc, stream) + oc := NewController(suite.ctx, tc, stream) tc.AddLeaderStore(1, 2) tc.AddLeaderStore(2, 0) tc.AddLeaderStore(3, 0) tc.AddLeaderRegion(1, 1, 2) region := tc.GetRegion(1) - steps := []operator.OpStep{operator.TransferLeader{ToStore: 2}} - op := operator.NewTestOperator(1, region.GetRegionEpoch(), operator.OpLeader, steps...) + steps := []OpStep{TransferLeader{ToStore: 2}} + op := NewTestOperator(1, region.GetRegionEpoch(), OpLeader, steps...) 
oc.SetOperator(op) suite.False(oc.checkStaleOperator(op, steps[0], region)) tc.SetStoreDown(2) @@ -218,42 +217,42 @@ func (suite *operatorControllerTestSuite) TestFastFailWithUnhealthyStore() { } func (suite *operatorControllerTestSuite) TestCheckAddUnexpectedStatus() { - suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/unexpectedOperator")) + suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/operator/unexpectedOperator")) opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */) - oc := NewOperatorController(suite.ctx, tc, stream) + oc := NewController(suite.ctx, tc, stream) tc.AddLeaderStore(1, 0) tc.AddLeaderStore(2, 1) tc.AddLeaderRegion(1, 2, 1) tc.AddLeaderRegion(2, 2, 1) region1 := tc.GetRegion(1) - steps := []operator.OpStep{ - operator.RemovePeer{FromStore: 1}, - operator.AddPeer{ToStore: 1, PeerID: 4}, + steps := []OpStep{ + RemovePeer{FromStore: 1}, + AddPeer{ToStore: 1, PeerID: 4}, } { // finished op - op := operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, operator.TransferLeader{ToStore: 2}) + op := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 2}) suite.True(oc.checkAddOperator(false, op)) op.Start() suite.False(oc.checkAddOperator(false, op)) // started suite.Nil(op.Check(region1)) - suite.Equal(operator.SUCCESS, op.Status()) + suite.Equal(SUCCESS, op.Status()) suite.False(oc.checkAddOperator(false, op)) // success } { // finished op canceled - op := operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, operator.TransferLeader{ToStore: 2}) + op := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 2}) suite.True(oc.checkAddOperator(false, op)) suite.True(op.Cancel()) suite.False(oc.checkAddOperator(false, op)) } { // finished op replaced - op := operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, operator.TransferLeader{ToStore: 2}) + op := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 2}) suite.True(oc.checkAddOperator(false, op)) suite.True(op.Start()) suite.True(op.Replace()) @@ -261,23 +260,23 @@ func (suite *operatorControllerTestSuite) TestCheckAddUnexpectedStatus() { } { // finished op expired - op1 := operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, operator.TransferLeader{ToStore: 2}) - op2 := operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.TransferLeader{ToStore: 1}) + op1 := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 2}) + op2 := NewTestOperator(2, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 1}) suite.True(oc.checkAddOperator(false, op1, op2)) - op1.SetStatusReachTime(operator.CREATED, time.Now().Add(-operator.OperatorExpireTime)) - op2.SetStatusReachTime(operator.CREATED, time.Now().Add(-operator.OperatorExpireTime)) + op1.SetStatusReachTime(CREATED, time.Now().Add(-OperatorExpireTime)) + op2.SetStatusReachTime(CREATED, time.Now().Add(-OperatorExpireTime)) suite.False(oc.checkAddOperator(false, op1, op2)) - suite.Equal(operator.EXPIRED, op1.Status()) - suite.Equal(operator.EXPIRED, op2.Status()) + suite.Equal(EXPIRED, op1.Status()) + suite.Equal(EXPIRED, op2.Status()) } // finished op never timeout { // unfinished op timeout - op := operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, steps...) + op := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, steps...) 
suite.True(oc.checkAddOperator(false, op)) op.Start() - op.SetStatusReachTime(operator.STARTED, time.Now().Add(-operator.SlowStepWaitTime-operator.FastStepWaitTime)) + op.SetStatusReachTime(STARTED, time.Now().Add(-SlowStepWaitTime-FastStepWaitTime)) suite.True(op.CheckTimeout()) suite.False(oc.checkAddOperator(false, op)) } @@ -288,24 +287,24 @@ func (suite *operatorControllerTestSuite) TestConcurrentRemoveOperator() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */) - oc := NewOperatorController(suite.ctx, tc, stream) + oc := NewController(suite.ctx, tc, stream) tc.AddLeaderStore(1, 0) tc.AddLeaderStore(2, 1) tc.AddLeaderRegion(1, 2, 1) region1 := tc.GetRegion(1) - steps := []operator.OpStep{ - operator.RemovePeer{FromStore: 1}, - operator.AddPeer{ToStore: 1, PeerID: 4}, + steps := []OpStep{ + RemovePeer{FromStore: 1}, + AddPeer{ToStore: 1, PeerID: 4}, } // finished op with normal priority - op1 := operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, operator.TransferLeader{ToStore: 2}) + op1 := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 2}) // unfinished op with high priority - op2 := operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion|operator.OpAdmin, steps...) + op2 := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion|OpAdmin, steps...) suite.True(op1.Start()) oc.SetOperator(op1) - suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/concurrentRemoveOperator", "return(true)")) + suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/operator/concurrentRemoveOperator", "return(true)")) var wg sync.WaitGroup wg.Add(2) go func() { @@ -322,27 +321,27 @@ func (suite *operatorControllerTestSuite) TestConcurrentRemoveOperator() { wg.Wait() suite.Equal(op2, oc.GetOperator(1)) - suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/concurrentRemoveOperator")) + suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/operator/concurrentRemoveOperator")) } func (suite *operatorControllerTestSuite) TestPollDispatchRegion() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */) - oc := NewOperatorController(suite.ctx, tc, stream) + oc := NewController(suite.ctx, tc, stream) tc.AddLeaderStore(1, 2) tc.AddLeaderStore(2, 1) tc.AddLeaderRegion(1, 1, 2) tc.AddLeaderRegion(2, 1, 2) tc.AddLeaderRegion(4, 2, 1) - steps := []operator.OpStep{ - operator.RemovePeer{FromStore: 2}, - operator.AddPeer{ToStore: 2, PeerID: 4}, + steps := []OpStep{ + RemovePeer{FromStore: 2}, + AddPeer{ToStore: 2, PeerID: 4}, } - op1 := operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, operator.TransferLeader{ToStore: 2}) - op2 := operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, steps...) - op3 := operator.NewTestOperator(3, &metapb.RegionEpoch{}, operator.OpRegion, steps...) - op4 := operator.NewTestOperator(4, &metapb.RegionEpoch{}, operator.OpRegion, operator.TransferLeader{ToStore: 2}) + op1 := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 2}) + op2 := NewTestOperator(2, &metapb.RegionEpoch{}, OpRegion, steps...) + op3 := NewTestOperator(3, &metapb.RegionEpoch{}, OpRegion, steps...) 
+ op4 := NewTestOperator(4, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 2}) region1 := tc.GetRegion(1) region2 := tc.GetRegion(2) region4 := tc.GetRegion(4) @@ -402,7 +401,7 @@ func (suite *operatorControllerTestSuite) TestStoreLimit() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */) - oc := NewOperatorController(suite.ctx, tc, stream) + oc := NewController(suite.ctx, tc, stream) tc.AddLeaderStore(1, 0) tc.UpdateLeaderCount(1, 1000) tc.AddLeaderStore(2, 0) @@ -414,53 +413,53 @@ func (suite *operatorControllerTestSuite) TestStoreLimit() { tc.SetStoreLimit(2, storelimit.AddPeer, 60) for i := uint64(1); i <= 5; i++ { - op := operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: i}) + op := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, AddPeer{ToStore: 2, PeerID: i}) suite.True(oc.AddOperator(op)) suite.checkRemoveOperatorSuccess(oc, op) } - op := operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 1}) + op := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, AddPeer{ToStore: 2, PeerID: 1}) suite.False(oc.AddOperator(op)) suite.False(oc.RemoveOperator(op)) tc.SetStoreLimit(2, storelimit.AddPeer, 120) for i := uint64(1); i <= 10; i++ { - op = operator.NewTestOperator(i, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: i}) + op = NewTestOperator(i, &metapb.RegionEpoch{}, OpRegion, AddPeer{ToStore: 2, PeerID: i}) suite.True(oc.AddOperator(op)) suite.checkRemoveOperatorSuccess(oc, op) } tc.SetAllStoresLimit(storelimit.AddPeer, 60) for i := uint64(1); i <= 5; i++ { - op = operator.NewTestOperator(i, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: i}) + op = NewTestOperator(i, &metapb.RegionEpoch{}, OpRegion, AddPeer{ToStore: 2, PeerID: i}) suite.True(oc.AddOperator(op)) suite.checkRemoveOperatorSuccess(oc, op) } - op = operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 1}) + op = NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, AddPeer{ToStore: 2, PeerID: 1}) suite.False(oc.AddOperator(op)) suite.False(oc.RemoveOperator(op)) tc.SetStoreLimit(2, storelimit.RemovePeer, 60) for i := uint64(1); i <= 5; i++ { - op := operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) + op := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, RemovePeer{FromStore: 2}) suite.True(oc.AddOperator(op)) suite.checkRemoveOperatorSuccess(oc, op) } - op = operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) + op = NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, RemovePeer{FromStore: 2}) suite.False(oc.AddOperator(op)) suite.False(oc.RemoveOperator(op)) tc.SetStoreLimit(2, storelimit.RemovePeer, 120) for i := uint64(1); i <= 10; i++ { - op = operator.NewTestOperator(i, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) + op = NewTestOperator(i, &metapb.RegionEpoch{}, OpRegion, RemovePeer{FromStore: 2}) suite.True(oc.AddOperator(op)) suite.checkRemoveOperatorSuccess(oc, op) } tc.SetAllStoresLimit(storelimit.RemovePeer, 60) for i := uint64(1); i <= 5; i++ { - op = operator.NewTestOperator(i, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) + op = NewTestOperator(i, &metapb.RegionEpoch{}, OpRegion, 
RemovePeer{FromStore: 2}) suite.True(oc.AddOperator(op)) suite.checkRemoveOperatorSuccess(oc, op) } - op = operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) + op = NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, RemovePeer{FromStore: 2}) suite.False(oc.AddOperator(op)) suite.False(oc.RemoveOperator(op)) } @@ -469,18 +468,18 @@ func (suite *operatorControllerTestSuite) TestStoreLimit() { func (suite *operatorControllerTestSuite) TestDispatchOutdatedRegion() { cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions()) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */) - controller := NewOperatorController(suite.ctx, cluster, stream) + controller := NewController(suite.ctx, cluster, stream) cluster.AddLeaderStore(1, 2) cluster.AddLeaderStore(2, 0) cluster.SetAllStoresLimit(storelimit.RemovePeer, 600) cluster.AddLeaderRegion(1, 1, 2) - steps := []operator.OpStep{ - operator.TransferLeader{FromStore: 1, ToStore: 2}, - operator.RemovePeer{FromStore: 1}, + steps := []OpStep{ + TransferLeader{FromStore: 1, ToStore: 2}, + RemovePeer{FromStore: 1}, } - op := operator.NewTestOperator(1, &metapb.RegionEpoch{ConfVer: 0, Version: 0}, operator.OpRegion, steps...) + op := NewTestOperator(1, &metapb.RegionEpoch{ConfVer: 0, Version: 0}, OpRegion, steps...) suite.True(controller.AddOperator(op)) suite.Equal(1, stream.MsgLength()) @@ -501,8 +500,8 @@ func (suite *operatorControllerTestSuite) TestDispatchOutdatedRegion() { suite.Equal(2, stream.MsgLength()) // add and dispatch op again, the op should be stale - op = operator.NewTestOperator(1, &metapb.RegionEpoch{ConfVer: 0, Version: 0}, - operator.OpRegion, steps...) + op = NewTestOperator(1, &metapb.RegionEpoch{ConfVer: 0, Version: 0}, + OpRegion, steps...) suite.True(controller.AddOperator(op)) suite.Equal(uint64(0), op.ConfVerChanged(region)) suite.Equal(3, stream.MsgLength()) @@ -519,7 +518,7 @@ func (suite *operatorControllerTestSuite) TestDispatchOutdatedRegion() { func (suite *operatorControllerTestSuite) TestCalcInfluence() { cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions()) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */) - controller := NewOperatorController(suite.ctx, cluster, stream) + controller := NewController(suite.ctx, cluster, stream) epoch := &metapb.RegionEpoch{ConfVer: 0, Version: 0} region := cluster.MockRegionInfo(1, 1, []uint64{2}, []uint64{}, epoch) @@ -528,16 +527,16 @@ func (suite *operatorControllerTestSuite) TestCalcInfluence() { cluster.AddRegionStore(1, 1) cluster.AddRegionStore(3, 1) - steps := []operator.OpStep{ - operator.AddLearner{ToStore: 3, PeerID: 3}, - operator.PromoteLearner{ToStore: 3, PeerID: 3}, - operator.TransferLeader{FromStore: 1, ToStore: 3}, - operator.RemovePeer{FromStore: 1}, + steps := []OpStep{ + AddLearner{ToStore: 3, PeerID: 3}, + PromoteLearner{ToStore: 3, PeerID: 3}, + TransferLeader{FromStore: 1, ToStore: 3}, + RemovePeer{FromStore: 1}, } - op := operator.NewTestOperator(1, epoch, operator.OpRegion, steps...) + op := NewTestOperator(1, epoch, OpRegion, steps...) 
suite.True(controller.AddOperator(op)) - check := func(influence operator.OpInfluence, id uint64, expect *operator.StoreInfluence) { + check := func(influence OpInfluence, id uint64, expect *StoreInfluence) { si := influence.GetStoreInfluence(id) suite.Equal(si.LeaderCount, expect.LeaderCount) suite.Equal(si.LeaderSize, expect.LeaderSize) @@ -548,7 +547,7 @@ func (suite *operatorControllerTestSuite) TestCalcInfluence() { } influence := controller.GetOpInfluence(cluster) - check(influence, 1, &operator.StoreInfluence{ + check(influence, 1, &StoreInfluence{ LeaderSize: -20, LeaderCount: -1, RegionSize: -20, @@ -557,7 +556,7 @@ func (suite *operatorControllerTestSuite) TestCalcInfluence() { storelimit.RemovePeer: 200, }, }) - check(influence, 3, &operator.StoreInfluence{ + check(influence, 3, &StoreInfluence{ LeaderSize: 20, LeaderCount: 1, RegionSize: 20, @@ -575,7 +574,7 @@ func (suite *operatorControllerTestSuite) TestCalcInfluence() { op.Check(region2) influence = controller.GetOpInfluence(cluster) - check(influence, 1, &operator.StoreInfluence{ + check(influence, 1, &StoreInfluence{ LeaderSize: -20, LeaderCount: -1, RegionSize: -20, @@ -584,7 +583,7 @@ func (suite *operatorControllerTestSuite) TestCalcInfluence() { storelimit.RemovePeer: 200, }, }) - check(influence, 3, &operator.StoreInfluence{ + check(influence, 3, &StoreInfluence{ LeaderSize: 20, LeaderCount: 1, RegionSize: 0, @@ -596,7 +595,7 @@ func (suite *operatorControllerTestSuite) TestCalcInfluence() { func (suite *operatorControllerTestSuite) TestDispatchUnfinishedStep() { cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions()) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */) - controller := NewOperatorController(suite.ctx, cluster, stream) + controller := NewController(suite.ctx, cluster, stream) // Create a new region with epoch(0, 0) // the region has two peers with its peer id allocated incrementally. @@ -611,24 +610,24 @@ func (suite *operatorControllerTestSuite) TestDispatchUnfinishedStep() { cluster.AddRegionStore(3, 1) // The next allocated peer should have peerid 3, so we add this peer // to store 3 - testSteps := [][]operator.OpStep{ + testSteps := [][]OpStep{ { - operator.AddLearner{ToStore: 3, PeerID: 3}, - operator.PromoteLearner{ToStore: 3, PeerID: 3}, - operator.TransferLeader{FromStore: 1, ToStore: 3}, - operator.RemovePeer{FromStore: 1}, + AddLearner{ToStore: 3, PeerID: 3}, + PromoteLearner{ToStore: 3, PeerID: 3}, + TransferLeader{FromStore: 1, ToStore: 3}, + RemovePeer{FromStore: 1}, }, { - operator.AddLearner{ToStore: 3, PeerID: 3, IsLightWeight: true}, - operator.PromoteLearner{ToStore: 3, PeerID: 3}, - operator.TransferLeader{FromStore: 1, ToStore: 3}, - operator.RemovePeer{FromStore: 1}, + AddLearner{ToStore: 3, PeerID: 3, IsLightWeight: true}, + PromoteLearner{ToStore: 3, PeerID: 3}, + TransferLeader{FromStore: 1, ToStore: 3}, + RemovePeer{FromStore: 1}, }, } for _, steps := range testSteps { // Create an operator - op := operator.NewTestOperator(1, epoch, operator.OpRegion, steps...) + op := NewTestOperator(1, epoch, OpRegion, steps...) 
suite.True(controller.AddOperator(op)) suite.Equal(1, stream.MsgLength()) @@ -723,7 +722,7 @@ func newRegionInfo(id uint64, startKey, endKey string, size, keys int64, leader ) } -func (suite *operatorControllerTestSuite) checkRemoveOperatorSuccess(oc *OperatorController, op *operator.Operator) { +func (suite *operatorControllerTestSuite) checkRemoveOperatorSuccess(oc *Controller, op *Operator) { suite.True(oc.RemoveOperator(op)) suite.True(op.IsEnd()) suite.Equal(op, oc.GetOperatorStatus(op.RegionID()).Operator) @@ -733,11 +732,11 @@ func (suite *operatorControllerTestSuite) TestAddWaitingOperator() { opts := mockconfig.NewTestOptions() cluster := mockcluster.NewCluster(suite.ctx, opts) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */) - controller := NewOperatorController(suite.ctx, cluster, stream) + controller := NewController(suite.ctx, cluster, stream) cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"}) cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"}) - addPeerOp := func(i uint64) *operator.Operator { + addPeerOp := func(i uint64) *Operator { start := fmt.Sprintf("%da", i) end := fmt.Sprintf("%db", i) region := newRegionInfo(i, start, end, 1, 1, []uint64{101, 1}, []uint64{101, 1}) @@ -745,7 +744,7 @@ func (suite *operatorControllerTestSuite) TestAddWaitingOperator() { peer := &metapb.Peer{ StoreId: 2, } - op, err := operator.CreateAddPeerOperator("add-peer", cluster, region, peer, operator.OpKind(0)) + op, err := CreateAddPeerOperator("add-peer", cluster, region, peer, OpKind(0)) suite.NoError(err) suite.NotNil(op) @@ -753,7 +752,7 @@ func (suite *operatorControllerTestSuite) TestAddWaitingOperator() { } // a batch of operators should be added atomically - var batch []*operator.Operator + var batch []*Operator for i := uint64(0); i < cluster.GetSchedulerMaxWaitingOperator(); i++ { batch = append(batch, addPeerOp(i)) } @@ -779,7 +778,7 @@ func (suite *operatorControllerTestSuite) TestAddWaitingOperator() { target := newRegionInfo(102, "0a", "0b", 1, 1, []uint64{101, 1}, []uint64{101, 1}) cluster.PutRegion(target) - ops, err := operator.CreateMergeRegionOperator("merge-region", cluster, source, target, operator.OpMerge) + ops, err := CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge) suite.NoError(err) suite.Len(ops, 2) @@ -802,16 +801,16 @@ func (suite *operatorControllerTestSuite) TestInvalidStoreId() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */) - oc := NewOperatorController(suite.ctx, tc, stream) + oc := NewController(suite.ctx, tc, stream) // If PD and store 3 are gone, PD will not have info of store 3 after recreating it. tc.AddRegionStore(1, 1) tc.AddRegionStore(2, 1) tc.AddRegionStore(4, 1) tc.AddLeaderRegionWithRange(1, "", "", 1, 2, 3, 4) - steps := []operator.OpStep{ - operator.RemovePeer{FromStore: 3, PeerID: 3, IsDownStore: false}, + steps := []OpStep{ + RemovePeer{FromStore: 3, PeerID: 3, IsDownStore: false}, } - op := operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, steps...) + op := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, steps...) suite.True(oc.addOperatorLocked(op)) // Although store 3 does not exist in PD, PD can also send op to TiKV. 
suite.Equal(pdpb.OperatorStatus_RUNNING, oc.GetOperatorStatus(1).Status) diff --git a/pkg/schedule/operator_queue.go b/pkg/schedule/operator/operator_queue.go similarity index 93% rename from pkg/schedule/operator_queue.go rename to pkg/schedule/operator/operator_queue.go index 62746a8530f..7765427793f 100644 --- a/pkg/schedule/operator_queue.go +++ b/pkg/schedule/operator/operator_queue.go @@ -12,16 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schedule +package operator import ( "time" - - "github.com/tikv/pd/pkg/schedule/operator" ) type operatorWithTime struct { - op *operator.Operator + op *Operator time time.Time } diff --git a/pkg/schedule/test_util.go b/pkg/schedule/operator/test_util.go similarity index 84% rename from pkg/schedule/test_util.go rename to pkg/schedule/operator/test_util.go index 7b2e959b71c..8206189daa6 100644 --- a/pkg/schedule/test_util.go +++ b/pkg/schedule/operator/test_util.go @@ -12,23 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schedule +package operator import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mock/mockcluster" - "github.com/tikv/pd/pkg/schedule/operator" ) // ApplyOperatorStep applies operator step. Only for test purpose. -func ApplyOperatorStep(region *core.RegionInfo, op *operator.Operator) *core.RegionInfo { +func ApplyOperatorStep(region *core.RegionInfo, op *Operator) *core.RegionInfo { _ = op.Start() if step := op.Check(region); step != nil { switch s := step.(type) { - case operator.TransferLeader: + case TransferLeader: region = region.Clone(core.WithLeader(region.GetStorePeer(s.ToStore))) - case operator.AddPeer: + case AddPeer: if region.GetStorePeer(s.ToStore) != nil { panic("Add peer that exists") } @@ -37,7 +36,7 @@ func ApplyOperatorStep(region *core.RegionInfo, op *operator.Operator) *core.Reg StoreId: s.ToStore, } region = region.Clone(core.WithAddPeer(peer)) - case operator.RemovePeer: + case RemovePeer: if region.GetStorePeer(s.FromStore) == nil { panic("Remove peer that doesn't exist") } @@ -45,7 +44,7 @@ func ApplyOperatorStep(region *core.RegionInfo, op *operator.Operator) *core.Reg panic("Cannot remove the leader peer") } region = region.Clone(core.WithRemoveStorePeer(s.FromStore)) - case operator.AddLearner: + case AddLearner: if region.GetStorePeer(s.ToStore) != nil { panic("Add learner that exists") } @@ -55,7 +54,7 @@ func ApplyOperatorStep(region *core.RegionInfo, op *operator.Operator) *core.Reg Role: metapb.PeerRole_Learner, } region = region.Clone(core.WithAddPeer(peer)) - case operator.PromoteLearner: + case PromoteLearner: if region.GetStoreLearner(s.ToStore) == nil { panic("Promote peer that doesn't exist") } @@ -71,8 +70,8 @@ func ApplyOperatorStep(region *core.RegionInfo, op *operator.Operator) *core.Reg return region } -// ApplyOperator applies operator. Only for test purpose. -func ApplyOperator(mc *mockcluster.Cluster, op *operator.Operator) { +// ApplyOperator applies an operator. Only for test purpose.
+func ApplyOperator(mc *mockcluster.Cluster, op *Operator) { origin := mc.GetRegion(op.RegionID()) region := origin for !op.IsEnd() { diff --git a/pkg/schedule/waiting_operator.go b/pkg/schedule/operator/waiting_operator.go similarity index 59% rename from pkg/schedule/waiting_operator.go rename to pkg/schedule/operator/waiting_operator.go index 139e185356c..8f5c72b053b 100644 --- a/pkg/schedule/waiting_operator.go +++ b/pkg/schedule/operator/waiting_operator.go @@ -12,49 +12,47 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schedule +package operator import ( "math/rand" - - "github.com/tikv/pd/pkg/schedule/operator" ) -// PriorityWeight is used to represent the weight of different priorities of operators. -var PriorityWeight = []float64{1.0, 4.0, 9.0, 16.0} +// priorityWeight is used to represent the weight of different priorities of operators. +var priorityWeight = []float64{1.0, 4.0, 9.0, 16.0} // WaitingOperator is an interface of waiting operators. type WaitingOperator interface { - PutOperator(op *operator.Operator) - GetOperator() []*operator.Operator - ListOperator() []*operator.Operator + PutOperator(op *Operator) + GetOperator() []*Operator + ListOperator() []*Operator } -// Bucket is used to maintain the operators created by a specific scheduler. -type Bucket struct { +// bucket is used to maintain the operators created by a specific scheduler. +type bucket struct { weight float64 - ops []*operator.Operator + ops []*Operator } -// RandBuckets is an implementation of waiting operators -type RandBuckets struct { +// randBuckets is an implementation of waiting operators +type randBuckets struct { totalWeight float64 - buckets []*Bucket + buckets []*bucket } -// NewRandBuckets creates a random buckets. -func NewRandBuckets() *RandBuckets { - var buckets []*Bucket - for i := 0; i < len(PriorityWeight); i++ { - buckets = append(buckets, &Bucket{ - weight: PriorityWeight[i], +// newRandBuckets creates a random buckets. +func newRandBuckets() *randBuckets { + var buckets []*bucket + for i := 0; i < len(priorityWeight); i++ { + buckets = append(buckets, &bucket{ + weight: priorityWeight[i], }) } - return &RandBuckets{buckets: buckets} + return &randBuckets{buckets: buckets} } // PutOperator puts an operator into the random buckets. -func (b *RandBuckets) PutOperator(op *operator.Operator) { +func (b *randBuckets) PutOperator(op *Operator) { priority := op.GetPriorityLevel() bucket := b.buckets[priority] if len(bucket.ops) == 0 { @@ -64,8 +62,8 @@ func (b *RandBuckets) PutOperator(op *operator.Operator) { } // ListOperator lists all operator in the random buckets. -func (b *RandBuckets) ListOperator() []*operator.Operator { - var ops []*operator.Operator +func (b *randBuckets) ListOperator() []*Operator { + var ops []*Operator for i := range b.buckets { bucket := b.buckets[i] ops = append(ops, bucket.ops...) @@ -74,7 +72,7 @@ func (b *RandBuckets) ListOperator() []*operator.Operator { } // GetOperator gets an operator from the random buckets. -func (b *RandBuckets) GetOperator() []*operator.Operator { +func (b *randBuckets) GetOperator() []*Operator { if b.totalWeight == 0 { return nil } @@ -87,10 +85,10 @@ func (b *RandBuckets) GetOperator() []*operator.Operator { } proportion := bucket.weight / b.totalWeight if r >= sum && r < sum+proportion { - var res []*operator.Operator + var res []*Operator res = append(res, bucket.ops[0]) // Merge operation has two operators, and thus it should be handled specifically. 
- if bucket.ops[0].Kind()&operator.OpMerge != 0 { + if bucket.ops[0].Kind()&OpMerge != 0 { res = append(res, bucket.ops[1]) bucket.ops = bucket.ops[2:] } else { @@ -106,14 +104,14 @@ func (b *RandBuckets) GetOperator() []*operator.Operator { return nil } -// WaitingOperatorStatus is used to limit the count of each kind of operators. -type WaitingOperatorStatus struct { +// waitingOperatorStatus is used to limit the count of each kind of operators. +type waitingOperatorStatus struct { ops map[string]uint64 } -// NewWaitingOperatorStatus creates a new WaitingOperatorStatus. -func NewWaitingOperatorStatus() *WaitingOperatorStatus { - return &WaitingOperatorStatus{ +// newWaitingOperatorStatus creates a new waitingOperatorStatus. +func newWaitingOperatorStatus() *waitingOperatorStatus { + return &waitingOperatorStatus{ make(map[string]uint64), } } diff --git a/pkg/schedule/waiting_operator_test.go b/pkg/schedule/operator/waiting_operator_test.go similarity index 65% rename from pkg/schedule/waiting_operator_test.go rename to pkg/schedule/operator/waiting_operator_test.go index 4dec91efe41..897f416cf38 100644 --- a/pkg/schedule/waiting_operator_test.go +++ b/pkg/schedule/operator/waiting_operator_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schedule +package operator import ( "testing" @@ -20,14 +20,13 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core/constant" - "github.com/tikv/pd/pkg/schedule/operator" ) func TestRandBuckets(t *testing.T) { re := require.New(t) - rb := NewRandBuckets() + rb := newRandBuckets() addOperators(rb) - for i := 0; i < len(PriorityWeight); i++ { + for i := 0; i < len(priorityWeight); i++ { op := rb.GetOperator() re.NotNil(op) } @@ -35,23 +34,23 @@ func TestRandBuckets(t *testing.T) { } func addOperators(wop WaitingOperator) { - op := operator.NewTestOperator(uint64(1), &metapb.RegionEpoch{}, operator.OpRegion, []operator.OpStep{ - operator.RemovePeer{FromStore: uint64(1)}, + op := NewTestOperator(uint64(1), &metapb.RegionEpoch{}, OpRegion, []OpStep{ + RemovePeer{FromStore: uint64(1)}, }...) op.SetPriorityLevel(constant.Medium) wop.PutOperator(op) - op = operator.NewTestOperator(uint64(2), &metapb.RegionEpoch{}, operator.OpRegion, []operator.OpStep{ - operator.RemovePeer{FromStore: uint64(2)}, + op = NewTestOperator(uint64(2), &metapb.RegionEpoch{}, OpRegion, []OpStep{ + RemovePeer{FromStore: uint64(2)}, }...) op.SetPriorityLevel(constant.High) wop.PutOperator(op) - op = operator.NewTestOperator(uint64(3), &metapb.RegionEpoch{}, operator.OpRegion, []operator.OpStep{ - operator.RemovePeer{FromStore: uint64(3)}, + op = NewTestOperator(uint64(3), &metapb.RegionEpoch{}, OpRegion, []OpStep{ + RemovePeer{FromStore: uint64(3)}, }...) op.SetPriorityLevel(constant.Low) wop.PutOperator(op) - op = operator.NewTestOperator(uint64(4), &metapb.RegionEpoch{}, operator.OpRegion, []operator.OpStep{ - operator.RemovePeer{FromStore: uint64(4)}, + op = NewTestOperator(uint64(4), &metapb.RegionEpoch{}, OpRegion, []OpStep{ + RemovePeer{FromStore: uint64(4)}, }...) 
op.SetPriorityLevel(constant.Urgent) wop.PutOperator(op) @@ -59,20 +58,20 @@ func addOperators(wop WaitingOperator) { func TestListOperator(t *testing.T) { re := require.New(t) - rb := NewRandBuckets() + rb := newRandBuckets() addOperators(rb) - re.Len(rb.ListOperator(), len(PriorityWeight)) + re.Len(rb.ListOperator(), len(priorityWeight)) } func TestRandomBucketsWithMergeRegion(t *testing.T) { re := require.New(t) - rb := NewRandBuckets() + rb := newRandBuckets() descs := []string{"merge-region", "admin-merge-region", "random-merge"} for j := 0; j < 100; j++ { // adds operators desc := descs[j%3] - op := operator.NewTestOperator(uint64(1), &metapb.RegionEpoch{}, operator.OpRegion|operator.OpMerge, []operator.OpStep{ - operator.MergeRegion{ + op := NewTestOperator(uint64(1), &metapb.RegionEpoch{}, OpRegion|OpMerge, []OpStep{ + MergeRegion{ FromRegion: &metapb.Region{ Id: 1, StartKey: []byte{}, @@ -87,8 +86,8 @@ func TestRandomBucketsWithMergeRegion(t *testing.T) { }...) op.SetDesc(desc) rb.PutOperator(op) - op = operator.NewTestOperator(uint64(2), &metapb.RegionEpoch{}, operator.OpRegion|operator.OpMerge, []operator.OpStep{ - operator.MergeRegion{ + op = NewTestOperator(uint64(2), &metapb.RegionEpoch{}, OpRegion|OpMerge, []OpStep{ + MergeRegion{ FromRegion: &metapb.Region{ Id: 1, StartKey: []byte{}, @@ -103,8 +102,8 @@ func TestRandomBucketsWithMergeRegion(t *testing.T) { }...) op.SetDesc(desc) rb.PutOperator(op) - op = operator.NewTestOperator(uint64(3), &metapb.RegionEpoch{}, operator.OpRegion, []operator.OpStep{ - operator.RemovePeer{FromStore: uint64(3)}, + op = NewTestOperator(uint64(3), &metapb.RegionEpoch{}, OpRegion, []OpStep{ + RemovePeer{FromStore: uint64(3)}, }...) op.SetDesc("testOperatorHigh") op.SetPriorityLevel(constant.High) diff --git a/pkg/schedule/region_scatterer.go b/pkg/schedule/region_scatterer.go index 090cf8dfce8..ed6acb3f478 100644 --- a/pkg/schedule/region_scatterer.go +++ b/pkg/schedule/region_scatterer.go @@ -135,12 +135,12 @@ type RegionScatterer struct { cluster sche.ClusterInformer ordinaryEngine engineContext specialEngines sync.Map - opController *OperatorController + opController *operator.Controller } // NewRegionScatterer creates a region scatterer. // RegionScatter is used for the `Lightning`, it will scatter the specified regions before import data. 
-func NewRegionScatterer(ctx context.Context, cluster sche.ClusterInformer, opController *OperatorController) *RegionScatterer { +func NewRegionScatterer(ctx context.Context, cluster sche.ClusterInformer, opController *operator.Controller) *RegionScatterer { return &RegionScatterer{ ctx: ctx, name: regionScatterName, @@ -261,7 +261,7 @@ func (r *RegionScatterer) scatterRegions(regions map[uint64]*core.RegionInfo, fa continue } failpoint.Inject("scatterHbStreamsDrain", func() { - r.opController.hbStreams.Drain(1) + r.opController.GetHBStreams().Drain(1) r.opController.RemoveOperator(op) }) } diff --git a/pkg/schedule/region_scatterer_test.go b/pkg/schedule/region_scatterer_test.go index a602598b4b9..ad0031f9cdf 100644 --- a/pkg/schedule/region_scatterer_test.go +++ b/pkg/schedule/region_scatterer_test.go @@ -91,7 +91,7 @@ func scatter(re *require.Assertions, numStores, numRegions uint64, useRules bool opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := NewOperatorController(ctx, tc, stream) + oc := operator.NewController(ctx, tc, stream) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) // Add ordinary stores. @@ -110,7 +110,7 @@ func scatter(re *require.Assertions, numStores, numRegions uint64, useRules bool region := tc.GetRegion(i) if op, _ := scatterer.Scatter(region, ""); op != nil { checkOperator(re, op) - ApplyOperator(tc, op) + operator.ApplyOperator(tc, op) } } @@ -148,7 +148,7 @@ func scatterSpecial(re *require.Assertions, numOrdinaryStores, numSpecialStores, opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := NewOperatorController(ctx, tc, stream) + oc := operator.NewController(ctx, tc, stream) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) // Add ordinary stores. @@ -180,7 +180,7 @@ func scatterSpecial(re *require.Assertions, numOrdinaryStores, numSpecialStores, region := tc.GetRegion(i) if op, _ := scatterer.Scatter(region, ""); op != nil { checkOperator(re, op) - ApplyOperator(tc, op) + operator.ApplyOperator(tc, op) } } @@ -226,7 +226,7 @@ func TestStoreLimit(t *testing.T) { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := NewOperatorController(ctx, tc, stream) + oc := operator.NewController(ctx, tc, stream) // Add stores 1~6. for i := uint64(1); i <= 5; i++ { @@ -258,7 +258,7 @@ func TestScatterCheck(t *testing.T) { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := NewOperatorController(ctx, tc, stream) + oc := operator.NewController(ctx, tc, stream) // Add 5 stores. for i := uint64(1); i <= 5; i++ { tc.AddRegionStore(i, 0) @@ -307,7 +307,7 @@ func TestSomeStoresFilteredScatterGroupInConcurrency(t *testing.T) { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := NewOperatorController(ctx, tc, stream) + oc := operator.NewController(ctx, tc, stream) // Add 5 connected stores. 
for i := uint64(1); i <= 5; i++ { tc.AddRegionStore(i, 0) @@ -352,7 +352,7 @@ func TestScatterGroupInConcurrency(t *testing.T) { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := NewOperatorController(ctx, tc, stream) + oc := operator.NewController(ctx, tc, stream) // Add 5 stores. for i := uint64(1); i <= 5; i++ { tc.AddRegionStore(i, 0) @@ -424,7 +424,7 @@ func TestScatterForManyRegion(t *testing.T) { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := NewOperatorController(ctx, tc, stream) + oc := operator.NewController(ctx, tc, stream) // Add 60 stores. for i := uint64(1); i <= 60; i++ { tc.AddRegionStore(i, 0) @@ -452,7 +452,7 @@ func TestScattersGroup(t *testing.T) { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := NewOperatorController(ctx, tc, stream) + oc := operator.NewController(ctx, tc, stream) // Add 5 stores. for i := uint64(1); i <= 5; i++ { tc.AddRegionStore(i, 0) @@ -541,7 +541,7 @@ func TestRegionFromDifferentGroups(t *testing.T) { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := NewOperatorController(ctx, tc, stream) + oc := operator.NewController(ctx, tc, stream) // Add 6 stores. storeCount := 6 for i := uint64(1); i <= uint64(storeCount); i++ { @@ -577,7 +577,7 @@ func TestRegionHasLearner(t *testing.T) { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := NewOperatorController(ctx, tc, stream) + oc := operator.NewController(ctx, tc, stream) // Add 8 stores. voterCount := uint64(6) storeCount := uint64(8) @@ -665,7 +665,7 @@ func TestSelectedStoresTooFewPeers(t *testing.T) { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := NewOperatorController(ctx, tc, stream) + oc := operator.NewController(ctx, tc, stream) // Add 4 stores. for i := uint64(1); i <= 4; i++ { tc.AddRegionStore(i, 0) @@ -702,7 +702,7 @@ func TestSelectedStoresTooManyPeers(t *testing.T) { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := NewOperatorController(ctx, tc, stream) + oc := operator.NewController(ctx, tc, stream) // Add 4 stores. for i := uint64(1); i <= 5; i++ { tc.AddRegionStore(i, 0) @@ -740,7 +740,7 @@ func TestBalanceRegion(t *testing.T) { opt.SetLocationLabels([]string{"host"}) tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := NewOperatorController(ctx, tc, stream) + oc := operator.NewController(ctx, tc, stream) // Add 6 stores in 3 hosts. 
for i := uint64(2); i <= 7; i++ { tc.AddLabelsStore(i, 0, map[string]string{"host": strconv.FormatUint(i/2, 10)}) diff --git a/pkg/schedule/region_splitter.go b/pkg/schedule/region_splitter.go index 8cf5402dbb0..6af588ee4c0 100644 --- a/pkg/schedule/region_splitter.go +++ b/pkg/schedule/region_splitter.go @@ -45,7 +45,7 @@ type SplitRegionsHandler interface { } // NewSplitRegionsHandler return SplitRegionsHandler -func NewSplitRegionsHandler(cluster sche.ClusterInformer, oc *OperatorController) SplitRegionsHandler { +func NewSplitRegionsHandler(cluster sche.ClusterInformer, oc *operator.Controller) SplitRegionsHandler { return &splitRegionsHandler{ cluster: cluster, oc: oc, @@ -179,7 +179,7 @@ func (r *RegionSplitter) checkRegionValid(region *core.RegionInfo) bool { type splitRegionsHandler struct { cluster sche.ClusterInformer - oc *OperatorController + oc *operator.Controller } func (h *splitRegionsHandler) SplitRegionByKeys(region *core.RegionInfo, splitKeys [][]byte) error { diff --git a/pkg/schedule/scheduler.go b/pkg/schedule/scheduler.go index 5a857ff3583..7d1f821d77e 100644 --- a/pkg/schedule/scheduler.go +++ b/pkg/schedule/scheduler.go @@ -89,7 +89,7 @@ func ConfigSliceDecoder(name string, args []string) ConfigDecoder { } // CreateSchedulerFunc is for creating scheduler. -type CreateSchedulerFunc func(opController *OperatorController, storage endpoint.ConfigStorage, dec ConfigDecoder) (Scheduler, error) +type CreateSchedulerFunc func(opController *operator.Controller, storage endpoint.ConfigStorage, dec ConfigDecoder) (Scheduler, error) var schedulerMap = make(map[string]CreateSchedulerFunc) var schedulerArgsToDecoder = make(map[string]ConfigSliceDecoderBuilder) @@ -114,7 +114,7 @@ func RegisterSliceDecoderBuilder(typ string, builder ConfigSliceDecoderBuilder) } // CreateScheduler creates a scheduler with registered creator func. 
-func CreateScheduler(typ string, opController *OperatorController, storage endpoint.ConfigStorage, dec ConfigDecoder) (Scheduler, error) { +func CreateScheduler(typ string, opController *operator.Controller, storage endpoint.ConfigStorage, dec ConfigDecoder) (Scheduler, error) { fn, ok := schedulerMap[typ] if !ok { return nil, errs.ErrSchedulerCreateFuncNotRegistered.FastGenByArgs(typ) diff --git a/pkg/schedule/schedulers/balance_benchmark_test.go b/pkg/schedule/schedulers/balance_benchmark_test.go index 30691b0188b..b52b9b493a8 100644 --- a/pkg/schedule/schedulers/balance_benchmark_test.go +++ b/pkg/schedule/schedulers/balance_benchmark_test.go @@ -22,7 +22,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/tikv/pd/pkg/mock/mockcluster" "github.com/tikv/pd/pkg/mock/mockconfig" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/schedule/plan" @@ -42,12 +41,12 @@ var ( // newBenchCluster store region count is same with storeID and // the tolerate define storeCount that store can elect candidate but not should balance // so the case bench the worst scene -func newBenchCluster(ruleEnable, labelEnable bool, tombstoneEnable bool) (context.CancelFunc, *mockcluster.Cluster, *schedule.OperatorController) { +func newBenchCluster(ruleEnable, labelEnable bool, tombstoneEnable bool) (context.CancelFunc, *mockcluster.Cluster, *operator.Controller) { Register() ctx, cancel := context.WithCancel(context.Background()) opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) - oc := schedule.NewOperatorController(ctx, tc, nil) + oc := operator.NewController(ctx, tc, nil) opt.GetScheduleConfig().TolerantSizeRatio = float64(storeCount) opt.SetPlacementRuleEnabled(ruleEnable) @@ -91,12 +90,12 @@ func newBenchCluster(ruleEnable, labelEnable bool, tombstoneEnable bool) (contex return cancel, tc, oc } -func newBenchBigCluster(storeNumInOneRack, regionNum int) (context.CancelFunc, *mockcluster.Cluster, *schedule.OperatorController) { +func newBenchBigCluster(storeNumInOneRack, regionNum int) (context.CancelFunc, *mockcluster.Cluster, *operator.Controller) { Register() ctx, cancel := context.WithCancel(context.Background()) opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) - oc := schedule.NewOperatorController(ctx, tc, nil) + oc := operator.NewController(ctx, tc, nil) opt.GetScheduleConfig().TolerantSizeRatio = float64(storeCount) opt.SetPlacementRuleEnabled(true) diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index 3b8244a9a3e..7bb62de6e2c 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -162,7 +162,7 @@ type balanceLeaderScheduler struct { name string conf *balanceLeaderSchedulerConfig handler http.Handler - opController *schedule.OperatorController + opController *operator.Controller filters []filter.Filter counter *prometheus.CounterVec filterCounter *filter.Counter @@ -170,7 +170,7 @@ type balanceLeaderScheduler struct { // newBalanceLeaderScheduler creates a scheduler that tends to keep leaders on // each store balanced. 
-func newBalanceLeaderScheduler(opController *schedule.OperatorController, conf *balanceLeaderSchedulerConfig, options ...BalanceLeaderCreateOption) schedule.Scheduler { +func newBalanceLeaderScheduler(opController *operator.Controller, conf *balanceLeaderSchedulerConfig, options ...BalanceLeaderCreateOption) schedule.Scheduler { base := NewBaseScheduler(opController) s := &balanceLeaderScheduler{ BaseScheduler: base, @@ -422,7 +422,7 @@ func makeInfluence(op *operator.Operator, plan *solver, usedRegions map[uint64]s storesIDs := candidate.binarySearchStores(plan.source, plan.target) candidateUpdateStores[id] = storesIDs } - schedule.AddOpInfluence(op, plan.opInfluence, plan.ClusterInformer) + operator.AddOpInfluence(op, plan.opInfluence, plan.ClusterInformer) for id, candidate := range candidates { for _, pos := range candidateUpdateStores[id] { candidate.resortStoreWithPos(pos) diff --git a/pkg/schedule/schedulers/balance_region.go b/pkg/schedule/schedulers/balance_region.go index 745ed402e68..e69d47a6899 100644 --- a/pkg/schedule/schedulers/balance_region.go +++ b/pkg/schedule/schedulers/balance_region.go @@ -59,7 +59,7 @@ type balanceRegionScheduler struct { *BaseScheduler *retryQuota conf *balanceRegionSchedulerConfig - opController *schedule.OperatorController + opController *operator.Controller filters []filter.Filter counter *prometheus.CounterVec filterCounter *filter.Counter @@ -67,7 +67,7 @@ type balanceRegionScheduler struct { // newBalanceRegionScheduler creates a scheduler that tends to keep regions on // each store balanced. -func newBalanceRegionScheduler(opController *schedule.OperatorController, conf *balanceRegionSchedulerConfig, opts ...BalanceRegionCreateOption) schedule.Scheduler { +func newBalanceRegionScheduler(opController *operator.Controller, conf *balanceRegionSchedulerConfig, opts ...BalanceRegionCreateOption) schedule.Scheduler { base := NewBaseScheduler(opController) scheduler := &balanceRegionScheduler{ BaseScheduler: base, diff --git a/pkg/schedule/schedulers/balance_test.go b/pkg/schedule/schedulers/balance_test.go index e8c9e084114..7de38a6158f 100644 --- a/pkg/schedule/schedulers/balance_test.go +++ b/pkg/schedule/schedulers/balance_test.go @@ -229,7 +229,7 @@ type balanceLeaderSchedulerTestSuite struct { cancel context.CancelFunc tc *mockcluster.Cluster lb schedule.Scheduler - oc *schedule.OperatorController + oc *operator.Controller conf config.Config } @@ -545,7 +545,7 @@ type balanceLeaderRangeSchedulerTestSuite struct { suite.Suite cancel context.CancelFunc tc *mockcluster.Cluster - oc *schedule.OperatorController + oc *operator.Controller } func TestBalanceLeaderRangeSchedulerTestSuite(t *testing.T) { @@ -1465,6 +1465,6 @@ func scheduleAndApplyOperator(tc *mockcluster.Cluster, hb schedule.Scheduler, co limit++ continue } - schedule.ApplyOperator(tc, ops[0]) + operator.ApplyOperator(tc, ops[0]) } } diff --git a/pkg/schedule/schedulers/balance_witness.go b/pkg/schedule/schedulers/balance_witness.go index edbca9f5c0d..730122416d6 100644 --- a/pkg/schedule/schedulers/balance_witness.go +++ b/pkg/schedule/schedulers/balance_witness.go @@ -147,7 +147,7 @@ type balanceWitnessScheduler struct { name string conf *balanceWitnessSchedulerConfig handler http.Handler - opController *schedule.OperatorController + opController *operator.Controller filters []filter.Filter counter *prometheus.CounterVec filterCounter *filter.Counter @@ -155,7 +155,7 @@ type balanceWitnessScheduler struct { // newBalanceWitnessScheduler creates a scheduler that tends to keep 
witnesses on // each store balanced. -func newBalanceWitnessScheduler(opController *schedule.OperatorController, conf *balanceWitnessSchedulerConfig, options ...BalanceWitnessCreateOption) schedule.Scheduler { +func newBalanceWitnessScheduler(opController *operator.Controller, conf *balanceWitnessSchedulerConfig, options ...BalanceWitnessCreateOption) schedule.Scheduler { base := NewBaseScheduler(opController) s := &balanceWitnessScheduler{ BaseScheduler: base, diff --git a/pkg/schedule/schedulers/balance_witness_test.go b/pkg/schedule/schedulers/balance_witness_test.go index d1342c3dc97..d4508b31035 100644 --- a/pkg/schedule/schedulers/balance_witness_test.go +++ b/pkg/schedule/schedulers/balance_witness_test.go @@ -36,7 +36,7 @@ type balanceWitnessSchedulerTestSuite struct { cancel context.CancelFunc tc *mockcluster.Cluster lb schedule.Scheduler - oc *schedule.OperatorController + oc *operator.Controller conf config.Config } diff --git a/pkg/schedule/schedulers/base_scheduler.go b/pkg/schedule/schedulers/base_scheduler.go index c005e1346f2..cea51cd0955 100644 --- a/pkg/schedule/schedulers/base_scheduler.go +++ b/pkg/schedule/schedulers/base_scheduler.go @@ -23,6 +23,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/schedule" sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/utils/typeutil" ) @@ -60,11 +61,11 @@ func intervalGrow(x time.Duration, maxInterval time.Duration, typ intervalGrowth // BaseScheduler is a basic scheduler for all other complex scheduler type BaseScheduler struct { - OpController *schedule.OperatorController + OpController *operator.Controller } // NewBaseScheduler returns a basic scheduler -func NewBaseScheduler(opController *schedule.OperatorController) *BaseScheduler { +func NewBaseScheduler(opController *operator.Controller) *BaseScheduler { return &BaseScheduler{OpController: opController} } diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index 8b3cba3d2de..c3e0fbc34da 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -171,7 +171,7 @@ type evictLeaderScheduler struct { // newEvictLeaderScheduler creates an admin scheduler that transfers all leaders // out of a store. -func newEvictLeaderScheduler(opController *schedule.OperatorController, conf *evictLeaderSchedulerConfig) schedule.Scheduler { +func newEvictLeaderScheduler(opController *operator.Controller, conf *evictLeaderSchedulerConfig) schedule.Scheduler { base := NewBaseScheduler(opController) handler := newEvictLeaderHandler(conf) return &evictLeaderScheduler{ diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index 6fac1704bf0..c7da1183afa 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -211,7 +211,7 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.ClusterInformer, dryRun } // newEvictSlowStoreScheduler creates a scheduler that detects and evicts slow stores. 
-func newEvictSlowStoreScheduler(opController *schedule.OperatorController, conf *evictSlowStoreSchedulerConfig) schedule.Scheduler { +func newEvictSlowStoreScheduler(opController *operator.Controller, conf *evictSlowStoreSchedulerConfig) schedule.Scheduler { base := NewBaseScheduler(opController) s := &evictSlowStoreScheduler{ diff --git a/pkg/schedule/schedulers/evict_slow_store_test.go b/pkg/schedule/schedulers/evict_slow_store_test.go index b0989246f3f..88a4864a804 100644 --- a/pkg/schedule/schedulers/evict_slow_store_test.go +++ b/pkg/schedule/schedulers/evict_slow_store_test.go @@ -36,7 +36,7 @@ type evictSlowStoreTestSuite struct { tc *mockcluster.Cluster es schedule.Scheduler bs schedule.Scheduler - oc *schedule.OperatorController + oc *operator.Controller } func TestEvictSlowStoreTestSuite(t *testing.T) { diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index 14d35c21a24..d17972ef091 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -264,7 +264,7 @@ func (s *evictSlowTrendScheduler) Schedule(cluster sche.ClusterInformer, dryRun return s.scheduleEvictLeader(cluster), nil } -func newEvictSlowTrendScheduler(opController *schedule.OperatorController, conf *evictSlowTrendSchedulerConfig) schedule.Scheduler { +func newEvictSlowTrendScheduler(opController *operator.Controller, conf *evictSlowTrendSchedulerConfig) schedule.Scheduler { return &evictSlowTrendScheduler{ BaseScheduler: NewBaseScheduler(opController), conf: conf, diff --git a/pkg/schedule/schedulers/evict_slow_trend_test.go b/pkg/schedule/schedulers/evict_slow_trend_test.go index 0c96170e0c9..f8f5f6dbd68 100644 --- a/pkg/schedule/schedulers/evict_slow_trend_test.go +++ b/pkg/schedule/schedulers/evict_slow_trend_test.go @@ -36,7 +36,7 @@ type evictSlowTrendTestSuite struct { tc *mockcluster.Cluster es schedule.Scheduler bs schedule.Scheduler - oc *schedule.OperatorController + oc *operator.Controller } func TestEvictSlowTrendTestSuite(t *testing.T) { diff --git a/pkg/schedule/schedulers/grant_hot_region.go b/pkg/schedule/schedulers/grant_hot_region.go index e5fa152bed7..f39876d825e 100644 --- a/pkg/schedule/schedulers/grant_hot_region.go +++ b/pkg/schedule/schedulers/grant_hot_region.go @@ -127,7 +127,7 @@ type grantHotRegionScheduler struct { } // newGrantHotRegionScheduler creates an admin scheduler that transfers hot region peer to fixed store and hot region leader to one store. -func newGrantHotRegionScheduler(opController *schedule.OperatorController, conf *grantHotRegionSchedulerConfig) *grantHotRegionScheduler { +func newGrantHotRegionScheduler(opController *operator.Controller, conf *grantHotRegionSchedulerConfig) *grantHotRegionScheduler { base := newBaseHotScheduler(opController) handler := newGrantHotRegionHandler(conf) ret := &grantHotRegionScheduler{ diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index dab40d174cc..8ec8d3dd891 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -152,7 +152,7 @@ type grantLeaderScheduler struct { // newGrantLeaderScheduler creates an admin scheduler that transfers all leaders // to a store. 
-func newGrantLeaderScheduler(opController *schedule.OperatorController, conf *grantLeaderSchedulerConfig) schedule.Scheduler { +func newGrantLeaderScheduler(opController *operator.Controller, conf *grantLeaderSchedulerConfig) schedule.Scheduler { base := NewBaseScheduler(opController) handler := newGrantLeaderHandler(conf) return &grantLeaderScheduler{ diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index c4e2c3d90c7..b4402b2c079 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -32,7 +32,6 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/schedule" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/schedule/operator" @@ -102,7 +101,7 @@ type baseHotScheduler struct { updateWriteTime time.Time } -func newBaseHotScheduler(opController *schedule.OperatorController) *baseHotScheduler { +func newBaseHotScheduler(opController *operator.Controller) *baseHotScheduler { base := NewBaseScheduler(opController) ret := &baseHotScheduler{ BaseScheduler: base, @@ -222,7 +221,7 @@ type hotScheduler struct { searchRevertRegions [resourceTypeLen]bool // Whether to search revert regions. } -func newHotScheduler(opController *schedule.OperatorController, conf *hotRegionSchedulerConfig) *hotScheduler { +func newHotScheduler(opController *operator.Controller, conf *hotRegionSchedulerConfig) *hotScheduler { base := newBaseHotScheduler(opController) ret := &hotScheduler{ name: HotRegionName, diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index abd1a272478..d3292aaf6d8 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -40,23 +40,23 @@ import ( func init() { schedulePeerPr = 1.0 - schedule.RegisterScheduler(statistics.Write.String(), func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + schedule.RegisterScheduler(statistics.Write.String(), func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { cfg := initHotRegionScheduleConfig() return newHotWriteScheduler(opController, cfg), nil }) - schedule.RegisterScheduler(statistics.Read.String(), func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + schedule.RegisterScheduler(statistics.Read.String(), func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { return newHotReadScheduler(opController, initHotRegionScheduleConfig()), nil }) } -func newHotReadScheduler(opController *schedule.OperatorController, conf *hotRegionSchedulerConfig) *hotScheduler { +func newHotReadScheduler(opController *operator.Controller, conf *hotRegionSchedulerConfig) *hotScheduler { ret := newHotScheduler(opController, conf) ret.name = "" ret.types = []statistics.RWType{statistics.Read} return ret } -func newHotWriteScheduler(opController *schedule.OperatorController, conf *hotRegionSchedulerConfig) *hotScheduler { +func newHotWriteScheduler(opController *operator.Controller, conf *hotRegionSchedulerConfig) *hotScheduler { ret := newHotScheduler(opController, conf) ret.name = "" ret.types = 
[]statistics.RWType{statistics.Write} diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index 78a35549cd2..7205d034f0a 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -22,6 +22,7 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/schedule" + "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/storage/endpoint" ) @@ -52,7 +53,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(BalanceLeaderType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + schedule.RegisterScheduler(BalanceLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { conf := &balanceLeaderSchedulerConfig{storage: storage} if err := decoder(conf); err != nil { return nil, err @@ -80,7 +81,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(BalanceRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + schedule.RegisterScheduler(BalanceRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { conf := &balanceRegionSchedulerConfig{} if err := decoder(conf); err != nil { return nil, err @@ -105,7 +106,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(BalanceWitnessType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + schedule.RegisterScheduler(BalanceWitnessType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { conf := &balanceWitnessSchedulerConfig{storage: storage} if err := decoder(conf); err != nil { return nil, err @@ -141,7 +142,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(EvictLeaderType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + schedule.RegisterScheduler(EvictLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { conf := &evictLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange), storage: storage} if err := decoder(conf); err != nil { return nil, err @@ -157,7 +158,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(EvictSlowStoreType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + schedule.RegisterScheduler(EvictSlowStoreType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { conf := &evictSlowStoreSchedulerConfig{storage: storage, EvictedStores: make([]uint64, 0)} if err := decoder(conf); err != nil { return nil, err @@ -196,7 +197,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(GrantHotRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + schedule.RegisterScheduler(GrantHotRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, 
decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { conf := &grantHotRegionSchedulerConfig{StoreIDs: make([]uint64, 0), storage: storage} conf.cluster = opController.GetCluster() if err := decoder(conf); err != nil { @@ -212,7 +213,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(HotRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + schedule.RegisterScheduler(HotRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { conf := initHotRegionScheduleConfig() var data map[string]interface{} if err := decoder(&data); err != nil { @@ -257,7 +258,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(GrantLeaderType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + schedule.RegisterScheduler(GrantLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { conf := &grantLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange), storage: storage} conf.cluster = opController.GetCluster() if err := decoder(conf); err != nil { @@ -283,7 +284,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(LabelType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + schedule.RegisterScheduler(LabelType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { conf := &labelSchedulerConfig{} if err := decoder(conf); err != nil { return nil, err @@ -308,7 +309,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(RandomMergeType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + schedule.RegisterScheduler(RandomMergeType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { conf := &randomMergeSchedulerConfig{} if err := decoder(conf); err != nil { return nil, err @@ -337,7 +338,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(ScatterRangeType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + schedule.RegisterScheduler(ScatterRangeType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { conf := &scatterRangeSchedulerConfig{ storage: storage, } @@ -371,7 +372,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(ShuffleHotRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + schedule.RegisterScheduler(ShuffleHotRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { conf := &shuffleHotRegionSchedulerConfig{Limit: uint64(1)} if err := decoder(conf); err != nil { return nil, err @@ -396,7 +397,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(ShuffleLeaderType, func(opController *schedule.OperatorController, storage 
endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + schedule.RegisterScheduler(ShuffleLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { conf := &shuffleLeaderSchedulerConfig{} if err := decoder(conf); err != nil { return nil, err @@ -421,7 +422,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(ShuffleRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + schedule.RegisterScheduler(ShuffleRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { conf := &shuffleRegionSchedulerConfig{storage: storage} if err := decoder(conf); err != nil { return nil, err @@ -436,7 +437,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(SplitBucketType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + schedule.RegisterScheduler(SplitBucketType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { conf := initSplitBucketConfig() if err := decoder(conf); err != nil { return nil, err @@ -452,7 +453,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(TransferWitnessLeaderType, func(opController *schedule.OperatorController, _ endpoint.ConfigStorage, _ schedule.ConfigDecoder) (schedule.Scheduler, error) { + schedule.RegisterScheduler(TransferWitnessLeaderType, func(opController *operator.Controller, _ endpoint.ConfigStorage, _ schedule.ConfigDecoder) (schedule.Scheduler, error) { return newTransferWitnessLeaderScheduler(opController), nil }) @@ -463,7 +464,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(EvictSlowTrendType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + schedule.RegisterScheduler(EvictSlowTrendType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { conf := &evictSlowTrendSchedulerConfig{storage: storage, EvictedStores: make([]uint64, 0), evictCandidate: 0} if err := decoder(conf); err != nil { return nil, err diff --git a/pkg/schedule/schedulers/label.go b/pkg/schedule/schedulers/label.go index 2642c889adc..f71883a0d07 100644 --- a/pkg/schedule/schedulers/label.go +++ b/pkg/schedule/schedulers/label.go @@ -57,7 +57,7 @@ type labelScheduler struct { // LabelScheduler is mainly based on the store's label information for scheduling. // Now only used for reject leader schedule, that will move the leader out of // the store with the specific label. 
-func newLabelScheduler(opController *schedule.OperatorController, conf *labelSchedulerConfig) schedule.Scheduler { +func newLabelScheduler(opController *operator.Controller, conf *labelSchedulerConfig) schedule.Scheduler { return &labelScheduler{ BaseScheduler: NewBaseScheduler(opController), conf: conf, diff --git a/pkg/schedule/schedulers/random_merge.go b/pkg/schedule/schedulers/random_merge.go index cbf348a4e46..04397d396eb 100644 --- a/pkg/schedule/schedulers/random_merge.go +++ b/pkg/schedule/schedulers/random_merge.go @@ -58,7 +58,7 @@ type randomMergeScheduler struct { // newRandomMergeScheduler creates an admin scheduler that randomly picks two adjacent regions // then merges them. -func newRandomMergeScheduler(opController *schedule.OperatorController, conf *randomMergeSchedulerConfig) schedule.Scheduler { +func newRandomMergeScheduler(opController *operator.Controller, conf *randomMergeSchedulerConfig) schedule.Scheduler { base := NewBaseScheduler(opController) return &randomMergeScheduler{ BaseScheduler: base, diff --git a/pkg/schedule/schedulers/scatter_range.go b/pkg/schedule/schedulers/scatter_range.go index fef250fac29..8a2c1e942d1 100644 --- a/pkg/schedule/schedulers/scatter_range.go +++ b/pkg/schedule/schedulers/scatter_range.go @@ -125,7 +125,7 @@ type scatterRangeScheduler struct { } // newScatterRangeScheduler creates a scheduler that balances the distribution of leaders and regions that in the specified key range. -func newScatterRangeScheduler(opController *schedule.OperatorController, config *scatterRangeSchedulerConfig) schedule.Scheduler { +func newScatterRangeScheduler(opController *operator.Controller, config *scatterRangeSchedulerConfig) schedule.Scheduler { base := NewBaseScheduler(opController) name := config.getSchedulerName() diff --git a/pkg/schedule/schedulers/scheduler_test.go b/pkg/schedule/schedulers/scheduler_test.go index 4b17751c3a5..1dec83dbf1d 100644 --- a/pkg/schedule/schedulers/scheduler_test.go +++ b/pkg/schedule/schedulers/scheduler_test.go @@ -35,7 +35,7 @@ import ( "github.com/tikv/pd/pkg/versioninfo" ) -func prepareSchedulersTest(needToRunStream ...bool) (context.CancelFunc, config.Config, *mockcluster.Cluster, *schedule.OperatorController) { +func prepareSchedulersTest(needToRunStream ...bool) (context.CancelFunc, config.Config, *mockcluster.Cluster, *operator.Controller) { Register() ctx, cancel := context.WithCancel(context.Background()) opt := mockconfig.NewTestOptions() @@ -46,7 +46,7 @@ func prepareSchedulersTest(needToRunStream ...bool) (context.CancelFunc, config. 
 	} else {
 		stream = hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, needToRunStream[0])
 	}
-	oc := schedule.NewOperatorController(ctx, tc, stream)
+	oc := operator.NewController(ctx, tc, stream)
 	return cancel, opt, tc, oc
 }
diff --git a/pkg/schedule/schedulers/shuffle_hot_region.go b/pkg/schedule/schedulers/shuffle_hot_region.go
index 75525ae25e0..97cb18471ed 100644
--- a/pkg/schedule/schedulers/shuffle_hot_region.go
+++ b/pkg/schedule/schedulers/shuffle_hot_region.go
@@ -57,7 +57,7 @@ type shuffleHotRegionScheduler struct {
 }
 
 // newShuffleHotRegionScheduler creates an admin scheduler that random balance hot regions
-func newShuffleHotRegionScheduler(opController *schedule.OperatorController, conf *shuffleHotRegionSchedulerConfig) schedule.Scheduler {
+func newShuffleHotRegionScheduler(opController *operator.Controller, conf *shuffleHotRegionSchedulerConfig) schedule.Scheduler {
 	base := newBaseHotScheduler(opController)
 	ret := &shuffleHotRegionScheduler{
 		baseHotScheduler: base,
diff --git a/pkg/schedule/schedulers/shuffle_leader.go b/pkg/schedule/schedulers/shuffle_leader.go
index f3c0b16bde1..e2ecd40ccae 100644
--- a/pkg/schedule/schedulers/shuffle_leader.go
+++ b/pkg/schedule/schedulers/shuffle_leader.go
@@ -54,7 +54,7 @@ type shuffleLeaderScheduler struct {
 
 // newShuffleLeaderScheduler creates an admin scheduler that shuffles leaders
 // between stores.
-func newShuffleLeaderScheduler(opController *schedule.OperatorController, conf *shuffleLeaderSchedulerConfig) schedule.Scheduler {
+func newShuffleLeaderScheduler(opController *operator.Controller, conf *shuffleLeaderSchedulerConfig) schedule.Scheduler {
 	filters := []filter.Filter{
 		&filter.StoreStateFilter{ActionScope: conf.Name, TransferLeader: true, OperatorLevel: constant.Low},
 		filter.NewSpecialUseFilter(conf.Name),
diff --git a/pkg/schedule/schedulers/shuffle_region.go b/pkg/schedule/schedulers/shuffle_region.go
index 31f62be4414..f964e13de50 100644
--- a/pkg/schedule/schedulers/shuffle_region.go
+++ b/pkg/schedule/schedulers/shuffle_region.go
@@ -52,7 +52,7 @@ type shuffleRegionScheduler struct {
 
 // newShuffleRegionScheduler creates an admin scheduler that shuffles regions
 // between stores.
-func newShuffleRegionScheduler(opController *schedule.OperatorController, conf *shuffleRegionSchedulerConfig) schedule.Scheduler {
+func newShuffleRegionScheduler(opController *operator.Controller, conf *shuffleRegionSchedulerConfig) schedule.Scheduler {
 	filters := []filter.Filter{
 		&filter.StoreStateFilter{ActionScope: ShuffleRegionName, MoveRegion: true, OperatorLevel: constant.Low},
 		filter.NewSpecialUseFilter(ShuffleRegionName),
diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go
index 69881887623..6595433d148 100644
--- a/pkg/schedule/schedulers/split_bucket.go
+++ b/pkg/schedule/schedulers/split_bucket.go
@@ -140,7 +140,7 @@ func newSplitBucketHandler(conf *splitBucketSchedulerConfig) http.Handler {
 	return router
 }
 
-func newSplitBucketScheduler(opController *schedule.OperatorController, conf *splitBucketSchedulerConfig) *splitBucketScheduler {
+func newSplitBucketScheduler(opController *operator.Controller, conf *splitBucketSchedulerConfig) *splitBucketScheduler {
 	base := NewBaseScheduler(opController)
 	handler := newSplitBucketHandler(conf)
 	ret := &splitBucketScheduler{
diff --git a/pkg/schedule/schedulers/transfer_witness_leader.go b/pkg/schedule/schedulers/transfer_witness_leader.go
index c66d467de42..e3792bd09b1 100644
--- a/pkg/schedule/schedulers/transfer_witness_leader.go
+++ b/pkg/schedule/schedulers/transfer_witness_leader.go
@@ -53,7 +53,7 @@ type trasferWitnessLeaderScheduler struct {
 }
 
 // newTransferWitnessLeaderScheduler creates an admin scheduler that transfers witness leader of a region.
-func newTransferWitnessLeaderScheduler(opController *schedule.OperatorController) schedule.Scheduler {
+func newTransferWitnessLeaderScheduler(opController *operator.Controller) schedule.Scheduler {
 	return &trasferWitnessLeaderScheduler{
 		BaseScheduler: NewBaseScheduler(opController),
 		regions:       make(chan *core.RegionInfo, transferWitnessLeaderRecvMaxRegionSize),
diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go
index ee8f0ffed81..81f4411fbf7 100644
--- a/plugin/scheduler_example/evict_leader.go
+++ b/plugin/scheduler_example/evict_leader.go
@@ -69,7 +69,7 @@ func init() {
 		}
 	})
 
-	schedule.RegisterScheduler(EvictLeaderType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
+	schedule.RegisterScheduler(EvictLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
 		conf := &evictLeaderSchedulerConfig{StoreIDWitRanges: make(map[uint64][]core.KeyRange), storage: storage}
 		if err := decoder(conf); err != nil {
 			return nil, err
@@ -159,7 +159,7 @@ type evictLeaderScheduler struct {
 
 // newEvictLeaderScheduler creates an admin scheduler that transfers all leaders
 // out of a store.
-func newEvictLeaderScheduler(opController *schedule.OperatorController, conf *evictLeaderSchedulerConfig) schedule.Scheduler {
+func newEvictLeaderScheduler(opController *operator.Controller, conf *evictLeaderSchedulerConfig) schedule.Scheduler {
 	base := schedulers.NewBaseScheduler(opController)
 	handler := newEvictLeaderHandler(conf)
 	return &evictLeaderScheduler{
diff --git a/server/api/operator.go b/server/api/operator.go
index c166fbdff9d..6645a601fb0 100644
--- a/server/api/operator.go
+++ b/server/api/operator.go
@@ -44,7 +44,7 @@ func newOperatorHandler(handler *server.Handler, r *render.Render) *operatorHandler {
 // @Summary Get a Region's pending operator.
 // @Param region_id path int true "A Region's Id"
 // @Produce json
-// @Success 200 {object} schedule.OperatorWithStatus
+// @Success 200 {object} operator.OpWithStatus
 // @Failure 400 {string} string "The input is invalid."
 // @Failure 500 {string} string "PD server failed to proceed the request."
 // @Router /operators/{region_id} [get]
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index e99647520aa..a373c4c9053 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -43,6 +43,7 @@ import (
 	sc "github.com/tikv/pd/pkg/schedule/config"
 	"github.com/tikv/pd/pkg/schedule/hbstream"
 	"github.com/tikv/pd/pkg/schedule/labeler"
+	"github.com/tikv/pd/pkg/schedule/operator"
 	"github.com/tikv/pd/pkg/schedule/placement"
 	"github.com/tikv/pd/pkg/schedule/schedulers"
 	"github.com/tikv/pd/pkg/slice"
@@ -644,7 +645,7 @@ func (c *RaftCluster) GetCoordinator() *coordinator {
 }
 
 // GetOperatorController returns the operator controller.
-func (c *RaftCluster) GetOperatorController() *schedule.OperatorController {
+func (c *RaftCluster) GetOperatorController() *operator.Controller {
 	return c.coordinator.opController
 }
 
diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go
index fd0acfe7466..edae92fe494 100644
--- a/server/cluster/cluster_worker.go
+++ b/server/cluster/cluster_worker.go
@@ -23,7 +23,7 @@ import (
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/errs"
-	"github.com/tikv/pd/pkg/schedule"
+	"github.com/tikv/pd/pkg/schedule/operator"
 	"github.com/tikv/pd/pkg/statistics/buckets"
 	"github.com/tikv/pd/pkg/utils/logutil"
 	"github.com/tikv/pd/pkg/utils/typeutil"
@@ -37,7 +37,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
 		return err
 	}
 
-	c.coordinator.opController.Dispatch(region, schedule.DispatchFromHeartBeat)
+	c.coordinator.opController.Dispatch(region, operator.DispatchFromHeartBeat)
 	return nil
 }
 
diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go
index ee7c90d0af5..dd3056fad30 100644
--- a/server/cluster/coordinator.go
+++ b/server/cluster/coordinator.go
@@ -77,7 +77,7 @@ type coordinator struct {
 	regionScatterer   *schedule.RegionScatterer
 	regionSplitter    *schedule.RegionSplitter
 	schedulers        map[string]*scheduleController
-	opController      *schedule.OperatorController
+	opController      *operator.Controller
 	hbStreams         *hbstream.HeartbeatStreams
 	pluginInterface   *schedule.PluginInterface
 	diagnosticManager *diagnosticManager
@@ -86,7 +86,7 @@
 // newCoordinator creates a new coordinator.
 func newCoordinator(ctx context.Context, cluster *RaftCluster, hbStreams *hbstream.HeartbeatStreams) *coordinator {
 	ctx, cancel := context.WithCancel(ctx)
-	opController := schedule.NewOperatorController(ctx, cluster, hbStreams)
+	opController := operator.NewController(ctx, cluster, hbStreams)
 	schedulers := make(map[string]*scheduleController)
 	return &coordinator{
 		ctx:            ctx,
@@ -290,7 +290,7 @@ func (c *coordinator) drivePushOperator() {
 
 	defer c.wg.Done()
 	log.Info("coordinator begins to actively drive push operator")
-	ticker := time.NewTicker(schedule.PushOperatorTickInterval)
+	ticker := time.NewTicker(operator.PushOperatorTickInterval)
 	defer ticker.Stop()
 	for {
 		select {
@@ -682,7 +682,7 @@ func (c *coordinator) removeOptScheduler(o *config.PersistOptions, name string)
 	for i, schedulerCfg := range v.Schedulers {
 		// To create a temporary scheduler is just used to get scheduler's name
 		decoder := schedule.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args)
-		tmp, err := schedule.CreateScheduler(schedulerCfg.Type, schedule.NewOperatorController(c.ctx, nil, nil), storage.NewStorageWithMemoryBackend(), decoder)
+		tmp, err := schedule.CreateScheduler(schedulerCfg.Type, operator.NewController(c.ctx, nil, nil), storage.NewStorageWithMemoryBackend(), decoder)
 		if err != nil {
 			return err
 		}
@@ -855,7 +855,7 @@ func (c *coordinator) GetDiagnosticResult(name string) (*DiagnosticResult, error
 type scheduleController struct {
 	schedule.Scheduler
 	cluster      *RaftCluster
-	opController *schedule.OperatorController
+	opController *operator.Controller
 	nextInterval time.Duration
 	ctx          context.Context
 	cancel       context.CancelFunc
diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go
index fd4337750f0..16fcac79ce3 100644
--- a/server/cluster/coordinator_test.go
+++ b/server/cluster/coordinator_test.go
@@ -237,7 +237,7 @@ func dispatchHeartbeat(co *coordinator, region *core.RegionInfo, stream hbstream
 	if err := co.cluster.putRegion(region.Clone()); err != nil {
 		return err
 	}
-	co.opController.Dispatch(region, schedule.DispatchFromHeartBeat)
+	co.opController.Dispatch(region, operator.DispatchFromHeartBeat)
 	return nil
 }
 
@@ -1268,7 +1268,7 @@ func TestDownStoreLimit(t *testing.T) {
 type mockLimitScheduler struct {
 	schedule.Scheduler
 	limit   uint64
-	counter *schedule.OperatorController
+	counter *operator.Controller
 	kind    operator.OpKind
 }
 
diff --git a/server/handler.go b/server/handler.go
index bc427fa90de..7366047e25d 100644
--- a/server/handler.go
+++ b/server/handler.go
@@ -100,7 +100,7 @@ func (h *Handler) GetRaftCluster() (*cluster.RaftCluster, error) {
 }
 
 // GetOperatorController returns OperatorController.
-func (h *Handler) GetOperatorController() (*schedule.OperatorController, error) {
+func (h *Handler) GetOperatorController() (*operator.Controller, error) {
 	rc := h.s.GetRaftCluster()
 	if rc == nil {
 		return nil, errs.ErrNotBootstrapped.GenWithStackByArgs()
@@ -393,7 +393,7 @@ func (h *Handler) GetOperator(regionID uint64) (*operator.Operator, error) {
 }
 
 // GetOperatorStatus returns the status of the region operator.
-func (h *Handler) GetOperatorStatus(regionID uint64) (*schedule.OperatorWithStatus, error) {
+func (h *Handler) GetOperatorStatus(regionID uint64) (*operator.OpWithStatus, error) {
 	c, err := h.GetOperatorController()
 	if err != nil {
 		return nil, err