Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into support_store_report
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer committed May 14, 2021
2 parents c244213 + 91470c3 commit 01fcb2a
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 84 deletions.
6 changes: 3 additions & 3 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,11 +523,10 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
}
}
if store := c.core.GetStore(newStore.GetID()); store != nil {
c.hotStat.UpdateStoreHeartbeatMetrics(store)
statistics.UpdateStoreHeartbeatMetrics(store)
}
c.core.PutStore(newStore)
c.hotStat.Observe(newStore.GetID(), newStore.GetStoreStats())
c.hotStat.UpdateTotalLoad(c.core.GetStores())
c.hotStat.FilterUnhealthyStore(c)
reportInterval := stats.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
Expand Down Expand Up @@ -1107,7 +1106,9 @@ func (c *RaftCluster) buryStore(storeID uint64) error {
err := c.putStoreLocked(newStore)
c.onStoreVersionChangeLocked()
if err == nil {
// clean up the residual information.
c.RemoveStoreLimit(storeID)
c.hotStat.RemoveRollingStoreStats(storeID)
}
return err
}
Expand Down Expand Up @@ -1267,7 +1268,6 @@ func (c *RaftCluster) deleteStoreLocked(store *core.StoreInfo) error {
}
}
c.core.DeleteStore(store)
c.hotStat.RemoveRollingStoreStats(store.GetID())
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions server/statistics/hot_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ func (w *HotCache) HotRegionsFromStore(storeID uint64, kind FlowKind, minHotDegr

// IsRegionHot checks if the region is hot.
func (w *HotCache) IsRegionHot(region *core.RegionInfo, minHotDegree int) bool {
return w.writeFlow.IsRegionHot(region, minHotDegree) ||
w.readFlow.IsRegionHot(region, minHotDegree)
return w.writeFlow.isRegionHotWithAnyPeers(region, minHotDegree) ||
w.readFlow.isRegionHotWithPeer(region, region.GetLeader(), minHotDegree)
}

// CollectMetrics collects the hot cache metrics.
Expand Down
12 changes: 0 additions & 12 deletions server/statistics/hot_peer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,18 +219,6 @@ func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInf
return f.updateHotPeerStat(newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second)
}

func (f *hotPeerCache) IsRegionHot(region *core.RegionInfo, hotDegree int) bool {
f.RLock()
defer f.RUnlock()
switch f.kind {
case WriteFlow:
return f.isRegionHotWithAnyPeers(region, hotDegree)
case ReadFlow:
return f.isRegionHotWithPeer(region, region.GetLeader(), hotDegree)
}
return false
}

