Skip to content

Commit

Permalink
reorganize cluster start and stop process
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Sep 26, 2023
1 parent eac55a7 commit a63ecc2
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 6 deletions.
32 changes: 31 additions & 1 deletion pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Cluster struct {
checkMembershipCh chan struct{}
apiServerLeader atomic.Value
clusterID uint64
running atomic.Bool
}

const regionLabelGCInterval = time.Hour
Expand Down Expand Up @@ -215,6 +216,9 @@ func (c *Cluster) updateScheduler() {
// Make sure the check will be triggered once later.
notifier <- struct{}{}
c.persistConfig.SetSchedulersUpdatingNotifier(notifier)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-c.ctx.Done():
Expand All @@ -224,6 +228,18 @@ func (c *Cluster) updateScheduler() {
// This is triggered by the watcher when the schedulers are updated.
}

if !c.running.Load() {
select {
case <-c.ctx.Done():
log.Info("cluster is closing, stop listening the schedulers updating notifier")
return
case <-ticker.C:
// retry
notifier <- struct{}{}
continue
}
}

log.Info("schedulers updating notifier is triggered, try to update the scheduler")
var (
schedulersController = c.coordinator.GetSchedulersController()
Expand Down Expand Up @@ -394,15 +410,29 @@ func (c *Cluster) runUpdateStoreStats() {
}
}

// runCoordinator runs the main scheduling loop.
func (c *Cluster) runCoordinator() {
defer logutil.LogPanic()
defer c.wg.Done()
c.coordinator.RunUntilStop()
}

// StartBackgroundJobs starts background jobs.
func (c *Cluster) StartBackgroundJobs() {
c.wg.Add(2)
c.wg.Add(3)
go c.updateScheduler()
go c.runUpdateStoreStats()
go c.runCoordinator()
c.running.Store(true)
}

// StopBackgroundJobs stops background jobs.
func (c *Cluster) StopBackgroundJobs() {
if !c.running.Load() {
return
}
c.running.Store(false)
c.coordinator.Stop()
c.cancel()
c.wg.Wait()
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,16 +444,12 @@ func (s *Server) startCluster(context.Context) error {
}
s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController())
s.cluster.StartBackgroundJobs()
go s.GetCoordinator().RunUntilStop()
return nil
}

func (s *Server) stopCluster() {
s.GetCoordinator().Stop()
s.cluster.StopBackgroundJobs()
s.ruleWatcher.Close()
s.configWatcher.Close()
s.metaWatcher.Close()
s.stopWatcher()
}

func (s *Server) startWatcher() (err error) {
Expand All @@ -469,6 +465,12 @@ func (s *Server) startWatcher() (err error) {
return err
}

func (s *Server) stopWatcher() {
s.ruleWatcher.Close()
s.configWatcher.Close()
s.metaWatcher.Close()
}

// CreateServer creates the Server
func CreateServer(ctx context.Context, cfg *config.Config) *Server {
svr := &Server{
Expand Down

0 comments on commit a63ecc2

Please sign in to comment.