diff --git a/executor/distsql_test.go b/executor/distsql_test.go index 4420a714e96cf..893870e45d256 100644 --- a/executor/distsql_test.go +++ b/executor/distsql_test.go @@ -633,3 +633,53 @@ func TestCoprocessorPagingReqKeyRangeSorted(t *testing.T) { tk.MustExec(`set @a=0x61219F79C90D3541F70E, @b=5501707547099269248, @c=0xEC43EFD30131DEA2CB8B, @d="呣丼蒢咿卻鹻铴础湜僂頃dž縍套衞陀碵碼幓9", @e="鹹楞睕堚尛鉌翡佾搁紟精廬姆燵藝潐楻翇慸嵊";`) tk.MustExec(`execute stmt using @a,@b,@c,@d,@e;`) } + +func TestCoprocessorBatchByStore(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(id int primary key, c1 int, c2 int, key i(c1))") + for i := 0; i < 10; i++ { + tk.MustExec("insert into t values(?, ?, ?)", i*10000, i*10000, i%2) + } + tk.MustQuery("split table t between (0) and (100000) regions 2").Check(testkit.Rows("2 1")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/setRangesPerTask", "return(1)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/setRangesPerTask")) + }() + ranges := []string{ + "(c1 >= 0 and c1 < 5000)", + "(c1 >= 10000 and c1 < 15000)", + "(c1 >= 20000 and c1 < 25000)", + "(c1 >= 30000 and c1 < 35000)", + "(c1 >= 40000 and c1 < 45000)", + "(c1 >= 50000 and c1 < 55000)", + "(c1 >= 60000 and c1 < 65000)", + "(c1 >= 70000 and c1 < 75000)", + "(c1 >= 80000 and c1 < 85000)", + "(c1 >= 90000 and c1 < 95000)", + } + baseSQL := fmt.Sprintf("select * from t force index(i) where id < 100000 and (%s)", strings.Join(ranges, " or ")) + evenRows := testkit.Rows("0 0 0", "20000 20000 0", "40000 40000 0", "60000 60000 0", "80000 80000 0") + oddRows := testkit.Rows("10000 10000 1", "30000 30000 1", "50000 50000 1", "70000 70000 1", "90000 90000 1") + reverseOddRows := testkit.Rows("90000 90000 1", "70000 70000 1", "50000 50000 1", "30000 30000 1", "10000 10000 1") + for _, paging := range []string{"on", 
"off"} { + tk.MustExec("set session tidb_enable_paging=?", paging) + for size := 0; size < 10; size++ { + tk.MustExec("set session tidb_store_batch_size=?", size) + tk.MustQuery(baseSQL + " and c2 = 0").Sort().Check(evenRows) + tk.MustQuery(baseSQL + " and c2 = 1").Sort().Check(oddRows) + tk.MustQuery(baseSQL + " and c2 = 0 order by c1 asc").Check(evenRows) + tk.MustQuery(baseSQL + " and c2 = 1 order by c1 desc").Check(reverseOddRows) + // every batched task will get region error and fallback. + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/batchCopRegionError", "return")) + tk.MustQuery(baseSQL + " and c2 = 0").Sort().Check(evenRows) + tk.MustQuery(baseSQL + " and c2 = 1").Sort().Check(oddRows) + tk.MustQuery(baseSQL + " and c2 = 0 order by c1 asc").Check(evenRows) + tk.MustQuery(baseSQL + " and c2 = 1 order by c1 desc").Check(reverseOddRows) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/batchCopRegionError")) + } + } +} diff --git a/store/copr/BUILD.bazel b/store/copr/BUILD.bazel index eb3eb2f016424..443726ae9b30b 100644 --- a/store/copr/BUILD.bazel +++ b/store/copr/BUILD.bazel @@ -36,6 +36,7 @@ go_library( "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/coprocessor", + "@com_github_pingcap_kvproto//pkg/errorpb", "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_kvproto//pkg/mpp", diff --git a/store/copr/copr_test/coprocessor_test.go b/store/copr/copr_test/coprocessor_test.go index a54c5048e12cb..bcedb6f2438e0 100644 --- a/store/copr/copr_test/coprocessor_test.go +++ b/store/copr/copr_test/coprocessor_test.go @@ -25,6 +25,14 @@ import ( "github.com/tikv/client-go/v2/testutils" ) +func flattenKeyRanges(ranges *copr.KeyRanges) []kv.KeyRange { + ret := make([]kv.KeyRange, 0, ranges.Len()) + ranges.Do(func(ran *kv.KeyRange) { + ret = append(ret, *ran) + }) + return ret +} + func 
TestBuildCopIteratorWithRowCountHint(t *testing.T) { // nil --- 'g' --- 'n' --- 't' --- nil // <- 0 -> <- 1 -> <- 2 -> <- 3 -> @@ -130,6 +138,7 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) { require.Equal(t, tasks[0].RowCountHint, 5) require.Equal(t, len(tasks[1].ToPBBatchTasks()), 1) require.Equal(t, tasks[1].RowCountHint, 9) + require.Nil(t, it.GetHeadTask()) req = &kv.Request{ Tp: kv.ReqTypeDAG, @@ -144,6 +153,7 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) { require.Equal(t, len(tasks), 1) require.Equal(t, len(tasks[0].ToPBBatchTasks()), 3) require.Equal(t, tasks[0].RowCountHint, 14) + require.Nil(t, it.GetHeadTask()) // paging will disable store batch. req = &kv.Request{ @@ -166,4 +176,67 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) { require.Nil(t, errRes) tasks = it.GetTasks() require.Equal(t, len(tasks), 4) + require.Nil(t, it.GetHeadTask()) + + // keep order + req = &kv.Request{ + Tp: kv.ReqTypeDAG, + KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")), + FixedRowCountHint: []int{1, 1, 3, 3}, + Concurrency: 15, + StoreBatchSize: 1, + KeepOrder: true, + Desc: false, + } + it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt) + require.Nil(t, errRes) + tasks = it.GetTasks() + require.Equal(t, len(tasks), 2) + require.Equal(t, len(tasks[0].ToPBBatchTasks()), 1) + require.Equal(t, tasks[0].RowCountHint, 5) + require.Equal(t, len(tasks[1].ToPBBatchTasks()), 1) + require.Equal(t, tasks[1].RowCountHint, 9) + taskRanges := [][]kv.KeyRange{ + copr.BuildKeyRanges("a", "c", "d", "e"), + copr.BuildKeyRanges("h", "n"), + copr.BuildKeyRanges("n", "t"), + copr.BuildKeyRanges("t", "x", "y", "z"), + } + curTask := it.GetHeadTask() + for i := 0; i < len(taskRanges); i++ { + require.Equal(t, flattenKeyRanges(curTask.Ranges()), taskRanges[i]) + curTask = curTask.GetNextTask() + } + require.Nil(t, curTask) + + // keep order & desc + req = &kv.Request{ + Tp: kv.ReqTypeDAG, + 
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")), + FixedRowCountHint: []int{1, 1, 3, 3}, + Concurrency: 15, + StoreBatchSize: 1, + KeepOrder: true, + Desc: true, + } + it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt) + require.Nil(t, errRes) + tasks = it.GetTasks() + require.Equal(t, len(tasks), 2) + require.Equal(t, len(tasks[0].ToPBBatchTasks()), 1) + require.Equal(t, tasks[0].RowCountHint, 9) + require.Equal(t, len(tasks[1].ToPBBatchTasks()), 1) + require.Equal(t, tasks[1].RowCountHint, 5) + taskRanges = [][]kv.KeyRange{ + copr.BuildKeyRanges("t", "x", "y", "z"), + copr.BuildKeyRanges("n", "t"), + copr.BuildKeyRanges("h", "n"), + copr.BuildKeyRanges("a", "c", "d", "e"), + } + curTask = it.GetHeadTask() + for i := 0; i < len(taskRanges); i++ { + require.Equal(t, flattenKeyRanges(curTask.Ranges()), taskRanges[i]) + curTask = curTask.GetNextTask() + } + require.Nil(t, curTask) } diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 390c5ffe8e63a..ffd613431d4df 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "math" + "sort" "strconv" "strings" "sync" @@ -29,6 +30,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/coprocessor" + "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/domain/infosync" @@ -135,8 +137,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars if req.Tp != kv.ReqTypeDAG || req.StoreType != kv.TiKV { req.StoreBatchSize = 0 } - // TODO: support keep-order batch - if req.ReplicaRead != kv.ReplicaReadLeader || req.KeepOrder { + if req.ReplicaRead != kv.ReplicaReadLeader { // disable batch copr for follower read req.StoreBatchSize = 0 } @@ -150,9 +151,22 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req 
*kv.Request, vars tasks []*copTask err error ) + buildOpt := &buildCopTaskOpt{ + req: req, + cache: c.store.GetRegionCache(), + respChan: req.KeepOrder, + eventCb: eventCb, + requireToken: true, + } + var headTask *copTask buildTaskFunc := func(ranges []kv.KeyRange) error { keyRanges := NewKeyRanges(ranges) - tasksFromRanges, err := buildCopTasks(bo, c.store.GetRegionCache(), keyRanges, req, eventCb) + head, tasksFromRanges, err := buildCopTasks(bo, keyRanges, buildOpt) + // If req.Desc, always replace headTask with the latest returned head, + // else, read from the first head. + if headTask == nil || req.Desc { + headTask = head + } if err != nil { return err } @@ -163,12 +177,12 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars tasks = append(tasks, tasksFromRanges...) return nil } + // Here we build the task by partition, not directly by region. // This is because it's possible that TiDB merge multiple small partition into one region which break some assumption. // Keep it split by partition would be more safe. err = req.KeyRanges.ForEachPartitionWithErr(buildTaskFunc) // only batch store requests in first build. 
- req.StoreBatchSize = 0 reqType := "null" if req.ClosestReplicaReadAdjuster != nil { reqType = "miss" @@ -185,12 +199,13 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars req: req, concurrency: req.Concurrency, finishCh: make(chan struct{}), + headTask: headTask, + tasks: tasks, vars: vars, memTracker: req.MemTracker, replicaReadSeed: c.replicaReadSeed, rpcCancel: tikv.NewRPCanceller(), } - it.tasks = tasks if it.concurrency > len(tasks) { it.concurrency = len(tasks) } @@ -230,9 +245,14 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars it.sendRate = util.NewRateLimit(2 * (it.concurrency + it.smallTaskConcurrency)) it.respChan = nil } else { - it.respChan = make(chan *copResponse) + if req.StoreBatchSize > 0 { + it.respChan = make(chan *copResponse, 1+req.StoreBatchSize) + } else { + it.respChan = make(chan *copResponse) + } it.sendRate = util.NewRateLimit(it.concurrency + it.smallTaskConcurrency) } + req.StoreBatchSize = 0 it.actionOnExceed = newRateLimitAction(uint(it.sendRate.GetCapacity())) return it, nil } @@ -258,6 +278,9 @@ type copTask struct { requestSource util.RequestSource RowCountHint int // used for extra concurrency of small tasks, -1 for unknown row count batchTaskList map[uint64]*batchedCopTask + requireToken bool + // in keep-order mode, read copTasks like a linked list. + nextTask *copTask } type batchedCopTask struct { @@ -272,6 +295,10 @@ func (r *copTask) String() string { r.region.GetID(), r.region.GetConfVer(), r.region.GetVer(), r.ranges.Len(), r.storeAddr) } +func (r *copTask) Ranges() *KeyRanges { + return r.ranges +} + func (r *copTask) ToPBBatchTasks() []*coprocessor.StoreBatchTask { if len(r.batchTaskList) == 0 { return nil @@ -289,26 +316,93 @@ func (r *copTask) ToPBBatchTasks() []*coprocessor.StoreBatchTask { return pbTasks } -// rangesPerTask limits the length of the ranges slice sent in one copTask. 
-const rangesPerTask = 25000
+// ReorderRespChan inserts the split tasks into read linked list.
+// It's possible a task meets region split and got two responses, in this case, the split tasks will be inserted into the read linked list.
+/*
+   ┌───────┐  ┌───────┐  ┌───────┐
+   │ task1 ├─►│ task2 ├─►│ task3 │
+   └───────┘  └───────┘  └───────┘
+                 │           │
+       task2 split into task2.1 and task2.2
+                 │           │
+                 ▼           ▼
+   ┌───────┐  ┌───────┐  ┌───────┐
+   │ task1 ├─►│ task2 │  │ task3 │
+   └───────┘  ├───────┘  └─▲─────┘
+              │            │
+              │            │
+     ┌────────▼┐  ┌───────┴─┐
+     │ task2.1 ├─►│ task2.2 │
+     └─────────┘  └─────────┘
+*/
+func (r *copTask) ReorderRespChan(tasks []*copTask) {
+	if r.respChan == nil || len(tasks) == 0 {
+		return
+	}
+	for i := 0; i < len(tasks); i++ {
+		if i == len(tasks)-1 {
+			// the token will be put back when the last task(task2.2 in the figure) is done.
+			if r.requireToken {
+				tasks[i].requireToken = true
+			}
+			// splice the tail of the split list back onto the original successor.
+			tasks[i].nextTask = r.nextTask
+		} else {
+			// link consecutive split tasks; linking to tasks[i+1].nextTask would skip
+			// every other task because buildCopTasks already chained them.
+			tasks[i].nextTask = tasks[i+1]
+		}
+	}
+	r.nextTask = tasks[0]
+	r.requireToken = false
+	// the read goroutine still wait on r.respChan, close it to read from the next task.
+	close(r.respChan)
+}
+
+func (r *copTask) GetNextTask() *copTask {
+	return r.nextTask
+}
+
+// RangesPerTask limits the length of the ranges slice sent in one copTask.
+const RangesPerTask = 25000
+
+type buildCopTaskOpt struct {
+	req          *kv.Request
+	cache        *RegionCache
+	eventCb      trxevents.EventCallback
+	respChan     bool
+	requireToken bool
+}
 
-func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv.Request, eventCb trxevents.EventCallback) ([]*copTask, error) {
+// buildCopTasks accepts the given ranges and build the coprocessor tasks, it returns the head task, all tasks slice and error.
+// Head task is not nil when opt.respChan = true, which means the tasks should read in keep-order mode.
+// If head task is set, you should read from it first, then iterate over copTask.nextTask.
+// Details of keep-order reading with split tasks is in copTask.ReorderRespChan. +func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) (*copTask, []*copTask, error) { + req, cache, eventCb := opt.req, opt.cache, opt.eventCb start := time.Now() cmdType := tikvrpc.CmdCop if req.StoreType == kv.TiDB { - return buildTiDBMemCopTasks(ranges, req) + tasks, err := buildTiDBMemCopTasks(ranges, req) + return nil, tasks, err } - rangesLen := ranges.Len() + rangesPerTask := RangesPerTask + failpoint.Inject("setRangesPerTask", func(val failpoint.Value) { + if v, ok := val.(int); ok { + rangesPerTask = v + } + }) // TODO(youjiali1995): is there any request type that needn't be splitted by buckets? locs, err := cache.SplitKeyRangesByBuckets(bo, ranges) if err != nil { - return nil, errors.Trace(err) + return nil, nil, errors.Trace(err) } // Channel buffer is 2 for handling region split. // In a common case, two region split tasks will not be blocked. chanSize := 2 + // Channel buffer is 1 when keep-order, because split tasks will use new channel. + if req.KeepOrder { + chanSize = 1 + } // in paging request, a request will be returned in multi batches, // enlarge the channel size to avoid the request blocked by buffer full. if req.Paging.Enable { @@ -322,6 +416,10 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv builder = newLegacyTaskBuilder(len(locs)) } origRangeIdx := 0 + var ( + headTask *copTask + beforeTask *copTask + ) for _, loc := range locs { // TiKV will return gRPC error if the message is too large. So we need to limit the length of the ranges slice // to make sure the message can be sent successfully. 
@@ -360,7 +458,6 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv region: loc.Location.Region, bucketsVer: loc.getBucketVersion(), ranges: loc.Ranges.Slice(i, nextI), - respChan: make(chan *copResponse, chanSize), cmdType: cmdType, storeType: req.StoreType, eventCb: eventCb, @@ -368,11 +465,26 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv pagingSize: pagingSize, requestSource: req.RequestSource, RowCountHint: hint, + requireToken: opt.requireToken, + } + if opt.respChan { + task.respChan = make(chan *copResponse, chanSize) + if req.Desc { + task.nextTask = beforeTask + headTask = task + } else { + if beforeTask != nil { + beforeTask.nextTask = task + } else { + headTask = task + } + } } if err = builder.handle(task); err != nil { - return nil, err + return nil, nil, err } i = nextI + beforeTask = task if req.Paging.Enable { pagingSize = paging.GrowPagingSize(pagingSize, req.Paging.MaxPagingSize) } @@ -390,7 +502,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv zap.Int("task len", len(tasks))) } metrics.TxnRegionsNumHistogramWithCoprocessor.Observe(float64(builder.regionNum())) - return tasks, nil + return headTask, tasks, nil } type taskBuilder interface { @@ -482,6 +594,8 @@ func (b *batchStoreTaskBuilder) handle(task *copTask) (err error) { if task.RowCountHint > 0 { b.tasks[idx].RowCountHint += task.RowCountHint } + // it's batched, so the token is not required. + batchedTask.task.requireToken = false b.tasks[idx].batchTaskList[task.taskID] = batchedTask } handled = true @@ -565,10 +679,9 @@ type copIterator struct { smallTaskConcurrency int finishCh chan struct{} - // If keepOrder, results are stored in copTask.respChan, read them out one by one. - tasks []*copTask - // curr indicates the curr id of the finished copTask - curr int + // If keepOrder, results are stored in copTask.respChan, read them from headTask out one by one. 
+ headTask *copTask + tasks []*copTask // sendRate controls the sending rate of copIteratorTaskSender sendRate *util.RateLimit @@ -711,7 +824,6 @@ func (worker *copIteratorWorker) run(ctx context.Context) { // there is a task finished. worker.sendToRespCh(finCopResp, worker.respChan, false) } - close(task.respChan) if worker.finished() { return } @@ -852,6 +964,11 @@ func (it *copIterator) GetTasks() []*copTask { return it.tasks } +// GetHeadTask returns the head task when keep-order reading. +func (it *copIterator) GetHeadTask() *copTask { + return it.headTask +} + func (sender *copIteratorTaskSender) sendToTaskCh(t *copTask, sendTo chan<- *copTask) (exit bool) { select { case sendTo <- t: @@ -914,7 +1031,7 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) { } } }) - // If data order matters, response should be returned in the same order as copTask slice. + // If data order matters, response should be returned in the same order as copIterator.headTask link list. // Otherwise all responses are returned from a single channel. if it.respChan != nil { // Get next fetched resp from chan @@ -931,13 +1048,12 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) { } } else { for { - if it.curr >= len(it.tasks) { + if it.headTask == nil { // Resp will be nil if iterator is finishCh. it.actionOnExceed.close() return nil, nil } - task := it.tasks[it.curr] - resp, ok, closed = it.recvFromRespCh(ctx, task.respChan) + resp, ok, closed = it.recvFromRespCh(ctx, it.headTask.respChan) if closed { // Close() is already called, so Next() is invalid. return nil, nil @@ -945,12 +1061,13 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) { if ok { break } - it.actionOnExceed.destroyTokenIfNeeded(func() { - it.sendRate.PutToken() - }) - // Switch to next task. - it.tasks[it.curr] = nil - it.curr++ + // put back token for the tasks who acquire a token. 
+ if it.headTask.requireToken { + it.actionOnExceed.destroyTokenIfNeeded(func() { + it.sendRate.PutToken() + }) + } + it.headTask = it.headTask.nextTask } } @@ -1221,10 +1338,17 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R return nil, errors.Trace(err) } // We may meet RegionError at the first packet, but not during visiting the stream. - remains, err := buildCopTasks(bo, worker.store.GetRegionCache(), task.ranges, worker.req, task.eventCb) + _, remains, err := buildCopTasks(bo, task.ranges, &buildCopTaskOpt{ + req: worker.req, + cache: worker.store.GetRegionCache(), + eventCb: task.eventCb, + respChan: worker.req.KeepOrder, + requireToken: false, + }) if err != nil { return remains, err } + task.ReorderRespChan(remains) return worker.handleBatchRemainsOnErr(bo, remains, resp.pbResp.GetBatchResponses(), task, ch) } if lockErr := resp.pbResp.GetLocked(); lockErr != nil { @@ -1319,7 +1443,12 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R } } batchResps := resp.pbResp.BatchResponses - worker.sendToRespCh(resp, ch, true) + if worker.req.KeepOrder { + worker.sendToRespCh(resp, task.respChan, true) + close(task.respChan) + } else { + worker.sendToRespCh(resp, ch, true) + } return worker.handleBatchCopResponse(bo, batchResps, task.batchTaskList, ch) } @@ -1363,16 +1492,26 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp }, } task := batchedTask.task + failpoint.Inject("batchCopRegionError", func() { + batchResp.RegionError = &errorpb.Error{} + }) if regionErr := batchResp.GetRegionError(); regionErr != nil { errStr := fmt.Sprintf("region_id:%v, region_ver:%v, store_type:%s, peer_addr:%s, error:%s", task.region.GetID(), task.region.GetVer(), task.storeType.Name(), task.storeAddr, regionErr.String()) if err := bo.Backoff(tikv.BoRegionMiss(), errors.New(errStr)); err != nil { return nil, errors.Trace(err) } - remains, err := buildCopTasks(bo, 
worker.store.GetRegionCache(), task.ranges, worker.req, task.eventCb) + _, remains, err := buildCopTasks(bo, task.ranges, &buildCopTaskOpt{ + req: worker.req, + cache: worker.store.GetRegionCache(), + eventCb: task.eventCb, + respChan: worker.req.KeepOrder, + requireToken: false, + }) if err != nil { return nil, err } + task.ReorderRespChan(remains) appendRemainTasks(remains...) continue } @@ -1407,7 +1546,12 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp return nil, errors.Trace(err) } // TODO: check OOM - worker.sendToRespCh(resp, ch, false) + if worker.req.KeepOrder { + worker.sendToRespCh(resp, task.respChan, false) + close(task.respChan) + } else { + worker.sendToRespCh(resp, ch, false) + } } for _, t := range tasks { task := t.task @@ -1427,6 +1571,13 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp } appendRemainTasks(t.task) } + desc := worker.req.Desc + sort.Slice(remainTasks, func(i, j int) bool { + if desc { + return batchResps[i].TaskId > batchResps[j].TaskId + } + return batchResps[i].TaskId < batchResps[j].TaskId + }) return remainTasks, nil } diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go index 7790e8f7661fc..0928c1b6a705a 100644 --- a/store/copr/coprocessor_test.go +++ b/store/copr/coprocessor_test.go @@ -27,6 +27,16 @@ import ( "github.com/tikv/client-go/v2/tikv" ) +func buildTestCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv.Request) ([]*copTask, error) { + _, tasks, err := buildCopTasks(bo, ranges, &buildCopTaskOpt{ + req: req, + cache: cache, + eventCb: nil, + respChan: true, + }) + return tasks, err +} + func TestBuildTasksWithoutBuckets(t *testing.T) { // nil --- 'g' --- 'n' --- 't' --- nil // <- 0 -> <- 1 -> <- 2 -> <- 3 -> @@ -50,49 +60,49 @@ func TestBuildTasksWithoutBuckets(t *testing.T) { req := &kv.Request{} flashReq := &kv.Request{} flashReq.StoreType = kv.TiFlash - tasks, err := buildCopTasks(bo, cache, 
buildCopRanges("a", "c"), req, nil) + tasks, err := buildTestCopTasks(bo, cache, buildCopRanges("a", "c"), req) require.NoError(t, err) require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "c") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq) require.NoError(t, err) require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "c") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("g", "n"), req) require.NoError(t, err) require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[1], 0, "g", "n") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq) require.NoError(t, err) require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[1], 0, "g", "n") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("m", "n"), req) require.NoError(t, err) require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[1], 0, "m", "n") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq) require.NoError(t, err) require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[1], 0, "m", "n") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "k"), req) require.NoError(t, err) require.Len(t, tasks, 2) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "g") taskEqual(t, tasks[1], regionIDs[1], 0, "g", "k") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq) require.NoError(t, err) require.Len(t, tasks, 
2) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "g") taskEqual(t, tasks[1], regionIDs[1], 0, "g", "k") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "x"), req) require.NoError(t, err) require.Len(t, tasks, 4) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "g") @@ -100,7 +110,7 @@ func TestBuildTasksWithoutBuckets(t *testing.T) { taskEqual(t, tasks[2], regionIDs[2], 0, "n", "t") taskEqual(t, tasks[3], regionIDs[3], 0, "t", "x") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq) require.NoError(t, err) require.Len(t, tasks, 4) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "g") @@ -108,45 +118,45 @@ func TestBuildTasksWithoutBuckets(t *testing.T) { taskEqual(t, tasks[2], regionIDs[2], 0, "n", "t") taskEqual(t, tasks[3], regionIDs[3], 0, "t", "x") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req) require.NoError(t, err) require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "b", "b", "c") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq) require.NoError(t, err) require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "b", "b", "c") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req) require.NoError(t, err) require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "b", "e", "f") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq) 
require.NoError(t, err) require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "b", "e", "f") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req) require.NoError(t, err) require.Len(t, tasks, 2) taskEqual(t, tasks[0], regionIDs[1], 0, "g", "n") taskEqual(t, tasks[1], regionIDs[2], 0, "o", "p") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq) require.NoError(t, err) require.Len(t, tasks, 2) taskEqual(t, tasks[0], regionIDs[1], 0, "g", "n") taskEqual(t, tasks[1], regionIDs[2], 0, "o", "p") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req) require.NoError(t, err) require.Len(t, tasks, 2) taskEqual(t, tasks[0], regionIDs[1], 0, "h", "k", "m", "n") taskEqual(t, tasks[1], regionIDs[2], 0, "n", "p") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq) require.NoError(t, err) require.Len(t, tasks, 2) taskEqual(t, tasks[0], regionIDs[1], 0, "h", "k", "m", "n") @@ -191,7 +201,7 @@ func TestBuildTasksByBuckets(t *testing.T) { } for _, regionRange := range regionRanges { regionID, ranges := regionRange.regionID, regionRange.ranges - tasks, err := buildCopTasks(bo, cache, buildCopRanges(ranges...), req, nil) + tasks, err := buildTestCopTasks(bo, cache, buildCopRanges(ranges...), req) require.NoError(t, err) require.Len(t, tasks, len(ranges)/2) for i, task := range tasks { @@ -204,7 +214,7 @@ func TestBuildTasksByBuckets(t *testing.T) { for _, regionRange := range regionRanges { allRanges = append(allRanges, regionRange.ranges...) 
} - tasks, err := buildCopTasks(bo, cache, buildCopRanges(allRanges...), req, nil) + tasks, err := buildTestCopTasks(bo, cache, buildCopRanges(allRanges...), req) require.NoError(t, err) require.Len(t, tasks, len(allRanges)/2) taskIdx := 0 @@ -230,7 +240,7 @@ func TestBuildTasksByBuckets(t *testing.T) { "h", "i", "j", "k", "k", "l", "m", "n", } - tasks, err = buildCopTasks(bo, cache, buildCopRanges(keyRanges...), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges(keyRanges...), req) require.NoError(t, err) require.Len(t, tasks, len(keyRanges)/4) for i, task := range tasks { @@ -251,7 +261,7 @@ func TestBuildTasksByBuckets(t *testing.T) { {"c", "d", "e", "g"}, {"g", "h", "i", "j"}, } - tasks, err = buildCopTasks(bo, cache, buildCopRanges(keyRanges...), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges(keyRanges...), req) require.NoError(t, err) require.Len(t, tasks, len(expectedTaskRanges)) for i, task := range tasks { @@ -277,7 +287,7 @@ func TestBuildTasksByBuckets(t *testing.T) { cluster.SplitRegionBuckets(regionIDs[1], [][]byte{{'n'}, {'q'}, {'r'}, {'t'}, {'u'}, {'v'}, {'x'}}, regionIDs[1]) cache = NewRegionCache(tikv.NewRegionCache(pdCli)) defer cache.Close() - tasks, err = buildCopTasks(bo, cache, buildCopRanges("n", "o", "p", "q", "s", "w"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("n", "o", "p", "q", "s", "w"), req) require.NoError(t, err) require.Len(t, tasks, len(expectedTaskRanges)) for i, task := range tasks { @@ -301,7 +311,7 @@ func TestBuildTasksByBuckets(t *testing.T) { cluster.SplitRegionBuckets(regionIDs[1], [][]byte{{'q'}, {'s'}, {'u'}}, regionIDs[1]) cache = NewRegionCache(tikv.NewRegionCache(pdCli)) defer cache.Close() - tasks, err = buildCopTasks(bo, cache, buildCopRanges("n", "o", "p", "s", "t", "v", "w", "x"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("n", "o", "p", "s", "t", "v", "w", "x"), req) require.NoError(t, err) require.Len(t, tasks, 
len(expectedTaskRanges)) for i, task := range tasks { @@ -321,7 +331,7 @@ func TestBuildTasksByBuckets(t *testing.T) { cluster.SplitRegionBuckets(regionIDs[1], [][]byte{{'g'}, {'t'}, {'z'}}, regionIDs[1]) cache = NewRegionCache(tikv.NewRegionCache(pdCli)) defer cache.Close() - tasks, err = buildCopTasks(bo, cache, buildCopRanges("o", "p", "u", "w"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("o", "p", "u", "w"), req) require.NoError(t, err) require.Len(t, tasks, len(expectedTaskRanges)) for i, task := range tasks { @@ -343,7 +353,7 @@ func TestBuildTasksByBuckets(t *testing.T) { cluster.SplitRegionBuckets(regionIDs[1], [][]byte{{'n'}, {'q'}, {'r'}, {'x'}}, regionIDs[1]) cache = NewRegionCache(tikv.NewRegionCache(pdCli)) defer cache.Close() - tasks, err = buildCopTasks(bo, cache, buildCopRanges("n", "x"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("n", "x"), req) require.NoError(t, err) require.Len(t, tasks, len(expectedTaskRanges)) for i, task := range tasks { @@ -432,7 +442,7 @@ func TestRebuild(t *testing.T) { bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil) req := &kv.Request{} - tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil) + tasks, err := buildTestCopTasks(bo, cache, buildCopRanges("a", "z"), req) require.NoError(t, err) require.Len(t, tasks, 2) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "m") @@ -446,7 +456,7 @@ func TestRebuild(t *testing.T) { cache.InvalidateCachedRegion(tasks[1].region) req.Desc = true - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "z"), req) require.NoError(t, err) require.Len(t, tasks, 3) taskEqual(t, tasks[2], regionIDs[0], 0, "a", "m") @@ -501,7 +511,7 @@ func TestBuildPagingTasks(t *testing.T) { req.Paging.MinPagingSize = paging.MinPagingSize flashReq := &kv.Request{} flashReq.StoreType = kv.TiFlash - tasks, err := buildCopTasks(bo, cache, 
buildCopRanges("a", "c"), req, nil) + tasks, err := buildTestCopTasks(bo, cache, buildCopRanges("a", "c"), req) require.NoError(t, err) require.Len(t, tasks, 1) require.Len(t, tasks, 1) @@ -675,7 +685,7 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) { bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil) req := &kv.Request{} req.FixedRowCountHint = []int{1, 1, 3, CopSmallTaskRow} - tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c", "d", "e", "h", "x", "y", "z"), req, nil) + tasks, err := buildTestCopTasks(bo, cache, buildCopRanges("a", "c", "d", "e", "h", "x", "y", "z"), req) require.Nil(t, err) require.Equal(t, len(tasks), 4) // task[0] ["a"-"c", "d"-"e"] @@ -690,7 +700,7 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) { require.Equal(t, conc, 1) req.FixedRowCountHint = []int{1, 1, 3, 3} - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c", "d", "e", "h", "x", "y", "z"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "c", "d", "e", "h", "x", "y", "z"), req) require.Nil(t, err) require.Equal(t, len(tasks), 4) // task[0] ["a"-"c", "d"-"e"] @@ -706,7 +716,7 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) { // cross-region long range req.FixedRowCountHint = []int{10} - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "z"), req) require.Nil(t, err) require.Equal(t, len(tasks), 4) // task[0] ["a"-"g"] diff --git a/store/mockstore/unistore/tikv/server.go b/store/mockstore/unistore/tikv/server.go index f7779b51a8d69..25eaf52355118 100644 --- a/store/mockstore/unistore/tikv/server.go +++ b/store/mockstore/unistore/tikv/server.go @@ -556,7 +556,7 @@ func (svr *Server) RawDeleteRange(context.Context, *kvrpcpb.RawDeleteRangeReques // SQL push down commands. // Coprocessor implements the tikvpb.TikvServer interface. 
-func (svr *Server) Coprocessor(_ context.Context, req *coprocessor.Request) (*coprocessor.Response, error) { +func (svr *Server) Coprocessor(ctx context.Context, req *coprocessor.Request) (*coprocessor.Response, error) { reqCtx, err := newRequestCtx(svr, req.Context, "Coprocessor") if err != nil { return &coprocessor.Response{OtherError: convertToKeyError(err).String()}, nil @@ -565,7 +565,58 @@ func (svr *Server) Coprocessor(_ context.Context, req *coprocessor.Request) (*co if reqCtx.regErr != nil { return &coprocessor.Response{RegionError: reqCtx.regErr}, nil } - return cophandler.HandleCopRequest(reqCtx.getDBReader(), svr.mvccStore.lockStore, req), nil + resp := cophandler.HandleCopRequest(reqCtx.getDBReader(), svr.mvccStore.lockStore, req) + resp.BatchResponses = svr.StoreBatchCoprocessor(ctx, req) + return resp, nil +} + +// StoreBatchCoprocessor handle batched tasks in the same store. +func (svr *Server) StoreBatchCoprocessor(ctx context.Context, req *coprocessor.Request) []*coprocessor.StoreBatchTaskResponse { + if len(req.Tasks) == 0 { + return nil + } + tasks := req.Tasks + batchResps := make([]*coprocessor.StoreBatchTaskResponse, 0, len(tasks)) + handleBatchResp := func(task *coprocessor.StoreBatchTask) { + var err error + batchResp := &coprocessor.StoreBatchTaskResponse{ + TaskId: task.TaskId, + } + defer func() { + if err != nil { + batchResp.OtherError = err.Error() + } + batchResps = append(batchResps, batchResp) + }() + bytes, err := req.Marshal() + if err != nil { + return + } + taskReq := &coprocessor.Request{} + // deep clone req + if err = taskReq.Unmarshal(bytes); err != nil { + return + } + taskReq.Tasks = nil + taskReq.IsCacheEnabled = false + taskReq.Ranges = task.Ranges + taskReq.Context.RegionId = task.RegionId + taskReq.Context.RegionEpoch = task.RegionEpoch + taskReq.Context.Peer = task.Peer + resp, err := svr.Coprocessor(ctx, taskReq) + if err != nil { + return + } + batchResp.RegionError = resp.RegionError + batchResp.Locked = 
resp.Locked + batchResp.OtherError = resp.OtherError + batchResp.ExecDetailsV2 = resp.ExecDetailsV2 + batchResp.Data = resp.Data + } + for _, task := range tasks { + handleBatchResp(task) + } + return batchResps } // CoprocessorStream implements the tikvpb.TikvServer interface.