Skip to content

Commit

Permalink
mcs: add region stats to scheduling cluster (#6985)
Browse files Browse the repository at this point in the history
ref #5839

Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx authored Aug 28, 2023
1 parent 50368e5 commit 6307aa8
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 14 deletions.
25 changes: 18 additions & 7 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
"github.com/tikv/pd/pkg/schedule"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/statistics"
Expand All @@ -18,32 +20,41 @@ import (
// Cluster is used to manage all information for scheduling purpose.
type Cluster struct {
*core.BasicCluster
persistConfig *config.PersistConfig
ruleManager *placement.RuleManager
labelerManager *labeler.RegionLabeler
persistConfig *config.PersistConfig
regionStats *statistics.RegionStatistics
hotStat *statistics.HotStat
storage storage.Storage
coordinator *schedule.Coordinator
}

const regionLabelGCInterval = time.Hour

// NewCluster creates a new cluster.
func NewCluster(ctx context.Context, storage storage.Storage, cfg *config.Config) (*Cluster, error) {
basicCluster := core.NewBasicCluster()
func NewCluster(ctx context.Context, cfg *config.Config, storage storage.Storage, basicCluster *core.BasicCluster, hbStreams *hbstream.HeartbeatStreams) (*Cluster, error) {
persistConfig := config.NewPersistConfig(cfg)
labelerManager, err := labeler.NewRegionLabeler(ctx, storage, regionLabelGCInterval)
if err != nil {
return nil, err
}

return &Cluster{
ruleManager := placement.NewRuleManager(storage, basicCluster, persistConfig)
c := &Cluster{
BasicCluster: basicCluster,
ruleManager: placement.NewRuleManager(storage, basicCluster, persistConfig),
ruleManager: ruleManager,
labelerManager: labelerManager,
persistConfig: persistConfig,
hotStat: statistics.NewHotStat(ctx),
regionStats: statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager),
storage: storage,
}, nil
}
c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
return c, nil
}

// GetCoordinator returns the coordinator
func (c *Cluster) GetCoordinator() *schedule.Coordinator {
return c.coordinator
}

// GetBasicCluster returns the basic cluster.
Expand Down
20 changes: 13 additions & 7 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/sysutil"
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
Expand Down Expand Up @@ -113,8 +114,6 @@ type Server struct {
hbStreams *hbstream.HeartbeatStreams
storage *endpoint.StorageEndpoint

coordinator *schedule.Coordinator

// for watching the PD API server meta info updates that are related to the scheduling.
configWatcher *config.Watcher
ruleWatcher *rule.Watcher
Expand Down Expand Up @@ -260,6 +259,7 @@ func (s *Server) Close() {
s.stopHTTPServer()
s.stopGRPCServer()
s.muxListener.Close()
s.GetCoordinator().Stop()
s.serverLoopCancel()
s.serverLoopWg.Wait()

Expand Down Expand Up @@ -328,9 +328,14 @@ func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
return &s.cfg.Security.TLSConfig
}

// GetCluster returns the cluster.
func (s *Server) GetCluster() *Cluster {
return s.cluster
}

// GetCoordinator returns the coordinator.
func (s *Server) GetCoordinator() *schedule.Coordinator {
return s.coordinator
return s.GetCluster().GetCoordinator()
}

func (s *Server) initClient() error {
Expand Down Expand Up @@ -495,12 +500,12 @@ func (s *Server) startServer() (err error) {
utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr)
s.storage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(s.etcdClient, endpoint.PDRootPath(s.clusterID)), nil)
s.cluster, err = NewCluster(s.ctx, s.storage, s.cfg)
basicCluster := core.NewBasicCluster()
s.hbStreams = hbstream.NewHeartbeatStreams(s.ctx, s.clusterID, basicCluster)
s.cluster, err = NewCluster(s.ctx, s.cfg, s.storage, basicCluster, s.hbStreams)
if err != nil {
return err
}
s.hbStreams = hbstream.NewHeartbeatStreams(s.ctx, s.clusterID, s.cluster.GetBasicCluster())
s.coordinator = schedule.NewCoordinator(s.ctx, s.cluster, s.hbStreams)

s.listenURL, err = url.Parse(s.cfg.ListenAddr)
if err != nil {
Expand All @@ -524,7 +529,8 @@ func (s *Server) startServer() (err error) {
if err != nil {
return err
}
go s.coordinator.RunUntilStop()

go s.GetCoordinator().RunUntilStop()
serverReadyChan := make(chan struct{})
defer close(serverReadyChan)
s.serverLoopWg.Add(1)
Expand Down

0 comments on commit 6307aa8

Please sign in to comment.