From 3f8b0ccd81374a27a466db2e934737503905b30c Mon Sep 17 00:00:00 2001
From: you06
Date: Sun, 19 Mar 2023 23:44:40 +0800
Subject: [PATCH] store/copr: batch replica read (#42237)

close pingcap/tidb#42322
---
 DEPS.bzl                   |   4 +-
 go.mod                     |   2 +-
 go.sum                     |   4 +-
 store/copr/coprocessor.go  | 128 +++++++++++++++++++++++++------------
 store/copr/region_cache.go |  58 +++++++++++++++--
 5 files changed, 143 insertions(+), 53 deletions(-)

diff --git a/DEPS.bzl b/DEPS.bzl
index fb383faefa2ff..47507e569bc42 100644
--- a/DEPS.bzl
+++ b/DEPS.bzl
@@ -4101,8 +4101,8 @@ def go_deps():
         name = "com_github_tikv_client_go_v2",
         build_file_proto_mode = "disable_global",
         importpath = "github.com/tikv/client-go/v2",
-        sum = "h1:KFKjXBwDCfmPyNjMgNE2YAS+ZtwryVSYSlCSNhzpbig=",
-        version = "v2.0.7-0.20230316080603-d19741b3ed77",
+        sum = "h1:m5Y7tBW5Rq8L1ANxibitBa/DInDy3hA2Qvk1Ys9u1NU=",
+        version = "v2.0.7-0.20230317032622-884a634378d4",
     )
     go_repository(
         name = "com_github_tikv_pd",
diff --git a/go.mod b/go.mod
index 6bfda67b3b1c3..923aa4358200c 100644
--- a/go.mod
+++ b/go.mod
@@ -92,7 +92,7 @@ require (
 	github.com/stretchr/testify v1.8.2
 	github.com/tdakkota/asciicheck v0.1.1
 	github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
-	github.com/tikv/client-go/v2 v2.0.7-0.20230316080603-d19741b3ed77
+	github.com/tikv/client-go/v2 v2.0.7-0.20230317032622-884a634378d4
 	github.com/tikv/pd/client v0.0.0-20230316082839-7a0ce101c243
 	github.com/timakin/bodyclose v0.0.0-20221125081123-e39cf3fc478e
 	github.com/twmb/murmur3 v1.1.6
diff --git a/go.sum b/go.sum
index 4f5c21bee31e4..1635e1477725e 100644
--- a/go.sum
+++ b/go.sum
@@ -941,8 +941,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf
 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
 github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
 github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
-github.com/tikv/client-go/v2 v2.0.7-0.20230316080603-d19741b3ed77 h1:KFKjXBwDCfmPyNjMgNE2YAS+ZtwryVSYSlCSNhzpbig=
-github.com/tikv/client-go/v2 v2.0.7-0.20230316080603-d19741b3ed77/go.mod h1:DPL03G+QwLmypNjDIl+B02UltorBMx3WzSh4yJbp+cw=
+github.com/tikv/client-go/v2 v2.0.7-0.20230317032622-884a634378d4 h1:m5Y7tBW5Rq8L1ANxibitBa/DInDy3hA2Qvk1Ys9u1NU=
+github.com/tikv/client-go/v2 v2.0.7-0.20230317032622-884a634378d4/go.mod h1:DPL03G+QwLmypNjDIl+B02UltorBMx3WzSh4yJbp+cw=
 github.com/tikv/pd/client v0.0.0-20230316082839-7a0ce101c243 h1:CYU+awkq5ykKyWV2e2Z+qtRveWMttV4N3r0lyk/z4/M=
 github.com/tikv/pd/client v0.0.0-20230316082839-7a0ce101c243/go.mod h1:N2QHc05Vll8CofXQor47lpW5d22WDosFC8WPVx9BsbU=
 github.com/timakin/bodyclose v0.0.0-20221125081123-e39cf3fc478e h1:MV6KaVu/hzByHP0UvJ4HcMGE/8a6A4Rggc/0wx2AvJo=
diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go
index 228796343ecd9..0493c101e92d5 100644
--- a/store/copr/coprocessor.go
+++ b/store/copr/coprocessor.go
@@ -259,13 +259,19 @@ type copTask struct {
 	requestSource util.RequestSource
 	RowCountHint  int // used for extra concurrency of small tasks, -1 for unknown row count
 	batchTaskList map[uint64]*batchedCopTask
+
+	// when this task is batched and the leader's wait duration exceeds the load-based threshold,
+	// we set this field to the target replica store ID and redirect the request to the replica.
+	redirect2Replica *uint64
+	busyThreshold    time.Duration
 }
 
 type batchedCopTask struct {
-	task    *copTask
-	region  coprocessor.RegionInfo
-	storeID uint64
-	peer    *metapb.Peer
+	task                  *copTask
+	region                coprocessor.RegionInfo
+	storeID               uint64
+	peer                  *metapb.Peer
+	loadBasedReplicaRetry bool
 }
 
 func (r *copTask) String() string {
@@ -339,7 +345,7 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c
 	var builder taskBuilder
 	if req.StoreBatchSize > 0 && hints != nil {
-		builder = newBatchTaskBuilder(bo, req, cache)
+		builder = newBatchTaskBuilder(bo, req, cache, req.ReplicaRead)
 	} else {
 		builder = newLegacyTaskBuilder(len(locs))
 	}
@@ -389,6 +395,7 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c
 			pagingSize:    pagingSize,
 			requestSource: req.RequestSource,
 			RowCountHint:  hint,
+			busyThreshold: req.StoreBusyThreshold,
 		}
 		// only keep-order need chan inside task.
 		// tasks by region error will reuse the channel of parent task.
@@ -466,25 +473,32 @@ func (b *legacyTaskBuilder) build() []*copTask {
 	return b.tasks
 }
 
+type storeReplicaKey struct {
+	storeID     uint64
+	replicaRead bool
+}
+
 type batchStoreTaskBuilder struct {
-	bo        *Backoffer
-	req       *kv.Request
-	cache     *RegionCache
-	taskID    uint64
-	limit     int
-	store2Idx map[uint64]int
-	tasks     []*copTask
+	bo          *Backoffer
+	req         *kv.Request
+	cache       *RegionCache
+	taskID      uint64
+	limit       int
+	store2Idx   map[storeReplicaKey]int
+	tasks       []*copTask
+	replicaRead kv.ReplicaReadType
 }
 
-func newBatchTaskBuilder(bo *Backoffer, req *kv.Request, cache *RegionCache) *batchStoreTaskBuilder {
+func newBatchTaskBuilder(bo *Backoffer, req *kv.Request, cache *RegionCache, replicaRead kv.ReplicaReadType) *batchStoreTaskBuilder {
 	return &batchStoreTaskBuilder{
-		bo:        bo,
-		req:       req,
-		cache:     cache,
-		taskID:    0,
-		limit:     req.StoreBatchSize,
-		store2Idx: make(map[uint64]int, 16),
-		tasks:     make([]*copTask, 0, 16),
+		bo:          bo,
+		req:         req,
+		cache:       cache,
+		taskID:      0,
+		limit:       req.StoreBatchSize,
+		store2Idx:   make(map[storeReplicaKey]int, 16),
+		tasks:       make([]*copTask, 0, 16),
+		replicaRead: replicaRead,
 	}
 }
 
@@ -502,16 +516,25 @@ func (b *batchStoreTaskBuilder) handle(task *copTask) (err error) {
 	if b.limit <= 0 || !isSmallTask(task) {
 		return nil
 	}
-	batchedTask, err := b.cache.BuildBatchTask(b.bo, task, b.req.ReplicaRead)
+	batchedTask, err := b.cache.BuildBatchTask(b.bo, b.req, task, b.replicaRead)
 	if err != nil {
 		return err
 	}
 	if batchedTask == nil {
 		return nil
 	}
-	if idx, ok := b.store2Idx[batchedTask.storeID]; !ok || len(b.tasks[idx].batchTaskList) >= b.limit {
+	key := storeReplicaKey{
+		storeID:     batchedTask.storeID,
+		replicaRead: batchedTask.loadBasedReplicaRetry,
+	}
+	if idx, ok := b.store2Idx[key]; !ok || len(b.tasks[idx].batchTaskList) >= b.limit {
+		if batchedTask.loadBasedReplicaRetry {
+			// The batched task was routed to a replica by load-based selection; pin the parent
+			// task to that replica store so the busy threshold doesn't take effect again.
+			batchedTask.task.redirect2Replica = &batchedTask.storeID
+		}
 		b.tasks = append(b.tasks, batchedTask.task)
-		b.store2Idx[batchedTask.storeID] = len(b.tasks) - 1
+		b.store2Idx[key] = len(b.tasks) - 1
 	} else {
 		if b.tasks[idx].batchTaskList == nil {
 			b.tasks[idx].batchTaskList = make(map[uint64]*batchedCopTask, b.limit)
@@ -1123,14 +1146,8 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
 	cacheKey, cacheValue := worker.buildCacheKey(task, &copReq)
 
-	// TODO: Load-based replica read is currently not compatible with store batched tasks now.
-	// The batched tasks should be dispatched to their own followers, but it's not implemented yet.
-	// So, only enable load-based replica read when there is no batched tasks.
-	var busyThresholdMs uint32
-	if len(copReq.Tasks) == 0 {
-		busyThresholdMs = uint32(worker.req.StoreBusyThreshold.Milliseconds())
-	}
-	req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, options.GetTiKVReplicaReadType(worker.req.ReplicaRead), &worker.replicaReadSeed, kvrpcpb.Context{
+	replicaRead := worker.req.ReplicaRead
+	req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, options.GetTiKVReplicaReadType(replicaRead), &worker.replicaReadSeed, kvrpcpb.Context{
 		IsolationLevel: isolationLevelToPB(worker.req.IsolationLevel),
 		Priority:       priorityToPB(worker.req.Priority),
 		NotFillCache:   worker.req.NotFillCache,
@@ -1139,7 +1156,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
 		TaskId:            worker.req.TaskID,
 		RequestSource:     task.requestSource.GetRequestSource(),
 		ResourceGroupName: worker.req.ResourceGroupName,
-		BusyThresholdMs:   busyThresholdMs,
+		BusyThresholdMs:   uint32(task.busyThreshold.Milliseconds()),
 	})
 	if worker.req.ResourceGroupTagger != nil {
 		worker.req.ResourceGroupTagger(req)
@@ -1158,6 +1175,11 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
 	if len(worker.req.MatchStoreLabels) > 0 {
 		ops = append(ops, tikv.WithMatchLabels(worker.req.MatchStoreLabels))
 	}
+	if task.redirect2Replica != nil {
+		req.ReplicaRead = true
+		req.ReplicaReadType = options.GetTiKVReplicaReadType(kv.ReplicaReadFollower)
+		ops = append(ops, tikv.WithMatchStores([]uint64{*task.redirect2Replica}))
+	}
 	resp, rpcCtx, storeAddr, err := worker.kvclient.SendReqCtx(bo.TiKVBackoffer(), req, task.region, tikv.ReadTimeoutMedium, getEndPointType(task.storeType), task.storeAddr, ops...)
 	err = derr.ToTiDBErr(err)
 	if err != nil {
@@ -1304,13 +1326,13 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
 		if err != nil {
 			return remains, err
 		}
-		return worker.handleBatchRemainsOnErr(bo, rpcCtx, remains, resp.pbResp.GetBatchResponses(), task, ch)
+		return worker.handleBatchRemainsOnErr(bo, rpcCtx, remains, resp.pbResp, task, ch)
 	}
 	if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
 		if err := worker.handleLockErr(bo, lockErr, task); err != nil {
 			return nil, err
 		}
-		return worker.handleBatchRemainsOnErr(bo, rpcCtx, []*copTask{task}, resp.pbResp.GetBatchResponses(), task, ch)
+		return worker.handleBatchRemainsOnErr(bo, rpcCtx, []*copTask{task}, resp.pbResp, task, ch)
 	}
 	if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
 		err := errors.Errorf("other error: %s", otherErr)
@@ -1346,18 +1368,18 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
 		return nil, err
 	}
 
-	batchResps := resp.pbResp.BatchResponses
+	pbResp := resp.pbResp
 	worker.sendToRespCh(resp, ch, true)
-	return worker.handleBatchCopResponse(bo, rpcCtx, batchResps, task.batchTaskList, ch)
+	return worker.handleBatchCopResponse(bo, rpcCtx, pbResp, task.batchTaskList, ch)
 }
 
-func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, rpcCtx *tikv.RPCContext, remains []*copTask, batchResp []*coprocessor.StoreBatchTaskResponse, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {
+func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, rpcCtx *tikv.RPCContext, remains []*copTask, resp *coprocessor.Response, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {
 	if len(task.batchTaskList) == 0 {
 		return remains, nil
 	}
 	batchedTasks := task.batchTaskList
 	task.batchTaskList = nil
-	batchedRemains, err := worker.handleBatchCopResponse(bo, rpcCtx, batchResp, batchedTasks, ch)
+	batchedRemains, err := worker.handleBatchCopResponse(bo, rpcCtx, resp, batchedTasks, ch)
 	if err != nil {
 		return nil, err
 	}
@@ -1366,18 +1388,21 @@ func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, rpcCtx *
 
 // handle the batched cop response.
 // tasks will be changed, so the input tasks should not be used after calling this function.
-func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *tikv.RPCContext, batchResps []*coprocessor.StoreBatchTaskResponse,
+func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *coprocessor.Response,
 	tasks map[uint64]*batchedCopTask, ch chan<- *copResponse) (remainTasks []*copTask, err error) {
 	if len(tasks) == 0 {
 		return nil, nil
 	}
 	batchedNum := len(tasks)
+	busyThresholdFallback := false
 	defer func() {
 		if err != nil {
 			return
 		}
-		worker.storeBatchedNum.Add(uint64(batchedNum - len(remainTasks)))
-		worker.storeBatchedFallbackNum.Add(uint64(len(remainTasks)))
+		if !busyThresholdFallback {
+			worker.storeBatchedNum.Add(uint64(batchedNum - len(remainTasks)))
+			worker.storeBatchedFallbackNum.Add(uint64(len(remainTasks)))
+		}
 	}()
 	appendRemainTasks := func(tasks ...*copTask) {
 		if remainTasks == nil {
@@ -1393,6 +1418,7 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *t
 			Addr: rpcCtx.Addr,
 		}
 	}
+	batchResps := resp.GetBatchResponses()
 	for _, batchResp := range batchResps {
 		taskID := batchResp.GetTaskId()
 		batchedTask, ok := tasks[taskID]
@@ -1463,7 +1489,8 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *t
 	}
 	for _, t := range tasks {
 		task := t.task
-		// when the error is generated by client, response is empty, skip warning for this case.
+		// when the error is generated by the client, or the store returns a load-based server-busy
+		// error, the response is empty by design, so skip the warning in this case.
 		if len(batchResps) != 0 {
 			firstRangeStartKey := task.ranges.At(0).StartKey
 			lastRangeEndKey := task.ranges.At(task.ranges.Len() - 1).EndKey
@@ -1479,6 +1506,23 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *t
 		}
 		appendRemainTasks(t.task)
 	}
+	if regionErr := resp.GetRegionError(); regionErr != nil && regionErr.ServerIsBusy != nil &&
+		regionErr.ServerIsBusy.EstimatedWaitMs > 0 && len(remainTasks) != 0 {
+		if len(batchResps) == 0 {
+			busyThresholdFallback = true
+			handler := newBatchTaskBuilder(bo, worker.req, worker.store.GetRegionCache(), kv.ReplicaReadFollower)
+			for _, task := range remainTasks {
+				// do not set the busy threshold again.
+				task.busyThreshold = 0
+				if err = handler.handle(task); err != nil {
+					return nil, err
+				}
+			}
+			remainTasks = handler.build()
+		} else {
+			return nil, errors.New("store batched coprocessor with server-is-busy error shouldn't contain responses")
+		}
+	}
 	return remainTasks, nil
 }
 
diff --git a/store/copr/region_cache.go b/store/copr/region_cache.go
index aa33656c39cca..b6465ef8aefd0 100644
--- a/store/copr/region_cache.go
+++ b/store/copr/region_cache.go
@@ -16,7 +16,9 @@ package copr
 
 import (
 	"bytes"
+	"math"
 	"strconv"
+	"time"
 
 	"github.com/pingcap/kvproto/pkg/coprocessor"
 	"github.com/pingcap/kvproto/pkg/metapb"
@@ -210,11 +212,54 @@ func (c *RegionCache) OnSendFailForBatchRegions(bo *Backoffer, store *tikv.Store
 }
 
 // BuildBatchTask fetches store and peer info for cop task, wrap it as `batchedCopTask`.
-func (c *RegionCache) BuildBatchTask(bo *Backoffer, task *copTask, replicaRead kv.ReplicaReadType) (*batchedCopTask, error) {
-	rpcContext, err := c.GetTiKVRPCContext(bo.TiKVBackoffer(), task.region, options.GetTiKVReplicaReadType(replicaRead), 0)
-	if err != nil {
-		return nil, err
+func (c *RegionCache) BuildBatchTask(bo *Backoffer, req *kv.Request, task *copTask, replicaRead kv.ReplicaReadType) (*batchedCopTask, error) {
+	var (
+		rpcContext *tikv.RPCContext
+		err        error
+	)
+	if replicaRead == kv.ReplicaReadFollower {
+		followerStoreSeed := uint32(0)
+		leastEstWaitTime := time.Duration(math.MaxInt64)
+		var (
+			firstFollowerPeer *uint64
+			followerContext   *tikv.RPCContext
+		)
+		// iterate the followers until the seed wraps around to the first follower we saw.
+		for {
+			followerContext, err = c.GetTiKVRPCContext(bo.TiKVBackoffer(), task.region, options.GetTiKVReplicaReadType(replicaRead), followerStoreSeed)
+			if err != nil {
+				return nil, err
+			}
+			if firstFollowerPeer == nil {
+				firstFollowerPeer = &followerContext.Peer.Id
+			} else if *firstFollowerPeer == followerContext.Peer.Id {
+				break
+			}
+			followerStoreSeed++
+			estWaitTime := followerContext.Store.EstimatedWaitTime()
+			// skip this follower if its estimated wait time exceeds the given threshold.
+			if estWaitTime > req.StoreBusyThreshold {
+				continue
+			}
+			// remember the follower with the least estimated wait time so far.
+			if estWaitTime < leastEstWaitTime {
+				leastEstWaitTime = estWaitTime
+				rpcContext = followerContext
+			}
+		}
+		// all replicas are busy, fall back to the leader.
+		if rpcContext == nil {
+			replicaRead = kv.ReplicaReadLeader
+		}
+	}
+
+	if replicaRead == kv.ReplicaReadLeader {
+		rpcContext, err = c.GetTiKVRPCContext(bo.TiKVBackoffer(), task.region, options.GetTiKVReplicaReadType(replicaRead), 0)
+		if err != nil {
+			return nil, err
+		}
 	}
+
 	// fallback to non-batch path
 	if rpcContext == nil {
 		return nil, nil
@@ -229,7 +274,8 @@ func (c *RegionCache) BuildBatchTask(bo *Backoffer, task *copTask, replicaRead k
 			},
 			Ranges: task.ranges.ToPBRanges(),
 		},
-		storeID: rpcContext.Store.StoreID(),
-		peer:    rpcContext.Peer,
+		storeID:               rpcContext.Store.StoreID(),
+		peer:                  rpcContext.Peer,
+		loadBasedReplicaRetry: replicaRead != kv.ReplicaReadLeader,
 	}, nil
 }
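
Below is a minimal, self-contained Go sketch of the follower-selection policy that the new BuildBatchTask implements: among the followers whose estimated wait time is under the busy threshold, pick the one with the least wait, and fall back to the leader when every follower is busy. The replica type, pickReplica function, and the hard-coded wait times are hypothetical stand-ins for *tikv.RPCContext, Store.EstimatedWaitTime(), and real store load; this is an illustration of the policy, not the TiDB implementation itself.

package main

import (
	"fmt"
	"time"
)

// replica is a simplified stand-in for a follower candidate; the real code
// iterates *tikv.RPCContext values returned by GetTiKVRPCContext.
type replica struct {
	storeID     uint64
	estWaitTime time.Duration // what Store.EstimatedWaitTime() would report
}

// pickReplica returns the follower with the least estimated wait time that is
// still under busyThreshold. The second return value is false when every
// follower is busy, in which case the caller falls back to the leader.
func pickReplica(followers []replica, busyThreshold time.Duration) (uint64, bool) {
	var (
		chosen    uint64
		found     bool
		leastWait time.Duration
	)
	for _, r := range followers {
		// skip followers whose estimated wait time exceeds the threshold.
		if r.estWaitTime > busyThreshold {
			continue
		}
		if !found || r.estWaitTime < leastWait {
			chosen, leastWait, found = r.storeID, r.estWaitTime, true
		}
	}
	return chosen, found
}

func main() {
	followers := []replica{
		{storeID: 2, estWaitTime: 150 * time.Millisecond},
		{storeID: 3, estWaitTime: 40 * time.Millisecond},
	}
	if storeID, ok := pickReplica(followers, 100*time.Millisecond); ok {
		// in the patch, the parent copTask records this store in redirect2Replica
		// and the request is later sent as a follower read pinned to that store.
		fmt.Println("redirect to follower store", storeID)
	} else {
		fmt.Println("all followers are busy, fall back to the leader")
	}
}

When a follower is chosen this way, the parent copTask carries the store ID in redirect2Replica and handleTaskOnce pins the request to that store with tikv.WithMatchStores while sending it as a follower read; when no follower qualifies, the request goes to the leader with BusyThresholdMs taken from task.busyThreshold.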