Skip to content

Commit

Permalink
store/copr: balance region for batch cop task (#24521)
Browse files Browse the repository at this point in the history
  • Loading branch information
windtalker authored May 18, 2021
1 parent 5fd17dd commit 66c8cd9
Show file tree
Hide file tree
Showing 4 changed files with 321 additions and 52 deletions.
204 changes: 185 additions & 19 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package copr
import (
"context"
"io"
"math"
"strconv"
"sync"
"sync/atomic"
"time"
Expand All @@ -25,6 +27,7 @@ import (
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/driver/backoff"
derr "github.com/pingcap/tidb/store/driver/error"
Expand All @@ -40,8 +43,9 @@ import (
type batchCopTask struct {
storeAddr string
cmdType tikvrpc.CmdType
ctx *tikv.RPCContext

copTasks []copTaskAndRPCContext
regionInfos []tikv.RegionInfo
}

type batchCopResponse struct {
Expand Down Expand Up @@ -93,9 +97,152 @@ func (rs *batchCopResponse) RespTime() time.Duration {
return rs.respTime
}

type copTaskAndRPCContext struct {
task *copTask
ctx *tikv.RPCContext
// balanceBatchCopTask balance the regions between available stores, the basic rule is
// 1. the first region of each original batch cop task belongs to its original store because some
// meta data(like the rpc context) in batchCopTask is related to it
// 2. for the remaining regions:
// if there is only 1 available store, then put the region to the related store
// otherwise, use a greedy algorithm to put it into the store with highest weight
func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask {
if len(originalTasks) <= 1 {
return originalTasks
}
storeTaskMap := make(map[uint64]*batchCopTask)
storeCandidateRegionMap := make(map[uint64]map[string]tikv.RegionInfo)
totalRegionCandidateNum := 0
totalRemainingRegionNum := 0

for _, task := range originalTasks {
taskStoreID := task.regionInfos[0].AllStores[0]
batchTask := &batchCopTask{
storeAddr: task.storeAddr,
cmdType: task.cmdType,
ctx: task.ctx,
regionInfos: []tikv.RegionInfo{task.regionInfos[0]},
}
storeTaskMap[taskStoreID] = batchTask
}

for _, task := range originalTasks {
taskStoreID := task.regionInfos[0].AllStores[0]
for index, ri := range task.regionInfos {
// for each region, figure out the valid store num
validStoreNum := 0
if index == 0 {
continue
}
if len(ri.AllStores) <= 1 {
validStoreNum = 1
} else {
for _, storeID := range ri.AllStores {
if _, ok := storeTaskMap[storeID]; ok {
validStoreNum++
}
}
}
if validStoreNum == 1 {
// if only one store is valid, just put it to storeTaskMap
storeTaskMap[taskStoreID].regionInfos = append(storeTaskMap[taskStoreID].regionInfos, ri)
} else {
// if more than one store is valid, put the region
// to store candidate map
totalRegionCandidateNum += validStoreNum
totalRemainingRegionNum += 1
taskKey := ri.Region.String()
for _, storeID := range ri.AllStores {
if _, validStore := storeTaskMap[storeID]; !validStore {
continue
}
if _, ok := storeCandidateRegionMap[storeID]; !ok {
candidateMap := make(map[string]tikv.RegionInfo)
storeCandidateRegionMap[storeID] = candidateMap
}
if _, duplicateRegion := storeCandidateRegionMap[storeID][taskKey]; duplicateRegion {
// duplicated region, should not happen, just give up balance
logutil.BgLogger().Warn("Meet duplicated region info during when trying to balance batch cop task, give up balancing")
return originalTasks
}
storeCandidateRegionMap[storeID][taskKey] = ri
}
}
}
}
if totalRemainingRegionNum == 0 {
return originalTasks
}

avgStorePerRegion := float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
findNextStore := func(candidateStores []uint64) uint64 {
store := uint64(math.MaxUint64)
weightedRegionNum := math.MaxFloat64
if candidateStores != nil {
for _, storeID := range candidateStores {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue
}
num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos))
if num < weightedRegionNum {
store = storeID
weightedRegionNum = num
}
}
if store != uint64(math.MaxUint64) {
return store
}
}
for storeID := range storeTaskMap {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue
}
num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos))
if num < weightedRegionNum {
store = storeID
weightedRegionNum = num
}
}
return store
}

store := findNextStore(nil)
for totalRemainingRegionNum > 0 {
if store == uint64(math.MaxUint64) {
break
}
var key string
var ri tikv.RegionInfo
for key, ri = range storeCandidateRegionMap[store] {
// get the first region
break
}
storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri)
totalRemainingRegionNum--
for _, id := range ri.AllStores {
if _, ok := storeCandidateRegionMap[id]; ok {
delete(storeCandidateRegionMap[id], key)
totalRegionCandidateNum--
if len(storeCandidateRegionMap[id]) == 0 {
delete(storeCandidateRegionMap, id)
}
}
}
if totalRemainingRegionNum > 0 {
avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
// it is not optimal because we only check the stores that affected by this region, in fact in order
// to find out the store with the lowest weightedRegionNum, all stores should be checked, but I think
// check only the affected stores is more simple and will get a good enough result
store = findNextStore(ri.AllStores)
}
}
if totalRemainingRegionNum > 0 {
logutil.BgLogger().Warn("Some regions are not used when trying to balance batch cop task, give up balancing")
return originalTasks
}

