From 5a4e9efd846e0ee0d15ae4f9f91aab81f4382da3 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 20 Nov 2023 17:46:10 +0800 Subject: [PATCH 1/4] mcs: fix scheduler memory sync in api server (#7389) close tikv/pd#7388 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/mcs/scheduling/server/config/watcher.go | 3 +- pkg/schedule/schedulers/base_scheduler.go | 8 +- pkg/schedule/schedulers/evict_leader.go | 4 +- pkg/schedule/schedulers/evict_slow_store.go | 4 +- .../schedulers/evict_slow_store_test.go | 4 +- pkg/schedule/schedulers/evict_slow_trend.go | 4 +- .../schedulers/evict_slow_trend_test.go | 4 +- pkg/schedule/schedulers/grant_leader.go | 4 +- pkg/schedule/schedulers/scheduler.go | 4 +- .../schedulers/scheduler_controller.go | 8 +- plugin/scheduler_example/evict_leader.go | 4 +- tests/pdctl/scheduler/scheduler_test.go | 126 +++++++++++++----- 12 files changed, 121 insertions(+), 56 deletions(-) diff --git a/pkg/mcs/scheduling/server/config/watcher.go b/pkg/mcs/scheduling/server/config/watcher.go index 6ad37045000..433933674ea 100644 --- a/pkg/mcs/scheduling/server/config/watcher.go +++ b/pkg/mcs/scheduling/server/config/watcher.go @@ -147,7 +147,8 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error { prefixToTrim := cw.schedulerConfigPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { name := strings.TrimPrefix(string(kv.Key), prefixToTrim) - log.Info("update scheduler config", zap.String("name", string(kv.Value))) + log.Info("update scheduler config", zap.String("name", name), + zap.String("value", string(kv.Value))) err := cw.storage.SaveSchedulerConfig(name, kv.Value) if err != nil { log.Warn("failed to save scheduler config", diff --git a/pkg/schedule/schedulers/base_scheduler.go b/pkg/schedule/schedulers/base_scheduler.go index 6e712c18fe3..f4c8c577767 100644 --- a/pkg/schedule/schedulers/base_scheduler.go +++ b/pkg/schedule/schedulers/base_scheduler.go @@ -92,8 +92,8 @@ func (s *BaseScheduler) GetNextInterval(interval time.Duration) time.Duration { return intervalGrow(interval, MaxScheduleInterval, exponentialGrowth) } -// Prepare does some prepare work -func (s *BaseScheduler) Prepare(cluster sche.SchedulerCluster) error { return nil } +// PrepareConfig does some prepare work about config. +func (s *BaseScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { return nil } -// Cleanup does some cleanup work -func (s *BaseScheduler) Cleanup(cluster sche.SchedulerCluster) {} +// CleanConfig does some cleanup work about config. 
+func (s *BaseScheduler) CleanConfig(cluster sche.SchedulerCluster) {} diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index a5c67856df8..332002043a3 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -239,7 +239,7 @@ func pauseAndResumeLeaderTransfer(cluster *core.BasicCluster, old, new map[uint6 } } -func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error { +func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { s.conf.mu.RLock() defer s.conf.mu.RUnlock() var res error @@ -251,7 +251,7 @@ func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error { return res } -func (s *evictLeaderScheduler) Cleanup(cluster sche.SchedulerCluster) { +func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { s.conf.mu.RLock() defer s.conf.mu.RUnlock() for id := range s.conf.StoreIDWithRanges { diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index cc1b16300c5..563f9f68c45 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -189,7 +189,7 @@ func (s *evictSlowStoreScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } -func (s *evictSlowStoreScheduler) Prepare(cluster sche.SchedulerCluster) error { +func (s *evictSlowStoreScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { evictStore := s.conf.evictStore() if evictStore != 0 { return cluster.SlowStoreEvicted(evictStore) @@ -197,7 +197,7 @@ func (s *evictSlowStoreScheduler) Prepare(cluster sche.SchedulerCluster) error { return nil } -func (s *evictSlowStoreScheduler) Cleanup(cluster sche.SchedulerCluster) { +func (s *evictSlowStoreScheduler) CleanConfig(cluster sche.SchedulerCluster) { s.cleanupEvictLeader(cluster) } diff --git a/pkg/schedule/schedulers/evict_slow_store_test.go b/pkg/schedule/schedulers/evict_slow_store_test.go index 813d17ae541..11cd69e60f7 100644 --- a/pkg/schedule/schedulers/evict_slow_store_test.go +++ b/pkg/schedule/schedulers/evict_slow_store_test.go @@ -123,13 +123,13 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePrepare() { suite.True(ok) suite.Zero(es2.conf.evictStore()) // prepare with no evict store. - suite.es.Prepare(suite.tc) + suite.es.PrepareConfig(suite.tc) es2.conf.setStoreAndPersist(1) suite.Equal(uint64(1), es2.conf.evictStore()) suite.False(es2.conf.readyForRecovery()) // prepare with evict store. 
- suite.es.Prepare(suite.tc) + suite.es.PrepareConfig(suite.tc) } func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePersistFail() { diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index f31ba420c97..0d2c10e2bfe 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -270,7 +270,7 @@ func (s *evictSlowTrendScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } -func (s *evictSlowTrendScheduler) Prepare(cluster sche.SchedulerCluster) error { +func (s *evictSlowTrendScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { evictedStoreID := s.conf.evictedStore() if evictedStoreID == 0 { return nil @@ -278,7 +278,7 @@ func (s *evictSlowTrendScheduler) Prepare(cluster sche.SchedulerCluster) error { return cluster.SlowTrendEvicted(evictedStoreID) } -func (s *evictSlowTrendScheduler) Cleanup(cluster sche.SchedulerCluster) { +func (s *evictSlowTrendScheduler) CleanConfig(cluster sche.SchedulerCluster) { s.cleanupEvictLeader(cluster) } diff --git a/pkg/schedule/schedulers/evict_slow_trend_test.go b/pkg/schedule/schedulers/evict_slow_trend_test.go index c6ad058455f..75ea50d73b4 100644 --- a/pkg/schedule/schedulers/evict_slow_trend_test.go +++ b/pkg/schedule/schedulers/evict_slow_trend_test.go @@ -255,10 +255,10 @@ func (suite *evictSlowTrendTestSuite) TestEvictSlowTrendPrepare() { suite.True(ok) suite.Zero(es2.conf.evictedStore()) // prepare with no evict store. - suite.es.Prepare(suite.tc) + suite.es.PrepareConfig(suite.tc) es2.conf.setStoreAndPersist(1) suite.Equal(uint64(1), es2.conf.evictedStore()) // prepare with evict store. - suite.es.Prepare(suite.tc) + suite.es.PrepareConfig(suite.tc) } diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index f244228a10f..84f830f368b 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -197,7 +197,7 @@ func (s *grantLeaderScheduler) ReloadConfig() error { return nil } -func (s *grantLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error { +func (s *grantLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { s.conf.mu.RLock() defer s.conf.mu.RUnlock() var res error @@ -209,7 +209,7 @@ func (s *grantLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error { return res } -func (s *grantLeaderScheduler) Cleanup(cluster sche.SchedulerCluster) { +func (s *grantLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { s.conf.mu.RLock() defer s.conf.mu.RUnlock() for id := range s.conf.StoreIDWithRanges { diff --git a/pkg/schedule/schedulers/scheduler.go b/pkg/schedule/schedulers/scheduler.go index 9262f7d0a65..1c788989454 100644 --- a/pkg/schedule/schedulers/scheduler.go +++ b/pkg/schedule/schedulers/scheduler.go @@ -42,8 +42,8 @@ type Scheduler interface { ReloadConfig() error GetMinInterval() time.Duration GetNextInterval(interval time.Duration) time.Duration - Prepare(cluster sche.SchedulerCluster) error - Cleanup(cluster sche.SchedulerCluster) + PrepareConfig(cluster sche.SchedulerCluster) error + CleanConfig(cluster sche.SchedulerCluster) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) IsScheduleAllowed(cluster sche.SchedulerCluster) bool } diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index 5097a5f3f1c..b65173c1f5b 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ 
b/pkg/schedule/schedulers/scheduler_controller.go @@ -156,7 +156,8 @@ func (c *Controller) AddSchedulerHandler(scheduler Scheduler, args ...string) er return err } c.cluster.GetSchedulerConfig().AddSchedulerCfg(scheduler.GetType(), args) - return nil + err := scheduler.PrepareConfig(c.cluster) + return err } // RemoveSchedulerHandler removes the HTTP handler for a scheduler. @@ -183,6 +184,7 @@ func (c *Controller) RemoveSchedulerHandler(name string) error { return err } + s.(Scheduler).CleanConfig(c.cluster) delete(c.schedulerHandlers, name) return nil @@ -198,7 +200,7 @@ func (c *Controller) AddScheduler(scheduler Scheduler, args ...string) error { } s := NewScheduleController(c.ctx, c.cluster, c.opController, scheduler) - if err := s.Scheduler.Prepare(c.cluster); err != nil { + if err := s.Scheduler.PrepareConfig(c.cluster); err != nil { return err } @@ -343,7 +345,7 @@ func (c *Controller) IsSchedulerExisted(name string) (bool, error) { func (c *Controller) runScheduler(s *ScheduleController) { defer logutil.LogPanic() defer c.wg.Done() - defer s.Scheduler.Cleanup(c.cluster) + defer s.Scheduler.CleanConfig(c.cluster) ticker := time.NewTicker(s.GetInterval()) defer ticker.Stop() diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go index 8919d1bdb4b..063ae9eb150 100644 --- a/plugin/scheduler_example/evict_leader.go +++ b/plugin/scheduler_example/evict_leader.go @@ -186,7 +186,7 @@ func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) { return schedulers.EncodeConfig(s.conf) } -func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error { +func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { s.conf.mu.RLock() defer s.conf.mu.RUnlock() var res error @@ -198,7 +198,7 @@ func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error { return res } -func (s *evictLeaderScheduler) Cleanup(cluster sche.SchedulerCluster) { +func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { s.conf.mu.RLock() defer s.conf.mu.RUnlock() for id := range s.conf.StoreIDWitRanges { diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index d0fac2c1137..7098637c84a 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -17,6 +17,7 @@ package scheduler_test import ( "context" "encoding/json" + "fmt" "reflect" "strings" "testing" @@ -28,6 +29,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/versioninfo" "github.com/tikv/pd/tests" @@ -84,7 +86,8 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { checkSchedulerCommand := func(args []string, expected map[string]bool) { if args != nil { - mustExec(re, cmd, args, nil) + echo := mustExec(re, cmd, args, nil) + re.Contains(echo, "Success!") } testutil.Eventually(re, func() bool { var schedulers []string @@ -137,9 +140,40 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { } checkSchedulerCommand(args, expected) - schedulers := []string{"evict-leader-scheduler", "grant-leader-scheduler"} + // avoid the influence of the scheduler order + schedulers := []string{"evict-leader-scheduler", "grant-leader-scheduler", "evict-leader-scheduler", "grant-leader-scheduler"} + + checkStorePause := func(changedStores []uint64, schedulerName 
string) { + status := func() string { + switch schedulerName { + case "evict-leader-scheduler": + return "paused" + case "grant-leader-scheduler": + return "resumed" + default: + re.Fail(fmt.Sprintf("unknown scheduler %s", schedulerName)) + return "" + } + }() + for _, store := range stores { + isStorePaused := !cluster.GetLeaderServer().GetRaftCluster().GetStore(store.GetId()).AllowLeaderTransfer() + if slice.AnyOf(changedStores, func(i int) bool { + return store.GetId() == changedStores[i] + }) { + re.True(isStorePaused, + fmt.Sprintf("store %d should be %s with %s", store.GetId(), status, schedulerName)) + } else { + re.False(isStorePaused, + fmt.Sprintf("store %d should not be %s with %s", store.GetId(), status, schedulerName)) + } + if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { + re.Equal(isStorePaused, !sche.GetCluster().GetStore(store.GetId()).AllowLeaderTransfer()) + } + } + } for idx := range schedulers { + checkStorePause([]uint64{}, schedulers[idx]) // scheduler add command args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "2"} expected = map[string]bool{ @@ -155,6 +189,7 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { expectedConfig := make(map[string]interface{}) expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}} checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + checkStorePause([]uint64{2}, schedulers[idx]) // scheduler config update command args = []string{"-u", pdAddr, "scheduler", "config", schedulers[idx], "add-store", "3"} @@ -165,14 +200,12 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { "transfer-witness-leader-scheduler": true, "balance-witness-scheduler": true, } - checkSchedulerCommand(args, expected) // check update success - // FIXME: remove this check after scheduler config is updated - if cluster.GetSchedulingPrimaryServer() == nil && schedulers[idx] == "grant-leader-scheduler" { - expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}, "3": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}} - checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) - } + checkSchedulerCommand(args, expected) + expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}, "3": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}} + checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + checkStorePause([]uint64{2, 3}, schedulers[idx]) // scheduler delete command args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx]} @@ -183,6 +216,7 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { "balance-witness-scheduler": true, } checkSchedulerCommand(args, expected) + checkStorePause([]uint64{}, schedulers[idx]) // scheduler add command args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "2"} @@ -194,6 +228,7 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { "balance-witness-scheduler": true, } checkSchedulerCommand(args, expected) + checkStorePause([]uint64{2}, schedulers[idx]) // scheduler add command twice args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "4"} @@ -209,6 +244,7 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { // check add success 
expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}, "4": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}} checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + checkStorePause([]uint64{2, 4}, schedulers[idx]) // scheduler remove command [old] args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx] + "-4"} @@ -224,6 +260,7 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { // check remove success expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}} checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + checkStorePause([]uint64{2}, schedulers[idx]) // scheduler remove command, when remove the last store, it should remove whole scheduler args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx] + "-2"} @@ -234,6 +271,7 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { "balance-witness-scheduler": true, } checkSchedulerCommand(args, expected) + checkStorePause([]uint64{}, schedulers[idx]) } // test shuffle region config @@ -247,7 +285,8 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { var roles []string mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles) re.Equal([]string{"leader", "follower", "learner"}, roles) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "set-roles", "learner"}, nil) + echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "set-roles", "learner"}, nil) // todo:add check output + re.Contains(echo, "Success!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles) re.Equal([]string{"learner"}, roles) mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler"}, &roles) @@ -270,7 +309,8 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3) re.Equal(expected3, conf3) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler", "set", "2", "1,2,3"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler", "set", "2", "1,2,3"}, nil) + re.Contains(echo, "Success!") expected3["store-leader-id"] = float64(2) // FIXME: remove this check after scheduler config is updated if cluster.GetSchedulingPrimaryServer() == nil { // "grant-hot-region-scheduler" @@ -279,7 +319,7 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { } // test remove and add scheduler - echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil) re.Contains(echo, "Success!") echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) re.Contains(echo, "Success!") @@ -326,7 +366,8 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { re.Equal(expected1, conf) mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "show"}, &conf) re.Equal(expected1, conf) - 
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "src-tolerance-ratio", "1.02"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "src-tolerance-ratio", "1.02"}, nil) + re.Contains(echo, "Success!") expected1["src-tolerance-ratio"] = 1.02 var conf1 map[string]interface{} // FIXME: remove this check after scheduler config is updated @@ -334,52 +375,66 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,key"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,key"}, nil) + re.Contains(echo, "Success!") expected1["read-priorities"] = []interface{}{"byte", "key"} mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key"}, nil) + re.Contains(echo, "Failed!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,byte"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,byte"}, nil) + re.Contains(echo, "Success!") expected1["read-priorities"] = []interface{}{"key", "byte"} mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "foo,bar"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "foo,bar"}, nil) + re.Contains(echo, "Failed!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", ""}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", ""}, nil) + re.Contains(echo, "Failed!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key"}, nil) + re.Contains(echo, "Failed!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, 
"scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,byte"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,byte"}, nil) + re.Contains(echo, "Failed!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key,byte"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key,byte"}, nil) + re.Contains(echo, "Failed!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) // write-priorities is divided into write-leader-priorities and write-peer-priorities - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil) + re.Contains(echo, "Failed!") + re.Contains(echo, "Config item is not found.") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v0"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v0"}, nil) + re.Contains(echo, "Failed!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) expected1["rank-formula-version"] = "v2" - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v2"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v2"}, nil) + re.Contains(echo, "Success!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) expected1["rank-formula-version"] = "v1" - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v1"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v1"}, nil) + re.Contains(echo, "Success!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) expected1["forbid-rw-type"] = "read" - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "forbid-rw-type", "read"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "forbid-rw-type", "read"}, nil) + re.Contains(echo, "Success!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) @@ -412,7 +467,8 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { conf1 = make(map[string]interface{}) mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", 
"config", "balance-leader-scheduler", "show"}, &conf) re.Equal(4., conf["batch"]) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "set", "batch", "3"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "set", "batch", "3"}, nil) + re.Contains(echo, "Success!") testutil.Eventually(re, func() bool { mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler"}, &conf1) return conf1["batch"] == 3. @@ -465,7 +521,8 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { } mustUsage([]string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler"}) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) + re.Contains(echo, "Success!") checkSchedulerWithStatusCommand("paused", []string{ "balance-leader-scheduler", }) @@ -476,7 +533,8 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { }, testutil.WithWaitFor(30*time.Second)) mustUsage([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler", "60"}) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) + re.Contains(echo, "Success!") checkSchedulerWithStatusCommand("paused", nil) // set label scheduler to disabled manually. @@ -547,11 +605,14 @@ func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *tests.TestClu checkSchedulerDescribeCommand("balance-region-scheduler", "pending", "1 store(s) RegionNotMatchRule; ") // scheduler delete command - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) + re.Contains(echo, "Success!") checkSchedulerDescribeCommand("balance-region-scheduler", "disabled", "") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) + re.Contains(echo, "Success!") checkSchedulerDescribeCommand("balance-leader-scheduler", "normal", "") } @@ -604,7 +665,8 @@ func TestForwardSchedulerRequest(t *testing.T) { re.Contains(string(output), "Usage") } mustUsage([]string{"-u", backendEndpoints, "scheduler", "pause", "balance-leader-scheduler"}) - mustExec(re, cmd, []string{"-u", backendEndpoints, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) + echo := mustExec(re, cmd, []string{"-u", backendEndpoints, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) + re.Contains(echo, "Success!") checkSchedulerWithStatusCommand := func(status string, expected []string) { var schedulers []string mustExec(re, cmd, []string{"-u", backendEndpoints, "scheduler", "show", "--status", status}, &schedulers) From 89c83748a253df8effd6d8b77c5a8a5bec4e0f74 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 20 Nov 2023 18:01:12 +0800 Subject: [PATCH 2/4] mcs: fix 
sync store label (#7396) close tikv/pd#7391, close tikv/pd#7393, close tikv/pd#7394 Signed-off-by: Ryan Leung --- pkg/core/store_option.go | 20 +++++++++++++++++++ pkg/mcs/scheduling/server/cluster.go | 3 +-- pkg/mcs/scheduling/server/meta/watcher.go | 2 +- .../integrations/mcs/scheduling/meta_test.go | 11 ++++++++++ .../mcs/scheduling/server_test.go | 3 ++- tests/server/cluster/cluster_test.go | 1 + 6 files changed, 36 insertions(+), 4 deletions(-) diff --git a/pkg/core/store_option.go b/pkg/core/store_option.go index 8a2aa1ef089..0bdfaffa44f 100644 --- a/pkg/core/store_option.go +++ b/pkg/core/store_option.go @@ -274,3 +274,23 @@ func SetLastAwakenTime(lastAwaken time.Time) StoreCreateOption { store.lastAwakenTime = lastAwaken } } + +// SetStoreMeta sets the meta for the store. +func SetStoreMeta(newMeta *metapb.Store) StoreCreateOption { + return func(store *StoreInfo) { + meta := typeutil.DeepClone(store.meta, StoreFactory) + meta.Version = newMeta.GetVersion() + meta.GitHash = newMeta.GetGitHash() + meta.Address = newMeta.GetAddress() + meta.StatusAddress = newMeta.GetStatusAddress() + meta.PeerAddress = newMeta.GetPeerAddress() + meta.StartTimestamp = newMeta.GetStartTimestamp() + meta.DeployPath = newMeta.GetDeployPath() + meta.LastHeartbeat = newMeta.GetLastHeartbeat() + meta.State = newMeta.GetState() + meta.Labels = newMeta.GetLabels() + meta.NodeState = newMeta.GetNodeState() + meta.PhysicallyDestroyed = newMeta.GetPhysicallyDestroyed() + store.meta = meta + } +} diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index ac15212553b..96ee88259da 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -381,8 +381,7 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq return errors.Errorf("store %v not found", storeID) } - nowTime := time.Now() - newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime)) + newStore := store.Clone(core.SetStoreStats(stats)) if store := c.GetStore(storeID); store != nil { statistics.UpdateStoreHeartbeatMetrics(store) diff --git a/pkg/mcs/scheduling/server/meta/watcher.go b/pkg/mcs/scheduling/server/meta/watcher.go index 3dbd0fc8c92..3a04c261163 100644 --- a/pkg/mcs/scheduling/server/meta/watcher.go +++ b/pkg/mcs/scheduling/server/meta/watcher.go @@ -81,7 +81,7 @@ func (w *Watcher) initializeStoreWatcher() error { w.basicCluster.PutStore(core.NewStoreInfo(store)) return nil } - w.basicCluster.PutStore(origin.Clone(core.SetStoreState(store.GetState(), store.GetPhysicallyDestroyed()))) + w.basicCluster.PutStore(origin.Clone(core.SetStoreMeta(store))) return nil } deleteFn := func(kv *mvccpb.KeyValue) error { diff --git a/tests/integrations/mcs/scheduling/meta_test.go b/tests/integrations/mcs/scheduling/meta_test.go index 74497e0b552..ce0dc620aef 100644 --- a/tests/integrations/mcs/scheduling/meta_test.go +++ b/tests/integrations/mcs/scheduling/meta_test.go @@ -99,4 +99,15 @@ func (suite *metaTestSuite) TestStoreWatch() { testutil.Eventually(re, func() bool { return cluster.GetStore(2) == nil }) + + // test synchronized store labels + suite.pdLeaderServer.GetServer().GetRaftCluster().PutStore( + &metapb.Store{Id: 5, Address: "mock-5", State: metapb.StoreState_Up, NodeState: metapb.NodeState_Serving, LastHeartbeat: time.Now().UnixNano(), Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z1"}}}, + ) + testutil.Eventually(re, func() bool { + if len(cluster.GetStore(5).GetLabels()) == 0 { + return false + } + return 
cluster.GetStore(5).GetLabels()[0].GetValue() == "z1" + }) } diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 41c00b8e9b4..eb99411d27e 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -59,7 +59,7 @@ func TestServerTestSuite(t *testing.T) { func (suite *serverTestSuite) SetupSuite() { var err error re := suite.Require() - + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`)) suite.ctx, suite.cancel = context.WithCancel(context.Background()) suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 3) re.NoError(err) @@ -76,6 +76,7 @@ func (suite *serverTestSuite) SetupSuite() { func (suite *serverTestSuite) TearDownSuite() { suite.cluster.Destroy() suite.cancel() + suite.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs")) } func (suite *serverTestSuite) TestAllocID() { diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index ccb469c04cb..6d233a8c8ab 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -510,6 +510,7 @@ func TestRaftClusterMultipleRestart(t *testing.T) { err = rc.PutStore(store) re.NoError(err) re.NotNil(tc) + rc.Stop() // let the job run at small interval re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`)) From f9f9be60531d0561860f1f34a741252f843554b0 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 21 Nov 2023 12:04:10 +0800 Subject: [PATCH 3/4] *: clean up handling metrics process (#7370) ref tikv/pd#5839, close tikv/pd#7391 Signed-off-by: Ryan Leung --- pkg/mcs/scheduling/server/cluster.go | 16 ---------------- pkg/statistics/hot_cache.go | 4 ++-- pkg/statistics/region_collection.go | 8 ++++---- pkg/statistics/store_collection.go | 3 +++ server/cluster/cluster.go | 4 ++-- server/cluster/cluster_test.go | 21 ++++++++++++++------- server/cluster/scheduling_controller.go | 21 +++++---------------- tests/server/cluster/cluster_test.go | 2 -- 8 files changed, 30 insertions(+), 49 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 96ee88259da..59b2e691658 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -485,10 +485,6 @@ func (c *Cluster) collectMetrics() { c.coordinator.GetSchedulersController().CollectSchedulerMetrics() c.coordinator.CollectHotSpotMetrics() - c.collectClusterMetrics() -} - -func (c *Cluster) collectClusterMetrics() { if c.regionStats == nil { return } @@ -500,20 +496,8 @@ func (c *Cluster) collectClusterMetrics() { func (c *Cluster) resetMetrics() { statistics.Reset() - schedulers.ResetSchedulerMetrics() schedule.ResetHotSpotMetrics() - c.resetClusterMetrics() -} - -func (c *Cluster) resetClusterMetrics() { - if c.regionStats == nil { - return - } - c.regionStats.Reset() - c.labelStats.Reset() - // reset hot cache metrics - c.hotStat.ResetMetrics() } // StartBackgroundJobs starts background jobs. diff --git a/pkg/statistics/hot_cache.go b/pkg/statistics/hot_cache.go index de7189a1332..1868e323b0f 100644 --- a/pkg/statistics/hot_cache.go +++ b/pkg/statistics/hot_cache.go @@ -125,8 +125,8 @@ func (w *HotCache) CollectMetrics() { w.CheckReadAsync(newCollectMetricsTask()) } -// ResetMetrics resets the hot cache metrics. 
-func (w *HotCache) ResetMetrics() { +// ResetHotCacheStatusMetrics resets the hot cache metrics. +func ResetHotCacheStatusMetrics() { hotCacheStatusGauge.Reset() } diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go index 26cbea9ef92..21af8e152fd 100644 --- a/pkg/statistics/region_collection.go +++ b/pkg/statistics/region_collection.go @@ -272,8 +272,8 @@ func (r *RegionStatistics) Collect() { regionWitnessLeaderRegionCounter.Set(float64(len(r.stats[WitnessLeader]))) } -// Reset resets the metrics of the regions' status. -func (r *RegionStatistics) Reset() { +// ResetRegionStatsMetrics resets the metrics of the regions' status. +func ResetRegionStatsMetrics() { regionMissPeerRegionCounter.Set(0) regionExtraPeerRegionCounter.Set(0) regionDownPeerRegionCounter.Set(0) @@ -326,8 +326,8 @@ func (l *LabelStatistics) Collect() { } } -// Reset resets the metrics of the label status. -func (l *LabelStatistics) Reset() { +// ResetLabelStatsMetrics resets the metrics of the label status. +func ResetLabelStatsMetrics() { regionLabelLevelGauge.Reset() } diff --git a/pkg/statistics/store_collection.go b/pkg/statistics/store_collection.go index dcdd77d9112..aacd45338d1 100644 --- a/pkg/statistics/store_collection.go +++ b/pkg/statistics/store_collection.go @@ -322,4 +322,7 @@ func Reset() { storeStatusGauge.Reset() clusterStatusGauge.Reset() placementStatusGauge.Reset() + ResetRegionStatsMetrics() + ResetLabelStatsMetrics() + ResetHotCacheStatusMetrics() } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 3b826d8d33e..e9cece0faa7 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -654,7 +654,7 @@ func (c *RaftCluster) runMetricsCollectionJob() { ticker := time.NewTicker(metricsCollectionJobInterval) failpoint.Inject("highFrequencyClusterJobs", func() { ticker.Stop() - ticker = time.NewTicker(time.Microsecond) + ticker = time.NewTicker(time.Millisecond) }) defer ticker.Stop() @@ -734,10 +734,10 @@ func (c *RaftCluster) Stop() { return } c.running = false + c.cancel() if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { c.stopSchedulingJobs() } - c.cancel() c.Unlock() c.wg.Wait() diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index e9ce35dfb54..b1a3535e90f 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -2485,7 +2485,10 @@ func TestCollectMetricsConcurrent(t *testing.T) { nil) }, func(co *schedule.Coordinator) { co.Run() }, re) defer cleanup() - + rc := co.GetCluster().(*RaftCluster) + rc.schedulingController = newSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager()) + rc.schedulingController.coordinator = co + controller := co.GetSchedulersController() // Make sure there are no problem when concurrent write and read var wg sync.WaitGroup count := 10 @@ -2498,15 +2501,14 @@ func TestCollectMetricsConcurrent(t *testing.T) { } }(i) } - controller := co.GetSchedulersController() for i := 0; i < 1000; i++ { co.CollectHotSpotMetrics() controller.CollectSchedulerMetrics() - co.GetCluster().(*RaftCluster).collectStatisticsMetrics() + rc.collectSchedulingMetrics() } schedule.ResetHotSpotMetrics() schedulers.ResetSchedulerMetrics() - co.GetCluster().(*RaftCluster).resetStatisticsMetrics() + rc.resetSchedulingMetrics() wg.Wait() } @@ -2520,6 +2522,11 @@ func TestCollectMetrics(t *testing.T) { nil) }, func(co *schedule.Coordinator) { co.Run() }, re) defer cleanup() + + rc := co.GetCluster().(*RaftCluster) + 
rc.schedulingController = newSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager()) + rc.schedulingController.coordinator = co + controller := co.GetSchedulersController() count := 10 for i := 0; i <= count; i++ { for k := 0; k < 200; k++ { @@ -2533,11 +2540,11 @@ func TestCollectMetrics(t *testing.T) { tc.hotStat.HotCache.Update(item, utils.Write) } } - controller := co.GetSchedulersController() + for i := 0; i < 1000; i++ { co.CollectHotSpotMetrics() controller.CollectSchedulerMetrics() - co.GetCluster().(*RaftCluster).collectStatisticsMetrics() + rc.collectSchedulingMetrics() } stores := co.GetCluster().GetStores() regionStats := co.GetCluster().RegionWriteStats() @@ -2552,7 +2559,7 @@ func TestCollectMetrics(t *testing.T) { re.Equal(status1, status2) schedule.ResetHotSpotMetrics() schedulers.ResetSchedulerMetrics() - co.GetCluster().(*RaftCluster).resetStatisticsMetrics() + rc.resetSchedulingMetrics() } func prepare(setCfg func(*sc.ScheduleConfig), setTc func(*testCluster), run func(*schedule.Coordinator), re *require.Assertions) (*testCluster, *schedule.Coordinator, func()) { diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go index bb6470252b0..04c77498948 100644 --- a/server/cluster/scheduling_controller.go +++ b/server/cluster/scheduling_controller.go @@ -149,7 +149,7 @@ func (sc *schedulingController) runSchedulingMetricsCollectionJob() { ticker := time.NewTicker(metricsCollectionJobInterval) failpoint.Inject("highFrequencyClusterJobs", func() { ticker.Stop() - ticker = time.NewTicker(time.Microsecond) + ticker = time.NewTicker(time.Millisecond) }) defer ticker.Stop() @@ -170,7 +170,10 @@ func (sc *schedulingController) resetSchedulingMetrics() { statistics.Reset() schedulers.ResetSchedulerMetrics() schedule.ResetHotSpotMetrics() - sc.resetStatisticsMetrics() + statistics.ResetRegionStatsMetrics() + statistics.ResetLabelStatsMetrics() + // reset hot cache metrics + statistics.ResetHotCacheStatusMetrics() } func (sc *schedulingController) collectSchedulingMetrics() { @@ -183,20 +186,6 @@ func (sc *schedulingController) collectSchedulingMetrics() { statsMap.Collect() sc.coordinator.GetSchedulersController().CollectSchedulerMetrics() sc.coordinator.CollectHotSpotMetrics() - sc.collectStatisticsMetrics() -} - -func (sc *schedulingController) resetStatisticsMetrics() { - if sc.regionStats == nil { - return - } - sc.regionStats.Reset() - sc.labelStats.Reset() - // reset hot cache metrics - sc.hotStat.ResetMetrics() -} - -func (sc *schedulingController) collectStatisticsMetrics() { if sc.regionStats == nil { return } diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 6d233a8c8ab..a5861f1ba43 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -518,8 +518,6 @@ func TestRaftClusterMultipleRestart(t *testing.T) { err = rc.Start(leaderServer.GetServer()) re.NoError(err) time.Sleep(time.Millisecond) - rc = leaderServer.GetRaftCluster() - re.NotNil(rc) rc.Stop() } re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs")) From 5a2a8d6965b8921230124ce478aed5b4203d2363 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 21 Nov 2023 14:14:41 +0800 Subject: [PATCH 4/4] *: make TestConfigTestSuite stable (#7398) close tikv/pd#7395 Signed-off-by: lhy1024 --- client/retry/backoff.go | 4 +++- pkg/schedule/operator/operator_test.go | 3 +-- tests/integrations/mcs/tso/server_test.go | 2 +- 
tests/pdctl/config/config_test.go | 5 +++-- tests/server/api/rule_test.go | 1 - tests/server/cluster/cluster_test.go | 6 +++--- tests/server/member/member_test.go | 4 ++-- tests/server/region_syncer/region_syncer_test.go | 2 +- 8 files changed, 14 insertions(+), 13 deletions(-) diff --git a/client/retry/backoff.go b/client/retry/backoff.go index e2ca9ab3972..b47a39d8eaa 100644 --- a/client/retry/backoff.go +++ b/client/retry/backoff.go @@ -34,9 +34,11 @@ func (bo *BackOffer) Exec( fn func() error, ) error { if err := fn(); err != nil { + after := time.NewTimer(bo.nextInterval()) + defer after.Stop() select { case <-ctx.Done(): - case <-time.After(bo.nextInterval()): + case <-after.C: failpoint.Inject("backOffExecute", func() { testBackOffExecuteFlag = true }) diff --git a/pkg/schedule/operator/operator_test.go b/pkg/schedule/operator/operator_test.go index 9d924738543..c16d929f379 100644 --- a/pkg/schedule/operator/operator_test.go +++ b/pkg/schedule/operator/operator_test.go @@ -17,7 +17,6 @@ package operator import ( "context" "encoding/json" - "fmt" "sync/atomic" "testing" "time" @@ -514,7 +513,7 @@ func (suite *operatorTestSuite) TestOpStepTimeout() { }, } for i, v := range testData { - fmt.Printf("case:%d\n", i) + suite.T().Logf("case: %d", i) for _, step := range v.step { suite.Equal(v.expect, step.Timeout(v.regionSize)) } diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index 58006b87eeb..643fb3c7911 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -89,7 +89,7 @@ func (suite *tsoServerTestSuite) TearDownSuite() { func (suite *tsoServerTestSuite) TestTSOServerStartAndStopNormally() { defer func() { if r := recover(); r != nil { - fmt.Println("Recovered from an unexpected panic", r) + suite.T().Log("Recovered from an unexpected panic", r) suite.T().Errorf("Expected no panic, but something bad occurred with") } }() diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index 91d6723c2ac..badccd9becc 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -832,7 +832,6 @@ func (suite *configTestSuite) checkPDServerConfig(cluster *tests.TestCluster) { LastHeartbeat: time.Now().UnixNano(), } tests.MustPutStore(re, cluster, store) - defer cluster.Destroy() output, err := pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "show", "server") re.NoError(err) @@ -844,7 +843,9 @@ func (suite *configTestSuite) checkPDServerConfig(cluster *tests.TestCluster) { re.Equal("table", conf.KeyType) re.Equal(typeutil.StringSlice([]string{}), conf.RuntimeServices) re.Equal("", conf.MetricStorage) - re.Equal("auto", conf.DashboardAddress) + if conf.DashboardAddress != "auto" { // dashboard has been assigned + re.Equal(leaderServer.GetAddr(), conf.DashboardAddress) + } re.Equal(int(3), conf.FlowRoundByDigit) } diff --git a/tests/server/api/rule_test.go b/tests/server/api/rule_test.go index 9176a00e66d..ffdf56b6567 100644 --- a/tests/server/api/rule_test.go +++ b/tests/server/api/rule_test.go @@ -1097,7 +1097,6 @@ func (suite *regionRuleTestSuite) checkRegionPlacementRule(cluster *tests.TestCl var label labeler.LabelRule escapedID := url.PathEscape("keyspaces/0") u = fmt.Sprintf("%s/config/region-label/rule/%s", urlPrefix, escapedID) - fmt.Println("u====", u) err = tu.ReadGetJSON(re, testDialClient, u, &label) suite.NoError(err) suite.Equal(label.ID, "keyspaces/0") diff --git a/tests/server/cluster/cluster_test.go 
b/tests/server/cluster/cluster_test.go index a5861f1ba43..18a82bcf0fe 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -1288,7 +1288,7 @@ func TestTransferLeaderForScheduler(t *testing.T) { re.NoError(err) tc.WaitLeader() // start - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) rc := leaderServer.GetServer().GetRaftCluster() re.NotNil(rc) @@ -1327,7 +1327,7 @@ func TestTransferLeaderForScheduler(t *testing.T) { tc.ResignLeader() rc.Stop() tc.WaitLeader() - leaderServer = tc.GetServer(tc.GetLeader()) + leaderServer = tc.GetLeaderServer() rc1 := leaderServer.GetServer().GetRaftCluster() rc1.Start(leaderServer.GetServer()) re.NoError(err) @@ -1347,7 +1347,7 @@ func TestTransferLeaderForScheduler(t *testing.T) { tc.ResignLeader() rc1.Stop() tc.WaitLeader() - leaderServer = tc.GetServer(tc.GetLeader()) + leaderServer = tc.GetLeaderServer() rc = leaderServer.GetServer().GetRaftCluster() rc.Start(leaderServer.GetServer()) re.NotNil(rc) diff --git a/tests/server/member/member_test.go b/tests/server/member/member_test.go index 5965f9e22a6..e6657ffd223 100644 --- a/tests/server/member/member_test.go +++ b/tests/server/member/member_test.go @@ -260,7 +260,7 @@ func TestPDLeaderLostWhileEtcdLeaderIntact(t *testing.T) { re.NoError(err) leader1 := cluster.WaitLeader() - memberID := cluster.GetServer(leader1).GetLeader().GetMemberId() + memberID := cluster.GetLeaderServer().GetLeader().GetMemberId() re.NoError(failpoint.Enable("github.com/tikv/pd/server/leaderLoopCheckAgain", fmt.Sprintf("return(\"%d\")", memberID))) re.NoError(failpoint.Enable("github.com/tikv/pd/server/exitCampaignLeader", fmt.Sprintf("return(\"%d\")", memberID))) @@ -338,7 +338,7 @@ func TestCampaignLeaderFrequently(t *testing.T) { re.NotEmpty(cluster.GetLeader()) for i := 0; i < 3; i++ { - cluster.GetServers()[cluster.GetLeader()].ResetPDLeader() + cluster.GetLeaderServer().ResetPDLeader() cluster.WaitLeader() } // leader should be changed when campaign leader frequently diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go index b73d4abb9b5..6521432c0dc 100644 --- a/tests/server/region_syncer/region_syncer_test.go +++ b/tests/server/region_syncer/region_syncer_test.go @@ -259,7 +259,7 @@ func TestPrepareCheckerWithTransferLeader(t *testing.T) { err = cluster.RunInitialServers() re.NoError(err) cluster.WaitLeader() - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) rc := leaderServer.GetServer().GetRaftCluster() re.NotNil(rc)
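
Note on the retry change in PATCH 4/4: client/retry/backoff.go now creates an explicit time.NewTimer and stops it, rather than using time.After, so a context cancelled mid-backoff does not leave a timer pending until the full interval elapses. The standalone sketch below shows that pattern in isolation, assuming hypothetical names (retryWithBackoff, attempts, delay) that are illustrative only and not part of the PD client API.

// Minimal sketch of the stop-the-timer backoff pattern adopted in PATCH 4/4.
// The actual patch stops the timer via defer inside BackOffer.Exec; here the
// Stop happens in the cancellation branch, which is the same idea.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff retries fn with a fixed delay between attempts, honoring ctx.
func retryWithBackoff(ctx context.Context, attempts int, delay time.Duration, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		timer := time.NewTimer(delay)
		select {
		case <-ctx.Done():
			timer.Stop() // release the timer immediately instead of letting it fire later
			return ctx.Err()
		case <-timer.C:
			// backoff interval elapsed; try again
		}
	}
	return err
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	err := retryWithBackoff(ctx, 5, 20*time.Millisecond, func() error {
		return errors.New("not ready")
	})
	fmt.Println("result:", err) // prints a context deadline error: cancelled mid-backoff
}

The design point is the same one the patch makes: time.After allocates a timer that cannot be reclaimed until it fires, whereas an explicitly stopped time.NewTimer is released as soon as the surrounding select is abandoned, which keeps cancellation-heavy tests (such as TestConfigTestSuite) from accumulating stray timers.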