From 9845c12d2a40ef3e28e0ddf9c803f33994102f81 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 20 Nov 2023 11:10:11 +0800 Subject: [PATCH] mcs: dynamic enable scheduling jobs (#7325) ref tikv/pd#5839, close tikv/pd#7375 Signed-off-by: Ryan Leung --- pkg/mcs/scheduling/server/cluster.go | 9 +- pkg/mcs/scheduling/server/server.go | 1 + pkg/schedule/coordinator.go | 6 +- .../schedulers/scheduler_controller.go | 11 +- server/api/middleware.go | 4 +- server/cluster/cluster.go | 103 +++++++------- server/cluster/cluster_test.go | 10 +- server/cluster/cluster_worker.go | 3 + server/cluster/scheduling_controller.go | 127 ++++++++++++++---- .../mcs/scheduling/config_test.go | 3 +- .../mcs/scheduling/server_test.go | 38 ++++++ tests/pdctl/hot/hot_test.go | 1 + tests/pdctl/keyspace/keyspace_group_test.go | 3 +- tests/server/api/operator_test.go | 46 +++---- tests/testutil.go | 3 + 15 files changed, 249 insertions(+), 119 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 028c2a12b37..ac15212553b 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -502,8 +502,8 @@ func (c *Cluster) collectClusterMetrics() { func (c *Cluster) resetMetrics() { statistics.Reset() - c.coordinator.GetSchedulersController().ResetSchedulerMetrics() - c.coordinator.ResetHotSpotMetrics() + schedulers.ResetSchedulerMetrics() + schedule.ResetHotSpotMetrics() c.resetClusterMetrics() } @@ -538,6 +538,11 @@ func (c *Cluster) StopBackgroundJobs() { c.wg.Wait() } +// IsBackgroundJobsRunning returns whether the background jobs are running. Only for test purpose. +func (c *Cluster) IsBackgroundJobsRunning() bool { + return c.running.Load() +} + // HandleRegionHeartbeat processes RegionInfo reports from client. func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { if err := c.processRegionHeartbeat(region); err != nil { diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 32b241fee91..c5b73dea5fc 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -405,6 +405,7 @@ func (s *Server) startServer() (err error) { // different service modes provided by the same pd-server binary serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix())) + s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr} uniqueName := s.cfg.GetAdvertiseListenAddr() uniqueID := memberutil.GenerateUniqueID(uniqueName) log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID)) diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 8fb9ec8b286..6a02e68811d 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -88,8 +88,8 @@ type Coordinator struct { } // NewCoordinator creates a new Coordinator. 
-func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *Coordinator { - ctx, cancel := context.WithCancel(ctx) +func NewCoordinator(parentCtx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *Coordinator { + ctx, cancel := context.WithCancel(parentCtx) opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), hbStreams) schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController) checkers := checker.NewController(ctx, cluster, cluster.GetCheckerConfig(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController) @@ -714,7 +714,7 @@ func collectHotMetrics(cluster sche.ClusterInformer, stores []*core.StoreInfo, t } // ResetHotSpotMetrics resets hot spot metrics. -func (c *Coordinator) ResetHotSpotMetrics() { +func ResetHotSpotMetrics() { hotSpotStatusGauge.Reset() schedulers.HotPendingSum.Reset() } diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index 79c8cbfbc92..5097a5f3f1c 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -38,8 +38,6 @@ const maxScheduleRetries = 10 var ( denySchedulersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("schedulers", "deny") - rulesCntStatusGauge = ruleStatusGauge.WithLabelValues("rule_count") - groupsCntStatusGauge = ruleStatusGauge.WithLabelValues("group_count") ) // Controller is used to manage all schedulers. @@ -128,8 +126,8 @@ func (c *Controller) CollectSchedulerMetrics() { } ruleCnt := ruleMgr.GetRulesCount() groupCnt := ruleMgr.GetGroupsCount() - rulesCntStatusGauge.Set(float64(ruleCnt)) - groupsCntStatusGauge.Set(float64(groupCnt)) + ruleStatusGauge.WithLabelValues("rule_count").Set(float64(ruleCnt)) + ruleStatusGauge.WithLabelValues("group_count").Set(float64(groupCnt)) } func (c *Controller) isSchedulingHalted() bool { @@ -137,12 +135,9 @@ func (c *Controller) isSchedulingHalted() bool { } // ResetSchedulerMetrics resets metrics of all schedulers. -func (c *Controller) ResetSchedulerMetrics() { +func ResetSchedulerMetrics() { schedulerStatusGauge.Reset() ruleStatusGauge.Reset() - // create in map again - rulesCntStatusGauge = ruleStatusGauge.WithLabelValues("rule_count") - groupsCntStatusGauge = ruleStatusGauge.WithLabelValues("group_count") } // AddSchedulerHandler adds the HTTP handler for a scheduler. 
diff --git a/server/api/middleware.go b/server/api/middleware.go index cfeb0844dcf..627d7fecc92 100644 --- a/server/api/middleware.go +++ b/server/api/middleware.go @@ -114,7 +114,7 @@ func newAuditMiddleware(s *server.Server) negroni.Handler { return &auditMiddleware{svr: s} } -// ServeHTTP is used to implememt negroni.Handler for auditMiddleware +// ServeHTTP is used to implement negroni.Handler for auditMiddleware func (s *auditMiddleware) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) { if !s.svr.GetServiceMiddlewarePersistOptions().IsAuditEnabled() { next(w, r) @@ -164,7 +164,7 @@ func newRateLimitMiddleware(s *server.Server) negroni.Handler { return &rateLimitMiddleware{svr: s} } -// ServeHTTP is used to implememt negroni.Handler for rateLimitMiddleware +// ServeHTTP is used to implement negroni.Handler for rateLimitMiddleware func (s *rateLimitMiddleware) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) { if !s.svr.GetServiceMiddlewarePersistOptions().IsRateLimitEnabled() { next(w, r) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 3b50ae16d9b..0df543c96c2 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -41,15 +41,14 @@ import ( "github.com/tikv/pd/pkg/gctuner" "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/keyspace" + "github.com/tikv/pd/pkg/mcs/discovery" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/memory" "github.com/tikv/pd/pkg/progress" "github.com/tikv/pd/pkg/replication" - "github.com/tikv/pd/pkg/schedule" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/hbstream" "github.com/tikv/pd/pkg/schedule/labeler" - "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/statistics" @@ -262,7 +261,7 @@ func (c *RaftCluster) InitCluster( storage storage.Storage, basicCluster *core.BasicCluster, hbstreams *hbstream.HeartbeatStreams, - keyspaceGroupManager *keyspace.GroupManager) { + keyspaceGroupManager *keyspace.GroupManager) error { c.core, c.opt, c.storage, c.id = basicCluster, opt.(*config.PersistOptions), storage, id c.ctx, c.cancel = context.WithCancel(c.serverCtx) c.progressManager = progress.NewManager() @@ -271,7 +270,15 @@ func (c *RaftCluster) InitCluster( c.unsafeRecoveryController = unsaferecovery.NewController(c) c.keyspaceGroupManager = keyspaceGroupManager c.hbstreams = hbstreams - c.schedulingController = newSchedulingController(c.ctx) + c.ruleManager = placement.NewRuleManager(c.storage, c, c.GetOpts()) + if c.opt.IsPlacementRulesEnabled() { + err := c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel()) + if err != nil { + return err + } + } + c.schedulingController = newSchedulingController(c.ctx, c.core, c.opt, c.ruleManager) + return nil } // Start starts a cluster. 
@@ -285,7 +292,10 @@ func (c *RaftCluster) Start(s Server) error { } c.isAPIServiceMode = s.IsAPIServiceMode() - c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster(), s.GetHBStreams(), s.GetKeyspaceGroupManager()) + err := c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster(), s.GetHBStreams(), s.GetKeyspaceGroupManager()) + if err != nil { + return err + } cluster, err := c.LoadClusterInfo() if err != nil { return err @@ -294,24 +304,21 @@ func (c *RaftCluster) Start(s Server) error { return nil } - c.ruleManager = placement.NewRuleManager(c.storage, c, c.GetOpts()) - if c.opt.IsPlacementRulesEnabled() { - err = c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel()) - if err != nil { - return err - } - } c.regionLabeler, err = labeler.NewRegionLabeler(c.ctx, c.storage, regionLabelGCInterval) if err != nil { return err } + if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { + for _, store := range c.GetStores() { + storeID := store.GetID() + c.slowStat.ObserveSlowStoreStatus(storeID, store.IsSlow()) + } + } c.replicationMode, err = replication.NewReplicationModeManager(s.GetConfig().ReplicationMode, c.storage, cluster, s) if err != nil { return err } - - c.schedulingController.init(c.core, c.opt, schedule.NewCoordinator(c.ctx, c, c.GetHeartbeatStreams()), c.ruleManager) c.limiter = NewStoreLimiter(s.GetPersistOptions()) c.externalTS, err = c.storage.LoadExternalTS() if err != nil { @@ -326,6 +333,7 @@ func (c *RaftCluster) Start(s Server) error { return err } } + c.checkServices() c.wg.Add(9) go c.runServiceCheckJob() go c.runMetricsCollectionJob() @@ -341,25 +349,39 @@ func (c *RaftCluster) Start(s Server) error { return nil } -func (c *RaftCluster) runServiceCheckJob() { - defer logutil.LogPanic() - defer c.wg.Done() - - var once sync.Once +var once sync.Once - checkFn := func() { - if c.isAPIServiceMode { - once.Do(c.initSchedulers) - c.independentServices.Store(mcsutils.SchedulingServiceName, true) - return - } - if c.startSchedulingJobs() { +func (c *RaftCluster) checkServices() { + if c.isAPIServiceMode { + servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), mcsutils.SchedulingServiceName) + if err != nil || len(servers) == 0 { + c.startSchedulingJobs(c, c.hbstreams) c.independentServices.Delete(mcsutils.SchedulingServiceName) + } else { + if c.stopSchedulingJobs() { + c.initCoordinator(c.ctx, c, c.hbstreams) + } else { + once.Do(func() { + c.initCoordinator(c.ctx, c, c.hbstreams) + }) + } + c.independentServices.Store(mcsutils.SchedulingServiceName, true) } + } else { + c.startSchedulingJobs(c, c.hbstreams) + c.independentServices.Delete(mcsutils.SchedulingServiceName) } - checkFn() +} + +func (c *RaftCluster) runServiceCheckJob() { + defer logutil.LogPanic() + defer c.wg.Done() ticker := time.NewTicker(serviceCheckInterval) + failpoint.Inject("highFrequencyClusterJobs", func() { + ticker.Stop() + ticker = time.NewTicker(time.Millisecond * 10) + }) defer ticker.Stop() for { @@ -368,7 +390,7 @@ func (c *RaftCluster) runServiceCheckJob() { log.Info("service check job is stopped") return case <-ticker.C: - checkFn() + c.checkServices() } } } @@ -621,12 +643,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) { zap.Int("count", c.core.GetTotalRegionCount()), zap.Duration("cost", time.Since(start)), ) - if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { - for _, store := range c.GetStores() { - 
storeID := store.GetID() - c.slowStat.ObserveSlowStoreStatus(storeID, store.IsSlow()) - } - } + return c, nil } @@ -724,7 +741,7 @@ func (c *RaftCluster) Stop() { c.Unlock() c.wg.Wait() - log.Info("raftcluster is stopped") + log.Info("raft cluster is stopped") } // IsRunning return if the cluster is running. @@ -749,16 +766,6 @@ func (c *RaftCluster) GetHeartbeatStreams() *hbstream.HeartbeatStreams { return c.hbstreams } -// GetCoordinator returns the coordinator. -func (c *RaftCluster) GetCoordinator() *schedule.Coordinator { - return c.coordinator -} - -// GetOperatorController returns the operator controller. -func (c *RaftCluster) GetOperatorController() *operator.Controller { - return c.coordinator.GetOperatorController() -} - // AllocID returns a global unique ID. func (c *RaftCluster) AllocID() (uint64, error) { return c.id.Alloc() @@ -997,7 +1004,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { // Save to cache if meta or leader is updated, or contains any down/pending peer. // Mark isNew if the region in cache does not have leader. isNew, saveKV, saveCache, needSync := regionGuide(region, origin) - if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) && !saveKV && !saveCache && !isNew { + if !saveKV && !saveCache && !isNew { // Due to some config changes need to update the region stats as well, // so we do some extra checks here. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { @@ -1028,9 +1035,11 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { regionUpdateCacheEventCounter.Inc() } + isPrepared := true if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { - cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared()) + isPrepared = c.IsPrepared() } + cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, isPrepared) if c.storage != nil { // If there are concurrent heartbeats from the same region, the last write will win even if diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index d424ea98e7b..0e34ba4c743 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -2138,7 +2138,7 @@ func newTestRaftCluster( panic(err) } } - rc.schedulingController.init(basicCluster, opt, nil, rc.ruleManager) + rc.schedulingController = newSchedulingController(rc.ctx, rc.core, rc.opt, rc.ruleManager) return rc } @@ -2505,8 +2505,8 @@ func TestCollectMetricsConcurrent(t *testing.T) { controller.CollectSchedulerMetrics() co.GetCluster().(*RaftCluster).collectStatisticsMetrics() } - co.ResetHotSpotMetrics() - controller.ResetSchedulerMetrics() + schedule.ResetHotSpotMetrics() + schedulers.ResetSchedulerMetrics() co.GetCluster().(*RaftCluster).resetStatisticsMetrics() wg.Wait() } @@ -2551,8 +2551,8 @@ func TestCollectMetrics(t *testing.T) { s.Stats = nil } re.Equal(status1, status2) - co.ResetHotSpotMetrics() - controller.ResetSchedulerMetrics() + schedule.ResetHotSpotMetrics() + schedulers.ResetSchedulerMetrics() co.GetCluster().(*RaftCluster).resetStatisticsMetrics() } diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index 3a319c48196..74a445ad78e 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -38,6 +38,9 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { return err } + if c.IsServiceIndependent(mcsutils.SchedulingServiceName) { + return nil + } c.coordinator.GetOperatorController().Dispatch(region, 
operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL) return nil } diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go index 1c41c830cf6..bb6470252b0 100644 --- a/server/cluster/scheduling_controller.go +++ b/server/cluster/scheduling_controller.go @@ -25,6 +25,10 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/checker" + sc "github.com/tikv/pd/pkg/schedule/config" + sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/hbstream" + "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/schedule/scatter" "github.com/tikv/pd/pkg/schedule/schedulers" @@ -33,9 +37,9 @@ import ( "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/utils/logutil" - "github.com/tikv/pd/server/config" ) +// schedulingController is used to manage all schedulers and checkers. type schedulingController struct { parentCtx context.Context ctx context.Context @@ -43,7 +47,7 @@ type schedulingController struct { mu sync.RWMutex wg sync.WaitGroup *core.BasicCluster - opt *config.PersistOptions + opt sc.ConfProvider coordinator *schedule.Coordinator labelStats *statistics.LabelStatistics regionStats *statistics.RegionStatistics @@ -52,25 +56,22 @@ type schedulingController struct { running bool } -func newSchedulingController(parentCtx context.Context) *schedulingController { +// newSchedulingController creates a new scheduling controller. +func newSchedulingController(parentCtx context.Context, basicCluster *core.BasicCluster, opt sc.ConfProvider, ruleManager *placement.RuleManager) *schedulingController { ctx, cancel := context.WithCancel(parentCtx) return &schedulingController{ - parentCtx: parentCtx, - ctx: ctx, - cancel: cancel, - labelStats: statistics.NewLabelStatistics(), - hotStat: statistics.NewHotStat(parentCtx), - slowStat: statistics.NewSlowStat(parentCtx), + parentCtx: parentCtx, + ctx: ctx, + cancel: cancel, + BasicCluster: basicCluster, + opt: opt, + labelStats: statistics.NewLabelStatistics(), + hotStat: statistics.NewHotStat(parentCtx), + slowStat: statistics.NewSlowStat(parentCtx), + regionStats: statistics.NewRegionStatistics(basicCluster, opt, ruleManager), } } -func (sc *schedulingController) init(basicCluster *core.BasicCluster, opt *config.PersistOptions, coordinator *schedule.Coordinator, ruleManager *placement.RuleManager) { - sc.BasicCluster = basicCluster - sc.opt = opt - sc.coordinator = coordinator - sc.regionStats = statistics.NewRegionStatistics(basicCluster, opt, ruleManager) -} - func (sc *schedulingController) stopSchedulingJobs() bool { sc.mu.Lock() defer sc.mu.Unlock() @@ -85,20 +86,31 @@ func (sc *schedulingController) stopSchedulingJobs() bool { return true } -func (sc *schedulingController) startSchedulingJobs() bool { +func (sc *schedulingController) startSchedulingJobs(cluster sche.ClusterInformer, hbstreams *hbstream.HeartbeatStreams) { sc.mu.Lock() defer sc.mu.Unlock() if sc.running { - return false + return } - sc.ctx, sc.cancel = context.WithCancel(sc.parentCtx) + sc.initCoordinatorLocked(sc.parentCtx, cluster, hbstreams) sc.wg.Add(3) go sc.runCoordinator() go sc.runStatsBackgroundJobs() go sc.runSchedulingMetricsCollectionJob() sc.running = true log.Info("scheduling service is started") - return true +} + +func (sc *schedulingController) initCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbstreams 
*hbstream.HeartbeatStreams) { + sc.mu.Lock() + defer sc.mu.Unlock() + sc.initCoordinatorLocked(ctx, cluster, hbstreams) + sc.coordinator.InitSchedulers(false) +} + +func (sc *schedulingController) initCoordinatorLocked(ctx context.Context, cluster sche.ClusterInformer, hbstreams *hbstream.HeartbeatStreams) { + sc.ctx, sc.cancel = context.WithCancel(ctx) + sc.coordinator = schedule.NewCoordinator(sc.ctx, cluster, hbstreams) } // runCoordinator runs the main scheduling loop. @@ -156,8 +168,8 @@ func (sc *schedulingController) runSchedulingMetricsCollectionJob() { func (sc *schedulingController) resetSchedulingMetrics() { statistics.Reset() - sc.coordinator.GetSchedulersController().ResetSchedulerMetrics() - sc.coordinator.ResetHotSpotMetrics() + schedulers.ResetSchedulerMetrics() + schedule.ResetHotSpotMetrics() sc.resetStatisticsMetrics() } @@ -287,88 +299,136 @@ func (sc *schedulingController) BucketsStats(degree int, regionIDs ...uint64) ma return sc.hotStat.BucketsStats(degree, regionIDs...) } +// GetCoordinator returns the coordinator. +func (sc *schedulingController) GetCoordinator() *schedule.Coordinator { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator +} + // GetPausedSchedulerDelayAt returns DelayAt of a paused scheduler func (sc *schedulingController) GetPausedSchedulerDelayAt(name string) (int64, error) { + sc.mu.RLock() + defer sc.mu.RUnlock() return sc.coordinator.GetSchedulersController().GetPausedSchedulerDelayAt(name) } // GetPausedSchedulerDelayUntil returns DelayUntil of a paused scheduler func (sc *schedulingController) GetPausedSchedulerDelayUntil(name string) (int64, error) { + sc.mu.RLock() + defer sc.mu.RUnlock() return sc.coordinator.GetSchedulersController().GetPausedSchedulerDelayUntil(name) } +// GetOperatorController returns the operator controller. +func (sc *schedulingController) GetOperatorController() *operator.Controller { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.coordinator.GetOperatorController() +} + // GetRegionScatterer returns the region scatter. func (sc *schedulingController) GetRegionScatterer() *scatter.RegionScatterer { + sc.mu.RLock() + defer sc.mu.RUnlock() return sc.coordinator.GetRegionScatterer() } // GetRegionSplitter returns the region splitter func (sc *schedulingController) GetRegionSplitter() *splitter.RegionSplitter { + sc.mu.RLock() + defer sc.mu.RUnlock() return sc.coordinator.GetRegionSplitter() } // GetMergeChecker returns merge checker. func (sc *schedulingController) GetMergeChecker() *checker.MergeChecker { + sc.mu.RLock() + defer sc.mu.RUnlock() return sc.coordinator.GetMergeChecker() } // GetRuleChecker returns rule checker. func (sc *schedulingController) GetRuleChecker() *checker.RuleChecker { + sc.mu.RLock() + defer sc.mu.RUnlock() return sc.coordinator.GetRuleChecker() } // GetSchedulers gets all schedulers. func (sc *schedulingController) GetSchedulers() []string { + sc.mu.RLock() + defer sc.mu.RUnlock() return sc.coordinator.GetSchedulersController().GetSchedulerNames() } // GetSchedulerHandlers gets all scheduler handlers. func (sc *schedulingController) GetSchedulerHandlers() map[string]http.Handler { + sc.mu.RLock() + defer sc.mu.RUnlock() return sc.coordinator.GetSchedulersController().GetSchedulerHandlers() } // AddSchedulerHandler adds a scheduler handler. func (sc *schedulingController) AddSchedulerHandler(scheduler schedulers.Scheduler, args ...string) error { + sc.mu.RLock() + defer sc.mu.RUnlock() return sc.coordinator.GetSchedulersController().AddSchedulerHandler(scheduler, args...) 
} // RemoveSchedulerHandler removes a scheduler handler. func (sc *schedulingController) RemoveSchedulerHandler(name string) error { + sc.mu.RLock() + defer sc.mu.RUnlock() return sc.coordinator.GetSchedulersController().RemoveSchedulerHandler(name) } // AddScheduler adds a scheduler. func (sc *schedulingController) AddScheduler(scheduler schedulers.Scheduler, args ...string) error { + sc.mu.RLock() + defer sc.mu.RUnlock() return sc.coordinator.GetSchedulersController().AddScheduler(scheduler, args...) } // RemoveScheduler removes a scheduler. func (sc *schedulingController) RemoveScheduler(name string) error { + sc.mu.RLock() + defer sc.mu.RUnlock() return sc.coordinator.GetSchedulersController().RemoveScheduler(name) } // PauseOrResumeScheduler pauses or resumes a scheduler. func (sc *schedulingController) PauseOrResumeScheduler(name string, t int64) error { + sc.mu.RLock() + defer sc.mu.RUnlock() return sc.coordinator.GetSchedulersController().PauseOrResumeScheduler(name, t) } // PauseOrResumeChecker pauses or resumes checker. func (sc *schedulingController) PauseOrResumeChecker(name string, t int64) error { + sc.mu.RLock() + defer sc.mu.RUnlock() return sc.coordinator.PauseOrResumeChecker(name, t) } // AddSuspectRegions adds regions to suspect list. func (sc *schedulingController) AddSuspectRegions(regionIDs ...uint64) { + sc.mu.RLock() + defer sc.mu.RUnlock() sc.coordinator.GetCheckerController().AddSuspectRegions(regionIDs...) } // GetSuspectRegions gets all suspect regions. func (sc *schedulingController) GetSuspectRegions() []uint64 { + sc.mu.RLock() + defer sc.mu.RUnlock() return sc.coordinator.GetCheckerController().GetSuspectRegions() } // RemoveSuspectRegion removes region from suspect list. func (sc *schedulingController) RemoveSuspectRegion(id uint64) { + sc.mu.RLock() + defer sc.mu.RUnlock() sc.coordinator.GetCheckerController().RemoveSuspectRegion(id) } @@ -376,11 +436,15 @@ func (sc *schedulingController) RemoveSuspectRegion(id uint64) { // it would return value and true if pop success, or return empty [][2][]byte and false // if suspectKeyRanges couldn't pop keyRange group. func (sc *schedulingController) PopOneSuspectKeyRange() ([2][]byte, bool) { + sc.mu.RLock() + defer sc.mu.RUnlock() return sc.coordinator.GetCheckerController().PopOneSuspectKeyRange() } // ClearSuspectKeyRanges clears the suspect keyRanges, only for unit test func (sc *schedulingController) ClearSuspectKeyRanges() { + sc.mu.RLock() + defer sc.mu.RUnlock() sc.coordinator.GetCheckerController().ClearSuspectKeyRanges() } @@ -388,14 +452,14 @@ func (sc *schedulingController) ClearSuspectKeyRanges() { // The instance of each keyRange is like following format: // [2][]byte: start key/end key func (sc *schedulingController) AddSuspectKeyRange(start, end []byte) { + sc.mu.RLock() + defer sc.mu.RUnlock() sc.coordinator.GetCheckerController().AddSuspectKeyRange(start, end) } -func (sc *schedulingController) initSchedulers() { - sc.coordinator.InitSchedulers(false) -} - func (sc *schedulingController) getEvictLeaderStores() (evictStores []uint64) { + sc.mu.RLock() + defer sc.mu.RUnlock() if sc.coordinator == nil { return nil } @@ -415,10 +479,21 @@ func (sc *schedulingController) getEvictLeaderStores() (evictStores []uint64) { // IsPrepared return true if the prepare checker is ready. func (sc *schedulingController) IsPrepared() bool { + sc.mu.RLock() + defer sc.mu.RUnlock() return sc.coordinator.GetPrepareChecker().IsPrepared() } // SetPrepared set the prepare check to prepared. Only for test purpose. 
func (sc *schedulingController) SetPrepared() { + sc.mu.RLock() + defer sc.mu.RUnlock() sc.coordinator.GetPrepareChecker().SetPrepared() } + +// IsSchedulingControllerRunning returns whether the scheduling controller is running. Only for test purpose. +func (sc *schedulingController) IsSchedulingControllerRunning() bool { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.running +} diff --git a/tests/integrations/mcs/scheduling/config_test.go b/tests/integrations/mcs/scheduling/config_test.go index 06d73caf130..ccf7cdaf48c 100644 --- a/tests/integrations/mcs/scheduling/config_test.go +++ b/tests/integrations/mcs/scheduling/config_test.go @@ -149,8 +149,9 @@ func (suite *configTestSuite) TestSchedulerConfigWatch() { ) re.NoError(err) // Get all default scheduler names. - var namesFromAPIServer, _, _ = suite.pdLeaderServer.GetRaftCluster().GetStorage().LoadAllSchedulerConfigs() + var namesFromAPIServer []string testutil.Eventually(re, func() bool { + namesFromAPIServer, _, _ = suite.pdLeaderServer.GetRaftCluster().GetStorage().LoadAllSchedulerConfigs() return len(namesFromAPIServer) == len(sc.DefaultSchedulers) }) // Check all default schedulers' configs. diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index a359e1d023a..41c00b8e9b4 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -195,6 +195,44 @@ func (suite *serverTestSuite) TestForwardStoreHeartbeat() { }) } +func (suite *serverTestSuite) TestDynamicSwitch() { + re := suite.Require() + // API server will execute scheduling jobs since there is no scheduler server. + testutil.Eventually(re, func() bool { + return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) + + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + // After scheduling server is started, API server will not execute scheduling jobs. + testutil.Eventually(re, func() bool { + return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) + // Scheduling server is responsible for executing scheduling jobs. + testutil.Eventually(re, func() bool { + return tc.GetPrimaryServer().GetCluster().IsBackgroundJobsRunning() + }) + tc.GetPrimaryServer().Close() + // Stop scheduling server. API server will execute scheduling jobs again. + testutil.Eventually(re, func() bool { + return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) + tc1, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc1.Destroy() + tc1.WaitForPrimaryServing(re) + // After scheduling server is started, API server will not execute scheduling jobs. + testutil.Eventually(re, func() bool { + return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) + // Scheduling server is responsible for executing scheduling jobs again. 
+ testutil.Eventually(re, func() bool { + return tc1.GetPrimaryServer().GetCluster().IsBackgroundJobsRunning() + }) +} + func (suite *serverTestSuite) TestSchedulerSync() { re := suite.Require() tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) diff --git a/tests/pdctl/hot/hot_test.go b/tests/pdctl/hot/hot_test.go index ac9bb3d83bf..8cab8ea9ab2 100644 --- a/tests/pdctl/hot/hot_test.go +++ b/tests/pdctl/hot/hot_test.go @@ -349,6 +349,7 @@ func (suite *hotTestSuite) checkHotWithoutHotPeer(cluster *tests.TestCluster) { hotRegion := statistics.StoreHotPeersInfos{} re.NoError(err) re.NoError(json.Unmarshal(output, &hotRegion)) + re.NotNil(hotRegion.AsPeer[1]) re.Equal(hotRegion.AsPeer[1].Count, 0) re.Equal(0.0, hotRegion.AsPeer[1].TotalBytesRate) re.Equal(load, hotRegion.AsPeer[1].StoreByteRate) diff --git a/tests/pdctl/keyspace/keyspace_group_test.go b/tests/pdctl/keyspace/keyspace_group_test.go index 0b09550d967..cbfdf1d099a 100644 --- a/tests/pdctl/keyspace/keyspace_group_test.go +++ b/tests/pdctl/keyspace/keyspace_group_test.go @@ -503,7 +503,7 @@ func TestShowKeyspaceGroupPrimary(t *testing.T) { for i := 0; i < 10; i++ { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := tests.NewTestAPICluster(ctx, 3, func(conf *config.Config, serverName string) { + tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, serverName string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) @@ -528,7 +528,6 @@ func TestShowKeyspaceGroupPrimary(t *testing.T) { args := []string{"-u", pdAddr, "keyspace-group"} output, err := pdctl.ExecuteCommand(cmd, append(args, defaultKeyspaceGroupID)...) re.NoError(err) - err = json.Unmarshal(output, &keyspaceGroup) re.NoError(err) re.Equal(utils.DefaultKeyspaceGroupID, keyspaceGroup.ID) diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go index e36ead7e44d..14b8618f6a6 100644 --- a/tests/server/api/operator_test.go +++ b/tests/server/api/operator_test.go @@ -27,7 +27,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" - pdoperator "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/config" @@ -285,10 +285,10 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te input: []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [2, 3]}`), expectedError: nil, expectSteps: convertStepsToStr([]string{ - pdoperator.AddLearner{ToStore: 3, PeerID: 1}.String(), - pdoperator.PromoteLearner{ToStore: 3, PeerID: 1}.String(), - pdoperator.TransferLeader{FromStore: 1, ToStore: 2}.String(), - pdoperator.RemovePeer{FromStore: 1, PeerID: 1}.String(), + operator.AddLearner{ToStore: 3, PeerID: 1}.String(), + operator.PromoteLearner{ToStore: 3, PeerID: 1}.String(), + operator.TransferLeader{FromStore: 1, ToStore: 2}.String(), + operator.RemovePeer{FromStore: 1, PeerID: 1}.String(), }), }, { @@ -297,11 +297,11 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te input: []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [2, 3], "peer_roles":["follower", "leader"]}`), expectedError: nil, expectSteps: convertStepsToStr([]string{ - pdoperator.AddLearner{ToStore: 3, PeerID: 2}.String(), - pdoperator.PromoteLearner{ToStore: 3, PeerID: 2}.String(), - pdoperator.TransferLeader{FromStore: 1, ToStore: 
2}.String(), - pdoperator.RemovePeer{FromStore: 1, PeerID: 2}.String(), - pdoperator.TransferLeader{FromStore: 2, ToStore: 3}.String(), + operator.AddLearner{ToStore: 3, PeerID: 2}.String(), + operator.PromoteLearner{ToStore: 3, PeerID: 2}.String(), + operator.TransferLeader{FromStore: 1, ToStore: 2}.String(), + operator.RemovePeer{FromStore: 1, PeerID: 2}.String(), + operator.TransferLeader{FromStore: 2, ToStore: 3}.String(), }), }, { @@ -316,11 +316,11 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te placementRuleEnable: true, input: []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [2, 3], "peer_roles":["follower", "leader"]}`), expectSteps: convertStepsToStr([]string{ - pdoperator.AddLearner{ToStore: 3, PeerID: 3}.String(), - pdoperator.PromoteLearner{ToStore: 3, PeerID: 3}.String(), - pdoperator.TransferLeader{FromStore: 1, ToStore: 2}.String(), - pdoperator.RemovePeer{FromStore: 1, PeerID: 1}.String(), - pdoperator.TransferLeader{FromStore: 2, ToStore: 3}.String(), + operator.AddLearner{ToStore: 3, PeerID: 3}.String(), + operator.PromoteLearner{ToStore: 3, PeerID: 3}.String(), + operator.TransferLeader{FromStore: 1, ToStore: 2}.String(), + operator.RemovePeer{FromStore: 1, PeerID: 1}.String(), + operator.TransferLeader{FromStore: 2, ToStore: 3}.String(), }), }, { @@ -377,10 +377,10 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te input: []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [2, 3], "peer_roles":["follower", "leader"]}`), expectedError: nil, expectSteps: convertStepsToStr([]string{ - pdoperator.AddLearner{ToStore: 3, PeerID: 5}.String(), - pdoperator.PromoteLearner{ToStore: 3, PeerID: 5}.String(), - pdoperator.TransferLeader{FromStore: 1, ToStore: 3}.String(), - pdoperator.RemovePeer{FromStore: 1, PeerID: 1}.String(), + operator.AddLearner{ToStore: 3, PeerID: 5}.String(), + operator.PromoteLearner{ToStore: 3, PeerID: 5}.String(), + operator.TransferLeader{FromStore: 1, ToStore: 3}.String(), + operator.RemovePeer{FromStore: 1, PeerID: 1}.String(), }), }, { @@ -417,10 +417,10 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te input: []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [2, 3], "peer_roles":["leader", "follower"]}`), expectedError: nil, expectSteps: convertStepsToStr([]string{ - pdoperator.AddLearner{ToStore: 3, PeerID: 6}.String(), - pdoperator.PromoteLearner{ToStore: 3, PeerID: 6}.String(), - pdoperator.TransferLeader{FromStore: 1, ToStore: 2}.String(), - pdoperator.RemovePeer{FromStore: 1, PeerID: 1}.String(), + operator.AddLearner{ToStore: 3, PeerID: 6}.String(), + operator.PromoteLearner{ToStore: 3, PeerID: 6}.String(), + operator.TransferLeader{FromStore: 1, ToStore: 2}.String(), + operator.RemovePeer{FromStore: 1, PeerID: 1}.String(), }), }, } diff --git a/tests/testutil.go b/tests/testutil.go index 059a152f06f..2ccf6fb76be 100644 --- a/tests/testutil.go +++ b/tests/testutil.go @@ -299,6 +299,7 @@ func (s *SchedulingTestEnvironment) startCluster(m mode) { leaderServer := s.cluster.GetServer(s.cluster.GetLeader()) re.NoError(leaderServer.BootstrapCluster()) case apiMode: + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`)) s.cluster, err = NewTestAPICluster(s.ctx, 1, s.opts...) 
re.NoError(err) err = s.cluster.RunInitialServers() @@ -306,11 +307,13 @@ func (s *SchedulingTestEnvironment) startCluster(m mode) { re.NotEmpty(s.cluster.WaitLeader()) leaderServer := s.cluster.GetServer(s.cluster.GetLeader()) re.NoError(leaderServer.BootstrapCluster()) + leaderServer.GetRaftCluster().SetPrepared() // start scheduling cluster tc, err := NewTestSchedulingCluster(s.ctx, 1, leaderServer.GetAddr()) re.NoError(err) tc.WaitForPrimaryServing(re) s.cluster.SetSchedulingCluster(tc) time.Sleep(200 * time.Millisecond) // wait for scheduling cluster to update member + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs")) } }
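
For orientation, the heart of this change is the new RaftCluster.checkServices loop: on each tick the API server discovers registered scheduling microservice instances and, when none are found, runs the scheduling jobs itself (startSchedulingJobs); as soon as one is discovered it stops them (stopSchedulingJobs) and marks the scheduling service as independent. The short Go sketch below illustrates that ownership-switching pattern in isolation; discoverSchedulingNodes, controller.start, and controller.stop are simplified stand-ins for discovery.Discover, startSchedulingJobs, and stopSchedulingJobs, not the actual PD implementation.

	// Minimal sketch of the dynamic hand-over pattern used by checkServices.
	// All names below are illustrative stand-ins, not PD APIs.
	package main

	import (
		"context"
		"fmt"
		"sync"
		"time"
	)

	type controller struct {
		mu      sync.Mutex
		running bool
	}

	// start launches the local scheduling jobs if they are not already running.
	func (c *controller) start() {
		c.mu.Lock()
		defer c.mu.Unlock()
		if c.running {
			return
		}
		c.running = true
		fmt.Println("scheduling jobs started locally")
	}

	// stop halts the local scheduling jobs so a remote service can take over.
	func (c *controller) stop() {
		c.mu.Lock()
		defer c.mu.Unlock()
		if !c.running {
			return
		}
		c.running = false
		fmt.Println("scheduling jobs handed over to the scheduling service")
	}

	// discoverSchedulingNodes stands in for service discovery; a real
	// deployment would query the registry for scheduling-service instances.
	func discoverSchedulingNodes() []string {
		return nil // pretend no scheduling service is registered yet
	}

	// runServiceCheck periodically decides who owns the scheduling jobs:
	// keep them locally while no scheduling service is registered, release
	// them as soon as one shows up.
	func runServiceCheck(ctx context.Context, c *controller, interval time.Duration) {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			if len(discoverSchedulingNodes()) == 0 {
				c.start()
			} else {
				c.stop()
			}
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}
	}

	func main() {
		ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
		defer cancel()
		runServiceCheck(ctx, &controller{}, 100*time.Millisecond)
	}

In the patch itself the same behaviour is exercised end to end by TestDynamicSwitch, which starts and stops a scheduling cluster and uses IsSchedulingControllerRunning and IsBackgroundJobsRunning to assert which side currently owns the jobs.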