var ret []*batchCopTask
for _, task := range storeTaskMap {
ret = append(ret, task)
}
return ret
}

func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, storeType kv.StoreType) ([]*batchCopTask, error) {
Expand Down Expand Up @@ -138,13 +285,15 @@ func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.Key
// Then `splitRegion` will reloads these regions.
continue
}
allStores := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store)
if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok {
batchCop.copTasks = append(batchCop.copTasks, copTaskAndRPCContext{task: task, ctx: rpcCtx})
batchCop.regionInfos = append(batchCop.regionInfos, tikv.RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores})
} else {
batchTask := &batchCopTask{
storeAddr: rpcCtx.Addr,
cmdType: cmdType,
copTasks: []copTaskAndRPCContext{{task, rpcCtx}},
storeAddr: rpcCtx.Addr,
cmdType: cmdType,
ctx: rpcCtx,
regionInfos: []tikv.RegionInfo{{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores}},
}
storeTaskMap[rpcCtx.Addr] = batchTask
}
Expand All @@ -159,9 +308,25 @@ func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.Key
}
continue
}

for _, task := range storeTaskMap {
batchTasks = append(batchTasks, task)
}
if log.GetLevel() <= zap.DebugLevel {
msg := "Before region balance:"
for _, task := range batchTasks {
msg += " store " + task.storeAddr + ": " + strconv.Itoa(len(task.regionInfos)) + " regions,"
}
logutil.BgLogger().Debug(msg)
}
batchTasks = balanceBatchCopTask(batchTasks)
if log.GetLevel() <= zap.DebugLevel {
msg := "After region balance:"
for _, task := range batchTasks {
msg += " store " + task.storeAddr + ": " + strconv.Itoa(len(task.regionInfos)) + " regions,"
}
logutil.BgLogger().Debug(msg)
}

if elapsed := time.Since(start); elapsed > time.Millisecond*500 {
logutil.BgLogger().Warn("buildBatchCopTasks takes too much time",
Expand Down Expand Up @@ -311,25 +476,25 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task *
// Merge all ranges and request again.
func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) {
var ranges []tikvstore.KeyRange
for _, taskCtx := range batchTask.copTasks {
taskCtx.task.ranges.Do(func(ran *tikvstore.KeyRange) {
for _, ri := range batchTask.regionInfos {
ri.Ranges.Do(func(ran *tikvstore.KeyRange) {
ranges = append(ranges, *ran)
})
}
return buildBatchCopTasks(bo, b.store.GetRegionCache(), tikv.NewKeyRanges(ranges), b.req.StoreType)
}

func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.copTasks))
for _, task := range task.copTasks {
sender := tikv.NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos))
for _, ri := range task.regionInfos {
regionInfos = append(regionInfos, &coprocessor.RegionInfo{
RegionId: task.task.region.GetID(),
RegionId: ri.Region.GetID(),
RegionEpoch: &metapb.RegionEpoch{
ConfVer: task.task.region.GetConfVer(),
Version: task.task.region.GetVer(),
ConfVer: ri.Region.GetConfVer(),
Version: ri.Region.GetVer(),
},
Ranges: task.task.ranges.ToPBRanges(),
Ranges: ri.Ranges.ToPBRanges(),
})
}

Expand All @@ -351,13 +516,14 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, ta
})
req.StoreTp = tikvrpc.TiFlash

logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.copTasks)))
resp, retry, cancel, err := sender.sendStreamReqToAddr(bo, task.copTasks, req, tikv.ReadTimeoutUltraLong)
logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.regionInfos)))
resp, retry, cancel, err := sender.SendReqToAddr(bo.TiKVBackoffer(), task.ctx, task.regionInfos, req, tikv.ReadTimeoutUltraLong)
// If there are store errors, we should retry for all regions.
if retry {
return b.retryBatchCopTask(ctx, bo, task)
}
if err != nil {
err = derr.ToTiDBErr(err)
return nil, errors.Trace(err)
}
defer cancel()
Expand Down
14 changes: 7 additions & 7 deletions store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,14 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
var regionInfos []*coprocessor.RegionInfo
originalTask, ok := req.Meta.(*batchCopTask)
if ok {
for _, task := range originalTask.copTasks {
for _, ri := range originalTask.regionInfos {
regionInfos = append(regionInfos, &coprocessor.RegionInfo{
RegionId: task.task.region.GetID(),
RegionId: ri.Region.GetID(),
RegionEpoch: &metapb.RegionEpoch{
ConfVer: task.task.region.GetConfVer(),
Version: task.task.region.GetVer(),
ConfVer: ri.Region.GetConfVer(),
Version: ri.Region.GetVer(),
},
Ranges: task.task.ranges.ToPBRanges(),
Ranges: ri.Ranges.ToPBRanges(),
})
}
}
Expand All @@ -214,8 +214,8 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
// Or else it's the task without region, which always happens in high layer task without table.
// In that case
if originalTask != nil {
sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient())
rpcResp, _, _, err = sender.sendStreamReqToAddr(bo, originalTask.copTasks, wrappedReq, tikv.ReadTimeoutMedium)
sender := tikv.NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient())
rpcResp, _, _, err = sender.SendReqToAddr(bo.TiKVBackoffer(), originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium)
// No matter what the rpc error is, we won't retry the mpp dispatch tasks.
// TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling.
// That's a hard job but we can try it in the future.
Expand Down
Loading

0 comments on commit 66c8cd9

Please sign in to comment.