scheduler: fix inaccurate statistics and config (#3949) (#3965)
* scheduler: remove the useless code in hot-region-scheduler (#3833)

Signed-off-by: HunDunDM <hundundm@gmail.com>

* scheduler: fix inaccurate statistics and config (#3949)

Signed-off-by: HunDunDM <hundundm@gmail.com>

* tiny fix

Signed-off-by: HunDunDM <hundundm@gmail.com>

Co-authored-by: HunDunDM <hundundm@gmail.com>
ti-chi-bot and HunDunDM authored Sep 15, 2021
1 parent 2b8584e commit 48b3ffd
Showing 4 changed files with 85 additions and 59 deletions.
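
For orientation before the per-file diffs: the core of this fix is that every pending influence now carries its own zombie duration, picked according to the statistics that produced its operator (store-level stats for most operators, summed region stats for write leader transfers). The Go sketch below illustrates that idea only; the types are simplified stand-ins, the durations are hypothetical inputs, and the real logic lives in `balanceSolver.solve`, `newPendingInfluence`, and the two new config getters in the diffs that follow.

```go
package main

import (
	"fmt"
	"time"
)

// pendingInfluence is a simplified stand-in for the struct in
// server/schedulers/utils.go: after this commit each pending influence
// remembers its own zombie duration instead of sharing a scheduler-wide one.
type pendingInfluence struct {
	regionID          uint64
	from, to          uint64
	maxZombieDuration time.Duration
}

// pickZombieDuration mirrors the switch added to balanceSolver.solve:
// write leader-transfer operators are judged by summed region statistics,
// which are reported less often, so they get the longer window.
func pickZombieDuration(isWrite, isTransferLeader bool, storeStat, regionsStat time.Duration) time.Duration {
	if isWrite && isTransferLeader {
		return regionsStat
	}
	return storeStat
}

func main() {
	// Hypothetical durations; in PD they come from
	// GetStoreStatZombieDuration and GetRegionsStatZombieDuration.
	storeStat, regionsStat := 30*time.Second, 3*time.Minute

	p := pendingInfluence{
		regionID:          42,
		from:              1,
		to:                2,
		maxZombieDuration: pickZombieDuration(true, true, storeStat, regionsStat),
	}
	fmt.Println(p.maxZombieDuration) // 3m0s for a write leader transfer
}
```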
110 changes: 64 additions & 46 deletions server/schedulers/hot_region.go
@@ -85,10 +85,8 @@ type hotScheduler struct {
name string
*BaseScheduler
sync.RWMutex
leaderLimit uint64
peerLimit uint64
types []rwType
r *rand.Rand
types []rwType
r *rand.Rand

// regionPendings stores regionID -> pendingInfluence
// this records regionID which have pending Operator by operation type. During filterHotPeers, the hot peers won't
@@ -108,8 +106,6 @@ func newHotScheduler(opController *schedule.OperatorController, conf *hotRegionS
ret := &hotScheduler{
name: HotRegionName,
BaseScheduler: base,
leaderLimit: 1,
peerLimit: 1,
types: []rwType{write, read},
r: rand.New(rand.NewSource(time.Now().UnixNano())),
regionPendings: make(map[uint64]*pendingInfluence),
@@ -171,7 +167,7 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope
h.Lock()
defer h.Unlock()

h.prepareForBalance(cluster)
h.prepareForBalance(typ, cluster)

switch typ {
case read:
@@ -184,13 +180,15 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope

// prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for
// each store
func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
func (h *hotScheduler) prepareForBalance(typ rwType, cluster opt.Cluster) {
h.summaryPendingInfluence()

stores := cluster.GetStores()
storesLoads := cluster.GetStoresLoads()

{ // update read statistics
switch typ {
case read:
// update read statistics
regionRead := cluster.RegionReadStats()
h.stLoadInfos[readLeader] = summaryStoresLoad(
stores,
@@ -204,9 +202,8 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
h.pendingSums,
regionRead,
read, core.RegionKind)
}

{ // update write statistics
case write:
// update write statistics
regionWrite := cluster.RegionWriteStats()
h.stLoadInfos[writeLeader] = summaryStoresLoad(
stores,
@@ -228,9 +225,9 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
// and clean the region from regionInfluence if they have ended operator.
// It makes each key/byte rate or count become `weight` times to the origin value.
func (h *hotScheduler) summaryPendingInfluence() {
maxZombieDur := h.conf.GetMaxZombieDuration()
ret := make(map[uint64]*Influence)
for id, p := range h.regionPendings {
maxZombieDur := p.maxZombieDuration
weight, needGC := h.calcPendingInfluence(p.op, maxZombieDur)
if needGC {
delete(h.regionPendings, id)
@@ -266,8 +263,9 @@ func summaryStoresLoad(
) map[uint64]*storeLoadDetail {
// loadDetail stores the storeID -> hotPeers stat and its current and future stat(key/byte rate,count)
loadDetail := make(map[uint64]*storeLoadDetail, len(storesLoads))
allLoadSum := make([]float64, statistics.DimLen)
allCount := 0.0
allTiKVLoadSum := make([]float64, statistics.DimLen)
allTiKVCount := 0
allTiKVHotPeersCount := 0

// Stores without byte rate statistics is not available to schedule.
for _, store := range stores {
@@ -279,6 +277,8 @@ func summaryStoresLoad(
if kind == core.LeaderKind && !store.AllowLeaderTransfer() {
continue
}
isTiFlash := core.IsTiFlashStore(store.GetMeta())

loads := make([]float64, statistics.DimLen)
switch rwTy {
case read:
@@ -317,10 +317,14 @@ func summaryStoresLoad(
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[statistics.KeyDim])
}
}
for i := range allLoadSum {
allLoadSum[i] += loads[i]

if !isTiFlash {
for i := range allTiKVLoadSum {
allTiKVLoadSum[i] += loads[i]
}
allTiKVCount += 1
allTiKVHotPeersCount += len(hotPeers)
}
allCount += float64(len(hotPeers))

// Build store load prediction from current load and pending influence.
stLoadPred := (&storeLoad{
@@ -335,16 +339,19 @@ func summaryStoresLoad(
HotPeers: hotPeers,
}
}
storeLen := float64(len(storesLoads))

expectLoads := make([]float64, len(allTiKVLoadSum))
for i := range expectLoads {
expectLoads[i] = allTiKVLoadSum[i] / float64(allTiKVCount)
}
expect := storeLoad{
Loads: expectLoads,
Count: float64(allTiKVHotPeersCount) / float64(allTiKVCount),
}

// store expectation byte/key rate and count for each store-load detail.
for id, detail := range loadDetail {
expectLoads := make([]float64, len(allLoadSum))
for i := range expectLoads {
expectLoads[i] = allLoadSum[i] / storeLen
}
expectCount := allCount / storeLen
detail.LoadPred.Expect.Loads = expectLoads
detail.LoadPred.Expect.Count = expectCount
detail.LoadPred.Expect = expect
// Debug
{
ty := "exp-byte-rate-" + rwTy.String() + "-" + kind.String()
@@ -356,7 +363,7 @@ func summaryStoresLoad(
}
{
ty := "exp-count-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expectCount)
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expect.Count)
}
}
return loadDetail
Expand All @@ -377,15 +384,15 @@ func filterHotPeers(
return ret
}

func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence) bool {
func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, maxZombieDur time.Duration) bool {
regionID := op.RegionID()
_, ok := h.regionPendings[regionID]
if ok {
schedulerStatus.WithLabelValues(h.GetName(), "pending_op_fails").Inc()
return false
}

influence := newPendingInfluence(op, srcStore, dstStore, infl)
influence := newPendingInfluence(op, srcStore, dstStore, infl, maxZombieDur)
h.regionPendings[regionID] = influence

schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Inc()
@@ -535,9 +542,9 @@ func (bs *balanceSolver) solve() []*operator.Operator {
}
bs.cur = &solution{}
var (
best *solution
ops []*operator.Operator
infls []Influence
best *solution
op *operator.Operator
infl Influence
)

for srcStoreID := range bs.filterSrcStores() {
@@ -553,9 +560,9 @@ func (bs *balanceSolver) solve() []*operator.Operator {
bs.cur.dstStoreID = dstStoreID
bs.calcProgressiveRank()
if bs.cur.progressiveRank < 0 && bs.betterThan(best) {
if newOps, newInfls := bs.buildOperators(); len(newOps) > 0 {
ops = newOps
infls = newInfls
if newOp, newInfl := bs.buildOperator(); newOp != nil {
op = newOp
infl = *newInfl
clone := *bs.cur
best = &clone
}
@@ -564,13 +571,25 @@ func (bs *balanceSolver) solve() []*operator.Operator {
}
}

for i := 0; i < len(ops); i++ {
// TODO: multiple operators need to be atomic.
if !bs.sche.addPendingInfluence(ops[i], best.srcStoreID, best.dstStoreID, infls[i]) {
return nil
}
if best == nil {
return nil
}

// Depending on the source of the statistics used, a different ZombieDuration will be used.
// If the statistics are from the sum of Regions, there will be a longer ZombieDuration.
var maxZombieDur time.Duration
switch {
case bs.rwTy == write && bs.opTy == transferLeader:
maxZombieDur = bs.sche.conf.GetRegionsStatZombieDuration()
default:
maxZombieDur = bs.sche.conf.GetStoreStatZombieDuration()
}

if !bs.sche.addPendingInfluence(op, best.srcStoreID, best.dstStoreID, infl, maxZombieDur) {
return nil
}
return ops

return []*operator.Operator{op}
}

// filterSrcStores compare the min rate and the ratio * expectation rate, if both key and byte rate is greater than
@@ -787,7 +806,7 @@ func (bs *balanceSolver) calcProgressiveRank() {
rank := int64(0)
if bs.rwTy == write && bs.opTy == transferLeader {
// In this condition, CPU usage is the matter.
// Only consider about key rate.
// Only consider key rate.
srcKeyRate := srcLd.Loads[statistics.KeyDim]
dstKeyRate := dstLd.Loads[statistics.KeyDim]
peerKeyRate := peer.GetLoad(getRegionStatKind(bs.rwTy, statistics.KeyDim))
@@ -998,12 +1017,11 @@ func (bs *balanceSolver) isReadyToBuild() bool {
return true
}

func (bs *balanceSolver) buildOperators() ([]*operator.Operator, []Influence) {
func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence) {
if !bs.isReadyToBuild() {
return nil, nil
}
var (
op *operator.Operator
counters []prometheus.Counter
err error
)
@@ -1064,11 +1082,11 @@ func (bs *balanceSolver) buildOperators() ([]*operator.Operator, []Influence) {
schedulerCounter.WithLabelValues(bs.sche.GetName(), "new-operator"),
schedulerCounter.WithLabelValues(bs.sche.GetName(), bs.opTy.String()))

infl := Influence{
infl = &Influence{
Loads: append(bs.cur.srcPeerStat.Loads[:0:0], bs.cur.srcPeerStat.Loads...),
Count: 1,
}
return []*operator.Operator{op}, []Influence{infl}
return op, infl
}

func (h *hotScheduler) GetHotStatus(typ string) *statistics.StoreHotPeersInfos {
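
The other half of the statistics fix shows up in summaryStoresLoad above: expected loads and hot-peer counts are now averaged only over the TiKV stores that actually produced a load detail, so TiFlash stores (and stores skipped by the filters) no longer dilute the expectation. A small sketch of that averaging, using a hypothetical store type rather than PD's (in the real code the check is core.IsTiFlashStore(store.GetMeta())):

```go
package main

import "fmt"

// tinyStore is a hypothetical stand-in; in PD the loads come from
// cluster.GetStoresLoads() and the hot peers from the region statistics.
type tinyStore struct {
	id        uint64
	isTiFlash bool
	loads     []float64 // e.g. [byteRate, keyRate]
	hotPeers  int
}

// expectation averages loads and hot-peer counts over TiKV stores only,
// mirroring the allTiKVLoadSum / allTiKVCount / allTiKVHotPeersCount
// bookkeeping added to summaryStoresLoad.
func expectation(stores []tinyStore, dimLen int) ([]float64, float64) {
	expLoads := make([]float64, dimLen)
	tikvCount, hotPeers := 0, 0
	for _, s := range stores {
		if s.isTiFlash {
			continue // TiFlash no longer dilutes the expectation
		}
		for i := 0; i < dimLen; i++ {
			expLoads[i] += s.loads[i]
		}
		tikvCount++
		hotPeers += s.hotPeers
	}
	if tikvCount == 0 {
		return expLoads, 0
	}
	for i := range expLoads {
		expLoads[i] /= float64(tikvCount)
	}
	return expLoads, float64(hotPeers) / float64(tikvCount)
}

func main() {
	stores := []tinyStore{
		{id: 1, loads: []float64{100, 10}, hotPeers: 3},
		{id: 2, loads: []float64{300, 30}, hotPeers: 5},
		{id: 3, isTiFlash: true, loads: []float64{900, 90}, hotPeers: 9}, // ignored
	}
	loads, count := expectation(stores, 2)
	fmt.Println(loads, count) // [200 20] 4
}
```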
11 changes: 8 additions & 3 deletions server/schedulers/hot_region_config.go
@@ -36,12 +36,12 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig {
MinHotByteRate: 100,
MinHotKeyRate: 10,
MaxZombieRounds: 3,
MaxPeerNum: 1000,
ByteRateRankStepRatio: 0.05,
KeyRateRankStepRatio: 0.05,
CountRankStepRatio: 0.01,
GreatDecRatio: 0.95,
MinorDecRatio: 0.99,
MaxPeerNum: 1000,
SrcToleranceRatio: 1.05, // Tolerate 5% difference
DstToleranceRatio: 1.05, // Tolerate 5% difference
}
@@ -73,12 +73,18 @@ func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) {
return schedule.EncodeConfig(conf)
}

func (conf *hotRegionSchedulerConfig) GetMaxZombieDuration() time.Duration {
func (conf *hotRegionSchedulerConfig) GetStoreStatZombieDuration() time.Duration {
conf.RLock()
defer conf.RUnlock()
return time.Duration(conf.MaxZombieRounds) * statistics.StoreHeartBeatReportInterval * time.Second
}

func (conf *hotRegionSchedulerConfig) GetRegionsStatZombieDuration() time.Duration {
conf.RLock()
defer conf.RUnlock()
return time.Duration(conf.MaxZombieRounds) * statistics.RegionHeartBeatReportInterval * time.Second
}

func (conf *hotRegionSchedulerConfig) GetMaxPeerNumber() int {
conf.RLock()
defer conf.RUnlock()
@@ -214,5 +220,4 @@ func (conf *hotRegionSchedulerConfig) persist() error {

}
return conf.storage.SaveScheduleConfig(HotRegionName, data)

}
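
With the default MaxZombieRounds of 3, the two getters above produce quite different windows; the snippet below works through the arithmetic, assuming the usual PD report intervals of 10s for store heartbeats and 60s for region heartbeats (statistics.StoreHeartBeatReportInterval and statistics.RegionHeartBeatReportInterval in the real code):

```go
package main

import (
	"fmt"
	"time"
)

// Assumed report intervals, in seconds.
const (
	storeHeartBeatReportInterval  = 10
	regionHeartBeatReportInterval = 60
)

func main() {
	maxZombieRounds := 3
	storeStatZombie := time.Duration(maxZombieRounds) * storeHeartBeatReportInterval * time.Second
	regionsStatZombie := time.Duration(maxZombieRounds) * regionHeartBeatReportInterval * time.Second
	fmt.Println(storeStatZombie)   // 30s: operators judged by store-level statistics
	fmt.Println(regionsStatZombie) // 3m0s: operators judged by summed region statistics
}
```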
4 changes: 2 additions & 2 deletions server/schedulers/hot_test.go
@@ -76,7 +76,7 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) {
op.Start()
operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-5*statistics.StoreHeartBeatReportInterval*time.Second))
operator.SetOperatorStatusReachTime(op, operator.STARTED, time.Now().Add((-5*statistics.StoreHeartBeatReportInterval+1)*time.Second))
return newPendingInfluence(op, 2, 4, Influence{})
return newPendingInfluence(op, 2, 4, Influence{}, hb.conf.GetStoreStatZombieDuration())
}
justDoneOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence {
infl := notDoneOpInfluence(region, ty)
Expand Down Expand Up @@ -1306,7 +1306,7 @@ func (s *testHotCacheSuite) TestCheckRegionFlow(c *C) {

if testcase.DegreeAfterTransferLeader >= 3 {
// try schedule
hb.prepareForBalance(tc)
hb.prepareForBalance(testcase.kind, tc)
leaderSolver := newBalanceSolver(hb, tc, testcase.kind, transferLeader)
leaderSolver.cur = &solution{srcStoreID: 2}
c.Check(leaderSolver.filterHotPeers(), HasLen, 0) // skip schedule
19 changes: 11 additions & 8 deletions server/schedulers/utils.go
@@ -17,6 +17,7 @@ import (
"math"
"net/url"
"strconv"
"time"

"github.com/montanaflynn/stats"
"github.com/pingcap/log"
@@ -215,17 +216,19 @@ func (lhs *Influence) add(rhs *Influence, w float64) *Influence {

// TODO: merge it into OperatorInfluence.
type pendingInfluence struct {
op *operator.Operator
from, to uint64
origin Influence
op *operator.Operator
from, to uint64
origin Influence
maxZombieDuration time.Duration
}

func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence) *pendingInfluence {
func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence, maxZombieDur time.Duration) *pendingInfluence {
return &pendingInfluence{
op: op,
from: from,
to: to,
origin: infl,
op: op,
from: from,
to: to,
origin: infl,
maxZombieDuration: maxZombieDur,
}
}