func (f *hotPeerCache) CollectMetrics(typ string) {
f.RLock()
defer f.RUnlock()
Expand Down
71 changes: 22 additions & 49 deletions server/statistics/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@ import (
type StoresStats struct {
sync.RWMutex
rollingStoresStats map[uint64]*RollingStoreStats
totalLoads []float64
}

// NewStoresStats creates a new hot spot cache.
func NewStoresStats() *StoresStats {
return &StoresStats{
rollingStoresStats: make(map[uint64]*RollingStoreStats),
totalLoads: make([]float64, StoreStatCount),
}
}

Expand Down Expand Up @@ -67,35 +65,14 @@ func (s *StoresStats) GetOrCreateRollingStoreStats(storeID uint64) *RollingStore

// Observe records the current store status with a given store.
func (s *StoresStats) Observe(storeID uint64, stats *pdpb.StoreStats) {
store := s.GetOrCreateRollingStoreStats(storeID)
store.Observe(stats)
rollingStoreStat := s.GetOrCreateRollingStoreStats(storeID)
rollingStoreStat.Observe(stats)
}

// Set sets the store statistics (for test).
func (s *StoresStats) Set(storeID uint64, stats *pdpb.StoreStats) {
store := s.GetOrCreateRollingStoreStats(storeID)
store.Set(stats)
}

// UpdateTotalLoad updates the total loads of all stores.
func (s *StoresStats) UpdateTotalLoad(stores []*core.StoreInfo) {
s.Lock()
defer s.Unlock()
for i := range s.totalLoads {
s.totalLoads[i] = 0
}
for _, store := range stores {
if !store.IsUp() {
continue
}
stats, ok := s.rollingStoresStats[store.GetID()]
if !ok {
continue
}
for i := range s.totalLoads {
s.totalLoads[i] += stats.GetLoad(StoreStatKind(i))
}
}
rollingStoreStat := s.GetOrCreateRollingStoreStats(storeID)
rollingStoreStat.Set(stats)
}

// GetStoresLoads returns all stores loads.
Expand All @@ -111,24 +88,20 @@ func (s *StoresStats) GetStoresLoads() map[uint64][]float64 {
return res
}

func (s *StoresStats) storeIsUnhealthy(cluster core.StoreSetInformer, storeID uint64) bool {
store := cluster.GetStore(storeID)
return store.IsTombstone() || store.IsUnhealthy() || store.IsPhysicallyDestroyed()
}

// FilterUnhealthyStore filter unhealthy store
func (s *StoresStats) FilterUnhealthyStore(cluster core.StoreSetInformer) {
s.Lock()
defer s.Unlock()
for storeID := range s.rollingStoresStats {
if s.storeIsUnhealthy(cluster, storeID) {
store := cluster.GetStore(storeID)
if store.IsTombstone() || store.IsUnhealthy() || store.IsPhysicallyDestroyed() {
delete(s.rollingStoresStats, storeID)
}
}
}

// UpdateStoreHeartbeatMetrics is used to update store heartbeat interval metrics
func (s *StoresStats) UpdateStoreHeartbeatMetrics(store *core.StoreInfo) {
func UpdateStoreHeartbeatMetrics(store *core.StoreInfo) {
storeHeartbeatIntervalHist.Observe(time.Since(store.GetLastHeartbeatTS()).Seconds())
}

Expand All @@ -145,16 +118,16 @@ const (
DefaultAotSize = 2
// DefaultWriteMfSize is default size of write median filter
DefaultWriteMfSize = 5
// DefaultReadMfSize is default size of read median filter
DefaultReadMfSize = 3
// defaultReadMfSize is default size of read median filter
defaultReadMfSize = 3
)

// NewRollingStoreStats creates a RollingStoreStats.
func newRollingStoreStats() *RollingStoreStats {
timeMedians := make(map[StoreStatKind]*movingaverage.TimeMedian)
interval := StoreHeartBeatReportInterval * time.Second
timeMedians[StoreReadBytes] = movingaverage.NewTimeMedian(DefaultAotSize, DefaultReadMfSize, interval)
timeMedians[StoreReadKeys] = movingaverage.NewTimeMedian(DefaultAotSize, DefaultReadMfSize, interval)
timeMedians[StoreReadBytes] = movingaverage.NewTimeMedian(DefaultAotSize, defaultReadMfSize, interval)
timeMedians[StoreReadKeys] = movingaverage.NewTimeMedian(DefaultAotSize, defaultReadMfSize, interval)
timeMedians[StoreWriteBytes] = movingaverage.NewTimeMedian(DefaultAotSize, DefaultWriteMfSize, interval)
timeMedians[StoreWriteKeys] = movingaverage.NewTimeMedian(DefaultAotSize, DefaultWriteMfSize, interval)

Expand All @@ -180,14 +153,14 @@ func collect(records []*pdpb.RecordPair) float64 {
// Observe records current statistics.
func (r *RollingStoreStats) Observe(stats *pdpb.StoreStats) {
statInterval := stats.GetInterval()
interval := statInterval.GetEndTimestamp() - statInterval.GetStartTimestamp()
log.Debug("update store stats", zap.Uint64("key-write", stats.KeysWritten), zap.Uint64("bytes-write", stats.BytesWritten), zap.Duration("interval", time.Duration(interval)*time.Second), zap.Uint64("store-id", stats.GetStoreId()))
interval := time.Duration(statInterval.GetEndTimestamp()-statInterval.GetStartTimestamp()) * time.Second
log.Debug("update store stats", zap.Uint64("key-write", stats.KeysWritten), zap.Uint64("bytes-write", stats.BytesWritten), zap.Duration("interval", interval), zap.Uint64("store-id", stats.GetStoreId()))
r.Lock()
defer r.Unlock()
r.timeMedians[StoreWriteBytes].Add(float64(stats.BytesWritten), time.Duration(interval)*time.Second)
r.timeMedians[StoreWriteKeys].Add(float64(stats.KeysWritten), time.Duration(interval)*time.Second)
r.timeMedians[StoreReadBytes].Add(float64(stats.BytesRead), time.Duration(interval)*time.Second)
r.timeMedians[StoreReadKeys].Add(float64(stats.KeysRead), time.Duration(interval)*time.Second)
r.timeMedians[StoreWriteBytes].Add(float64(stats.BytesWritten), interval)
r.timeMedians[StoreWriteKeys].Add(float64(stats.KeysWritten), interval)
r.timeMedians[StoreReadBytes].Add(float64(stats.BytesRead), interval)
r.timeMedians[StoreReadKeys].Add(float64(stats.KeysRead), interval)

// Updates the cpu usages and disk rw rates of store.
r.movingAvgs[StoreCPUUsage].Add(collect(stats.GetCpuUsages()))
Expand All @@ -198,16 +171,16 @@ func (r *RollingStoreStats) Observe(stats *pdpb.StoreStats) {
// Set sets the statistics (for test).
func (r *RollingStoreStats) Set(stats *pdpb.StoreStats) {
statInterval := stats.GetInterval()
interval := statInterval.GetEndTimestamp() - statInterval.GetStartTimestamp()
interval := float64(statInterval.GetEndTimestamp() - statInterval.GetStartTimestamp())
if interval == 0 {
return
}
r.Lock()
defer r.Unlock()
r.timeMedians[StoreWriteBytes].Set(float64(stats.BytesWritten) / float64(interval))
r.timeMedians[StoreReadBytes].Set(float64(stats.BytesRead) / float64(interval))
r.timeMedians[StoreWriteKeys].Set(float64(stats.KeysWritten) / float64(interval))
r.timeMedians[StoreReadKeys].Set(float64(stats.KeysRead) / float64(interval))
r.timeMedians[StoreWriteBytes].Set(float64(stats.BytesWritten) / interval)
r.timeMedians[StoreReadBytes].Set(float64(stats.BytesRead) / interval)
r.timeMedians[StoreWriteKeys].Set(float64(stats.KeysWritten) / interval)
r.timeMedians[StoreReadKeys].Set(float64(stats.KeysRead) / interval)
r.movingAvgs[StoreCPUUsage].Set(collect(stats.GetCpuUsages()))
r.movingAvgs[StoreDiskReadRate].Set(collect(stats.GetReadIoRates()))
r.movingAvgs[StoreDiskWriteRate].Set(collect(stats.GetWriteIoRates()))
Expand Down
18 changes: 0 additions & 18 deletions server/statistics/store_hot_peers_infos.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,3 @@ type StoreHotPeersInfos struct {

// StoreHotPeersStat is used to record the hot region statistics group by store.
type StoreHotPeersStat map[uint64]*HotPeersStat

// GetStoreStatAsPeer returns stat as peer from the corresponding store.
func (info *StoreHotPeersInfos) GetStoreStatAsPeer(storeID uint64) (string, *HotPeersStat) {
stat, ok := info.AsPeer[storeID]
if !ok {
stat = &HotPeersStat{}
}
return "as_peer", stat
}

// GetStoreStatAsLeader returns stat stat as leader from the corresponding store.
func (info *StoreHotPeersInfos) GetStoreStatAsLeader(storeID uint64) (string, *HotPeersStat) {
stat, ok := info.AsLeader[storeID]
if !ok {
stat = &HotPeersStat{}
}
return "as_leader", stat
}

0 comments on commit 01fcb2a

Please sign in to comment.