Commit

statistics: fix the hot region API so it works without the hot scheduler (#4424)

* refactor store load in hot region scheduler

ref #3879

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* resolve conflicts

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* fix the hot region API so it works when the hot scheduler is disabled

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* resolve conflicts

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* clean up

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* make dropping a region from the cache atomic

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* address the comment

Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx authored Jan 20, 2022
1 parent 4242300 commit f605a2c
Showing 12 changed files with 391 additions and 373 deletions.
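The gist of the change: the hot region API used to fetch its answer from the hot scheduler's cached state through a `hasHotStatus` type assertion, so it returned nothing whenever that scheduler was disabled or not yet running. After this commit the coordinator computes hot status on demand from the `statistics` package, and dropping a region from the cache becomes a single atomic operation. A minimal caller-side sketch, using the repo's own types (illustrative, not part of the diff):

```go
package example

import (
	"fmt"

	"github.com/tikv/pd/server/cluster"
)

// dumpHotWriteRegions shows the consumer's view of the fix: with this commit,
// GetHotWriteRegions derives its result from the statistics package directly
// instead of asking the (possibly disabled) hot region scheduler.
func dumpHotWriteRegions(c *cluster.RaftCluster) {
	infos := c.GetHotWriteRegions() // *statistics.StoreHotPeersInfos; may still be nil very early in startup
	if infos == nil {
		return
	}
	for storeID, stat := range infos.AsLeader {
		fmt.Printf("store %d: %+v\n", storeID, stat)
	}
}
```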
27 changes: 6 additions & 21 deletions server/cluster/cluster.go
@@ -852,11 +852,7 @@ func (c *RaftCluster) GetStoresStats() *statistics.StoresStats {

 // DropCacheRegion removes a region from the cache.
 func (c *RaftCluster) DropCacheRegion(id uint64) {
-	c.RLock()
-	defer c.RUnlock()
-	if region := c.GetRegion(id); region != nil {
-		c.core.RemoveRegion(region)
-	}
+	c.core.RemoveRegionIfExist(id)
 }
 
 // GetCacheCluster gets the cached cluster.
@@ -883,10 +879,7 @@ func (c *RaftCluster) GetStore(storeID uint64) *core.StoreInfo {

 // IsRegionHot checks if a region is in hot state.
 func (c *RaftCluster) IsRegionHot(region *core.RegionInfo) bool {
-	c.RLock()
-	hotStat := c.hotStat
-	c.RUnlock()
-	return hotStat.IsRegionHot(region, c.opt.GetHotRegionCacheHitsThreshold())
+	return c.hotStat.IsRegionHot(region, c.opt.GetHotRegionCacheHitsThreshold())
 }
 
 // GetAdjacentRegions returns regions' information that are adjacent with the specific region ID.
@@ -1456,8 +1449,6 @@ func (c *RaftCluster) GetComponentManager() *component.Manager {

 // GetStoresLoads returns load stats of all stores.
 func (c *RaftCluster) GetStoresLoads() map[uint64][]float64 {
-	c.RLock()
-	defer c.RUnlock()
 	return c.hotStat.GetStoresLoads()
 }
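The dropped `RLock`/`RUnlock` pairs in this file guarded nothing useful: `c.hotStat` is assigned once when the cluster is created and is internally synchronized, so reading it without the cluster-level lock appears safe. (That reasoning is inferred from the diff, not stated in the commit.)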

@@ -1508,23 +1499,17 @@ func (c *RaftCluster) GetRegionLabeler() *labeler.RegionLabeler {

 // GetHotWriteRegions gets hot write regions' info.
 func (c *RaftCluster) GetHotWriteRegions(storeIDs ...uint64) *statistics.StoreHotPeersInfos {
-	c.RLock()
-	co := c.coordinator
-	c.RUnlock()
-	hotWriteRegions := co.getHotWriteRegions()
-	if len(storeIDs) > 0 {
+	hotWriteRegions := c.coordinator.getHotRegionsByType(statistics.Write)
+	if len(storeIDs) > 0 && hotWriteRegions != nil {
 		hotWriteRegions = getHotRegionsByStoreIDs(hotWriteRegions, storeIDs...)
 	}
 	return hotWriteRegions
 }
 
 // GetHotReadRegions gets hot read regions' info.
 func (c *RaftCluster) GetHotReadRegions(storeIDs ...uint64) *statistics.StoreHotPeersInfos {
-	c.RLock()
-	co := c.coordinator
-	c.RUnlock()
-	hotReadRegions := co.getHotReadRegions()
-	if len(storeIDs) > 0 {
+	hotReadRegions := c.coordinator.getHotRegionsByType(statistics.Read)
+	if len(storeIDs) > 0 && hotReadRegions != nil {
 		hotReadRegions = getHotRegionsByStoreIDs(hotReadRegions, storeIDs...)
 	}
 	return hotReadRegions
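The added `!= nil` guards matter because `getHotRegionsByType` can still return nil (for example, before the cluster is fully prepared). These two methods back PD's hot-spot HTTP API; below is a hypothetical smoke test for the original bug, with the endpoint path and pd-ctl command being assumptions based on current PD rather than part of this diff:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

// Assumed repro: disable the hot scheduler first (e.g. `pd-ctl scheduler
// remove balance-hot-region-scheduler`), then query the hot region API.
// Before this commit the response carried no data; afterwards it should.
func main() {
	resp, err := http.Get("http://127.0.0.1:2379/pd/api/v1/hotspot/regions/read")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}
```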
68 changes: 23 additions & 45 deletions server/cluster/coordinator.go
@@ -35,7 +35,6 @@ import (
"github.com/tikv/pd/server/schedule/checker"
"github.com/tikv/pd/server/schedule/hbstream"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/statistics"
"github.com/tikv/pd/server/storage"
"go.uber.org/zap"
@@ -470,35 +469,18 @@ func (c *coordinator) stop() {
 	c.cancel()
 }
 
-// Hack to retrieve info from scheduler.
-// TODO: remove it.
-type hasHotStatus interface {
-	GetHotStatus(statistics.RWType) *statistics.StoreHotPeersInfos
-	GetPendingInfluence() map[uint64]*statistics.Influence
-}
-
-func (c *coordinator) getHotWriteRegions() *statistics.StoreHotPeersInfos {
-	c.RLock()
-	defer c.RUnlock()
-	s, ok := c.schedulers[schedulers.HotRegionName]
-	if !ok {
-		return nil
-	}
-	if h, ok := s.Scheduler.(hasHotStatus); ok {
-		return h.GetHotStatus(statistics.Write)
-	}
-	return nil
-}
-
-func (c *coordinator) getHotReadRegions() *statistics.StoreHotPeersInfos {
-	c.RLock()
-	defer c.RUnlock()
-	s, ok := c.schedulers[schedulers.HotRegionName]
-	if !ok {
-		return nil
-	}
-	if h, ok := s.Scheduler.(hasHotStatus); ok {
-		return h.GetHotStatus(statistics.Read)
+func (c *coordinator) getHotRegionsByType(typ statistics.RWType) *statistics.StoreHotPeersInfos {
+	isTraceFlow := c.cluster.GetOpts().IsTraceRegionFlow()
+	storeLoads := c.cluster.GetStoresLoads()
+	stores := c.cluster.GetStores()
+	switch typ {
+	case statistics.Write:
+		regionStats := c.cluster.RegionWriteStats()
+		return statistics.GetHotStatus(stores, storeLoads, regionStats, statistics.Write, isTraceFlow)
+	case statistics.Read:
+		regionStats := c.cluster.RegionReadStats()
+		return statistics.GetHotStatus(stores, storeLoads, regionStats, statistics.Read, isTraceFlow)
+	default:
 	}
 	return nil
 }
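`statistics.GetHotStatus` is introduced by this commit but defined in one of the files not shown here. Judging from the scheduler code it replaces (the `hotScheduler.GetHotStatus` removed from hot_region.go below), a plausible sketch of its shape:

```go
package statistics

import "github.com/tikv/pd/server/core"

// GetHotStatus, sketched from the removed hotScheduler.GetHotStatus below:
// summarize store load once for the leader dimension and once for the peer
// dimension, then flatten each StoreLoadDetail into a HotPeersStat.
func GetHotStatus(stores []*core.StoreInfo, storesLoads map[uint64][]float64,
	regionStats map[uint64][]*HotPeerStat, typ RWType, isTraceRegionFlow bool) *StoreHotPeersInfos {
	stInfos := SummaryStoreInfos(stores)
	asLeaderDetails := SummaryStoresLoad(stInfos, storesLoads, regionStats, isTraceRegionFlow, typ, core.LeaderKind)
	asPeerDetails := SummaryStoresLoad(stInfos, storesLoads, regionStats, isTraceRegionFlow, typ, core.RegionKind)

	asLeader := make(StoreHotPeersStat, len(asLeaderDetails))
	asPeer := make(StoreHotPeersStat, len(asPeerDetails))
	for id, detail := range asLeaderDetails {
		asLeader[id] = detail.ToHotPeersStat()
	}
	for id, detail := range asPeerDetails {
		asPeer[id] = detail.ToHotPeersStat()
	}
	return &StoreHotPeersInfos{AsLeader: asLeader, AsPeer: asPeer}
}
```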
@@ -542,36 +524,32 @@ func (c *coordinator) resetSchedulerMetrics() {
 }
 
 func (c *coordinator) collectHotSpotMetrics() {
-	c.RLock()
-	// Collects hot write region metrics.
-	s, ok := c.schedulers[schedulers.HotRegionName]
-	if !ok {
-		c.RUnlock()
-		return
-	}
-	c.RUnlock()
 	stores := c.cluster.GetStores()
-	collectHotMetrics(s, stores, statistics.Write)
+	// Collects hot write region metrics.
+	collectHotMetrics(c.cluster, stores, statistics.Write)
 	// Collects hot read region metrics.
-	collectHotMetrics(s, stores, statistics.Read)
+	collectHotMetrics(c.cluster, stores, statistics.Read)
 	// Collects pending influence.
-	collectPendingInfluence(s, stores)
+	collectPendingInfluence(stores)
 }
 
-func collectHotMetrics(s *scheduleController, stores []*core.StoreInfo, typ statistics.RWType) {
-	status := s.Scheduler.(hasHotStatus).GetHotStatus(typ)
+func collectHotMetrics(cluster *RaftCluster, stores []*core.StoreInfo, typ statistics.RWType) {
 	var (
 		kind                      string
 		byteTyp, keyTyp, queryTyp statistics.RegionStatKind
+		regionStats               map[uint64][]*statistics.HotPeerStat
 	)
 
 	switch typ {
 	case statistics.Read:
+		regionStats = cluster.RegionReadStats()
 		kind, byteTyp, keyTyp, queryTyp = statistics.Read.String(), statistics.RegionReadBytes, statistics.RegionReadKeys, statistics.RegionReadQuery
 	case statistics.Write:
+		regionStats = cluster.RegionWriteStats()
 		kind, byteTyp, keyTyp, queryTyp = statistics.Write.String(), statistics.RegionWriteBytes, statistics.RegionWriteKeys, statistics.RegionWriteQuery
 	}
+	status := statistics.GetHotStatus(stores, cluster.GetStoresLoads(), regionStats, typ, cluster.GetOpts().IsTraceRegionFlow())
 
 	for _, s := range stores {
 		storeAddress := s.GetAddress()
 		storeID := s.GetID()
@@ -604,8 +582,8 @@ func collectHotMetrics(s *scheduleController, stores []*core.StoreInfo, typ statistics.RWType) {
 	}
 }
 
-func collectPendingInfluence(s *scheduleController, stores []*core.StoreInfo) {
-	pendings := s.Scheduler.(hasHotStatus).GetPendingInfluence()
+func collectPendingInfluence(stores []*core.StoreInfo) {
+	pendings := statistics.GetPendingInfluence(stores)
 	for _, s := range stores {
 		storeAddress := s.GetAddress()
 		storeID := s.GetID()
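Likewise, `statistics.GetPendingInfluence` replaces the scheduler method of the same name (removed from hot_region.go below); its definition is in a file not shown, but presumably along these lines:

```go
package statistics

import "github.com/tikv/pd/server/core"

// GetPendingInfluence is a guess at the relocated helper, mirroring the
// removed hotScheduler.GetPendingInfluence: collect each store's non-nil
// PendingSum into a map keyed by store ID.
func GetPendingInfluence(stores []*core.StoreInfo) map[uint64]*Influence {
	stInfos := SummaryStoreInfos(stores)
	ret := make(map[uint64]*Influence, len(stInfos))
	for id, info := range stInfos {
		if info.PendingSum != nil {
			ret[id] = info.PendingSum
		}
	}
	return ret
}
```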
9 changes: 9 additions & 0 deletions server/core/basic_cluster.go
@@ -397,6 +397,15 @@ func (bc *BasicCluster) CheckAndPutRegion(region *RegionInfo) []*RegionInfo {
 	return bc.PutRegion(region)
 }
 
+// RemoveRegionIfExist removes RegionInfo from regionTree and regionMap if exists.
+func (bc *BasicCluster) RemoveRegionIfExist(id uint64) {
+	bc.Lock()
+	defer bc.Unlock()
+	if region := bc.Regions.GetRegion(id); region != nil {
+		bc.Regions.RemoveRegion(region)
+	}
+}
+
 // RemoveRegion removes RegionInfo from regionTree and regionMap.
 func (bc *BasicCluster) RemoveRegion(region *RegionInfo) {
 	bc.Lock()
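This is the "make dropping a region atomic" part of the commit: `DropCacheRegion` (cluster.go above) previously looked the region up and removed it in two steps under the cluster's read lock, leaving a window for interleaving, while `RemoveRegionIfExist` does the lookup and removal under one write lock. The pattern in isolation, with illustrative types that are not from the PD codebase:

```go
package example

import "sync"

// regionCache illustrates check-and-remove under a single write lock: no other
// goroutine can observe or mutate the map between the lookup and the delete.
type regionCache struct {
	mu      sync.Mutex
	regions map[uint64]struct{}
}

func (c *regionCache) removeIfExist(id uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.regions[id]; ok {
		delete(c.regions, id)
	}
}
```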
6 changes: 3 additions & 3 deletions server/schedulers/grant_hot_region.go
@@ -274,21 +274,21 @@ func (s *grantHotRegionScheduler) Schedule(cluster schedule.Cluster) []*operator.Operator {
 }
 
 func (s *grantHotRegionScheduler) dispatch(typ statistics.RWType, cluster schedule.Cluster) []*operator.Operator {
-	storeInfos := statistics.SummaryStoreInfos(cluster)
+	storeInfos := statistics.SummaryStoreInfos(cluster.GetStores())
 	storesLoads := cluster.GetStoresLoads()
 	isTraceRegionFlow := cluster.GetOpts().IsTraceRegionFlow()
 
 	var stLoadInfos map[uint64]*statistics.StoreLoadDetail
 	switch typ {
 	case statistics.Read:
-		stLoadInfos = summaryStoresLoad(
+		stLoadInfos = statistics.SummaryStoresLoad(
 			storeInfos,
 			storesLoads,
 			cluster.RegionReadStats(),
 			isTraceRegionFlow,
 			statistics.Read, core.RegionKind)
 	case statistics.Write:
-		stLoadInfos = summaryStoresLoad(
+		stLoadInfos = statistics.SummaryStoresLoad(
 			storeInfos,
 			storesLoads,
 			cluster.RegionWriteStats(),
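The two changes in this hunk recur in hot_region.go and shuffle_hot_region.go below: `SummaryStoreInfos` now takes a plain `[]*core.StoreInfo` instead of the whole `schedule.Cluster`, and the package-private `summaryStoresLoad` becomes the exported `statistics.SummaryStoresLoad`. Both turn the load-summary helpers into pure functions over data, which is what lets the coordinator compute hot status without a hot scheduler instance.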
46 changes: 5 additions & 41 deletions server/schedulers/hot_region.go
@@ -174,7 +174,7 @@ func (h *hotScheduler) dispatch(typ statistics.RWType, cluster schedule.Cluster) []*operator.Operator {
 // prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for
 // each store
 func (h *hotScheduler) prepareForBalance(typ statistics.RWType, cluster schedule.Cluster) {
-	h.stInfos = statistics.SummaryStoreInfos(cluster)
+	h.stInfos = statistics.SummaryStoreInfos(cluster.GetStores())
 	h.summaryPendingInfluence()
 	storesLoads := cluster.GetStoresLoads()
 	isTraceRegionFlow := cluster.GetOpts().IsTraceRegionFlow()
@@ -183,13 +183,13 @@ func (h *hotScheduler) prepareForBalance(typ statistics.RWType, cluster schedule.Cluster) {
 	case statistics.Read:
 		// update read statistics
 		regionRead := cluster.RegionReadStats()
-		h.stLoadInfos[readLeader] = summaryStoresLoad(
+		h.stLoadInfos[readLeader] = statistics.SummaryStoresLoad(
 			h.stInfos,
 			storesLoads,
 			regionRead,
 			isTraceRegionFlow,
 			statistics.Read, core.LeaderKind)
-		h.stLoadInfos[readPeer] = summaryStoresLoad(
+		h.stLoadInfos[readPeer] = statistics.SummaryStoresLoad(
 			h.stInfos,
 			storesLoads,
 			regionRead,
@@ -198,13 +198,13 @@ func (h *hotScheduler) prepareForBalance(typ statistics.RWType, cluster schedule.Cluster) {
 	case statistics.Write:
 		// update write statistics
 		regionWrite := cluster.RegionWriteStats()
-		h.stLoadInfos[writeLeader] = summaryStoresLoad(
+		h.stLoadInfos[writeLeader] = statistics.SummaryStoresLoad(
 			h.stInfos,
 			storesLoads,
 			regionWrite,
 			isTraceRegionFlow,
 			statistics.Write, core.LeaderKind)
-		h.stLoadInfos[writePeer] = summaryStoresLoad(
+		h.stLoadInfos[writePeer] = statistics.SummaryStoresLoad(
 			h.stInfos,
 			storesLoads,
 			regionWrite,
@@ -1127,42 +1127,6 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *statistics.Influence) {
 	return op, infl
 }
 
-func (h *hotScheduler) GetHotStatus(typ statistics.RWType) *statistics.StoreHotPeersInfos {
-	h.RLock()
-	defer h.RUnlock()
-	var leaderTyp, peerTyp resourceType
-	switch typ {
-	case statistics.Read:
-		leaderTyp, peerTyp = readLeader, readPeer
-	case statistics.Write:
-		leaderTyp, peerTyp = writeLeader, writePeer
-	}
-	asLeader := make(statistics.StoreHotPeersStat, len(h.stLoadInfos[leaderTyp]))
-	asPeer := make(statistics.StoreHotPeersStat, len(h.stLoadInfos[peerTyp]))
-	for id, detail := range h.stLoadInfos[leaderTyp] {
-		asLeader[id] = detail.ToHotPeersStat()
-	}
-	for id, detail := range h.stLoadInfos[peerTyp] {
-		asPeer[id] = detail.ToHotPeersStat()
-	}
-	return &statistics.StoreHotPeersInfos{
-		AsLeader: asLeader,
-		AsPeer:   asPeer,
-	}
-}
-
-func (h *hotScheduler) GetPendingInfluence() map[uint64]*statistics.Influence {
-	h.RLock()
-	defer h.RUnlock()
-	ret := make(map[uint64]*statistics.Influence, len(h.stInfos))
-	for id, info := range h.stInfos {
-		if info.PendingSum != nil {
-			ret[id] = info.PendingSum
-		}
-	}
-	return ret
-}
-
 // calcPendingInfluence return the calculate weight of one Operator, the value will between [0,1]
 func (h *hotScheduler) calcPendingInfluence(op *operator.Operator, maxZombieDur time.Duration) (weight float64, needGC bool) {
 	status := op.CheckAndGetStatus()
9 changes: 0 additions & 9 deletions server/schedulers/metrics.go
@@ -32,14 +32,6 @@ var schedulerStatus = prometheus.NewGaugeVec(
Help: "Inner status of the scheduler.",
}, []string{"type", "name"})

var hotPeerSummary = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "scheduler",
Name: "hot_peers_summary",
Help: "Hot peers summary for each store",
}, []string{"type", "store"})

var opInfluenceStatus = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
@@ -123,7 +115,6 @@ var hotPendingStatus = prometheus.NewGaugeVec(
 func init() {
 	prometheus.MustRegister(schedulerCounter)
 	prometheus.MustRegister(schedulerStatus)
-	prometheus.MustRegister(hotPeerSummary)
 	prometheus.MustRegister(balanceLeaderCounter)
 	prometheus.MustRegister(balanceRegionCounter)
 	prometheus.MustRegister(hotSchedulerResultCounter)
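`hotPeerSummary` is deleted from the schedulers package, yet `collectHotMetrics` in coordinator.go still reports per-store hot peer gauges, so the gauge has presumably moved next to its new caller in one of the files not shown. The relocated definition would be unchanged; a sketch, with the package name an assumption:

```go
package cluster // assumed new home for the gauge; the diff does not show it

import "github.com/prometheus/client_golang/prometheus"

var hotPeerSummary = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "pd",
		Subsystem: "scheduler",
		Name:      "hot_peers_summary",
		Help:      "Hot peers summary for each store",
	}, []string{"type", "store"})

func init() {
	prometheus.MustRegister(hotPeerSummary)
}
```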
6 changes: 3 additions & 3 deletions server/schedulers/shuffle_hot_region.go
@@ -134,21 +134,21 @@ func (s *shuffleHotRegionScheduler) Schedule(cluster schedule.Cluster) []*operator.Operator {
 }
 
 func (s *shuffleHotRegionScheduler) dispatch(typ statistics.RWType, cluster schedule.Cluster) []*operator.Operator {
-	storeInfos := statistics.SummaryStoreInfos(cluster)
+	storeInfos := statistics.SummaryStoreInfos(cluster.GetStores())
 	storesLoads := cluster.GetStoresLoads()
 	isTraceRegionFlow := cluster.GetOpts().IsTraceRegionFlow()
 
 	switch typ {
 	case statistics.Read:
-		s.stLoadInfos[readLeader] = summaryStoresLoad(
+		s.stLoadInfos[readLeader] = statistics.SummaryStoresLoad(
 			storeInfos,
 			storesLoads,
 			cluster.RegionReadStats(),
 			isTraceRegionFlow,
 			statistics.Read, core.LeaderKind)
 		return s.randomSchedule(cluster, s.stLoadInfos[readLeader])
 	case statistics.Write:
-		s.stLoadInfos[writeLeader] = summaryStoresLoad(
+		s.stLoadInfos[writeLeader] = statistics.SummaryStoresLoad(
 			storeInfos,
 			storesLoads,
 			cluster.RegionWriteStats(),
(The remaining five changed files, where the new statistics helpers presumably live, are not shown here.)
