store/copr: batch replica read (#42237)
close #42322
you06 committed Mar 19, 2023
1 parent 56412f5 commit 3f8b0cc
Showing 5 changed files with 143 additions and 53 deletions.
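At a high level, the change lets batched coprocessor tasks take part in load-based replica read: when a batched task is built, the client picks the follower whose estimated wait time is smallest while still under the store-busy threshold and pins the request to that store; if every follower is over the threshold, it falls back to the leader. The Go program below is only a minimal sketch of that selection rule — the replica type and its estimatedWait field are illustrative stand-ins, not client-go's actual API.

package main

import (
	"fmt"
	"time"
)

// replica is an illustrative stand-in for a TiKV peer plus the per-store
// load statistics tracked by the client (estimated wait time).
type replica struct {
	storeID       uint64
	isLeader      bool
	estimatedWait time.Duration
}

// pickReplica mirrors the selection rule used by this commit: choose the
// follower with the smallest estimated wait time that is still under the
// busy threshold; if every follower exceeds it, fall back to the leader.
func pickReplica(replicas []replica, busyThreshold time.Duration) replica {
	var best *replica
	for i := range replicas {
		r := &replicas[i]
		if r.isLeader || r.estimatedWait > busyThreshold {
			continue
		}
		if best == nil || r.estimatedWait < best.estimatedWait {
			best = r
		}
	}
	if best != nil {
		return *best
	}
	for _, r := range replicas {
		if r.isLeader {
			return r
		}
	}
	return replicas[0] // no leader in the slice; arbitrary fallback
}

func main() {
	rs := []replica{
		{storeID: 1, isLeader: true},
		{storeID: 2, estimatedWait: 80 * time.Millisecond},
		{storeID: 3, estimatedWait: 20 * time.Millisecond},
	}
	// With a 50ms threshold, store 3 is the least-loaded eligible follower.
	fmt.Println(pickReplica(rs, 50*time.Millisecond).storeID) // 3
}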
4 changes: 2 additions & 2 deletions DEPS.bzl
@@ -4101,8 +4101,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:KFKjXBwDCfmPyNjMgNE2YAS+ZtwryVSYSlCSNhzpbig=",
version = "v2.0.7-0.20230316080603-d19741b3ed77",
sum = "h1:m5Y7tBW5Rq8L1ANxibitBa/DInDy3hA2Qvk1Ys9u1NU=",
version = "v2.0.7-0.20230317032622-884a634378d4",
)
go_repository(
name = "com_github_tikv_pd",
2 changes: 1 addition & 1 deletion go.mod
@@ -92,7 +92,7 @@ require (
github.com/stretchr/testify v1.8.2
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.7-0.20230316080603-d19741b3ed77
github.com/tikv/client-go/v2 v2.0.7-0.20230317032622-884a634378d4
github.com/tikv/pd/client v0.0.0-20230316082839-7a0ce101c243
github.com/timakin/bodyclose v0.0.0-20221125081123-e39cf3fc478e
github.com/twmb/murmur3 v1.1.6
4 changes: 2 additions & 2 deletions go.sum
@@ -941,8 +941,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.7-0.20230316080603-d19741b3ed77 h1:KFKjXBwDCfmPyNjMgNE2YAS+ZtwryVSYSlCSNhzpbig=
github.com/tikv/client-go/v2 v2.0.7-0.20230316080603-d19741b3ed77/go.mod h1:DPL03G+QwLmypNjDIl+B02UltorBMx3WzSh4yJbp+cw=
github.com/tikv/client-go/v2 v2.0.7-0.20230317032622-884a634378d4 h1:m5Y7tBW5Rq8L1ANxibitBa/DInDy3hA2Qvk1Ys9u1NU=
github.com/tikv/client-go/v2 v2.0.7-0.20230317032622-884a634378d4/go.mod h1:DPL03G+QwLmypNjDIl+B02UltorBMx3WzSh4yJbp+cw=
github.com/tikv/pd/client v0.0.0-20230316082839-7a0ce101c243 h1:CYU+awkq5ykKyWV2e2Z+qtRveWMttV4N3r0lyk/z4/M=
github.com/tikv/pd/client v0.0.0-20230316082839-7a0ce101c243/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU=
github.com/timakin/bodyclose v0.0.0-20221125081123-e39cf3fc478e h1:MV6KaVu/hzByHP0UvJ4HcMGE/8a6A4Rggc/0wx2AvJo=
128 changes: 86 additions & 42 deletions store/copr/coprocessor.go
@@ -259,13 +259,19 @@ type copTask struct {
requestSource util.RequestSource
RowCountHint int // used for extra concurrency of small tasks, -1 for unknown row count
batchTaskList map[uint64]*batchedCopTask

// when this task is batched and the leader's wait duration exceeds the load-based threshold,
// we set this field to the target replica store ID and redirect the request to the replica.
redirect2Replica *uint64
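// busyThreshold carries the load-based threshold for this task; it is sent to
// TiKV as BusyThresholdMs and is cleared before a load-based retry so the
// redirected attempt is not rejected for the same reason again.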
busyThreshold time.Duration
}

type batchedCopTask struct {
task *copTask
region coprocessor.RegionInfo
storeID uint64
peer *metapb.Peer
task *copTask
region coprocessor.RegionInfo
storeID uint64
peer *metapb.Peer
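// loadBasedReplicaRetry is set when the load-based rule routed this batched
// task to a follower instead of the leader (see BuildBatchTask).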
loadBasedReplicaRetry bool
}

func (r *copTask) String() string {
@@ -339,7 +345,7 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c

var builder taskBuilder
if req.StoreBatchSize > 0 && hints != nil {
builder = newBatchTaskBuilder(bo, req, cache)
builder = newBatchTaskBuilder(bo, req, cache, req.ReplicaRead)
} else {
builder = newLegacyTaskBuilder(len(locs))
}
@@ -389,6 +395,7 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c
pagingSize: pagingSize,
requestSource: req.RequestSource,
RowCountHint: hint,
busyThreshold: req.StoreBusyThreshold,
}
// only keep-order need chan inside task.
// tasks by region error will reuse the channel of parent task.
@@ -466,25 +473,32 @@ func (b *legacyTaskBuilder) build() []*copTask {
return b.tasks
}

type storeReplicaKey struct {
storeID uint64
replicaRead bool
}

type batchStoreTaskBuilder struct {
bo *Backoffer
req *kv.Request
cache *RegionCache
taskID uint64
limit int
store2Idx map[uint64]int
tasks []*copTask
bo *Backoffer
req *kv.Request
cache *RegionCache
taskID uint64
limit int
store2Idx map[storeReplicaKey]int
tasks []*copTask
replicaRead kv.ReplicaReadType
}

func newBatchTaskBuilder(bo *Backoffer, req *kv.Request, cache *RegionCache) *batchStoreTaskBuilder {
func newBatchTaskBuilder(bo *Backoffer, req *kv.Request, cache *RegionCache, replicaRead kv.ReplicaReadType) *batchStoreTaskBuilder {
return &batchStoreTaskBuilder{
bo: bo,
req: req,
cache: cache,
taskID: 0,
limit: req.StoreBatchSize,
store2Idx: make(map[uint64]int, 16),
tasks: make([]*copTask, 0, 16),
bo: bo,
req: req,
cache: cache,
taskID: 0,
limit: req.StoreBatchSize,
store2Idx: make(map[storeReplicaKey]int, 16),
tasks: make([]*copTask, 0, 16),
replicaRead: replicaRead,
}
}
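The builder now groups small tasks by (store, replica-read mode) rather than by store alone, so leader-read tasks and redirected follower-read tasks for the same store never share a batch; the handle method in the next hunk applies this key. Below is a hedged, self-contained Go model of that grouping — miniTask and batchByStore are simplified stand-ins for copTask and batchStoreTaskBuilder.handle, and storeReplicaKey is re-declared only so the sketch compiles on its own.

package main

import "fmt"

// storeReplicaKey mirrors the grouping key introduced above: tasks are
// batched together only if they target the same store and share the same
// replica-read mode.
type storeReplicaKey struct {
	storeID     uint64
	replicaRead bool
}

// miniTask is a simplified stand-in for copTask: the first task seen for a
// key becomes a "primary" task and later tasks are attached to it.
type miniTask struct {
	key     storeReplicaKey
	batched []int // indexes of the tasks batched into this primary task
}

// batchByStore models batchStoreTaskBuilder.handle: attach each small task
// to the current primary task for its key, starting a new primary once the
// per-batch limit is reached.
func batchByStore(keys []storeReplicaKey, limit int) []miniTask {
	idx := make(map[storeReplicaKey]int)
	var tasks []miniTask
	for i, k := range keys {
		if j, ok := idx[k]; !ok || len(tasks[j].batched) >= limit {
			tasks = append(tasks, miniTask{key: k})
			idx[k] = len(tasks) - 1
		} else {
			tasks[j].batched = append(tasks[j].batched, i)
		}
	}
	return tasks
}

func main() {
	k1 := storeReplicaKey{storeID: 1}
	k2 := storeReplicaKey{storeID: 1, replicaRead: true}
	// One primary per key: store 1 leader-read carries tasks 1 and 3,
	// store 1 follower-read carries task 4.
	fmt.Println(batchByStore([]storeReplicaKey{k1, k1, k2, k1, k2}, 4))
}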

@@ -502,16 +516,25 @@ func (b *batchStoreTaskBuilder) handle(task *copTask) (err error) {
if b.limit <= 0 || !isSmallTask(task) {
return nil
}
batchedTask, err := b.cache.BuildBatchTask(b.bo, task, b.req.ReplicaRead)
batchedTask, err := b.cache.BuildBatchTask(b.bo, b.req, task, b.replicaRead)
if err != nil {
return err
}
if batchedTask == nil {
return nil
}
if idx, ok := b.store2Idx[batchedTask.storeID]; !ok || len(b.tasks[idx].batchTaskList) >= b.limit {
key := storeReplicaKey{
storeID: batchedTask.storeID,
replicaRead: batchedTask.loadBasedReplicaRetry,
}
if idx, ok := b.store2Idx[key]; !ok || len(b.tasks[idx].batchTaskList) >= b.limit {
if batchedTask.loadBasedReplicaRetry {
// The load-based rule routed this batched task to a follower; record the
// target store so the request is pinned to it and the busy threshold
// doesn't take effect on this task again.
batchedTask.task.redirect2Replica = &batchedTask.storeID
}
b.tasks = append(b.tasks, batchedTask.task)
b.store2Idx[batchedTask.storeID] = len(b.tasks) - 1
b.store2Idx[key] = len(b.tasks) - 1
} else {
if b.tasks[idx].batchTaskList == nil {
b.tasks[idx].batchTaskList = make(map[uint64]*batchedCopTask, b.limit)
@@ -1123,14 +1146,8 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch

cacheKey, cacheValue := worker.buildCacheKey(task, &copReq)

// TODO: Load-based replica read is currently not compatible with store batched tasks now.
// The batched tasks should be dispatched to their own followers, but it's not implemented yet.
// So, only enable load-based replica read when there is no batched tasks.
var busyThresholdMs uint32
if len(copReq.Tasks) == 0 {
busyThresholdMs = uint32(worker.req.StoreBusyThreshold.Milliseconds())
}
req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, options.GetTiKVReplicaReadType(worker.req.ReplicaRead), &worker.replicaReadSeed, kvrpcpb.Context{
replicaRead := worker.req.ReplicaRead
req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, options.GetTiKVReplicaReadType(replicaRead), &worker.replicaReadSeed, kvrpcpb.Context{
IsolationLevel: isolationLevelToPB(worker.req.IsolationLevel),
Priority: priorityToPB(worker.req.Priority),
NotFillCache: worker.req.NotFillCache,
@@ -1139,7 +1156,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
TaskId: worker.req.TaskID,
RequestSource: task.requestSource.GetRequestSource(),
ResourceGroupName: worker.req.ResourceGroupName,
BusyThresholdMs: busyThresholdMs,
BusyThresholdMs: uint32(task.busyThreshold.Milliseconds()),
})
if worker.req.ResourceGroupTagger != nil {
worker.req.ResourceGroupTagger(req)
@@ -1158,6 +1175,11 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
if len(worker.req.MatchStoreLabels) > 0 {
ops = append(ops, tikv.WithMatchLabels(worker.req.MatchStoreLabels))
}
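// If the builder pinned this task to a specific replica (load-based
// redirect), force follower read and restrict the candidate stores to that
// replica via WithMatchStores.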
if task.redirect2Replica != nil {
req.ReplicaRead = true
req.ReplicaReadType = options.GetTiKVReplicaReadType(kv.ReplicaReadFollower)
ops = append(ops, tikv.WithMatchStores([]uint64{*task.redirect2Replica}))
}
resp, rpcCtx, storeAddr, err := worker.kvclient.SendReqCtx(bo.TiKVBackoffer(), req, task.region, tikv.ReadTimeoutMedium, getEndPointType(task.storeType), task.storeAddr, ops...)
err = derr.ToTiDBErr(err)
if err != nil {
@@ -1304,13 +1326,13 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
if err != nil {
return remains, err
}
return worker.handleBatchRemainsOnErr(bo, rpcCtx, remains, resp.pbResp.GetBatchResponses(), task, ch)
return worker.handleBatchRemainsOnErr(bo, rpcCtx, remains, resp.pbResp, task, ch)
}
if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
if err := worker.handleLockErr(bo, lockErr, task); err != nil {
return nil, err
}
return worker.handleBatchRemainsOnErr(bo, rpcCtx, []*copTask{task}, resp.pbResp.GetBatchResponses(), task, ch)
return worker.handleBatchRemainsOnErr(bo, rpcCtx, []*copTask{task}, resp.pbResp, task, ch)
}
if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
err := errors.Errorf("other error: %s", otherErr)
@@ -1346,18 +1368,18 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
return nil, err
}

batchResps := resp.pbResp.BatchResponses
pbResp := resp.pbResp
worker.sendToRespCh(resp, ch, true)
return worker.handleBatchCopResponse(bo, rpcCtx, batchResps, task.batchTaskList, ch)
return worker.handleBatchCopResponse(bo, rpcCtx, pbResp, task.batchTaskList, ch)
}

func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, rpcCtx *tikv.RPCContext, remains []*copTask, batchResp []*coprocessor.StoreBatchTaskResponse, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {
func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, rpcCtx *tikv.RPCContext, remains []*copTask, resp *coprocessor.Response, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {
if len(task.batchTaskList) == 0 {
return remains, nil
}
batchedTasks := task.batchTaskList
task.batchTaskList = nil
batchedRemains, err := worker.handleBatchCopResponse(bo, rpcCtx, batchResp, batchedTasks, ch)
batchedRemains, err := worker.handleBatchCopResponse(bo, rpcCtx, resp, batchedTasks, ch)
if err != nil {
return nil, err
}
@@ -1366,18 +1388,21 @@ func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, rpcCtx *

// handle the batched cop response.
// tasks will be changed, so the input tasks should not be used after calling this function.
func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *tikv.RPCContext, batchResps []*coprocessor.StoreBatchTaskResponse,
func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *coprocessor.Response,
tasks map[uint64]*batchedCopTask, ch chan<- *copResponse) (remainTasks []*copTask, err error) {
if len(tasks) == 0 {
return nil, nil
}
batchedNum := len(tasks)
busyThresholdFallback := false
defer func() {
if err != nil {
return
}
worker.storeBatchedNum.Add(uint64(batchedNum - len(remainTasks)))
worker.storeBatchedFallbackNum.Add(uint64(len(remainTasks)))
if !busyThresholdFallback {
worker.storeBatchedNum.Add(uint64(batchedNum - len(remainTasks)))
worker.storeBatchedFallbackNum.Add(uint64(len(remainTasks)))
}
}()
appendRemainTasks := func(tasks ...*copTask) {
if remainTasks == nil {
@@ -1393,6 +1418,7 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *t
Addr: rpcCtx.Addr,
}
}
batchResps := resp.GetBatchResponses()
for _, batchResp := range batchResps {
taskID := batchResp.GetTaskId()
batchedTask, ok := tasks[taskID]
@@ -1463,7 +1489,8 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *t
}
for _, t := range tasks {
task := t.task
// when the error is generated by client, response is empty, skip warning for this case.
// when the error is generated by the client or by a load-based server-busy
// rejection, the response is empty by design; skip the warning in this case.
if len(batchResps) != 0 {
firstRangeStartKey := task.ranges.At(0).StartKey
lastRangeEndKey := task.ranges.At(task.ranges.Len() - 1).EndKey
@@ -1479,6 +1506,23 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *t
}
appendRemainTasks(t.task)
}
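// Load-based fallback: the store rejected the whole batch with ServerIsBusy
// and a positive estimated wait time, so no per-task responses came back.
// Rebuild the leftover batched tasks against follower replicas; their busy
// threshold is cleared so the redirected attempt cannot bounce again.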
if regionErr := resp.GetRegionError(); regionErr != nil && regionErr.ServerIsBusy != nil &&
regionErr.ServerIsBusy.EstimatedWaitMs > 0 && len(remainTasks) != 0 {
if len(batchResps) == 0 {
busyThresholdFallback = true
handler := newBatchTaskBuilder(bo, worker.req, worker.store.GetRegionCache(), kv.ReplicaReadFollower)
for _, task := range remainTasks {
// do not set busy threshold again.
task.busyThreshold = 0
if err = handler.handle(task); err != nil {
return nil, err
}
}
remainTasks = handler.build()
} else {
return nil, errors.New("store batched coprocessor with server is busy error shouldn't contain responses")
}
}
return remainTasks, nil
}

58 changes: 52 additions & 6 deletions store/copr/region_cache.go
@@ -16,7 +16,9 @@ package copr

import (
"bytes"
"math"
"strconv"
"time"

"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/metapb"
@@ -210,11 +212,54 @@ func (c *RegionCache) OnSendFailForBatchRegions(bo *Backoffer, store *tikv.Store
}

// BuildBatchTask fetches store and peer info for cop task, wrap it as `batchedCopTask`.
func (c *RegionCache) BuildBatchTask(bo *Backoffer, task *copTask, replicaRead kv.ReplicaReadType) (*batchedCopTask, error) {
rpcContext, err := c.GetTiKVRPCContext(bo.TiKVBackoffer(), task.region, options.GetTiKVReplicaReadType(replicaRead), 0)
if err != nil {
return nil, err
func (c *RegionCache) BuildBatchTask(bo *Backoffer, req *kv.Request, task *copTask, replicaRead kv.ReplicaReadType) (*batchedCopTask, error) {
var (
rpcContext *tikv.RPCContext
err error
)
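// For follower read, scan every follower once (advance the seed until the
// first peer repeats) and remember the follower with the smallest estimated
// wait time that is still under the busy threshold. If all followers exceed
// the threshold, rpcContext stays nil and we fall back to the leader below.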
if replicaRead == kv.ReplicaReadFollower {
followerStoreSeed := uint32(0)
leastEstWaitTime := time.Duration(math.MaxInt64)
var (
firstFollowerPeer *uint64
followerContext *tikv.RPCContext
)
for {
followerContext, err = c.GetTiKVRPCContext(bo.TiKVBackoffer(), task.region, options.GetTiKVReplicaReadType(replicaRead), followerStoreSeed)
if err != nil {
return nil, err
}
if firstFollowerPeer == nil {
firstFollowerPeer = &followerContext.Peer.Id
} else if *firstFollowerPeer == followerContext.Peer.Id {
break
}
followerStoreSeed++
estWaitTime := followerContext.Store.EstimatedWaitTime()
// skip this follower if its estimated wait time exceeds the busy threshold.
if estWaitTime > req.StoreBusyThreshold {
continue
}
if rpcContext == nil {
rpcContext = followerContext
} else if estWaitTime < leastEstWaitTime {
leastEstWaitTime = estWaitTime
rpcContext = followerContext
}
}
// all replicas are busy, fallback to leader.
if rpcContext == nil {
replicaRead = kv.ReplicaReadLeader
}
}

if replicaRead == kv.ReplicaReadLeader {
rpcContext, err = c.GetTiKVRPCContext(bo.TiKVBackoffer(), task.region, options.GetTiKVReplicaReadType(replicaRead), 0)
if err != nil {
return nil, err
}
}

// fallback to non-batch path
if rpcContext == nil {
return nil, nil
@@ -229,7 +274,8 @@ func (c *RegionCache) BuildBatchTask(bo *Backoffer, task *copTask, replicaRead k
},
Ranges: task.ranges.ToPBRanges(),
},
storeID: rpcContext.Store.StoreID(),
peer: rpcContext.Peer,
storeID: rpcContext.Store.StoreID(),
peer: rpcContext.Peer,
loadBasedReplicaRetry: replicaRead != kv.ReplicaReadLeader,
}, nil
}
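The follower loop in BuildBatchTask terminates by remembering the first follower peer it sees and stopping once that peer comes around again. The sketch below models just that termination plus the threshold filter; getFollower is a hypothetical callback standing in for GetTiKVRPCContext with a follower read type and the given seed, not a real client-go function.

package main

import (
	"fmt"
	"math"
	"time"
)

// scanFollowers models the follower loop in BuildBatchTask: followers are
// visited by incrementing a seed, the scan stops after one full pass (when
// the first peer repeats), and the follower with the smallest estimated wait
// under the threshold wins. found == false means "fall back to the leader".
func scanFollowers(getFollower func(seed uint32) (peerID uint64, wait time.Duration),
	busyThreshold time.Duration) (bestPeer uint64, found bool) {
	var (
		firstPeer *uint64
		bestWait  = time.Duration(math.MaxInt64)
	)
	for seed := uint32(0); ; seed++ {
		peerID, wait := getFollower(seed)
		if firstPeer == nil {
			id := peerID
			firstPeer = &id
		} else if *firstPeer == peerID {
			break // one full pass over the followers
		}
		if wait > busyThreshold {
			continue // this follower is itself too busy
		}
		if wait < bestWait {
			bestWait, bestPeer, found = wait, peerID, true
		}
	}
	return bestPeer, found
}

func main() {
	// Three followers with peer IDs 101..103; the seed wraps modulo 3.
	waits := map[uint64]time.Duration{
		101: 70 * time.Millisecond,
		102: 30 * time.Millisecond,
		103: 90 * time.Millisecond,
	}
	get := func(seed uint32) (uint64, time.Duration) {
		id := uint64(101 + seed%3)
		return id, waits[id]
	}
	fmt.Println(scanFollowers(get, 50*time.Millisecond)) // 102 true
}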
