diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go
index a4cea914c3aa9..390c5ffe8e63a 100644
--- a/store/copr/coprocessor.go
+++ b/store/copr/coprocessor.go
@@ -140,7 +140,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
 		// disable batch copr for follower read
 		req.StoreBatchSize = 0
 	}
-	// disable paging for batch copr
+	// disable batch copr when paging is enabled.
 	if req.Paging.Enable {
 		req.StoreBatchSize = 0
 	}
@@ -315,13 +315,13 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
 		chanSize = 18
 	}
 
-	tasks := make([]*copTask, 0, len(locs))
-	origRangeIdx := 0
-	taskID := uint64(0)
-	var store2Idx map[uint64]int
+	var builder taskBuilder
 	if req.StoreBatchSize > 0 {
-		store2Idx = make(map[uint64]int, 16)
+		builder = newBatchTaskBuilder(bo, req, cache)
+	} else {
+		builder = newLegacyTaskBuilder(len(locs))
 	}
+	origRangeIdx := 0
 	for _, loc := range locs {
 		// TiKV will return gRPC error if the message is too large. So we need to limit the length of the ranges slice
 		// to make sure the message can be sent successfully.
@@ -357,7 +357,6 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
 				}
 			}
 			task := &copTask{
-				taskID: taskID,
 				region: loc.Location.Region,
 				bucketsVer: loc.getBucketVersion(),
 				ranges: loc.Ranges.Slice(i, nextI),
@@ -370,50 +369,138 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
 				requestSource: req.RequestSource,
 				RowCountHint: hint,
 			}
-			if req.StoreBatchSize > 0 {
-				batchedTask, err := cache.BuildBatchTask(bo, task, req.ReplicaRead)
-				if err != nil {
-					return nil, err
-				}
-				if idx, ok := store2Idx[batchedTask.storeID]; !ok || len(tasks[idx].batchTaskList) >= req.StoreBatchSize {
-					tasks = append(tasks, batchedTask.task)
-					store2Idx[batchedTask.storeID] = len(tasks) - 1
-				} else {
-					if tasks[idx].batchTaskList == nil {
-						tasks[idx].batchTaskList = make(map[uint64]*batchedCopTask, req.StoreBatchSize)
-						// disable paging for batched task.
-						tasks[idx].paging = false
-						tasks[idx].pagingSize = 0
-					}
-					if task.RowCountHint > 0 {
-						tasks[idx].RowCountHint += task.RowCountHint
-					}
-					tasks[idx].batchTaskList[taskID] = batchedTask
-				}
-			} else {
-				tasks = append(tasks, task)
+			if err = builder.handle(task); err != nil {
+				return nil, err
 			}
 			i = nextI
 			if req.Paging.Enable {
 				pagingSize = paging.GrowPagingSize(pagingSize, req.Paging.MaxPagingSize)
 			}
-			taskID++
 		}
 	}
 	if req.Desc {
-		reverseTasks(tasks)
+		builder.reverse()
 	}
+	tasks := builder.build()
 	if elapsed := time.Since(start); elapsed > time.Millisecond*500 {
 		logutil.BgLogger().Warn("buildCopTasks takes too much time",
 			zap.Duration("elapsed", elapsed),
 			zap.Int("range len", rangesLen),
 			zap.Int("task len", len(tasks)))
 	}
-	metrics.TxnRegionsNumHistogramWithCoprocessor.Observe(float64(len(tasks)))
+	metrics.TxnRegionsNumHistogramWithCoprocessor.Observe(float64(builder.regionNum()))
 	return tasks, nil
 }
