scheduler: make hot v2 more suitable for small hot regions
Signed-off-by: lhy1024 <admin@liudos.us>
lhy1024 committed Jul 20, 2023
1 parent c9939b6 commit 56398c9
Showing 4 changed files with 110 additions and 34 deletions.
80 changes: 50 additions & 30 deletions pkg/schedule/schedulers/hot_region.go
@@ -81,6 +81,8 @@ var (
pendingOpFails = schedulerStatus.WithLabelValues(HotRegionName, "pending_op_fails")
)

// topnPosition is the position of the reference hot peer in the per-dimension sorted hot peer list.
// Hot region v2 uses its load as a cap on minBetterRate so that small hot regions can still be scheduled.
const topnPosition = 10

type baseHotScheduler struct {
*BaseScheduler
// store information, including pending Influence by resource type
@@ -449,11 +451,13 @@ func isAvailableV1(s *solution) bool {

type balanceSolver struct {
sche.SchedulerCluster
sche *hotScheduler
stLoadDetail map[uint64]*statistics.StoreLoadDetail
rwTy statistics.RWType
opTy opType
resourceTy resourceType
sche *hotScheduler
stLoadDetail map[uint64]*statistics.StoreLoadDetail
filteredHotPeers map[uint64][]*statistics.HotPeerStat // storeID -> hotPeers(filtered)
nthHotPeer map[uint64][]*statistics.HotPeerStat // storeID -> [dimLen]hotPeers
rwTy statistics.RWType
opTy opType
resourceTy resourceType

cur *solution

@@ -506,10 +510,15 @@ func (bs *balanceSolver) init() {
}
maxCur := &statistics.StoreLoad{Loads: make([]float64, statistics.DimLen)}

bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber()
bs.filteredHotPeers = make(map[uint64][]*statistics.HotPeerStat)
bs.nthHotPeer = make(map[uint64][]*statistics.HotPeerStat)
for _, detail := range bs.stLoadDetail {
bs.maxSrc = statistics.MaxLoad(bs.maxSrc, detail.LoadPred.Min())
bs.minDst = statistics.MinLoad(bs.minDst, detail.LoadPred.Max())
maxCur = statistics.MaxLoad(maxCur, &detail.LoadPred.Current)
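// cache the filtered hot peers and the top-n reference peer of each store so that solve() can reuse them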
bs.nthHotPeer[detail.GetID()] = make([]*statistics.HotPeerStat, statistics.DimLen)
bs.filteredHotPeers[detail.GetID()] = bs.filterHotPeers(detail)
}

rankStepRatios := []float64{
@@ -527,7 +536,6 @@ func (bs *balanceSolver) init() {

bs.firstPriority, bs.secondPriority = prioritiesToDim(bs.getPriorities())
bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio()
bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber()
bs.minHotDegree = bs.GetSchedulerConfig().GetHotRegionCacheHitsThreshold()
bs.isRaftKV2 = bs.GetStoreConfig().IsRaftKV2()

@@ -660,7 +668,7 @@ func (bs *balanceSolver) solve() []*operator.Operator {
for _, srcStore := range bs.filterSrcStores() {
bs.cur.srcStore = srcStore
srcStoreID := srcStore.GetID()
for _, mainPeerStat := range bs.filterHotPeers(srcStore) {
for _, mainPeerStat := range bs.filteredHotPeers[srcStoreID] {
if bs.cur.region = bs.getRegion(mainPeerStat, srcStoreID); bs.cur.region == nil {
continue
} else if bs.opTy == movePeer {
@@ -688,7 +696,7 @@ func (bs *balanceSolver) solve() []*operator.Operator {
if bs.needSearchRevertRegions() {
hotSchedulerSearchRevertRegionsCounter.Inc()
dstStoreID := dstStore.GetID()
for _, revertPeerStat := range bs.filterHotPeers(bs.cur.dstStore) {
for _, revertPeerStat := range bs.filteredHotPeers[dstStoreID] {
revertRegion := bs.getRegion(revertPeerStat, dstStoreID)
if revertRegion == nil || revertRegion.GetID() == bs.cur.region.GetID() ||
!allowRevertRegion(revertRegion, srcStoreID) {
@@ -869,44 +877,56 @@ func (bs *balanceSolver) checkSrcHistoryLoadsByPriorityAndTolerance(current, exp

// filterHotPeers filters hot peers from statistics.HotPeerStat and deletes a peer if its region is in pending status.
// The returned hotPeer count is controlled by `max-peer-number`.
func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) (ret []*statistics.HotPeerStat) {
func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) []*statistics.HotPeerStat {
hotPeers := storeLoad.HotPeers
ret := make([]*statistics.HotPeerStat, 0, len(hotPeers))
appendItem := func(item *statistics.HotPeerStat) {
if _, ok := bs.sche.regionPendings[item.ID()]; !ok && !item.IsNeedCoolDownTransferLeader(bs.minHotDegree, bs.rwTy) {
// todo: add test for regionPendings
// not in a pending operator and no need to cool down after transfer leader
ret = append(ret, item)
}
}

src := storeLoad.HotPeers
// At most MaxPeerNum peers, to prevent balanceSolver.solve() from being too slow.
if len(src) <= bs.maxPeerNum {
ret = make([]*statistics.HotPeerStat, 0, len(src))
for _, peer := range src {
appendItem(peer)
}
} else {
union := bs.sortHotPeers(src)
var firstSort, secondSort []*statistics.HotPeerStat
if len(hotPeers) > topnPosition || len(hotPeers) > bs.maxPeerNum {
firstSort = make([]*statistics.HotPeerStat, len(hotPeers))
copy(firstSort, hotPeers)
sort.Slice(firstSort, func(i, j int) bool {
return firstSort[i].GetLoad(bs.firstPriority) > firstSort[j].GetLoad(bs.firstPriority)
})
secondSort = make([]*statistics.HotPeerStat, len(hotPeers))
copy(secondSort, hotPeers)
sort.Slice(secondSort, func(i, j int) bool {
return secondSort[i].GetLoad(bs.secondPriority) > secondSort[j].GetLoad(bs.secondPriority)
})
}
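// when there are more than topnPosition hot peers, remember the peer at position topnPosition in each dimension as a reference for hot region v2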
if len(hotPeers) > topnPosition {
storeID := storeLoad.GetID()
bs.nthHotPeer[storeID][bs.firstPriority] = firstSort[topnPosition]
bs.nthHotPeer[storeID][bs.secondPriority] = secondSort[topnPosition]
}
if len(hotPeers) > bs.maxPeerNum {
union := bs.sortHotPeers(firstSort, secondSort)
ret = make([]*statistics.HotPeerStat, 0, len(union))
for peer := range union {
appendItem(peer)
}
return ret
}

return
for _, peer := range hotPeers {
appendItem(peer)
}
return ret
}

func (bs *balanceSolver) sortHotPeers(ret []*statistics.HotPeerStat) map[*statistics.HotPeerStat]struct{} {
firstSort := make([]*statistics.HotPeerStat, len(ret))
copy(firstSort, ret)
sort.Slice(firstSort, func(i, j int) bool {
return firstSort[i].GetLoad(bs.firstPriority) > firstSort[j].GetLoad(bs.firstPriority)
})
secondSort := make([]*statistics.HotPeerStat, len(ret))
copy(secondSort, ret)
sort.Slice(secondSort, func(i, j int) bool {
return secondSort[i].GetLoad(bs.secondPriority) > secondSort[j].GetLoad(bs.secondPriority)
})
func (bs *balanceSolver) sortHotPeers(firstSort, secondSort []*statistics.HotPeerStat) map[*statistics.HotPeerStat]struct{} {
union := make(map[*statistics.HotPeerStat]struct{}, bs.maxPeerNum)
if len(firstSort) == 0 || len(secondSort) == 0 {
return union
}
// At most MaxPeerNum peers, to prevent balanceSolver.solve() from being too slow.
for len(union) < bs.maxPeerNum {
for len(firstSort) > 0 {
peer := firstSort[0]
13 changes: 9 additions & 4 deletions pkg/schedule/schedulers/hot_region_test.go
@@ -1907,20 +1907,23 @@ func TestHotCacheSortHotPeer(t *testing.T) {
},
}}

st := &statistics.StoreLoadDetail{
HotPeers: hotPeers,
}
leaderSolver.maxPeerNum = 1
u := leaderSolver.sortHotPeers(hotPeers)
u := leaderSolver.filterHotPeers(st)
checkSortResult(re, []uint64{1}, u)

leaderSolver.maxPeerNum = 2
u = leaderSolver.sortHotPeers(hotPeers)
u = leaderSolver.filterHotPeers(st)
checkSortResult(re, []uint64{1, 2}, u)
}

func checkSortResult(re *require.Assertions, regions []uint64, hotPeers map[*statistics.HotPeerStat]struct{}) {
func checkSortResult(re *require.Assertions, regions []uint64, hotPeers []*statistics.HotPeerStat) {
re.Equal(len(hotPeers), len(regions))
for _, region := range regions {
in := false
for hotPeer := range hotPeers {
for _, hotPeer := range hotPeers {
if hotPeer.RegionID == region {
in = true
break
@@ -1990,7 +1993,9 @@ func TestInfluenceByRWType(t *testing.T) {
// must transfer leader
schedulePeerPr = 0
// must transfer leader from 1 to 3
fmt.Println("transfer leader")
ops, _ = hb.Schedule(tc, false)
fmt.Println(ops)
op = ops[0]
re.NotNil(op)

7 changes: 7 additions & 0 deletions pkg/schedule/schedulers/hot_region_v2.go
@@ -206,11 +206,17 @@ func (bs *balanceSolver) getScoreByPriorities(dim int, rs *rankV2Ratios) int {
srcPendingRate, dstPendingRate := bs.cur.getPendingLoad(dim)
peersRate := bs.cur.getPeersRateFromCache(dim)
highRate, lowRate := srcRate, dstRate
topnHotPeer := bs.nthHotPeer[bs.cur.srcStore.GetID()][dim]
reverse := false
if srcRate < dstRate {
highRate, lowRate = dstRate, srcRate
peersRate = -peersRate
reverse = true
topnHotPeer = bs.nthHotPeer[bs.cur.dstStore.GetID()][dim]
}
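// topnRate defaults to +Inf; if the store tracks more than topnPosition hot peers, use the reference peer's load in this dimension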
topnRate := math.MaxFloat64
if topnHotPeer != nil {
topnRate = topnHotPeer.GetLoad(dim)
}

if highRate*rs.balancedCheckRatio <= lowRate {
@@ -262,6 +268,7 @@ func (bs *balanceSolver) getScoreByPriorities(dim int, rs *rankV2Ratios) int {
// maxBetterRate may be less than minBetterRate, in which case a positive fraction cannot be produced.
minNotWorsenedRate = -bs.getMinRate(dim)
minBetterRate = math.Min(minBalancedRate*rs.perceivedRatio, lowRate*rs.minHotRatio)
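// cap minBetterRate by the top-n peer's load so that a store whose hot peers are all small can still schedule its hottest regions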
minBetterRate = math.Min(minBetterRate, topnRate)
maxBetterRate = maxBalancedRate + (highRate-lowRate-minBetterRate-maxBalancedRate)*rs.perceivedRatio
maxNotWorsenedRate = maxBalancedRate + (highRate-lowRate-minNotWorsenedRate-maxBalancedRate)*rs.perceivedRatio
}
44 changes: 44 additions & 0 deletions pkg/schedule/schedulers/hot_region_v2_test.go
@@ -15,6 +15,7 @@
package schedulers

import (
"fmt"
"testing"

"github.com/docker/go-units"
@@ -32,6 +33,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) {
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
statistics.Denoising = false
statisticsInterval = 0

sche, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
re.NoError(err)
@@ -145,6 +147,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirstOnly(t *testing.T) {
// This is a test that searchRevertRegions finds a solution of rank -2.
re := require.New(t)
statistics.Denoising = false
statisticsInterval = 0

cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
@@ -207,6 +210,7 @@ func TestHotReadRegionScheduleWithRevertRegionsDimSecond(t *testing.T) {
// This is a test that searchRevertRegions finds a solution of rank -1.
re := require.New(t)
statistics.Denoising = false
statisticsInterval = 0

cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
@@ -268,6 +272,7 @@ func TestSkipUniformStore(t *testing.T) {
func TestSkipUniformStore(t *testing.T) {
re := require.New(t)
statistics.Denoising = false
statisticsInterval = 0

cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
@@ -345,3 +350,42 @@
operatorutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 3, 2)
clearPendingInfluence(hb.(*hotScheduler))
}

func TestHotReadRegionScheduleWithSmallHotRegion(t *testing.T) {
re := require.New(t)
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
statistics.Denoising = false

sche, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
re.NoError(err)
hb := sche.(*hotScheduler)
tc.SetHotRegionCacheHitsThreshold(0)
tc.AddRegionStore(1, 40)
tc.AddRegionStore(2, 10)
tc.AddRegionStore(3, 10)

highLoad, lowLoad := uint64(2000), uint64(200)
tc.UpdateStorageReadQuery(1, highLoad*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadQuery(2, lowLoad*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadQuery(3, (highLoad+lowLoad)/2*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(1, highLoad*units.MiB*statistics.StoreHeartBeatReportInterval, 0)
tc.UpdateStorageReadStats(2, lowLoad*units.MiB*statistics.StoreHeartBeatReportInterval, 0)
tc.UpdateStorageReadStats(3, (highLoad+lowLoad)/2*units.MiB*statistics.StoreHeartBeatReportInterval, 0)

regions := make([]testRegionInfo, 0)
smallHotPeerQuery := float64(lowLoad) * firstPriorityMinHotRatio * 0.9 // a small hot region whose query load is below the firstPriorityMinHotRatio threshold
smallHotPeerByte := float64(lowLoad) * secondPriorityMinHotRatio * 0.9 * units.MiB // a small hot region whose byte load is below the secondPriorityMinHotRatio threshold
fmt.Println(smallHotPeerByte, 500*units.KiB)
for i := 0; i < 40; i++ {
regions = append(regions, testRegionInfo{uint64(i), []uint64{1, 2, 3}, smallHotPeerByte, 0, smallHotPeerQuery})
if i < 10 {
regions = append(regions, testRegionInfo{uint64(i), []uint64{2, 1, 3}, smallHotPeerByte, 0, smallHotPeerQuery})
regions = append(regions, testRegionInfo{uint64(i), []uint64{3, 1, 2}, smallHotPeerByte, 0, smallHotPeerQuery})
}
}
addRegionInfo(tc, statistics.Read, regions)

ops, _ := hb.Schedule(tc, false)
re.Len(ops, 1)
}
