Commit

add more tests
Signed-off-by: lhy1024 <admin@liudos.us>
lhy1024 committed Jul 22, 2023
1 parent 1cef5ae commit 3588db3
Showing 2 changed files with 81 additions and 30 deletions.
41 changes: 18 additions & 23 deletions pkg/schedule/schedulers/hot_region.go
@@ -495,8 +495,21 @@ type balanceSolver struct {
}

func (bs *balanceSolver) init() {
// Init store load detail according to the type.
// Load the configuration items of the scheduler.
bs.resourceTy = toResourceType(bs.rwTy, bs.opTy)
bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber()
bs.minHotDegree = bs.GetSchedulerConfig().GetHotRegionCacheHitsThreshold()
bs.firstPriority, bs.secondPriority = prioritiesToDim(bs.getPriorities())
bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio()
bs.isRaftKV2 = bs.GetStoreConfig().IsRaftKV2()
switch bs.sche.conf.GetRankFormulaVersion() {
case "v1":
bs.initRankV1()
default:
bs.initRankV2()
}

// Init store load detail according to the type.
bs.stLoadDetail = bs.sche.stLoadInfos[bs.resourceTy]

bs.maxSrc = &statistics.StoreLoad{Loads: make([]float64, statistics.DimLen)}
@@ -509,7 +522,6 @@ func (bs *balanceSolver) init() {
}
maxCur := &statistics.StoreLoad{Loads: make([]float64, statistics.DimLen)}

bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber()
bs.filteredHotPeers = make(map[uint64][]*statistics.HotPeerStat)
bs.nthHotPeer = make(map[uint64][]*statistics.HotPeerStat)
for _, detail := range bs.stLoadDetail {
@@ -532,18 +544,6 @@ func (bs *balanceSolver) init() {
Loads: stepLoads,
Count: maxCur.Count * bs.sche.conf.GetCountRankStepRatio(),
}

bs.firstPriority, bs.secondPriority = prioritiesToDim(bs.getPriorities())
bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio()
bs.minHotDegree = bs.GetSchedulerConfig().GetHotRegionCacheHitsThreshold()
bs.isRaftKV2 = bs.GetStoreConfig().IsRaftKV2()

switch bs.sche.conf.GetRankFormulaVersion() {
case "v1":
bs.initRankV1()
default:
bs.initRankV2()
}
}

func (bs *balanceSolver) initRankV1() {
@@ -881,15 +881,13 @@ func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) [
ret := make([]*statistics.HotPeerStat, 0, len(hotPeers))
appendItem := func(item *statistics.HotPeerStat) {
if _, ok := bs.sche.regionPendings[item.ID()]; !ok && !item.IsNeedCoolDownTransferLeader(bs.minHotDegree, bs.rwTy) {
// todo: add test for regionPendings
// todo: add test for CoolDownTransferLeader
// not in a pending operator and no need to cool down after transferring the leader
ret = append(ret, item)
}
}

var firstSort, secondSort []*statistics.HotPeerStat
if len(hotPeers) > topnPosition || len(hotPeers) > bs.maxPeerNum {
if len(hotPeers) >= topnPosition || len(hotPeers) > bs.maxPeerNum {
firstSort = make([]*statistics.HotPeerStat, len(hotPeers))
copy(firstSort, hotPeers)
sort.Slice(firstSort, func(i, j int) bool {
@@ -901,10 +899,10 @@ func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) [
return secondSort[i].GetLoad(bs.secondPriority) > secondSort[j].GetLoad(bs.secondPriority)
})
}
if len(hotPeers) > topnPosition {
if len(hotPeers) >= topnPosition {
storeID := storeLoad.GetID()
bs.nthHotPeer[storeID][bs.firstPriority] = firstSort[topnPosition]
bs.nthHotPeer[storeID][bs.secondPriority] = secondSort[topnPosition]
bs.nthHotPeer[storeID][bs.firstPriority] = firstSort[topnPosition-1]
bs.nthHotPeer[storeID][bs.secondPriority] = secondSort[topnPosition-1]
}
if len(hotPeers) > bs.maxPeerNum {
union := bs.sortHotPeers(firstSort, secondSort)
@@ -923,9 +921,6 @@ func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) [

func (bs *balanceSolver) sortHotPeers(firstSort, secondSort []*statistics.HotPeerStat) map[*statistics.HotPeerStat]struct{} {
union := make(map[*statistics.HotPeerStat]struct{}, bs.maxPeerNum)
if len(firstSort) == 0 || len(secondSort) == 0 {
return union
}
// At most MaxPeerNum peers, to prevent balanceSolver.solve() too slow.
for len(union) < bs.maxPeerNum {
for len(firstSort) > 0 {
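The core behavior change in `filterHotPeers` above is the switch from `>` to `>=` and from index `topnPosition` to `topnPosition-1`: a store with exactly topnPosition hot peers now records its topnPosition-th hottest peer per priority dimension, and that peer later acts as the dynamic "minimum hot" threshold (the test below refers to it as the top10 threshold of minHotPeer). Below is a minimal standalone sketch of that selection only, assuming simplified types; `peerStat` and `nthHotPeer` are hypothetical names, not PD's API.

// Standalone sketch, not PD code: simplified stand-ins for statistics.HotPeerStat
// and the balanceSolver bookkeeping.
package main

import (
	"fmt"
	"sort"
)

const topnPosition = 10 // assumed default of 10, matching the "top10" the tests mention

type peerStat struct {
	id    uint64
	loads []float64 // loads[0]: first priority, loads[1]: second priority
}

// nthHotPeer returns the topnPosition-th hottest peer on the given dimension
// when the store has at least topnPosition hot peers. With `>=` and the
// `topnPosition-1` index, exactly topnPosition peers are already enough to
// produce a threshold peer, which the previous `>` / `topnPosition` pair missed.
func nthHotPeer(hotPeers []peerStat, dim int) (peerStat, bool) {
	if len(hotPeers) < topnPosition {
		return peerStat{}, false
	}
	sorted := make([]peerStat, len(hotPeers))
	copy(sorted, hotPeers)
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].loads[dim] > sorted[j].loads[dim]
	})
	return sorted[topnPosition-1], true
}

func main() {
	peers := make([]peerStat, 0, topnPosition)
	for i := 0; i < topnPosition; i++ {
		peers = append(peers, peerStat{id: uint64(i), loads: []float64{float64(100 - i), float64(i)}})
	}
	if p, ok := nthHotPeer(peers, 0); ok {
		fmt.Printf("threshold peer on dim 0: id=%d load=%.0f\n", p.id, p.loads[0]) // id=9 load=91
	}
}

The sketch shows only the selection itself; in the scheduler the chosen peer is stored per store and per dimension in bs.nthHotPeer.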
70 changes: 63 additions & 7 deletions pkg/schedule/schedulers/hot_region_v2_test.go
@@ -19,6 +19,7 @@ import (

"github.com/docker/go-units"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/storage"
@@ -358,36 +359,89 @@ func TestHotReadRegionScheduleWithSmallHotRegion(t *testing.T) {
// The query load of the high node is 2000 qps, and the low node is 200 qps.
// All hot regions in the cluster are small, i.e. smaller than 20% of the diff or 2% of the low node.
re := require.New(t)
emptyFunc := func(*mockcluster.Cluster, *hotScheduler) {}
highLoad, lowLoad := uint64(2000), uint64(200)
bigHotRegionByte := uint64(float64(lowLoad) * firstPriorityMinHotRatio * 10 * units.MiB * statistics.ReadReportInterval)
bigHotRegionQuery := uint64(float64(lowLoad) * firstPriorityMinHotRatio * 10 * statistics.ReadReportInterval)

// Case1: Before #6827, we only used minHotRatio, so small hot regions cannot be scheduled in this case.
// Because 10000 is larger than the number of hot regions, `filterHotPeers` will skip the topn calculation.
regions := make([]testRegionInfo, 0)
origin := topnPosition
topnPosition = 10000
ops := checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, regions)
ops := checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, emptyFunc)
re.Empty(ops)
topnPosition = origin

// Case2: After #6827, we use top10 as the threshold of minHotPeer.
ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, regions)
ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, emptyFunc)
re.Len(ops, 1)
ops = checkHotReadRegionScheduleWithSmallHotRegion(re, lowLoad, highLoad, emptyFunc)
re.Len(ops, 0)

// Case3: If there is a larger hot region, we will schedule it.
hotRegionID := uint64(100)
regions = append(regions, testRegionInfo{hotRegionID, []uint64{1, 2, 3}, float64(lowLoad*units.MiB) * 0.1, 0, float64(lowLoad) * 0.1})
ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, regions)
ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, func(tc *mockcluster.Cluster, _ *hotScheduler) {
tc.AddRegionWithReadInfo(hotRegionID, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
})
re.Len(ops, 1)
re.Equal(hotRegionID, ops[0].RegionID())

// Case4: If there is a larger hot region but it needs to cool down, we will schedule the small hot region.
ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, func(tc *mockcluster.Cluster, _ *hotScheduler) {
// just transfer leader
tc.AddRegionWithReadInfo(hotRegionID, 2, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{1, 3})
tc.AddRegionWithReadInfo(hotRegionID, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
})
re.Len(ops, 1)
re.NotEqual(hotRegionID, ops[0].RegionID())

// Case5: If there is a larger hot region but it is pending, we will schedule the small hot region.
ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, func(tc *mockcluster.Cluster, hb *hotScheduler) {
tc.AddRegionWithReadInfo(hotRegionID, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
hb.regionPendings[hotRegionID] = &pendingInfluence{}
})
re.Len(ops, 1)
re.NotEqual(hotRegionID, ops[0].RegionID())

// Case6: If there are more than topnPosition hot regions but they need to cool down,
// we will try to schedule the large hot regions rather than the small ones, so there is no operator.
topnPosition = 2
ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, func(tc *mockcluster.Cluster, _ *hotScheduler) {
// just transfer leader
tc.AddRegionWithReadInfo(hotRegionID, 2, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{1, 3})
tc.AddRegionWithReadInfo(hotRegionID, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
// just transfer leader
tc.AddRegionWithReadInfo(hotRegionID+1, 2, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{1, 3})
tc.AddRegionWithReadInfo(hotRegionID+1, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
})
re.Len(ops, 0)
topnPosition = origin

// Case7: If there are more than topnPosition hot regions but they are pending,
// we will try to schedule the large hot regions rather than the small ones, so there is no operator.
topnPosition = 2
ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, func(tc *mockcluster.Cluster, hb *hotScheduler) {
tc.AddRegionWithReadInfo(hotRegionID, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
hb.regionPendings[hotRegionID] = &pendingInfluence{}
tc.AddRegionWithReadInfo(hotRegionID+1, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
hb.regionPendings[hotRegionID+1] = &pendingInfluence{}
})
re.Len(ops, 0)
topnPosition = origin
}

func checkHotReadRegionScheduleWithSmallHotRegion(re *require.Assertions, highLoad, lowLoad uint64, regions []testRegionInfo) []*operator.Operator {
func checkHotReadRegionScheduleWithSmallHotRegion(re *require.Assertions, highLoad, lowLoad uint64,
addOtherRegions func(*mockcluster.Cluster, *hotScheduler)) []*operator.Operator {
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
statistics.Denoising = false
sche, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
re.NoError(err)
hb := sche.(*hotScheduler)
hb.conf.SetSrcToleranceRatio(1)
hb.conf.SetDstToleranceRatio(1)
hb.conf.SetRankFormulaVersion("v2")
hb.conf.ReadPriorities = []string{statistics.QueryPriority, statistics.BytePriority}
tc.SetHotRegionCacheHitsThreshold(0)
tc.AddRegionStore(1, 40)
tc.AddRegionStore(2, 10)
@@ -402,6 +456,7 @@ func checkHotReadRegionScheduleWithSmallHotRegion(re *require.Assertions, highLo

smallHotPeerQuery := float64(lowLoad) * firstPriorityMinHotRatio * 0.9 // smaller than firstPriorityMinHotRatio, so it is a small hot region
smallHotPeerByte := float64(lowLoad) * secondPriorityMinHotRatio * 0.9 * units.MiB // smaller than secondPriorityMinHotRatio, so it is a small hot region
regions := make([]testRegionInfo, 0)
for i := 10; i < 50; i++ {
regions = append(regions, testRegionInfo{uint64(i), []uint64{1, 2, 3}, smallHotPeerByte, 0, smallHotPeerQuery})
if i < 20 {
@@ -410,7 +465,8 @@ func checkHotReadRegionScheduleWithSmallHotRegion(re *require.Assertions, highLo
}
}
addRegionInfo(tc, statistics.Read, regions)

tc.SetHotRegionCacheHitsThreshold(1)
addOtherRegions(tc, hb)
ops, _ := hb.Schedule(tc, false)
return ops
}
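The helper refactor above replaces the precomputed `regions []testRegionInfo` parameter with an `addOtherRegions func(*mockcluster.Cluster, *hotScheduler)` callback, so every case builds on the same fixture and injects its own extra hot regions or pending influences right before `hb.Schedule` runs. A minimal standalone sketch of that callback-injection pattern, with hypothetical `cluster`/`scheduler` types standing in for PD's mock packages:

package main

import "fmt"

// Hypothetical stand-ins for mockcluster.Cluster and hotScheduler.
type cluster struct{ regions map[uint64]float64 }
type scheduler struct{ pendings map[uint64]struct{} }

// checkSchedule builds the shared fixture, lets the case-specific callback
// mutate it, and then returns the ids the toy "scheduler" would pick:
// non-pending regions whose load exceeds the common baseline.
func checkSchedule(customize func(*cluster, *scheduler)) []uint64 {
	tc := &cluster{regions: map[uint64]float64{}}
	hb := &scheduler{pendings: map[uint64]struct{}{}}
	for i := uint64(10); i < 50; i++ { // the common small hot regions
		tc.regions[i] = 1
	}
	customize(tc, hb) // per-case setup injected by the caller
	ops := make([]uint64, 0)
	for id, load := range tc.regions {
		if _, pending := hb.pendings[id]; !pending && load > 1 {
			ops = append(ops, id)
		}
	}
	return ops
}

func main() {
	// No extra setup: nothing exceeds the baseline, so no operator.
	fmt.Println(len(checkSchedule(func(*cluster, *scheduler) {})))
	// A big region is added but marked pending, so it is filtered out again.
	fmt.Println(len(checkSchedule(func(tc *cluster, hb *scheduler) {
		tc.regions[100] = 10
		hb.pendings[100] = struct{}{}
	})))
}

Compared with passing a fully built regions slice, the closure keeps the shared setup in one place and lets individual cases touch scheduler state (such as regionPendings) that a plain data slice could not express.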