+
+type taskBuilder interface {
+	handle(*copTask) error
+	reverse()
+	build() []*copTask
+	regionNum() int
+}
+
+type legacyTaskBuilder struct {
+	tasks []*copTask
+}
+
+func newLegacyTaskBuilder(hint int) *legacyTaskBuilder {
+	return &legacyTaskBuilder{
+		tasks: make([]*copTask, 0, hint),
+	}
+}
+
+func (b *legacyTaskBuilder) handle(task *copTask) error {
+	b.tasks = append(b.tasks, task)
+	return nil
+}
+
+func (b *legacyTaskBuilder) regionNum() int {
+	return len(b.tasks)
+}
+
+func (b *legacyTaskBuilder) reverse() {
+	reverseTasks(b.tasks)
+}
+
+func (b *legacyTaskBuilder) build() []*copTask {
+	return b.tasks
+}
+
+type batchStoreTaskBuilder struct {
+	bo *Backoffer
+	req *kv.Request
+	cache *RegionCache
+	taskID uint64
+	limit int
+	store2Idx map[uint64]int
+	tasks []*copTask
+}
+
+func newBatchTaskBuilder(bo *Backoffer, req *kv.Request, cache *RegionCache) *batchStoreTaskBuilder {
+	return &batchStoreTaskBuilder{
+		bo: bo,
+		req: req,
+		cache: cache,
+		taskID: 0,
+		limit: req.StoreBatchSize,
+		store2Idx: make(map[uint64]int, 16),
+		tasks: make([]*copTask, 0, 16),
+	}
+}
+
+func (b *batchStoreTaskBuilder) handle(task *copTask) (err error) {
+	b.taskID++
+	task.taskID = b.taskID
+	handled := false
+	defer func() {
+		if !handled && err == nil {
+			// fall back to the non-batch way; this is mainly caused by a region miss.
+			b.tasks = append(b.tasks, task)
+		}
+	}()
+	if b.limit <= 0 {
+		return nil
+	}
+	batchedTask, err := b.cache.BuildBatchTask(b.bo, task, b.req.ReplicaRead)
+	if err != nil {
+		return err
+	}
+	if batchedTask == nil {
+		return nil
+	}
+	if idx, ok := b.store2Idx[batchedTask.storeID]; !ok || len(b.tasks[idx].batchTaskList) >= b.limit {
+		b.tasks = append(b.tasks, batchedTask.task)
+		b.store2Idx[batchedTask.storeID] = len(b.tasks) - 1
+	} else {
+		if b.tasks[idx].batchTaskList == nil {
+			b.tasks[idx].batchTaskList = make(map[uint64]*batchedCopTask, b.limit)
+			// disable paging for batched task.
+			b.tasks[idx].paging = false
+			b.tasks[idx].pagingSize = 0
+		}
+		if task.RowCountHint > 0 {
+			b.tasks[idx].RowCountHint += task.RowCountHint
+		}
+		b.tasks[idx].batchTaskList[task.taskID] = batchedTask
+	}
+	handled = true
+	return nil
+}
+
+func (b *batchStoreTaskBuilder) regionNum() int {
+	// a taskID is allocated for every region task, so the final b.taskID equals the number of regions involved.
+	return int(b.taskID)
+}
+
+func (b *batchStoreTaskBuilder) reverse() {
+	reverseTasks(b.tasks)
+}
+
+func (b *batchStoreTaskBuilder) build() []*copTask {
+	return b.tasks
+}
+
 func buildTiDBMemCopTasks(ranges *KeyRanges, req *kv.Request) ([]*copTask, error) {
 	servers, err := infosync.GetAllServerInfo(context.Background())
 	if err != nil {
@@ -1138,13 +1225,13 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
 		if err != nil {
 			return remains, err
 		}
-		return worker.handleBatchRemainsOnErr(bo, remains, resp.pbResp.BatchResponses, task, ch)
+		return worker.handleBatchRemainsOnErr(bo, remains, resp.pbResp.GetBatchResponses(), task, ch)
 	}
 	if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
 		if err := worker.handleLockErr(bo, lockErr, task); err != nil {
 			return nil, err
 		}
-		return worker.handleBatchRemainsOnErr(bo, []*copTask{task}, resp.pbResp.BatchResponses, task, ch)
+		return worker.handleBatchRemainsOnErr(bo, []*copTask{task}, resp.pbResp.GetBatchResponses(), task, ch)
 	}
 	if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
 		err := errors.Errorf("other error: %s", otherErr)
