Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: move scheduling-related config definitions into pkg #6857

Merged
merged 3 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 25 additions & 24 deletions pkg/mock/mockcluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,131 +17,132 @@ package mockcluster
import (
"time"

sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/server/config"
)

// SetMaxMergeRegionSize updates the MaxMergeRegionSize configuration.
func (mc *Cluster) SetMaxMergeRegionSize(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.MaxMergeRegionSize = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.MaxMergeRegionSize = uint64(v) })
}

// SetMaxMergeRegionKeys updates the MaxMergeRegionKeys configuration.
func (mc *Cluster) SetMaxMergeRegionKeys(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.MaxMergeRegionKeys = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.MaxMergeRegionKeys = uint64(v) })
}

// SetSplitMergeInterval updates the SplitMergeInterval configuration.
func (mc *Cluster) SetSplitMergeInterval(v time.Duration) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.SplitMergeInterval = typeutil.NewDuration(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.SplitMergeInterval = typeutil.NewDuration(v) })
}

// SetEnableOneWayMerge updates the EnableOneWayMerge configuration.
func (mc *Cluster) SetEnableOneWayMerge(v bool) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.EnableOneWayMerge = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.EnableOneWayMerge = v })
}

// SetMaxSnapshotCount updates the MaxSnapshotCount configuration.
func (mc *Cluster) SetMaxSnapshotCount(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.MaxSnapshotCount = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.MaxSnapshotCount = uint64(v) })
}

// SetEnableMakeUpReplica updates the EnableMakeUpReplica configuration.
func (mc *Cluster) SetEnableMakeUpReplica(v bool) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.EnableMakeUpReplica = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.EnableMakeUpReplica = v })
}

// SetEnableRemoveExtraReplica updates the EnableRemoveExtraReplica configuration.
func (mc *Cluster) SetEnableRemoveExtraReplica(v bool) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.EnableRemoveExtraReplica = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.EnableRemoveExtraReplica = v })
}

// SetEnableLocationReplacement updates the EnableLocationReplacement configuration.
func (mc *Cluster) SetEnableLocationReplacement(v bool) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.EnableLocationReplacement = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.EnableLocationReplacement = v })
}

// SetEnableRemoveDownReplica updates the EnableRemoveDownReplica configuration.
func (mc *Cluster) SetEnableRemoveDownReplica(v bool) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.EnableRemoveDownReplica = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.EnableRemoveDownReplica = v })
}

// SetEnableReplaceOfflineReplica updates the EnableReplaceOfflineReplica configuration.
func (mc *Cluster) SetEnableReplaceOfflineReplica(v bool) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.EnableReplaceOfflineReplica = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.EnableReplaceOfflineReplica = v })
}

// SetLeaderSchedulePolicy updates the LeaderSchedulePolicy configuration.
func (mc *Cluster) SetLeaderSchedulePolicy(v string) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.LeaderSchedulePolicy = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.LeaderSchedulePolicy = v })
}

// SetTolerantSizeRatio updates the TolerantSizeRatio configuration.
func (mc *Cluster) SetTolerantSizeRatio(v float64) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.TolerantSizeRatio = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.TolerantSizeRatio = v })
}

// SetRegionScoreFormulaVersion updates the RegionScoreFormulaVersion configuration.
func (mc *Cluster) SetRegionScoreFormulaVersion(v string) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.RegionScoreFormulaVersion = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.RegionScoreFormulaVersion = v })
}

// SetLeaderScheduleLimit updates the LeaderScheduleLimit configuration.
func (mc *Cluster) SetLeaderScheduleLimit(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.LeaderScheduleLimit = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.LeaderScheduleLimit = uint64(v) })
}

// SetRegionScheduleLimit updates the RegionScheduleLimit configuration.
func (mc *Cluster) SetRegionScheduleLimit(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.RegionScheduleLimit = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.RegionScheduleLimit = uint64(v) })
}

// SetMergeScheduleLimit updates the MergeScheduleLimit configuration.
func (mc *Cluster) SetMergeScheduleLimit(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.MergeScheduleLimit = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.MergeScheduleLimit = uint64(v) })
}

// SetHotRegionScheduleLimit updates the HotRegionScheduleLimit configuration.
func (mc *Cluster) SetHotRegionScheduleLimit(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.HotRegionScheduleLimit = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.HotRegionScheduleLimit = uint64(v) })
}

// SetHotRegionCacheHitsThreshold updates the HotRegionCacheHitsThreshold configuration.
func (mc *Cluster) SetHotRegionCacheHitsThreshold(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.HotRegionCacheHitsThreshold = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.HotRegionCacheHitsThreshold = uint64(v) })
}

// SetEnablePlacementRules updates the EnablePlacementRules configuration.
func (mc *Cluster) SetEnablePlacementRules(v bool) {
mc.updateReplicationConfig(func(r *config.ReplicationConfig) { r.EnablePlacementRules = v })
mc.updateReplicationConfig(func(r *sc.ReplicationConfig) { r.EnablePlacementRules = v })
if v {
mc.initRuleManager()
}
}

