Skip to content

Commit

Permalink
start coordinator
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Aug 10, 2023
1 parent 46d7cc7 commit ca8679b
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 5 deletions.
4 changes: 3 additions & 1 deletion pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,11 @@ func (s *Server) primaryElectionLoop() {
defer s.serverLoopWg.Done()

for {
if s.IsClosed() {
select {
case <-s.ctx.Done():
log.Info("server is closed, exit resource manager primary election loop")
return
default:
}

primary, checkAgain := s.participant.CheckLeader()
Expand Down
1 change: 1 addition & 0 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func NewCluster(ctx context.Context, storage storage.Storage, cfg *config.Config
labelerManager: labelerManager,
persistConfig: persistConfig,
hotStat: statistics.NewHotStat(ctx),
storage: storage,
}, nil
}

Expand Down
15 changes: 11 additions & 4 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,11 @@ func (s *Server) primaryElectionLoop() {
defer s.serverLoopWg.Done()

for {
if s.IsClosed() {
log.Info("server is closed, exit scheduling primary election loop")
select {
case <-s.ctx.Done():
log.Info("server is closed, exit resource manager primary election loop")
return
default:
}

primary, checkAgain := s.participant.CheckLeader()
Expand Down Expand Up @@ -487,13 +489,18 @@ func (s *Server) startServer() (err error) {
s.participant.InitInfo(uniqueName, uniqueID, path.Join(schedulingPrimaryPrefix, fmt.Sprintf("%05d", 0)),
utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr)
s.storage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(s.etcdClient, endpoint.SchedulingSvcRootPath(s.clusterID)), nil)
kv.NewEtcdKVBase(s.etcdClient, endpoint.PDRootPath(s.clusterID)), nil)
s.cluster, err = NewCluster(s.ctx, s.storage, s.cfg)
if err != nil {
return err
}
s.hbStreams = hbstream.NewHeartbeatStreams(s.ctx, s.clusterID, s.cluster.GetBasicCluster())
s.coordinator = schedule.NewCoordinator(s.ctx, s.cluster, s.hbStreams)

s.listenURL, err = url.Parse(s.cfg.ListenAddr)
if err != nil {
return err
}
tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
return err
Expand All @@ -511,7 +518,7 @@ func (s *Server) startServer() (err error) {
if err != nil {
return err
}

go s.coordinator.RunUntilStop()
serverReadyChan := make(chan struct{})
defer close(serverReadyChan)
s.serverLoopWg.Add(1)
Expand Down

0 comments on commit ca8679b

Please sign in to comment.