statistics: small refactor of hot statistics (#4461)
* small refactor of hot statistics

close #4463

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* address the comment

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* address the comment

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
rleungx and ti-chi-bot authored Dec 28, 2021
1 parent ea57c6d commit 909f2fd
Showing 10 changed files with 225 additions and 218 deletions.
10 changes: 9 additions & 1 deletion pkg/mock/mockcluster/mockcluster.go
@@ -130,7 +130,7 @@ func (mc *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
 
 // HotRegionsFromStore picks hot regions in specify store.
 func (mc *Cluster) HotRegionsFromStore(store uint64, kind statistics.RWType) []*core.RegionInfo {
-	stats := mc.HotCache.HotRegionsFromStore(store, kind, mc.GetHotRegionCacheHitsThreshold())
+	stats := hotRegionsFromStore(mc.HotCache, store, kind, mc.GetHotRegionCacheHitsThreshold())
 	regions := make([]*core.RegionInfo, 0, len(stats))
 	for _, stat := range stats {
 		region := mc.GetRegion(stat.RegionID)
@@ -141,6 +141,14 @@ func (mc *Cluster) HotRegionsFromStore(store uint64, kind statistics.RWType) []*core.RegionInfo {
 	return regions
 }
 
+// hotRegionsFromStore picks hot region in specify store.
+func hotRegionsFromStore(w *statistics.HotCache, storeID uint64, kind statistics.RWType, minHotDegree int) []*statistics.HotPeerStat {
+	if stats, ok := w.RegionStats(kind, minHotDegree)[storeID]; ok && len(stats) > 0 {
+		return stats
+	}
+	return nil
+}
+
 // AllocPeer allocs a new peer on a store.
 func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {
 	peerID, err := mc.AllocID()
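The helper added here is a private copy of the HotCache.HotRegionsFromStore method that hot_cache.go deletes below: the mock cluster now filters the HotCache.RegionStats result itself. A standalone sketch of the same filtering idiom, using stand-in types rather than the real statistics package:

package main

import "fmt"

// HotPeerStat stands in for statistics.HotPeerStat; only the fields the
// sketch needs are modeled here.
type HotPeerStat struct {
	RegionID uint64
	StoreID  uint64
}

// regionStats stands in for HotCache.RegionStats: hot peers grouped by store.
func regionStats() map[uint64][]*HotPeerStat {
	return map[uint64][]*HotPeerStat{
		1: {{RegionID: 100, StoreID: 1}},
	}
}

// hotRegionsFromStore mirrors the helper above: index the grouped stats by
// store ID, treating a missing entry and an empty slice the same way.
func hotRegionsFromStore(storeID uint64) []*HotPeerStat {
	if stats, ok := regionStats()[storeID]; ok && len(stats) > 0 {
		return stats
	}
	return nil
}

func main() {
	fmt.Println(len(hotRegionsFromStore(1))) // 1
	fmt.Println(hotRegionsFromStore(2))      // [] (nil)
}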
6 changes: 3 additions & 3 deletions server/cluster/cluster.go
@@ -594,11 +594,11 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
 		c.limiter.Collect(newStore.GetStoreStats())
 	}
 
-	regionIDs := make(map[uint64]struct{}, len(stats.GetPeerStats()))
+	regions := make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats()))
 	for _, peerStat := range stats.GetPeerStats() {
 		regionID := peerStat.GetRegionId()
-		regionIDs[regionID] = struct{}{}
 		region := c.GetRegion(regionID)
+		regions[regionID] = region
 		if region == nil {
 			log.Warn("discard hot peer stat for unknown region",
 				zap.Uint64("region-id", regionID),
@@ -624,7 +624,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
 		peerInfo := core.NewPeerInfo(peer, loads, interval)
 		c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region))
 	}
-	c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regionIDs, interval))
+	c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval))
 	return nil
 }
 
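The heartbeat handler previously handed the async collector only a set of reported region IDs; it now hands over the looked-up regions themselves (nil for unknown regions), so the cold-peer check can reuse each RegionInfo instead of fetching it again off the main path. A minimal standalone contrast of the two shapes (values invented for illustration):

package main

import "fmt"

// RegionInfo stands in for core.RegionInfo.
type RegionInfo struct{ ID uint64 }

func main() {
	// Before: the handler only recorded which regions had reported.
	regionIDs := map[uint64]struct{}{100: {}}
	_, reported := regionIDs[100]

	// After: it keeps the lookup result as well (nil for unknown regions),
	// so the async task consumes the RegionInfo without a second lookup.
	regions := map[uint64]*RegionInfo{100: {ID: 100}, 200: nil}

	fmt.Println(reported, regions[200] == nil) // true true
}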
2 changes: 1 addition & 1 deletion server/handler.go
@@ -999,7 +999,7 @@ func (h *Handler) packHotRegions(hotPeersStat statistics.StoreHotPeersStat, hotR
 		}
 	}
 	stat := core.HistoryHotRegion{
-	// store in ms.
+		// store in ms.
 		UpdateTime: hotPeerStat.LastUpdateTime.UnixNano() / int64(time.Millisecond),
 		RegionID:   hotPeerStat.RegionID,
 		StoreID:    hotPeerStat.StoreID,
6 changes: 3 additions & 3 deletions server/schedulers/hot_region_test.go
@@ -1552,7 +1552,7 @@ func (s *testHotCacheSuite) TestCheckRegionFlow(c *C) {
 		c.Check(len(items), Greater, 0)
 		for _, item := range items {
 			if item.StoreID == 3 {
-				c.Check(item.IsNeedDelete(), IsTrue)
+				c.Check(item.GetActionType(), Equals, statistics.Remove)
 				continue
 			}
 			c.Check(item.HotDegree, Equals, testcase.DegreeAfterTransferLeader+2)
@@ -1586,9 +1586,9 @@ func (s *testHotCacheSuite) TestCheckRegionFlowWithDifferentThreshold(c *C) {
 	items = tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, 0, statistics.WriteReportInterval, []uint64{3, 4}, 1)
 	for _, item := range items {
 		if item.StoreID < 4 {
-			c.Check(item.IsNeedDelete(), IsTrue)
+			c.Check(item.GetActionType(), Equals, statistics.Remove)
 		} else {
-			c.Check(item.IsNeedDelete(), IsFalse)
+			c.Check(item.GetActionType(), Equals, statistics.Update)
 		}
 	}
 }
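The assertions change because HotPeerStat no longer exposes the IsNeedDelete()/IsNew() boolean pair; a single action type now records what happened to an item. The diff shows statistics.Remove and statistics.Update here, and Add appears in hot_cache.go below. A standalone model of the pattern, with assumed iota-based values:

package main

import "fmt"

// ActionType models the action type introduced by this refactor; the three
// values Add, Remove, and Update are the ones visible in the diff.
type ActionType int

const (
	Add ActionType = iota
	Remove
	Update
)

func (a ActionType) String() string {
	return [...]string{"add", "remove", "update"}[a]
}

// HotPeerStat carries one action instead of two booleans.
type HotPeerStat struct{ actionType ActionType }

func (s *HotPeerStat) GetActionType() ActionType { return s.actionType }

func main() {
	item := &HotPeerStat{actionType: Remove}
	fmt.Println(item.GetActionType()) // remove
}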
102 changes: 43 additions & 59 deletions server/statistics/hot_cache.go
@@ -28,53 +28,49 @@ const queueCap = 20000
 
 // HotCache is a cache hold hot regions.
 type HotCache struct {
-	ctx            context.Context
-	readFlowQueue  chan FlowItemTask
-	writeFlowQueue chan FlowItemTask
-	writeFlow      *hotPeerCache
-	readFlow       *hotPeerCache
+	ctx        context.Context
+	writeCache *hotPeerCache
+	readCache  *hotPeerCache
 }
 
 // NewHotCache creates a new hot spot cache.
 func NewHotCache(ctx context.Context) *HotCache {
 	w := &HotCache{
-		ctx:            ctx,
-		readFlowQueue:  make(chan FlowItemTask, queueCap),
-		writeFlowQueue: make(chan FlowItemTask, queueCap),
-		writeFlow:      NewHotPeerCache(Write),
-		readFlow:       NewHotPeerCache(Read),
+		ctx:        ctx,
+		writeCache: NewHotPeerCache(Write),
+		readCache:  NewHotPeerCache(Read),
 	}
-	go w.updateItems(w.readFlowQueue, w.runReadTask)
-	go w.updateItems(w.writeFlowQueue, w.runWriteTask)
+	go w.updateItems(w.readCache.taskQueue, w.runReadTask)
+	go w.updateItems(w.writeCache.taskQueue, w.runWriteTask)
 	return w
 }
 
 // CheckWritePeerSync checks the write status, returns update items.
 // This is used for mockcluster.
 func (w *HotCache) CheckWritePeerSync(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat {
-	return w.writeFlow.CheckPeerFlow(peer, region)
+	return w.writeCache.checkPeerFlow(peer, region)
 }
 
 // CheckReadPeerSync checks the read status, returns update items.
 // This is used for mockcluster.
 func (w *HotCache) CheckReadPeerSync(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat {
-	return w.readFlow.CheckPeerFlow(peer, region)
+	return w.readCache.checkPeerFlow(peer, region)
}
 
 // CheckWriteAsync puts the flowItem into queue, and check it asynchronously
-func (w *HotCache) CheckWriteAsync(task FlowItemTask) bool {
+func (w *HotCache) CheckWriteAsync(task flowItemTask) bool {
 	select {
-	case w.writeFlowQueue <- task:
+	case w.writeCache.taskQueue <- task:
 		return true
 	default:
 		return false
 	}
 }
 
 // CheckReadAsync puts the flowItem into queue, and check it asynchronously
-func (w *HotCache) CheckReadAsync(task FlowItemTask) bool {
+func (w *HotCache) CheckReadAsync(task flowItemTask) bool {
 	select {
-	case w.readFlowQueue <- task:
+	case w.readCache.taskQueue <- task:
 		return true
 	default:
 		return false
@@ -86,39 +82,26 @@ func (w *HotCache) CheckReadAsync(task FlowItemTask) bool {
 func (w *HotCache) Update(item *HotPeerStat) {
 	switch item.Kind {
 	case Write:
-		update(item, w.writeFlow)
+		updateStat(w.writeCache, item)
 	case Read:
-		update(item, w.readFlow)
+		updateStat(w.readCache, item)
 	}
 }
 
 // RegionStats returns hot items according to kind
 func (w *HotCache) RegionStats(kind RWType, minHotDegree int) map[uint64][]*HotPeerStat {
+	task := newCollectRegionStatsTask(minHotDegree)
+	var succ bool
 	switch kind {
 	case Write:
-		task := newCollectRegionStatsTask(minHotDegree)
-		succ := w.CheckWriteAsync(task)
-		if !succ {
-			return nil
-		}
-		return task.waitRet(w.ctx)
+		succ = w.CheckWriteAsync(task)
 	case Read:
-		task := newCollectRegionStatsTask(minHotDegree)
-		succ := w.CheckReadAsync(task)
-		if !succ {
-			return nil
-		}
-		return task.waitRet(w.ctx)
+		succ = w.CheckReadAsync(task)
 	}
-	return nil
-}
-
-// HotRegionsFromStore picks hot region in specify store.
-func (w *HotCache) HotRegionsFromStore(storeID uint64, kind RWType, minHotDegree int) []*HotPeerStat {
-	if stats, ok := w.RegionStats(kind, minHotDegree)[storeID]; ok && len(stats) > 0 {
-		return stats
+	if !succ {
+		return nil
 	}
-	return nil
+	return task.waitRet(w.ctx)
 }
 
 // IsRegionHot checks if the region is hot.
@@ -149,13 +132,13 @@ func (w *HotCache) ResetMetrics() {
 // ExpiredReadItems returns the read items which are already expired.
 // This is used for mockcluster.
 func (w *HotCache) ExpiredReadItems(region *core.RegionInfo) []*HotPeerStat {
-	return w.readFlow.CollectExpiredItems(region)
+	return w.readCache.collectExpiredItems(region)
 }
 
 // ExpiredWriteItems returns the write items which are already expired.
 // This is used for mockcluster.
 func (w *HotCache) ExpiredWriteItems(region *core.RegionInfo) []*HotPeerStat {
-	return w.writeFlow.CollectExpiredItems(region)
+	return w.writeCache.collectExpiredItems(region)
 }
 
 func incMetrics(name string, storeID uint64, kind RWType) {
@@ -172,14 +155,14 @@ func incMetrics(name string, storeID uint64, kind RWType) {
 func (w *HotCache) GetFilledPeriod(kind RWType) int {
 	switch kind {
 	case Write:
-		return w.writeFlow.getDefaultTimeMedian().GetFilledPeriod()
+		return w.writeCache.getDefaultTimeMedian().GetFilledPeriod()
 	case Read:
-		return w.readFlow.getDefaultTimeMedian().GetFilledPeriod()
+		return w.readCache.getDefaultTimeMedian().GetFilledPeriod()
 	}
 	return 0
 }
 
-func (w *HotCache) updateItems(queue <-chan FlowItemTask, runTask func(task FlowItemTask)) {
+func (w *HotCache) updateItems(queue <-chan flowItemTask, runTask func(task flowItemTask)) {
 	for {
 		select {
 		case <-w.ctx.Done():
@@ -190,29 +173,30 @@ }
 	}
 }
 
-func (w *HotCache) runReadTask(task FlowItemTask) {
+func (w *HotCache) runReadTask(task flowItemTask) {
 	if task != nil {
-		// TODO: do we need a run-task timeout to protect the queue won't be stucked by a task?
-		task.runTask(w.readFlow)
-		hotCacheFlowQueueStatusGauge.WithLabelValues(Read.String()).Set(float64(len(w.readFlowQueue)))
+		// TODO: do we need a run-task timeout to protect the queue won't be stuck by a task?
+		task.runTask(w.readCache)
+		hotCacheFlowQueueStatusGauge.WithLabelValues(Read.String()).Set(float64(len(w.readCache.taskQueue)))
 	}
 }
 
-func (w *HotCache) runWriteTask(task FlowItemTask) {
+func (w *HotCache) runWriteTask(task flowItemTask) {
 	if task != nil {
-		// TODO: do we need a run-task timeout to protect the queue won't be stucked by a task?
-		task.runTask(w.writeFlow)
-		hotCacheFlowQueueStatusGauge.WithLabelValues(Write.String()).Set(float64(len(w.writeFlowQueue)))
+		// TODO: do we need a run-task timeout to protect the queue won't be stuck by a task?
+		task.runTask(w.writeCache)
+		hotCacheFlowQueueStatusGauge.WithLabelValues(Write.String()).Set(float64(len(w.writeCache.taskQueue)))
 	}
 }
 
-func update(item *HotPeerStat, flow *hotPeerCache) {
-	flow.Update(item)
-	if item.IsNeedDelete() {
-		incMetrics("remove_item", item.StoreID, item.Kind)
-	} else if item.IsNew() {
+func updateStat(cache *hotPeerCache, item *HotPeerStat) {
+	cache.update(item)
+	switch item.actionType {
+	case Add:
 		incMetrics("add_item", item.StoreID, item.Kind)
-	} else {
+	case Remove:
+		incMetrics("remove_item", item.StoreID, item.Kind)
+	case Update:
 		incMetrics("update_item", item.StoreID, item.Kind)
 	}
 }
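This file carries the heart of the refactor: the two task channels move off HotCache into each hotPeerCache (the taskQueue field lands in hot_peer_cache.go, one of the changed files not expanded on this page), and RegionStats is reshaped so both kinds share one submit-then-wait path. Queries work as request/reply over channels: the task carries its own reply channel, the cache's single worker goroutine runs it, and the caller blocks on the reply. A standalone sketch of that pattern with illustrative local types (queue size and values invented):

package main

import (
	"context"
	"fmt"
)

type task interface{ run(state map[uint64]int) }

// statsTask models collectRegionStatsTask: it carries its own reply channel,
// buffered by one so the worker never blocks on a caller that gave up.
type statsTask struct{ ret chan map[uint64]int }

func (t *statsTask) run(state map[uint64]int) { t.ret <- state }

func main() {
	queue := make(chan task, 16) // each cache owns one such queue now
	go func() {
		state := map[uint64]int{1: 42}
		for t := range queue {
			t.run(state) // tasks are the only code touching state: no locks
		}
	}()

	t := &statsTask{ret: make(chan map[uint64]int, 1)}
	select {
	case queue <- t: // like CheckWriteAsync/CheckReadAsync: non-blocking enqueue
	default:
		fmt.Println("queue full")
		return
	}

	ctx := context.Background()
	select { // like waitRet: block for the reply, or give up with the context
	case <-ctx.Done():
	case ret := <-t.ret:
		fmt.Println(ret[1]) // 42
	}
}

Because one goroutine owns the state and tasks are the only way to reach it, the caches need no locking; the full queue and the context cancellation are the two escape hatches.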
54 changes: 27 additions & 27 deletions server/statistics/hot_cache_task.go
@@ -31,10 +31,10 @@
 	collectMetricsTaskType
 )
 
-// FlowItemTask indicates the task in flowItem queue
-type FlowItemTask interface {
+// flowItemTask indicates the task in flowItem queue
+type flowItemTask interface {
 	taskType() flowItemTaskKind
-	runTask(flow *hotPeerCache)
+	runTask(cache *hotPeerCache)
 }
 
 type checkPeerTask struct {
@@ -43,7 +43,7 @@ type checkPeerTask struct {
 }
 
 // NewCheckPeerTask creates task to update peerInfo
-func NewCheckPeerTask(peerInfo *core.PeerInfo, regionInfo *core.RegionInfo) FlowItemTask {
+func NewCheckPeerTask(peerInfo *core.PeerInfo, regionInfo *core.RegionInfo) flowItemTask {
 	return &checkPeerTask{
 		peerInfo:   peerInfo,
 		regionInfo: regionInfo,
@@ -54,10 +54,10 @@ func (t *checkPeerTask) taskType() flowItemTaskKind {
 	return checkPeerTaskType
 }
 
-func (t *checkPeerTask) runTask(flow *hotPeerCache) {
-	stat := flow.CheckPeerFlow(t.peerInfo, t.regionInfo)
+func (t *checkPeerTask) runTask(cache *hotPeerCache) {
+	stat := cache.checkPeerFlow(t.peerInfo, t.regionInfo)
 	if stat != nil {
-		update(stat, flow)
+		updateStat(cache, stat)
 	}
 }
 
@@ -66,7 +66,7 @@ type checkExpiredTask struct {
 }
 
 // NewCheckExpiredItemTask creates task to collect expired items
-func NewCheckExpiredItemTask(region *core.RegionInfo) FlowItemTask {
+func NewCheckExpiredItemTask(region *core.RegionInfo) flowItemTask {
 	return &checkExpiredTask{
 		region: region,
 	}
@@ -76,36 +76,36 @@ func (t *checkExpiredTask) taskType() flowItemTaskKind {
 	return checkExpiredTaskType
 }
 
-func (t *checkExpiredTask) runTask(flow *hotPeerCache) {
-	expiredStats := flow.CollectExpiredItems(t.region)
+func (t *checkExpiredTask) runTask(cache *hotPeerCache) {
+	expiredStats := cache.collectExpiredItems(t.region)
 	for _, stat := range expiredStats {
-		update(stat, flow)
+		updateStat(cache, stat)
 	}
 }
 
 type collectUnReportedPeerTask struct {
-	storeID   uint64
-	regionIDs map[uint64]struct{}
-	interval  uint64
+	storeID  uint64
+	regions  map[uint64]*core.RegionInfo
+	interval uint64
 }
 
 // NewCollectUnReportedPeerTask creates task to collect unreported peers
-func NewCollectUnReportedPeerTask(storeID uint64, regionIDs map[uint64]struct{}, interval uint64) FlowItemTask {
+func NewCollectUnReportedPeerTask(storeID uint64, regions map[uint64]*core.RegionInfo, interval uint64) flowItemTask {
 	return &collectUnReportedPeerTask{
-		storeID:   storeID,
-		regionIDs: regionIDs,
-		interval:  interval,
+		storeID:  storeID,
+		regions:  regions,
+		interval: interval,
 	}
 }
 
 func (t *collectUnReportedPeerTask) taskType() flowItemTaskKind {
 	return collectUnReportedPeerTaskType
 }
 
-func (t *collectUnReportedPeerTask) runTask(flow *hotPeerCache) {
-	stats := flow.CheckColdPeer(t.storeID, t.regionIDs, t.interval)
+func (t *collectUnReportedPeerTask) runTask(cache *hotPeerCache) {
+	stats := cache.checkColdPeer(t.storeID, t.regions, t.interval)
 	for _, stat := range stats {
-		update(stat, flow)
+		updateStat(cache, stat)
 	}
 }
 
@@ -125,8 +125,8 @@ func (t *collectRegionStatsTask) taskType() flowItemTaskKind {
 	return collectRegionStatsTaskType
 }
 
-func (t *collectRegionStatsTask) runTask(flow *hotPeerCache) {
-	t.ret <- flow.RegionStats(t.minDegree)
+func (t *collectRegionStatsTask) runTask(cache *hotPeerCache) {
+	t.ret <- cache.RegionStats(t.minDegree)
 }
 
 // TODO: do we need a wait-return timeout?
@@ -157,8 +157,8 @@ func (t *isRegionHotTask) taskType() flowItemTaskKind {
 	return isRegionHotTaskType
 }
 
-func (t *isRegionHotTask) runTask(flow *hotPeerCache) {
-	t.ret <- flow.isRegionHotWithAnyPeers(t.region, t.minHotDegree)
+func (t *isRegionHotTask) runTask(cache *hotPeerCache) {
+	t.ret <- cache.isRegionHotWithAnyPeers(t.region, t.minHotDegree)
 }
 
 // TODO: do we need a wait-return timeout?
@@ -185,6 +185,6 @@ func (t *collectMetricsTask) taskType() flowItemTaskKind {
 	return collectMetricsTaskType
 }
 
-func (t *collectMetricsTask) runTask(flow *hotPeerCache) {
-	flow.CollectMetrics(t.typ)
+func (t *collectMetricsTask) runTask(cache *hotPeerCache) {
+	cache.collectMetrics(t.typ)
 }
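A note on the renames in this file: FlowItemTask becomes the unexported flowItemTask while constructors such as NewCheckPeerTask stay exported, so other packages can still create and pass tasks but can no longer name the interface or implement their own. A toy illustration of this encapsulation pattern (all names invented):

package main

import "fmt"

// doer is unexported: callers outside the package can hold values returned
// by the constructor below, but cannot name or implement the interface.
type doer interface{ do() }

type ping struct{}

func (ping) do() { fmt.Println("ping") }

// NewPing mirrors NewCheckPeerTask after the rename: an exported constructor
// returning an unexported interface type.
func NewPing() doer { return ping{} }

func main() { NewPing().do() }

The usual lint complaint about exported functions returning unexported types is the cost; since every caller lives in the same repository, the trade is deliberate.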