@@ -1250,16 +1337,26 @@ func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, remains
 }
 
 // handle the batched cop response.
+// The input tasks map is modified, so it should not be used after calling this function.
 func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResps []*coprocessor.StoreBatchTaskResponse, tasks map[uint64]*batchedCopTask, ch chan<- *copResponse) ([]*copTask, error) {
 	if len(tasks) == 0 {
 		return nil, nil
 	}
 	var remainTasks []*copTask
+	appendRemainTasks := func(tasks ...*copTask) {
+		if remainTasks == nil {
+			// lazily allocate the slice, sized for the tasks being appended.
+			remainTasks = make([]*copTask, 0, len(tasks))
+		}
+		remainTasks = append(remainTasks, tasks...)
+	}
 	for _, batchResp := range batchResps {
-		batchedTask, ok := tasks[batchResp.GetTaskId()]
+		taskID := batchResp.GetTaskId()
+		batchedTask, ok := tasks[taskID]
 		if !ok {
 			return nil, errors.Errorf("task id %d not found", batchResp.GetTaskId())
 		}
+		delete(tasks, taskID)
 		resp := &copResponse{
 			pbResp: &coprocessor.Response{
 				Data: batchResp.Data,
@@ -1276,7 +1373,7 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp
 			if err != nil {
 				return nil, err
 			}
-			remainTasks = append(remainTasks, remains...)
+			appendRemainTasks(remains...)
 			continue
 		}
 		//TODO: handle locks in batch
@@ -1284,7 +1381,7 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp
 			if err := worker.handleLockErr(bo, resp.pbResp.GetLocked(), task); err != nil {
 				return nil, err
 			}
-			remainTasks = append(remainTasks, task)
+			appendRemainTasks(task)
 			continue
 		}
 		if otherErr := batchResp.GetOtherError(); otherErr != "" {
@@ -1312,6 +1409,24 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp
 		// TODO: check OOM
 		worker.sendToRespCh(resp, ch, false)
 	}
+	for _, t := range tasks {
+		task := t.task
+		// when the error is generated by the client, the response is empty; skip the warning in this case.
+		if len(batchResps) != 0 {
+			firstRangeStartKey := task.ranges.At(0).StartKey
+			lastRangeEndKey := task.ranges.At(task.ranges.Len() - 1).EndKey
+			logutil.Logger(bo.GetCtx()).Error("response of batched task missing",
+				zap.Uint64("id", task.taskID),
+				zap.Uint64("txnStartTS", worker.req.StartTs),
+				zap.Uint64("regionID", task.region.GetID()),
+				zap.Uint64("bucketsVer", task.bucketsVer),
+				zap.Int("rangeNums", task.ranges.Len()),
+				zap.ByteString("firstRangeStartKey", firstRangeStartKey),
+				zap.ByteString("lastRangeEndKey", lastRangeEndKey),
+				zap.String("storeAddr", task.storeAddr))
+		}
+		appendRemainTasks(t.task)
+	}
 	return remainTasks, nil
 }
 
diff --git a/store/copr/region_cache.go b/store/copr/region_cache.go
index a3fd20e036d43..97c3d705c223b 100644
--- a/store/copr/region_cache.go
+++ b/store/copr/region_cache.go
@@ -18,7 +18,6 @@ import (
 	"bytes"
 	"strconv"
 
-	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/coprocessor"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/log"
@@ -210,8 +209,9 @@ func (c *RegionCache) BuildBatchTask(bo *Backoffer, task *copTask, replicaRead k
 	if err != nil {
 		return nil, err
 	}
+	// fall back to the non-batch path when the rpc context is unavailable.
 	if rpcContext == nil {
-		return nil, errors.Errorf("region %s missing", task.region.String())
+		return nil, nil
 	}
 	return &batchedCopTask{
 		task: task,