mcs: dynamic enable scheduling jobs (#7325)
ref #5839, close #7375

Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed Nov 20, 2023
1 parent dda748a commit 9845c12
Showing 15 changed files with 249 additions and 119 deletions.
9 changes: 7 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
@@ -502,8 +502,8 @@ func (c *Cluster) collectClusterMetrics() {
func (c *Cluster) resetMetrics() {
statistics.Reset()

c.coordinator.GetSchedulersController().ResetSchedulerMetrics()
c.coordinator.ResetHotSpotMetrics()
schedulers.ResetSchedulerMetrics()
schedule.ResetHotSpotMetrics()
c.resetClusterMetrics()
}

@@ -538,6 +538,11 @@ func (c *Cluster) StopBackgroundJobs() {
c.wg.Wait()
}

// IsBackgroundJobsRunning returns whether the background jobs are running. Only for test purpose.
func (c *Cluster) IsBackgroundJobsRunning() bool {
return c.running.Load()
}

// HandleRegionHeartbeat processes RegionInfo reports from client.
func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
if err := c.processRegionHeartbeat(region); err != nil {
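The hunk above adds IsBackgroundJobsRunning so tests can observe whether the scheduling server's background jobs are currently enabled. The sketch below shows the same start/stop pattern (an atomic flag plus a WaitGroup) in isolation; the type and field names are illustrative and are not pd's:

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type backgroundJobs struct {
	running atomic.Bool
	wg      sync.WaitGroup
	cancel  context.CancelFunc
}

// Start launches the job loop once; a second call while running is a no-op.
func (b *backgroundJobs) Start() {
	if !b.running.CompareAndSwap(false, true) {
		return
	}
	ctx, cancel := context.WithCancel(context.Background())
	b.cancel = cancel
	b.wg.Add(1)
	go func() {
		defer b.wg.Done()
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// collect metrics, drive schedulers, etc.
			}
		}
	}()
}

// Stop cancels the loop and waits for it to exit; calling it twice is safe.
func (b *backgroundJobs) Stop() {
	if !b.running.CompareAndSwap(true, false) {
		return
	}
	b.cancel()
	b.wg.Wait()
}

// IsRunning plays the role of IsBackgroundJobsRunning: tests poll this flag.
func (b *backgroundJobs) IsRunning() bool { return b.running.Load() }

func main() {
	var jobs backgroundJobs
	jobs.Start()
	fmt.Println("running:", jobs.IsRunning()) // true
	jobs.Stop()
	fmt.Println("running:", jobs.IsRunning()) // false
}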
1 change: 1 addition & 0 deletions pkg/mcs/scheduling/server/server.go
@@ -405,6 +405,7 @@ func (s *Server) startServer() (err error) {
// different service modes provided by the same pd-server binary
serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix()))

s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
uniqueName := s.cfg.GetAdvertiseListenAddr()
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
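The added serviceID line builds the registry entry that the scheduling server publishes so that the API server can discover it. Below is a rough sketch of keeping such an entry alive in etcd under a leased key; the key path, TTL, and JSON field are assumptions for illustration and do not reflect pd's actual layout:

package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// ServiceRegistryEntry mirrors the field used in the diff; other fields omitted.
type ServiceRegistryEntry struct {
	ServiceAddr string `json:"service-addr"`
}

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	entry := ServiceRegistryEntry{ServiceAddr: "http://127.0.0.1:3379"}
	value, _ := json.Marshal(entry)

	ctx := context.Background()
	// Bind the registration to a lease so it disappears if the server dies.
	lease, err := cli.Grant(ctx, 5)
	if err != nil {
		log.Fatal(err)
	}
	// Hypothetical key layout: /ms/<cluster-id>/scheduling/registry/<addr>.
	key := "/ms/1/scheduling/registry/" + entry.ServiceAddr
	if _, err := cli.Put(ctx, key, string(value), clientv3.WithLease(lease.ID)); err != nil {
		log.Fatal(err)
	}
	// In real code the keep-alive channel must be drained; omitted for brevity.
	if _, err := cli.KeepAlive(ctx, lease.ID); err != nil {
		log.Fatal(err)
	}
	log.Println("scheduling server registered at", key)
}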
6 changes: 3 additions & 3 deletions pkg/schedule/coordinator.go
@@ -88,8 +88,8 @@ type Coordinator struct {
}

// NewCoordinator creates a new Coordinator.
func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *Coordinator {
ctx, cancel := context.WithCancel(ctx)
func NewCoordinator(parentCtx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *Coordinator {
ctx, cancel := context.WithCancel(parentCtx)
opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), hbStreams)
schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController)
checkers := checker.NewController(ctx, cluster, cluster.GetCheckerConfig(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController)
@@ -714,7 +714,7 @@ func collectHotMetrics(cluster sche.ClusterInformer, stores []*core.StoreInfo, t
}

// ResetHotSpotMetrics resets hot spot metrics.
func (c *Coordinator) ResetHotSpotMetrics() {
func ResetHotSpotMetrics() {
hotSpotStatusGauge.Reset()
schedulers.HotPendingSum.Reset()
}
11 changes: 3 additions & 8 deletions pkg/schedule/schedulers/scheduler_controller.go
@@ -38,8 +38,6 @@ const maxScheduleRetries = 10

var (
denySchedulersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("schedulers", "deny")
rulesCntStatusGauge = ruleStatusGauge.WithLabelValues("rule_count")
groupsCntStatusGauge = ruleStatusGauge.WithLabelValues("group_count")
)

// Controller is used to manage all schedulers.
@@ -128,21 +126,18 @@ func (c *Controller) CollectSchedulerMetrics() {
}
ruleCnt := ruleMgr.GetRulesCount()
groupCnt := ruleMgr.GetGroupsCount()
rulesCntStatusGauge.Set(float64(ruleCnt))
groupsCntStatusGauge.Set(float64(groupCnt))
ruleStatusGauge.WithLabelValues("rule_count").Set(float64(ruleCnt))
ruleStatusGauge.WithLabelValues("group_count").Set(float64(groupCnt))
}

func (c *Controller) isSchedulingHalted() bool {
return c.cluster.GetSchedulerConfig().IsSchedulingHalted()
}

// ResetSchedulerMetrics resets metrics of all schedulers.
func (c *Controller) ResetSchedulerMetrics() {
func ResetSchedulerMetrics() {
schedulerStatusGauge.Reset()
ruleStatusGauge.Reset()
// create in map again
rulesCntStatusGauge = ruleStatusGauge.WithLabelValues("rule_count")
groupsCntStatusGauge = ruleStatusGauge.WithLabelValues("group_count")
}

// AddSchedulerHandler adds the HTTP handler for a scheduler.
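The removed rulesCntStatusGauge and groupsCntStatusGauge variables cached children of the ruleStatusGauge vector. Once ResetSchedulerMetrics calls ruleStatusGauge.Reset(), a cached child is detached from the vector and its updates are no longer exported, which is why the collection path now calls WithLabelValues on every pass. A small self-contained illustration, using an assumed metric name rather than pd's, follows:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var ruleStatusGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "demo_rule_status", Help: "demo rule status"},
	[]string{"type"},
)

// exportedSeries counts how many child series the vector currently exposes.
func exportedSeries() int {
	ch := make(chan prometheus.Metric, 16)
	ruleStatusGauge.Collect(ch)
	close(ch)
	n := 0
	for range ch {
		n++
	}
	return n
}

func main() {
	cached := ruleStatusGauge.WithLabelValues("rule_count") // cached child
	cached.Set(3)
	fmt.Println("before Reset:", exportedSeries()) // 1

	ruleStatusGauge.Reset()
	cached.Set(5) // the cached child is detached: this update is never exported
	fmt.Println("after Reset, cached child:", exportedSeries()) // 0

	ruleStatusGauge.WithLabelValues("rule_count").Set(5) // re-fetching recreates the series
	fmt.Println("after re-fetching child:", exportedSeries()) // 1
}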
4 changes: 2 additions & 2 deletions server/api/middleware.go
@@ -114,7 +114,7 @@ func newAuditMiddleware(s *server.Server) negroni.Handler {
return &auditMiddleware{svr: s}
}

// ServeHTTP is used to implememt negroni.Handler for auditMiddleware
// ServeHTTP is used to implement negroni.Handler for auditMiddleware
func (s *auditMiddleware) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) {
if !s.svr.GetServiceMiddlewarePersistOptions().IsAuditEnabled() {
next(w, r)
@@ -164,7 +164,7 @@ func newRateLimitMiddleware(s *server.Server) negroni.Handler {
return &rateLimitMiddleware{svr: s}
}

// ServeHTTP is used to implememt negroni.Handler for rateLimitMiddleware
// ServeHTTP is used to implement negroni.Handler for rateLimitMiddleware
func (s *rateLimitMiddleware) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) {
if !s.svr.GetServiceMiddlewarePersistOptions().IsRateLimitEnabled() {
next(w, r)
103 changes: 56 additions & 47 deletions server/cluster/cluster.go
@@ -41,15 +41,14 @@ import (
"github.com/tikv/pd/pkg/gctuner"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/mcs/discovery"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/memory"
"github.com/tikv/pd/pkg/progress"
"github.com/tikv/pd/pkg/replication"
"github.com/tikv/pd/pkg/schedule"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/statistics"
@@ -262,7 +261,7 @@ func (c *RaftCluster) InitCluster(
storage storage.Storage,
basicCluster *core.BasicCluster,
hbstreams *hbstream.HeartbeatStreams,
keyspaceGroupManager *keyspace.GroupManager) {
keyspaceGroupManager *keyspace.GroupManager) error {
c.core, c.opt, c.storage, c.id = basicCluster, opt.(*config.PersistOptions), storage, id
c.ctx, c.cancel = context.WithCancel(c.serverCtx)
c.progressManager = progress.NewManager()
@@ -271,7 +270,15 @@
c.unsafeRecoveryController = unsaferecovery.NewController(c)
c.keyspaceGroupManager = keyspaceGroupManager
c.hbstreams = hbstreams
c.schedulingController = newSchedulingController(c.ctx)
c.ruleManager = placement.NewRuleManager(c.storage, c, c.GetOpts())
if c.opt.IsPlacementRulesEnabled() {
err := c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel())
if err != nil {
return err
}
}
c.schedulingController = newSchedulingController(c.ctx, c.core, c.opt, c.ruleManager)
return nil
}

// Start starts a cluster.
@@ -285,7 +292,10 @@ func (c *RaftCluster) Start(s Server) error {
}

c.isAPIServiceMode = s.IsAPIServiceMode()
c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
err := c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
if err != nil {
return err
}
cluster, err := c.LoadClusterInfo()
if err != nil {
return err
@@ -294,24 +304,21 @@
return nil
}

c.ruleManager = placement.NewRuleManager(c.storage, c, c.GetOpts())
if c.opt.IsPlacementRulesEnabled() {
err = c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel())
if err != nil {
return err
}
}
c.regionLabeler, err = labeler.NewRegionLabeler(c.ctx, c.storage, regionLabelGCInterval)
if err != nil {
return err
}

if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
for _, store := range c.GetStores() {
storeID := store.GetID()
c.slowStat.ObserveSlowStoreStatus(storeID, store.IsSlow())
}
}
c.replicationMode, err = replication.NewReplicationModeManager(s.GetConfig().ReplicationMode, c.storage, cluster, s)
if err != nil {
return err
}

c.schedulingController.init(c.core, c.opt, schedule.NewCoordinator(c.ctx, c, c.GetHeartbeatStreams()), c.ruleManager)
c.limiter = NewStoreLimiter(s.GetPersistOptions())
c.externalTS, err = c.storage.LoadExternalTS()
if err != nil {
@@ -326,6 +333,7 @@ return err
return err
}
}
c.checkServices()
c.wg.Add(9)
go c.runServiceCheckJob()
go c.runMetricsCollectionJob()
@@ -341,25 +349,39 @@
return nil
}

func (c *RaftCluster) runServiceCheckJob() {
defer logutil.LogPanic()
defer c.wg.Done()

var once sync.Once
var once sync.Once

checkFn := func() {
if c.isAPIServiceMode {
once.Do(c.initSchedulers)
c.independentServices.Store(mcsutils.SchedulingServiceName, true)
return
}
if c.startSchedulingJobs() {
func (c *RaftCluster) checkServices() {
if c.isAPIServiceMode {
servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), mcsutils.SchedulingServiceName)
if err != nil || len(servers) == 0 {
c.startSchedulingJobs(c, c.hbstreams)
c.independentServices.Delete(mcsutils.SchedulingServiceName)
} else {
if c.stopSchedulingJobs() {
c.initCoordinator(c.ctx, c, c.hbstreams)
} else {
once.Do(func() {
c.initCoordinator(c.ctx, c, c.hbstreams)
})
}
c.independentServices.Store(mcsutils.SchedulingServiceName, true)
}
} else {
c.startSchedulingJobs(c, c.hbstreams)
c.independentServices.Delete(mcsutils.SchedulingServiceName)
}
checkFn()
}

func (c *RaftCluster) runServiceCheckJob() {
defer logutil.LogPanic()
defer c.wg.Done()

ticker := time.NewTicker(serviceCheckInterval)
failpoint.Inject("highFrequencyClusterJobs", func() {
ticker.Stop()
ticker = time.NewTicker(time.Millisecond * 10)
})
defer ticker.Stop()

for {
@@ -368,7 +390,7 @@ func (c *RaftCluster) runServiceCheckJob() {
log.Info("service check job is stopped")
return
case <-ticker.C:
checkFn()
c.checkServices()
}
}
}
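checkServices is the core of the dynamic switch: in API-service mode it asks service discovery whether a scheduling microservice is registered, starts the local scheduling jobs when none is found, and stops them (handing the work over) when one is. The simplified sketch below captures that decision loop; discover is a hypothetical stand-in for discovery.Discover and none of the names match pd's:

package main

import (
	"context"
	"log"
	"sync"
	"time"
)

type serviceSwitch struct {
	mu          sync.Mutex
	jobsRunning bool
	independent map[string]bool
}

// discover is a hypothetical stand-in for querying etcd for registered servers.
func discover(ctx context.Context, service string) ([]string, error) {
	return nil, nil // pretend no scheduling server is registered
}

func (s *serviceSwitch) check(ctx context.Context) {
	servers, err := discover(ctx, "scheduling")
	s.mu.Lock()
	defer s.mu.Unlock()
	if err != nil || len(servers) == 0 {
		// No scheduling microservice: this process runs the jobs itself.
		if !s.jobsRunning {
			s.jobsRunning = true
			log.Println("start local scheduling jobs")
		}
		delete(s.independent, "scheduling")
		return
	}
	// A scheduling microservice exists: stop local jobs and mark it independent.
	if s.jobsRunning {
		s.jobsRunning = false
		log.Println("stop local scheduling jobs")
	}
	s.independent["scheduling"] = true
}

func main() {
	s := &serviceSwitch{independent: map[string]bool{}}
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			log.Println("service check job is stopped")
			return
		case <-ticker.C:
			s.check(ctx)
		}
	}
}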
@@ -621,12 +643,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
zap.Int("count", c.core.GetTotalRegionCount()),
zap.Duration("cost", time.Since(start)),
)
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
for _, store := range c.GetStores() {
storeID := store.GetID()
c.slowStat.ObserveSlowStoreStatus(storeID, store.IsSlow())
}
}

return c, nil
}

@@ -724,7 +741,7 @@ func (c *RaftCluster) Stop() {
c.Unlock()

c.wg.Wait()
log.Info("raftcluster is stopped")
log.Info("raft cluster is stopped")
}

// IsRunning return if the cluster is running.
Expand All @@ -749,16 +766,6 @@ func (c *RaftCluster) GetHeartbeatStreams() *hbstream.HeartbeatStreams {
return c.hbstreams
}

// GetCoordinator returns the coordinator.
func (c *RaftCluster) GetCoordinator() *schedule.Coordinator {
return c.coordinator
}

// GetOperatorController returns the operator controller.
func (c *RaftCluster) GetOperatorController() *operator.Controller {
return c.coordinator.GetOperatorController()
}

// AllocID returns a global unique ID.
func (c *RaftCluster) AllocID() (uint64, error) {
return c.id.Alloc()
@@ -997,7 +1004,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
isNew, saveKV, saveCache, needSync := regionGuide(region, origin)
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) && !saveKV && !saveCache && !isNew {
if !saveKV && !saveCache && !isNew {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
Expand Down Expand Up @@ -1028,9 +1035,11 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
regionUpdateCacheEventCounter.Inc()
}

isPrepared := true
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared())
isPrepared = c.IsPrepared()
}
cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, isPrepared)

if c.storage != nil {
// If there are concurrent heartbeats from the same region, the last write will win even if
10 changes: 5 additions & 5 deletions server/cluster/cluster_test.go
@@ -2138,7 +2138,7 @@ func newTestRaftCluster(
panic(err)
}
}
rc.schedulingController.init(basicCluster, opt, nil, rc.ruleManager)
rc.schedulingController = newSchedulingController(rc.ctx, rc.core, rc.opt, rc.ruleManager)
return rc
}

@@ -2505,8 +2505,8 @@ func TestCollectMetricsConcurrent(t *testing.T) {
controller.CollectSchedulerMetrics()
co.GetCluster().(*RaftCluster).collectStatisticsMetrics()
}
co.ResetHotSpotMetrics()
controller.ResetSchedulerMetrics()
schedule.ResetHotSpotMetrics()
schedulers.ResetSchedulerMetrics()
co.GetCluster().(*RaftCluster).resetStatisticsMetrics()
wg.Wait()
}
@@ -2551,8 +2551,8 @@ func TestCollectMetrics(t *testing.T) {
s.Stats = nil
}
re.Equal(status1, status2)
co.ResetHotSpotMetrics()
controller.ResetSchedulerMetrics()
schedule.ResetHotSpotMetrics()
schedulers.ResetSchedulerMetrics()
co.GetCluster().(*RaftCluster).resetStatisticsMetrics()
}

3 changes: 3 additions & 0 deletions server/cluster/cluster_worker.go
@@ -38,6 +38,9 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
return err
}

if c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
return nil
}
c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL)
return nil
}
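Together with the processRegionHeartbeat change above, this guard splits the heartbeat path: PD keeps maintaining region metadata and statistics locally, but skips operator dispatch once the scheduling service is independent. A condensed sketch of that split follows, with invented helper names standing in for the real methods:

package main

import "fmt"

// region is a stand-in for *core.RegionInfo.
type region struct{ id uint64 }

type cluster struct {
	schedulingIndependent bool
}

// handleRegionHeartbeat sketches the split: metadata and statistics are always
// processed locally, while operator dispatch is skipped when a separate
// scheduling service owns it.
func (c *cluster) handleRegionHeartbeat(r *region) error {
	if err := c.processRegionHeartbeat(r); err != nil { // cache + statistics
		return err
	}
	if c.schedulingIndependent {
		return nil // the scheduling microservice dispatches operators instead
	}
	c.dispatchOperators(r)
	return nil
}

func (c *cluster) processRegionHeartbeat(r *region) error {
	fmt.Printf("update cache and statistics for region %d\n", r.id)
	return nil
}

func (c *cluster) dispatchOperators(r *region) {
	fmt.Printf("dispatch operators for region %d\n", r.id)
}

func main() {
	local := &cluster{schedulingIndependent: false}
	_ = local.handleRegionHeartbeat(&region{id: 1}) // updates cache and dispatches

	delegated := &cluster{schedulingIndependent: true}
	_ = delegated.handleRegionHeartbeat(&region{id: 2}) // updates cache only
}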