
Revert "schedulers: fix check hot region flow (tikv#3203)" (tikv#3316)
This reverts commit c268837.

Signed-off-by: nolouch <nolouch@gmail.com>
nolouch authored Dec 30, 2020
1 parent 8ff72c9 commit 27c29b7
Showing 12 changed files with 90 additions and 405 deletions.
27 changes: 7 additions & 20 deletions pkg/mock/mockcluster/mockcluster.go
@@ -324,52 +324,39 @@ func (mc *Cluster) AddLeaderRegionWithReadInfo(
     regionID uint64, leaderID uint64,
     readBytes, readKeys uint64,
     reportInterval uint64,
-    followerIds []uint64, filledNums ...int) []*statistics.HotPeerStat {
+    followerIds []uint64) {
     r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
     r = r.Clone(core.SetReadBytes(readBytes))
     r = r.Clone(core.SetReadKeys(readKeys))
     r = r.Clone(core.SetReportInterval(reportInterval))
-    filledNum := mc.HotCache.GetFilledPeriod(statistics.ReadFlow)
-    if len(filledNums) > 0 {
-        filledNum = filledNums[0]
-    }
-
-    var items []*statistics.HotPeerStat
-    for i := 0; i < filledNum; i++ {
+    num := mc.HotCache.GetFilledPeriod(statistics.ReadFlow)
+    for i := 0; i < num; i++ {
         items := mc.HotCache.CheckRead(r)
         for _, item := range items {
             mc.HotCache.Update(item)
         }
     }
     mc.PutRegion(r)
-    return items
 }

 // AddLeaderRegionWithWriteInfo adds region with specified leader, followers and write info.
 func (mc *Cluster) AddLeaderRegionWithWriteInfo(
     regionID uint64, leaderID uint64,
     writtenBytes, writtenKeys uint64,
     reportInterval uint64,
-    followerIds []uint64, filledNums ...int) []*statistics.HotPeerStat {
+    followerIds []uint64) {
     r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
     r = r.Clone(core.SetWrittenBytes(writtenBytes))
     r = r.Clone(core.SetWrittenKeys(writtenKeys))
     r = r.Clone(core.SetReportInterval(reportInterval))
-
-    filledNum := mc.HotCache.GetFilledPeriod(statistics.WriteFlow)
-    if len(filledNums) > 0 {
-        filledNum = filledNums[0]
-    }
-
-    var items []*statistics.HotPeerStat
-    for i := 0; i < filledNum; i++ {
-        items = mc.HotCache.CheckWrite(r)
+    num := mc.HotCache.GetFilledPeriod(statistics.WriteFlow)
+    for i := 0; i < num; i++ {
+        items := mc.HotCache.CheckWrite(r)
         for _, item := range items {
             mc.HotCache.Update(item)
         }
     }
     mc.PutRegion(r)
-    return items
 }

 // UpdateStoreLeaderWeight updates store leader weight.
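
After this revert, the mock heartbeat helpers feed the hot cache internally and no longer return the checked hot-peer stats; the variadic filledNums override is gone as well. A minimal sketch of driving the restored write-side signature from a test (assumes a *mockcluster.Cluster named tc, KB = 1024, and the statistics constants used elsewhere in this commit; the argument values mirror the test code removed later in this diff):

    tc.AddLeaderRegionWithWriteInfo(
        1, // region ID
        1, // leader store ID
        512*KB*statistics.RegionHeartBeatReportInterval, // bytes written over the report interval
        0, // keys written over the report interval
        statistics.RegionHeartBeatReportInterval, // report interval, in seconds
        []uint64{2, 3}, // follower store IDs
    )
    // The helper now runs GetFilledPeriod(statistics.WriteFlow) rounds of
    // CheckWrite/Update itself and returns nothing.
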
4 changes: 2 additions & 2 deletions pkg/movingaverage/avg_over_time_test.go
@@ -75,14 +75,14 @@ func (t *testAvgOverTimeSuite) TestChange(c *C) {
 }

 func (t *testAvgOverTimeSuite) TestMinFilled(c *C) {
-    interval := 10 * time.Second
+    interval := 10
     rate := 1.0
     for aotSize := 2; aotSize < 10; aotSize++ {
         for mfSize := 2; mfSize < 10; mfSize++ {
             tm := NewTimeMedian(aotSize, mfSize, interval)
             for i := 0; i < tm.GetFilledPeriod(); i++ {
                 c.Assert(tm.Get(), Equals, 0.0)
-                tm.Add(rate*interval.Seconds(), interval)
+                tm.Add(rate*float64(interval), time.Duration(interval)*time.Second)
             }
             c.Assert(tm.Get(), Equals, rate)
         }
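
With the revert, NewTimeMedian takes the report interval as a plain int number of seconds (as restored in time_median.go just below) and the caller converts to time.Duration only when adding samples. A minimal runnable sketch mirroring TestMinFilled, under those assumptions:

    package movingaverage

    import (
        "fmt"
        "time"
    )

    func ExampleTimeMedian() {
        interval := 10 // report interval in seconds, a plain int after the revert
        tm := NewTimeMedian(2, 5, interval)
        // Get() stays at zero until GetFilledPeriod() samples have arrived.
        for i := 0; i < tm.GetFilledPeriod(); i++ {
            tm.Add(1.0*float64(interval), time.Duration(interval)*time.Second)
        }
        fmt.Println(tm.Get()) // prints 1 once the window is filled
    }
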
7 changes: 4 additions & 3 deletions pkg/movingaverage/time_median.go
@@ -29,10 +29,11 @@ type TimeMedian struct {
 }

 // NewTimeMedian returns a TimeMedian with given size.
-func NewTimeMedian(aotSize, mfSize int, reportInterval time.Duration) *TimeMedian {
+func NewTimeMedian(aotSize, mfSize, reportInterval int) *TimeMedian {
+    interval := time.Duration(aotSize*reportInterval) * time.Second
     return &TimeMedian{
-        aotInterval: reportInterval,
-        aot:         NewAvgOverTime(time.Duration(aotSize) * reportInterval),
+        aotInterval: interval,
+        aot:         NewAvgOverTime(interval),
         mf:          NewMedianFilter(mfSize),
         aotSize:     aotSize,
         mfSize:      mfSize,
8 changes: 4 additions & 4 deletions server/schedulers/hot_region.go
@@ -395,7 +395,7 @@ func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstS
         h.regionPendings[regionID] = tmp
     }

-    schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Inc()
+    schedulerStatus.WithLabelValues(h.GetName(), "pending_op_create").Inc()
     return true
 }

@@ -806,13 +806,13 @@ func (bs *balanceSolver) calcProgressiveRank() {
     greatDecRatio, minorDecRatio := bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorGreatDecRatio()
     switch {
     case byteHot && byteDecRatio <= greatDecRatio && keyHot && keyDecRatio <= greatDecRatio:
-        // If belong to the case, both byte rate and key rate will be more balanced, the best choice.
+        // Both byte rate and key rate are balanced, the best choice.
         rank = -3
     case byteDecRatio <= minorDecRatio && keyHot && keyDecRatio <= greatDecRatio:
-        // If belong to the case, byte rate will be not worsened, key rate will be more balanced.
+        // Byte rate is not worsened, key rate is balanced.
        rank = -2
     case byteHot && byteDecRatio <= greatDecRatio:
-        // If belong to the case, byte rate will be more balanced, ignore the key rate.
+        // Byte rate is balanced, ignore the key rate.
         rank = -1
     }
 }
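
The restored comments spell out a three-tier preference. Read as a standalone rule, it amounts to the following illustrative sketch (not PD's exact code; the real method reads these flags and ratios from the balanceSolver and stores the result on it):

    // Smaller (more negative) rank marks a more desirable operation;
    // 0 means the candidate improves neither dimension.
    func progressiveRank(byteHot, keyHot bool, byteDecRatio, keyDecRatio, greatDecRatio, minorDecRatio float64) int64 {
        var rank int64
        switch {
        case byteHot && byteDecRatio <= greatDecRatio && keyHot && keyDecRatio <= greatDecRatio:
            rank = -3 // both byte and key rate become more balanced: the best choice
        case byteDecRatio <= minorDecRatio && keyHot && keyDecRatio <= greatDecRatio:
            rank = -2 // byte rate is not worsened, key rate becomes more balanced
        case byteHot && byteDecRatio <= greatDecRatio:
            rank = -1 // byte rate becomes more balanced; key rate is ignored
        }
        return rank
    }
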
73 changes: 0 additions & 73 deletions server/schedulers/hot_test.go
@@ -1060,76 +1060,3 @@ func addRegionInfo(tc *mockcluster.Cluster, rwTy rwType, regions []testRegionInfo) {
         )
     }
 }
-
-func (s *testHotCacheSuite) TestCheckRegionFlow(c *C) {
-    opt := config.NewTestOptions()
-    tc := mockcluster.NewCluster(opt)
-    tc.SetMaxReplicas(3)
-    tc.SetLocationLabels([]string{"zone", "host"})
-    tc.DisableFeature(versioninfo.JointConsensus)
-    s.checkRegionFlowTest(c, tc.AddLeaderRegionWithWriteInfo)
-    s.checkRegionFlowTest(c, tc.AddLeaderRegionWithReadInfo)
-}
-
-func (s *testHotCacheSuite) checkRegionFlowTest(c *C, heartbeat func(
-    regionID uint64, leaderID uint64,
-    readBytes, readKeys uint64,
-    reportInterval uint64,
-    followerIds []uint64, filledNums ...int) []*statistics.HotPeerStat) {
-    // hot degree increase
-    heartbeat(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
-    heartbeat(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
-    items := heartbeat(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
-    for _, item := range items {
-        c.Check(item.HotDegree, Equals, 3)
-    }
-
-    // transfer leader and skip the first heartbeat
-    items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3}, 1)
-    for _, item := range items {
-        c.Check(item.HotDegree, Equals, 3)
-    }
-
-    // move peer: add peer and remove peer
-    items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3, 4}, 1)
-    for _, item := range items {
-        c.Check(item.HotDegree, Equals, 4)
-    }
-    items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 4}, 1)
-    for _, item := range items {
-        if item.StoreID == 3 {
-            c.Check(item.IsNeedDelete(), IsTrue)
-            continue
-        }
-        c.Check(item.HotDegree, Equals, 5)
-    }
-}
-
-func (s *testHotCacheSuite) TestCheckRegionFlowWithDifferentThreshold(c *C) {
-    opt := config.NewTestOptions()
-    tc := mockcluster.NewCluster(opt)
-    tc.SetMaxReplicas(3)
-    tc.SetLocationLabels([]string{"zone", "host"})
-    tc.DisableFeature(versioninfo.JointConsensus)
-    // some peers are hot, and some are cold #3198
-    rate := uint64(512 * KB)
-    for i := 0; i < statistics.TopNN; i++ {
-        for j := 0; j < statistics.DefaultAotSize; j++ {
-            tc.AddLeaderRegionWithWriteInfo(uint64(i+100), 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
-        }
-    }
-    items := tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
-    c.Check(items[0].GetThresholds()[0], Equals, float64(rate)*statistics.HotThresholdRatio)
-    // Threshold of store 1,2,3 is 409.6 KB and others are 1 KB
-    // Make the hot threshold of some stores high and the others low
-    rate = 10 * KB
-    tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3, 4}, 1)
-    items = tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{3, 4}, 1)
-    for _, item := range items {
-        if item.StoreID < 4 {
-            c.Check(item.IsNeedDelete(), IsTrue)
-        } else {
-            c.Check(item.IsNeedDelete(), IsFalse)
-        }
-    }
-}
10 changes: 4 additions & 6 deletions server/schedulers/utils.go
@@ -334,16 +334,14 @@ func (li *storeLoadDetail) toHotPeersStat() *statistics.HotPeersStat {
     peers := make([]statistics.HotPeerStat, 0, len(li.HotPeers))
     var totalBytesRate, totalKeysRate float64
     for _, peer := range li.HotPeers {
-        if peer.HotDegree > 0 {
-            peers = append(peers, *peer.Clone())
-            totalBytesRate += peer.ByteRate
-            totalKeysRate += peer.KeyRate
-        }
+        peers = append(peers, *peer.Clone())
+        totalBytesRate += peer.ByteRate
+        totalKeysRate += peer.KeyRate
     }
     return &statistics.HotPeersStat{
         TotalBytesRate: math.Round(totalBytesRate),
         TotalKeysRate:  math.Round(totalKeysRate),
-        Count:          len(peers),
+        Count:          len(li.HotPeers),
         Stats:          peers,
     }
 }
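
With the HotDegree guard removed, every cached hot peer contributes to the totals again, and Count equals len(li.HotPeers), which is now also len(peers). A simplified sketch of the restored aggregation with stand-in types (the real code works on statistics.HotPeerStat and returns a *statistics.HotPeersStat):

    package main

    import "math"

    type peerStat struct{ ByteRate, KeyRate float64 }

    func aggregate(hotPeers []peerStat) (count int, totalBytesRate, totalKeysRate float64) {
        for _, p := range hotPeers {
            totalBytesRate += p.ByteRate // no HotDegree > 0 filter after the revert
            totalKeysRate += p.KeyRate
        }
        return len(hotPeers), math.Round(totalBytesRate), math.Round(totalKeysRate)
    }
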
64 changes: 5 additions & 59 deletions server/statistics/hot_peer.go
@@ -26,42 +26,6 @@ const (
     dimLen
 )

-type dimStat struct {
-    typ         int
-    Rolling     *movingaverage.TimeMedian  // it's used to statistic hot degree and average speed.
-    LastAverage *movingaverage.AvgOverTime // it's used to obtain the average speed in last second as instantaneous speed.
-}
-
-func newDimStat(typ int) *dimStat {
-    reportInterval := RegionHeartBeatReportInterval * time.Second
-    return &dimStat{
-        typ:         typ,
-        Rolling:     movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, reportInterval),
-        LastAverage: movingaverage.NewAvgOverTime(reportInterval),
-    }
-}
-
-func (d *dimStat) Add(delta float64, interval time.Duration) {
-    d.LastAverage.Add(delta, interval)
-    d.Rolling.Add(delta, interval)
-}
-
-func (d *dimStat) isHot(thresholds [dimLen]float64) bool {
-    return d.LastAverage.IsFull() && d.LastAverage.Get() >= thresholds[d.typ]
-}
-
-func (d *dimStat) isFull() bool {
-    return d.LastAverage.IsFull()
-}
-
-func (d *dimStat) clearLastAverage() {
-    d.LastAverage.Clear()
-}
-
-func (d *dimStat) Get() float64 {
-    return d.Rolling.Get()
-}
-
 // HotPeerStat records each hot peer's statistics
 type HotPeerStat struct {
     StoreID  uint64 `json:"store_id"`
@@ -77,19 +41,15 @@ type HotPeerStat struct {
     KeyRate  float64 `json:"flow_keys"`

     // rolling statistics, recording some recently added records.
-    rollingByteRate *dimStat
-    rollingKeyRate  *dimStat
+    rollingByteRate *movingaverage.TimeMedian
+    rollingKeyRate  *movingaverage.TimeMedian

     // LastUpdateTime used to calculate average write
     LastUpdateTime time.Time `json:"last_update_time"`

-    needDelete         bool
-    isLeader           bool
-    isNew              bool
-    justTransferLeader bool
-    interval           uint64
-    thresholds         [dimLen]float64
-    peers              []uint64
+    needDelete bool
+    isLeader   bool
+    isNew      bool
 }

 // ID returns region ID. Implementing TopNItem.
@@ -141,11 +101,6 @@ func (stat *HotPeerStat) GetKeyRate() float64 {
     return math.Round(stat.rollingKeyRate.Get())
 }

-// GetThresholds returns thresholds
-func (stat *HotPeerStat) GetThresholds() [dimLen]float64 {
-    return stat.thresholds
-}
-
 // Clone clones the HotPeerStat
 func (stat *HotPeerStat) Clone() *HotPeerStat {
     ret := *stat
@@ -155,12 +110,3 @@ func (stat *HotPeerStat) Clone() *HotPeerStat {
     ret.rollingKeyRate = nil
     return &ret
 }
-
-func (stat *HotPeerStat) isHot() bool {
-    return stat.rollingByteRate.isHot(stat.thresholds) || stat.rollingKeyRate.isHot(stat.thresholds)
-}
-
-func (stat *HotPeerStat) clearLastAverage() {
-    stat.rollingByteRate.clearLastAverage()
-    stat.rollingKeyRate.clearLastAverage()
-}
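
After the revert, HotPeerStat tracks each dimension with a bare movingaverage.TimeMedian rather than the dimStat wrapper, so the last-second instantaneous average, the per-store thresholds, and the transfer-leader bookkeeping all disappear. A sketch of how the rolling fields would be built inside the statistics package (a hypothetical helper; the constants are the ones the deleted newDimStat referenced, and NewTimeMedian now takes the interval as an int number of seconds):

    func newRollingStats() (byteRate, keyRate *movingaverage.TimeMedian) {
        byteRate = movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, RegionHeartBeatReportInterval)
        keyRate = movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, RegionHeartBeatReportInterval)
        return byteRate, keyRate
    }
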
