put the scheduler related logic together
Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed May 23, 2023
1 parent a14f38f commit b57cc7e
Showing 39 changed files with 224 additions and 255 deletions.
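
The pattern repeated across the files below: helpers that scheduler code previously imported from pkg/schedule (the Scheduler interface, CreateScheduler, ConfigSliceDecoder, EncodeConfig) are now referenced unqualified inside pkg/schedule/schedulers, and the "github.com/tikv/pd/pkg/schedule" import is dropped. As a rough sketch of what a caller outside that package might look like after this commit — the schedulers package path and exported names are inferred from the call sites in this diff, not verified against the full tree:

package example

import (
	"github.com/tikv/pd/pkg/schedule/operator"
	"github.com/tikv/pd/pkg/schedule/schedulers" // assumed new home of the scheduler helpers
	"github.com/tikv/pd/pkg/storage"
)

// buildBalanceLeader mirrors the call pattern used by the tests in this diff,
// but from outside the schedulers package, so every identifier is qualified.
func buildBalanceLeader(oc *operator.Controller) (schedulers.Scheduler, error) {
	return schedulers.CreateScheduler(
		schedulers.BalanceLeaderType,
		oc,
		storage.NewStorageWithMemoryBackend(),
		schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""}),
	)
}
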
6 changes: 3 additions & 3 deletions pkg/keyspace/keyspace.go
@@ -64,6 +64,8 @@ type Config interface {
// Manager manages keyspace related data.
// It validates requests and provides concurrency control.
type Manager struct {
// ctx is the context of the manager, to be used in transaction.
ctx context.Context
// metaLock guards keyspace meta.
metaLock *syncutil.LockGroup
// idAllocator allocates keyspace id.
@@ -72,8 +74,6 @@ type Manager struct {
store endpoint.KeyspaceStorage
// rc is the raft cluster of the server.
cluster core.ClusterInformer
// ctx is the context of the manager, to be used in transaction.
ctx context.Context
// config is the configurations of the manager.
config Config
// kgm is the keyspace group manager of the server.
@@ -104,11 +104,11 @@ func NewKeyspaceManager(
kgm *GroupManager,
) *Manager {
return &Manager{
ctx: ctx,
metaLock: syncutil.NewLockGroup(syncutil.WithHash(keyspaceIDHash)),
idAllocator: idAllocator,
store: store,
cluster: cluster,
ctx: ctx,
config: config,
kgm: kgm,
nextPatrolStartID: utils.DefaultKeyspaceID,
2 changes: 1 addition & 1 deletion pkg/schedule/core/cluster_informer.go
@@ -1,4 +1,4 @@
// Copyright 2017 TiKV Project Authors.
// Copyright 2020 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
7 changes: 3 additions & 4 deletions pkg/schedule/schedulers/balance_leader.go
@@ -28,7 +28,6 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/operator"
@@ -121,7 +120,7 @@ func (conf *balanceLeaderSchedulerConfig) Clone() *balanceLeaderSchedulerConfig
}

func (conf *balanceLeaderSchedulerConfig) persistLocked() error {
data, err := schedule.EncodeConfig(conf)
data, err := EncodeConfig(conf)
if err != nil {
return err
}
@@ -170,7 +169,7 @@ type balanceLeaderScheduler struct {

// newBalanceLeaderScheduler creates a scheduler that tends to keep leaders on
// each store balanced.
func newBalanceLeaderScheduler(opController *operator.Controller, conf *balanceLeaderSchedulerConfig, options ...BalanceLeaderCreateOption) schedule.Scheduler {
func newBalanceLeaderScheduler(opController *operator.Controller, conf *balanceLeaderSchedulerConfig, options ...BalanceLeaderCreateOption) Scheduler {
base := NewBaseScheduler(opController)
s := &balanceLeaderScheduler{
BaseScheduler: base,
@@ -224,7 +223,7 @@ func (l *balanceLeaderScheduler) GetType() string {
func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) {
l.conf.mu.RLock()
defer l.conf.mu.RUnlock()
return schedule.EncodeConfig(l.conf)
return EncodeConfig(l.conf)
}

func (l *balanceLeaderScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool {
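
Both the config's persistLocked and the scheduler's EncodeConfig method now call a package-local EncodeConfig instead of schedule.EncodeConfig. Only the call sites appear in this diff, so the following is a minimal sketch of what such a helper typically looks like — a thin JSON-marshal wrapper — and is an assumption, not PD's actual implementation:

package schedulers // hypothetical placement, matching the call sites above

import "encoding/json"

// EncodeConfig, as sketched here, simply JSON-marshals a scheduler config;
// any error wrapping PD performs is not visible in this diff.
func EncodeConfig(v interface{}) ([]byte, error) {
	return json.Marshal(v)
}
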
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/balance_region.go
@@ -67,7 +67,7 @@ type balanceRegionScheduler struct {

// newBalanceRegionScheduler creates a scheduler that tends to keep regions on
// each store balanced.
func newBalanceRegionScheduler(opController *operator.Controller, conf *balanceRegionSchedulerConfig, opts ...BalanceRegionCreateOption) schedule.Scheduler {
func newBalanceRegionScheduler(opController *operator.Controller, conf *balanceRegionSchedulerConfig, opts ...BalanceRegionCreateOption) Scheduler {
base := NewBaseScheduler(opController)
scheduler := &balanceRegionScheduler{
BaseScheduler: base,
@@ -113,7 +113,7 @@ func (s *balanceRegionScheduler) GetType() string {
}

func (s *balanceRegionScheduler) EncodeConfig() ([]byte, error) {
return schedule.EncodeConfig(s.conf)
return EncodeConfig(s.conf)
}

func (s *balanceRegionScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool {
53 changes: 26 additions & 27 deletions pkg/schedule/schedulers/balance_test.go
@@ -28,7 +28,6 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
@@ -228,7 +227,7 @@ type balanceLeaderSchedulerTestSuite struct {
suite.Suite
cancel context.CancelFunc
tc *mockcluster.Cluster
lb schedule.Scheduler
lb Scheduler
oc *operator.Controller
conf config.Config
}
@@ -239,7 +238,7 @@ func TestBalanceLeaderSchedulerTestSuite(t *testing.T) {

func (suite *balanceLeaderSchedulerTestSuite) SetupTest() {
suite.cancel, suite.conf, suite.tc, suite.oc = prepareSchedulersTest()
lb, err := schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"", ""}))
lb, err := CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", ""}))
suite.NoError(err)
suite.lb = lb
}
@@ -573,34 +572,34 @@ func (suite *balanceLeaderRangeSchedulerTestSuite) TestSingleRangeBalance() {
suite.tc.UpdateStoreLeaderWeight(3, 1)
suite.tc.UpdateStoreLeaderWeight(4, 2)
suite.tc.AddLeaderRegionWithRange(1, "a", "g", 1, 2, 3, 4)
lb, err := schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"", ""}))
lb, err := CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", ""}))
suite.NoError(err)
ops, _ := lb.Schedule(suite.tc, false)
suite.NotEmpty(ops)
suite.Len(ops, 1)
suite.Len(ops[0].Counters, 1)
suite.Len(ops[0].FinishedCounters, 3)
lb, err = schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"h", "n"}))
lb, err = CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"h", "n"}))
suite.NoError(err)
ops, _ = lb.Schedule(suite.tc, false)
suite.Empty(ops)
lb, err = schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"b", "f"}))
lb, err = CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"b", "f"}))
suite.NoError(err)
ops, _ = lb.Schedule(suite.tc, false)
suite.Empty(ops)
lb, err = schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"", "a"}))
lb, err = CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", "a"}))
suite.NoError(err)
ops, _ = lb.Schedule(suite.tc, false)
suite.Empty(ops)
lb, err = schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"g", ""}))
lb, err = CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"g", ""}))
suite.NoError(err)
ops, _ = lb.Schedule(suite.tc, false)
suite.Empty(ops)
lb, err = schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"", "f"}))
lb, err = CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", "f"}))
suite.NoError(err)
ops, _ = lb.Schedule(suite.tc, false)
suite.Empty(ops)
lb, err = schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"b", ""}))
lb, err = CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"b", ""}))
suite.NoError(err)
ops, _ = lb.Schedule(suite.tc, false)
suite.Empty(ops)
@@ -619,7 +618,7 @@ func (suite *balanceLeaderRangeSchedulerTestSuite) TestMultiRangeBalance() {
suite.tc.UpdateStoreLeaderWeight(3, 1)
suite.tc.UpdateStoreLeaderWeight(4, 2)
suite.tc.AddLeaderRegionWithRange(1, "a", "g", 1, 2, 3, 4)
lb, err := schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"", "g", "o", "t"}))
lb, err := CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", "g", "o", "t"}))
suite.NoError(err)
ops, _ := lb.Schedule(suite.tc, false)
suite.Equal(uint64(1), ops[0].RegionID())
@@ -657,7 +656,7 @@ func (suite *balanceLeaderRangeSchedulerTestSuite) TestBatchBalance() {

suite.tc.AddLeaderRegionWithRange(uint64(102), "102a", "102z", 1, 2, 3)
suite.tc.AddLeaderRegionWithRange(uint64(103), "103a", "103z", 4, 5, 6)
lb, err := schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"", ""}))
lb, err := CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", ""}))
suite.NoError(err)
ops, _ := lb.Schedule(suite.tc, false)
suite.Len(ops, 2)
@@ -748,7 +747,7 @@ func checkBalanceRegionSchedule1(re *require.Assertions, enablePlacementRules bo
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetEnablePlacementRules(enablePlacementRules)
tc.SetMaxReplicasWithLabel(enablePlacementRules, 1)
sb, err := schedule.CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
re.NoError(err)
// Add stores 1,2,3,4.
tc.AddRegionStore(1, 6)
@@ -803,7 +802,7 @@ func checkReplica3(re *require.Assertions, enablePlacementRules bool) {
tc.SetEnablePlacementRules(enablePlacementRules)
tc.SetMaxReplicasWithLabel(enablePlacementRules, 3)

sb, err := schedule.CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
re.NoError(err)
// Store 1 has the largest region score, so the balance scheduler tries to replace peer in store 1.
tc.AddLabelsStore(1, 16, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
@@ -877,7 +876,7 @@ func checkReplica5(re *require.Assertions, enablePlacementRules bool) {
tc.SetEnablePlacementRules(enablePlacementRules)
tc.SetMaxReplicasWithLabel(enablePlacementRules, 5)

sb, err := schedule.CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
re.NoError(err)
tc.AddLabelsStore(1, 4, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
tc.AddLabelsStore(2, 5, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"})
@@ -978,7 +977,7 @@ func checkBalanceRegionSchedule2(re *require.Assertions, enablePlacementRules bo
core.SetApproximateKeys(200),
)

sb, err := schedule.CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
re.NoError(err)

tc.AddRegionStore(1, 11)
@@ -1034,7 +1033,7 @@ func checkBalanceRegionStoreWeight(re *require.Assertions, enablePlacementRules
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetEnablePlacementRules(enablePlacementRules)
tc.SetMaxReplicasWithLabel(enablePlacementRules, 1)
sb, err := schedule.CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
re.NoError(err)

tc.AddRegionStore(1, 10)
@@ -1069,7 +1068,7 @@ func checkBalanceRegionOpInfluence(re *require.Assertions, enablePlacementRules
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetEnablePlacementRules(enablePlacementRules)
tc.SetMaxReplicasWithLabel(enablePlacementRules, 1)
sb, err := schedule.CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
re.NoError(err)
// Add stores 1,2,3,4.
tc.AddRegionStoreWithLeader(1, 2)
@@ -1105,7 +1104,7 @@ func checkReplacePendingRegion(re *require.Assertions, enablePlacementRules bool
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetEnablePlacementRules(enablePlacementRules)
tc.SetMaxReplicasWithLabel(enablePlacementRules, 3)
sb, err := schedule.CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
re.NoError(err)
// Store 1 has the largest region score, so the balance scheduler try to replace peer in store 1.
tc.AddLabelsStore(1, 16, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
@@ -1135,7 +1134,7 @@ func TestBalanceRegionShouldNotBalance(t *testing.T) {
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
sb, err := schedule.CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
re.NoError(err)
region := tc.MockRegionInfo(1, 0, []uint64{2, 3, 4}, nil, nil)
tc.PutRegion(region)
@@ -1148,7 +1147,7 @@ func TestBalanceRegionEmptyRegion(t *testing.T) {
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
sb, err := schedule.CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
re.NoError(err)
tc.AddRegionStore(1, 10)
tc.AddRegionStore(2, 9)
@@ -1194,7 +1193,7 @@ func checkRandomMergeSchedule(re *require.Assertions, enablePlacementRules bool)
tc.SetMaxReplicasWithLabel(enablePlacementRules, 3)
tc.SetMergeScheduleLimit(1)

mb, err := schedule.CreateScheduler(RandomMergeType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(RandomMergeType, []string{"", ""}))
mb, err := CreateScheduler(RandomMergeType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(RandomMergeType, []string{"", ""}))
re.NoError(err)

tc.AddRegionStore(1, 4)
@@ -1276,7 +1275,7 @@ func checkScatterRangeBalance(re *require.Assertions, enablePlacementRules bool)
tc.UpdateStoreStatus(uint64(i))
}

hb, err := schedule.CreateScheduler(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"}))
hb, err := CreateScheduler(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"}))
re.NoError(err)

scheduleAndApplyOperator(tc, hb, 100)
@@ -1350,7 +1349,7 @@ func checkBalanceLeaderLimit(re *require.Assertions, enablePlacementRules bool)

// test not allow schedule leader
tc.SetLeaderScheduleLimit(0)
hb, err := schedule.CreateScheduler(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"}))
hb, err := CreateScheduler(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"}))
re.NoError(err)

scheduleAndApplyOperator(tc, hb, 100)
@@ -1374,7 +1373,7 @@ func TestConcurrencyUpdateConfig(t *testing.T) {
re := require.New(t)
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
hb, err := schedule.CreateScheduler(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"}))
hb, err := CreateScheduler(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"}))
sche := hb.(*scatterRangeScheduler)
re.NoError(err)
ch := make(chan struct{})
@@ -1447,14 +1446,14 @@ func TestBalanceWhenRegionNotHeartbeat(t *testing.T) {
tc.UpdateStoreStatus(uint64(i))
}

hb, err := schedule.CreateScheduler(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_09", "t"}))
hb, err := CreateScheduler(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_09", "t"}))
re.NoError(err)

scheduleAndApplyOperator(tc, hb, 100)
}

// scheduleAndApplyOperator will try to schedule for `count` times and apply the operator if the operator is created.
func scheduleAndApplyOperator(tc *mockcluster.Cluster, hb schedule.Scheduler, count int) {
func scheduleAndApplyOperator(tc *mockcluster.Cluster, hb Scheduler, count int) {
limit := 0
for {
if limit > count {
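
The signature of scheduleAndApplyOperator changes from schedule.Scheduler to Scheduler, but its body is cut off by the diff view. A plausible shape of the loop, reconstructed only from the doc comment and the visible `limit` counter, assuming it sits in the same test file so the imports shown above apply — the applyOperator stub below is a stand-in, not the helper the real test uses:

// scheduleAndApplyOperator tries to schedule up to `count` times and applies
// any operator that gets created, so later iterations see the updated state.
func scheduleAndApplyOperator(tc *mockcluster.Cluster, hb Scheduler, count int) {
	limit := 0
	for {
		if limit > count {
			break
		}
		ops, _ := hb.Schedule(tc, false)
		if len(ops) == 0 {
			limit++
			continue
		}
		applyOperator(tc, ops[0])
	}
}

// applyOperator is purely illustrative; in the real tests some existing helper
// mutates the mock cluster to reflect the operator's effect.
func applyOperator(tc *mockcluster.Cluster, op *operator.Operator) {}
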
7 changes: 3 additions & 4 deletions pkg/schedule/schedulers/balance_witness.go
@@ -28,7 +28,6 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/operator"
@@ -106,7 +105,7 @@ func (conf *balanceWitnessSchedulerConfig) Clone() *balanceWitnessSchedulerConfi
}

func (conf *balanceWitnessSchedulerConfig) persistLocked() error {
data, err := schedule.EncodeConfig(conf)
data, err := EncodeConfig(conf)
if err != nil {
return err
}
@@ -155,7 +154,7 @@ type balanceWitnessScheduler struct {

// newBalanceWitnessScheduler creates a scheduler that tends to keep witnesses on
// each store balanced.
func newBalanceWitnessScheduler(opController *operator.Controller, conf *balanceWitnessSchedulerConfig, options ...BalanceWitnessCreateOption) schedule.Scheduler {
func newBalanceWitnessScheduler(opController *operator.Controller, conf *balanceWitnessSchedulerConfig, options ...BalanceWitnessCreateOption) Scheduler {
base := NewBaseScheduler(opController)
s := &balanceWitnessScheduler{
BaseScheduler: base,
@@ -209,7 +208,7 @@ func (b *balanceWitnessScheduler) GetType() string {
func (b *balanceWitnessScheduler) EncodeConfig() ([]byte, error) {
b.conf.mu.RLock()
defer b.conf.mu.RUnlock()
return schedule.EncodeConfig(b.conf)
return EncodeConfig(b.conf)
}

func (b *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool {