Skip to content

Commit

Permalink
create coordinator with scheduling cluster
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Aug 3, 2023
1 parent 92abfc0 commit f7b184f
Show file tree
Hide file tree
Showing 16 changed files with 394 additions and 85 deletions.
94 changes: 92 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,23 @@ import (

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/buckets"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/endpoint"
)

// Cluster is used to manage all information for scheduling purpose.
type Cluster struct {
basicCluster *core.BasicCluster
*core.BasicCluster
ruleManager *placement.RuleManager
labelerManager *labeler.RegionLabeler
persistConfig *config.PersistConfig
hotStat *statistics.HotStat
}

const regionLabelGCInterval = time.Hour
Expand All @@ -29,10 +35,94 @@ func NewCluster(ctx context.Context, storage endpoint.RuleStorage, cfg *config.C
if err != nil {
return nil, err
}

return &Cluster{
basicCluster: basicCluster,
BasicCluster: basicCluster,
ruleManager: placement.NewRuleManager(storage, basicCluster, persistConfig),
labelerManager: labelerManager,
persistConfig: persistConfig,
hotStat: statistics.NewHotStat(ctx),
}, nil
}

// GetBasicCluster returns the basic cluster.
func (c *Cluster) GetBasicCluster() *core.BasicCluster {
return c.BasicCluster
}

// GetSharedConfig returns the shared config.
func (c *Cluster) GetSharedConfig() sc.SharedConfigProvider {
return c.persistConfig
}

// GetRuleManager returns the rule manager.
func (c *Cluster) GetRuleManager() *placement.RuleManager {
return c.ruleManager
}

// GetRegionLabeler returns the region labeler.
func (c *Cluster) GetRegionLabeler() *labeler.RegionLabeler {
return c.labelerManager
}

// GetStoresLoads returns load stats of all stores.
func (c *Cluster) GetStoresLoads() map[uint64][]float64 {
return c.hotStat.GetStoresLoads()
}

// IsRegionHot checks if a region is in hot state.
func (c *Cluster) IsRegionHot(region *core.RegionInfo) bool {
return c.hotStat.IsRegionHot(region, c.persistConfig.GetHotRegionCacheHitsThreshold())
}

// GetHotPeerStat returns hot peer stat with specified regionID and storeID.
func (c *Cluster) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat {
return c.hotStat.GetHotPeerStat(rw, regionID, storeID)
}

// RegionReadStats returns hot region's read stats.
// The result only includes peers that are hot enough.
// RegionStats is a thread-safe method
func (c *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
// As read stats are reported by store heartbeat, the threshold needs to be adjusted.
threshold := c.persistConfig.GetHotRegionCacheHitsThreshold() *
(utils.WriteReportInterval / utils.ReadReportInterval)
return c.hotStat.RegionStats(utils.Read, threshold)
}

// RegionWriteStats returns hot region's write stats.
// The result only includes peers that are hot enough.
func (c *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
// RegionStats is a thread-safe method
return c.hotStat.RegionStats(utils.Write, c.persistConfig.GetHotRegionCacheHitsThreshold())
}

// BucketsStats returns hot region's buckets stats.
func (c *Cluster) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat {
return c.hotStat.BucketsStats(degree, regionIDs...)
}

// GetPersistOptions returns the persist options.
func (c *Cluster) GetPersistOptions() sc.ConfProvider {
return c.persistConfig
}

// TODO: implement the following methods

// GetStorage returns the storage.
func (c *Cluster) GetStorage() storage.Storage { return nil }

// UpdateRegionsLabelLevelStats updates the region label level stats.
func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {}

// GetStoreConfig returns the store config.
func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return nil }

// AllocID allocates a new ID.
func (c *Cluster) AllocID() (uint64, error) { return 0, nil }

// GetCheckerConfig returns the checker config.
func (c *Cluster) GetCheckerConfig() sc.CheckerConfigProvider { return nil }

// GetSchedulerConfig returns the scheduler config.
func (c *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider { return nil }
Loading

0 comments on commit f7b184f

Please sign in to comment.