Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: instead scheduler type string of types.CheckerSchedulerType #8485

Merged
merged 18 commits into from
Aug 8, 2024
Merged
5 changes: 3 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,12 @@ func (c *Cluster) updateScheduler() {
)
// Create the newly added schedulers.
for _, scheduler := range latestSchedulersConfig {
schedulerType := types.ConvertOldStr2Type[scheduler.Type]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does ConvertOldStrToType sound to you? Variable names with embedded numbers are not common in PD.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are same for me. I can update it

s, err := schedulers.CreateScheduler(
scheduler.Type,
schedulerType,
c.coordinator.GetOperatorController(),
c.storage,
schedulers.ConfigSliceDecoder(scheduler.Type, scheduler.Args),
schedulers.ConfigSliceDecoder(schedulerType, scheduler.Args),
schedulersController.RemoveScheduler,
)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/mock/mockconfig/mockconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ package mockconfig

import (
sc "github.com/tikv/pd/pkg/schedule/config"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/server/config"
)

// NewTestOptions creates default options for testing.
func NewTestOptions() *config.PersistOptions {
// register default schedulers in case config check fail.
for _, d := range sc.DefaultSchedulers {
sc.RegisterScheduler(d.Type)
sc.RegisterScheduler(types.ConvertOldStr2Type[d.Type])
}
c := config.NewConfig()
c.Adjust(nil, false)
Expand Down
9 changes: 5 additions & 4 deletions pkg/schedule/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core/storelimit"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/utils/configutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand Down Expand Up @@ -571,10 +572,10 @@ type SchedulerConfig struct {
// If these schedulers are not in the persistent configuration, they
// will be created automatically when reloading.
var DefaultSchedulers = SchedulerConfigs{
{Type: "balance-region"},
{Type: "balance-leader"},
{Type: "hot-region"},
{Type: "evict-slow-store"},
{Type: types.SchedulerTypeCompatibleMap[types.BalanceRegionScheduler]},
{Type: types.SchedulerTypeCompatibleMap[types.BalanceLeaderScheduler]},
{Type: types.SchedulerTypeCompatibleMap[types.BalanceHotRegionScheduler]},
{Type: types.SchedulerTypeCompatibleMap[types.EvictSlowStoreScheduler]},
}

// IsDefaultScheduler checks whether the scheduler is enabled by default.
Expand Down
6 changes: 3 additions & 3 deletions pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ const RejectLeader = "reject-leader"
var schedulerMap sync.Map

// RegisterScheduler registers the scheduler type.
func RegisterScheduler(typ string) {
func RegisterScheduler(typ types.CheckerSchedulerType) {
schedulerMap.Store(typ, struct{}{})
}

// IsSchedulerRegistered checks if the named scheduler type is registered.
func IsSchedulerRegistered(name string) bool {
_, ok := schedulerMap.Load(name)
func IsSchedulerRegistered(typ types.CheckerSchedulerType) bool {
_, ok := schedulerMap.Load(typ)
return ok
}

Expand Down
34 changes: 22 additions & 12 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/tikv/pd/pkg/schedule/scatter"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/schedule/splitter"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/utils/logutil"
Expand Down Expand Up @@ -187,7 +188,7 @@ func (c *Coordinator) driveSlowNodeScheduler() {
// If the cluster was set up with `raft-kv2` engine, this cluster should
// enable `evict-slow-trend` scheduler as default.
if c.GetCluster().GetStoreConfig().IsRaftKV2() {
typ := schedulers.EvictSlowTrendType
typ := types.EvictSlowTrendScheduler
args := []string{}

s, err := schedulers.CreateScheduler(typ, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(typ, args), c.schedulers.RemoveScheduler)
Expand Down Expand Up @@ -275,7 +276,7 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
typ := schedulers.FindSchedulerTypeByName(name)
var cfg sc.SchedulerConfig
for _, c := range scheduleCfg.Schedulers {
if c.Type == typ {
if c.Type == types.SchedulerTypeCompatibleMap[typ] {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about renaming to ConvertNewType2OldStr

cfg = c
break
}
Expand All @@ -288,20 +289,23 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
log.Info("skip create scheduler with independent configuration", zap.String("scheduler-name", name), zap.String("scheduler-type", cfg.Type), zap.Strings("scheduler-args", cfg.Args))
continue
}
s, err := schedulers.CreateScheduler(cfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigJSONDecoder([]byte(data)), c.schedulers.RemoveScheduler)
s, err := schedulers.CreateScheduler(types.ConvertOldStr2Type[cfg.Type], c.opController,
c.cluster.GetStorage(), schedulers.ConfigJSONDecoder([]byte(data)), c.schedulers.RemoveScheduler)
if err != nil {
log.Error("can not create scheduler with independent configuration", zap.String("scheduler-name", name), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
continue
}
if needRun {
log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName()))
if err = c.schedulers.AddScheduler(s); err != nil {
log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
log.Error("can not add scheduler with independent configuration",
zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
}
} else {
log.Info("create scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()))
if err = c.schedulers.AddSchedulerHandler(s); err != nil {
log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
log.Error("can not add scheduler handler with independent configuration",
zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
}
}
}
Expand All @@ -316,16 +320,22 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
continue
}

s, err := schedulers.CreateScheduler(schedulerCfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args), c.schedulers.RemoveScheduler)
tp := types.ConvertOldStr2Type[schedulerCfg.Type]
s, err := schedulers.CreateScheduler(tp, c.opController,
c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(tp, schedulerCfg.Args), c.schedulers.RemoveScheduler)
if err != nil {
log.Error("can not create scheduler", zap.String("scheduler-type", schedulerCfg.Type), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
log.Error("can not create scheduler", zap.Stringer("type", tp), zap.String("scheduler-type", schedulerCfg.Type),
zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
continue
}

if needRun {
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args))
if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()),
zap.Strings("scheduler-args", schedulerCfg.Args))
if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil &&
!errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
log.Error("can not add scheduler", zap.String("scheduler-name",
s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
} else {
// Only records the valid scheduler config.
scheduleCfg.Schedulers[k] = schedulerCfg
Expand Down Expand Up @@ -362,7 +372,7 @@ func (c *Coordinator) LoadPlugin(pluginPath string, ch chan string) {
log.Error("GetFunction SchedulerType error", errs.ZapError(err))
return
}
schedulerType := SchedulerType.(func() string)
schedulerType := SchedulerType.(func() types.CheckerSchedulerType)
// get func: SchedulerArgs from plugin
SchedulerArgs, err := c.pluginInterface.GetFunction(pluginPath, "SchedulerArgs")
if err != nil {
Expand All @@ -373,7 +383,7 @@ func (c *Coordinator) LoadPlugin(pluginPath string, ch chan string) {
// create and add user scheduler
s, err := schedulers.CreateScheduler(schedulerType(), c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerType(), schedulerArgs()), c.schedulers.RemoveScheduler)
if err != nil {
log.Error("can not create scheduler", zap.String("scheduler-type", schedulerType()), errs.ZapError(err))
log.Error("can not create scheduler", zap.Stringer("scheduler-type", schedulerType()), errs.ZapError(err))
return
}
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()))
Expand Down
14 changes: 7 additions & 7 deletions pkg/schedule/schedulers/balance_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func BenchmarkPlacementRule(b *testing.B) {
re := assert.New(b)
cancel, tc, oc := newBenchCluster(true, true, false)
defer cancel()
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{}, []BalanceRegionCreateOption{WithBalanceRegionName(BalanceRegionType)}...)
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{})
b.ResetTimer()
var ops []*operator.Operator
var plans []plan.Plan
Expand All @@ -171,7 +171,7 @@ func BenchmarkPlacementRule(b *testing.B) {
func BenchmarkLabel(b *testing.B) {
cancel, tc, oc := newBenchCluster(false, true, false)
defer cancel()
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{}, []BalanceRegionCreateOption{WithBalanceRegionName(BalanceRegionType)}...)
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{})
b.ResetTimer()
for i := 0; i < b.N; i++ {
sc.Schedule(tc, false)
Expand All @@ -181,7 +181,7 @@ func BenchmarkLabel(b *testing.B) {
func BenchmarkNoLabel(b *testing.B) {
cancel, tc, oc := newBenchCluster(false, false, false)
defer cancel()
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{}, []BalanceRegionCreateOption{WithBalanceRegionName(BalanceRegionType)}...)
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{})
b.ResetTimer()
for i := 0; i < b.N; i++ {
sc.Schedule(tc, false)
Expand All @@ -191,7 +191,7 @@ func BenchmarkNoLabel(b *testing.B) {
func BenchmarkDiagnosticNoLabel1(b *testing.B) {
cancel, tc, oc := newBenchCluster(false, false, false)
defer cancel()
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{}, []BalanceRegionCreateOption{WithBalanceRegionName(BalanceRegionType)}...)
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{})
b.ResetTimer()
for i := 0; i < b.N; i++ {
sc.Schedule(tc, true)
Expand All @@ -201,7 +201,7 @@ func BenchmarkDiagnosticNoLabel1(b *testing.B) {
func BenchmarkDiagnosticNoLabel2(b *testing.B) {
cancel, tc, oc := newBenchBigCluster(100, 100)
defer cancel()
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{}, []BalanceRegionCreateOption{WithBalanceRegionName(BalanceRegionType)}...)
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{})
b.ResetTimer()
for i := 0; i < b.N; i++ {
sc.Schedule(tc, true)
Expand All @@ -211,7 +211,7 @@ func BenchmarkDiagnosticNoLabel2(b *testing.B) {
func BenchmarkNoLabel2(b *testing.B) {
cancel, tc, oc := newBenchBigCluster(100, 100)
defer cancel()
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{}, []BalanceRegionCreateOption{WithBalanceRegionName(BalanceRegionType)}...)
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{})
b.ResetTimer()
for i := 0; i < b.N; i++ {
sc.Schedule(tc, false)
Expand All @@ -221,7 +221,7 @@ func BenchmarkNoLabel2(b *testing.B) {
func BenchmarkTombStore(b *testing.B) {
cancel, tc, oc := newBenchCluster(false, false, true)
defer cancel()
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{}, []BalanceRegionCreateOption{WithBalanceRegionName(BalanceRegionType)}...)
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{})
b.ResetTimer()
for i := 0; i < b.N; i++ {
sc.Schedule(tc, false)
Expand Down
4 changes: 1 addition & 3 deletions pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ import (
const (
// BalanceLeaderName is balance leader scheduler name.
BalanceLeaderName = "balance-leader-scheduler"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we will unify it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be in the next pr

// BalanceLeaderType is balance leader scheduler type.
BalanceLeaderType = "balance-leader"
// BalanceLeaderBatchSize is the default number of operators to transfer leaders by one scheduling.
// Default value is 4 which is subjected by scheduler-max-waiting-operator and leader-schedule-limit
// If you want to increase balance speed more, please increase above-mentioned param.
Expand Down Expand Up @@ -536,7 +534,7 @@ func (l *balanceLeaderScheduler) createOperator(solver *solver, collector *plan.
}
solver.Step++
defer func() { solver.Step-- }()
op, err := operator.CreateTransferLeaderOperator(BalanceLeaderType, solver, solver.Region, solver.targetStoreID(), []uint64{}, operator.OpLeader)
op, err := operator.CreateTransferLeaderOperator(l.GetName(), solver, solver.Region, solver.targetStoreID(), []uint64{}, operator.OpLeader)
if err != nil {
log.Debug("fail to create balance leader operator", errs.ZapError(err))
if collector != nil {
Expand Down
4 changes: 1 addition & 3 deletions pkg/schedule/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
const (
// BalanceRegionName is balance region scheduler name.
BalanceRegionName = "balance-region-scheduler"
// BalanceRegionType is balance region scheduler type.
BalanceRegionType = "balance-region"
)

type balanceRegionSchedulerConfig struct {
Expand Down Expand Up @@ -245,7 +243,7 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co
oldPeer := solver.Region.GetStorePeer(sourceID)
newPeer := &metapb.Peer{StoreId: solver.Target.GetID(), Role: oldPeer.Role}
solver.Step++
op, err := operator.CreateMovePeerOperator(BalanceRegionType, solver, solver.Region, operator.OpRegion, oldPeer.GetStoreId(), newPeer)
op, err := operator.CreateMovePeerOperator(s.GetName(), solver, solver.Region, operator.OpRegion, oldPeer.GetStoreId(), newPeer)
if err != nil {
balanceRegionCreateOpFailCounter.Inc()
if collector != nil {
Expand Down
Loading
Loading