diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index f244f392a15d..203e97e3f5f3 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -64,6 +64,8 @@ type Config interface { // Manager manages keyspace related data. // It validates requests and provides concurrency control. type Manager struct { + // ctx is the context of the manager, to be used in transaction. + ctx context.Context // metaLock guards keyspace meta. metaLock *syncutil.LockGroup // idAllocator allocates keyspace id. @@ -72,8 +74,6 @@ type Manager struct { store endpoint.KeyspaceStorage // rc is the raft cluster of the server. cluster core.ClusterInformer - // ctx is the context of the manager, to be used in transaction. - ctx context.Context // config is the configurations of the manager. config Config // kgm is the keyspace group manager of the server. @@ -104,11 +104,11 @@ func NewKeyspaceManager( kgm *GroupManager, ) *Manager { return &Manager{ + ctx: ctx, metaLock: syncutil.NewLockGroup(syncutil.WithHash(keyspaceIDHash)), idAllocator: idAllocator, store: store, cluster: cluster, - ctx: ctx, config: config, kgm: kgm, nextPatrolStartID: utils.DefaultKeyspaceID, diff --git a/pkg/schedule/core/cluster_informer.go b/pkg/schedule/core/cluster_informer.go index 8588f184469a..f08d1c0f0de2 100644 --- a/pkg/schedule/core/cluster_informer.go +++ b/pkg/schedule/core/cluster_informer.go @@ -1,4 +1,4 @@ -// Copyright 2017 TiKV Project Authors. +// Copyright 2020 TiKV Project Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index 7bb62de6e2c2..4f331b870b90 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -28,7 +28,6 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/schedule" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/schedule/operator" @@ -121,7 +120,7 @@ func (conf *balanceLeaderSchedulerConfig) Clone() *balanceLeaderSchedulerConfig } func (conf *balanceLeaderSchedulerConfig) persistLocked() error { - data, err := schedule.EncodeConfig(conf) + data, err := EncodeConfig(conf) if err != nil { return err } @@ -170,7 +169,7 @@ type balanceLeaderScheduler struct { // newBalanceLeaderScheduler creates a scheduler that tends to keep leaders on // each store balanced. -func newBalanceLeaderScheduler(opController *operator.Controller, conf *balanceLeaderSchedulerConfig, options ...BalanceLeaderCreateOption) schedule.Scheduler { +func newBalanceLeaderScheduler(opController *operator.Controller, conf *balanceLeaderSchedulerConfig, options ...BalanceLeaderCreateOption) Scheduler { base := NewBaseScheduler(opController) s := &balanceLeaderScheduler{ BaseScheduler: base, @@ -224,7 +223,7 @@ func (l *balanceLeaderScheduler) GetType() string { func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) { l.conf.mu.RLock() defer l.conf.mu.RUnlock() - return schedule.EncodeConfig(l.conf) + return EncodeConfig(l.conf) } func (l *balanceLeaderScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { diff --git a/pkg/schedule/schedulers/balance_region.go b/pkg/schedule/schedulers/balance_region.go index e69d47a68997..ab28f1d8af92 100644 --- a/pkg/schedule/schedulers/balance_region.go +++ b/pkg/schedule/schedulers/balance_region.go @@ -67,7 +67,7 @@ type balanceRegionScheduler struct { // newBalanceRegionScheduler creates a scheduler that tends to keep regions on // each store balanced. -func newBalanceRegionScheduler(opController *operator.Controller, conf *balanceRegionSchedulerConfig, opts ...BalanceRegionCreateOption) schedule.Scheduler { +func newBalanceRegionScheduler(opController *operator.Controller, conf *balanceRegionSchedulerConfig, opts ...BalanceRegionCreateOption) Scheduler { base := NewBaseScheduler(opController) scheduler := &balanceRegionScheduler{ BaseScheduler: base, @@ -113,7 +113,7 @@ func (s *balanceRegionScheduler) GetType() string { } func (s *balanceRegionScheduler) EncodeConfig() ([]byte, error) { - return schedule.EncodeConfig(s.conf) + return EncodeConfig(s.conf) } func (s *balanceRegionScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { diff --git a/pkg/schedule/schedulers/balance_test.go b/pkg/schedule/schedulers/balance_test.go index 7de38a6158f9..4d3888308b78 100644 --- a/pkg/schedule/schedulers/balance_test.go +++ b/pkg/schedule/schedulers/balance_test.go @@ -28,7 +28,6 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/mock/mockcluster" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" @@ -228,7 +227,7 @@ type balanceLeaderSchedulerTestSuite struct { suite.Suite cancel context.CancelFunc tc *mockcluster.Cluster - lb schedule.Scheduler + lb Scheduler oc *operator.Controller conf config.Config } @@ -239,7 +238,7 @@ func TestBalanceLeaderSchedulerTestSuite(t *testing.T) { func (suite *balanceLeaderSchedulerTestSuite) SetupTest() { suite.cancel, suite.conf, suite.tc, suite.oc = prepareSchedulersTest() - lb, err := schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"", ""})) + lb, err := CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", ""})) suite.NoError(err) suite.lb = lb } @@ -573,34 +572,34 @@ func (suite *balanceLeaderRangeSchedulerTestSuite) TestSingleRangeBalance() { suite.tc.UpdateStoreLeaderWeight(3, 1) suite.tc.UpdateStoreLeaderWeight(4, 2) suite.tc.AddLeaderRegionWithRange(1, "a", "g", 1, 2, 3, 4) - lb, err := schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"", ""})) + lb, err := CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", ""})) suite.NoError(err) ops, _ := lb.Schedule(suite.tc, false) suite.NotEmpty(ops) suite.Len(ops, 1) suite.Len(ops[0].Counters, 1) suite.Len(ops[0].FinishedCounters, 3) - lb, err = schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"h", "n"})) + lb, err = CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"h", "n"})) suite.NoError(err) ops, _ = lb.Schedule(suite.tc, false) suite.Empty(ops) - lb, err = schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"b", "f"})) + lb, err = CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"b", "f"})) suite.NoError(err) ops, _ = lb.Schedule(suite.tc, false) suite.Empty(ops) - lb, err = schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"", "a"})) + lb, err = CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", "a"})) suite.NoError(err) ops, _ = lb.Schedule(suite.tc, false) suite.Empty(ops) - lb, err = schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"g", ""})) + lb, err = CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"g", ""})) suite.NoError(err) ops, _ = lb.Schedule(suite.tc, false) suite.Empty(ops) - lb, err = schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"", "f"})) + lb, err = CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", "f"})) suite.NoError(err) ops, _ = lb.Schedule(suite.tc, false) suite.Empty(ops) - lb, err = schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"b", ""})) + lb, err = CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"b", ""})) suite.NoError(err) ops, _ = lb.Schedule(suite.tc, false) suite.Empty(ops) @@ -619,7 +618,7 @@ func (suite *balanceLeaderRangeSchedulerTestSuite) TestMultiRangeBalance() { suite.tc.UpdateStoreLeaderWeight(3, 1) suite.tc.UpdateStoreLeaderWeight(4, 2) suite.tc.AddLeaderRegionWithRange(1, "a", "g", 1, 2, 3, 4) - lb, err := schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"", "g", "o", "t"})) + lb, err := CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", "g", "o", "t"})) suite.NoError(err) ops, _ := lb.Schedule(suite.tc, false) suite.Equal(uint64(1), ops[0].RegionID()) @@ -657,7 +656,7 @@ func (suite *balanceLeaderRangeSchedulerTestSuite) TestBatchBalance() { suite.tc.AddLeaderRegionWithRange(uint64(102), "102a", "102z", 1, 2, 3) suite.tc.AddLeaderRegionWithRange(uint64(103), "103a", "103z", 4, 5, 6) - lb, err := schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"", ""})) + lb, err := CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", ""})) suite.NoError(err) ops, _ := lb.Schedule(suite.tc, false) suite.Len(ops, 2) @@ -748,7 +747,7 @@ func checkBalanceRegionSchedule1(re *require.Assertions, enablePlacementRules bo tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.SetEnablePlacementRules(enablePlacementRules) tc.SetMaxReplicasWithLabel(enablePlacementRules, 1) - sb, err := schedule.CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) + sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) re.NoError(err) // Add stores 1,2,3,4. tc.AddRegionStore(1, 6) @@ -803,7 +802,7 @@ func checkReplica3(re *require.Assertions, enablePlacementRules bool) { tc.SetEnablePlacementRules(enablePlacementRules) tc.SetMaxReplicasWithLabel(enablePlacementRules, 3) - sb, err := schedule.CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) + sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) re.NoError(err) // Store 1 has the largest region score, so the balance scheduler tries to replace peer in store 1. tc.AddLabelsStore(1, 16, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) @@ -877,7 +876,7 @@ func checkReplica5(re *require.Assertions, enablePlacementRules bool) { tc.SetEnablePlacementRules(enablePlacementRules) tc.SetMaxReplicasWithLabel(enablePlacementRules, 5) - sb, err := schedule.CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) + sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) re.NoError(err) tc.AddLabelsStore(1, 4, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) tc.AddLabelsStore(2, 5, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"}) @@ -978,7 +977,7 @@ func checkBalanceRegionSchedule2(re *require.Assertions, enablePlacementRules bo core.SetApproximateKeys(200), ) - sb, err := schedule.CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) + sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) re.NoError(err) tc.AddRegionStore(1, 11) @@ -1034,7 +1033,7 @@ func checkBalanceRegionStoreWeight(re *require.Assertions, enablePlacementRules tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.SetEnablePlacementRules(enablePlacementRules) tc.SetMaxReplicasWithLabel(enablePlacementRules, 1) - sb, err := schedule.CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) + sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) re.NoError(err) tc.AddRegionStore(1, 10) @@ -1069,7 +1068,7 @@ func checkBalanceRegionOpInfluence(re *require.Assertions, enablePlacementRules tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.SetEnablePlacementRules(enablePlacementRules) tc.SetMaxReplicasWithLabel(enablePlacementRules, 1) - sb, err := schedule.CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) + sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) re.NoError(err) // Add stores 1,2,3,4. tc.AddRegionStoreWithLeader(1, 2) @@ -1105,7 +1104,7 @@ func checkReplacePendingRegion(re *require.Assertions, enablePlacementRules bool tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.SetEnablePlacementRules(enablePlacementRules) tc.SetMaxReplicasWithLabel(enablePlacementRules, 3) - sb, err := schedule.CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) + sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) re.NoError(err) // Store 1 has the largest region score, so the balance scheduler try to replace peer in store 1. tc.AddLabelsStore(1, 16, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) @@ -1135,7 +1134,7 @@ func TestBalanceRegionShouldNotBalance(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) - sb, err := schedule.CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) + sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) re.NoError(err) region := tc.MockRegionInfo(1, 0, []uint64{2, 3, 4}, nil, nil) tc.PutRegion(region) @@ -1148,7 +1147,7 @@ func TestBalanceRegionEmptyRegion(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) - sb, err := schedule.CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) + sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) re.NoError(err) tc.AddRegionStore(1, 10) tc.AddRegionStore(2, 9) @@ -1194,7 +1193,7 @@ func checkRandomMergeSchedule(re *require.Assertions, enablePlacementRules bool) tc.SetMaxReplicasWithLabel(enablePlacementRules, 3) tc.SetMergeScheduleLimit(1) - mb, err := schedule.CreateScheduler(RandomMergeType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(RandomMergeType, []string{"", ""})) + mb, err := CreateScheduler(RandomMergeType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(RandomMergeType, []string{"", ""})) re.NoError(err) tc.AddRegionStore(1, 4) @@ -1276,7 +1275,7 @@ func checkScatterRangeBalance(re *require.Assertions, enablePlacementRules bool) tc.UpdateStoreStatus(uint64(i)) } - hb, err := schedule.CreateScheduler(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"})) + hb, err := CreateScheduler(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"})) re.NoError(err) scheduleAndApplyOperator(tc, hb, 100) @@ -1350,7 +1349,7 @@ func checkBalanceLeaderLimit(re *require.Assertions, enablePlacementRules bool) // test not allow schedule leader tc.SetLeaderScheduleLimit(0) - hb, err := schedule.CreateScheduler(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"})) + hb, err := CreateScheduler(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"})) re.NoError(err) scheduleAndApplyOperator(tc, hb, 100) @@ -1374,7 +1373,7 @@ func TestConcurrencyUpdateConfig(t *testing.T) { re := require.New(t) cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - hb, err := schedule.CreateScheduler(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"})) + hb, err := CreateScheduler(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"})) sche := hb.(*scatterRangeScheduler) re.NoError(err) ch := make(chan struct{}) @@ -1447,14 +1446,14 @@ func TestBalanceWhenRegionNotHeartbeat(t *testing.T) { tc.UpdateStoreStatus(uint64(i)) } - hb, err := schedule.CreateScheduler(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_09", "t"})) + hb, err := CreateScheduler(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_09", "t"})) re.NoError(err) scheduleAndApplyOperator(tc, hb, 100) } // scheduleAndApplyOperator will try to schedule for `count` times and apply the operator if the operator is created. -func scheduleAndApplyOperator(tc *mockcluster.Cluster, hb schedule.Scheduler, count int) { +func scheduleAndApplyOperator(tc *mockcluster.Cluster, hb Scheduler, count int) { limit := 0 for { if limit > count { diff --git a/pkg/schedule/schedulers/balance_witness.go b/pkg/schedule/schedulers/balance_witness.go index 730122416d63..6547523be2f9 100644 --- a/pkg/schedule/schedulers/balance_witness.go +++ b/pkg/schedule/schedulers/balance_witness.go @@ -28,7 +28,6 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/schedule" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/schedule/operator" @@ -106,7 +105,7 @@ func (conf *balanceWitnessSchedulerConfig) Clone() *balanceWitnessSchedulerConfi } func (conf *balanceWitnessSchedulerConfig) persistLocked() error { - data, err := schedule.EncodeConfig(conf) + data, err := EncodeConfig(conf) if err != nil { return err } @@ -155,7 +154,7 @@ type balanceWitnessScheduler struct { // newBalanceWitnessScheduler creates a scheduler that tends to keep witnesses on // each store balanced. -func newBalanceWitnessScheduler(opController *operator.Controller, conf *balanceWitnessSchedulerConfig, options ...BalanceWitnessCreateOption) schedule.Scheduler { +func newBalanceWitnessScheduler(opController *operator.Controller, conf *balanceWitnessSchedulerConfig, options ...BalanceWitnessCreateOption) Scheduler { base := NewBaseScheduler(opController) s := &balanceWitnessScheduler{ BaseScheduler: base, @@ -209,7 +208,7 @@ func (b *balanceWitnessScheduler) GetType() string { func (b *balanceWitnessScheduler) EncodeConfig() ([]byte, error) { b.conf.mu.RLock() defer b.conf.mu.RUnlock() - return schedule.EncodeConfig(b.conf) + return EncodeConfig(b.conf) } func (b *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { diff --git a/pkg/schedule/schedulers/balance_witness_test.go b/pkg/schedule/schedulers/balance_witness_test.go index d4508b310358..586bc0d13b97 100644 --- a/pkg/schedule/schedulers/balance_witness_test.go +++ b/pkg/schedule/schedulers/balance_witness_test.go @@ -20,7 +20,6 @@ import ( "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/mock/mockcluster" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" @@ -35,7 +34,7 @@ type balanceWitnessSchedulerTestSuite struct { suite.Suite cancel context.CancelFunc tc *mockcluster.Cluster - lb schedule.Scheduler + lb Scheduler oc *operator.Controller conf config.Config } @@ -50,7 +49,7 @@ func (suite *balanceWitnessSchedulerTestSuite) SetupTest() { Count: 4, }, }) - lb, err := schedule.CreateScheduler(BalanceWitnessType, suite.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceWitnessType, []string{"", ""})) + lb, err := CreateScheduler(BalanceWitnessType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceWitnessType, []string{"", ""})) suite.NoError(err) suite.lb = lb } diff --git a/pkg/schedule/schedulers/base_scheduler.go b/pkg/schedule/schedulers/base_scheduler.go index cea51cd09552..c38216424228 100644 --- a/pkg/schedule/schedulers/base_scheduler.go +++ b/pkg/schedule/schedulers/base_scheduler.go @@ -21,7 +21,6 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/schedule" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/utils/typeutil" @@ -80,7 +79,7 @@ func (s *BaseScheduler) GetMinInterval() time.Duration { // EncodeConfig encode config for the scheduler func (s *BaseScheduler) EncodeConfig() ([]byte, error) { - return schedule.EncodeConfig(nil) + return EncodeConfig(nil) } // GetNextInterval return the next interval for the scheduler diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index c3e0fbc34da8..46b9d79f5746 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -25,7 +25,6 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/schedule" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/schedule/operator" @@ -108,7 +107,7 @@ func (conf *evictLeaderSchedulerConfig) Persist() error { name := conf.getSchedulerName() conf.mu.RLock() defer conf.mu.RUnlock() - data, err := schedule.EncodeConfig(conf) + data, err := EncodeConfig(conf) failpoint.Inject("persistFail", func() { err = errors.New("fail to persist") }) @@ -171,7 +170,7 @@ type evictLeaderScheduler struct { // newEvictLeaderScheduler creates an admin scheduler that transfers all leaders // out of a store. -func newEvictLeaderScheduler(opController *operator.Controller, conf *evictLeaderSchedulerConfig) schedule.Scheduler { +func newEvictLeaderScheduler(opController *operator.Controller, conf *evictLeaderSchedulerConfig) Scheduler { base := NewBaseScheduler(opController) handler := newEvictLeaderHandler(conf) return &evictLeaderScheduler{ @@ -201,7 +200,7 @@ func (s *evictLeaderScheduler) GetType() string { func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) { s.conf.mu.RLock() defer s.conf.mu.RUnlock() - return schedule.EncodeConfig(s.conf) + return EncodeConfig(s.conf) } func (s *evictLeaderScheduler) Prepare(cluster sche.ClusterInformer) error { diff --git a/pkg/schedule/schedulers/evict_leader_test.go b/pkg/schedule/schedulers/evict_leader_test.go index 8cc1d7a96d19..1babb2d56d5b 100644 --- a/pkg/schedule/schedulers/evict_leader_test.go +++ b/pkg/schedule/schedulers/evict_leader_test.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/operatorutil" @@ -42,7 +41,7 @@ func TestEvictLeader(t *testing.T) { tc.AddLeaderRegion(2, 2, 1) tc.AddLeaderRegion(3, 3, 1) - sl, err := schedule.CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(EvictLeaderType, []string{"1"})) + sl, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"1"})) re.NoError(err) re.True(sl.IsScheduleAllowed(tc)) ops, _ := sl.Schedule(tc, false) @@ -55,7 +54,7 @@ func TestEvictLeaderWithUnhealthyPeer(t *testing.T) { re := require.New(t) cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - sl, err := schedule.CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(EvictLeaderType, []string{"1"})) + sl, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"1"})) re.NoError(err) // Add stores 1, 2, 3 diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index c7da1183afa6..015086ac3deb 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -19,7 +19,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/schedule" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" @@ -47,7 +46,7 @@ type evictSlowStoreSchedulerConfig struct { func (conf *evictSlowStoreSchedulerConfig) Persist() error { name := conf.getSchedulerName() - data, err := schedule.EncodeConfig(conf) + data, err := EncodeConfig(conf) failpoint.Inject("persistFail", func() { err = errors.New("fail to persist") }) @@ -107,7 +106,7 @@ func (s *evictSlowStoreScheduler) GetType() string { } func (s *evictSlowStoreScheduler) EncodeConfig() ([]byte, error) { - return schedule.EncodeConfig(s.conf) + return EncodeConfig(s.conf) } func (s *evictSlowStoreScheduler) Prepare(cluster sche.ClusterInformer) error { @@ -211,7 +210,7 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.ClusterInformer, dryRun } // newEvictSlowStoreScheduler creates a scheduler that detects and evicts slow stores. -func newEvictSlowStoreScheduler(opController *operator.Controller, conf *evictSlowStoreSchedulerConfig) schedule.Scheduler { +func newEvictSlowStoreScheduler(opController *operator.Controller, conf *evictSlowStoreSchedulerConfig) Scheduler { base := NewBaseScheduler(opController) s := &evictSlowStoreScheduler{ diff --git a/pkg/schedule/schedulers/evict_slow_store_test.go b/pkg/schedule/schedulers/evict_slow_store_test.go index 88a4864a804d..29ecf9fc022f 100644 --- a/pkg/schedule/schedulers/evict_slow_store_test.go +++ b/pkg/schedule/schedulers/evict_slow_store_test.go @@ -24,7 +24,6 @@ import ( "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mock/mockcluster" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/operatorutil" @@ -34,8 +33,8 @@ type evictSlowStoreTestSuite struct { suite.Suite cancel context.CancelFunc tc *mockcluster.Cluster - es schedule.Scheduler - bs schedule.Scheduler + es Scheduler + bs Scheduler oc *operator.Controller } @@ -57,9 +56,9 @@ func (suite *evictSlowStoreTestSuite) SetupTest() { storage := storage.NewStorageWithMemoryBackend() var err error - suite.es, err = schedule.CreateScheduler(EvictSlowStoreType, suite.oc, storage, schedule.ConfigSliceDecoder(EvictSlowStoreType, []string{})) + suite.es, err = CreateScheduler(EvictSlowStoreType, suite.oc, storage, ConfigSliceDecoder(EvictSlowStoreType, []string{})) suite.NoError(err) - suite.bs, err = schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage, schedule.ConfigSliceDecoder(BalanceLeaderType, []string{})) + suite.bs, err = CreateScheduler(BalanceLeaderType, suite.oc, storage, ConfigSliceDecoder(BalanceLeaderType, []string{})) suite.NoError(err) } diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index d17972ef0912..ea13a707b6b0 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/schedule" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" @@ -48,7 +47,7 @@ type evictSlowTrendSchedulerConfig struct { func (conf *evictSlowTrendSchedulerConfig) Persist() error { name := conf.getSchedulerName() - data, err := schedule.EncodeConfig(conf) + data, err := EncodeConfig(conf) failpoint.Inject("persistFail", func() { err = errors.New("fail to persist") }) @@ -137,7 +136,7 @@ func (s *evictSlowTrendScheduler) GetType() string { } func (s *evictSlowTrendScheduler) EncodeConfig() ([]byte, error) { - return schedule.EncodeConfig(s.conf) + return EncodeConfig(s.conf) } func (s *evictSlowTrendScheduler) Prepare(cluster sche.ClusterInformer) error { @@ -264,7 +263,7 @@ func (s *evictSlowTrendScheduler) Schedule(cluster sche.ClusterInformer, dryRun return s.scheduleEvictLeader(cluster), nil } -func newEvictSlowTrendScheduler(opController *operator.Controller, conf *evictSlowTrendSchedulerConfig) schedule.Scheduler { +func newEvictSlowTrendScheduler(opController *operator.Controller, conf *evictSlowTrendSchedulerConfig) Scheduler { return &evictSlowTrendScheduler{ BaseScheduler: NewBaseScheduler(opController), conf: conf, diff --git a/pkg/schedule/schedulers/evict_slow_trend_test.go b/pkg/schedule/schedulers/evict_slow_trend_test.go index f8f5f6dbd682..c4ff7e501e8e 100644 --- a/pkg/schedule/schedulers/evict_slow_trend_test.go +++ b/pkg/schedule/schedulers/evict_slow_trend_test.go @@ -24,7 +24,6 @@ import ( "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mock/mockcluster" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/operatorutil" @@ -34,8 +33,8 @@ type evictSlowTrendTestSuite struct { suite.Suite cancel context.CancelFunc tc *mockcluster.Cluster - es schedule.Scheduler - bs schedule.Scheduler + es Scheduler + bs Scheduler oc *operator.Controller } @@ -69,9 +68,9 @@ func (suite *evictSlowTrendTestSuite) SetupTest() { storage := storage.NewStorageWithMemoryBackend() var err error - suite.es, err = schedule.CreateScheduler(EvictSlowTrendType, suite.oc, storage, schedule.ConfigSliceDecoder(EvictSlowTrendType, []string{})) + suite.es, err = CreateScheduler(EvictSlowTrendType, suite.oc, storage, ConfigSliceDecoder(EvictSlowTrendType, []string{})) suite.NoError(err) - suite.bs, err = schedule.CreateScheduler(BalanceLeaderType, suite.oc, storage, schedule.ConfigSliceDecoder(BalanceLeaderType, []string{})) + suite.bs, err = CreateScheduler(BalanceLeaderType, suite.oc, storage, ConfigSliceDecoder(BalanceLeaderType, []string{})) suite.NoError(err) } diff --git a/pkg/schedule/schedulers/grant_hot_region.go b/pkg/schedule/schedulers/grant_hot_region.go index f39876d825ee..fb8fc6343ca1 100644 --- a/pkg/schedule/schedulers/grant_hot_region.go +++ b/pkg/schedule/schedulers/grant_hot_region.go @@ -25,7 +25,6 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/schedule" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/schedule/operator" @@ -100,7 +99,7 @@ func (conf *grantHotRegionSchedulerConfig) Persist() error { name := conf.getSchedulerName() conf.mu.RLock() defer conf.mu.RUnlock() - data, err := schedule.EncodeConfig(conf) + data, err := EncodeConfig(conf) if err != nil { return err } @@ -147,7 +146,7 @@ func (s *grantHotRegionScheduler) GetType() string { } func (s *grantHotRegionScheduler) EncodeConfig() ([]byte, error) { - return schedule.EncodeConfig(s.conf) + return EncodeConfig(s.conf) } // IsScheduleAllowed returns whether the scheduler is allowed to schedule. diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index 8ec8d3dd8917..e21e029ec4d3 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -24,7 +24,6 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/schedule" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/schedule/operator" @@ -91,7 +90,7 @@ func (conf *grantLeaderSchedulerConfig) Persist() error { name := conf.getSchedulerName() conf.mu.RLock() defer conf.mu.RUnlock() - data, err := schedule.EncodeConfig(conf) + data, err := EncodeConfig(conf) if err != nil { return err } @@ -152,7 +151,7 @@ type grantLeaderScheduler struct { // newGrantLeaderScheduler creates an admin scheduler that transfers all leaders // to a store. -func newGrantLeaderScheduler(opController *operator.Controller, conf *grantLeaderSchedulerConfig) schedule.Scheduler { +func newGrantLeaderScheduler(opController *operator.Controller, conf *grantLeaderSchedulerConfig) Scheduler { base := NewBaseScheduler(opController) handler := newGrantLeaderHandler(conf) return &grantLeaderScheduler{ @@ -175,7 +174,7 @@ func (s *grantLeaderScheduler) GetType() string { } func (s *grantLeaderScheduler) EncodeConfig() ([]byte, error) { - return schedule.EncodeConfig(s.conf) + return EncodeConfig(s.conf) } func (s *grantLeaderScheduler) Prepare(cluster sche.ClusterInformer) error { diff --git a/pkg/schedule/schedulers/hot_region_config.go b/pkg/schedule/schedulers/hot_region_config.go index eb5683621bf5..1b1585199f1f 100644 --- a/pkg/schedule/schedulers/hot_region_config.go +++ b/pkg/schedule/schedulers/hot_region_config.go @@ -24,7 +24,6 @@ import ( "github.com/gorilla/mux" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/schedule" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/statistics" @@ -149,7 +148,7 @@ type hotRegionSchedulerConfig struct { func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) { conf.RLock() defer conf.RUnlock() - return schedule.EncodeConfig(conf) + return EncodeConfig(conf) } func (conf *hotRegionSchedulerConfig) GetStoreStatZombieDuration() time.Duration { @@ -427,7 +426,7 @@ func (conf *hotRegionSchedulerConfig) handleSetConfig(w http.ResponseWriter, r * } func (conf *hotRegionSchedulerConfig) persistLocked() error { - data, err := schedule.EncodeConfig(conf) + data, err := EncodeConfig(conf) if err != nil { return err } diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index d3292aaf6d88..d6e129c16788 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -26,7 +26,6 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mock/mockcluster" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/statistics" @@ -40,11 +39,11 @@ import ( func init() { schedulePeerPr = 1.0 - schedule.RegisterScheduler(statistics.Write.String(), func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + RegisterScheduler(statistics.Write.String(), func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) { cfg := initHotRegionScheduleConfig() return newHotWriteScheduler(opController, cfg), nil }) - schedule.RegisterScheduler(statistics.Read.String(), func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + RegisterScheduler(statistics.Read.String(), func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) { return newHotReadScheduler(opController, initHotRegionScheduleConfig()), nil }) } @@ -72,7 +71,7 @@ func TestUpgrade(t *testing.T) { cancel, _, _, oc := prepareSchedulersTest() defer cancel() // new - sche, err := schedule.CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(HotRegionType, nil)) + sche, err := CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(HotRegionType, nil)) re.NoError(err) hb := sche.(*hotScheduler) re.Equal([]string{statistics.QueryPriority, statistics.BytePriority}, hb.conf.GetReadPriorities()) @@ -80,7 +79,7 @@ func TestUpgrade(t *testing.T) { re.Equal([]string{statistics.BytePriority, statistics.KeyPriority}, hb.conf.GetWritePeerPriorities()) re.Equal("v2", hb.conf.GetRankFormulaVersion()) // upgrade from json(null) - sche, err = schedule.CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigJSONDecoder([]byte("null"))) + sche, err = CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte("null"))) re.NoError(err) hb = sche.(*hotScheduler) re.Equal([]string{statistics.QueryPriority, statistics.BytePriority}, hb.conf.GetReadPriorities()) @@ -89,7 +88,7 @@ func TestUpgrade(t *testing.T) { re.Equal("v2", hb.conf.GetRankFormulaVersion()) // upgrade from < 5.2 config51 := `{"min-hot-byte-rate":100,"min-hot-key-rate":10,"min-hot-query-rate":10,"max-zombie-rounds":5,"max-peer-number":1000,"byte-rate-rank-step-ratio":0.05,"key-rate-rank-step-ratio":0.05,"query-rate-rank-step-ratio":0.05,"count-rank-step-ratio":0.01,"great-dec-ratio":0.95,"minor-dec-ratio":0.99,"src-tolerance-ratio":1.05,"dst-tolerance-ratio":1.05,"strict-picking-store":"true","enable-for-tiflash":"true"}` - sche, err = schedule.CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigJSONDecoder([]byte(config51))) + sche, err = CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte(config51))) re.NoError(err) hb = sche.(*hotScheduler) re.Equal([]string{statistics.BytePriority, statistics.KeyPriority}, hb.conf.GetReadPriorities()) @@ -98,7 +97,7 @@ func TestUpgrade(t *testing.T) { re.Equal("v1", hb.conf.GetRankFormulaVersion()) // upgrade from < 6.4 config54 := `{"min-hot-byte-rate":100,"min-hot-key-rate":10,"min-hot-query-rate":10,"max-zombie-rounds":5,"max-peer-number":1000,"byte-rate-rank-step-ratio":0.05,"key-rate-rank-step-ratio":0.05,"query-rate-rank-step-ratio":0.05,"count-rank-step-ratio":0.01,"great-dec-ratio":0.95,"minor-dec-ratio":0.99,"src-tolerance-ratio":1.05,"dst-tolerance-ratio":1.05,"read-priorities":["query","byte"],"write-leader-priorities":["query","byte"],"write-peer-priorities":["byte","key"],"strict-picking-store":"true","enable-for-tiflash":"true","forbid-rw-type":"none"}` - sche, err = schedule.CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigJSONDecoder([]byte(config54))) + sche, err = CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte(config54))) re.NoError(err) hb = sche.(*hotScheduler) re.Equal([]string{statistics.QueryPriority, statistics.BytePriority}, hb.conf.GetReadPriorities()) @@ -124,7 +123,7 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) { tc.PutStoreWithLabels(id) } - sche, err := schedule.CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigJSONDecoder([]byte("null"))) + sche, err := CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte("null"))) re.NoError(err) hb := sche.(*hotScheduler) @@ -209,7 +208,7 @@ func TestSplitBuckets(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() tc.SetHotRegionCacheHitsThreshold(1) defer cancel() - hb, err := schedule.CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) solve := newBalanceSolver(hb.(*hotScheduler), tc, statistics.Read, transferLeader) region := core.NewTestRegionInfo(1, 1, []byte(""), []byte("")) @@ -251,7 +250,7 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace tc.SetEnablePlacementRules(enablePlacementRules) labels := []string{"zone", "host"} tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...) - hb, err := schedule.CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) tc.SetHotRegionCacheHitsThreshold(0) @@ -454,7 +453,7 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { }, }, })) - sche, err := schedule.CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + sche, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb := sche.(*hotScheduler) @@ -643,7 +642,7 @@ func TestHotWriteRegionScheduleWithQuery(t *testing.T) { statistics.Denoising = false statisticsInterval = 0 - hb, err := schedule.CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) hb.(*hotScheduler).conf.SetDstToleranceRatio(1) @@ -679,7 +678,7 @@ func TestHotWriteRegionScheduleWithKeyRate(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - hb, err := schedule.CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).conf.SetDstToleranceRatio(1) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) @@ -738,7 +737,7 @@ func TestHotWriteRegionScheduleUnhealthyStore(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - hb, err := schedule.CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).conf.SetDstToleranceRatio(1) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) @@ -786,7 +785,7 @@ func TestHotWriteRegionScheduleCheckHot(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - hb, err := schedule.CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).conf.SetDstToleranceRatio(1) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) @@ -821,7 +820,7 @@ func TestHotWriteRegionScheduleWithLeader(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - hb, err := schedule.CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{statistics.KeyPriority, statistics.BytePriority} re.NoError(err) @@ -887,7 +886,7 @@ func TestHotWriteRegionScheduleWithPendingInfluence(t *testing.T) { func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim int) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - hb, err := schedule.CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{statistics.KeyPriority, statistics.BytePriority} hb.(*hotScheduler).conf.RankFormulaVersion = "v1" @@ -975,7 +974,7 @@ func TestHotWriteRegionScheduleWithRuleEnabled(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() tc.SetEnablePlacementRules(true) - hb, err := schedule.CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{statistics.KeyPriority, statistics.BytePriority} @@ -1057,7 +1056,7 @@ func TestHotReadRegionScheduleByteRateOnly(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) - scheduler, err := schedule.CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + scheduler, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb := scheduler.(*hotScheduler) hb.conf.ReadPriorities = []string{statistics.BytePriority, statistics.KeyPriority} @@ -1180,7 +1179,7 @@ func TestHotReadRegionScheduleWithQuery(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - hb, err := schedule.CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) hb.(*hotScheduler).conf.SetDstToleranceRatio(1) @@ -1215,7 +1214,7 @@ func TestHotReadRegionScheduleWithKeyRate(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - hb, err := schedule.CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).conf.RankFormulaVersion = "v1" hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) @@ -1277,7 +1276,7 @@ func TestHotReadRegionScheduleWithPendingInfluence(t *testing.T) { func checkHotReadRegionScheduleWithPendingInfluence(re *require.Assertions, dim int) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - hb, err := schedule.CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) // For test hb.(*hotScheduler).conf.RankFormulaVersion = "v1" @@ -1395,7 +1394,7 @@ func TestHotReadWithEvictLeaderScheduler(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - hb, err := schedule.CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) hb.(*hotScheduler).conf.SetDstToleranceRatio(1) @@ -1703,7 +1702,7 @@ func checkHotCacheCheckRegionFlow(re *require.Assertions, testCase testHotCacheC tc.SetEnablePlacementRules(enablePlacementRules) labels := []string{"zone", "host"} tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...) - sche, err := schedule.CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigJSONDecoder([]byte("null"))) + sche, err := CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte("null"))) re.NoError(err) hb := sche.(*hotScheduler) heartbeat := tc.AddLeaderRegionWithWriteInfo @@ -1810,7 +1809,7 @@ func TestHotCacheSortHotPeer(t *testing.T) { re := require.New(t) cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - sche, err := schedule.CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigJSONDecoder([]byte("null"))) + sche, err := CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte("null"))) re.NoError(err) hb := sche.(*hotScheduler) leaderSolver := newBalanceSolver(hb, tc, statistics.Read, transferLeader) @@ -1869,7 +1868,7 @@ func TestInfluenceByRWType(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - hb, err := schedule.CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).conf.SetDstToleranceRatio(1) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) @@ -1970,7 +1969,7 @@ func checkHotReadPeerSchedule(re *require.Assertions, enablePlacementRules bool) tc.PutStoreWithLabels(id) } - sche, err := schedule.CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigJSONDecoder([]byte("null"))) + sche, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte("null"))) re.NoError(err) hb := sche.(*hotScheduler) hb.conf.ReadPriorities = []string{statistics.BytePriority, statistics.KeyPriority} @@ -1992,7 +1991,7 @@ func TestHotScheduleWithPriority(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - hb, err := schedule.CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).conf.SetDstToleranceRatio(1.05) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1.05) @@ -2034,7 +2033,7 @@ func TestHotScheduleWithPriority(t *testing.T) { clearPendingInfluence(hb.(*hotScheduler)) // assert read priority schedule - hb, err = schedule.CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err = CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) tc.UpdateStorageReadStats(5, 10*units.MiB*statistics.StoreHeartBeatReportInterval, 10*units.MiB*statistics.StoreHeartBeatReportInterval) tc.UpdateStorageReadStats(4, 10*units.MiB*statistics.StoreHeartBeatReportInterval, 10*units.MiB*statistics.StoreHeartBeatReportInterval) @@ -2054,7 +2053,7 @@ func TestHotScheduleWithPriority(t *testing.T) { re.Len(ops, 1) operatorutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 1, 3) - hb, err = schedule.CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err = CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{statistics.KeyPriority, statistics.BytePriority} hb.(*hotScheduler).conf.RankFormulaVersion = "v1" re.NoError(err) @@ -2098,7 +2097,7 @@ func TestHotScheduleWithStddev(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - hb, err := schedule.CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).conf.SetDstToleranceRatio(1.0) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1.0) @@ -2158,7 +2157,7 @@ func TestHotWriteLeaderScheduleWithPriority(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - hb, err := schedule.CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).conf.SetDstToleranceRatio(1) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) @@ -2201,7 +2200,7 @@ func TestCompatibility(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - hb, err := schedule.CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) // default checkPriority(re, hb.(*hotScheduler), tc, [3][2]int{ @@ -2269,7 +2268,7 @@ func TestCompatibilityConfig(t *testing.T) { defer cancel() // From new or 3.x cluster, it will use new config - hb, err := schedule.CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder("hot-region", nil)) + hb, err := CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder("hot-region", nil)) re.NoError(err) checkPriority(re, hb.(*hotScheduler), tc, [3][2]int{ {statistics.QueryDim, statistics.ByteDim}, @@ -2278,8 +2277,8 @@ func TestCompatibilityConfig(t *testing.T) { }) // Config file is not currently supported - hb, err = schedule.CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), - schedule.ConfigSliceDecoder("hot-region", []string{"read-priorities=byte,query"})) + hb, err = CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), + ConfigSliceDecoder("hot-region", []string{"read-priorities=byte,query"})) re.NoError(err) checkPriority(re, hb.(*hotScheduler), tc, [3][2]int{ {statistics.QueryDim, statistics.ByteDim}, @@ -2290,7 +2289,7 @@ func TestCompatibilityConfig(t *testing.T) { // from 4.0 or 5.0 or 5.1 cluster var data []byte storage := storage.NewStorageWithMemoryBackend() - data, err = schedule.EncodeConfig(map[string]interface{}{ + data, err = EncodeConfig(map[string]interface{}{ "min-hot-byte-rate": 100, "min-hot-key-rate": 10, "max-zombie-rounds": 3, @@ -2306,7 +2305,7 @@ func TestCompatibilityConfig(t *testing.T) { re.NoError(err) err = storage.SaveScheduleConfig(HotRegionName, data) re.NoError(err) - hb, err = schedule.CreateScheduler(HotRegionType, oc, storage, schedule.ConfigJSONDecoder(data)) + hb, err = CreateScheduler(HotRegionType, oc, storage, ConfigJSONDecoder(data)) re.NoError(err) checkPriority(re, hb.(*hotScheduler), tc, [3][2]int{ {statistics.ByteDim, statistics.KeyDim}, @@ -2318,11 +2317,11 @@ func TestCompatibilityConfig(t *testing.T) { cfg := initHotRegionScheduleConfig() cfg.ReadPriorities = []string{"key", "query"} cfg.WriteLeaderPriorities = []string{"query", "key"} - data, err = schedule.EncodeConfig(cfg) + data, err = EncodeConfig(cfg) re.NoError(err) err = storage.SaveScheduleConfig(HotRegionName, data) re.NoError(err) - hb, err = schedule.CreateScheduler(HotRegionType, oc, storage, schedule.ConfigJSONDecoder(data)) + hb, err = CreateScheduler(HotRegionType, oc, storage, ConfigJSONDecoder(data)) re.NoError(err) checkPriority(re, hb.(*hotScheduler), tc, [3][2]int{ {statistics.KeyDim, statistics.QueryDim}, @@ -2428,7 +2427,7 @@ func TestMaxZombieDuration(t *testing.T) { re := require.New(t) cancel, _, _, oc := prepareSchedulersTest() defer cancel() - hb, err := schedule.CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder("hot-region", nil)) + hb, err := CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder("hot-region", nil)) re.NoError(err) maxZombieDur := hb.(*hotScheduler).conf.getValidConf().MaxZombieRounds testCases := []maxZombieDurTestCase{ @@ -2481,7 +2480,7 @@ func TestExpect(t *testing.T) { re := require.New(t) cancel, _, _, oc := prepareSchedulersTest() defer cancel() - hb, err := schedule.CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder("hot-region", nil)) + hb, err := CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder("hot-region", nil)) re.NoError(err) testCases := []struct { initFunc func(*balanceSolver) @@ -2784,7 +2783,7 @@ func TestEncodeConfig(t *testing.T) { re := require.New(t) cancel, _, _, oc := prepareSchedulersTest() defer cancel() - sche, err := schedule.CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigJSONDecoder([]byte("null"))) + sche, err := CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte("null"))) re.NoError(err) data, err := sche.EncodeConfig() re.NoError(err) diff --git a/pkg/schedule/schedulers/hot_region_v2_test.go b/pkg/schedule/schedulers/hot_region_v2_test.go index 1449b3a59468..f8f5959bba6d 100644 --- a/pkg/schedule/schedulers/hot_region_v2_test.go +++ b/pkg/schedule/schedulers/hot_region_v2_test.go @@ -19,7 +19,6 @@ import ( "github.com/docker/go-units" "github.com/stretchr/testify/require" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/storage" @@ -34,7 +33,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { defer cancel() statistics.Denoising = false - sche, err := schedule.CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + sche, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb := sche.(*hotScheduler) hb.conf.SetDstToleranceRatio(0.0) @@ -96,7 +95,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirst(t *testing.T) { defer cancel() statistics.Denoising = false - sche, err := schedule.CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + sche, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb := sche.(*hotScheduler) hb.conf.SetDstToleranceRatio(0.0) @@ -149,7 +148,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirstOnly(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - sche, err := schedule.CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + sche, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb := sche.(*hotScheduler) hb.conf.SetDstToleranceRatio(0.0) @@ -211,7 +210,7 @@ func TestHotReadRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - sche, err := schedule.CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + sche, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb := sche.(*hotScheduler) hb.conf.SetDstToleranceRatio(0.0) @@ -272,7 +271,7 @@ func TestSkipUniformStore(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - hb, err := schedule.CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) hb.(*hotScheduler).conf.SetDstToleranceRatio(1) diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index 7205d034f0ab..268855ed593f 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -21,7 +21,6 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/storage/endpoint" ) @@ -37,7 +36,7 @@ func Register() { func schedulersRegister() { // balance leader - schedule.RegisterSliceDecoderBuilder(BalanceLeaderType, func(args []string) schedule.ConfigDecoder { + RegisterSliceDecoderBuilder(BalanceLeaderType, func(args []string) ConfigDecoder { return func(v interface{}) error { conf, ok := v.(*balanceLeaderSchedulerConfig) if !ok { @@ -53,7 +52,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(BalanceLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + RegisterScheduler(BalanceLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) { conf := &balanceLeaderSchedulerConfig{storage: storage} if err := decoder(conf); err != nil { return nil, err @@ -65,7 +64,7 @@ func schedulersRegister() { }) // balance region - schedule.RegisterSliceDecoderBuilder(BalanceRegionType, func(args []string) schedule.ConfigDecoder { + RegisterSliceDecoderBuilder(BalanceRegionType, func(args []string) ConfigDecoder { return func(v interface{}) error { conf, ok := v.(*balanceRegionSchedulerConfig) if !ok { @@ -81,7 +80,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(BalanceRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + RegisterScheduler(BalanceRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) { conf := &balanceRegionSchedulerConfig{} if err := decoder(conf); err != nil { return nil, err @@ -90,7 +89,7 @@ func schedulersRegister() { }) // balance witness - schedule.RegisterSliceDecoderBuilder(BalanceWitnessType, func(args []string) schedule.ConfigDecoder { + RegisterSliceDecoderBuilder(BalanceWitnessType, func(args []string) ConfigDecoder { return func(v interface{}) error { conf, ok := v.(*balanceWitnessSchedulerConfig) if !ok { @@ -106,7 +105,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(BalanceWitnessType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + RegisterScheduler(BalanceWitnessType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) { conf := &balanceWitnessSchedulerConfig{storage: storage} if err := decoder(conf); err != nil { return nil, err @@ -118,7 +117,7 @@ func schedulersRegister() { }) // evict leader - schedule.RegisterSliceDecoderBuilder(EvictLeaderType, func(args []string) schedule.ConfigDecoder { + RegisterSliceDecoderBuilder(EvictLeaderType, func(args []string) ConfigDecoder { return func(v interface{}) error { if len(args) != 1 { return errs.ErrSchedulerConfig.FastGenByArgs("id") @@ -142,7 +141,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(EvictLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + RegisterScheduler(EvictLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) { conf := &evictLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange), storage: storage} if err := decoder(conf); err != nil { return nil, err @@ -152,13 +151,13 @@ func schedulersRegister() { }) // evict slow store - schedule.RegisterSliceDecoderBuilder(EvictSlowStoreType, func(args []string) schedule.ConfigDecoder { + RegisterSliceDecoderBuilder(EvictSlowStoreType, func(args []string) ConfigDecoder { return func(v interface{}) error { return nil } }) - schedule.RegisterScheduler(EvictSlowStoreType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + RegisterScheduler(EvictSlowStoreType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) { conf := &evictSlowStoreSchedulerConfig{storage: storage, EvictedStores: make([]uint64, 0)} if err := decoder(conf); err != nil { return nil, err @@ -167,7 +166,7 @@ func schedulersRegister() { }) // grant hot region - schedule.RegisterSliceDecoderBuilder(GrantHotRegionType, func(args []string) schedule.ConfigDecoder { + RegisterSliceDecoderBuilder(GrantHotRegionType, func(args []string) ConfigDecoder { return func(v interface{}) error { if len(args) != 2 { return errs.ErrSchedulerConfig.FastGenByArgs("id") @@ -197,7 +196,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(GrantHotRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + RegisterScheduler(GrantHotRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) { conf := &grantHotRegionSchedulerConfig{StoreIDs: make([]uint64, 0), storage: storage} conf.cluster = opController.GetCluster() if err := decoder(conf); err != nil { @@ -207,13 +206,13 @@ func schedulersRegister() { }) // hot region - schedule.RegisterSliceDecoderBuilder(HotRegionType, func(args []string) schedule.ConfigDecoder { + RegisterSliceDecoderBuilder(HotRegionType, func(args []string) ConfigDecoder { return func(v interface{}) error { return nil } }) - schedule.RegisterScheduler(HotRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + RegisterScheduler(HotRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) { conf := initHotRegionScheduleConfig() var data map[string]interface{} if err := decoder(&data); err != nil { @@ -234,7 +233,7 @@ func schedulersRegister() { }) // grant leader - schedule.RegisterSliceDecoderBuilder(GrantLeaderType, func(args []string) schedule.ConfigDecoder { + RegisterSliceDecoderBuilder(GrantLeaderType, func(args []string) ConfigDecoder { return func(v interface{}) error { if len(args) != 1 { return errs.ErrSchedulerConfig.FastGenByArgs("id") @@ -258,7 +257,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(GrantLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + RegisterScheduler(GrantLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) { conf := &grantLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange), storage: storage} conf.cluster = opController.GetCluster() if err := decoder(conf); err != nil { @@ -268,7 +267,7 @@ func schedulersRegister() { }) // label - schedule.RegisterSliceDecoderBuilder(LabelType, func(args []string) schedule.ConfigDecoder { + RegisterSliceDecoderBuilder(LabelType, func(args []string) ConfigDecoder { return func(v interface{}) error { conf, ok := v.(*labelSchedulerConfig) if !ok { @@ -284,7 +283,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(LabelType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + RegisterScheduler(LabelType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) { conf := &labelSchedulerConfig{} if err := decoder(conf); err != nil { return nil, err @@ -293,7 +292,7 @@ func schedulersRegister() { }) // random merge - schedule.RegisterSliceDecoderBuilder(RandomMergeType, func(args []string) schedule.ConfigDecoder { + RegisterSliceDecoderBuilder(RandomMergeType, func(args []string) ConfigDecoder { return func(v interface{}) error { conf, ok := v.(*randomMergeSchedulerConfig) if !ok { @@ -309,7 +308,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(RandomMergeType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + RegisterScheduler(RandomMergeType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) { conf := &randomMergeSchedulerConfig{} if err := decoder(conf); err != nil { return nil, err @@ -319,7 +318,7 @@ func schedulersRegister() { // scatter range // args: [start-key, end-key, range-name]. - schedule.RegisterSliceDecoderBuilder(ScatterRangeType, func(args []string) schedule.ConfigDecoder { + RegisterSliceDecoderBuilder(ScatterRangeType, func(args []string) ConfigDecoder { return func(v interface{}) error { if len(args) != 3 { return errs.ErrSchedulerConfig.FastGenByArgs("ranges and name") @@ -338,7 +337,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(ScatterRangeType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + RegisterScheduler(ScatterRangeType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) { conf := &scatterRangeSchedulerConfig{ storage: storage, } @@ -353,7 +352,7 @@ func schedulersRegister() { }) // shuffle hot region - schedule.RegisterSliceDecoderBuilder(ShuffleHotRegionType, func(args []string) schedule.ConfigDecoder { + RegisterSliceDecoderBuilder(ShuffleHotRegionType, func(args []string) ConfigDecoder { return func(v interface{}) error { conf, ok := v.(*shuffleHotRegionSchedulerConfig) if !ok { @@ -372,7 +371,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(ShuffleHotRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + RegisterScheduler(ShuffleHotRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) { conf := &shuffleHotRegionSchedulerConfig{Limit: uint64(1)} if err := decoder(conf); err != nil { return nil, err @@ -381,7 +380,7 @@ func schedulersRegister() { }) // shuffle leader - schedule.RegisterSliceDecoderBuilder(ShuffleLeaderType, func(args []string) schedule.ConfigDecoder { + RegisterSliceDecoderBuilder(ShuffleLeaderType, func(args []string) ConfigDecoder { return func(v interface{}) error { conf, ok := v.(*shuffleLeaderSchedulerConfig) if !ok { @@ -397,7 +396,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(ShuffleLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + RegisterScheduler(ShuffleLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) { conf := &shuffleLeaderSchedulerConfig{} if err := decoder(conf); err != nil { return nil, err @@ -406,7 +405,7 @@ func schedulersRegister() { }) // shuffle region - schedule.RegisterSliceDecoderBuilder(ShuffleRegionType, func(args []string) schedule.ConfigDecoder { + RegisterSliceDecoderBuilder(ShuffleRegionType, func(args []string) ConfigDecoder { return func(v interface{}) error { conf, ok := v.(*shuffleRegionSchedulerConfig) if !ok { @@ -422,7 +421,7 @@ func schedulersRegister() { } }) - schedule.RegisterScheduler(ShuffleRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + RegisterScheduler(ShuffleRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) { conf := &shuffleRegionSchedulerConfig{storage: storage} if err := decoder(conf); err != nil { return nil, err @@ -431,13 +430,13 @@ func schedulersRegister() { }) // split bucket - schedule.RegisterSliceDecoderBuilder(SplitBucketType, func(args []string) schedule.ConfigDecoder { + RegisterSliceDecoderBuilder(SplitBucketType, func(args []string) ConfigDecoder { return func(v interface{}) error { return nil } }) - schedule.RegisterScheduler(SplitBucketType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + RegisterScheduler(SplitBucketType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) { conf := initSplitBucketConfig() if err := decoder(conf); err != nil { return nil, err @@ -447,24 +446,24 @@ func schedulersRegister() { }) // transfer witness leader - schedule.RegisterSliceDecoderBuilder(TransferWitnessLeaderType, func(args []string) schedule.ConfigDecoder { + RegisterSliceDecoderBuilder(TransferWitnessLeaderType, func(args []string) ConfigDecoder { return func(v interface{}) error { return nil } }) - schedule.RegisterScheduler(TransferWitnessLeaderType, func(opController *operator.Controller, _ endpoint.ConfigStorage, _ schedule.ConfigDecoder) (schedule.Scheduler, error) { + RegisterScheduler(TransferWitnessLeaderType, func(opController *operator.Controller, _ endpoint.ConfigStorage, _ ConfigDecoder) (Scheduler, error) { return newTransferWitnessLeaderScheduler(opController), nil }) // evict slow store by trend - schedule.RegisterSliceDecoderBuilder(EvictSlowTrendType, func(args []string) schedule.ConfigDecoder { + RegisterSliceDecoderBuilder(EvictSlowTrendType, func(args []string) ConfigDecoder { return func(v interface{}) error { return nil } }) - schedule.RegisterScheduler(EvictSlowTrendType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + RegisterScheduler(EvictSlowTrendType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) { conf := &evictSlowTrendSchedulerConfig{storage: storage, EvictedStores: make([]uint64, 0), evictCandidate: 0} if err := decoder(conf); err != nil { return nil, err diff --git a/pkg/schedule/schedulers/label.go b/pkg/schedule/schedulers/label.go index f71883a0d077..65103e390674 100644 --- a/pkg/schedule/schedulers/label.go +++ b/pkg/schedule/schedulers/label.go @@ -19,7 +19,6 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/config" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/filter" @@ -57,7 +56,7 @@ type labelScheduler struct { // LabelScheduler is mainly based on the store's label information for scheduling. // Now only used for reject leader schedule, that will move the leader out of // the store with the specific label. -func newLabelScheduler(opController *operator.Controller, conf *labelSchedulerConfig) schedule.Scheduler { +func newLabelScheduler(opController *operator.Controller, conf *labelSchedulerConfig) Scheduler { return &labelScheduler{ BaseScheduler: NewBaseScheduler(opController), conf: conf, @@ -73,7 +72,7 @@ func (s *labelScheduler) GetType() string { } func (s *labelScheduler) EncodeConfig() ([]byte, error) { - return schedule.EncodeConfig(s.conf) + return EncodeConfig(s.conf) } func (s *labelScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { diff --git a/pkg/schedule/schedulers/random_merge.go b/pkg/schedule/schedulers/random_merge.go index 04397d396eb8..980a69b0cee7 100644 --- a/pkg/schedule/schedulers/random_merge.go +++ b/pkg/schedule/schedulers/random_merge.go @@ -21,7 +21,6 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/checker" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/filter" @@ -58,7 +57,7 @@ type randomMergeScheduler struct { // newRandomMergeScheduler creates an admin scheduler that randomly picks two adjacent regions // then merges them. -func newRandomMergeScheduler(opController *operator.Controller, conf *randomMergeSchedulerConfig) schedule.Scheduler { +func newRandomMergeScheduler(opController *operator.Controller, conf *randomMergeSchedulerConfig) Scheduler { base := NewBaseScheduler(opController) return &randomMergeScheduler{ BaseScheduler: base, @@ -75,7 +74,7 @@ func (s *randomMergeScheduler) GetType() string { } func (s *randomMergeScheduler) EncodeConfig() ([]byte, error) { - return schedule.EncodeConfig(s.conf) + return EncodeConfig(s.conf) } func (s *randomMergeScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { diff --git a/pkg/schedule/schedulers/scatter_range.go b/pkg/schedule/schedulers/scatter_range.go index 8a2c1e942d11..2892bd46c669 100644 --- a/pkg/schedule/schedulers/scatter_range.go +++ b/pkg/schedule/schedulers/scatter_range.go @@ -84,7 +84,7 @@ func (conf *scatterRangeSchedulerConfig) Persist() error { name := conf.getSchedulerName() conf.mu.RLock() defer conf.mu.RUnlock() - data, err := schedule.EncodeConfig(conf) + data, err := EncodeConfig(conf) if err != nil { return err } @@ -119,13 +119,13 @@ type scatterRangeScheduler struct { *BaseScheduler name string config *scatterRangeSchedulerConfig - balanceLeader schedule.Scheduler - balanceRegion schedule.Scheduler + balanceLeader Scheduler + balanceRegion Scheduler handler http.Handler } // newScatterRangeScheduler creates a scheduler that balances the distribution of leaders and regions that in the specified key range. -func newScatterRangeScheduler(opController *operator.Controller, config *scatterRangeSchedulerConfig) schedule.Scheduler { +func newScatterRangeScheduler(opController *operator.Controller, config *scatterRangeSchedulerConfig) Scheduler { base := NewBaseScheduler(opController) name := config.getSchedulerName() @@ -166,7 +166,7 @@ func (l *scatterRangeScheduler) GetType() string { func (l *scatterRangeScheduler) EncodeConfig() ([]byte, error) { l.config.mu.RLock() defer l.config.mu.RUnlock() - return schedule.EncodeConfig(l.config) + return EncodeConfig(l.config) } func (l *scatterRangeScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { diff --git a/pkg/schedule/scheduler.go b/pkg/schedule/schedulers/scheduler.go similarity index 97% rename from pkg/schedule/scheduler.go rename to pkg/schedule/schedulers/scheduler.go index 7d1f821d77e3..6cd636cf5dbf 100644 --- a/pkg/schedule/scheduler.go +++ b/pkg/schedule/schedulers/scheduler.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schedule +package schedulers import ( "encoding/json" @@ -35,7 +35,7 @@ import ( type Scheduler interface { http.Handler GetName() string - // GetType should in accordance with the name passing to schedule.RegisterScheduler() + // GetType should in accordance with the name passing to RegisterScheduler() GetType() string EncodeConfig() ([]byte, error) GetMinInterval() time.Duration diff --git a/pkg/schedule/schedulers/scheduler_test.go b/pkg/schedule/schedulers/scheduler_test.go index 1dec83dbf1d5..42ea535b9b3b 100644 --- a/pkg/schedule/schedulers/scheduler_test.go +++ b/pkg/schedule/schedulers/scheduler_test.go @@ -24,7 +24,6 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mock/mockcluster" "github.com/tikv/pd/pkg/mock/mockconfig" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/hbstream" "github.com/tikv/pd/pkg/schedule/operator" @@ -55,7 +54,7 @@ func TestShuffleLeader(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - sl, err := schedule.CreateScheduler(ShuffleLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(ShuffleLeaderType, []string{"", ""})) + sl, err := CreateScheduler(ShuffleLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ShuffleLeaderType, []string{"", ""})) re.NoError(err) ops, _ := sl.Schedule(tc, false) re.Empty(ops) @@ -93,7 +92,7 @@ func TestRejectLeader(t *testing.T) { tc.AddLeaderRegion(2, 2, 1, 3) // The label scheduler transfers leader out of store1. - sl, err := schedule.CreateScheduler(LabelType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(LabelType, []string{"", ""})) + sl, err := CreateScheduler(LabelType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(LabelType, []string{"", ""})) re.NoError(err) ops, _ := sl.Schedule(tc, false) operatorutil.CheckTransferLeaderFrom(re, ops[0], operator.OpLeader, 1) @@ -105,13 +104,13 @@ func TestRejectLeader(t *testing.T) { // As store3 is disconnected, store1 rejects leader. Balancer will not create // any operators. - bs, err := schedule.CreateScheduler(BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"", ""})) + bs, err := CreateScheduler(BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", ""})) re.NoError(err) ops, _ = bs.Schedule(tc, false) re.Empty(ops) // Can't evict leader from store2, neither. - el, err := schedule.CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(EvictLeaderType, []string{"2"})) + el, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"2"})) re.NoError(err) ops, _ = el.Schedule(tc, false) re.Empty(ops) @@ -137,7 +136,7 @@ func TestRemoveRejectLeader(t *testing.T) { defer cancel() tc.AddRegionStore(1, 0) tc.AddRegionStore(2, 1) - el, err := schedule.CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(EvictLeaderType, []string{"1"})) + el, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"1"})) re.NoError(err) tc.DeleteStore(tc.GetStore(1)) succ, _ := el.(*evictLeaderScheduler).conf.removeStore(1) @@ -157,7 +156,7 @@ func checkBalance(re *require.Assertions, enablePlacementRules bool) { tc.SetEnablePlacementRules(enablePlacementRules) labels := []string{"zone", "host"} tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...) - hb, err := schedule.CreateScheduler(ShuffleHotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder("shuffle-hot-region", []string{"", ""})) + hb, err := CreateScheduler(ShuffleHotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder("shuffle-hot-region", []string{"", ""})) re.NoError(err) // Add stores 1, 2, 3, 4, 5, 6 with hot peer counts 3, 2, 2, 2, 0, 0. tc.AddLabelsStore(1, 3, map[string]string{"zone": "z1", "host": "h1"}) @@ -204,7 +203,7 @@ func TestHotRegionScheduleAbnormalReplica(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() tc.SetHotRegionScheduleLimit(0) - hb, err := schedule.CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) tc.AddRegionStore(1, 3) @@ -229,7 +228,7 @@ func TestShuffleRegion(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - sl, err := schedule.CreateScheduler(ShuffleRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(ShuffleRegionType, []string{"", ""})) + sl, err := CreateScheduler(ShuffleRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ShuffleRegionType, []string{"", ""})) re.NoError(err) re.True(sl.IsScheduleAllowed(tc)) ops, _ := sl.Schedule(tc, false) @@ -293,7 +292,7 @@ func TestShuffleRegionRole(t *testing.T) { }, peers[0]) tc.PutRegion(region) - sl, err := schedule.CreateScheduler(ShuffleRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(ShuffleRegionType, []string{"", ""})) + sl, err := CreateScheduler(ShuffleRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ShuffleRegionType, []string{"", ""})) re.NoError(err) conf := sl.(*shuffleRegionScheduler).conf @@ -313,10 +312,10 @@ func TestSpecialUseHotRegion(t *testing.T) { defer cancel() storage := storage.NewStorageWithMemoryBackend() - cd := schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""}) - bs, err := schedule.CreateScheduler(BalanceRegionType, oc, storage, cd) + cd := ConfigSliceDecoder(BalanceRegionType, []string{"", ""}) + bs, err := CreateScheduler(BalanceRegionType, oc, storage, cd) re.NoError(err) - hs, err := schedule.CreateScheduler(statistics.Write.String(), oc, storage, cd) + hs, err := CreateScheduler(statistics.Write.String(), oc, storage, cd) re.NoError(err) tc.SetHotRegionCacheHitsThreshold(0) @@ -365,8 +364,8 @@ func TestSpecialUseReserved(t *testing.T) { defer cancel() storage := storage.NewStorageWithMemoryBackend() - cd := schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""}) - bs, err := schedule.CreateScheduler(BalanceRegionType, oc, storage, cd) + cd := ConfigSliceDecoder(BalanceRegionType, []string{"", ""}) + bs, err := CreateScheduler(BalanceRegionType, oc, storage, cd) re.NoError(err) tc.SetHotRegionCacheHitsThreshold(0) @@ -400,7 +399,7 @@ func TestBalanceLeaderWithConflictRule(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() tc.SetEnablePlacementRules(true) - lb, err := schedule.CreateScheduler(BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"", ""})) + lb, err := CreateScheduler(BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", ""})) re.NoError(err) tc.AddLeaderStore(1, 1) diff --git a/pkg/schedule/schedulers/shuffle_hot_region.go b/pkg/schedule/schedulers/shuffle_hot_region.go index 97cb18471ed1..d3241087d196 100644 --- a/pkg/schedule/schedulers/shuffle_hot_region.go +++ b/pkg/schedule/schedulers/shuffle_hot_region.go @@ -19,7 +19,6 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/schedule" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/schedule/operator" @@ -57,7 +56,7 @@ type shuffleHotRegionScheduler struct { } // newShuffleHotRegionScheduler creates an admin scheduler that random balance hot regions -func newShuffleHotRegionScheduler(opController *operator.Controller, conf *shuffleHotRegionSchedulerConfig) schedule.Scheduler { +func newShuffleHotRegionScheduler(opController *operator.Controller, conf *shuffleHotRegionSchedulerConfig) Scheduler { base := newBaseHotScheduler(opController) ret := &shuffleHotRegionScheduler{ baseHotScheduler: base, @@ -75,7 +74,7 @@ func (s *shuffleHotRegionScheduler) GetType() string { } func (s *shuffleHotRegionScheduler) EncodeConfig() ([]byte, error) { - return schedule.EncodeConfig(s.conf) + return EncodeConfig(s.conf) } func (s *shuffleHotRegionScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { diff --git a/pkg/schedule/schedulers/shuffle_leader.go b/pkg/schedule/schedulers/shuffle_leader.go index e2ecd40ccae2..b406ad1a782f 100644 --- a/pkg/schedule/schedulers/shuffle_leader.go +++ b/pkg/schedule/schedulers/shuffle_leader.go @@ -19,7 +19,6 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/schedule" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/schedule/operator" @@ -54,7 +53,7 @@ type shuffleLeaderScheduler struct { // newShuffleLeaderScheduler creates an admin scheduler that shuffles leaders // between stores. -func newShuffleLeaderScheduler(opController *operator.Controller, conf *shuffleLeaderSchedulerConfig) schedule.Scheduler { +func newShuffleLeaderScheduler(opController *operator.Controller, conf *shuffleLeaderSchedulerConfig) Scheduler { filters := []filter.Filter{ &filter.StoreStateFilter{ActionScope: conf.Name, TransferLeader: true, OperatorLevel: constant.Low}, filter.NewSpecialUseFilter(conf.Name), @@ -76,7 +75,7 @@ func (s *shuffleLeaderScheduler) GetType() string { } func (s *shuffleLeaderScheduler) EncodeConfig() ([]byte, error) { - return schedule.EncodeConfig(s.conf) + return EncodeConfig(s.conf) } func (s *shuffleLeaderScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { diff --git a/pkg/schedule/schedulers/shuffle_region.go b/pkg/schedule/schedulers/shuffle_region.go index f964e13de503..32ff4eb708a2 100644 --- a/pkg/schedule/schedulers/shuffle_region.go +++ b/pkg/schedule/schedulers/shuffle_region.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" - "github.com/tikv/pd/pkg/schedule" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/schedule/operator" @@ -52,7 +51,7 @@ type shuffleRegionScheduler struct { // newShuffleRegionScheduler creates an admin scheduler that shuffles regions // between stores. -func newShuffleRegionScheduler(opController *operator.Controller, conf *shuffleRegionSchedulerConfig) schedule.Scheduler { +func newShuffleRegionScheduler(opController *operator.Controller, conf *shuffleRegionSchedulerConfig) Scheduler { filters := []filter.Filter{ &filter.StoreStateFilter{ActionScope: ShuffleRegionName, MoveRegion: true, OperatorLevel: constant.Low}, filter.NewSpecialUseFilter(ShuffleRegionName), diff --git a/pkg/schedule/schedulers/shuffle_region_config.go b/pkg/schedule/schedulers/shuffle_region_config.go index d36fbc7674fc..97ccab136b5f 100644 --- a/pkg/schedule/schedulers/shuffle_region_config.go +++ b/pkg/schedule/schedulers/shuffle_region_config.go @@ -19,7 +19,6 @@ import ( "github.com/gorilla/mux" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" @@ -47,7 +46,7 @@ type shuffleRegionSchedulerConfig struct { func (conf *shuffleRegionSchedulerConfig) EncodeConfig() ([]byte, error) { conf.RLock() defer conf.RUnlock() - return schedule.EncodeConfig(conf) + return EncodeConfig(conf) } func (conf *shuffleRegionSchedulerConfig) GetRoles() []string { @@ -106,7 +105,7 @@ func (conf *shuffleRegionSchedulerConfig) handleSetRoles(w http.ResponseWriter, } func (conf *shuffleRegionSchedulerConfig) persist() error { - data, err := schedule.EncodeConfig(conf) + data, err := EncodeConfig(conf) if err != nil { return err } diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go index 6595433d1488..a1ff2add0968 100644 --- a/pkg/schedule/schedulers/split_bucket.go +++ b/pkg/schedule/schedulers/split_bucket.go @@ -24,7 +24,6 @@ import ( "github.com/gorilla/mux" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/schedule" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" @@ -81,7 +80,7 @@ func (conf *splitBucketSchedulerConfig) Clone() *splitBucketSchedulerConfig { } func (conf *splitBucketSchedulerConfig) persistLocked() error { - data, err := schedule.EncodeConfig(conf) + data, err := EncodeConfig(conf) if err != nil { return err } diff --git a/pkg/schedule/schedulers/transfer_witness_leader.go b/pkg/schedule/schedulers/transfer_witness_leader.go index e3792bd09b1c..07a9858a0a7b 100644 --- a/pkg/schedule/schedulers/transfer_witness_leader.go +++ b/pkg/schedule/schedulers/transfer_witness_leader.go @@ -20,7 +20,6 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/schedule" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/schedule/operator" @@ -53,7 +52,7 @@ type trasferWitnessLeaderScheduler struct { } // newTransferWitnessLeaderScheduler creates an admin scheduler that transfers witness leader of a region. -func newTransferWitnessLeaderScheduler(opController *operator.Controller) schedule.Scheduler { +func newTransferWitnessLeaderScheduler(opController *operator.Controller) Scheduler { return &trasferWitnessLeaderScheduler{ BaseScheduler: NewBaseScheduler(opController), regions: make(chan *core.RegionInfo, transferWitnessLeaderRecvMaxRegionSize), @@ -126,6 +125,6 @@ func (s *trasferWitnessLeaderScheduler) scheduleTransferWitnessLeader(name, typ } // RecvRegionInfo receives a checked region from coordinator -func RecvRegionInfo(s schedule.Scheduler) chan<- *core.RegionInfo { +func RecvRegionInfo(s Scheduler) chan<- *core.RegionInfo { return s.(*trasferWitnessLeaderScheduler).regions } diff --git a/pkg/schedule/schedulers/transfer_witness_leader_test.go b/pkg/schedule/schedulers/transfer_witness_leader_test.go index 7d2dfeb32247..1da968d8dc2f 100644 --- a/pkg/schedule/schedulers/transfer_witness_leader_test.go +++ b/pkg/schedule/schedulers/transfer_witness_leader_test.go @@ -21,7 +21,6 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/operatorutil" @@ -39,7 +38,7 @@ func TestTransferWitnessLeader(t *testing.T) { // Add regions 1 with leader in stores 1 tc.AddLeaderRegion(1, 1, 2, 3) - sl, err := schedule.CreateScheduler(TransferWitnessLeaderType, oc, storage.NewStorageWithMemoryBackend(), nil) + sl, err := CreateScheduler(TransferWitnessLeaderType, oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) RecvRegionInfo(sl) <- tc.GetRegion(1) re.True(sl.IsScheduleAllowed(tc)) @@ -54,7 +53,7 @@ func TestTransferWitnessLeaderWithUnhealthyPeer(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - sl, err := schedule.CreateScheduler(TransferWitnessLeaderType, oc, storage.NewStorageWithMemoryBackend(), nil) + sl, err := CreateScheduler(TransferWitnessLeaderType, oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) // Add stores 1, 2, 3 diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go index 81f4411fbf7d..9aea0cd4c6aa 100644 --- a/plugin/scheduler_example/evict_leader.go +++ b/plugin/scheduler_example/evict_leader.go @@ -25,7 +25,6 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/schedule" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/schedule/operator" @@ -46,7 +45,7 @@ const ( ) func init() { - schedule.RegisterSliceDecoderBuilder(EvictLeaderType, func(args []string) schedule.ConfigDecoder { + schedulers.RegisterSliceDecoderBuilder(EvictLeaderType, func(args []string) schedulers.ConfigDecoder { return func(v interface{}) error { if len(args) != 1 { return errors.New("should specify the store-id") @@ -69,7 +68,7 @@ func init() { } }) - schedule.RegisterScheduler(EvictLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + schedulers.RegisterScheduler(EvictLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedulers.ConfigDecoder) (schedulers.Scheduler, error) { conf := &evictLeaderSchedulerConfig{StoreIDWitRanges: make(map[uint64][]core.KeyRange), storage: storage} if err := decoder(conf); err != nil { return nil, err @@ -130,7 +129,7 @@ func (conf *evictLeaderSchedulerConfig) Persist() error { name := conf.getScheduleName() conf.mu.RLock() defer conf.mu.RUnlock() - data, err := schedule.EncodeConfig(conf) + data, err := schedulers.EncodeConfig(conf) if err != nil { return err } @@ -159,7 +158,7 @@ type evictLeaderScheduler struct { // newEvictLeaderScheduler creates an admin scheduler that transfers all leaders // out of a store. -func newEvictLeaderScheduler(opController *operator.Controller, conf *evictLeaderSchedulerConfig) schedule.Scheduler { +func newEvictLeaderScheduler(opController *operator.Controller, conf *evictLeaderSchedulerConfig) schedulers.Scheduler { base := schedulers.NewBaseScheduler(opController) handler := newEvictLeaderHandler(conf) return &evictLeaderScheduler{ @@ -184,7 +183,7 @@ func (s *evictLeaderScheduler) GetType() string { func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) { s.conf.mu.RLock() defer s.conf.mu.RUnlock() - return schedule.EncodeConfig(s.conf) + return schedulers.EncodeConfig(s.conf) } func (s *evictLeaderScheduler) Prepare(cluster sche.ClusterInformer) error { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index a373c4c90534..a583a902bc70 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -692,7 +692,7 @@ func (c *RaftCluster) GetSchedulerHandlers() map[string]http.Handler { } // AddScheduler adds a scheduler. -func (c *RaftCluster) AddScheduler(scheduler schedule.Scheduler, args ...string) error { +func (c *RaftCluster) AddScheduler(scheduler schedulers.Scheduler, args ...string) error { return c.coordinator.addScheduler(scheduler, args...) } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index f76d2db8d83c..ca67f1052b33 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -33,7 +33,6 @@ import ( "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/pkg/progress" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/placement" @@ -307,9 +306,9 @@ func TestSetOfflineWithReplica(t *testing.T) { re.NoError(cluster.RemoveStore(3, true)) } -func addEvictLeaderScheduler(cluster *RaftCluster, storeID uint64) (evictScheduler schedule.Scheduler, err error) { +func addEvictLeaderScheduler(cluster *RaftCluster, storeID uint64) (evictScheduler schedulers.Scheduler, err error) { args := []string{fmt.Sprintf("%d", storeID)} - evictScheduler, err = schedule.CreateScheduler(schedulers.EvictLeaderType, cluster.GetOperatorController(), cluster.storage, schedule.ConfigSliceDecoder(schedulers.EvictLeaderType, args)) + evictScheduler, err = schedulers.CreateScheduler(schedulers.EvictLeaderType, cluster.GetOperatorController(), cluster.storage, schedulers.ConfigSliceDecoder(schedulers.EvictLeaderType, args)) if err != nil { return } diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go index dd3056fad304..8eb80a07efe6 100644 --- a/server/cluster/coordinator.go +++ b/server/cluster/coordinator.go @@ -357,7 +357,7 @@ func (c *coordinator) run() { // The new way to create scheduler with the independent configuration. for i, name := range scheduleNames { data := configs[i] - typ := schedule.FindSchedulerTypeByName(name) + typ := schedulers.FindSchedulerTypeByName(name) var cfg config.SchedulerConfig for _, c := range scheduleCfg.Schedulers { if c.Type == typ { @@ -373,7 +373,7 @@ func (c *coordinator) run() { log.Info("skip create scheduler with independent configuration", zap.String("scheduler-name", name), zap.String("scheduler-type", cfg.Type), zap.Strings("scheduler-args", cfg.Args)) continue } - s, err := schedule.CreateScheduler(cfg.Type, c.opController, c.cluster.storage, schedule.ConfigJSONDecoder([]byte(data))) + s, err := schedulers.CreateScheduler(cfg.Type, c.opController, c.cluster.storage, schedulers.ConfigJSONDecoder([]byte(data))) if err != nil { log.Error("can not create scheduler with independent configuration", zap.String("scheduler-name", name), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) continue @@ -394,7 +394,7 @@ func (c *coordinator) run() { continue } - s, err := schedule.CreateScheduler(schedulerCfg.Type, c.opController, c.cluster.storage, schedule.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args)) + s, err := schedulers.CreateScheduler(schedulerCfg.Type, c.opController, c.cluster.storage, schedulers.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args)) if err != nil { log.Error("can not create scheduler", zap.String("scheduler-type", schedulerCfg.Type), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) continue @@ -443,7 +443,7 @@ func (c *coordinator) LoadPlugin(pluginPath string, ch chan string) { } schedulerArgs := SchedulerArgs.(func() []string) // create and add user scheduler - s, err := schedule.CreateScheduler(schedulerType(), c.opController, c.cluster.storage, schedule.ConfigSliceDecoder(schedulerType(), schedulerArgs())) + s, err := schedulers.CreateScheduler(schedulerType(), c.opController, c.cluster.storage, schedulers.ConfigSliceDecoder(schedulerType(), schedulerArgs())) if err != nil { log.Error("can not create scheduler", zap.String("scheduler-type", schedulerType()), errs.ZapError(err)) return @@ -623,7 +623,7 @@ func (c *coordinator) shouldRun() bool { return c.prepareChecker.check(c.cluster.GetBasicCluster()) } -func (c *coordinator) addScheduler(scheduler schedule.Scheduler, args ...string) error { +func (c *coordinator) addScheduler(scheduler schedulers.Scheduler, args ...string) error { c.Lock() defer c.Unlock() @@ -681,8 +681,8 @@ func (c *coordinator) removeOptScheduler(o *config.PersistOptions, name string) v := o.GetScheduleConfig().Clone() for i, schedulerCfg := range v.Schedulers { // To create a temporary scheduler is just used to get scheduler's name - decoder := schedule.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args) - tmp, err := schedule.CreateScheduler(schedulerCfg.Type, operator.NewController(c.ctx, nil, nil), storage.NewStorageWithMemoryBackend(), decoder) + decoder := schedulers.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args) + tmp, err := schedulers.CreateScheduler(schedulerCfg.Type, operator.NewController(c.ctx, nil, nil), storage.NewStorageWithMemoryBackend(), decoder) if err != nil { return err } @@ -851,9 +851,9 @@ func (c *coordinator) GetDiagnosticResult(name string) (*DiagnosticResult, error return c.diagnosticManager.getDiagnosticResult(name) } -// scheduleController is used to manage a scheduler to schedule. +// scheduleController is used to manage a scheduler to schedulers. type scheduleController struct { - schedule.Scheduler + schedulers.Scheduler cluster *RaftCluster opController *operator.Controller nextInterval time.Duration @@ -865,7 +865,7 @@ type scheduleController struct { } // newScheduleController creates a new scheduleController. -func newScheduleController(c *coordinator, s schedule.Scheduler) *scheduleController { +func newScheduleController(c *coordinator, s schedulers.Scheduler) *scheduleController { ctx, cancel := context.WithCancel(c.ctx) return &scheduleController{ Scheduler: s, @@ -939,7 +939,7 @@ func (s *scheduleController) GetInterval() time.Duration { return s.nextInterval } -// AllowSchedule returns if a scheduler is allowed to schedule. +// AllowSchedule returns if a scheduler is allowed to schedulers. func (s *scheduleController) AllowSchedule(diagnosable bool) bool { if !s.Scheduler.IsScheduleAllowed(s.cluster) { if diagnosable { diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go index 16fcac79ce32..2f2a5055e0b1 100644 --- a/server/cluster/coordinator_test.go +++ b/server/cluster/coordinator_test.go @@ -32,7 +32,6 @@ import ( "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/mock/mockhbstream" - "github.com/tikv/pd/pkg/schedule" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/hbstream" "github.com/tikv/pd/pkg/schedule/labeler" @@ -594,7 +593,7 @@ func TestPeerState(t *testing.T) { stream := mockhbstream.NewHeartbeatStream() - // Wait for schedule. + // Wait for schedulers. waitOperator(re, co, 1) operatorutil.CheckTransferPeer(re, co.opController.GetOperator(1), operator.OpKind(0), 4, 1) @@ -746,7 +745,7 @@ func TestAddScheduler(t *testing.T) { oc := co.opController // test ConfigJSONDecoder create - bl, err := schedule.CreateScheduler(schedulers.BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigJSONDecoder([]byte("{}"))) + bl, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}"))) re.NoError(err) conf, err := bl.EncodeConfig() re.NoError(err) @@ -756,16 +755,16 @@ func TestAddScheduler(t *testing.T) { batch := data["batch"].(float64) re.Equal(4, int(batch)) - gls, err := schedule.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"0"})) + gls, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"0"})) re.NoError(err) re.NotNil(co.addScheduler(gls)) re.NotNil(co.removeScheduler(gls.GetName())) - gls, err = schedule.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"})) + gls, err = schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"})) re.NoError(err) re.NoError(co.addScheduler(gls)) - hb, err := schedule.CreateScheduler(schedulers.HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigJSONDecoder([]byte("{}"))) + hb, err := schedulers.CreateScheduler(schedulers.HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}"))) re.NoError(err) conf, err = hb.EncodeConfig() re.NoError(err) @@ -807,10 +806,10 @@ func TestPersistScheduler(t *testing.T) { oc := co.opController storage := tc.RaftCluster.storage - gls1, err := schedule.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedule.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"})) + gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"})) re.NoError(err) re.NoError(co.addScheduler(gls1, "1")) - evict, err := schedule.CreateScheduler(schedulers.EvictLeaderType, oc, storage, schedule.ConfigSliceDecoder(schedulers.EvictLeaderType, []string{"2"})) + evict, err := schedulers.CreateScheduler(schedulers.EvictLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.EvictLeaderType, []string{"2"})) re.NoError(err) re.NoError(co.addScheduler(evict, "2")) re.Len(co.schedulers, defaultCount+2) @@ -832,7 +831,7 @@ func TestPersistScheduler(t *testing.T) { // whether the schedulers added or removed in dynamic way are recorded in opt _, newOpt, err := newTestScheduleConfig() re.NoError(err) - _, err = schedule.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedule.ConfigJSONDecoder([]byte("null"))) + _, err = schedulers.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedulers.ConfigJSONDecoder([]byte("null"))) re.NoError(err) // suppose we add a new default enable scheduler config.DefaultSchedulers = append(config.DefaultSchedulers, config.SchedulerConfig{Type: "shuffle-region"}) @@ -864,10 +863,10 @@ func TestPersistScheduler(t *testing.T) { co = newCoordinator(ctx, tc.RaftCluster, hbStreams) co.run() re.Len(co.schedulers, 3) - bls, err := schedule.CreateScheduler(schedulers.BalanceLeaderType, oc, storage, schedule.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""})) + bls, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""})) re.NoError(err) re.NoError(co.addScheduler(bls)) - brs, err := schedule.CreateScheduler(schedulers.BalanceRegionType, oc, storage, schedule.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) + brs, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) re.NoError(err) re.NoError(co.addScheduler(brs)) re.Len(co.schedulers, defaultCount) @@ -953,7 +952,7 @@ func TestRemoveScheduler(t *testing.T) { oc := co.opController storage := tc.RaftCluster.storage - gls1, err := schedule.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedule.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"})) + gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"})) re.NoError(err) re.NoError(co.addScheduler(gls1, "1")) re.Len(co.schedulers, defaultCount+1) @@ -1139,7 +1138,7 @@ func TestStoreOverloaded(t *testing.T) { tc, co, cleanup := prepare(nil, nil, nil, re) defer cleanup() oc := co.opController - lb, err := schedule.CreateScheduler(schedulers.BalanceRegionType, oc, tc.storage, schedule.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) + lb, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, tc.storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) re.NoError(err) opt := tc.GetOpts() re.NoError(tc.addRegionStore(4, 100)) @@ -1193,7 +1192,7 @@ func TestStoreOverloadedWithReplace(t *testing.T) { tc, co, cleanup := prepare(nil, nil, nil, re) defer cleanup() oc := co.opController - lb, err := schedule.CreateScheduler(schedulers.BalanceRegionType, oc, tc.storage, schedule.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) + lb, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, tc.storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) re.NoError(err) re.NoError(tc.addRegionStore(4, 100)) @@ -1266,7 +1265,7 @@ func TestDownStoreLimit(t *testing.T) { // FIXME: remove after move into schedulers package type mockLimitScheduler struct { - schedule.Scheduler + schedulers.Scheduler limit uint64 counter *operator.Controller kind operator.OpKind @@ -1285,7 +1284,7 @@ func TestController(t *testing.T) { re.NoError(tc.addLeaderRegion(1, 1)) re.NoError(tc.addLeaderRegion(2, 2)) - scheduler, err := schedule.CreateScheduler(schedulers.BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""})) + scheduler, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""})) re.NoError(err) lb := &mockLimitScheduler{ Scheduler: scheduler, @@ -1371,7 +1370,7 @@ func TestInterval(t *testing.T) { _, co, cleanup := prepare(nil, nil, nil, re) defer cleanup() - lb, err := schedule.CreateScheduler(schedulers.BalanceLeaderType, co.opController, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""})) + lb, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, co.opController, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""})) re.NoError(err) sc := newScheduleController(co, lb) diff --git a/server/handler.go b/server/handler.go index 7366047e25d2..8426d2eda5cf 100644 --- a/server/handler.go +++ b/server/handler.go @@ -33,7 +33,6 @@ import ( "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/encryption" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" @@ -221,7 +220,7 @@ func (h *Handler) AddScheduler(name string, args ...string) error { return err } - s, err := schedule.CreateScheduler(name, c.GetOperatorController(), h.s.storage, schedule.ConfigSliceDecoder(name, args)) + s, err := schedulers.CreateScheduler(name, c.GetOperatorController(), h.s.storage, schedulers.ConfigSliceDecoder(name, args)) if err != nil { return err } diff --git a/server/server.go b/server/server.go index 89ed91a0505b..18c9759e2943 100644 --- a/server/server.go +++ b/server/server.go @@ -56,9 +56,9 @@ import ( mcs "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/ratelimit" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/hbstream" "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" @@ -908,7 +908,7 @@ func (s *Server) GetConfig() *config.Config { payload := make(map[string]interface{}) for i, sche := range sches { var config interface{} - err := schedule.DecodeConfig([]byte(configs[i]), &config) + err := schedulers.DecodeConfig([]byte(configs[i]), &config) if err != nil { log.Error("failed to decode scheduler config", zap.String("config", configs[i]),