diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 4ce88be652f..122c5d68fec 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -112,9 +112,6 @@ func (mc *Cluster) GetPersistOptions() *config.PersistOptions { // UpdateRegionsLabelLevelStats updates the label level stats for the regions. func (mc *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {} -// CheckSchedulingAllowance checks if the cluster allows scheduling currently. -func (mc *Cluster) CheckSchedulingAllowance() (bool, error) { return true, nil } - // LoadRegion puts region info without leader func (mc *Cluster) LoadRegion(regionID uint64, peerStoreIDs ...uint64) { // regions load from etcd will have no leader diff --git a/pkg/schedule/checker/merge_checker.go b/pkg/schedule/checker/merge_checker.go index b84a3b09d11..728583fc845 100644 --- a/pkg/schedule/checker/merge_checker.go +++ b/pkg/schedule/checker/merge_checker.go @@ -76,14 +76,14 @@ var ( // MergeChecker ensures region to merge with adjacent region when size is small type MergeChecker struct { PauseController - cluster sche.ClusterInformer + cluster sche.ScheduleCluster conf config.Config splitCache *cache.TTLUint64 startTime time.Time // it's used to judge whether server recently start. } // NewMergeChecker creates a merge checker. -func NewMergeChecker(ctx context.Context, cluster sche.ClusterInformer, conf config.Config) *MergeChecker { +func NewMergeChecker(ctx context.Context, cluster sche.ScheduleCluster, conf config.Config) *MergeChecker { splitCache := cache.NewIDTTL(ctx, time.Minute, conf.GetSplitMergeInterval()) return &MergeChecker{ cluster: cluster, @@ -250,7 +250,7 @@ func (m *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool { } // AllowMerge returns true if two regions can be merged according to the key type. -func AllowMerge(cluster sche.ClusterInformer, region, adjacent *core.RegionInfo) bool { +func AllowMerge(cluster sche.ScheduleCluster, region, adjacent *core.RegionInfo) bool { var start, end []byte if bytes.Equal(region.GetEndKey(), adjacent.GetStartKey()) && len(region.GetEndKey()) != 0 { start, end = region.GetStartKey(), adjacent.GetEndKey() @@ -306,7 +306,7 @@ func isTableIDSame(region, adjacent *core.RegionInfo) bool { // Check whether there is a peer of the adjacent region on an offline store, // while the source region has no peer on it. This is to prevent from bringing // any other peer into an offline store to slow down the offline process. -func checkPeerStore(cluster sche.ClusterInformer, region, adjacent *core.RegionInfo) bool { +func checkPeerStore(cluster sche.ScheduleCluster, region, adjacent *core.RegionInfo) bool { regionStoreIDs := region.GetStoreIDs() for _, peer := range adjacent.GetPeers() { storeID := peer.GetStoreId() diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go index f63aae8d2dd..a1fa70875c9 100644 --- a/pkg/schedule/config/config.go +++ b/pkg/schedule/config/config.go @@ -29,6 +29,8 @@ func IsSchedulerRegistered(name string) bool { // Config is the interface that wraps the Config related methods. 
type Config interface { + IsSchedulingHalted() bool + GetReplicaScheduleLimit() uint64 GetRegionScheduleLimit() uint64 GetMergeScheduleLimit() uint64 diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 55d565a11bc..a14a0ff556a 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -960,7 +960,7 @@ func (c *Coordinator) RecordOpStepWithTTL(regionID uint64) { // scheduleController is used to manage a scheduler to schedulers. type scheduleController struct { schedulers.Scheduler - cluster sche.ClusterInformer + cluster sche.ScheduleCluster opController *operator.Controller nextInterval time.Duration ctx context.Context @@ -1074,7 +1074,7 @@ func (s *scheduleController) AllowSchedule(diagnosable bool) bool { } func (s *scheduleController) isSchedulingHalted() bool { - return s.cluster.GetPersistOptions().IsSchedulingHalted() + return s.cluster.GetOpts().IsSchedulingHalted() } // isPaused returns if a scheduler is paused. @@ -1145,7 +1145,7 @@ func (c *Coordinator) CheckTransferWitnessLeader(region *core.RegionInfo) { // cacheCluster include cache info to improve the performance. type cacheCluster struct { - sche.ClusterInformer + sche.ScheduleCluster stores []*core.StoreInfo } @@ -1155,9 +1155,9 @@ func (c *cacheCluster) GetStores() []*core.StoreInfo { } // newCacheCluster constructor for cache -func newCacheCluster(c sche.ClusterInformer) *cacheCluster { +func newCacheCluster(c sche.ScheduleCluster) *cacheCluster { return &cacheCluster{ - ClusterInformer: c, + ScheduleCluster: c, stores: c.GetStores(), } } diff --git a/pkg/schedule/core/cluster_informer.go b/pkg/schedule/core/cluster_informer.go index 72e124c501e..658a476766b 100644 --- a/pkg/schedule/core/cluster_informer.go +++ b/pkg/schedule/core/cluster_informer.go @@ -28,27 +28,28 @@ import ( // ClusterInformer provides the necessary information of a cluster. type ClusterInformer interface { - RegionHealthCluster - statistics.RegionStatInformer - statistics.StoreStatInformer - buckets.BucketStatInformer + ScheduleCluster - GetBasicCluster() *core.BasicCluster - GetStoreConfig() sc.StoreConfig - GetAllocator() id.Allocator - GetRegionLabeler() *labeler.RegionLabeler GetStorage() storage.Storage UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) AddSuspectRegions(ids ...uint64) GetPersistOptions() *config.PersistOptions } -// RegionHealthCluster is an aggregate interface that wraps multiple interfaces -type RegionHealthCluster interface { +// ScheduleCluster is an aggregate interface that wraps multiple interfaces for schedulers use +type ScheduleCluster interface { BasicCluster + statistics.StoreStatInformer + statistics.RegionStatInformer + buckets.BucketStatInformer + GetOpts() sc.Config GetRuleManager() *placement.RuleManager + GetRegionLabeler() *labeler.RegionLabeler + GetBasicCluster() *core.BasicCluster + GetStoreConfig() sc.StoreConfig + GetAllocator() id.Allocator } // BasicCluster is an aggregate interface that wraps multiple interfaces diff --git a/pkg/schedule/filter/healthy.go b/pkg/schedule/filter/healthy.go index ba4f196dc0b..dd017b974ec 100644 --- a/pkg/schedule/filter/healthy.go +++ b/pkg/schedule/filter/healthy.go @@ -42,17 +42,17 @@ func hasDownPeers(region *core.RegionInfo) bool { // IsRegionReplicated checks if a region is fully replicated. When placement // rules is enabled, its peers should fit corresponding rules. When placement // rules is disabled, it should have enough replicas and no any learner peer. 
-func IsRegionReplicated(cluster sche.RegionHealthCluster, region *core.RegionInfo) bool { +func IsRegionReplicated(cluster sche.ScheduleCluster, region *core.RegionInfo) bool { if cluster.GetOpts().IsPlacementRulesEnabled() { return isRegionPlacementRuleSatisfied(cluster, region) } return isRegionReplicasSatisfied(cluster, region) } -func isRegionPlacementRuleSatisfied(cluster sche.RegionHealthCluster, region *core.RegionInfo) bool { +func isRegionPlacementRuleSatisfied(cluster sche.ScheduleCluster, region *core.RegionInfo) bool { return cluster.GetRuleManager().FitRegion(cluster, region).IsSatisfied() } -func isRegionReplicasSatisfied(cluster sche.RegionHealthCluster, region *core.RegionInfo) bool { +func isRegionReplicasSatisfied(cluster sche.ScheduleCluster, region *core.RegionInfo) bool { return len(region.GetLearners()) == 0 && len(region.GetPeers()) == cluster.GetOpts().GetMaxReplicas() } diff --git a/pkg/schedule/filter/region_filters.go b/pkg/schedule/filter/region_filters.go index e2c35b4e249..bd356976817 100644 --- a/pkg/schedule/filter/region_filters.go +++ b/pkg/schedule/filter/region_filters.go @@ -100,12 +100,12 @@ func (f *regionDownFilter) Select(region *core.RegionInfo) *plan.Status { // RegionReplicatedFilter filters all unreplicated regions. type RegionReplicatedFilter struct { - cluster sche.RegionHealthCluster + cluster sche.ScheduleCluster fit *placement.RegionFit } // NewRegionReplicatedFilter creates a RegionFilter that filters all unreplicated regions. -func NewRegionReplicatedFilter(cluster sche.RegionHealthCluster) RegionFilter { +func NewRegionReplicatedFilter(cluster sche.ScheduleCluster) RegionFilter { return &RegionReplicatedFilter{cluster: cluster} } @@ -132,11 +132,11 @@ func (f *RegionReplicatedFilter) Select(region *core.RegionInfo) *plan.Status { } type regionEmptyFilter struct { - cluster sche.RegionHealthCluster + cluster sche.ScheduleCluster } // NewRegionEmptyFilter returns creates a RegionFilter that filters all empty regions. -func NewRegionEmptyFilter(cluster sche.RegionHealthCluster) RegionFilter { +func NewRegionEmptyFilter(cluster sche.ScheduleCluster) RegionFilter { return ®ionEmptyFilter{cluster: cluster} } @@ -148,7 +148,7 @@ func (f *regionEmptyFilter) Select(region *core.RegionInfo) *plan.Status { } // isEmptyRegionAllowBalance returns true if the region is not empty or the number of regions is too small. -func isEmptyRegionAllowBalance(cluster sche.RegionHealthCluster, region *core.RegionInfo) bool { +func isEmptyRegionAllowBalance(cluster sche.ScheduleCluster, region *core.RegionInfo) bool { return region.GetApproximateSize() > core.EmptyRegionApproximateSize || cluster.GetTotalRegionCount() < core.InitClusterRegionThreshold } diff --git a/pkg/schedule/operator/builder.go b/pkg/schedule/operator/builder.go index bde464d02ff..2cc5a6e7102 100644 --- a/pkg/schedule/operator/builder.go +++ b/pkg/schedule/operator/builder.go @@ -40,7 +40,7 @@ import ( // according to various constraints. type Builder struct { // basic info - sche.ClusterInformer + sche.ScheduleCluster desc string regionID uint64 regionEpoch *metapb.RegionEpoch @@ -92,10 +92,10 @@ func SkipPlacementRulesCheck(b *Builder) { } // NewBuilder creates a Builder. 
-func NewBuilder(desc string, ci sche.ClusterInformer, region *core.RegionInfo, opts ...BuilderOption) *Builder { +func NewBuilder(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, opts ...BuilderOption) *Builder { b := &Builder{ desc: desc, - ClusterInformer: ci, + ScheduleCluster: ci, regionID: region.GetID(), regionEpoch: region.GetRegionEpoch(), approximateSize: region.GetApproximateSize(), diff --git a/pkg/schedule/operator/create_operator.go b/pkg/schedule/operator/create_operator.go index b80bee2fb80..260547b4d2a 100644 --- a/pkg/schedule/operator/create_operator.go +++ b/pkg/schedule/operator/create_operator.go @@ -31,35 +31,35 @@ import ( ) // CreateAddPeerOperator creates an operator that adds a new peer. -func CreateAddPeerOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, peer *metapb.Peer, kind OpKind) (*Operator, error) { +func CreateAddPeerOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, peer *metapb.Peer, kind OpKind) (*Operator, error) { return NewBuilder(desc, ci, region). AddPeer(peer). Build(kind) } // CreateDemoteVoterOperator creates an operator that demotes a voter -func CreateDemoteVoterOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) { +func CreateDemoteVoterOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) { return NewBuilder(desc, ci, region). DemoteVoter(peer.GetStoreId()). Build(0) } // CreatePromoteLearnerOperator creates an operator that promotes a learner. -func CreatePromoteLearnerOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) { +func CreatePromoteLearnerOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) { return NewBuilder(desc, ci, region). PromoteLearner(peer.GetStoreId()). Build(0) } // CreateRemovePeerOperator creates an operator that removes a peer from region. -func CreateRemovePeerOperator(desc string, ci sche.ClusterInformer, kind OpKind, region *core.RegionInfo, storeID uint64) (*Operator, error) { +func CreateRemovePeerOperator(desc string, ci sche.ScheduleCluster, kind OpKind, region *core.RegionInfo, storeID uint64) (*Operator, error) { return NewBuilder(desc, ci, region). RemovePeer(storeID). Build(kind) } // CreateTransferLeaderOperator creates an operator that transfers the leader from a source store to a target store. -func CreateTransferLeaderOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, sourceStoreID uint64, targetStoreID uint64, targetStoreIDs []uint64, kind OpKind) (*Operator, error) { +func CreateTransferLeaderOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, sourceStoreID uint64, targetStoreID uint64, targetStoreIDs []uint64, kind OpKind) (*Operator, error) { return NewBuilder(desc, ci, region, SkipOriginJointStateCheck). SetLeader(targetStoreID). SetLeaders(targetStoreIDs). @@ -67,7 +67,7 @@ func CreateTransferLeaderOperator(desc string, ci sche.ClusterInformer, region * } // CreateForceTransferLeaderOperator creates an operator that transfers the leader from a source store to a target store forcible. 
-func CreateForceTransferLeaderOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, sourceStoreID uint64, targetStoreID uint64, kind OpKind) (*Operator, error) { +func CreateForceTransferLeaderOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, sourceStoreID uint64, targetStoreID uint64, kind OpKind) (*Operator, error) { return NewBuilder(desc, ci, region, SkipOriginJointStateCheck, SkipPlacementRulesCheck). SetLeader(targetStoreID). EnableForceTargetLeader(). @@ -75,7 +75,7 @@ func CreateForceTransferLeaderOperator(desc string, ci sche.ClusterInformer, reg } // CreateMoveRegionOperator creates an operator that moves a region to specified stores. -func CreateMoveRegionOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, kind OpKind, roles map[uint64]placement.PeerRoleType) (*Operator, error) { +func CreateMoveRegionOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, kind OpKind, roles map[uint64]placement.PeerRoleType) (*Operator, error) { // construct the peers from roles oldPeers := region.GetPeers() peers := make(map[uint64]*metapb.Peer) @@ -97,7 +97,7 @@ func CreateMoveRegionOperator(desc string, ci sche.ClusterInformer, region *core } // CreateMovePeerOperator creates an operator that replaces an old peer with a new peer. -func CreateMovePeerOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, kind OpKind, oldStore uint64, peer *metapb.Peer) (*Operator, error) { +func CreateMovePeerOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, kind OpKind, oldStore uint64, peer *metapb.Peer) (*Operator, error) { return NewBuilder(desc, ci, region). RemovePeer(oldStore). AddPeer(peer). @@ -105,7 +105,7 @@ func CreateMovePeerOperator(desc string, ci sche.ClusterInformer, region *core.R } // CreateMoveWitnessOperator creates an operator that replaces an old witness with a new witness. -func CreateMoveWitnessOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, sourceStoreID uint64, targetStoreID uint64) (*Operator, error) { +func CreateMoveWitnessOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, sourceStoreID uint64, targetStoreID uint64) (*Operator, error) { return NewBuilder(desc, ci, region). BecomeNonWitness(sourceStoreID). BecomeWitness(targetStoreID). @@ -113,7 +113,7 @@ func CreateMoveWitnessOperator(desc string, ci sche.ClusterInformer, region *cor } // CreateReplaceLeaderPeerOperator creates an operator that replaces an old peer with a new peer, and move leader from old store firstly. -func CreateReplaceLeaderPeerOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, kind OpKind, oldStore uint64, peer *metapb.Peer, leader *metapb.Peer) (*Operator, error) { +func CreateReplaceLeaderPeerOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, kind OpKind, oldStore uint64, peer *metapb.Peer, leader *metapb.Peer) (*Operator, error) { return NewBuilder(desc, ci, region). RemovePeer(oldStore). AddPeer(peer). @@ -122,7 +122,7 @@ func CreateReplaceLeaderPeerOperator(desc string, ci sche.ClusterInformer, regio } // CreateMoveLeaderOperator creates an operator that replaces an old leader with a new leader. 
-func CreateMoveLeaderOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, kind OpKind, oldStore uint64, peer *metapb.Peer) (*Operator, error) { +func CreateMoveLeaderOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, kind OpKind, oldStore uint64, peer *metapb.Peer) (*Operator, error) { return NewBuilder(desc, ci, region). RemovePeer(oldStore). AddPeer(peer). @@ -157,7 +157,7 @@ func CreateSplitRegionOperator(desc string, region *core.RegionInfo, kind OpKind } // CreateMergeRegionOperator creates an operator that merge two region into one. -func CreateMergeRegionOperator(desc string, ci sche.ClusterInformer, source *core.RegionInfo, target *core.RegionInfo, kind OpKind) ([]*Operator, error) { +func CreateMergeRegionOperator(desc string, ci sche.ScheduleCluster, source *core.RegionInfo, target *core.RegionInfo, kind OpKind) ([]*Operator, error) { if core.IsInJointState(source.GetPeers()...) || core.IsInJointState(target.GetPeers()...) { return nil, errors.Errorf("cannot merge regions which are in joint state") } @@ -215,7 +215,7 @@ func isRegionMatch(a, b *core.RegionInfo) bool { } // CreateScatterRegionOperator creates an operator that scatters the specified region. -func CreateScatterRegionOperator(desc string, ci sche.ClusterInformer, origin *core.RegionInfo, targetPeers map[uint64]*metapb.Peer, targetLeader uint64) (*Operator, error) { +func CreateScatterRegionOperator(desc string, ci sche.ScheduleCluster, origin *core.RegionInfo, targetPeers map[uint64]*metapb.Peer, targetLeader uint64) (*Operator, error) { // randomly pick a leader. var ids []uint64 for id, peer := range targetPeers { @@ -243,7 +243,7 @@ func CreateScatterRegionOperator(desc string, ci sche.ClusterInformer, origin *c const OpDescLeaveJointState = "leave-joint-state" // CreateLeaveJointStateOperator creates an operator that let region leave joint state. -func CreateLeaveJointStateOperator(desc string, ci sche.ClusterInformer, origin *core.RegionInfo) (*Operator, error) { +func CreateLeaveJointStateOperator(desc string, ci sche.ScheduleCluster, origin *core.RegionInfo) (*Operator, error) { b := NewBuilder(desc, ci, origin, SkipOriginJointStateCheck, SkipPlacementRulesCheck) if b.err == nil && !core.IsInJointState(origin.GetPeers()...) { @@ -303,14 +303,14 @@ func CreateLeaveJointStateOperator(desc string, ci sche.ClusterInformer, origin } // CreateWitnessPeerOperator creates an operator that set a follower or learner peer with witness -func CreateWitnessPeerOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) { +func CreateWitnessPeerOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) { return NewBuilder(desc, ci, region). BecomeWitness(peer.GetStoreId()). Build(OpWitness) } // CreateNonWitnessPeerOperator creates an operator that set a peer with non-witness -func CreateNonWitnessPeerOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) { +func CreateNonWitnessPeerOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) { return NewBuilder(desc, ci, region). BecomeNonWitness(peer.GetStoreId()). 
Build(OpWitness) diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index 349df14b35d..54773c1de0f 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -224,7 +224,7 @@ func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(l.conf) } -func (l *balanceLeaderScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { +func (l *balanceLeaderScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool { allowed := l.OpController.OperatorCount(operator.OpLeader) < cluster.GetOpts().GetLeaderScheduleLimit() if !allowed { operator.OperatorLimitCounter.WithLabelValues(l.GetType(), operator.OpLeader.String()).Inc() @@ -324,7 +324,7 @@ func (cs *candidateStores) resortStoreWithPos(pos int) { } } -func (l *balanceLeaderScheduler) Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, []plan.Plan) { +func (l *balanceLeaderScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { l.conf.mu.RLock() defer l.conf.mu.RUnlock() basePlan := NewBalanceSchedulerPlan() @@ -419,7 +419,7 @@ func makeInfluence(op *operator.Operator, plan *solver, usedRegions map[uint64]s storesIDs := candidate.binarySearchStores(plan.source, plan.target) candidateUpdateStores[id] = storesIDs } - operator.AddOpInfluence(op, plan.opInfluence, plan.ClusterInformer.GetBasicCluster()) + operator.AddOpInfluence(op, plan.opInfluence, plan.ScheduleCluster.GetBasicCluster()) for id, candidate := range candidates { for _, pos := range candidateUpdateStores[id] { candidate.resortStoreWithPos(pos) diff --git a/pkg/schedule/schedulers/balance_region.go b/pkg/schedule/schedulers/balance_region.go index 04c8fda59f2..c3dc4cf9d68 100644 --- a/pkg/schedule/schedulers/balance_region.go +++ b/pkg/schedule/schedulers/balance_region.go @@ -113,7 +113,7 @@ func (s *balanceRegionScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } -func (s *balanceRegionScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { +func (s *balanceRegionScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool { allowed := s.OpController.OperatorCount(operator.OpRegion) < cluster.GetOpts().GetRegionScheduleLimit() if !allowed { operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpRegion.String()).Inc() @@ -121,7 +121,7 @@ func (s *balanceRegionScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) return allowed } -func (s *balanceRegionScheduler) Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, []plan.Plan) { +func (s *balanceRegionScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { basePlan := NewBalanceSchedulerPlan() var collector *plan.Collector if dryRun { diff --git a/pkg/schedule/schedulers/balance_witness.go b/pkg/schedule/schedulers/balance_witness.go index 4a9c3433963..e2881123aee 100644 --- a/pkg/schedule/schedulers/balance_witness.go +++ b/pkg/schedule/schedulers/balance_witness.go @@ -209,7 +209,7 @@ func (b *balanceWitnessScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(b.conf) } -func (b *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { +func (b *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool { allowed := b.OpController.OperatorCount(operator.OpWitness) < cluster.GetOpts().GetWitnessScheduleLimit() if !allowed { 
operator.OperatorLimitCounter.WithLabelValues(b.GetType(), operator.OpWitness.String()).Inc() @@ -217,7 +217,7 @@ func (b *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.ClusterInformer return allowed } -func (b *balanceWitnessScheduler) Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, []plan.Plan) { +func (b *balanceWitnessScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { b.conf.mu.RLock() defer b.conf.mu.RUnlock() basePlan := NewBalanceSchedulerPlan() diff --git a/pkg/schedule/schedulers/balance_witness_test.go b/pkg/schedule/schedulers/balance_witness_test.go index 586bc0d13b9..aa368b4a2d8 100644 --- a/pkg/schedule/schedulers/balance_witness_test.go +++ b/pkg/schedule/schedulers/balance_witness_test.go @@ -49,7 +49,7 @@ func (suite *balanceWitnessSchedulerTestSuite) SetupTest() { Count: 4, }, }) - lb, err := CreateScheduler(BalanceWitnessType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceWitnessType, []string{"", ""})) + lb, err := CreateScheduler(BalanceWitnessType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceWitnessType, []string{"", ""}), nil) suite.NoError(err) suite.lb = lb } diff --git a/pkg/schedule/schedulers/base_scheduler.go b/pkg/schedule/schedulers/base_scheduler.go index c3821642422..f83043c8884 100644 --- a/pkg/schedule/schedulers/base_scheduler.go +++ b/pkg/schedule/schedulers/base_scheduler.go @@ -88,7 +88,7 @@ func (s *BaseScheduler) GetNextInterval(interval time.Duration) time.Duration { } // Prepare does some prepare work -func (s *BaseScheduler) Prepare(cluster sche.ClusterInformer) error { return nil } +func (s *BaseScheduler) Prepare(cluster sche.ScheduleCluster) error { return nil } // Cleanup does some cleanup work -func (s *BaseScheduler) Cleanup(cluster sche.ClusterInformer) {} +func (s *BaseScheduler) Cleanup(cluster sche.ScheduleCluster) {} diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index f98f81dd466..df45d9af3af 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -204,7 +204,7 @@ func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } -func (s *evictLeaderScheduler) Prepare(cluster sche.ClusterInformer) error { +func (s *evictLeaderScheduler) Prepare(cluster sche.ScheduleCluster) error { s.conf.mu.RLock() defer s.conf.mu.RUnlock() var res error @@ -216,7 +216,7 @@ func (s *evictLeaderScheduler) Prepare(cluster sche.ClusterInformer) error { return res } -func (s *evictLeaderScheduler) Cleanup(cluster sche.ClusterInformer) { +func (s *evictLeaderScheduler) Cleanup(cluster sche.ScheduleCluster) { s.conf.mu.RLock() defer s.conf.mu.RUnlock() for id := range s.conf.StoreIDWithRanges { @@ -224,7 +224,7 @@ func (s *evictLeaderScheduler) Cleanup(cluster sche.ClusterInformer) { } } -func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { +func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool { allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetOpts().GetLeaderScheduleLimit() if !allowed { operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpLeader.String()).Inc() @@ -232,7 +232,7 @@ func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) b return allowed } -func (s *evictLeaderScheduler) Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, 
[]plan.Plan) { +func (s *evictLeaderScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { evictLeaderCounter.Inc() return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize), nil } @@ -257,7 +257,7 @@ type evictLeaderStoresConf interface { getKeyRangesByID(id uint64) []core.KeyRange } -func scheduleEvictLeaderBatch(name, typ string, cluster sche.ClusterInformer, conf evictLeaderStoresConf, batchSize int) []*operator.Operator { +func scheduleEvictLeaderBatch(name, typ string, cluster sche.ScheduleCluster, conf evictLeaderStoresConf, batchSize int) []*operator.Operator { var ops []*operator.Operator for i := 0; i < batchSize; i++ { once := scheduleEvictLeaderOnce(name, typ, cluster, conf) @@ -274,7 +274,7 @@ func scheduleEvictLeaderBatch(name, typ string, cluster sche.ClusterInformer, co return ops } -func scheduleEvictLeaderOnce(name, typ string, cluster sche.ClusterInformer, conf evictLeaderStoresConf) []*operator.Operator { +func scheduleEvictLeaderOnce(name, typ string, cluster sche.ScheduleCluster, conf evictLeaderStoresConf) []*operator.Operator { stores := conf.getStores() ops := make([]*operator.Operator, 0, len(stores)) for _, storeID := range stores { diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index 015086ac3de..ebaa6e4a7ba 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -109,7 +109,7 @@ func (s *evictSlowStoreScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } -func (s *evictSlowStoreScheduler) Prepare(cluster sche.ClusterInformer) error { +func (s *evictSlowStoreScheduler) Prepare(cluster sche.ScheduleCluster) error { evictStore := s.conf.evictStore() if evictStore != 0 { return cluster.SlowStoreEvicted(evictStore) @@ -117,11 +117,11 @@ func (s *evictSlowStoreScheduler) Prepare(cluster sche.ClusterInformer) error { return nil } -func (s *evictSlowStoreScheduler) Cleanup(cluster sche.ClusterInformer) { +func (s *evictSlowStoreScheduler) Cleanup(cluster sche.ScheduleCluster) { s.cleanupEvictLeader(cluster) } -func (s *evictSlowStoreScheduler) prepareEvictLeader(cluster sche.ClusterInformer, storeID uint64) error { +func (s *evictSlowStoreScheduler) prepareEvictLeader(cluster sche.ScheduleCluster, storeID uint64) error { err := s.conf.setStoreAndPersist(storeID) if err != nil { log.Info("evict-slow-store-scheduler persist config failed", zap.Uint64("store-id", storeID)) @@ -131,7 +131,7 @@ func (s *evictSlowStoreScheduler) prepareEvictLeader(cluster sche.ClusterInforme return cluster.SlowStoreEvicted(storeID) } -func (s *evictSlowStoreScheduler) cleanupEvictLeader(cluster sche.ClusterInformer) { +func (s *evictSlowStoreScheduler) cleanupEvictLeader(cluster sche.ScheduleCluster) { evictSlowStore, err := s.conf.clearAndPersist() if err != nil { log.Info("evict-slow-store-scheduler persist config failed", zap.Uint64("store-id", evictSlowStore)) @@ -142,11 +142,11 @@ func (s *evictSlowStoreScheduler) cleanupEvictLeader(cluster sche.ClusterInforme cluster.SlowStoreRecovered(evictSlowStore) } -func (s *evictSlowStoreScheduler) schedulerEvictLeader(cluster sche.ClusterInformer) []*operator.Operator { +func (s *evictSlowStoreScheduler) schedulerEvictLeader(cluster sche.ScheduleCluster) []*operator.Operator { return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize) } -func (s *evictSlowStoreScheduler) IsScheduleAllowed(cluster 
sche.ClusterInformer) bool { +func (s *evictSlowStoreScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool { if s.conf.evictStore() != 0 { allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetOpts().GetLeaderScheduleLimit() if !allowed { @@ -157,7 +157,7 @@ func (s *evictSlowStoreScheduler) IsScheduleAllowed(cluster sche.ClusterInformer return true } -func (s *evictSlowStoreScheduler) Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, []plan.Plan) { +func (s *evictSlowStoreScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { evictSlowStoreCounter.Inc() var ops []*operator.Operator diff --git a/pkg/schedule/schedulers/evict_slow_store_test.go b/pkg/schedule/schedulers/evict_slow_store_test.go index 29ecf9fc022..58a44118048 100644 --- a/pkg/schedule/schedulers/evict_slow_store_test.go +++ b/pkg/schedule/schedulers/evict_slow_store_test.go @@ -56,9 +56,9 @@ func (suite *evictSlowStoreTestSuite) SetupTest() { storage := storage.NewStorageWithMemoryBackend() var err error - suite.es, err = CreateScheduler(EvictSlowStoreType, suite.oc, storage, ConfigSliceDecoder(EvictSlowStoreType, []string{})) + suite.es, err = CreateScheduler(EvictSlowStoreType, suite.oc, storage, ConfigSliceDecoder(EvictSlowStoreType, []string{}), nil) suite.NoError(err) - suite.bs, err = CreateScheduler(BalanceLeaderType, suite.oc, storage, ConfigSliceDecoder(BalanceLeaderType, []string{})) + suite.bs, err = CreateScheduler(BalanceLeaderType, suite.oc, storage, ConfigSliceDecoder(BalanceLeaderType, []string{}), nil) suite.NoError(err) } diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index ea13a707b6b..52d0d38012b 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -107,7 +107,7 @@ func (conf *evictSlowTrendSchedulerConfig) setStoreAndPersist(id uint64) error { return conf.Persist() } -func (conf *evictSlowTrendSchedulerConfig) clearAndPersist(cluster sche.ClusterInformer) (oldID uint64, err error) { +func (conf *evictSlowTrendSchedulerConfig) clearAndPersist(cluster sche.ScheduleCluster) (oldID uint64, err error) { oldID = conf.evictedStore() if oldID == 0 { return @@ -139,7 +139,7 @@ func (s *evictSlowTrendScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } -func (s *evictSlowTrendScheduler) Prepare(cluster sche.ClusterInformer) error { +func (s *evictSlowTrendScheduler) Prepare(cluster sche.ScheduleCluster) error { evictedStoreID := s.conf.evictedStore() if evictedStoreID == 0 { return nil @@ -147,11 +147,11 @@ func (s *evictSlowTrendScheduler) Prepare(cluster sche.ClusterInformer) error { return cluster.SlowTrendEvicted(evictedStoreID) } -func (s *evictSlowTrendScheduler) Cleanup(cluster sche.ClusterInformer) { +func (s *evictSlowTrendScheduler) Cleanup(cluster sche.ScheduleCluster) { s.cleanupEvictLeader(cluster) } -func (s *evictSlowTrendScheduler) prepareEvictLeader(cluster sche.ClusterInformer, storeID uint64) error { +func (s *evictSlowTrendScheduler) prepareEvictLeader(cluster sche.ScheduleCluster, storeID uint64) error { err := s.conf.setStoreAndPersist(storeID) if err != nil { log.Info("evict-slow-trend-scheduler persist config failed", zap.Uint64("store-id", storeID)) @@ -160,7 +160,7 @@ func (s *evictSlowTrendScheduler) prepareEvictLeader(cluster sche.ClusterInforme return cluster.SlowTrendEvicted(storeID) } -func (s *evictSlowTrendScheduler) cleanupEvictLeader(cluster 
sche.ClusterInformer) { +func (s *evictSlowTrendScheduler) cleanupEvictLeader(cluster sche.ScheduleCluster) { evictedStoreID, err := s.conf.clearAndPersist(cluster) if err != nil { log.Info("evict-slow-trend-scheduler persist config failed", zap.Uint64("store-id", evictedStoreID)) @@ -170,7 +170,7 @@ func (s *evictSlowTrendScheduler) cleanupEvictLeader(cluster sche.ClusterInforme } } -func (s *evictSlowTrendScheduler) scheduleEvictLeader(cluster sche.ClusterInformer) []*operator.Operator { +func (s *evictSlowTrendScheduler) scheduleEvictLeader(cluster sche.ScheduleCluster) []*operator.Operator { store := cluster.GetStore(s.conf.evictedStore()) if store == nil { return nil @@ -179,7 +179,7 @@ func (s *evictSlowTrendScheduler) scheduleEvictLeader(cluster sche.ClusterInform return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize) } -func (s *evictSlowTrendScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { +func (s *evictSlowTrendScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool { if s.conf.evictedStore() == 0 { return true } @@ -190,7 +190,7 @@ func (s *evictSlowTrendScheduler) IsScheduleAllowed(cluster sche.ClusterInformer return allowed } -func (s *evictSlowTrendScheduler) Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, []plan.Plan) { +func (s *evictSlowTrendScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc() var ops []*operator.Operator @@ -270,7 +270,7 @@ func newEvictSlowTrendScheduler(opController *operator.Controller, conf *evictSl } } -func chooseEvictCandidate(cluster sche.ClusterInformer) (slowStore *core.StoreInfo) { +func chooseEvictCandidate(cluster sche.ScheduleCluster) (slowStore *core.StoreInfo) { stores := cluster.GetStores() if len(stores) < 3 { storeSlowTrendActionStatusGauge.WithLabelValues("cand.none:too-few").Inc() @@ -330,7 +330,7 @@ func chooseEvictCandidate(cluster sche.ClusterInformer) (slowStore *core.StoreIn return store } -func checkStoresAreUpdated(cluster sche.ClusterInformer, slowStoreID uint64, slowStoreRecordTS time.Time) bool { +func checkStoresAreUpdated(cluster sche.ScheduleCluster, slowStoreID uint64, slowStoreRecordTS time.Time) bool { stores := cluster.GetStores() if len(stores) <= 1 { return false @@ -359,7 +359,7 @@ func checkStoresAreUpdated(cluster sche.ClusterInformer, slowStoreID uint64, slo return updatedStores >= expected } -func checkStoreSlowerThanOthers(cluster sche.ClusterInformer, target *core.StoreInfo) bool { +func checkStoreSlowerThanOthers(cluster sche.ScheduleCluster, target *core.StoreInfo) bool { stores := cluster.GetStores() expected := (len(stores)*2 + 1) / 3 targetSlowTrend := target.GetSlowTrend() @@ -390,7 +390,7 @@ func checkStoreSlowerThanOthers(cluster sche.ClusterInformer, target *core.Store return slowerThanStoresNum >= expected } -func checkStoreCanRecover(cluster sche.ClusterInformer, target *core.StoreInfo) bool { +func checkStoreCanRecover(cluster sche.ScheduleCluster, target *core.StoreInfo) bool { /* // // This might not be necessary, @@ -413,7 +413,7 @@ func checkStoreCanRecover(cluster sche.ClusterInformer, target *core.StoreInfo) return checkStoreFasterThanOthers(cluster, target) } -func checkStoreFasterThanOthers(cluster sche.ClusterInformer, target *core.StoreInfo) bool { +func checkStoreFasterThanOthers(cluster sche.ScheduleCluster, target *core.StoreInfo) bool { stores := cluster.GetStores() expected := 
(len(stores) + 1) / 2 targetSlowTrend := target.GetSlowTrend() diff --git a/pkg/schedule/schedulers/grant_hot_region.go b/pkg/schedule/schedulers/grant_hot_region.go index f08b1ab2d30..18b6e7edd52 100644 --- a/pkg/schedule/schedulers/grant_hot_region.go +++ b/pkg/schedule/schedulers/grant_hot_region.go @@ -152,7 +152,7 @@ func (s *grantHotRegionScheduler) EncodeConfig() ([]byte, error) { // IsScheduleAllowed returns whether the scheduler is allowed to schedule. // TODO it should check if there is any scheduler such as evict or hot region scheduler -func (s *grantHotRegionScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { +func (s *grantHotRegionScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool { regionAllowed := s.OpController.OperatorCount(operator.OpRegion) < cluster.GetOpts().GetRegionScheduleLimit() leaderAllowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetOpts().GetLeaderScheduleLimit() if !regionAllowed { @@ -226,14 +226,14 @@ func newGrantHotRegionHandler(config *grantHotRegionSchedulerConfig) http.Handle return router } -func (s *grantHotRegionScheduler) Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, []plan.Plan) { +func (s *grantHotRegionScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { grantHotRegionCounter.Inc() rw := s.randomRWType() s.prepareForBalance(rw, cluster) return s.dispatch(rw, cluster), nil } -func (s *grantHotRegionScheduler) dispatch(typ statistics.RWType, cluster sche.ClusterInformer) []*operator.Operator { +func (s *grantHotRegionScheduler) dispatch(typ statistics.RWType, cluster sche.ScheduleCluster) []*operator.Operator { stLoadInfos := s.stLoadInfos[buildResourceType(typ, constant.RegionKind)] infos := make([]*statistics.StoreLoadDetail, len(stLoadInfos)) index := 0 @@ -247,7 +247,7 @@ func (s *grantHotRegionScheduler) dispatch(typ statistics.RWType, cluster sche.C return s.randomSchedule(cluster, infos) } -func (s *grantHotRegionScheduler) randomSchedule(cluster sche.ClusterInformer, srcStores []*statistics.StoreLoadDetail) (ops []*operator.Operator) { +func (s *grantHotRegionScheduler) randomSchedule(cluster sche.ScheduleCluster, srcStores []*statistics.StoreLoadDetail) (ops []*operator.Operator) { isLeader := s.r.Int()%2 == 1 for _, srcStore := range srcStores { srcStoreID := srcStore.GetID() @@ -278,7 +278,7 @@ func (s *grantHotRegionScheduler) randomSchedule(cluster sche.ClusterInformer, s return nil } -func (s *grantHotRegionScheduler) transfer(cluster sche.ClusterInformer, regionID uint64, srcStoreID uint64, isLeader bool) (op *operator.Operator, err error) { +func (s *grantHotRegionScheduler) transfer(cluster sche.ScheduleCluster, regionID uint64, srcStoreID uint64, isLeader bool) (op *operator.Operator, err error) { srcRegion := cluster.GetRegion(regionID) if srcRegion == nil || len(srcRegion.GetDownPeers()) != 0 || len(srcRegion.GetPendingPeers()) != 0 { return nil, errs.ErrRegionRuleNotFound diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index f84b922f32b..41918f96ed1 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -178,7 +178,7 @@ func (s *grantLeaderScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } -func (s *grantLeaderScheduler) Prepare(cluster sche.ClusterInformer) error { +func (s *grantLeaderScheduler) Prepare(cluster sche.ScheduleCluster) error { s.conf.mu.RLock() defer s.conf.mu.RUnlock() var 
res error @@ -190,7 +190,7 @@ func (s *grantLeaderScheduler) Prepare(cluster sche.ClusterInformer) error { return res } -func (s *grantLeaderScheduler) Cleanup(cluster sche.ClusterInformer) { +func (s *grantLeaderScheduler) Cleanup(cluster sche.ScheduleCluster) { s.conf.mu.RLock() defer s.conf.mu.RUnlock() for id := range s.conf.StoreIDWithRanges { @@ -198,7 +198,7 @@ func (s *grantLeaderScheduler) Cleanup(cluster sche.ClusterInformer) { } } -func (s *grantLeaderScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { +func (s *grantLeaderScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool { allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetOpts().GetLeaderScheduleLimit() if !allowed { operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpLeader.String()).Inc() @@ -206,7 +206,7 @@ func (s *grantLeaderScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) b return allowed } -func (s *grantLeaderScheduler) Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, []plan.Plan) { +func (s *grantLeaderScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { grantLeaderCounter.Inc() s.conf.mu.RLock() defer s.conf.mu.RUnlock() diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 57b010d2e9c..f4a7772f2f7 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -118,7 +118,7 @@ func newBaseHotScheduler(opController *operator.Controller) *baseHotScheduler { // prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for // each store, only update read or write load detail -func (h *baseHotScheduler) prepareForBalance(rw statistics.RWType, cluster sche.ClusterInformer) { +func (h *baseHotScheduler) prepareForBalance(rw statistics.RWType, cluster sche.ScheduleCluster) { h.stInfos = statistics.SummaryStoreInfos(cluster.GetStores()) h.summaryPendingInfluence() h.storesLoads = cluster.GetStoresLoads() @@ -263,7 +263,7 @@ func (h *hotScheduler) GetNextInterval(interval time.Duration) time.Duration { return intervalGrow(h.GetMinInterval(), maxHotScheduleInterval, exponentialGrowth) } -func (h *hotScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { +func (h *hotScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool { allowed := h.OpController.OperatorCount(operator.OpHotRegion) < cluster.GetOpts().GetHotRegionScheduleLimit() if !allowed { operator.OperatorLimitCounter.WithLabelValues(h.GetType(), operator.OpHotRegion.String()).Inc() @@ -271,13 +271,13 @@ func (h *hotScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { return allowed } -func (h *hotScheduler) Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, []plan.Plan) { +func (h *hotScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { hotSchedulerCounter.Inc() rw := h.randomRWType() return h.dispatch(rw, cluster), nil } -func (h *hotScheduler) dispatch(typ statistics.RWType, cluster sche.ClusterInformer) []*operator.Operator { +func (h *hotScheduler) dispatch(typ statistics.RWType, cluster sche.ScheduleCluster) []*operator.Operator { h.Lock() defer h.Unlock() h.prepareForBalance(typ, cluster) @@ -311,7 +311,7 @@ func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore, d return true } -func (h *hotScheduler) balanceHotReadRegions(cluster sche.ClusterInformer) 
[]*operator.Operator { +func (h *hotScheduler) balanceHotReadRegions(cluster sche.ScheduleCluster) []*operator.Operator { leaderSolver := newBalanceSolver(h, cluster, statistics.Read, transferLeader) leaderOps := leaderSolver.solve() peerSolver := newBalanceSolver(h, cluster, statistics.Read, movePeer) @@ -354,7 +354,7 @@ func (h *hotScheduler) balanceHotReadRegions(cluster sche.ClusterInformer) []*op return nil } -func (h *hotScheduler) balanceHotWriteRegions(cluster sche.ClusterInformer) []*operator.Operator { +func (h *hotScheduler) balanceHotWriteRegions(cluster sche.ScheduleCluster) []*operator.Operator { // prefer to balance by peer s := h.r.Intn(100) switch { @@ -443,7 +443,7 @@ func isAvailableV1(s *solution) bool { } type balanceSolver struct { - sche.ClusterInformer + sche.ScheduleCluster sche *hotScheduler stLoadDetail map[uint64]*statistics.StoreLoadDetail rwTy statistics.RWType @@ -564,7 +564,7 @@ func (bs *balanceSolver) isSelectedDim(dim int) bool { } func (bs *balanceSolver) getPriorities() []string { - querySupport := bs.sche.conf.checkQuerySupport(bs.ClusterInformer) + querySupport := bs.sche.conf.checkQuerySupport(bs.ScheduleCluster) // For read, transfer-leader and move-peer have the same priority config // For write, they are different switch bs.resourceTy { @@ -579,9 +579,9 @@ func (bs *balanceSolver) getPriorities() []string { return []string{} } -func newBalanceSolver(sche *hotScheduler, cluster sche.ClusterInformer, rwTy statistics.RWType, opTy opType) *balanceSolver { +func newBalanceSolver(sche *hotScheduler, cluster sche.ScheduleCluster, rwTy statistics.RWType, opTy opType) *balanceSolver { bs := &balanceSolver{ - ClusterInformer: cluster, + ScheduleCluster: cluster, sche: sche, rwTy: rwTy, opTy: opTy, @@ -591,7 +591,7 @@ func newBalanceSolver(sche *hotScheduler, cluster sche.ClusterInformer, rwTy sta } func (bs *balanceSolver) isValid() bool { - if bs.ClusterInformer == nil || bs.sche == nil || bs.stLoadDetail == nil { + if bs.ScheduleCluster == nil || bs.sche == nil || bs.stLoadDetail == nil { return false } return true @@ -910,7 +910,7 @@ func (bs *balanceSolver) isRegionAvailable(region *core.RegionInfo) bool { return false } - if !filter.IsRegionReplicated(bs.ClusterInformer, region) { + if !filter.IsRegionReplicated(bs.ScheduleCluster, region) { log.Debug("region has abnormal replica count", zap.String("scheduler", bs.sche.GetName()), zap.Uint64("region-id", region.GetID())) hotSchedulerAbnormalReplicaCounter.Inc() return false @@ -1483,7 +1483,7 @@ func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*oper for i, region := range regions { ids[i] = region.GetID() } - hotBuckets := bs.ClusterInformer.BucketsStats(bs.minHotDegree, ids...) + hotBuckets := bs.ScheduleCluster.BucketsStats(bs.minHotDegree, ids...) 
operators := make([]*operator.Operator, 0) createFunc := func(region *core.RegionInfo) { diff --git a/pkg/schedule/schedulers/hot_region_config.go b/pkg/schedule/schedulers/hot_region_config.go index 1b1585199f1..c50f18d7c68 100644 --- a/pkg/schedule/schedulers/hot_region_config.go +++ b/pkg/schedule/schedulers/hot_region_config.go @@ -433,7 +433,7 @@ func (conf *hotRegionSchedulerConfig) persistLocked() error { return conf.storage.SaveScheduleConfig(HotRegionName, data) } -func (conf *hotRegionSchedulerConfig) checkQuerySupport(cluster sche.ClusterInformer) bool { +func (conf *hotRegionSchedulerConfig) checkQuerySupport(cluster sche.ScheduleCluster) bool { querySupport := versioninfo.IsFeatureSupported(cluster.GetOpts().GetClusterVersion(), versioninfo.HotScheduleWithQuery) conf.Lock() defer conf.Unlock() diff --git a/pkg/schedule/schedulers/hot_region_v2_test.go b/pkg/schedule/schedulers/hot_region_v2_test.go index f8f5959bba6..3dd40fcb984 100644 --- a/pkg/schedule/schedulers/hot_region_v2_test.go +++ b/pkg/schedule/schedulers/hot_region_v2_test.go @@ -33,7 +33,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { defer cancel() statistics.Denoising = false - sche, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + sche, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) re.NoError(err) hb := sche.(*hotScheduler) hb.conf.SetDstToleranceRatio(0.0) @@ -95,7 +95,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirst(t *testing.T) { defer cancel() statistics.Denoising = false - sche, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + sche, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) re.NoError(err) hb := sche.(*hotScheduler) hb.conf.SetDstToleranceRatio(0.0) @@ -148,7 +148,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirstOnly(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - sche, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + sche, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) re.NoError(err) hb := sche.(*hotScheduler) hb.conf.SetDstToleranceRatio(0.0) @@ -210,7 +210,7 @@ func TestHotReadRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - sche, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + sche, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) re.NoError(err) hb := sche.(*hotScheduler) hb.conf.SetDstToleranceRatio(0.0) @@ -271,7 +271,7 @@ func TestSkipUniformStore(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - hb, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) re.NoError(err) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) hb.(*hotScheduler).conf.SetDstToleranceRatio(1) diff --git a/pkg/schedule/schedulers/label.go b/pkg/schedule/schedulers/label.go index 65103e39067..e936f06761f 100644 --- a/pkg/schedule/schedulers/label.go +++ b/pkg/schedule/schedulers/label.go @@ -75,7 +75,7 @@ func (s *labelScheduler) EncodeConfig() ([]byte, error) { return 
EncodeConfig(s.conf) } -func (s *labelScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { +func (s *labelScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool { allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetOpts().GetLeaderScheduleLimit() if !allowed { operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpLeader.String()).Inc() @@ -83,7 +83,7 @@ func (s *labelScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { return allowed } -func (s *labelScheduler) Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, []plan.Plan) { +func (s *labelScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { labelCounter.Inc() stores := cluster.GetStores() rejectLeaderStores := make(map[uint64]struct{}) diff --git a/pkg/schedule/schedulers/random_merge.go b/pkg/schedule/schedulers/random_merge.go index 980a69b0cee..71e33775c3e 100644 --- a/pkg/schedule/schedulers/random_merge.go +++ b/pkg/schedule/schedulers/random_merge.go @@ -77,7 +77,7 @@ func (s *randomMergeScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } -func (s *randomMergeScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { +func (s *randomMergeScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool { allowed := s.OpController.OperatorCount(operator.OpMerge) < cluster.GetOpts().GetMergeScheduleLimit() if !allowed { operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpMerge.String()).Inc() @@ -85,7 +85,7 @@ func (s *randomMergeScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) b return allowed } -func (s *randomMergeScheduler) Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, []plan.Plan) { +func (s *randomMergeScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { randomMergeCounter.Inc() store := filter.NewCandidates(cluster.GetStores()). @@ -128,7 +128,7 @@ func (s *randomMergeScheduler) Schedule(cluster sche.ClusterInformer, dryRun boo return ops, nil } -func (s *randomMergeScheduler) allowMerge(cluster sche.ClusterInformer, region, target *core.RegionInfo) bool { +func (s *randomMergeScheduler) allowMerge(cluster sche.ScheduleCluster, region, target *core.RegionInfo) bool { if !filter.IsRegionHealthy(region) || !filter.IsRegionHealthy(target) { return false } diff --git a/pkg/schedule/schedulers/range_cluster.go b/pkg/schedule/schedulers/range_cluster.go index 3f81f4e59ca..80bff6a64c9 100644 --- a/pkg/schedule/schedulers/range_cluster.go +++ b/pkg/schedule/schedulers/range_cluster.go @@ -22,21 +22,21 @@ import ( // rangeCluster isolates the cluster by range. type rangeCluster struct { - sche.ClusterInformer + sche.ScheduleCluster subCluster *core.BasicCluster // Collect all regions belong to the range. tolerantSizeRatio float64 } // genRangeCluster gets a range cluster by specifying start key and end key. // The cluster can only know the regions within [startKey, endKey). 
-func genRangeCluster(cluster sche.ClusterInformer, startKey, endKey []byte) *rangeCluster { +func genRangeCluster(cluster sche.ScheduleCluster, startKey, endKey []byte) *rangeCluster { subCluster := core.NewBasicCluster() for _, r := range cluster.ScanRegions(startKey, endKey, -1) { origin, overlaps, rangeChanged := subCluster.SetRegion(r) subCluster.UpdateSubTree(r, origin, overlaps, rangeChanged) } return &rangeCluster{ - ClusterInformer: cluster, + ScheduleCluster: cluster, subCluster: subCluster, } } @@ -70,7 +70,7 @@ func (r *rangeCluster) updateStoreInfo(s *core.StoreInfo) *core.StoreInfo { // GetStore searches for a store by ID. func (r *rangeCluster) GetStore(id uint64) *core.StoreInfo { - s := r.ClusterInformer.GetStore(id) + s := r.ScheduleCluster.GetStore(id) if s == nil { return nil } @@ -79,7 +79,7 @@ func (r *rangeCluster) GetStore(id uint64) *core.StoreInfo { // GetStores returns all Stores in the cluster. func (r *rangeCluster) GetStores() []*core.StoreInfo { - stores := r.ClusterInformer.GetStores() + stores := r.ScheduleCluster.GetStores() newStores := make([]*core.StoreInfo, 0, len(stores)) for _, s := range stores { newStores = append(newStores, r.updateStoreInfo(s)) @@ -97,7 +97,7 @@ func (r *rangeCluster) GetTolerantSizeRatio() float64 { if r.tolerantSizeRatio != 0 { return r.tolerantSizeRatio } - return r.ClusterInformer.GetOpts().GetTolerantSizeRatio() + return r.ScheduleCluster.GetOpts().GetTolerantSizeRatio() } // RandFollowerRegions returns a random region that has a follower on the store. @@ -117,7 +117,7 @@ func (r *rangeCluster) GetAverageRegionSize() int64 { // GetRegionStores returns all stores that contains the region's peer. func (r *rangeCluster) GetRegionStores(region *core.RegionInfo) []*core.StoreInfo { - stores := r.ClusterInformer.GetRegionStores(region) + stores := r.ScheduleCluster.GetRegionStores(region) newStores := make([]*core.StoreInfo, 0, len(stores)) for _, s := range stores { newStores = append(newStores, r.updateStoreInfo(s)) @@ -127,7 +127,7 @@ func (r *rangeCluster) GetRegionStores(region *core.RegionInfo) []*core.StoreInf // GetFollowerStores returns all stores that contains the region's follower peer. func (r *rangeCluster) GetFollowerStores(region *core.RegionInfo) []*core.StoreInfo { - stores := r.ClusterInformer.GetFollowerStores(region) + stores := r.ScheduleCluster.GetFollowerStores(region) newStores := make([]*core.StoreInfo, 0, len(stores)) for _, s := range stores { newStores = append(newStores, r.updateStoreInfo(s)) @@ -137,7 +137,7 @@ func (r *rangeCluster) GetFollowerStores(region *core.RegionInfo) []*core.StoreI // GetLeaderStore returns all stores that contains the region's leader peer. 
func (r *rangeCluster) GetLeaderStore(region *core.RegionInfo) *core.StoreInfo { - s := r.ClusterInformer.GetLeaderStore(region) + s := r.ScheduleCluster.GetLeaderStore(region) if s != nil { return r.updateStoreInfo(s) } diff --git a/pkg/schedule/schedulers/scatter_range.go b/pkg/schedule/schedulers/scatter_range.go index 6f9e48398f6..e89b0abebb8 100644 --- a/pkg/schedule/schedulers/scatter_range.go +++ b/pkg/schedule/schedulers/scatter_range.go @@ -168,11 +168,11 @@ func (l *scatterRangeScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(l.config) } -func (l *scatterRangeScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { +func (l *scatterRangeScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool { return l.allowBalanceLeader(cluster) || l.allowBalanceRegion(cluster) } -func (l *scatterRangeScheduler) allowBalanceLeader(cluster sche.ClusterInformer) bool { +func (l *scatterRangeScheduler) allowBalanceLeader(cluster sche.ScheduleCluster) bool { allowed := l.OpController.OperatorCount(operator.OpRange) < cluster.GetOpts().GetLeaderScheduleLimit() if !allowed { operator.OperatorLimitCounter.WithLabelValues(l.GetType(), operator.OpLeader.String()).Inc() @@ -180,7 +180,7 @@ func (l *scatterRangeScheduler) allowBalanceLeader(cluster sche.ClusterInformer) return allowed } -func (l *scatterRangeScheduler) allowBalanceRegion(cluster sche.ClusterInformer) bool { +func (l *scatterRangeScheduler) allowBalanceRegion(cluster sche.ScheduleCluster) bool { allowed := l.OpController.OperatorCount(operator.OpRange) < cluster.GetOpts().GetRegionScheduleLimit() if !allowed { operator.OperatorLimitCounter.WithLabelValues(l.GetType(), operator.OpRegion.String()).Inc() @@ -188,7 +188,7 @@ func (l *scatterRangeScheduler) allowBalanceRegion(cluster sche.ClusterInformer) return allowed } -func (l *scatterRangeScheduler) Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, []plan.Plan) { +func (l *scatterRangeScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { scatterRangeCounter.Inc() // isolate a new cluster according to the key range c := genRangeCluster(cluster, l.config.GetStartKey(), l.config.GetEndKey()) diff --git a/pkg/schedule/schedulers/scheduler.go b/pkg/schedule/schedulers/scheduler.go index 9389f862f5a..b03044e9471 100644 --- a/pkg/schedule/schedulers/scheduler.go +++ b/pkg/schedule/schedulers/scheduler.go @@ -40,10 +40,10 @@ type Scheduler interface { EncodeConfig() ([]byte, error) GetMinInterval() time.Duration GetNextInterval(interval time.Duration) time.Duration - Prepare(cluster sche.ClusterInformer) error - Cleanup(cluster sche.ClusterInformer) - Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, []plan.Plan) - IsScheduleAllowed(cluster sche.ClusterInformer) bool + Prepare(cluster sche.ScheduleCluster) error + Cleanup(cluster sche.ScheduleCluster) + Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) + IsScheduleAllowed(cluster sche.ScheduleCluster) bool } // EncodeConfig encode the custom config for each scheduler. 
diff --git a/pkg/schedule/schedulers/shuffle_hot_region.go b/pkg/schedule/schedulers/shuffle_hot_region.go
index d3241087d19..aaa729f5a55 100644
--- a/pkg/schedule/schedulers/shuffle_hot_region.go
+++ b/pkg/schedule/schedulers/shuffle_hot_region.go
@@ -77,7 +77,7 @@ func (s *shuffleHotRegionScheduler) EncodeConfig() ([]byte, error) {
 	return EncodeConfig(s.conf)
 }
 
-func (s *shuffleHotRegionScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool {
+func (s *shuffleHotRegionScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool {
 	hotRegionAllowed := s.OpController.OperatorCount(operator.OpHotRegion) < s.conf.Limit
 	regionAllowed := s.OpController.OperatorCount(operator.OpRegion) < cluster.GetOpts().GetRegionScheduleLimit()
 	leaderAllowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetOpts().GetLeaderScheduleLimit()
@@ -93,7 +93,7 @@ func (s *shuffleHotRegionScheduler) IsScheduleAllowed(cluster sche.ClusterInform
 	return hotRegionAllowed && regionAllowed && leaderAllowed
 }
 
-func (s *shuffleHotRegionScheduler) Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, []plan.Plan) {
+func (s *shuffleHotRegionScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
 	shuffleHotRegionCounter.Inc()
 	rw := s.randomRWType()
 	s.prepareForBalance(rw, cluster)
@@ -101,7 +101,7 @@ func (s *shuffleHotRegionScheduler) Schedule(cluster sche.ClusterInformer, dryRu
 	return operators, nil
 }
 
-func (s *shuffleHotRegionScheduler) randomSchedule(cluster sche.ClusterInformer, loadDetail map[uint64]*statistics.StoreLoadDetail) []*operator.Operator {
+func (s *shuffleHotRegionScheduler) randomSchedule(cluster sche.ScheduleCluster, loadDetail map[uint64]*statistics.StoreLoadDetail) []*operator.Operator {
 	for _, detail := range loadDetail {
 		if len(detail.HotPeers) < 1 {
 			continue
diff --git a/pkg/schedule/schedulers/shuffle_leader.go b/pkg/schedule/schedulers/shuffle_leader.go
index b406ad1a782..f2ee8b8ff92 100644
--- a/pkg/schedule/schedulers/shuffle_leader.go
+++ b/pkg/schedule/schedulers/shuffle_leader.go
@@ -78,7 +78,7 @@ func (s *shuffleLeaderScheduler) EncodeConfig() ([]byte, error) {
 	return EncodeConfig(s.conf)
 }
 
-func (s *shuffleLeaderScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool {
+func (s *shuffleLeaderScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool {
 	allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetOpts().GetLeaderScheduleLimit()
 	if !allowed {
 		operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpLeader.String()).Inc()
@@ -86,7 +86,7 @@ func (s *shuffleLeaderScheduler) IsScheduleAllowed(cluster sche.ClusterInformer)
 	return allowed
 }
 
-func (s *shuffleLeaderScheduler) Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, []plan.Plan) {
+func (s *shuffleLeaderScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
 	// We shuffle leaders between stores by:
 	// 1. random select a valid store.
 	// 2. transfer a leader to the store.
diff --git a/pkg/schedule/schedulers/shuffle_region.go b/pkg/schedule/schedulers/shuffle_region.go
index 32ff4eb708a..deb4ac5635a 100644
--- a/pkg/schedule/schedulers/shuffle_region.go
+++ b/pkg/schedule/schedulers/shuffle_region.go
@@ -80,7 +80,7 @@ func (s *shuffleRegionScheduler) EncodeConfig() ([]byte, error) {
 	return s.conf.EncodeConfig()
 }
 
-func (s *shuffleRegionScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool {
+func (s *shuffleRegionScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool {
 	allowed := s.OpController.OperatorCount(operator.OpRegion) < cluster.GetOpts().GetRegionScheduleLimit()
 	if !allowed {
 		operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpRegion.String()).Inc()
@@ -88,7 +88,7 @@ func (s *shuffleRegionScheduler) IsScheduleAllowed(cluster sche.ClusterInformer)
 	return allowed
 }
 
-func (s *shuffleRegionScheduler) Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, []plan.Plan) {
+func (s *shuffleRegionScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
 	shuffleRegionCounter.Inc()
 	region, oldPeer := s.scheduleRemovePeer(cluster)
 	if region == nil {
@@ -112,7 +112,7 @@ func (s *shuffleRegionScheduler) Schedule(cluster sche.ClusterInformer, dryRun b
 	return []*operator.Operator{op}, nil
 }
 
-func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster sche.ClusterInformer) (*core.RegionInfo, *metapb.Peer) {
+func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster sche.ScheduleCluster) (*core.RegionInfo, *metapb.Peer) {
 	candidates := filter.NewCandidates(cluster.GetStores()).
 		FilterSource(cluster.GetOpts(), nil, nil, s.filters...).
 		Shuffle()
@@ -144,7 +144,7 @@ func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster sche.ClusterInformer
 	return nil, nil
 }
 
-func (s *shuffleRegionScheduler) scheduleAddPeer(cluster sche.ClusterInformer, region *core.RegionInfo, oldPeer *metapb.Peer) *metapb.Peer {
+func (s *shuffleRegionScheduler) scheduleAddPeer(cluster sche.ScheduleCluster, region *core.RegionInfo, oldPeer *metapb.Peer) *metapb.Peer {
 	store := cluster.GetStore(oldPeer.GetStoreId())
 	if store == nil {
 		return nil
diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go
index a1ff2add096..fef20974ef0 100644
--- a/pkg/schedule/schedulers/split_bucket.go
+++ b/pkg/schedule/schedulers/split_bucket.go
@@ -166,7 +166,7 @@ func (s *splitBucketScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request)
 }
 
 // IsScheduleAllowed return true if the sum of executing opSplit operator is less .
-func (s *splitBucketScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool {
+func (s *splitBucketScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool {
 	if !cluster.GetStoreConfig().IsEnableRegionBucket() {
 		splitBucketDisableCounter.Inc()
 		return false
@@ -181,13 +181,13 @@ func (s *splitBucketScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) b
 
 type splitBucketPlan struct {
 	hotBuckets         map[uint64][]*buckets.BucketStat
-	cluster            sche.ClusterInformer
+	cluster            sche.ScheduleCluster
 	conf               *splitBucketSchedulerConfig
 	hotRegionSplitSize int64
 }
 
 // Schedule return operators if some bucket is too hot.
-func (s *splitBucketScheduler) Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, []plan.Plan) {
+func (s *splitBucketScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
 	splitBucketScheduleCounter.Inc()
 	conf := s.conf.Clone()
 	plan := &splitBucketPlan{
diff --git a/pkg/schedule/schedulers/transfer_witness_leader.go b/pkg/schedule/schedulers/transfer_witness_leader.go
index 07a9858a0a7..c9b66ccef4e 100644
--- a/pkg/schedule/schedulers/transfer_witness_leader.go
+++ b/pkg/schedule/schedulers/transfer_witness_leader.go
@@ -67,16 +67,16 @@ func (s *trasferWitnessLeaderScheduler) GetType() string {
 	return TransferWitnessLeaderType
 }
 
-func (s *trasferWitnessLeaderScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool {
+func (s *trasferWitnessLeaderScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool {
 	return true
 }
 
-func (s *trasferWitnessLeaderScheduler) Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, []plan.Plan) {
+func (s *trasferWitnessLeaderScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
 	transferWitnessLeaderCounter.Inc()
 	return s.scheduleTransferWitnessLeaderBatch(s.GetName(), s.GetType(), cluster, transferWitnessLeaderBatchSize), nil
 }
 
-func (s *trasferWitnessLeaderScheduler) scheduleTransferWitnessLeaderBatch(name, typ string, cluster sche.ClusterInformer, batchSize int) []*operator.Operator {
+func (s *trasferWitnessLeaderScheduler) scheduleTransferWitnessLeaderBatch(name, typ string, cluster sche.ScheduleCluster, batchSize int) []*operator.Operator {
 	var ops []*operator.Operator
 	for i := 0; i < batchSize; i++ {
 		select {
@@ -98,7 +98,7 @@ func (s *trasferWitnessLeaderScheduler) scheduleTransferWitnessLeaderBatch(name,
 	return ops
 }
 
-func (s *trasferWitnessLeaderScheduler) scheduleTransferWitnessLeader(name, typ string, cluster sche.ClusterInformer, region *core.RegionInfo) (*operator.Operator, error) {
+func (s *trasferWitnessLeaderScheduler) scheduleTransferWitnessLeader(name, typ string, cluster sche.ScheduleCluster, region *core.RegionInfo) (*operator.Operator, error) {
 	var filters []filter.Filter
 	unhealthyPeerStores := make(map[uint64]struct{})
 	for _, peer := range region.GetDownPeers() {
diff --git a/pkg/schedule/schedulers/utils.go b/pkg/schedule/schedulers/utils.go
index 45c3ada135f..5b034e79842 100644
--- a/pkg/schedule/schedulers/utils.go
+++ b/pkg/schedule/schedulers/utils.go
@@ -43,7 +43,7 @@ const (
 
 type solver struct {
 	*balanceSchedulerPlan
-	sche.ClusterInformer
+	sche.ScheduleCluster
 	kind              constant.ScheduleKind
 	opInfluence       operator.OpInfluence
 	tolerantSizeRatio float64
@@ -54,10 +54,10 @@ type solver struct {
 	targetScore float64
 }
 
-func newSolver(basePlan *balanceSchedulerPlan, kind constant.ScheduleKind, cluster sche.ClusterInformer, opInfluence operator.OpInfluence) *solver {
+func newSolver(basePlan *balanceSchedulerPlan, kind constant.ScheduleKind, cluster sche.ScheduleCluster, opInfluence operator.OpInfluence) *solver {
 	return &solver{
 		balanceSchedulerPlan: basePlan,
-		ClusterInformer:      cluster,
+		ScheduleCluster:      cluster,
 		kind:                 kind,
 		opInfluence:          opInfluence,
 		tolerantSizeRatio:    adjustTolerantRatio(cluster, kind),
@@ -181,7 +181,7 @@ func (p *solver) getTolerantResource() int64 {
 	return p.tolerantSource
 }
 
-func adjustTolerantRatio(cluster sche.ClusterInformer, kind constant.ScheduleKind) float64 {
+func adjustTolerantRatio(cluster sche.ScheduleCluster, kind constant.ScheduleKind) float64 {
 	var tolerantSizeRatio float64
 	switch c := cluster.(type) {
 	case *rangeCluster:
diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go
index 70d0884d53d..6b7e9046542 100644
--- a/plugin/scheduler_example/evict_leader.go
+++ b/plugin/scheduler_example/evict_leader.go
@@ -186,7 +186,7 @@ func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) {
 	return schedulers.EncodeConfig(s.conf)
 }
 
-func (s *evictLeaderScheduler) Prepare(cluster sche.ClusterInformer) error {
+func (s *evictLeaderScheduler) Prepare(cluster sche.ScheduleCluster) error {
 	s.conf.mu.RLock()
 	defer s.conf.mu.RUnlock()
 	var res error
@@ -198,7 +198,7 @@ func (s *evictLeaderScheduler) Prepare(cluster sche.ClusterInformer) error {
 	return res
 }
 
-func (s *evictLeaderScheduler) Cleanup(cluster sche.ClusterInformer) {
+func (s *evictLeaderScheduler) Cleanup(cluster sche.ScheduleCluster) {
 	s.conf.mu.RLock()
 	defer s.conf.mu.RUnlock()
 	for id := range s.conf.StoreIDWitRanges {
@@ -206,7 +206,7 @@ func (s *evictLeaderScheduler) Cleanup(cluster sche.ClusterInformer) {
 	}
 }
 
-func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool {
+func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool {
 	allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetOpts().GetLeaderScheduleLimit()
 	if !allowed {
 		operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpLeader.String()).Inc()
@@ -214,7 +214,7 @@ func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) b
 	return allowed
 }
 
-func (s *evictLeaderScheduler) Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, []plan.Plan) {
+func (s *evictLeaderScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
 	ops := make([]*operator.Operator, 0, len(s.conf.StoreIDWitRanges))
 	s.conf.mu.RLock()
 	defer s.conf.mu.RUnlock()
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index 0e5c049e15b..86b241f08b7 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -3335,7 +3335,7 @@ type mockLimitScheduler struct {
 	kind    operator.OpKind
 }
 
-func (s *mockLimitScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool {
+func (s *mockLimitScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool {
 	return s.counter.OperatorCount(s.kind) < s.limit
 }
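
Below the patch, for orientation only: a minimal sketch of how a downstream scheduler's method signatures look after this rename, modeled on the plugin example and the Scheduler interface hunks above. The package name exampleplugin and the type noopScheduler are hypothetical, and the import paths are inferred from the file paths in this diff; only the sche.ScheduleCluster parameter type and the GetOpts()-based limit lookup come from the patch itself.

package exampleplugin

import (
	sche "github.com/tikv/pd/pkg/schedule/core"
	"github.com/tikv/pd/pkg/schedule/operator"
	"github.com/tikv/pd/pkg/schedule/plan"
)

// noopScheduler is a hypothetical scheduler used only to show the new signatures;
// it does not implement the full Scheduler interface.
type noopScheduler struct{}

// IsScheduleAllowed now accepts the narrower sche.ScheduleCluster rather than
// sche.ClusterInformer; limits are still read through cluster.GetOpts(), as in
// the schedulers patched above.
func (s *noopScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool {
	return cluster.GetOpts().GetLeaderScheduleLimit() > 0
}

// Schedule keeps its shape; only the cluster parameter type changes.
func (s *noopScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
	return nil, nil
}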