Skip to content

Commit

Permalink
mcs: reorganize cluster start and stop process (#7155)
Browse files Browse the repository at this point in the history
close #7106, close #7140

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
rleungx and ti-chi-bot[bot] committed Oct 9, 2023
1 parent edf1f95 commit 2556b5b
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 7 deletions.
42 changes: 40 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Cluster struct {
checkMembershipCh chan struct{}
apiServerLeader atomic.Value
clusterID uint64
running atomic.Bool
}

const regionLabelGCInterval = time.Hour
Expand Down Expand Up @@ -203,6 +204,14 @@ func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool {
return c.apiServerLeader.CompareAndSwap(old, new)
}

func trySend(notifier chan struct{}) {
select {
case notifier <- struct{}{}:
// If the channel is not empty, it means the check is triggered.
default:
}
}

// updateScheduler listens on the schedulers updating notifier and manage the scheduler creation and deletion.
func (c *Cluster) updateScheduler() {
defer logutil.LogPanic()
Expand All @@ -213,8 +222,11 @@ func (c *Cluster) updateScheduler() {
// Establish a notifier to listen the schedulers updating.
notifier := make(chan struct{}, 1)
// Make sure the check will be triggered once later.
notifier <- struct{}{}
trySend(notifier)
c.persistConfig.SetSchedulersUpdatingNotifier(notifier)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-c.ctx.Done():
Expand All @@ -224,6 +236,18 @@ func (c *Cluster) updateScheduler() {
// This is triggered by the watcher when the schedulers are updated.
}

if !c.running.Load() {
select {
case <-c.ctx.Done():
log.Info("cluster is closing, stop listening the schedulers updating notifier")
return
case <-ticker.C:
// retry
trySend(notifier)
continue
}
}

log.Info("schedulers updating notifier is triggered, try to update the scheduler")
var (
schedulersController = c.coordinator.GetSchedulersController()
Expand Down Expand Up @@ -394,15 +418,29 @@ func (c *Cluster) runUpdateStoreStats() {
}
}

// runCoordinator runs the main scheduling loop.
func (c *Cluster) runCoordinator() {
defer logutil.LogPanic()
defer c.wg.Done()
c.coordinator.RunUntilStop()
}

// StartBackgroundJobs starts background jobs.
func (c *Cluster) StartBackgroundJobs() {
c.wg.Add(2)
c.wg.Add(3)
go c.updateScheduler()
go c.runUpdateStoreStats()
go c.runCoordinator()
c.running.Store(true)
}

// StopBackgroundJobs stops background jobs.
func (c *Cluster) StopBackgroundJobs() {
if !c.running.Load() {
return
}
c.running.Store(false)
c.coordinator.Stop()
c.cancel()
c.wg.Wait()
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,16 +456,12 @@ func (s *Server) startCluster(context.Context) error {
}
s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController())
s.cluster.StartBackgroundJobs()
go s.GetCoordinator().RunUntilStop()
return nil
}

func (s *Server) stopCluster() {
s.GetCoordinator().Stop()
s.cluster.StopBackgroundJobs()
s.ruleWatcher.Close()
s.configWatcher.Close()
s.metaWatcher.Close()
s.stopWatcher()
}

func (s *Server) startWatcher() (err error) {
Expand All @@ -481,6 +477,12 @@ func (s *Server) startWatcher() (err error) {
return err
}

func (s *Server) stopWatcher() {
s.ruleWatcher.Close()
s.configWatcher.Close()
s.metaWatcher.Close()
}

// GetPersistConfig returns the persist config.
// It's used to test.
func (s *Server) GetPersistConfig() *config.PersistConfig {
Expand Down
2 changes: 2 additions & 0 deletions pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage e

// Wait waits on all schedulers to exit.
func (c *Controller) Wait() {
c.Lock()
defer c.Unlock()
c.wg.Wait()
}

Expand Down

0 comments on commit 2556b5b

Please sign in to comment.