merge hot stats
Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed Aug 3, 2023
1 parent c8c5e1e commit 92abfc0
Showing 37 changed files with 1,070 additions and 1,041 deletions.
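Every hunk in this commit applies the same mechanical change: hot-statistics helper types and constants (RWType, Read, Write, DefaultAotSize, ByteDim, RegionStatKind, ForeachRegionStats, DimToString) are now referenced from the new github.com/tikv/pd/pkg/statistics/utils package instead of through pkg/statistics. Below is a minimal before/after sketch of a typical call site, modelled on the RegionReadStats hunk in mockcluster.go; the package name and wrapper function are illustrative, while the identifiers and import paths are taken from the hunks themselves.

package example

// Before this commit, the read/write kind came from pkg/statistics:
//
//     import "github.com/tikv/pd/pkg/statistics"
//     stats := mc.HotCache.RegionStats(statistics.Read, mc.GetHotRegionCacheHitsThreshold())
//
// After this commit, the same call takes the kind from pkg/statistics/utils.

import (
	"github.com/tikv/pd/pkg/mock/mockcluster"
	"github.com/tikv/pd/pkg/statistics"
	"github.com/tikv/pd/pkg/statistics/utils"
)

// readStats is an illustrative wrapper, not part of the commit; note that the
// HotPeerStat result type itself still lives in pkg/statistics.
func readStats(mc *mockcluster.Cluster) map[uint64][]*statistics.HotPeerStat {
	return mc.HotCache.RegionStats(utils.Read, mc.GetHotRegionCacheHitsThreshold())
}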
29 changes: 15 additions & 14 deletions pkg/mock/mockcluster/mockcluster.go
@@ -34,6 +34,7 @@ import (
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/buckets"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/pkg/versioninfo"
@@ -145,15 +146,15 @@ func (mc *Cluster) IsRegionHot(region *core.RegionInfo) bool {
}

// GetHotPeerStat returns hot peer stat with specified regionID and storeID.
- func (mc *Cluster) GetHotPeerStat(rw statistics.RWType, regionID, storeID uint64) *statistics.HotPeerStat {
+ func (mc *Cluster) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat {
return mc.HotCache.GetHotPeerStat(rw, regionID, storeID)
}

// RegionReadStats returns hot region's read stats.
// The result only includes peers that are hot enough.
func (mc *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
// We directly use threshold for read stats for mockCluster
- return mc.HotCache.RegionStats(statistics.Read, mc.GetHotRegionCacheHitsThreshold())
+ return mc.HotCache.RegionStats(utils.Read, mc.GetHotRegionCacheHitsThreshold())
}

// BucketsStats returns hot region's buckets stats.
@@ -168,11 +169,11 @@ func (mc *Cluster) BucketsStats(degree int, regions ...uint64) map[uint64][]*buc
// RegionWriteStats returns hot region's write stats.
// The result only includes peers that are hot enough.
func (mc *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
- return mc.HotCache.RegionStats(statistics.Write, mc.GetHotRegionCacheHitsThreshold())
+ return mc.HotCache.RegionStats(utils.Write, mc.GetHotRegionCacheHitsThreshold())
}

// HotRegionsFromStore picks hot regions in specify store.
- func (mc *Cluster) HotRegionsFromStore(store uint64, kind statistics.RWType) []*core.RegionInfo {
+ func (mc *Cluster) HotRegionsFromStore(store uint64, kind utils.RWType) []*core.RegionInfo {
stats := hotRegionsFromStore(mc.HotCache, store, kind, mc.GetHotRegionCacheHitsThreshold())
regions := make([]*core.RegionInfo, 0, len(stats))
for _, stat := range stats {
@@ -185,7 +186,7 @@ func (mc *Cluster) HotRegionsFromStore(store uint64, kind statistics.RWType) []*
}

// hotRegionsFromStore picks hot region in specify store.
- func hotRegionsFromStore(w *statistics.HotCache, storeID uint64, kind statistics.RWType, minHotDegree int) []*statistics.HotPeerStat {
+ func hotRegionsFromStore(w *statistics.HotCache, storeID uint64, kind utils.RWType, minHotDegree int) []*statistics.HotPeerStat {
if stats, ok := w.RegionStats(kind, minHotDegree)[storeID]; ok && len(stats) > 0 {
return stats
}
@@ -464,7 +465,7 @@ func (mc *Cluster) AddRegionWithReadInfo(
r = r.Clone(core.SetReadKeys(readKeys))
r = r.Clone(core.SetReportInterval(0, reportInterval))
r = r.Clone(core.SetReadQuery(readQuery))
- filledNum := statistics.DefaultAotSize
+ filledNum := utils.DefaultAotSize
if len(filledNums) > 0 {
filledNum = filledNums[0]
}
@@ -473,7 +474,7 @@ for i := 0; i < filledNum; i++ {
for i := 0; i < filledNum; i++ {
items = mc.CheckRegionRead(r)
for _, item := range items {
- mc.HotCache.Update(item, statistics.Read)
+ mc.HotCache.Update(item, utils.Read)
}
}
mc.PutRegion(r)
@@ -485,7 +486,7 @@ func (mc *Cluster) AddRegionWithPeerReadInfo(regionID, leaderStoreID, targetStor
otherPeerStoreIDs []uint64, filledNums ...int) []*statistics.HotPeerStat {
r := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...)
r = r.Clone(core.SetReadBytes(readBytes), core.SetReadKeys(readKeys), core.SetReportInterval(0, reportInterval))
- filledNum := statistics.DefaultAotSize
+ filledNum := utils.DefaultAotSize
if len(filledNums) > 0 {
filledNum = filledNums[0]
}
@@ -494,7 +495,7 @@ func (mc *Cluster) AddRegionWithPeerReadInfo(regionID, leaderStoreID, targetStor
items = mc.CheckRegionRead(r)
for _, item := range items {
if item.StoreID == targetStoreID {
- mc.HotCache.Update(item, statistics.Read)
+ mc.HotCache.Update(item, utils.Read)
}
}
}
@@ -513,7 +514,7 @@ func (mc *Cluster) AddRegionLeaderWithReadInfo(
r = r.Clone(core.SetReadKeys(readKeys))
r = r.Clone(core.SetReadQuery(readQuery))
r = r.Clone(core.SetReportInterval(0, reportInterval))
- filledNum := statistics.DefaultAotSize
+ filledNum := utils.DefaultAotSize
if len(filledNums) > 0 {
filledNum = filledNums[0]
}
@@ -522,7 +523,7 @@ for i := 0; i < filledNum; i++ {
for i := 0; i < filledNum; i++ {
items = mc.CheckRegionLeaderRead(r)
for _, item := range items {
- mc.HotCache.Update(item, statistics.Read)
+ mc.HotCache.Update(item, utils.Read)
}
}
mc.PutRegion(r)
@@ -541,7 +542,7 @@ func (mc *Cluster) AddLeaderRegionWithWriteInfo(
r = r.Clone(core.SetReportInterval(0, reportInterval))
r = r.Clone(core.SetWrittenQuery(writtenQuery))

- filledNum := statistics.DefaultAotSize
+ filledNum := utils.DefaultAotSize
if len(filledNums) > 0 {
filledNum = filledNums[0]
}
@@ -550,7 +551,7 @@ for i := 0; i < filledNum; i++ {
for i := 0; i < filledNum; i++ {
items = mc.CheckRegionWrite(r)
for _, item := range items {
- mc.HotCache.Update(item, statistics.Write)
+ mc.HotCache.Update(item, utils.Write)
}
}
mc.PutRegion(r)
@@ -734,7 +735,7 @@ func (mc *Cluster) updateStorageStatistics(storeID uint64, update func(*pdpb.Sto
newStats := typeutil.DeepClone(store.GetStoreStats(), core.StoreStatsFactory)
update(newStats)
now := time.Now().Unix()
- interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
+ interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - utils.ReadReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
newStore := store.Clone(core.SetStoreStats(newStats))
mc.Set(storeID, newStats)
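The AddRegionWithReadInfo, AddRegionWithPeerReadInfo, AddRegionLeaderWithReadInfo and AddLeaderRegionWithWriteInfo hunks above all share one fill pattern: check the region repeatedly (utils.DefaultAotSize passes unless a filledNums override is supplied) and push each resulting item into the hot cache under the matching utils.RWType. The condensed sketch below shows the read-side loop; the helper name and parameters are illustrative, the calls are the ones visible in the hunks, and it assumes the same package and imports as the earlier sketch plus github.com/tikv/pd/pkg/core (path assumed) for core.RegionInfo.

// fillReadHotCache condenses the fill loop used by the mock cluster's
// AddRegionWithReadInfo; it is an illustrative helper, not part of the commit.
func fillReadHotCache(mc *mockcluster.Cluster, r *core.RegionInfo, filledNum int) []*statistics.HotPeerStat {
	if filledNum <= 0 {
		filledNum = utils.DefaultAotSize // default number of passes
	}
	var items []*statistics.HotPeerStat
	for i := 0; i < filledNum; i++ {
		items = mc.CheckRegionRead(r) // derive hot peer stats from the region's read flow
		for _, item := range items {
			mc.HotCache.Update(item, utils.Read) // record each item as a read statistic
		}
	}
	return items
}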
33 changes: 17 additions & 16 deletions pkg/schedule/coordinator.go
@@ -37,6 +37,7 @@ import (
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/schedule/splitter"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
@@ -503,18 +504,18 @@ func (c *Coordinator) Stop() {
}

// GetHotRegionsByType gets hot regions' statistics by RWType.
- func (c *Coordinator) GetHotRegionsByType(typ statistics.RWType) *statistics.StoreHotPeersInfos {
+ func (c *Coordinator) GetHotRegionsByType(typ utils.RWType) *statistics.StoreHotPeersInfos {
isTraceFlow := c.cluster.GetSchedulerConfig().IsTraceRegionFlow()
storeLoads := c.cluster.GetStoresLoads()
stores := c.cluster.GetStores()
var infos *statistics.StoreHotPeersInfos
switch typ {
- case statistics.Write:
+ case utils.Write:
regionStats := c.cluster.RegionWriteStats()
- infos = statistics.GetHotStatus(stores, storeLoads, regionStats, statistics.Write, isTraceFlow)
- case statistics.Read:
+ infos = statistics.GetHotStatus(stores, storeLoads, regionStats, utils.Write, isTraceFlow)
+ case utils.Read:
regionStats := c.cluster.RegionReadStats()
- infos = statistics.GetHotStatus(stores, storeLoads, regionStats, statistics.Read, isTraceFlow)
+ infos = statistics.GetHotStatus(stores, storeLoads, regionStats, utils.Read, isTraceFlow)
default:
}
// update params `IsLearner` and `LastUpdateTime`
@@ -528,11 +529,11 @@ func (c *Coordinator) GetHotRegionsByType(typ statistics.RWType) *statistics.Sto
h.IsLearner = core.IsLearner(region.GetPeer(h.StoreID))
}
switch typ {
- case statistics.Write:
+ case utils.Write:
if region != nil {
h.LastUpdateTime = time.Unix(int64(region.GetInterval().GetEndTimestamp()), 0)
}
- case statistics.Read:
+ case utils.Read:
store := c.cluster.GetStore(h.StoreID)
if store != nil {
ts := store.GetMeta().GetLastHeartbeat()
@@ -555,24 +556,24 @@ func (c *Coordinator) GetWaitGroup() *sync.WaitGroup {
func (c *Coordinator) CollectHotSpotMetrics() {
stores := c.cluster.GetStores()
// Collects hot write region metrics.
- collectHotMetrics(c.cluster, stores, statistics.Write)
+ collectHotMetrics(c.cluster, stores, utils.Write)
// Collects hot read region metrics.
- collectHotMetrics(c.cluster, stores, statistics.Read)
+ collectHotMetrics(c.cluster, stores, utils.Read)
}

- func collectHotMetrics(cluster sche.ClusterInformer, stores []*core.StoreInfo, typ statistics.RWType) {
+ func collectHotMetrics(cluster sche.ClusterInformer, stores []*core.StoreInfo, typ utils.RWType) {
var (
kind string
regionStats map[uint64][]*statistics.HotPeerStat
)

switch typ {
- case statistics.Read:
+ case utils.Read:
regionStats = cluster.RegionReadStats()
- kind = statistics.Read.String()
- case statistics.Write:
+ kind = utils.Read.String()
+ case utils.Write:
regionStats = cluster.RegionWriteStats()
- kind = statistics.Write.String()
+ kind = utils.Write.String()
}
status := statistics.CollectHotPeerInfos(stores, regionStats) // only returns TotalBytesRate,TotalKeysRate,TotalQueryRate,Count

@@ -608,8 +609,8 @@ func collectHotMetrics(cluster sche.ClusterInformer, stores []*core.StoreInfo, t
}

if !hasHotLeader && !hasHotPeer {
- statistics.ForeachRegionStats(func(rwTy statistics.RWType, dim int, _ statistics.RegionStatKind) {
- schedulers.HotPendingSum.DeleteLabelValues(storeLabel, rwTy.String(), statistics.DimToString(dim))
+ utils.ForeachRegionStats(func(rwTy utils.RWType, dim int, _ utils.RegionStatKind) {
+ schedulers.HotPendingSum.DeleteLabelValues(storeLabel, rwTy.String(), utils.DimToString(dim))
})
}
}
5 changes: 3 additions & 2 deletions pkg/schedule/schedulers/grant_hot_region.go
@@ -32,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/syncutil"
@@ -234,7 +235,7 @@ func (s *grantHotRegionScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
return s.dispatch(rw, cluster), nil
}

- func (s *grantHotRegionScheduler) dispatch(typ statistics.RWType, cluster sche.SchedulerCluster) []*operator.Operator {
+ func (s *grantHotRegionScheduler) dispatch(typ utils.RWType, cluster sche.SchedulerCluster) []*operator.Operator {
stLoadInfos := s.stLoadInfos[buildResourceType(typ, constant.RegionKind)]
infos := make([]*statistics.StoreLoadDetail, len(stLoadInfos))
index := 0
@@ -243,7 +244,7 @@ func (s *grantHotRegionScheduler) dispatch(typ statistics.RWType, cluster sche.S
index++
}
sort.Slice(infos, func(i, j int) bool {
- return infos[i].LoadPred.Current.Loads[statistics.ByteDim] > infos[j].LoadPred.Current.Loads[statistics.ByteDim]
+ return infos[i].LoadPred.Current.Loads[utils.ByteDim] > infos[j].LoadPred.Current.Loads[utils.ByteDim]
})
return s.randomSchedule(cluster, infos)
}
(Diffs for the remaining changed files are not shown here.)
