diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 403b676e691..2e4b8311583 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -6,7 +6,9 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" + "github.com/tikv/pd/pkg/schedule" sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/schedule/hbstream" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/statistics" @@ -18,32 +20,41 @@ import ( // Cluster is used to manage all information for scheduling purpose. type Cluster struct { *core.BasicCluster + persistConfig *config.PersistConfig ruleManager *placement.RuleManager labelerManager *labeler.RegionLabeler - persistConfig *config.PersistConfig + regionStats *statistics.RegionStatistics hotStat *statistics.HotStat storage storage.Storage + coordinator *schedule.Coordinator } const regionLabelGCInterval = time.Hour // NewCluster creates a new cluster. -func NewCluster(ctx context.Context, storage storage.Storage, cfg *config.Config) (*Cluster, error) { - basicCluster := core.NewBasicCluster() +func NewCluster(ctx context.Context, cfg *config.Config, storage storage.Storage, basicCluster *core.BasicCluster, hbStreams *hbstream.HeartbeatStreams) (*Cluster, error) { persistConfig := config.NewPersistConfig(cfg) labelerManager, err := labeler.NewRegionLabeler(ctx, storage, regionLabelGCInterval) if err != nil { return nil, err } - - return &Cluster{ + ruleManager := placement.NewRuleManager(storage, basicCluster, persistConfig) + c := &Cluster{ BasicCluster: basicCluster, - ruleManager: placement.NewRuleManager(storage, basicCluster, persistConfig), + ruleManager: ruleManager, labelerManager: labelerManager, persistConfig: persistConfig, hotStat: statistics.NewHotStat(ctx), + regionStats: statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager), storage: storage, - }, nil + } + c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams) + return c, nil +} + +// GetCoordinator returns the coordinator +func (c *Cluster) GetCoordinator() *schedule.Coordinator { + return c.coordinator } // GetBasicCluster returns the basic cluster. diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 991d513e9b1..5b15ae317d2 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -38,6 +38,7 @@ import ( "github.com/pingcap/sysutil" "github.com/soheilhy/cmux" "github.com/spf13/cobra" + "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/discovery" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" @@ -113,8 +114,6 @@ type Server struct { hbStreams *hbstream.HeartbeatStreams storage *endpoint.StorageEndpoint - coordinator *schedule.Coordinator - // for watching the PD API server meta info updates that are related to the scheduling. configWatcher *config.Watcher ruleWatcher *rule.Watcher @@ -260,6 +259,7 @@ func (s *Server) Close() { s.stopHTTPServer() s.stopGRPCServer() s.muxListener.Close() + s.GetCoordinator().Stop() s.serverLoopCancel() s.serverLoopWg.Wait() @@ -328,9 +328,14 @@ func (s *Server) GetTLSConfig() *grpcutil.TLSConfig { return &s.cfg.Security.TLSConfig } +// GetCluster returns the cluster. +func (s *Server) GetCluster() *Cluster { + return s.cluster +} + // GetCoordinator returns the coordinator. func (s *Server) GetCoordinator() *schedule.Coordinator { - return s.coordinator + return s.GetCluster().GetCoordinator() } func (s *Server) initClient() error { @@ -495,12 +500,12 @@ func (s *Server) startServer() (err error) { utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr) s.storage = endpoint.NewStorageEndpoint( kv.NewEtcdKVBase(s.etcdClient, endpoint.PDRootPath(s.clusterID)), nil) - s.cluster, err = NewCluster(s.ctx, s.storage, s.cfg) + basicCluster := core.NewBasicCluster() + s.hbStreams = hbstream.NewHeartbeatStreams(s.ctx, s.clusterID, basicCluster) + s.cluster, err = NewCluster(s.ctx, s.cfg, s.storage, basicCluster, s.hbStreams) if err != nil { return err } - s.hbStreams = hbstream.NewHeartbeatStreams(s.ctx, s.clusterID, s.cluster.GetBasicCluster()) - s.coordinator = schedule.NewCoordinator(s.ctx, s.cluster, s.hbStreams) s.listenURL, err = url.Parse(s.cfg.ListenAddr) if err != nil { @@ -524,7 +529,8 @@ func (s *Server) startServer() (err error) { if err != nil { return err } - go s.coordinator.RunUntilStop() + + go s.GetCoordinator().RunUntilStop() serverReadyChan := make(chan struct{}) defer close(serverReadyChan) s.serverLoopWg.Add(1)