From 909f2fdebb3f06b704dfc4498b498708efe9ee32 Mon Sep 17 00:00:00 2001
From: Ryan Leung
Date: Tue, 28 Dec 2021 13:41:49 +0800
Subject: [PATCH] statistics: small refactor of hot statistics (#4461)

* small refactor of hot statistics

close #4463

Signed-off-by: Ryan Leung

* address the comment

Signed-off-by: Ryan Leung

* address the comment

Signed-off-by: Ryan Leung

Co-authored-by: Ti Chi Robot
---
 pkg/mock/mockcluster/mockcluster.go      |  10 +-
 server/cluster/cluster.go                |   6 +-
 server/handler.go                        |   2 +-
 server/schedulers/hot_region_test.go     |   6 +-
 server/statistics/hot_cache.go           | 102 +++++++++------------
 server/statistics/hot_cache_task.go      |  54 +++++------
 server/statistics/hot_peer.go            |  60 ++++++------
 server/statistics/hot_peer_cache.go      | 111 ++++++++++++-----------
 server/statistics/hot_peer_cache_test.go |  70 +++++++-------
 server/statistics/kind.go                |  22 +++++
 10 files changed, 225 insertions(+), 218 deletions(-)

diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go
index ce8c79652ab..0864e9e525c 100644
--- a/pkg/mock/mockcluster/mockcluster.go
+++ b/pkg/mock/mockcluster/mockcluster.go
@@ -130,7 +130,7 @@ func (mc *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
 
 // HotRegionsFromStore picks hot regions in specify store.
 func (mc *Cluster) HotRegionsFromStore(store uint64, kind statistics.RWType) []*core.RegionInfo {
-	stats := mc.HotCache.HotRegionsFromStore(store, kind, mc.GetHotRegionCacheHitsThreshold())
+	stats := hotRegionsFromStore(mc.HotCache, store, kind, mc.GetHotRegionCacheHitsThreshold())
 	regions := make([]*core.RegionInfo, 0, len(stats))
 	for _, stat := range stats {
 		region := mc.GetRegion(stat.RegionID)
@@ -141,6 +141,14 @@ func (mc *Cluster) HotRegionsFromStore(store uint64, kind statistics.RWType) []*
 	return regions
 }
 
+// hotRegionsFromStore picks the hot regions in the specified store.
+func hotRegionsFromStore(w *statistics.HotCache, storeID uint64, kind statistics.RWType, minHotDegree int) []*statistics.HotPeerStat {
+	if stats, ok := w.RegionStats(kind, minHotDegree)[storeID]; ok && len(stats) > 0 {
+		return stats
+	}
+	return nil
+}
+
 // AllocPeer allocs a new peer on a store.
func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { peerID, err := mc.AllocID() diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index fa9fce926ef..c6738b916fb 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -594,11 +594,11 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error { c.limiter.Collect(newStore.GetStoreStats()) } - regionIDs := make(map[uint64]struct{}, len(stats.GetPeerStats())) + regions := make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats())) for _, peerStat := range stats.GetPeerStats() { regionID := peerStat.GetRegionId() - regionIDs[regionID] = struct{}{} region := c.GetRegion(regionID) + regions[regionID] = region if region == nil { log.Warn("discard hot peer stat for unknown region", zap.Uint64("region-id", regionID), @@ -624,7 +624,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error { peerInfo := core.NewPeerInfo(peer, loads, interval) c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) } - c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regionIDs, interval)) + c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval)) return nil } diff --git a/server/handler.go b/server/handler.go index df3285120b6..46fc5c98455 100644 --- a/server/handler.go +++ b/server/handler.go @@ -999,7 +999,7 @@ func (h *Handler) packHotRegions(hotPeersStat statistics.StoreHotPeersStat, hotR } } stat := core.HistoryHotRegion{ - // store in ms. + // store in ms. UpdateTime: hotPeerStat.LastUpdateTime.UnixNano() / int64(time.Millisecond), RegionID: hotPeerStat.RegionID, StoreID: hotPeerStat.StoreID, diff --git a/server/schedulers/hot_region_test.go b/server/schedulers/hot_region_test.go index dd7192cba55..39328ceac74 100644 --- a/server/schedulers/hot_region_test.go +++ b/server/schedulers/hot_region_test.go @@ -1552,7 +1552,7 @@ func (s *testHotCacheSuite) TestCheckRegionFlow(c *C) { c.Check(len(items), Greater, 0) for _, item := range items { if item.StoreID == 3 { - c.Check(item.IsNeedDelete(), IsTrue) + c.Check(item.GetActionType(), Equals, statistics.Remove) continue } c.Check(item.HotDegree, Equals, testcase.DegreeAfterTransferLeader+2) @@ -1586,9 +1586,9 @@ func (s *testHotCacheSuite) TestCheckRegionFlowWithDifferentThreshold(c *C) { items = tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, 0, statistics.WriteReportInterval, []uint64{3, 4}, 1) for _, item := range items { if item.StoreID < 4 { - c.Check(item.IsNeedDelete(), IsTrue) + c.Check(item.GetActionType(), Equals, statistics.Remove) } else { - c.Check(item.IsNeedDelete(), IsFalse) + c.Check(item.GetActionType(), Equals, statistics.Update) } } } diff --git a/server/statistics/hot_cache.go b/server/statistics/hot_cache.go index 125b5f6206e..ece8932c959 100644 --- a/server/statistics/hot_cache.go +++ b/server/statistics/hot_cache.go @@ -28,43 +28,39 @@ const queueCap = 20000 // HotCache is a cache hold hot regions. type HotCache struct { - ctx context.Context - readFlowQueue chan FlowItemTask - writeFlowQueue chan FlowItemTask - writeFlow *hotPeerCache - readFlow *hotPeerCache + ctx context.Context + writeCache *hotPeerCache + readCache *hotPeerCache } // NewHotCache creates a new hot spot cache. 
func NewHotCache(ctx context.Context) *HotCache { w := &HotCache{ - ctx: ctx, - readFlowQueue: make(chan FlowItemTask, queueCap), - writeFlowQueue: make(chan FlowItemTask, queueCap), - writeFlow: NewHotPeerCache(Write), - readFlow: NewHotPeerCache(Read), + ctx: ctx, + writeCache: NewHotPeerCache(Write), + readCache: NewHotPeerCache(Read), } - go w.updateItems(w.readFlowQueue, w.runReadTask) - go w.updateItems(w.writeFlowQueue, w.runWriteTask) + go w.updateItems(w.readCache.taskQueue, w.runReadTask) + go w.updateItems(w.writeCache.taskQueue, w.runWriteTask) return w } // CheckWritePeerSync checks the write status, returns update items. // This is used for mockcluster. func (w *HotCache) CheckWritePeerSync(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat { - return w.writeFlow.CheckPeerFlow(peer, region) + return w.writeCache.checkPeerFlow(peer, region) } // CheckReadPeerSync checks the read status, returns update items. // This is used for mockcluster. func (w *HotCache) CheckReadPeerSync(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat { - return w.readFlow.CheckPeerFlow(peer, region) + return w.readCache.checkPeerFlow(peer, region) } // CheckWriteAsync puts the flowItem into queue, and check it asynchronously -func (w *HotCache) CheckWriteAsync(task FlowItemTask) bool { +func (w *HotCache) CheckWriteAsync(task flowItemTask) bool { select { - case w.writeFlowQueue <- task: + case w.writeCache.taskQueue <- task: return true default: return false @@ -72,9 +68,9 @@ func (w *HotCache) CheckWriteAsync(task FlowItemTask) bool { } // CheckReadAsync puts the flowItem into queue, and check it asynchronously -func (w *HotCache) CheckReadAsync(task FlowItemTask) bool { +func (w *HotCache) CheckReadAsync(task flowItemTask) bool { select { - case w.readFlowQueue <- task: + case w.readCache.taskQueue <- task: return true default: return false @@ -86,39 +82,26 @@ func (w *HotCache) CheckReadAsync(task FlowItemTask) bool { func (w *HotCache) Update(item *HotPeerStat) { switch item.Kind { case Write: - update(item, w.writeFlow) + updateStat(w.writeCache, item) case Read: - update(item, w.readFlow) + updateStat(w.readCache, item) } } // RegionStats returns hot items according to kind func (w *HotCache) RegionStats(kind RWType, minHotDegree int) map[uint64][]*HotPeerStat { + task := newCollectRegionStatsTask(minHotDegree) + var succ bool switch kind { case Write: - task := newCollectRegionStatsTask(minHotDegree) - succ := w.CheckWriteAsync(task) - if !succ { - return nil - } - return task.waitRet(w.ctx) + succ = w.CheckWriteAsync(task) case Read: - task := newCollectRegionStatsTask(minHotDegree) - succ := w.CheckReadAsync(task) - if !succ { - return nil - } - return task.waitRet(w.ctx) + succ = w.CheckReadAsync(task) } - return nil -} - -// HotRegionsFromStore picks hot region in specify store. -func (w *HotCache) HotRegionsFromStore(storeID uint64, kind RWType, minHotDegree int) []*HotPeerStat { - if stats, ok := w.RegionStats(kind, minHotDegree)[storeID]; ok && len(stats) > 0 { - return stats + if !succ { + return nil } - return nil + return task.waitRet(w.ctx) } // IsRegionHot checks if the region is hot. @@ -149,13 +132,13 @@ func (w *HotCache) ResetMetrics() { // ExpiredReadItems returns the read items which are already expired. // This is used for mockcluster. 
func (w *HotCache) ExpiredReadItems(region *core.RegionInfo) []*HotPeerStat {
-	return w.readFlow.CollectExpiredItems(region)
+	return w.readCache.collectExpiredItems(region)
 }
 
 // ExpiredWriteItems returns the write items which are already expired.
 // This is used for mockcluster.
 func (w *HotCache) ExpiredWriteItems(region *core.RegionInfo) []*HotPeerStat {
-	return w.writeFlow.CollectExpiredItems(region)
+	return w.writeCache.collectExpiredItems(region)
 }
 
 func incMetrics(name string, storeID uint64, kind RWType) {
@@ -172,14 +155,14 @@ func incMetrics(name string, storeID uint64, kind RWType) {
 func (w *HotCache) GetFilledPeriod(kind RWType) int {
 	switch kind {
 	case Write:
-		return w.writeFlow.getDefaultTimeMedian().GetFilledPeriod()
+		return w.writeCache.getDefaultTimeMedian().GetFilledPeriod()
 	case Read:
-		return w.readFlow.getDefaultTimeMedian().GetFilledPeriod()
+		return w.readCache.getDefaultTimeMedian().GetFilledPeriod()
 	}
 	return 0
 }
 
-func (w *HotCache) updateItems(queue <-chan FlowItemTask, runTask func(task FlowItemTask)) {
+func (w *HotCache) updateItems(queue <-chan flowItemTask, runTask func(task flowItemTask)) {
 	for {
 		select {
 		case <-w.ctx.Done():
@@ -190,29 +173,30 @@ func (w *HotCache) updateItems(queue <-chan FlowItemTask, runTask func(task Flow
 	}
 }
 
-func (w *HotCache) runReadTask(task FlowItemTask) {
+func (w *HotCache) runReadTask(task flowItemTask) {
 	if task != nil {
-		// TODO: do we need a run-task timeout to protect the queue won't be stucked by a task?
+		// TODO: do we need a run-task timeout to prevent the queue from being stuck by a single task?
 		task.runTask(w.readCache)
 		hotCacheFlowQueueStatusGauge.WithLabelValues(Read.String()).Set(float64(len(w.readCache.taskQueue)))
 	}
 }
 
-func (w *HotCache) runWriteTask(task FlowItemTask) {
+func (w *HotCache) runWriteTask(task flowItemTask) {
 	if task != nil {
-		// TODO: do we need a run-task timeout to protect the queue won't be stucked by a task?
+		// TODO: do we need a run-task timeout to prevent the queue from being stuck by a single task?
+		task.runTask(w.writeCache)
+		hotCacheFlowQueueStatusGauge.WithLabelValues(Write.String()).Set(float64(len(w.writeCache.taskQueue)))
 	}
 }
 
-func update(item *HotPeerStat, flow *hotPeerCache) {
-	flow.Update(item)
-	if item.IsNeedDelete() {
-		incMetrics("remove_item", item.StoreID, item.Kind)
-	} else if item.IsNew() {
+func updateStat(cache *hotPeerCache, item *HotPeerStat) {
+	cache.update(item)
+	switch item.actionType {
+	case Add:
 		incMetrics("add_item", item.StoreID, item.Kind)
-	} else {
+	case Remove:
+		incMetrics("remove_item", item.StoreID, item.Kind)
+	case Update:
 		incMetrics("update_item", item.StoreID, item.Kind)
 	}
 }
diff --git a/server/statistics/hot_cache_task.go b/server/statistics/hot_cache_task.go
index 672306b35d0..2f71c5ecee5 100644
--- a/server/statistics/hot_cache_task.go
+++ b/server/statistics/hot_cache_task.go
@@ -31,10 +31,10 @@ const (
 	collectMetricsTaskType
 )
 
-// FlowItemTask indicates the task in flowItem queue
-type FlowItemTask interface {
+// flowItemTask indicates a task in the flowItem queue
+type flowItemTask interface {
 	taskType() flowItemTaskKind
-	runTask(flow *hotPeerCache)
+	runTask(cache *hotPeerCache)
 }
 
 type checkPeerTask struct {
@@ -43,7 +43,7 @@ type checkPeerTask struct {
 }
 
 // NewCheckPeerTask creates task to update peerInfo
-func NewCheckPeerTask(peerInfo *core.PeerInfo, regionInfo *core.RegionInfo) FlowItemTask {
+func NewCheckPeerTask(peerInfo *core.PeerInfo, regionInfo *core.RegionInfo) flowItemTask {
 	return &checkPeerTask{
 		peerInfo:   peerInfo,
 		regionInfo: regionInfo,
@@ -54,10 +54,10 @@ func (t *checkPeerTask) taskType() flowItemTaskKind {
 	return checkPeerTaskType
 }
 
-func (t *checkPeerTask) runTask(flow *hotPeerCache) {
-	stat := flow.CheckPeerFlow(t.peerInfo, t.regionInfo)
+func (t *checkPeerTask) runTask(cache *hotPeerCache) {
+	stat := cache.checkPeerFlow(t.peerInfo, t.regionInfo)
 	if stat != nil {
-		update(stat, flow)
+		updateStat(cache, stat)
 	}
 }
 
@@ -66,7 +66,7 @@ type checkExpiredTask struct {
 }
 
 // NewCheckExpiredItemTask creates task to collect expired items
-func NewCheckExpiredItemTask(region *core.RegionInfo) FlowItemTask {
+func NewCheckExpiredItemTask(region *core.RegionInfo) flowItemTask {
 	return &checkExpiredTask{
 		region: region,
 	}
@@ -76,25 +76,25 @@ func (t *checkExpiredTask) taskType() flowItemTaskKind {
 	return checkExpiredTaskType
 }
 
-func (t *checkExpiredTask) runTask(flow *hotPeerCache) {
-	expiredStats := flow.CollectExpiredItems(t.region)
+func (t *checkExpiredTask) runTask(cache *hotPeerCache) {
+	expiredStats := cache.collectExpiredItems(t.region)
 	for _, stat := range expiredStats {
-		update(stat, flow)
+		updateStat(cache, stat)
 	}
 }
 
 type collectUnReportedPeerTask struct {
-	storeID   uint64
-	regionIDs map[uint64]struct{}
-	interval  uint64
+	storeID  uint64
+	regions  map[uint64]*core.RegionInfo
+	interval uint64
 }
 
 // NewCollectUnReportedPeerTask creates task to collect unreported peers
-func NewCollectUnReportedPeerTask(storeID uint64, regionIDs map[uint64]struct{}, interval uint64) FlowItemTask {
+func NewCollectUnReportedPeerTask(storeID uint64, regions map[uint64]*core.RegionInfo, interval uint64) flowItemTask {
 	return &collectUnReportedPeerTask{
-		storeID:   storeID,
-		regionIDs: regionIDs,
-		interval:  interval,
+		storeID:  storeID,
+		regions:  regions,
+		interval: interval,
 	}
 }
 
@@ -102,10 +102,10 @@ func (t *collectUnReportedPeerTask) taskType() flowItemTaskKind {
 	return collectUnReportedPeerTaskType
 }
 
-func (t *collectUnReportedPeerTask) runTask(flow *hotPeerCache) {
-	stats := flow.CheckColdPeer(t.storeID, t.regionIDs, t.interval)
+func (t *collectUnReportedPeerTask) runTask(cache *hotPeerCache) {
+	stats := cache.checkColdPeer(t.storeID, t.regions, t.interval)
 	for _, stat := range stats {
-		update(stat, flow)
+		updateStat(cache, stat)
 	}
 }
 
@@ -125,8 +125,8 @@ func (t *collectRegionStatsTask) taskType() flowItemTaskKind {
 	return collectRegionStatsTaskType
 }
 
-func (t *collectRegionStatsTask) runTask(flow *hotPeerCache) {
-	t.ret <- flow.RegionStats(t.minDegree)
+func (t *collectRegionStatsTask) runTask(cache *hotPeerCache) {
+	t.ret <- cache.RegionStats(t.minDegree)
 }
 
 // TODO: do we need a wait-return timeout?
@@ -157,8 +157,8 @@ func (t *isRegionHotTask) taskType() flowItemTaskKind {
 	return isRegionHotTaskType
 }
 
-func (t *isRegionHotTask) runTask(flow *hotPeerCache) {
-	t.ret <- flow.isRegionHotWithAnyPeers(t.region, t.minHotDegree)
+func (t *isRegionHotTask) runTask(cache *hotPeerCache) {
+	t.ret <- cache.isRegionHotWithAnyPeers(t.region, t.minHotDegree)
 }
 
 // TODO: do we need a wait-return timeout?
@@ -185,6 +185,6 @@ func (t *collectMetricsTask) taskType() flowItemTaskKind {
 	return collectMetricsTaskType
 }
 
-func (t *collectMetricsTask) runTask(flow *hotPeerCache) {
-	flow.CollectMetrics(t.typ)
+func (t *collectMetricsTask) runTask(cache *hotPeerCache) {
+	cache.collectMetrics(t.typ)
 }
diff --git a/server/statistics/hot_peer.go b/server/statistics/hot_peer.go
index e7055a38af3..0bebaaa7a5f 100644
--- a/server/statistics/hot_peer.go
+++ b/server/statistics/hot_peer.go
@@ -33,48 +33,48 @@ const (
 
 type dimStat struct {
 	typ RegionStatKind
-	Rolling     *movingaverage.TimeMedian  // it's used to statistic hot degree and average speed.
-	LastAverage *movingaverage.AvgOverTime // it's used to obtain the average speed in last second as instantaneous speed.
+	rolling     *movingaverage.TimeMedian  // it's used to compute the hot degree and the average speed.
+	lastAverage *movingaverage.AvgOverTime // it's used to obtain the average speed in the last second as the instantaneous speed.
 }
 
 func newDimStat(typ RegionStatKind, reportInterval time.Duration) *dimStat {
 	return &dimStat{
 		typ: typ,
-		Rolling:     movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, reportInterval),
-		LastAverage: movingaverage.NewAvgOverTime(reportInterval),
+		rolling:     movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, reportInterval),
+		lastAverage: movingaverage.NewAvgOverTime(reportInterval),
 	}
 }
 
 func (d *dimStat) Add(delta float64, interval time.Duration) {
-	d.LastAverage.Add(delta, interval)
-	d.Rolling.Add(delta, interval)
+	d.lastAverage.Add(delta, interval)
+	d.rolling.Add(delta, interval)
 }
 
 func (d *dimStat) isLastAverageHot(threshold float64) bool {
-	return d.LastAverage.Get() >= threshold
+	return d.lastAverage.Get() >= threshold
 }
 
 func (d *dimStat) isHot(threshold float64) bool {
-	return d.Rolling.Get() >= threshold
+	return d.rolling.Get() >= threshold
 }
 
 func (d *dimStat) isFull() bool {
-	return d.LastAverage.IsFull()
+	return d.lastAverage.IsFull()
 }
 
 func (d *dimStat) clearLastAverage() {
-	d.LastAverage.Clear()
+	d.lastAverage.Clear()
 }
 
 func (d *dimStat) Get() float64 {
-	return d.Rolling.Get()
+	return d.rolling.Get()
 }
 
 func (d *dimStat) Clone() *dimStat {
 	return &dimStat{
 		typ: d.typ,
-		Rolling:     d.Rolling.Clone(),
-		LastAverage: d.LastAverage.Clone(),
+		rolling:     d.rolling.Clone(),
+		lastAverage: d.lastAverage.Clone(),
 	}
 }
 
@@ -97,11 +97,8 @@ type HotPeerStat struct {
 	// LastUpdateTime used to calculate average write
 	LastUpdateTime time.Time `json:"last_update_time"`
 
-	needDelete bool
-	isLeader   bool
-	isNew      bool
-	// TODO: remove it when we send peer stat by store info
-	justTransferLeader bool
+	actionType ActionType
+	isLeader   bool
 	interval   uint64
 	thresholds []float64
 	peers      []uint64
@@ -140,10 +137,9 @@ func (stat *HotPeerStat) Log(str string, level func(msg string, fields ...zap.Fi
 		zap.Int("hot-degree", stat.HotDegree),
 		zap.Int("hot-anti-count", stat.AntiCount),
 		zap.Duration("sum-interval", stat.getIntervalSum()),
-		zap.Bool("need-delete", stat.IsNeedDelete()),
 		zap.String("source", stat.source.String()),
 		zap.Bool("allow-adopt", stat.allowAdopt),
-		zap.Bool("just-transfer-leader", stat.justTransferLeader),
+		zap.String("action-type", stat.actionType.String()),
 		zap.Time("last-transfer-leader-time", stat.lastTransferLeaderTime))
 }
 
@@ -152,22 +148,17 @@ func (stat *HotPeerStat) IsNeedCoolDownTransferLeader(minHotDegree int) bool {
 	return time.Since(stat.lastTransferLeaderTime).Seconds() < float64(minHotDegree*stat.hotStatReportInterval())
 }
 
-// IsNeedDelete to delete the item in cache.
-func (stat *HotPeerStat) IsNeedDelete() bool {
-	return stat.needDelete
-}
-
 // IsLeader indicates the item belong to the leader.
 func (stat *HotPeerStat) IsLeader() bool {
 	return stat.isLeader
 }
 
-// IsNew indicates the item is first update in the cache of the region.
-func (stat *HotPeerStat) IsNew() bool {
-	return stat.isNew
+// GetActionType returns the action type of the item.
+func (stat *HotPeerStat) GetActionType() ActionType {
+	return stat.actionType
 }
 
-// GetLoad returns denoised load if possible.
+// GetLoad returns the denoised load if possible.
 func (stat *HotPeerStat) GetLoad(k RegionStatKind) float64 {
 	if len(stat.rollingLoads) > int(k) {
 		return math.Round(stat.rollingLoads[int(k)].Get())
@@ -175,7 +166,7 @@ func (stat *HotPeerStat) GetLoad(k RegionStatKind) float64 {
 	return math.Round(stat.Loads[int(k)])
 }
 
-// GetLoads returns denoised load if possible.
+// GetLoads returns the denoised loads if possible.
func (stat *HotPeerStat) GetLoads() []float64 {
 	regionStats := stat.Kind.RegionStats()
 	loads := make([]float64, len(regionStats))
@@ -185,7 +176,8 @@ func (stat *HotPeerStat) GetLoads() []float64 {
 	return loads
 }
 
-// GetThresholds returns thresholds
+// GetThresholds returns thresholds.
+// Only for test purposes.
 func (stat *HotPeerStat) GetThresholds() []float64 {
 	return stat.thresholds
 }
@@ -201,9 +193,9 @@ func (stat *HotPeerStat) Clone() *HotPeerStat {
 	return &ret
 }
 
-func (stat *HotPeerStat) isFullAndHot() bool {
+func (stat *HotPeerStat) isHot() bool {
 	return slice.AnyOf(stat.rollingLoads, func(i int) bool {
-		return stat.rollingLoads[i].isFull() && stat.rollingLoads[i].isLastAverageHot(stat.thresholds[i])
+		return stat.rollingLoads[i].isLastAverageHot(stat.thresholds[i])
 	})
 }
 
@@ -224,5 +216,5 @@ func (stat *HotPeerStat) getIntervalSum() time.Duration {
 	if len(stat.rollingLoads) == 0 || stat.rollingLoads[0] == nil {
 		return 0
 	}
-	return stat.rollingLoads[0].LastAverage.GetIntervalSum()
+	return stat.rollingLoads[0].lastAverage.GetIntervalSum()
 }
diff --git a/server/statistics/hot_peer_cache.go b/server/statistics/hot_peer_cache.go
index c5a08db5dd8..7de9a8ceaca 100644
--- a/server/statistics/hot_peer_cache.go
+++ b/server/statistics/hot_peer_cache.go
@@ -61,6 +61,7 @@ type hotPeerCache struct {
 	inheritItem        map[uint64]*HotPeerStat // regionID -> HotPeerStat
 	topNTTL            time.Duration
 	reportIntervalSecs int
+	taskQueue          chan flowItemTask
 }
 
 // NewHotPeerCache creates a hotPeerCache
@@ -71,6 +72,7 @@ func NewHotPeerCache(kind RWType) *hotPeerCache {
 		storesOfRegion: make(map[uint64]map[uint64]struct{}),
 		regionsOfStore: make(map[uint64]map[uint64]struct{}),
 		inheritItem:    make(map[uint64]*HotPeerStat),
+		taskQueue:      make(chan flowItemTask, queueCap),
 	}
 	if kind == Write {
 		c.reportIntervalSecs = WriteReportInterval
@@ -98,14 +100,14 @@ func (f *hotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat {
 	return res
 }
 
-// Update updates the items in statistics.
-func (f *hotPeerCache) Update(item *HotPeerStat) {
-	if item.IsNeedDelete() {
+// update updates the items in statistics.
+func (f *hotPeerCache) update(item *HotPeerStat) {
+	if item.actionType == Remove {
 		if item.AntiCount > 0 { // means it's deleted because expired rather than cold
 			f.putInheritItem(item)
 		}
 		f.removeItem(item)
-		item.Log("region heartbeat delete from cache", log.Debug)
+		item.Log("region heartbeat remove from cache", log.Debug)
 	} else {
 		f.putItem(item)
 		item.Log("region heartbeat update", log.Debug)
@@ -136,15 +138,15 @@ func (f *hotPeerCache) collectPeerMetrics(loads []float64, interval uint64) {
 	}
 }
 
-// CollectExpiredItems collects expired items, mark them as needDelete and puts them into inherit items
-func (f *hotPeerCache) CollectExpiredItems(region *core.RegionInfo) []*HotPeerStat {
+// collectExpiredItems collects expired items, marks them with the Remove action type, and puts them into the inherit items
+func (f *hotPeerCache) collectExpiredItems(region *core.RegionInfo) []*HotPeerStat {
 	regionID := region.GetID()
 	items := make([]*HotPeerStat, 0)
 	for _, storeID := range f.getAllStoreIDs(region) {
 		if region.GetStorePeer(storeID) == nil {
 			item := f.getOldHotPeerStat(regionID, storeID)
 			if item != nil {
-				item.needDelete = true
+				item.actionType = Remove
 				items = append(items, item)
 			}
 		}
@@ -152,10 +154,10 @@ func (f *hotPeerCache) CollectExpiredItems(region *core.RegionInfo) []*HotPeerSt
 	return items
 }
 
-// CheckPeerFlow checks the flow information of a peer.
-// Notice: CheckPeerFlow couldn't be used concurrently.
-// CheckPeerFlow will update oldItem's rollingLoads into newItem, thus we should use write lock here.
-func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat {
+// checkPeerFlow checks the flow information of a peer.
+// Notice: checkPeerFlow couldn't be used concurrently.
+// checkPeerFlow will update oldItem's rollingLoads into newItem, thus we should use write lock here.
+func (f *hotPeerCache) checkPeerFlow(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat {
 	interval := peer.GetInterval()
 	if Denoising && interval < HotRegionReportMinInterval {
 		return nil
@@ -167,7 +169,6 @@ func (f *hotPeerCache) checkPeerFlow(peer *core.PeerInfo, region *core.RegionInf
 	for i := range deltaLoads {
 		loads[i] = deltaLoads[i] / float64(interval)
 	}
-	justTransferLeader := f.justTransferLeader(region)
 	oldItem := f.getOldHotPeerStat(region.GetID(), storeID)
 	thresholds := f.calcHotThresholds(storeID)
 	regionPeers := region.GetPeers()
@@ -176,18 +177,17 @@ func (f *hotPeerCache) checkPeerFlow(peer *core.PeerInfo, region *core.RegionInf
 		peers = append(peers, peer.StoreId)
 	}
 	newItem := &HotPeerStat{
-		StoreID:            storeID,
-		RegionID:           region.GetID(),
-		Kind:               f.kind,
-		Loads:              loads,
-		LastUpdateTime:     time.Now(),
-		needDelete:         false,
-		isLeader:           region.GetLeader().GetStoreId() == storeID,
-		justTransferLeader: justTransferLeader,
-		interval:           interval,
-		peers:              peers,
-		thresholds:         thresholds,
-		source:             direct,
+		StoreID:        storeID,
+		RegionID:       region.GetID(),
+		Kind:           f.kind,
+		Loads:          loads,
+		LastUpdateTime: time.Now(),
+		isLeader:       region.GetLeader().GetStoreId() == storeID,
+		interval:       interval,
+		peers:          peers,
+		actionType:     Update,
+		thresholds:     thresholds,
+		source:         direct,
 	}
 
 	if oldItem == nil {
@@ -205,11 +205,11 @@ func (f *hotPeerCache) checkPeerFlow(peer *core.PeerInfo, region *core.RegionInf
 			}
 		}
 	}
-	return f.updateHotPeerStat(newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second)
+	return f.updateHotPeerStat(region, newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second)
 }
 
-// CheckColdPeer checks the collect the un-heartbeat peer and maintain it.
-func (f *hotPeerCache) CheckColdPeer(storeID uint64, reportRegions map[uint64]struct{}, interval uint64) (ret []*HotPeerStat) {
+// checkColdPeer collects the peers that did not report a heartbeat and maintains them.
+func (f *hotPeerCache) checkColdPeer(storeID uint64, reportRegions map[uint64]*core.RegionInfo, interval uint64) (ret []*HotPeerStat) { if Denoising && interval < HotRegionReportMinInterval { return } @@ -218,7 +218,7 @@ func (f *hotPeerCache) CheckColdPeer(storeID uint64, reportRegions map[uint64]st return } for regionID := range previousHotStat { - if _, ok := reportRegions[regionID]; !ok { + if region, ok := reportRegions[regionID]; !ok { oldItem := f.getOldHotPeerStat(regionID, storeID) if oldItem == nil { continue @@ -228,21 +228,20 @@ func (f *hotPeerCache) CheckColdPeer(storeID uint64, reportRegions map[uint64]st RegionID: regionID, Kind: f.kind, // use oldItem.thresholds to make the newItem won't affect the threshold - Loads: oldItem.thresholds, - LastUpdateTime: time.Now(), - needDelete: false, - isLeader: oldItem.isLeader, - justTransferLeader: oldItem.justTransferLeader, - interval: interval, - peers: oldItem.peers, - thresholds: oldItem.thresholds, - inCold: true, + Loads: oldItem.thresholds, + LastUpdateTime: time.Now(), + isLeader: oldItem.isLeader, + interval: interval, + peers: oldItem.peers, + actionType: Update, + thresholds: oldItem.thresholds, + inCold: true, } deltaLoads := make([]float64, RegionStatCount) for i, loads := range oldItem.thresholds { deltaLoads[i] = loads * float64(interval) } - stat := f.updateHotPeerStat(newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second) + stat := f.updateHotPeerStat(region, newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second) if stat != nil { ret = append(ret, stat) } @@ -251,7 +250,7 @@ func (f *hotPeerCache) CheckColdPeer(storeID uint64, reportRegions map[uint64]st return } -func (f *hotPeerCache) CollectMetrics(typ string) { +func (f *hotPeerCache) collectMetrics(typ string) { for storeID, peers := range f.peersOfStore { store := storeTag(storeID) thresholds := f.calcHotThresholds(storeID) @@ -338,6 +337,9 @@ func (f *hotPeerCache) isOldColdPeer(oldItem *HotPeerStat, storeID uint64) bool } func (f *hotPeerCache) justTransferLeader(region *core.RegionInfo) bool { + if region == nil { + return false + } ids, ok := f.storesOfRegion[region.GetID()] if ok { for storeID := range ids { @@ -379,10 +381,10 @@ func (f *hotPeerCache) getDefaultTimeMedian() *movingaverage.TimeMedian { return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, time.Duration(f.reportIntervalSecs)*time.Second) } -func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat { +func (f *hotPeerCache) updateHotPeerStat(region *core.RegionInfo, newItem, oldItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat { regionStats := f.kind.RegionStats() if oldItem == nil { - return f.updateNewHotPeerStat(newItem, deltaLoads, interval) + return f.updateNewHotPeerStat(regionStats, newItem, deltaLoads, interval) } if newItem.source == adopt { @@ -395,14 +397,14 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoa newItem.allowAdopt = oldItem.allowAdopt } - if newItem.justTransferLeader { + if f.justTransferLeader(region) { newItem.lastTransferLeaderTime = time.Now() // skip the first heartbeat flow statistic after transfer leader, because its statistics are calculated by the last leader in this store and are inaccurate // maintain anticount and hotdegree to avoid store threshold and hot peer are unstable. // For write stat, as the stat is send by region heartbeat, the first heartbeat will be skipped. 
// For read stat, as the stat is send by store heartbeat, the first heartbeat won't be skipped. if newItem.Kind == Write { - inheritItemDegree(newItem, oldItem) + inheritItem(newItem, oldItem) return newItem } } else { @@ -416,7 +418,7 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoa isFull := newItem.rollingLoads[0].isFull() // The intervals of dims are the same, so it is only necessary to determine whether any of them if !isFull { // not update hot degree and anti count - inheritItemDegree(newItem, oldItem) + inheritItem(newItem, oldItem) } else { // If item is inCold, it means the pd didn't recv this item in the store heartbeat, // thus we make it colder @@ -424,13 +426,13 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoa coldItem(newItem, oldItem) } else { if f.isOldColdPeer(oldItem, newItem.StoreID) { - if newItem.isFullAndHot() { - initItemDegree(newItem) + if newItem.isHot() { + initItem(newItem) } else { - newItem.needDelete = true + newItem.actionType = Remove } } else { - if newItem.isFullAndHot() { + if newItem.isHot() { hotItem(newItem, oldItem) } else { coldItem(newItem, oldItem) @@ -442,8 +444,7 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoa return newItem } -func (f *hotPeerCache) updateNewHotPeerStat(newItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat { - regionStats := f.kind.RegionStats() +func (f *hotPeerCache) updateNewHotPeerStat(regionStats []RegionStatKind, newItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat { if interval == 0 { return nil } @@ -454,9 +455,9 @@ func (f *hotPeerCache) updateNewHotPeerStat(newItem *HotPeerStat, deltaLoads []f return nil } if interval.Seconds() >= float64(f.reportIntervalSecs) { - initItemDegree(newItem) + initItem(newItem) } - newItem.isNew = true + newItem.actionType = Add newItem.rollingLoads = make([]*dimStat, len(regionStats)) for i, k := range regionStats { ds := newDimStat(k, time.Duration(newItem.hotStatReportInterval())*time.Second) @@ -522,7 +523,7 @@ func coldItem(newItem, oldItem *HotPeerStat) { newItem.HotDegree = oldItem.HotDegree - 1 newItem.AntiCount = oldItem.AntiCount - 1 if newItem.AntiCount <= 0 { - newItem.needDelete = true + newItem.actionType = Remove } else { newItem.allowAdopt = true } @@ -537,7 +538,7 @@ func hotItem(newItem, oldItem *HotPeerStat) { } } -func initItemDegree(item *HotPeerStat) { +func initItem(item *HotPeerStat) { item.HotDegree = 1 item.AntiCount = hotRegionAntiCount item.allowAdopt = true @@ -546,7 +547,7 @@ func initItemDegree(item *HotPeerStat) { } } -func inheritItemDegree(newItem, oldItem *HotPeerStat) { +func inheritItem(newItem, oldItem *HotPeerStat) { newItem.HotDegree = oldItem.HotDegree newItem.AntiCount = oldItem.AntiCount } diff --git a/server/statistics/hot_peer_cache_test.go b/server/statistics/hot_peer_cache_test.go index c3ce13f5fb5..3ef1c03ed12 100644 --- a/server/statistics/hot_peer_cache_test.go +++ b/server/statistics/hot_peer_cache_test.go @@ -72,17 +72,17 @@ type testCacheCase struct { kind RWType operator operator expect int - needDelete bool + actionType ActionType } func (t *testHotPeerCache) TestCache(c *C) { tests := []*testCacheCase{ - {Read, transferLeader, 3, false}, - {Read, movePeer, 4, true}, - {Read, addReplica, 4, false}, - {Write, transferLeader, 3, true}, - {Write, movePeer, 4, true}, - {Write, addReplica, 4, true}, + {Read, transferLeader, 3, Update}, + {Read, movePeer, 4, Remove}, + {Read, 
addReplica, 4, Update}, + {Write, transferLeader, 3, Remove}, + {Write, movePeer, 4, Remove}, + {Write, addReplica, 4, Remove}, } for _, t := range tests { testCache(c, t) @@ -97,13 +97,13 @@ func testCache(c *C, t *testCacheCase) { cache := NewHotPeerCache(t.kind) region := buildRegion(t.kind, 3, 60) checkAndUpdate(c, cache, region, defaultSize[t.kind]) - checkHit(c, cache, region, t.kind, false) // all peers are new + checkHit(c, cache, region, t.kind, Add) // all peers are new srcStore, region := schedule(c, t.operator, region, 10) res := checkAndUpdate(c, cache, region, t.expect) - checkHit(c, cache, region, t.kind, true) // hit cache + checkHit(c, cache, region, t.kind, Update) // hit cache if t.expect != defaultSize[t.kind] { - checkNeedDelete(c, res, srcStore, t.needDelete) + checkOp(c, res, srcStore, t.actionType) } } @@ -122,10 +122,10 @@ func orderingPeers(cache *hotPeerCache, region *core.RegionInfo) []*metapb.Peer func checkFlow(cache *hotPeerCache, region *core.RegionInfo, peers []*metapb.Peer) (res []*HotPeerStat) { reportInterval := region.GetInterval() interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - res = append(res, cache.CollectExpiredItems(region)...) + res = append(res, cache.collectExpiredItems(region)...) for _, peer := range peers { peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) - item := cache.CheckPeerFlow(peerInfo, region) + item := cache.checkPeerFlow(peerInfo, region) if item != nil { res = append(res, item) } @@ -135,7 +135,7 @@ func checkFlow(cache *hotPeerCache, region *core.RegionInfo, peers []*metapb.Pee func updateFlow(cache *hotPeerCache, res []*HotPeerStat) []*HotPeerStat { for _, p := range res { - cache.Update(p) + cache.update(p) } return res } @@ -168,7 +168,7 @@ func checkAndUpdateSkipOne(c *C, cache *hotPeerCache, region *core.RegionInfo, e return updateFlow(cache, res) } -func checkHit(c *C, cache *hotPeerCache, region *core.RegionInfo, kind RWType, isHit bool) { +func checkHit(c *C, cache *hotPeerCache, region *core.RegionInfo, kind RWType, actionType ActionType) { var peers []*metapb.Peer if kind == Read { peers = []*metapb.Peer{region.GetLeader()} @@ -178,14 +178,14 @@ func checkHit(c *C, cache *hotPeerCache, region *core.RegionInfo, kind RWType, i for _, peer := range peers { item := cache.getOldHotPeerStat(region.GetID(), peer.StoreId) c.Assert(item, NotNil) - c.Assert(item.isNew, Equals, !isHit) + c.Assert(item.actionType, Equals, actionType) } } -func checkNeedDelete(c *C, ret []*HotPeerStat, storeID uint64, needDelete bool) { +func checkOp(c *C, ret []*HotPeerStat, storeID uint64, actionType ActionType) { for _, item := range ret { if item.StoreID == storeID { - c.Assert(item.needDelete, Equals, needDelete) + c.Assert(item.actionType, Equals, actionType) return } } @@ -296,55 +296,55 @@ func (t *testHotPeerCache) TestUpdateHotPeerStat(c *C) { m := RegionHeartBeatReportInterval / StoreHeartBeatReportInterval // skip interval=0 - newItem := &HotPeerStat{needDelete: false, thresholds: []float64{0.0, 0.0, 0.0}, Kind: Read} - newItem = cache.updateHotPeerStat(newItem, nil, []float64{0.0, 0.0, 0.0}, 0) + newItem := &HotPeerStat{actionType: Update, thresholds: []float64{0.0, 0.0, 0.0}, Kind: Read} + newItem = cache.updateHotPeerStat(nil, newItem, nil, []float64{0.0, 0.0, 0.0}, 0) c.Check(newItem, IsNil) // new peer, interval is larger than report interval, but no hot - newItem = &HotPeerStat{needDelete: false, thresholds: []float64{1.0, 1.0, 1.0}, Kind: Read} - newItem = 
cache.updateHotPeerStat(newItem, nil, []float64{0.0, 0.0, 0.0}, 10*time.Second) + newItem = &HotPeerStat{actionType: Update, thresholds: []float64{1.0, 1.0, 1.0}, Kind: Read} + newItem = cache.updateHotPeerStat(nil, newItem, nil, []float64{0.0, 0.0, 0.0}, 10*time.Second) c.Check(newItem, IsNil) // new peer, interval is less than report interval - newItem = &HotPeerStat{needDelete: false, thresholds: []float64{0.0, 0.0, 0.0}, Kind: Read} - newItem = cache.updateHotPeerStat(newItem, nil, []float64{60.0, 60.0, 60.0}, 4*time.Second) + newItem = &HotPeerStat{actionType: Update, thresholds: []float64{0.0, 0.0, 0.0}, Kind: Read} + newItem = cache.updateHotPeerStat(nil, newItem, nil, []float64{60.0, 60.0, 60.0}, 4*time.Second) c.Check(newItem, NotNil) c.Check(newItem.HotDegree, Equals, 0) c.Check(newItem.AntiCount, Equals, 0) // sum of interval is less than report interval oldItem := newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) + newItem = cache.updateHotPeerStat(nil, newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) c.Check(newItem.HotDegree, Equals, 0) c.Check(newItem.AntiCount, Equals, 0) // sum of interval is larger than report interval, and hot oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) + newItem = cache.updateHotPeerStat(nil, newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) c.Check(newItem.HotDegree, Equals, 1) c.Check(newItem.AntiCount, Equals, 2*m) // sum of interval is less than report interval oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) + newItem = cache.updateHotPeerStat(nil, newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) c.Check(newItem.HotDegree, Equals, 1) c.Check(newItem.AntiCount, Equals, 2*m) // sum of interval is larger than report interval, and hot oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) + newItem = cache.updateHotPeerStat(nil, newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) c.Check(newItem.HotDegree, Equals, 2) c.Check(newItem.AntiCount, Equals, 2*m) // sum of interval is larger than report interval, and cold oldItem = newItem newItem.thresholds = []float64{10.0, 10.0, 10.0} - newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) + newItem = cache.updateHotPeerStat(nil, newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) c.Check(newItem.HotDegree, Equals, 1) c.Check(newItem.AntiCount, Equals, 2*m-1) // sum of interval is larger than report interval, and cold for i := 0; i < 2*m-1; i++ { oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) + newItem = cache.updateHotPeerStat(nil, newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) } c.Check(newItem.HotDegree, Less, 0) c.Check(newItem.AntiCount, Equals, 0) - c.Check(newItem.needDelete, IsTrue) + c.Check(newItem.actionType, Equals, Remove) } func (t *testHotPeerCache) TestThresholdWithUpdateHotPeerStat(c *C) { @@ -369,7 +369,7 @@ func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold Kind: cache.kind, StoreID: storeID, RegionID: i, - needDelete: false, + actionType: Update, thresholds: thresholds, Loads: make([]float64, DimLen), } @@ -379,8 +379,8 @@ func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold if oldItem != nil 
&& oldItem.rollingLoads[RegionReadBytes].isHot(thresholds[RegionReadBytes]) == true { break } - item := cache.updateHotPeerStat(newItem, oldItem, []float64{byteRate * interval, 0.0, 0.0}, time.Duration(interval)*time.Second) - cache.Update(item) + item := cache.updateHotPeerStat(nil, newItem, oldItem, []float64{byteRate * interval, 0.0, 0.0}, time.Duration(interval)*time.Second) + cache.update(item) } thresholds := cache.calcHotThresholds(storeID) if i < TopNN { @@ -493,13 +493,13 @@ func BenchmarkCheckRegionFlow(b *testing.B) { for i := 0; i < b.N; i++ { items := make([]*HotPeerStat, 0) for _, peerInfo := range peerInfos { - item := cache.CheckPeerFlow(peerInfo, region) + item := cache.checkPeerFlow(peerInfo, region) if item != nil { items = append(items, item) } } for _, ret := range items { - cache.Update(ret) + cache.update(ret) } } } diff --git a/server/statistics/kind.go b/server/statistics/kind.go index e5965fcbfcb..65ec300f7ac 100644 --- a/server/statistics/kind.go +++ b/server/statistics/kind.go @@ -147,3 +147,25 @@ func (k RWType) RegionStats() []RegionStatKind { } return nil } + +// ActionType indicates the action type for the stat item. +type ActionType int + +// Flags for action type. +const ( + Add ActionType = iota + Remove + Update +) + +func (t ActionType) String() string { + switch t { + case Add: + return "add" + case Remove: + return "remove" + case Update: + return "update" + } + return "unimplemented" +}
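The CheckWriteAsync/CheckReadAsync pair is the heartbeat-facing edge of this refactor: after the change, the task queue lives on each hotPeerCache, and enqueueing stays non-blocking. Below is a minimal, self-contained sketch of that pattern under simplified types; `flowTask`, `noopTask`, and `tryEnqueue` are illustrative names, not PD identifiers.

```go
package main

import "fmt"

// flowTask stands in for the patch's unexported flowItemTask interface.
type flowTask interface{ runTask() }

type noopTask struct{}

func (noopTask) runTask() {}

// tryEnqueue mirrors the shape of CheckWriteAsync/CheckReadAsync: a
// non-blocking send into the cache-owned task queue. When the queue is
// full the task is dropped and false is returned, so a slow statistics
// goroutine can never stall the heartbeat path.
func tryEnqueue(queue chan<- flowTask, t flowTask) bool {
	select {
	case queue <- t:
		return true
	default: // queue full: report backpressure instead of blocking
		return false
	}
}

func main() {
	q := make(chan flowTask, 1) // the real queue is created with queueCap = 20000
	fmt.Println(tryEnqueue(q, noopTask{})) // true
	fmt.Println(tryEnqueue(q, noopTask{})) // false: capacity exhausted
}
```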
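RegionStats shows the other half of the design: a synchronous query built on the same asynchronous queue. The task carries a buffered reply channel, and the caller blocks in waitRet until the cache goroutine answers or the context is cancelled. A sketch under simplified types follows; `statsTask` and its payload are stand-ins for collectRegionStatsTask and map[uint64][]*HotPeerStat.

```go
package main

import (
	"context"
	"fmt"
)

// statsTask mimics collectRegionStatsTask: the caller attaches a buffered
// channel and the cache goroutine replies on it after running the task.
type statsTask struct {
	minDegree int
	ret       chan map[uint64]int // simplified result payload
}

func newStatsTask(minDegree int) *statsTask {
	// Buffer of 1 so the worker never blocks on a caller that gave up.
	return &statsTask{minDegree: minDegree, ret: make(chan map[uint64]int, 1)}
}

// runTask is what the cache goroutine would execute.
func (t *statsTask) runTask() {
	t.ret <- map[uint64]int{42: t.minDegree}
}

// waitRet blocks until the worker replies or ctx is cancelled, matching the
// "TODO: do we need a wait-return timeout?" left in the patch.
func (t *statsTask) waitRet(ctx context.Context) map[uint64]int {
	select {
	case <-ctx.Done():
		return nil
	case r := <-t.ret:
		return r
	}
}

func main() {
	t := newStatsTask(3)
	go t.runTask() // in PD this runs on the hotPeerCache goroutine
	fmt.Println(t.waitRet(context.Background()))
}
```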
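The ActionType enum added to server/statistics/kind.go replaces the old needDelete/isNew boolean pair with one explicit state per item: isNew became Add, needDelete became Remove, and everything else is Update. The sketch below reproduces the enum from the diff and shows how updateStat's metric dispatch collapses into a single switch; `incMetrics` here is a printing stub standing in for the real Prometheus counter.

```go
package main

import "fmt"

// ActionType reproduces the enum from server/statistics/kind.go.
type ActionType int

const (
	Add ActionType = iota
	Remove
	Update
)

func (t ActionType) String() string {
	switch t {
	case Add:
		return "add"
	case Remove:
		return "remove"
	case Update:
		return "update"
	}
	return "unimplemented"
}

// incMetrics is a stub for the counter bump in hot_cache.go.
func incMetrics(name string) { fmt.Println("inc", name) }

// dispatch mirrors updateStat: the old three-way if/else on
// IsNeedDelete()/IsNew() becomes an exhaustive switch on one field.
func dispatch(t ActionType) {
	switch t {
	case Add:
		incMetrics("add_item")
	case Remove:
		incMetrics("remove_item")
	case Update:
		incMetrics("update_item")
	}
}

func main() {
	for _, t := range []ActionType{Add, Remove, Update} {
		dispatch(t)
	}
}
```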
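checkColdPeer fabricates a report for every peer that stayed silent for a whole store-heartbeat interval: the synthetic delta is the store's own threshold rate multiplied by the interval, so the item cools down (via coldItem) without pushing the store's future thresholds up, as the in-diff comment about oldItem.thresholds notes. A small sketch of that arithmetic; `syntheticDelta` is an illustrative name.

```go
package main

import "fmt"

// syntheticDelta shows the arithmetic checkColdPeer uses for a silent peer:
// pretend it ran at exactly the store's hot thresholds for the whole
// interval. A rate (bytes/s, keys/s, ...) times seconds gives the delta
// that updateHotPeerStat expects, and because the rate equals the threshold
// the fabricated report cannot raise the threshold itself.
func syntheticDelta(thresholds []float64, intervalSecs float64) []float64 {
	deltas := make([]float64, len(thresholds))
	for i, th := range thresholds {
		deltas[i] = th * intervalSecs
	}
	return deltas
}

func main() {
	thresholds := []float64{1024, 32, 0} // illustrative byte/key/query rates
	fmt.Println(syntheticDelta(thresholds, 10)) // [10240 320 0]
}
```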
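Renaming dimStat's fields to the unexported rolling/lastAverage also makes their division of labor easier to see: rolling (a TimeMedian) smooths over a long window, while lastAverage (an AvgOverTime) covers only the most recent interval and backs isLastAverageHot, which HotPeerStat.isHot now consults directly once updateHotPeerStat has checked window fullness separately. A toy model, collapsing both moving averages to plain numbers (the real types live in pkg/movingaverage):

```go
package main

import "fmt"

// dim is a toy stand-in for dimStat: rolling plays the role of the
// long-window TimeMedian and last plays the role of the AvgOverTime over
// the most recent report interval.
type dim struct {
	rolling float64 // smoothed, sustained rate
	last    float64 // most recent interval's rate
}

func (d dim) isHot(th float64) bool            { return d.rolling >= th }
func (d dim) isLastAverageHot(th float64) bool { return d.last >= th }

func main() {
	// A region that was hot for minutes but quiet in the last interval:
	// still hot by the smoothed view, no longer hot instantaneously.
	d := dim{rolling: 200, last: 5}
	fmt.Println(d.isHot(100), d.isLastAverageHot(100)) // true false
}
```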