// SetMaxReplicas updates the maxReplicas configuration.
func (mc *Cluster) SetMaxReplicas(v int) {
mc.updateReplicationConfig(func(r *config.ReplicationConfig) { r.MaxReplicas = uint64(v) })
mc.updateReplicationConfig(func(r *sc.ReplicationConfig) { r.MaxReplicas = uint64(v) })
}

// SetLocationLabels updates the LocationLabels configuration.
func (mc *Cluster) SetLocationLabels(v []string) {
mc.updateReplicationConfig(func(r *config.ReplicationConfig) { r.LocationLabels = v })
mc.updateReplicationConfig(func(r *sc.ReplicationConfig) { r.LocationLabels = v })
}

// SetIsolationLevel updates the IsolationLevel configuration.
func (mc *Cluster) SetIsolationLevel(v string) {
mc.updateReplicationConfig(func(r *config.ReplicationConfig) { r.IsolationLevel = v })
mc.updateReplicationConfig(func(r *sc.ReplicationConfig) { r.IsolationLevel = v })
}

func (mc *Cluster) updateScheduleConfig(f func(*config.ScheduleConfig)) {
func (mc *Cluster) updateScheduleConfig(f func(*sc.ScheduleConfig)) {
s := mc.GetScheduleConfig().Clone()
f(s)
mc.SetScheduleConfig(s)
}

func (mc *Cluster) updateReplicationConfig(f func(*config.ReplicationConfig)) {
func (mc *Cluster) updateReplicationConfig(f func(*sc.ReplicationConfig)) {
r := mc.GetReplicationConfig().Clone()
f(r)
mc.SetReplicationConfig(r)
Expand Down
8 changes: 4 additions & 4 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,22 @@ func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
}

// GetStoreConfig returns the store config.
func (mc *Cluster) GetStoreConfig() sc.StoreConfig {
func (mc *Cluster) GetStoreConfig() sc.StoreConfigProvider {
return mc.StoreConfigManager.GetStoreConfig()
}

// GetCheckerConfig returns the checker config.
func (mc *Cluster) GetCheckerConfig() sc.CheckerConfig {
func (mc *Cluster) GetCheckerConfig() sc.CheckerConfigProvider {
return mc
}

// GetSchedulerConfig returns the scheduler config.
func (mc *Cluster) GetSchedulerConfig() sc.SchedulerConfig {
func (mc *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider {
return mc
}

// GetSharedConfig returns the shared config.
func (mc *Cluster) GetSharedConfig() sc.SharedConfig {
func (mc *Cluster) GetSharedConfig() sc.SharedConfigProvider {
return mc
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/mock/mockconfig/mockconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
// NewTestOptions creates default options for testing.
func NewTestOptions() *config.PersistOptions {
// register default schedulers in case config check fail.
for _, d := range config.DefaultSchedulers {
for _, d := range sc.DefaultSchedulers {
sc.RegisterScheduler(d.Type)
}
c := config.NewConfig()
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var denyCheckersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("
// Controller is used to manage all checkers.
type Controller struct {
cluster sche.CheckerCluster
conf config.CheckerConfig
conf config.CheckerConfigProvider
opController *operator.Controller
learnerChecker *LearnerChecker
replicaChecker *ReplicaChecker
Expand All @@ -53,7 +53,7 @@ type Controller struct {
}

// NewController create a new Controller.
func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfig, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller {
func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller {
regionWaitingList := cache.NewDefaultCache(DefaultCacheSize)
return &Controller{
cluster: cluster,
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/checker/merge_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ var (
type MergeChecker struct {
PauseController
cluster sche.CheckerCluster
conf config.CheckerConfig
conf config.CheckerConfigProvider
splitCache *cache.TTLUint64
startTime time.Time // it's used to judge whether server recently start.
}

// NewMergeChecker creates a merge checker.
func NewMergeChecker(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfig) *MergeChecker {
func NewMergeChecker(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider) *MergeChecker {
splitCache := cache.NewIDTTL(ctx, time.Minute, conf.GetSplitMergeInterval())
return &MergeChecker{
cluster: cluster,
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/checker/priority_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ const defaultPriorityQueueSize = 1280
// PriorityInspector ensures high priority region should run first
type PriorityInspector struct {
cluster sche.CheckerCluster
conf config.CheckerConfig
conf config.CheckerConfigProvider
queue *cache.PriorityQueue
}

// NewPriorityInspector creates a priority inspector.
func NewPriorityInspector(cluster sche.CheckerCluster, conf config.CheckerConfig) *PriorityInspector {
func NewPriorityInspector(cluster sche.CheckerCluster, conf config.CheckerConfigProvider) *PriorityInspector {
return &PriorityInspector{
cluster: cluster,
conf: conf,
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/checker/replica_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ var (
type ReplicaChecker struct {
PauseController
cluster sche.CheckerCluster
conf config.CheckerConfig
conf config.CheckerConfigProvider
regionWaitingList cache.Cache
}

// NewReplicaChecker creates a replica checker.
func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfig, regionWaitingList cache.Cache) *ReplicaChecker {
func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigProvider, regionWaitingList cache.Cache) *ReplicaChecker {
return &ReplicaChecker{
cluster: cluster,
conf: conf,
Expand Down
Loading