diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index f8d18afbb86..3b4e2009423 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -72,17 +72,17 @@ func (mc *Cluster) GetStoreRegionCount(storeID uint64) int { } // IsRegionHot checks if the region is hot. -func (mc *Cluster) IsRegionHot(id uint64) bool { - return mc.HotSpotCache.IsRegionHot(id, mc.GetHotRegionCacheHitsThreshold()) +func (mc *Cluster) IsRegionHot(region *core.RegionInfo) bool { + return mc.HotSpotCache.IsRegionHot(region, mc.GetHotRegionCacheHitsThreshold()) } // RegionReadStats returns hot region's read stats. -func (mc *Cluster) RegionReadStats() []*statistics.RegionStat { +func (mc *Cluster) RegionReadStats() map[uint64][]*statistics.HotSpotPeerStat { return mc.HotSpotCache.RegionStats(statistics.ReadFlow) } // RegionWriteStats returns hot region's write stats. -func (mc *Cluster) RegionWriteStats() []*statistics.RegionStat { +func (mc *Cluster) RegionWriteStats() map[uint64][]*statistics.HotSpotPeerStat { return mc.HotSpotCache.RegionStats(statistics.WriteFlow) } @@ -231,9 +231,9 @@ func (mc *Cluster) AddLeaderRegionWithRange(regionID uint64, startKey string, en func (mc *Cluster) AddLeaderRegionWithReadInfo(regionID uint64, leaderID uint64, readBytes uint64, followerIds ...uint64) { r := mc.newMockRegionInfo(regionID, leaderID, followerIds...) r = r.Clone(core.SetReadBytes(readBytes)) - isUpdate, item := mc.HotSpotCache.CheckRead(r, mc.StoresStats) - if isUpdate { - mc.HotSpotCache.Update(regionID, item, statistics.ReadFlow) + items := mc.HotSpotCache.CheckRead(r, mc.StoresStats) + for _, item := range items { + mc.HotSpotCache.Update(item) } mc.PutRegion(r) } @@ -242,9 +242,9 @@ func (mc *Cluster) AddLeaderRegionWithReadInfo(regionID uint64, leaderID uint64, func (mc *Cluster) AddLeaderRegionWithWriteInfo(regionID uint64, leaderID uint64, writtenBytes uint64, followerIds ...uint64) { r := mc.newMockRegionInfo(regionID, leaderID, followerIds...) 
r = r.Clone(core.SetWrittenBytes(writtenBytes)) - isUpdate, item := mc.HotSpotCache.CheckWrite(r, mc.StoresStats) - if isUpdate { - mc.HotSpotCache.Update(regionID, item, statistics.WriteFlow) + items := mc.HotSpotCache.CheckWrite(r, mc.StoresStats) + for _, item := range items { + mc.HotSpotCache.Update(item) } mc.PutRegion(r) } diff --git a/server/api/region.go b/server/api/region.go index 4290166e32c..3f0c59c04e2 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -39,6 +39,8 @@ type RegionInfo struct { PendingPeers []*metapb.Peer `json:"pending_peers,omitempty"` WrittenBytes uint64 `json:"written_bytes,omitempty"` ReadBytes uint64 `json:"read_bytes,omitempty"` + WrittenKeys uint64 `json:"written_keys,omitempty"` + ReadKeys uint64 `json:"read_keys,omitempty"` ApproximateSize int64 `json:"approximate_size,omitempty"` ApproximateKeys int64 `json:"approximate_keys,omitempty"` } diff --git a/server/checker/merge_checker.go b/server/checker/merge_checker.go index 09ba38dc6a3..b5f78fe8b4b 100644 --- a/server/checker/merge_checker.go +++ b/server/checker/merge_checker.go @@ -95,7 +95,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { } // skip hot region - if m.cluster.IsRegionHot(region.GetID()) { + if m.cluster.IsRegionHot(region) { checkerCounter.WithLabelValues("merge_checker", "hot_region").Inc() return nil } @@ -130,7 +130,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { func (m *MergeChecker) checkTarget(region, adjacent, target *core.RegionInfo) *core.RegionInfo { // if is not hot region and under same namespace - if adjacent != nil && !m.cluster.IsRegionHot(adjacent.GetID()) && + if adjacent != nil && !m.cluster.IsRegionHot(adjacent) && m.classifier.AllowMerge(region, adjacent) && len(adjacent.GetDownPeers()) == 0 && len(adjacent.GetPendingPeers()) == 0 && len(adjacent.GetLearners()) == 0 { // if both region is not hot, prefer the one with smaller size diff --git a/server/cluster_info.go b/server/cluster_info.go index 18f413e400b..80b67d42363 100644 --- a/server/cluster_info.go +++ b/server/cluster_info.go @@ -314,10 +314,10 @@ func (c *clusterInfo) GetRegion(regionID uint64) *core.RegionInfo { } // IsRegionHot checks if a region is in hot state. -func (c *clusterInfo) IsRegionHot(id uint64) bool { +func (c *clusterInfo) IsRegionHot(region *core.RegionInfo) bool { c.RLock() defer c.RUnlock() - return c.hotSpotCache.IsRegionHot(id, c.GetHotRegionCacheHitsThreshold()) + return c.hotSpotCache.IsRegionHot(region, c.GetHotRegionCacheHitsThreshold()) } // RandHotRegionFromStore randomly picks a hot region in specified store. @@ -523,8 +523,8 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { } } } - isWriteUpdate, writeItem := c.CheckWriteStatus(region) - isReadUpdate, readItem := c.CheckReadStatus(region) + writeItems := c.CheckWriteStatus(region) + readItems := c.CheckReadStatus(region) c.RUnlock() // Save to storage if meta is updated. 
@@ -605,7 +605,7 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { default: } } - if !isWriteUpdate && !isReadUpdate && !saveCache && !isNew { + if len(writeItems) == 0 && len(readItems) == 0 && !saveCache && !isNew { return nil } @@ -649,12 +649,11 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { c.regionStats.Observe(region, c.takeRegionStoresLocked(region)) } - key := region.GetID() - if isWriteUpdate { - c.hotSpotCache.Update(key, writeItem, statistics.WriteFlow) + for _, writeItem := range writeItems { + c.hotSpotCache.Update(writeItem) } - if isReadUpdate { - c.hotSpotCache.Update(key, readItem, statistics.ReadFlow) + for _, readItem := range readItems { + c.hotSpotCache.Update(readItem) } return nil } @@ -816,24 +815,24 @@ func (c *clusterInfo) CheckLabelProperty(typ string, labels []*metapb.StoreLabel } // RegionReadStats returns hot region's read stats. -func (c *clusterInfo) RegionReadStats() []*statistics.RegionStat { +func (c *clusterInfo) RegionReadStats() map[uint64][]*statistics.HotSpotPeerStat { // RegionStats is a thread-safe method return c.hotSpotCache.RegionStats(statistics.ReadFlow) } // RegionWriteStats returns hot region's write stats. -func (c *clusterInfo) RegionWriteStats() []*statistics.RegionStat { +func (c *clusterInfo) RegionWriteStats() map[uint64][]*statistics.HotSpotPeerStat { // RegionStats is a thread-safe method return c.hotSpotCache.RegionStats(statistics.WriteFlow) } // CheckWriteStatus checks the write status, returns whether need update statistics and item. -func (c *clusterInfo) CheckWriteStatus(region *core.RegionInfo) (bool, *statistics.RegionStat) { +func (c *clusterInfo) CheckWriteStatus(region *core.RegionInfo) []*statistics.HotSpotPeerStat { return c.hotSpotCache.CheckWrite(region, c.storesStats) } // CheckReadStatus checks the read status, returns whether need update statistics and item. -func (c *clusterInfo) CheckReadStatus(region *core.RegionInfo) (bool, *statistics.RegionStat) { +func (c *clusterInfo) CheckReadStatus(region *core.RegionInfo) []*statistics.HotSpotPeerStat { return c.hotSpotCache.CheckRead(region, c.storesStats) } diff --git a/server/core/region.go b/server/core/region.go index e04719411d5..75620f73501 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -36,7 +36,9 @@ type RegionInfo struct { downPeers []*pdpb.PeerStats pendingPeers []*metapb.Peer writtenBytes uint64 + writtenKeys uint64 readBytes uint64 + readKeys uint64 approximateSize int64 approximateKeys int64 } @@ -89,7 +91,9 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest) *RegionInfo { downPeers: heartbeat.GetDownPeers(), pendingPeers: heartbeat.GetPendingPeers(), writtenBytes: heartbeat.GetBytesWritten(), + writtenKeys: heartbeat.GetKeysWritten(), readBytes: heartbeat.GetBytesRead(), + readKeys: heartbeat.GetKeysRead(), approximateSize: int64(regionSize), approximateKeys: int64(heartbeat.GetApproximateKeys()), } @@ -115,7 +119,9 @@ func (r *RegionInfo) Clone(opts ...RegionCreateOption) *RegionInfo { downPeers: downPeers, pendingPeers: pendingPeers, writtenBytes: r.writtenBytes, + writtenKeys: r.writtenKeys, readBytes: r.readBytes, + readKeys: r.readKeys, approximateSize: r.approximateSize, approximateKeys: r.approximateKeys, } @@ -328,6 +334,16 @@ func (r *RegionInfo) GetBytesWritten() uint64 { return r.writtenBytes } +// GetKeysWritten returns the written keys of the region. 
+func (r *RegionInfo) GetKeysWritten() uint64 { + return r.writtenKeys +} + +// GetKeysRead returns the read keys of the region. +func (r *RegionInfo) GetKeysRead() uint64 { + return r.readKeys +} + // GetLeader returns the leader of the region. func (r *RegionInfo) GetLeader() *metapb.Peer { return r.leader diff --git a/server/namespace_cluster.go b/server/namespace_cluster.go index 7e3d669276a..3b21816dfef 100644 --- a/server/namespace_cluster.go +++ b/server/namespace_cluster.go @@ -126,15 +126,8 @@ func (c *namespaceCluster) GetRegion(id uint64) *core.RegionInfo { } // RegionWriteStats returns hot region's write stats. -func (c *namespaceCluster) RegionWriteStats() []*statistics.RegionStat { - allStats := c.Cluster.RegionWriteStats() - stats := make([]*statistics.RegionStat, 0, len(allStats)) - for _, s := range allStats { - if c.GetRegion(s.RegionID) != nil { - stats = append(stats, s) - } - } - return stats +func (c *namespaceCluster) RegionWriteStats() map[uint64][]*statistics.HotSpotPeerStat { + return c.Cluster.RegionWriteStats() } func scheduleByNamespace(cluster schedule.Cluster, classifier namespace.Classifier, scheduler schedule.Scheduler) []*operator.Operator { diff --git a/server/schedule/region_scatterer.go b/server/schedule/region_scatterer.go index 08985733976..0cc6ba38839 100644 --- a/server/schedule/region_scatterer.go +++ b/server/schedule/region_scatterer.go @@ -85,7 +85,7 @@ func NewRegionScatterer(cluster Cluster, classifier namespace.Classifier) *Regio // Scatter relocates the region. func (r *RegionScatterer) Scatter(region *core.RegionInfo) (*operator.Operator, error) { - if r.cluster.IsRegionHot(region.GetID()) { + if r.cluster.IsRegionHot(region) { return nil, errors.Errorf("region %d is a hot region", region.GetID()) } diff --git a/server/schedulers/adjacent_region.go b/server/schedulers/adjacent_region.go index e42e59bb6ea..7267b3c4bd5 100644 --- a/server/schedulers/adjacent_region.go +++ b/server/schedulers/adjacent_region.go @@ -247,7 +247,7 @@ func (l *balanceAdjacentRegionScheduler) unsafeToBalance(cluster schedule.Cluste return true } // Skip hot regions. - if cluster.IsRegionHot(region.GetID()) { + if cluster.IsRegionHot(region) { schedulerCounter.WithLabelValues(l.GetName(), "region_hot").Inc() return true } diff --git a/server/schedulers/balance_leader.go b/server/schedulers/balance_leader.go index 2f4b021dcd1..00545ad9fe5 100644 --- a/server/schedulers/balance_leader.go +++ b/server/schedulers/balance_leader.go @@ -197,9 +197,8 @@ func (l *balanceLeaderScheduler) transferLeaderIn(target *core.StoreInfo, cluste // no new operator need to be created, otherwise create an operator that transfers // the leader from the source store to the target store for the region. 
func (l *balanceLeaderScheduler) createOperator(region *core.RegionInfo, source, target *core.StoreInfo, cluster schedule.Cluster, opInfluence operator.OpInfluence) []*operator.Operator { - regionID := region.GetID() - if cluster.IsRegionHot(regionID) { - log.Debug("region is hot region, ignore it", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", regionID)) + if cluster.IsRegionHot(region) { + log.Debug("region is hot region, ignore it", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", region.GetID())) schedulerCounter.WithLabelValues(l.GetName(), "region_hot").Inc() return nil } @@ -208,7 +207,7 @@ func (l *balanceLeaderScheduler) createOperator(region *core.RegionInfo, source, targetID := target.GetID() if !shouldBalance(cluster, source, target, region, core.LeaderKind, opInfluence) { log.Debug("skip balance leader", - zap.String("scheduler", l.GetName()), zap.Uint64("region-id", regionID), zap.Uint64("source-store", sourceID), zap.Uint64("target-store", targetID), + zap.String("scheduler", l.GetName()), zap.Uint64("region-id", region.GetID()), zap.Uint64("source-store", sourceID), zap.Uint64("target-store", targetID), zap.Int64("source-size", source.GetLeaderSize()), zap.Float64("source-score", source.LeaderScore(0)), zap.Int64("source-influence", opInfluence.GetStoreInfluence(sourceID).ResourceSize(core.LeaderKind)), zap.Int64("target-size", target.GetLeaderSize()), zap.Float64("target-score", target.LeaderScore(0)), diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go index 5f2ad2599a3..fa72aac24c0 100644 --- a/server/schedulers/balance_region.go +++ b/server/schedulers/balance_region.go @@ -161,7 +161,7 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster) []*operator. } // Skip hot regions. 
- if cluster.IsRegionHot(region.GetID()) { + if cluster.IsRegionHot(region) { log.Debug("region is hot", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", region.GetID())) schedulerCounter.WithLabelValues(s.GetName(), "region_hot").Inc() s.hitsCounter.put(source, nil) diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go index 35469eb1eca..ddda26abc1e 100644 --- a/server/schedulers/balance_test.go +++ b/server/schedulers/balance_test.go @@ -935,6 +935,7 @@ var _ = Suite(&testBalanceHotWriteRegionSchedulerSuite{}) type testBalanceHotWriteRegionSchedulerSuite struct{} func (s *testBalanceHotWriteRegionSchedulerSuite) TestBalance(c *C) { + statistics.Denoising = false opt := mockoption.NewScheduleOptions() newTestReplication(opt, 3, "zone", "host") tc := mockcluster.NewCluster(opt) @@ -1038,6 +1039,8 @@ func (s *testBalanceHotWriteRegionSchedulerSuite) TestBalance(c *C) { hb.Schedule(tc) } +var _ = Suite(&testBalanceHotReadRegionSchedulerSuite{}) + type testBalanceHotReadRegionSchedulerSuite struct{} func (s *testBalanceHotReadRegionSchedulerSuite) TestBalance(c *C) { @@ -1072,17 +1075,19 @@ func (s *testBalanceHotReadRegionSchedulerSuite) TestBalance(c *C) { // lower than hot read flow rate, but higher than write flow rate tc.AddLeaderRegionWithReadInfo(11, 1, 24*1024*statistics.RegionHeartBeatReportInterval, 2, 3) opt.HotRegionCacheHitsThreshold = 0 - c.Assert(tc.IsRegionHot(1), IsTrue) - c.Assert(tc.IsRegionHot(11), IsFalse) + c.Assert(tc.IsRegionHot(tc.GetRegion(1)), IsTrue) + c.Assert(tc.IsRegionHot(tc.GetRegion(11)), IsFalse) // check randomly pick hot region r := tc.RandHotRegionFromStore(2, statistics.ReadFlow) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, uint64(2)) // check hot items stats := tc.HotSpotCache.RegionStats(statistics.ReadFlow) - c.Assert(len(stats), Equals, 3) - for _, s := range stats { - c.Assert(s.FlowBytes, Equals, uint64(512*1024)) + c.Assert(len(stats), Equals, 2) + for _, ss := range stats { + for _, s := range ss { + c.Assert(s.FlowBytes, Equals, uint64(512*1024)) + } } // Will transfer a hot region leader from store 1 to store 3, because the total count of peers // which is hot for store 1 is more larger than other stores. @@ -1111,6 +1116,72 @@ func (s *testBalanceHotReadRegionSchedulerSuite) TestBalance(c *C) { hb.Schedule(tc) } +var _ = Suite(&testBalanceHotCacheSuite{}) + +type testBalanceHotCacheSuite struct{} + +func (s *testBalanceHotCacheSuite) TestUpdateCache(c *C) { + opt := mockoption.NewScheduleOptions() + tc := mockcluster.NewCluster(opt) + + // Add stores 1, 2, 3, 4, 5 with region counts 3, 2, 2, 2, 0. + tc.AddRegionStore(1, 3) + tc.AddRegionStore(2, 2) + tc.AddRegionStore(3, 2) + tc.AddRegionStore(4, 2) + tc.AddRegionStore(5, 0) + + // Report store read bytes. 
+ tc.UpdateStorageReadBytes(1, 75*1024*1024) + tc.UpdateStorageReadBytes(2, 45*1024*1024) + tc.UpdateStorageReadBytes(3, 45*1024*1024) + tc.UpdateStorageReadBytes(4, 60*1024*1024) + tc.UpdateStorageReadBytes(5, 0) + + /// For read flow + tc.AddLeaderRegionWithReadInfo(1, 1, 512*1024*statistics.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithReadInfo(2, 2, 512*1024*statistics.RegionHeartBeatReportInterval, 1, 3) + tc.AddLeaderRegionWithReadInfo(3, 1, 512*1024*statistics.RegionHeartBeatReportInterval, 2, 3) + // lower than hot read flow rate, but higher than write flow rate + tc.AddLeaderRegionWithReadInfo(11, 1, 24*1024*statistics.RegionHeartBeatReportInterval, 2, 3) + opt.HotRegionCacheHitsThreshold = 0 + stats := tc.RegionStats(statistics.ReadFlow) + c.Assert(len(stats[1]), Equals, 2) + c.Assert(len(stats[2]), Equals, 1) + c.Assert(len(stats[3]), Equals, 0) + + tc.AddLeaderRegionWithReadInfo(3, 2, 512*1024*statistics.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithReadInfo(11, 1, 24*1024*statistics.RegionHeartBeatReportInterval, 2, 3) + stats = tc.RegionStats(statistics.ReadFlow) + + c.Assert(len(stats[1]), Equals, 1) + c.Assert(len(stats[2]), Equals, 2) + c.Assert(len(stats[3]), Equals, 0) + + // For write flow + tc.UpdateStorageWrittenBytes(1, 60*1024*1024) + tc.UpdateStorageWrittenBytes(2, 30*1024*1024) + tc.UpdateStorageWrittenBytes(3, 60*1024*1024) + tc.UpdateStorageWrittenBytes(4, 30*1024*1024) + tc.UpdateStorageWrittenBytes(5, 0*1024*1024) + tc.AddLeaderRegionWithWriteInfo(4, 1, 512*1024*statistics.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithWriteInfo(5, 1, 512*1024*statistics.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithWriteInfo(6, 1, 12*1024*statistics.RegionHeartBeatReportInterval, 2, 3) + + stats = tc.RegionStats(statistics.WriteFlow) + c.Assert(len(stats[1]), Equals, 2) + c.Assert(len(stats[2]), Equals, 2) + c.Assert(len(stats[3]), Equals, 2) + + tc.AddLeaderRegionWithWriteInfo(5, 1, 512*1024*statistics.RegionHeartBeatReportInterval, 2, 5) + stats = tc.RegionStats(statistics.WriteFlow) + + c.Assert(len(stats[1]), Equals, 2) + c.Assert(len(stats[2]), Equals, 2) + c.Assert(len(stats[3]), Equals, 1) + c.Assert(len(stats[5]), Equals, 1) +} + var _ = Suite(&testScatterRangeLeaderSuite{}) type testScatterRangeLeaderSuite struct{} diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index 7795e8b1924..01a07e2b9fa 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -220,32 +220,26 @@ func (h *balanceHotRegionsScheduler) balanceHotWriteRegions(cluster schedule.Clu return nil } -func calcScore(items []*statistics.RegionStat, cluster schedule.Cluster, kind core.ResourceKind) statistics.StoreHotRegionsStat { +func calcScore(storeItems map[uint64][]*statistics.HotSpotPeerStat, cluster schedule.Cluster, kind core.ResourceKind) statistics.StoreHotRegionsStat { stats := make(statistics.StoreHotRegionsStat) - for _, r := range items { + for storeID, items := range storeItems { // HotDegree is the update times on the hot cache. If the heartbeat report // the flow of the region exceeds the threshold, the scheduler will update the region in // the hot cache and the hotdegree of the region will increase. 
- if r.HotDegree < cluster.GetHotRegionCacheHitsThreshold() { - continue - } - regionInfo := cluster.GetRegion(r.RegionID) - if regionInfo == nil { - continue - } + for _, r := range items { + if kind == core.LeaderKind && !r.IsLeader() { + continue + } + if r.HotDegree < cluster.GetHotRegionCacheHitsThreshold() { + continue + } - var storeIDs []uint64 - switch kind { - case core.RegionKind: - for id := range regionInfo.GetStoreIds() { - storeIDs = append(storeIDs, id) + regionInfo := cluster.GetRegion(r.RegionID) + if regionInfo == nil { + continue } - case core.LeaderKind: - storeIDs = append(storeIDs, regionInfo.GetLeader().GetStoreId()) - } - for _, storeID := range storeIDs { storeStat, ok := stats[storeID] if !ok { storeStat = &statistics.HotRegionsStat{ @@ -254,7 +248,7 @@ func calcScore(items []*statistics.RegionStat, cluster schedule.Cluster, kind co stats[storeID] = storeStat } - s := statistics.RegionStat{ + s := statistics.HotSpotPeerStat{ RegionID: r.RegionID, FlowBytes: uint64(r.Stats.Median()), HotDegree: r.HotDegree, diff --git a/server/schedulers/scheduler_test.go b/server/schedulers/scheduler_test.go index e143a131285..467a67c5264 100644 --- a/server/schedulers/scheduler_test.go +++ b/server/schedulers/scheduler_test.go @@ -387,7 +387,7 @@ func (s *testHotRegionSchedulerSuite) TestAbnormalReplica(c *C) { tc.AddLeaderRegionWithReadInfo(2, 2, 512*1024*statistics.RegionHeartBeatReportInterval, 1, 3) tc.AddLeaderRegionWithReadInfo(3, 1, 512*1024*statistics.RegionHeartBeatReportInterval, 2, 3) opt.HotRegionCacheHitsThreshold = 0 - c.Assert(tc.IsRegionHot(1), IsTrue) + c.Assert(tc.IsRegionHot(tc.GetRegion(1)), IsTrue) c.Assert(hb.Schedule(tc), IsNil) } diff --git a/server/statistics/hot_cache.go b/server/statistics/hot_cache.go index 0751d52a188..69b89e10df2 100644 --- a/server/statistics/hot_cache.go +++ b/server/statistics/hot_cache.go @@ -14,16 +14,18 @@ package statistics import ( + "fmt" "math/rand" "time" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/pkg/cache" "github.com/pingcap/pd/server/core" ) -// Simulating is an option to overpass the impact of accelerated time. Should -// only turned on by the simulator. -var Simulating bool +// Denoising is an option to calculate flow based on the real heartbeats. Should +// only be turned off by the simulator and the tests. +var Denoising = true const ( // RegionHeartBeatReportInterval is the heartbeat report interval of a region. @@ -32,6 +34,7 @@ const ( StoreHeartBeatReportInterval = 10 statCacheMaxLen = 1000 + storeStatCacheMaxLen = 200 hotWriteRegionMinFlowRate = 16 * 1024 hotReadRegionMinFlowRate = 128 * 1024 minHotRegionReportInterval = 3 @@ -47,180 +50,348 @@ const ( ReadFlow ) -// HotSpotCache is a cache hold hot regions. -type HotSpotCache struct { - writeFlow cache.Cache - readFlow cache.Cache +func (k FlowKind) String() string { + switch k { + case WriteFlow: + return "write" + case ReadFlow: + return "read" + } + return "unimplemented" } -// NewHotSpotCache creates a new hot spot cache. -func NewHotSpotCache() *HotSpotCache { - return &HotSpotCache{ - writeFlow: cache.NewCache(statCacheMaxLen, cache.TwoQueueCache), - readFlow: cache.NewCache(statCacheMaxLen, cache.TwoQueueCache), +// HotStoresStats saves the hotspot peer's statistics.
+type HotStoresStats struct { + hotStoreStats map[uint64]cache.Cache // storeID -> hot regions + storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs +} + +// NewHotStoresStats creates a HotStoresStats. +func NewHotStoresStats() *HotStoresStats { + return &HotStoresStats{ + hotStoreStats: make(map[uint64]cache.Cache), + storesOfRegion: make(map[uint64]map[uint64]struct{}), } } -// CheckWrite checks the write status, returns whether need update statistics and item. -func (w *HotSpotCache) CheckWrite(region *core.RegionInfo, stats *StoresStats) (bool, *RegionStat) { +// CheckRegionFlow checks the flow information of the region. +func (f *HotStoresStats) CheckRegionFlow(region *core.RegionInfo, kind FlowKind) []HotSpotPeerStatGenerator { var ( - WrittenBytesPerSec uint64 - value *RegionStat - ) + generators []HotSpotPeerStatGenerator + getBytesFlow func() uint64 + getKeysFlow func() uint64 + bytesPerSec uint64 + keysPerSec uint64 - WrittenBytesPerSec = uint64(float64(region.GetBytesWritten()) / float64(RegionHeartBeatReportInterval)) + isExpiredInStore func(region *core.RegionInfo, storeID uint64) bool + ) - v, isExist := w.writeFlow.Peek(region.GetID()) - if isExist { - value = v.(*RegionStat) - // This is used for the simulator. - if !Simulating { - interval := time.Since(value.LastUpdateTime).Seconds() - if interval < minHotRegionReportInterval { - return false, nil - } - WrittenBytesPerSec = uint64(float64(region.GetBytesWritten()) / interval) + storeIDs := make(map[uint64]struct{}) + // gets the store IDs, covering both the old and the new region + ids, ok := f.storesOfRegion[region.GetID()] + if ok { + for storeID := range ids { + storeIDs[storeID] = struct{}{} } } - hotRegionThreshold := calculateWriteHotThreshold(stats) - return w.isNeedUpdateStatCache(region, WrittenBytesPerSec, hotRegionThreshold, value, WriteFlow) -} + for _, peer := range region.GetPeers() { + // ReadFlow does not need to consider the followers. + if kind == ReadFlow && peer.GetStoreId() != region.GetLeader().GetStoreId() { + continue + } + if _, ok := storeIDs[peer.GetStoreId()]; !ok { + storeIDs[peer.GetStoreId()] = struct{}{} + } + } -// CheckRead checks the read status, returns whether need update statistics and item. -func (w *HotSpotCache) CheckRead(region *core.RegionInfo, stats *StoresStats) (bool, *RegionStat) { - var ( - ReadBytesPerSec uint64 - value *RegionStat - ) + switch kind { + case WriteFlow: + getBytesFlow = region.GetBytesWritten + getKeysFlow = region.GetKeysWritten + isExpiredInStore = func(region *core.RegionInfo, storeID uint64) bool { + return region.GetStorePeer(storeID) == nil + } + case ReadFlow: + getBytesFlow = region.GetBytesRead + getKeysFlow = region.GetKeysRead + isExpiredInStore = func(region *core.RegionInfo, storeID uint64) bool { + return region.GetLeader().GetStoreId() != storeID + } + } - ReadBytesPerSec = uint64(float64(region.GetBytesRead()) / float64(RegionHeartBeatReportInterval)) + bytesPerSecInit := uint64(float64(getBytesFlow()) / float64(RegionHeartBeatReportInterval)) + keysPerSecInit := uint64(float64(getKeysFlow()) / float64(RegionHeartBeatReportInterval)) + for storeID := range storeIDs { + bytesPerSec = bytesPerSecInit + keysPerSec = keysPerSecInit + var oldRegionStat *HotSpotPeerStat - v, isExist := w.readFlow.Peek(region.GetID()) - if isExist { - value = v.(*RegionStat) - // This is used for the simulator.
- if !Simulating { - interval := time.Since(value.LastUpdateTime).Seconds() - if interval < minHotRegionReportInterval { - return false, nil + hotStoreStats, ok := f.hotStoreStats[storeID] + if ok { + if v, isExist := hotStoreStats.Peek(region.GetID()); isExist { + oldRegionStat = v.(*HotSpotPeerStat) + // This is used for the simulator. + if Denoising { + interval := time.Since(oldRegionStat.LastUpdateTime).Seconds() + if interval < minHotRegionReportInterval && !isExpiredInStore(region, storeID) { + continue + } + bytesPerSec = uint64(float64(getBytesFlow()) / interval) + keysPerSec = uint64(float64(getKeysFlow()) / interval) + } } - ReadBytesPerSec = uint64(float64(region.GetBytesRead()) / interval) } - } - hotRegionThreshold := calculateReadHotThreshold(stats) - return w.isNeedUpdateStatCache(region, ReadBytesPerSec, hotRegionThreshold, value, ReadFlow) + generator := &hotSpotPeerStatGenerator{ + Region: region, + StoreID: storeID, + FlowBytes: bytesPerSec, + FlowKeys: keysPerSec, + Kind: kind, + + lastHotSpotPeerStats: oldRegionStat, + } + + if isExpiredInStore(region, storeID) { + generator.Expired = true + } + generators = append(generators, generator) + } + return generators } -func (w *HotSpotCache) incMetrics(name string, kind FlowKind) { - switch kind { - case WriteFlow: - hotCacheStatusGauge.WithLabelValues(name, "write").Inc() - case ReadFlow: - hotCacheStatusGauge.WithLabelValues(name, "read").Inc() +// Update updates the items in statistics. +func (f *HotStoresStats) Update(item *HotSpotPeerStat) { + if item.IsNeedDelete() { + if hotStoreStat, ok := f.hotStoreStats[item.StoreID]; ok { + hotStoreStat.Remove(item.RegionID) + } + if index, ok := f.storesOfRegion[item.RegionID]; ok { + delete(index, item.StoreID) + } + } else { + hotStoreStat, ok := f.hotStoreStats[item.StoreID] + if !ok { + hotStoreStat = cache.NewCache(statCacheMaxLen, cache.TwoQueueCache) + f.hotStoreStats[item.StoreID] = hotStoreStat + } + hotStoreStat.Put(item.RegionID, item) + index, ok := f.storesOfRegion[item.RegionID] + if !ok { + index = make(map[uint64]struct{}) + } + index[item.StoreID] = struct{}{} + f.storesOfRegion[item.RegionID] = index } } -func calculateWriteHotThreshold(stats *StoresStats) uint64 { - // hotRegionThreshold is used to pick hot region - // suppose the number of the hot Regions is statCacheMaxLen - // and we use total written Bytes past storeHeartBeatReportInterval seconds to divide the number of hot Regions - // divide 2 because the store reports data about two times than the region record write to rocksdb - divisor := float64(statCacheMaxLen) * 2 - hotRegionThreshold := uint64(stats.TotalBytesWriteRate() / divisor) +func (f *HotStoresStats) isRegionHotWithAnyPeers(region *core.RegionInfo, hotThreshold int) bool { + for _, peer := range region.GetPeers() { + if f.isRegionHotWithPeer(region, peer, hotThreshold) { + return true + } + } + return false - if hotRegionThreshold < hotWriteRegionMinFlowRate { - hotRegionThreshold = hotWriteRegionMinFlowRate +} + +func (f *HotStoresStats) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb.Peer, hotThreshold int) bool { + if peer == nil { + return false } - return hotRegionThreshold + storeID := peer.GetStoreId() + stats, ok := f.hotStoreStats[storeID] + if !ok { + return false + } + if stat, ok := stats.Peek(region.GetID()); ok { + return stat.(*HotSpotPeerStat).HotDegree >= hotThreshold + } + return false } -func calculateReadHotThreshold(stats *StoresStats) uint64 { - // hotRegionThreshold is used to pick hot region - // suppose 
the number of the hot Regions is statCacheMaxLen - and we use total Read Bytes past storeHeartBeatReportInterval seconds to divide the number of hot Regions - divisor := float64(statCacheMaxLen) - hotRegionThreshold := uint64(stats.TotalBytesReadRate() / divisor) +// HotSpotPeerStatGenerator is used to produce new hotspot statistics. +type HotSpotPeerStatGenerator interface { + GenHotSpotPeerStats(stats *StoresStats) *HotSpotPeerStat +} - if hotRegionThreshold < hotReadRegionMinFlowRate { - hotRegionThreshold = hotReadRegionMinFlowRate - } - return hotRegionThreshold +// hotSpotPeerStatGenerator is used to produce new hotspot statistics. +type hotSpotPeerStatGenerator struct { + Region *core.RegionInfo + StoreID uint64 + FlowKeys uint64 + FlowBytes uint64 + Expired bool + Kind FlowKind + + lastHotSpotPeerStats *HotSpotPeerStat } const rollingWindowsSize = 5 -func (w *HotSpotCache) isNeedUpdateStatCache(region *core.RegionInfo, flowBytes uint64, hotRegionThreshold uint64, oldItem *RegionStat, kind FlowKind) (bool, *RegionStat) { - newItem := NewRegionStat(region, flowBytes, hotRegionAntiCount) +// GenHotSpotPeerStats implements HotSpotPeerStatGenerator. +func (flowStats *hotSpotPeerStatGenerator) GenHotSpotPeerStats(stats *StoresStats) *HotSpotPeerStat { + var hotRegionThreshold uint64 + switch flowStats.Kind { + case WriteFlow: + hotRegionThreshold = calculateWriteHotThresholdWithStore(stats, flowStats.StoreID) + case ReadFlow: + hotRegionThreshold = calculateReadHotThresholdWithStore(stats, flowStats.StoreID) + } + flowBytes := flowStats.FlowBytes + oldItem := flowStats.lastHotSpotPeerStats + region := flowStats.Region + newItem := &HotSpotPeerStat{ + RegionID: region.GetID(), + FlowBytes: flowStats.FlowBytes, + FlowKeys: flowStats.FlowKeys, + LastUpdateTime: time.Now(), + StoreID: flowStats.StoreID, + Version: region.GetMeta().GetRegionEpoch().GetVersion(), + AntiCount: hotRegionAntiCount, + Kind: flowStats.Kind, + needDelete: flowStats.Expired, + } + + if region.GetLeader().GetStoreId() == flowStats.StoreID { + newItem.isLeader = true + } + + if newItem.IsNeedDelete() { + return newItem + } + + if oldItem != nil { newItem.HotDegree = oldItem.HotDegree + 1 newItem.Stats = oldItem.Stats } + if flowBytes >= hotRegionThreshold { if oldItem == nil { - w.incMetrics("add_item", kind) newItem.Stats = NewRollingStats(rollingWindowsSize) } + newItem.isNew = true newItem.Stats.Add(float64(flowBytes)) - return true, newItem + return newItem } + // smaller than hotRegionThreshold if oldItem == nil { - return false, newItem + return nil } if oldItem.AntiCount <= 0 { - w.incMetrics("remove_item", kind) - return true, nil + newItem.needDelete = true + return newItem } // eliminate some noise newItem.HotDegree = oldItem.HotDegree - 1 newItem.AntiCount = oldItem.AntiCount - 1 newItem.Stats.Add(float64(flowBytes)) - return true, newItem + return newItem } -// Update updates the cache. -func (w *HotSpotCache) Update(key uint64, item *RegionStat, kind FlowKind) { +// HotSpotCache is a cache that holds hot regions. +type HotSpotCache struct { + writeFlow *HotStoresStats + readFlow *HotStoresStats +} + +// NewHotSpotCache creates a new hot spot cache. +func NewHotSpotCache() *HotSpotCache { + return &HotSpotCache{ + writeFlow: NewHotStoresStats(), + readFlow: NewHotStoresStats(), + } +} + +// CheckWrite checks the write status, returns update items.
+func (w *HotSpotCache) CheckWrite(region *core.RegionInfo, stats *StoresStats) []*HotSpotPeerStat { + var updateItems []*HotSpotPeerStat + hotStatGenerators := w.writeFlow.CheckRegionFlow(region, WriteFlow) + for _, hotGen := range hotStatGenerators { + item := hotGen.GenHotSpotPeerStats(stats) + if item != nil { + updateItems = append(updateItems, item) + } + } + return updateItems +} + +// CheckRead checks the read status, returns update items. +func (w *HotSpotCache) CheckRead(region *core.RegionInfo, stats *StoresStats) []*HotSpotPeerStat { + var updateItems []*HotSpotPeerStat + hotStatGenerators := w.readFlow.CheckRegionFlow(region, ReadFlow) + for _, hotGen := range hotStatGenerators { + item := hotGen.GenHotSpotPeerStats(stats) + if item != nil { + updateItems = append(updateItems, item) + } + } + return updateItems +} + +func (w *HotSpotCache) incMetrics(name string, storeID uint64, kind FlowKind) { + storeTag := fmt.Sprintf("store-%d", storeID) switch kind { case WriteFlow: - if item == nil { - w.writeFlow.Remove(key) - } else { - w.writeFlow.Put(key, item) - w.incMetrics("update_item", kind) - } + hotCacheStatusGauge.WithLabelValues(name, storeTag, "write").Inc() case ReadFlow: - if item == nil { - w.readFlow.Remove(key) - } else { - w.readFlow.Put(key, item) - w.incMetrics("update_item", kind) - } + hotCacheStatusGauge.WithLabelValues(name, storeTag, "read").Inc() + } +} + +// Update updates the cache. +func (w *HotSpotCache) Update(item *HotSpotPeerStat) { + var stats *HotStoresStats + switch item.Kind { + case WriteFlow: + stats = w.writeFlow + case ReadFlow: + stats = w.readFlow + } + stats.Update(item) + if item.IsNeedDelete() { + w.incMetrics("remove_item", item.StoreID, item.Kind) + } else if item.IsNew() { + w.incMetrics("add_item", item.StoreID, item.Kind) + } else { + w.incMetrics("update_item", item.StoreID, item.Kind) } } // RegionStats returns hot items according to kind -func (w *HotSpotCache) RegionStats(kind FlowKind) []*RegionStat { - var elements []*cache.Item +func (w *HotSpotCache) RegionStats(kind FlowKind) map[uint64][]*HotSpotPeerStat { + var flowMap map[uint64]cache.Cache switch kind { case WriteFlow: - elements = w.writeFlow.Elems() + flowMap = w.writeFlow.hotStoreStats case ReadFlow: - elements = w.readFlow.Elems() + flowMap = w.readFlow.hotStoreStats } - stats := make([]*RegionStat, len(elements)) - for i := range elements { - stats[i] = elements[i].Value.(*RegionStat) + res := make(map[uint64][]*HotSpotPeerStat) + for storeID, elements := range flowMap { + values := elements.Elems() + stat, ok := res[storeID] + if !ok { + stat = make([]*HotSpotPeerStat, len(values)) + res[storeID] = stat + } + for i := range values { + stat[i] = values[i].Value.(*HotSpotPeerStat) + } } - return stats + return res } // RandHotRegionFromStore random picks a hot region in specify store. 
-func (w *HotSpotCache) RandHotRegionFromStore(storeID uint64, kind FlowKind, hotThreshold int) *RegionStat { - stats := w.RegionStats(kind) +func (w *HotSpotCache) RandHotRegionFromStore(storeID uint64, kind FlowKind, hotThreshold int) *HotSpotPeerStat { + stats, ok := w.RegionStats(kind)[storeID] + if !ok { + return nil + } for _, i := range rand.Perm(len(stats)) { - if stats[i].HotDegree >= hotThreshold && stats[i].StoreID == storeID { + if stats[i].HotDegree >= hotThreshold { return stats[i] } } @@ -229,31 +400,85 @@ func (w *HotSpotCache) RandHotRegionFromStore(storeID uint64, kind FlowKind, hot // CollectMetrics collect the hot cache metrics func (w *HotSpotCache) CollectMetrics(stats *StoresStats) { - hotCacheStatusGauge.WithLabelValues("total_length", "write").Set(float64(w.writeFlow.Len())) - hotCacheStatusGauge.WithLabelValues("total_length", "read").Set(float64(w.readFlow.Len())) - threshold := calculateWriteHotThreshold(stats) - hotCacheStatusGauge.WithLabelValues("hotThreshold", "write").Set(float64(threshold)) - threshold = calculateReadHotThreshold(stats) - hotCacheStatusGauge.WithLabelValues("hotThreshold", "read").Set(float64(threshold)) + for storeID, flowStats := range w.writeFlow.hotStoreStats { + storeTag := fmt.Sprintf("store-%d", storeID) + threshold := calculateWriteHotThresholdWithStore(stats, storeID) + hotCacheStatusGauge.WithLabelValues("total_length", storeTag, "write").Set(float64(flowStats.Len())) + hotCacheStatusGauge.WithLabelValues("hotThreshold", storeTag, "write").Set(float64(threshold)) + } + + for storeID, flowStats := range w.readFlow.hotStoreStats { + storeTag := fmt.Sprintf("store-%d", storeID) + threshold := calculateReadHotThresholdWithStore(stats, storeID) + hotCacheStatusGauge.WithLabelValues("total_length", storeTag, "read").Set(float64(flowStats.Len())) + hotCacheStatusGauge.WithLabelValues("hotThreshold", storeTag, "read").Set(float64(threshold)) + } } // IsRegionHot checks if the region is hot. 
-func (w *HotSpotCache) IsRegionHot(id uint64, hotThreshold int) bool { - if stat, ok := w.writeFlow.Peek(id); ok { - if stat.(*RegionStat).HotDegree >= hotThreshold { - return true - } +func (w *HotSpotCache) IsRegionHot(region *core.RegionInfo, hotThreshold int) bool { + stats := w.writeFlow + if stats.isRegionHotWithAnyPeers(region, hotThreshold) { + return true + } + stats = w.readFlow + return stats.isRegionHotWithPeer(region, region.GetLeader(), hotThreshold) +} + +// Utils +func calculateWriteHotThreshold(stats *StoresStats) uint64 { + // hotRegionThreshold is used to pick hot region + // suppose the number of the hot Regions is statCacheMaxLen + // and we use total written Bytes past storeHeartBeatReportInterval seconds to divide the number of hot Regions + // divide 2 because the store reports data about two times than the region record write to rocksdb + divisor := float64(statCacheMaxLen) * 2 + hotRegionThreshold := uint64(stats.TotalBytesWriteRate() / divisor) + + if hotRegionThreshold < hotWriteRegionMinFlowRate { + hotRegionThreshold = hotWriteRegionMinFlowRate } - if stat, ok := w.readFlow.Peek(id); ok { - return stat.(*RegionStat).HotDegree >= hotThreshold + return hotRegionThreshold +} + +func calculateWriteHotThresholdWithStore(stats *StoresStats, storeID uint64) uint64 { + writeBytes, _ := stats.GetStoreBytesRate(storeID) + divisor := float64(storeStatCacheMaxLen) * 2 + hotRegionThreshold := uint64(float64(writeBytes) / divisor) + + if hotRegionThreshold < hotWriteRegionMinFlowRate { + hotRegionThreshold = hotWriteRegionMinFlowRate } - return false + return hotRegionThreshold +} + +func calculateReadHotThresholdWithStore(stats *StoresStats, storeID uint64) uint64 { + _, readBytes := stats.GetStoreBytesRate(storeID) + divisor := float64(storeStatCacheMaxLen) * 2 + hotRegionThreshold := uint64(float64(readBytes) / divisor) + + if hotRegionThreshold < hotReadRegionMinFlowRate { + hotRegionThreshold = hotReadRegionMinFlowRate + } + return hotRegionThreshold +} + +func calculateReadHotThreshold(stats *StoresStats) uint64 { + // hotRegionThreshold is used to pick hot region + // suppose the number of the hot Regions is statCacheMaxLen + // and we use total Read Bytes past storeHeartBeatReportInterval seconds to divide the number of hot Regions + divisor := float64(statCacheMaxLen) + hotRegionThreshold := uint64(stats.TotalBytesReadRate() / divisor) + + if hotRegionThreshold < hotReadRegionMinFlowRate { + hotRegionThreshold = hotReadRegionMinFlowRate + } + return hotRegionThreshold } // RegionStatInformer provides access to a shared informer of statistics. 
type RegionStatInformer interface { - IsRegionHot(id uint64) bool - RegionWriteStats() []*RegionStat - RegionReadStats() []*RegionStat + IsRegionHot(region *core.RegionInfo) bool + RegionWriteStats() map[uint64][]*HotSpotPeerStat + RegionReadStats() map[uint64][]*HotSpotPeerStat RandHotRegionFromStore(store uint64, kind FlowKind) *core.RegionInfo } diff --git a/server/statistics/metrics.go b/server/statistics/metrics.go index f4ae69f965b..9ab4dab2267 100644 --- a/server/statistics/metrics.go +++ b/server/statistics/metrics.go @@ -22,7 +22,7 @@ var ( Subsystem: "hotcache", Name: "status", Help: "Status of the hotspot.", - }, []string{"name", "type"}) + }, []string{"name", "store", "type"}) storeStatusGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ diff --git a/server/statistics/region.go b/server/statistics/region.go index 6b0d34b7e4f..cfa452c8608 100644 --- a/server/statistics/region.go +++ b/server/statistics/region.go @@ -19,37 +19,47 @@ import ( "github.com/pingcap/pd/server/core" ) -// RegionStat records each hot region's statistics -type RegionStat struct { +// HotSpotPeerStat records each hot region's statistics +type HotSpotPeerStat struct { RegionID uint64 `json:"region_id"` FlowBytes uint64 `json:"flow_bytes"` + FlowKeys uint64 `json:"flow_keys"` // HotDegree records the hot region update times HotDegree int `json:"hot_degree"` // LastUpdateTime used to calculate average write LastUpdateTime time.Time `json:"last_update_time"` - StoreID uint64 `json:"-"` + // StoreID is the store id of the region peer + StoreID uint64 `json:"store_id"` + Kind FlowKind `json:"kind"` // AntiCount used to eliminate some noise when remove region in cache AntiCount int // Version used to check the region split times Version uint64 // Stats is a rolling statistics, recording some recently added records. Stats *RollingStats + + needDelete bool + isLeader bool + isNew bool } -// NewRegionStat returns a RegionStat. -func NewRegionStat(region *core.RegionInfo, flowBytes uint64, antiCount int) *RegionStat { - return &RegionStat{ - RegionID: region.GetID(), - FlowBytes: flowBytes, - LastUpdateTime: time.Now(), - StoreID: region.GetLeader().GetStoreId(), - Version: region.GetMeta().GetRegionEpoch().GetVersion(), - AntiCount: antiCount, - } +// IsNeedDelete indicates whether the item in the cache should be deleted. +func (stat HotSpotPeerStat) IsNeedDelete() bool { + return stat.needDelete +} + +// IsLeader indicates whether the item belongs to the leader. +func (stat HotSpotPeerStat) IsLeader() bool { + return stat.isLeader +} + +// IsNew indicates whether the item is the first update of the region in the cache. +func (stat HotSpotPeerStat) IsNew() bool { + return stat.isNew } // RegionsStat is a list of a group region state type -type RegionsStat []RegionStat +type RegionsStat []HotSpotPeerStat func (m RegionsStat) Len() int { return len(m) } func (m RegionsStat) Swap(i, j int) { m[i], m[j] = m[j], m[i] } diff --git a/server/statistics/store.go b/server/statistics/store.go index 53fa77d0f5e..4a4b95c0116 100644 --- a/server/statistics/store.go +++ b/server/statistics/store.go @@ -92,6 +92,16 @@ func (s *StoresStats) TotalBytesReadRate() float64 { return s.bytesReadRate } +// GetStoreBytesRate returns the write and read bytes rate of the specified store. +func (s *StoresStats) GetStoreBytesRate(storeID uint64) (writeRate float64, readRate float64) { + s.RLock() + defer s.RUnlock() + if storeStat, ok := s.rollingStoresStats[storeID]; ok { + return storeStat.GetBytesRate() + } + return 0, 0 +} + // GetStoresBytesWriteStat returns the bytes write stat of all StoreInfo.
func (s *StoresStats) GetStoresBytesWriteStat() map[uint64]uint64 { s.RLock() diff --git a/tools/pd-simulator/main.go b/tools/pd-simulator/main.go index 93afc777437..b9205944535 100644 --- a/tools/pd-simulator/main.go +++ b/tools/pd-simulator/main.go @@ -51,7 +51,7 @@ func main() { flag.Parse() simutil.InitLogger(*simLogLevel) - statistics.Simulating = true + statistics.Denoising = false if *caseName == "" { if *pdAddr != "" {
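For reference, a minimal sketch of how the reworked hot cache is driven after this patch, modeled on the handleRegionHeartbeat and mockcluster hunks above; the package name, function name, and threshold parameter are illustrative assumptions, not part of the change:

// Hedged sketch only: mirrors the new per-peer flow introduced by this patch.
package hotcachesketch

import (
	"github.com/pingcap/pd/server/core"
	"github.com/pingcap/pd/server/statistics"
)

// updateHotCache feeds one region heartbeat into the per-store hot cache and
// reports whether the region is currently considered hot.
func updateHotCache(hotCache *statistics.HotSpotCache, storesStats *statistics.StoresStats, region *core.RegionInfo, hitsThreshold int) bool {
	// CheckWrite/CheckRead now return one HotSpotPeerStat per involved store
	// instead of a single (bool, *RegionStat) pair.
	for _, item := range hotCache.CheckWrite(region, storesStats) {
		hotCache.Update(item) // adds, refreshes, or removes the per-store item
	}
	for _, item := range hotCache.CheckRead(region, storesStats) {
		hotCache.Update(item)
	}
	// Hotness is judged per peer: any hot write peer, or a hot read leader.
	return hotCache.IsRegionHot(region, hitsThreshold)
}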