diff --git a/distsql/request_builder.go b/distsql/request_builder.go
index 2e9e254838c8b..439c6ecd8e7fe 100644
--- a/distsql/request_builder.go
+++ b/distsql/request_builder.go
@@ -131,17 +131,16 @@ func (builder *RequestBuilder) SetHandleRangesForTables(sc *stmtctx.StatementCon
 // SetTableHandles sets "KeyRanges" for "kv.Request" by first converting table
 // handles "handles" to "KeyRanges".
 func (builder *RequestBuilder) SetTableHandles(tid int64, handles []kv.Handle) *RequestBuilder {
-	var keyRanges []kv.KeyRange
-	keyRanges, builder.FixedRowCountHint = TableHandlesToKVRanges(tid, handles)
-	builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(keyRanges)
+	keyRanges, hints := TableHandlesToKVRanges(tid, handles)
+	builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints)
 	return builder
 }
 
 // SetPartitionsAndHandles sets "KeyRanges" for "kv.Request" by converting PartitionHandles to KeyRanges.
 // handles in the slice must be kv.PartitionHandle.
 func (builder *RequestBuilder) SetPartitionsAndHandles(handles []kv.Handle) *RequestBuilder {
-	keyRanges := PartitionHandlesToKVRanges(handles)
-	builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(keyRanges)
+	keyRanges, hints := PartitionHandlesToKVRanges(handles)
+	builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints)
 	return builder
 }
 
@@ -194,6 +193,12 @@ func (builder *RequestBuilder) SetKeyRanges(keyRanges []kv.KeyRange) *RequestBui
 	return builder
 }
 
+// SetKeyRangesWithHints sets "KeyRanges" for "kv.Request" with row count hints.
+func (builder *RequestBuilder) SetKeyRangesWithHints(keyRanges []kv.KeyRange, hints []int) *RequestBuilder {
+	builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints)
+	return builder
+}
+
 // SetWrappedKeyRanges sets "KeyRanges" for "kv.Request".
 func (builder *RequestBuilder) SetWrappedKeyRanges(keyRanges *kv.KeyRanges) *RequestBuilder {
 	builder.Request.KeyRanges = keyRanges
@@ -551,7 +556,7 @@ func SplitRangesAcrossInt64Boundary(ranges []*ranger.Range, keepOrder bool, desc
 // For continuous handles, we should merge them to a single key range.
 func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []int) {
 	krs := make([]kv.KeyRange, 0, len(handles))
-	hint := make([]int, 0, len(handles))
+	hints := make([]int, 0, len(handles))
 	i := 0
 	for i < len(handles) {
 		if commonHandle, ok := handles[i].(*kv.CommonHandle); ok {
@@ -560,7 +565,7 @@ func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []in
 				EndKey:   tablecodec.EncodeRowKey(tid, kv.Key(commonHandle.Encoded()).Next()),
 			}
 			krs = append(krs, ran)
-			hint = append(hint, 1)
+			hints = append(hints, 1)
 			i++
 			continue
 		}
@@ -576,16 +581,17 @@ func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []in
 		startKey := tablecodec.EncodeRowKey(tid, low)
 		endKey := tablecodec.EncodeRowKey(tid, high)
 		krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
-		hint = append(hint, j-i)
+		hints = append(hints, j-i)
 		i = j
 	}
-	return krs, hint
+	return krs, hints
 }
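Worth pausing on the contract here: TableHandlesToKVRanges pairs each produced key range with a hint equal to the number of handles merged into it, so the two returned slices always line up index for index. A minimal usage sketch (not part of the patch; the table ID and handle values are arbitrary):

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/distsql"
	"github.com/pingcap/tidb/kv"
)

func main() {
	// Handles 1, 2, 3 are consecutive, so they merge into one range with
	// hint 3; handle 7 stands alone with hint 1.
	handles := []kv.Handle{kv.IntHandle(1), kv.IntHandle(2), kv.IntHandle(3), kv.IntHandle(7)}
	ranges, hints := distsql.TableHandlesToKVRanges(42, handles)
	for i := range ranges {
		fmt.Printf("range %d: [%x, %x) hint=%d\n", i, ranges[i].StartKey, ranges[i].EndKey, hints[i])
	}
	// Expected hints: [3 1].
}
```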
 
 // PartitionHandlesToKVRanges converts PartitionHandles to kv ranges.
 // Handles in the slice must be kv.PartitionHandle.
-func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange {
+func PartitionHandlesToKVRanges(handles []kv.Handle) ([]kv.KeyRange, []int) {
 	krs := make([]kv.KeyRange, 0, len(handles))
+	hints := make([]int, 0, len(handles))
 	i := 0
 	for i < len(handles) {
 		ph := handles[i].(kv.PartitionHandle)
@@ -597,6 +603,7 @@ func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange {
 				EndKey:   tablecodec.EncodeRowKey(pid, append(commonHandle.Encoded(), 0)),
 			}
 			krs = append(krs, ran)
+			hints = append(hints, 1)
 			i++
 			continue
 		}
@@ -615,9 +622,10 @@ func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange {
 		startKey := tablecodec.EncodeRowKey(pid, low)
 		endKey := tablecodec.EncodeRowKey(pid, high)
 		krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
+		hints = append(hints, j-i)
 		i = j
 	}
-	return krs
+	return krs, hints
 }
 
 // IndexRangesToKVRanges converts index ranges to "KeyRange".
diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go
index 781d43df6096e..74bdf723216f1 100644
--- a/distsql/request_builder_test.go
+++ b/distsql/request_builder_test.go
@@ -61,10 +61,11 @@ func TestTableHandlesToKVRanges(t *testing.T) {
 
 	// Build key ranges.
 	expect := getExpectedRanges(1, hrs)
-	actual, _ := TableHandlesToKVRanges(1, handles)
+	actual, hints := TableHandlesToKVRanges(1, handles)
 
 	// Compare key ranges and expected key ranges.
 	require.Equal(t, len(expect), len(actual))
+	require.Equal(t, hints, []int{1, 4, 2, 1, 2})
 	for i := range actual {
 		require.Equal(t, expect[i].StartKey, actual[i].StartKey)
 		require.Equal(t, expect[i].EndKey, actual[i].EndKey)
@@ -378,7 +379,7 @@ func TestRequestBuilder3(t *testing.T) {
 		Tp:      103,
 		StartTs: 0x0,
 		Data:    []uint8{0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0},
-		KeyRanges: kv.NewNonParitionedKeyRanges([]kv.KeyRange{
+		KeyRanges: kv.NewNonParitionedKeyRangesWithHint([]kv.KeyRange{
 			{
 				StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
 				EndKey:   kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
@@ -395,17 +396,16 @@ func TestRequestBuilder3(t *testing.T) {
 				StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x64},
 				EndKey:   kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x65},
 			},
-		}),
-		Cacheable:         true,
-		KeepOrder:         false,
-		Desc:              false,
-		Concurrency:       variable.DefDistSQLScanConcurrency,
-		IsolationLevel:    0,
-		Priority:          0,
-		NotFillCache:      false,
-		ReplicaRead:       kv.ReplicaReadLeader,
-		ReadReplicaScope:  kv.GlobalReplicaScope,
-		FixedRowCountHint: []int{1, 4, 2, 1},
+		}, []int{1, 4, 2, 1}),
+		Cacheable:        true,
+		KeepOrder:        false,
+		Desc:             false,
+		Concurrency:      variable.DefDistSQLScanConcurrency,
+		IsolationLevel:   0,
+		Priority:         0,
+		NotFillCache:     false,
+		ReplicaRead:      kv.ReplicaReadLeader,
+		ReadReplicaScope: kv.GlobalReplicaScope,
 	}
 	expect.Paging.MinPagingSize = paging.MinPagingSize
 	expect.Paging.MaxPagingSize = paging.MaxPagingSize
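As the executor change below shows, callers that only need the ranges can simply discard the second return value, while request builders thread the hints into kv.Request. A hedged sketch of the builder-side flow (buildReqWithHints is a hypothetical helper, not in the patch):

```go
// buildReqWithHints illustrates how the new pieces compose: ranges and hints
// travel together, and nil hints simply degrade to the old behavior.
func buildReqWithHints(tid int64, handles []kv.Handle) (*kv.Request, error) {
	keyRanges, hints := distsql.TableHandlesToKVRanges(tid, handles)
	var builder distsql.RequestBuilder
	return builder.SetKeyRangesWithHints(keyRanges, hints).Build()
}
```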
diff --git a/executor/builder.go b/executor/builder.go
index 3e95368fdafa1..310e6920edacf 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -4220,13 +4220,13 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
 				continue
 			}
 			handle := kv.IntHandle(content.keys[0].GetInt64())
-			tmp, _ := distsql.TableHandlesToKVRanges(pid, []kv.Handle{handle})
-			kvRanges = append(kvRanges, tmp...)
+			ranges, _ := distsql.TableHandlesToKVRanges(pid, []kv.Handle{handle})
+			kvRanges = append(kvRanges, ranges...)
 		}
 	} else {
 		for _, p := range usedPartitionList {
-			tmp, _ := distsql.TableHandlesToKVRanges(p.GetPhysicalID(), handles)
-			kvRanges = append(kvRanges, tmp...)
+			ranges, _ := distsql.TableHandlesToKVRanges(p.GetPhysicalID(), handles)
+			kvRanges = append(kvRanges, ranges...)
 		}
 	}
 
diff --git a/executor/distsql_test.go b/executor/distsql_test.go
index 4420a714e96cf..65889a10d0377 100644
--- a/executor/distsql_test.go
+++ b/executor/distsql_test.go
@@ -633,3 +633,61 @@ func TestCoprocessorPagingReqKeyRangeSorted(t *testing.T) {
 	tk.MustExec(`set @a=0x61219F79C90D3541F70E, @b=5501707547099269248, @c=0xEC43EFD30131DEA2CB8B, @d="呣丼蒢咿卻鹻铴础湜僂頃dž縍套衞陀碵碼幓9", @e="鹹楞睕堚尛鉌翡佾搁紟精廬姆燵藝潐楻翇慸嵊";`)
 	tk.MustExec(`execute stmt using @a,@b,@c,@d,@e;`)
 }
+
+func TestCoprocessorBatchByStore(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t, t1")
+	tk.MustExec("create table t(id int primary key, c1 int, c2 int, key i(c1))")
+	tk.MustExec(`create table t1(id int primary key, c1 int, c2 int, key i(c1)) partition by range(id) (
+		partition p0 values less than(10000),
+		partition p1 values less than (50000),
+		partition p2 values less than (100000))`)
+	for i := 0; i < 10; i++ {
+		tk.MustExec("insert into t values(?, ?, ?)", i*10000, i*10000, i%2)
+		tk.MustExec("insert into t1 values(?, ?, ?)", i*10000, i*10000, i%2)
+	}
+	tk.MustQuery("split table t between (0) and (100000) regions 20").Check(testkit.Rows("20 1"))
+	tk.MustQuery("split table t1 between (0) and (100000) regions 20").Check(testkit.Rows("60 1"))
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/setRangesPerTask", "return(1)"))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/setRangesPerTask"))
+	}()
+	ranges := []string{
+		"(c1 >= 0 and c1 < 5000)",
+		"(c1 >= 10000 and c1 < 15000)",
+		"(c1 >= 20000 and c1 < 25000)",
+		"(c1 >= 30000 and c1 < 35000)",
+		"(c1 >= 40000 and c1 < 45000)",
+		"(c1 >= 50000 and c1 < 55000)",
+		"(c1 >= 60000 and c1 < 65000)",
+		"(c1 >= 70000 and c1 < 75000)",
+		"(c1 >= 80000 and c1 < 85000)",
+		"(c1 >= 90000 and c1 < 95000)",
+	}
+	evenRows := testkit.Rows("0 0 0", "20000 20000 0", "40000 40000 0", "60000 60000 0", "80000 80000 0")
+	oddRows := testkit.Rows("10000 10000 1", "30000 30000 1", "50000 50000 1", "70000 70000 1", "90000 90000 1")
+	reverseOddRows := testkit.Rows("90000 90000 1", "70000 70000 1", "50000 50000 1", "30000 30000 1", "10000 10000 1")
+	for _, table := range []string{"t", "t1"} {
+		baseSQL := fmt.Sprintf("select * from %s force index(i) where id < 100000 and (%s)", table, strings.Join(ranges, " or "))
+		for _, paging := range []string{"on", "off"} {
+			tk.MustExec("set session tidb_enable_paging=?", paging)
+			for size := 0; size < 10; size++ {
+				tk.MustExec("set session tidb_store_batch_size=?", size)
+				tk.MustQuery(baseSQL + " and c2 = 0").Sort().Check(evenRows)
+				tk.MustQuery(baseSQL + " and c2 = 1").Sort().Check(oddRows)
+				tk.MustQuery(baseSQL + " and c2 = 0 order by c1 asc").Check(evenRows)
+				tk.MustQuery(baseSQL + " and c2 = 1 order by c1 desc").Check(reverseOddRows)
+				// every batched task will get a region error and fall back.
+				require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/batchCopRegionError", "return"))
+				tk.MustQuery(baseSQL + " and c2 = 0").Sort().Check(evenRows)
+				tk.MustQuery(baseSQL + " and c2 = 1").Sort().Check(oddRows)
+				tk.MustQuery(baseSQL + " and c2 = 0 order by c1 asc").Check(evenRows)
+				tk.MustQuery(baseSQL + " and c2 = 1 order by c1 desc").Check(reverseOddRows)
+				require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/batchCopRegionError"))
+			}
+		}
+	}
+}
diff --git a/kv/kv.go b/kv/kv.go
index 3b4e1eef3cdc8..9239cc514c7b2 100644
--- a/kv/kv.go
+++ b/kv/kv.go
@@ -354,25 +354,41 @@ func (t StoreType) Name() string {
 // KeyRanges wrap the ranges for partitioned table cases.
 // We might send ranges from different partitions in one request.
 type KeyRanges struct {
-	ranges [][]KeyRange
+	ranges        [][]KeyRange
+	rowCountHints [][]int
 
 	isPartitioned bool
 }
 
 // NewPartitionedKeyRanges constructs a new RequestRange for a partitioned table.
 func NewPartitionedKeyRanges(ranges [][]KeyRange) *KeyRanges {
+	return NewPartitionedKeyRangesWithHints(ranges, nil)
+}
+
+// NewNonParitionedKeyRanges constructs a new RequestRange for a non-partitioned table.
+func NewNonParitionedKeyRanges(ranges []KeyRange) *KeyRanges {
+	return NewNonParitionedKeyRangesWithHint(ranges, nil)
+}
+
+// NewPartitionedKeyRangesWithHints constructs a new RequestRange for a partitioned table with row count hints.
+func NewPartitionedKeyRangesWithHints(ranges [][]KeyRange, hints [][]int) *KeyRanges {
 	return &KeyRanges{
 		ranges:        ranges,
+		rowCountHints: hints,
 		isPartitioned: true,
 	}
 }
 
-// NewNonParitionedKeyRanges constructs a new RequestRange for a non partitioned table.
-func NewNonParitionedKeyRanges(ranges []KeyRange) *KeyRanges {
-	return &KeyRanges{
+// NewNonParitionedKeyRangesWithHint constructs a new RequestRange for a non-partitioned table with row count hints.
+func NewNonParitionedKeyRangesWithHint(ranges []KeyRange, hints []int) *KeyRanges {
+	rr := &KeyRanges{
 		ranges:        [][]KeyRange{ranges},
 		isPartitioned: false,
 	}
+	if hints != nil {
+		rr.rowCountHints = [][]int{hints}
+	}
+	return rr
 }
 
 // FirstPartitionRange returns the result of the first range.
@@ -430,9 +446,13 @@ func (rr *KeyRanges) SortByFunc(sortFunc func(i, j KeyRange) bool) {
 }
 
 // ForEachPartitionWithErr runs the func for each partition with an error check.
-func (rr *KeyRanges) ForEachPartitionWithErr(theFunc func([]KeyRange) error) (err error) {
+func (rr *KeyRanges) ForEachPartitionWithErr(theFunc func([]KeyRange, []int) error) (err error) {
 	for i := range rr.ranges {
-		err = theFunc(rr.ranges[i])
+		var hints []int
+		if len(rr.rowCountHints) > i {
+			hints = rr.rowCountHints[i]
+		}
+		err = theFunc(rr.ranges[i], hints)
 		if err != nil {
 			return err
 		}
@@ -549,8 +569,6 @@ type Request struct {
 	}
 	// RequestSource indicates whether the request is an internal request.
 	RequestSource util.RequestSource
-	// FixedRowCountHint is the optimization hint for copr request for task scheduling.
-	FixedRowCountHint []int
 	// StoreBatchSize indicates the batch size of coprocessor in the same store.
 	StoreBatchSize int
 	// ResourceGroupName is the name of the bind resource group.
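With hints stored beside the ranges, ForEachPartitionWithErr hands each partition's hint slice to the callback, and the length check keeps it nil-safe for hint-less constructors. A minimal sketch of consuming the new callback signature (assuming only the kv APIs shown in this hunk):

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/kv"
)

func main() {
	ranges := kv.NewNonParitionedKeyRangesWithHint(
		[]kv.KeyRange{{StartKey: kv.Key("a"), EndKey: kv.Key("c")}},
		[]int{5},
	)
	_ = ranges.ForEachPartitionWithErr(func(rs []kv.KeyRange, hints []int) error {
		// hints is nil when the ranges were built without hints.
		fmt.Printf("%d ranges, hints=%v\n", len(rs), hints)
		return nil
	})
}
```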
diff --git a/store/copr/BUILD.bazel b/store/copr/BUILD.bazel
index a7cdd81453fd7..66c62b8e367e5 100644
--- a/store/copr/BUILD.bazel
+++ b/store/copr/BUILD.bazel
@@ -38,6 +38,7 @@ go_library(
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_pingcap_kvproto//pkg/coprocessor",
+        "@com_github_pingcap_kvproto//pkg/errorpb",
        "@com_github_pingcap_kvproto//pkg/kvrpcpb",
         "@com_github_pingcap_kvproto//pkg/metapb",
         "@com_github_pingcap_kvproto//pkg/mpp",
@@ -77,6 +78,7 @@ go_test(
        "//store/driver/backoff",
        "//testkit/testsetup",
        "//util/paging",
+       "//util/trxevents",
        "@com_github_pingcap_errors//:errors",
        "@com_github_pingcap_kvproto//pkg/coprocessor",
        "@com_github_pingcap_kvproto//pkg/mpp",
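The tests below exercise how the hints steer scheduling: tasks whose accumulated hint is small get extra concurrency, with copr.CopSmallTaskRow as the threshold. A rough sketch of the classification idea (illustrative only; the actual policy lives in smallTaskConcurrency inside store/copr, and the exact boundary conditions may differ):

```go
// isSmallTaskSketch approximates the notion of a "small" task: the hint is
// known (not the -1 sentinel for unknown) and does not exceed the threshold.
func isSmallTaskSketch(rowCountHint int) bool {
	return rowCountHint >= 0 && rowCountHint <= copr.CopSmallTaskRow
}
```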
diff --git a/store/copr/copr_test/coprocessor_test.go b/store/copr/copr_test/coprocessor_test.go
index a54c5048e12cb..7931fb8432675 100644
--- a/store/copr/copr_test/coprocessor_test.go
+++ b/store/copr/copr_test/coprocessor_test.go
@@ -41,11 +41,11 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) {
 	vars := kv.NewVariables(&killed)
 	opt := &kv.ClientSendOption{}
 
+	ranges := copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")
 	req := &kv.Request{
-		Tp:                kv.ReqTypeDAG,
-		KeyRanges:         kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
-		FixedRowCountHint: []int{1, 1, 3, copr.CopSmallTaskRow},
-		Concurrency:       15,
+		Tp:          kv.ReqTypeDAG,
+		KeyRanges:   kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, copr.CopSmallTaskRow}),
+		Concurrency: 15,
 	}
 	it, errRes := copClient.BuildCopIterator(ctx, req, vars, opt)
 	require.Nil(t, errRes)
@@ -55,11 +55,11 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) {
 	require.Equal(t, smallConc, 1)
 	require.Equal(t, rateLimit.GetCapacity(), 2)
 
+	ranges = copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")
 	req = &kv.Request{
-		Tp:                kv.ReqTypeDAG,
-		KeyRanges:         kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
-		FixedRowCountHint: []int{1, 1, 3, 3},
-		Concurrency:       15,
+		Tp:          kv.ReqTypeDAG,
+		KeyRanges:   kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, 3}),
+		Concurrency: 15,
 	}
 	it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt)
 	require.Nil(t, errRes)
@@ -70,11 +70,11 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) {
 	require.Equal(t, rateLimit.GetCapacity(), 3)
 
 	// cross-region long range
+	ranges = copr.BuildKeyRanges("a", "z")
 	req = &kv.Request{
-		Tp:                kv.ReqTypeDAG,
-		KeyRanges:         kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "z")),
-		FixedRowCountHint: []int{10},
-		Concurrency:       15,
+		Tp:          kv.ReqTypeDAG,
+		KeyRanges:   kv.NewNonParitionedKeyRangesWithHint(ranges, []int{10}),
+		Concurrency: 15,
 	}
 	it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt)
 	require.Nil(t, errRes)
@@ -84,11 +84,11 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) {
 	require.Equal(t, smallConc, 2)
 	require.Equal(t, rateLimit.GetCapacity(), 3)
 
+	ranges = copr.BuildKeyRanges("a", "z")
 	req = &kv.Request{
-		Tp:                kv.ReqTypeDAG,
-		KeyRanges:         kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "z")),
-		FixedRowCountHint: []int{copr.CopSmallTaskRow + 1},
-		Concurrency:       15,
+		Tp:          kv.ReqTypeDAG,
+		KeyRanges:   kv.NewNonParitionedKeyRangesWithHint(ranges, []int{copr.CopSmallTaskRow + 1}),
+		Concurrency: 15,
 	}
 	it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt)
 	require.Nil(t, errRes)
@@ -115,12 +115,12 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) {
 	vars := kv.NewVariables(&killed)
 	opt := &kv.ClientSendOption{}
 
+	ranges := copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")
 	req := &kv.Request{
-		Tp:                kv.ReqTypeDAG,
-		KeyRanges:         kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
-		FixedRowCountHint: []int{1, 1, 3, 3},
-		Concurrency:       15,
-		StoreBatchSize:    1,
+		Tp:             kv.ReqTypeDAG,
+		KeyRanges:      kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, 3}),
+		Concurrency:    15,
+		StoreBatchSize: 1,
 	}
 	it, errRes := copClient.BuildCopIterator(ctx, req, vars, opt)
 	require.Nil(t, errRes)
@@ -131,12 +131,12 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) {
 	require.Equal(t, len(tasks[1].ToPBBatchTasks()), 1)
 	require.Equal(t, tasks[1].RowCountHint, 9)
 
+	ranges = copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")
 	req = &kv.Request{
-		Tp:                kv.ReqTypeDAG,
-		KeyRanges:         kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
-		FixedRowCountHint: []int{1, 1, 3, 3},
-		Concurrency:       15,
-		StoreBatchSize:    3,
+		Tp:             kv.ReqTypeDAG,
+		KeyRanges:      kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, 3}),
+		Concurrency:    15,
+		StoreBatchSize: 3,
 	}
 	it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt)
 	require.Nil(t, errRes)
@@ -146,12 +146,12 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) {
 	require.Equal(t, tasks[0].RowCountHint, 14)
 
 	// paging will disable store batch.
+	ranges = copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")
 	req = &kv.Request{
-		Tp:                kv.ReqTypeDAG,
-		KeyRanges:         kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
-		FixedRowCountHint: []int{1, 1, 3, 3},
-		Concurrency:       15,
-		StoreBatchSize:    3,
+		Tp:             kv.ReqTypeDAG,
+		KeyRanges:      kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, 3}),
+		Concurrency:    15,
+		StoreBatchSize: 3,
 		Paging: struct {
 			Enable        bool
 			MinPagingSize uint64
diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go
index f446912ef33af..eca0b8037daa6 100644
--- a/store/copr/coprocessor.go
+++ b/store/copr/coprocessor.go
@@ -30,6 +30,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/coprocessor"
+	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/tidb/domain/infosync"
@@ -113,7 +114,6 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
 	if req.StoreType == kv.TiDB {
 		// coprocessor on TiDB doesn't support paging
 		req.Paging.Enable = false
-		req.FixedRowCountHint = nil
 	}
 	if req.Tp != kv.ReqTypeDAG {
 		// coprocessor request but type is not DAG
@@ -126,13 +126,6 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
 			}
 		}
 	})
-	if req.RequestSource.RequestSourceInternal || req.Tp != kv.ReqTypeDAG {
-		// disable extra concurrency for internal tasks.
-		req.FixedRowCountHint = nil
-	}
-	failpoint.Inject("disableFixedRowCountHint", func(_ failpoint.Value) {
-		req.FixedRowCountHint = nil
-	})
 	if req.Tp != kv.ReqTypeDAG || req.StoreType != kv.TiKV {
 		req.StoreBatchSize = 0
 	}
@@ -151,9 +144,19 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
 		tasks []*copTask
 		err   error
 	)
-	buildTaskFunc := func(ranges []kv.KeyRange) error {
+	tryRowHint := optRowHint(req)
+	buildOpt := &buildCopTaskOpt{
+		req:      req,
+		cache:    c.store.GetRegionCache(),
+		eventCb:  eventCb,
+		respChan: req.KeepOrder,
+	}
+	buildTaskFunc := func(ranges []kv.KeyRange, hints []int) error {
 		keyRanges := NewKeyRanges(ranges)
-		tasksFromRanges, err := buildCopTasks(bo, c.store.GetRegionCache(), keyRanges, req, eventCb)
+		if tryRowHint {
+			buildOpt.rowHints = hints
+		}
+		tasksFromRanges, err := buildCopTasks(bo, keyRanges, buildOpt)
 		if err != nil {
 			return err
 		}
@@ -195,7 +198,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
 	if it.concurrency > len(tasks) {
 		it.concurrency = len(tasks)
 	}
-	if req.FixedRowCountHint != nil {
+	if tryRowHint {
 		var smallTasks int
 		smallTasks, it.smallTaskConcurrency = smallTaskConcurrency(tasks)
 		if len(tasks)-smallTasks < it.concurrency {
@@ -294,14 +297,33 @@ func (r *copTask) ToPBBatchTasks() []*coprocessor.StoreBatchTask {
 
 // rangesPerTask limits the length of the ranges slice sent in one copTask.
 const rangesPerTask = 25000
 
-func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv.Request, eventCb trxevents.EventCallback) ([]*copTask, error) {
+type buildCopTaskOpt struct {
+	req      *kv.Request
+	cache    *RegionCache
+	eventCb  trxevents.EventCallback
+	respChan bool
+	rowHints []int
+}
+
+func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*copTask, error) {
+	req, cache, eventCb, hints := opt.req, opt.cache, opt.eventCb, opt.rowHints
 	start := time.Now()
 	cmdType := tikvrpc.CmdCop
 	if req.StoreType == kv.TiDB {
 		return buildTiDBMemCopTasks(ranges, req)
 	}
-
 	rangesLen := ranges.Len()
+	// something went wrong, disable the hints to avoid an out-of-range index.
+	if hints != nil && len(hints) != rangesLen {
+		hints = nil
+	}
+
+	rangesPerTaskLimit := rangesPerTask
+	failpoint.Inject("setRangesPerTask", func(val failpoint.Value) {
+		if v, ok := val.(int); ok {
+			rangesPerTaskLimit = v
+		}
+	})
 
 	// TODO(youjiali1995): is there any request type that needn't be splitted by buckets?
 	locs, err := cache.SplitKeyRangesByBuckets(bo, ranges)
@@ -335,34 +357,33 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
 		pagingSize = req.Paging.MinPagingSize
 	}
 	for i := 0; i < rLen; {
-		nextI := mathutil.Min(i+rangesPerTask, rLen)
+		nextI := mathutil.Min(i+rangesPerTaskLimit, rLen)
 		hint := -1
 		// calculate the row count hint
-		if req.FixedRowCountHint != nil {
-			startKey, endKey := loc.Ranges.At(i).StartKey, loc.Ranges.At(nextI-1).EndKey
+		if hints != nil {
+			startKey, endKey := loc.Ranges.RefAt(i).StartKey, loc.Ranges.RefAt(nextI-1).EndKey
 			// move to the previous range if startKey of current range is lower than endKey of previous location.
 			// In the following example, task1 will move origRangeIdx to region(i, z).
 			// When counting the row hint for task2, we need to move origRangeIdx back to region(a, h).
 			// |<- region(a, h) ->| |<- region(i, z) ->|
 			//       |<- task1 ->| |<- task2 ->| ...
-			if origRangeIdx > 0 && ranges.At(origRangeIdx-1).EndKey.Cmp(startKey) > 0 {
+			if origRangeIdx > 0 && ranges.RefAt(origRangeIdx-1).EndKey.Cmp(startKey) > 0 {
 				origRangeIdx--
 			}
 			hint = 0
 			for nextOrigRangeIdx := origRangeIdx; nextOrigRangeIdx < ranges.Len(); nextOrigRangeIdx++ {
-				rangeStart := ranges.At(nextOrigRangeIdx).StartKey
+				rangeStart := ranges.RefAt(nextOrigRangeIdx).StartKey
 				if rangeStart.Cmp(endKey) > 0 {
 					origRangeIdx = nextOrigRangeIdx
 					break
 				}
-				hint += req.FixedRowCountHint[nextOrigRangeIdx]
+				hint += hints[nextOrigRangeIdx]
 			}
 		}
 		task := &copTask{
 			region:        loc.Location.Region,
 			bucketsVer:    loc.getBucketVersion(),
 			ranges:        loc.Ranges.Slice(i, nextI),
-			respChan:      make(chan *copResponse, chanSize),
 			cmdType:       cmdType,
 			storeType:     req.StoreType,
 			eventCb:       eventCb,
@@ -371,6 +392,11 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
 			requestSource: req.RequestSource,
 			RowCountHint:  hint,
 		}
+		// Only keep-order requests need a channel inside the task;
+		// tasks rebuilt on region errors reuse the parent task's channel.
+		if req.KeepOrder && opt.respChan {
+			task.respChan = make(chan *copResponse, chanSize)
+		}
 		if err = builder.handle(task); err != nil {
 			return nil, err
 		}
@@ -721,7 +747,9 @@ func (worker *copIteratorWorker) run(ctx context.Context) {
 			// there is a task finished.
 			worker.sendToRespCh(finCopResp, worker.respChan, false)
 		}
-		close(task.respChan)
+		if task.respChan != nil {
+			close(task.respChan)
+		}
 		if worker.finished() {
 			return
 		}
@@ -1232,7 +1260,12 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
 			return nil, errors.Trace(err)
 		}
 		// We may meet RegionError at the first packet, but not during visiting the stream.
-		remains, err := buildCopTasks(bo, worker.store.GetRegionCache(), task.ranges, worker.req, task.eventCb)
+		remains, err := buildCopTasks(bo, task.ranges, &buildCopTaskOpt{
+			req:      worker.req,
+			cache:    worker.store.GetRegionCache(),
+			respChan: false,
+			eventCb:  task.eventCb,
+		})
 		if err != nil {
 			return remains, err
 		}
@@ -1374,13 +1407,21 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp
 		},
 	}
 	task := batchedTask.task
+	failpoint.Inject("batchCopRegionError", func() {
+		batchResp.RegionError = &errorpb.Error{}
+	})
 	if regionErr := batchResp.GetRegionError(); regionErr != nil {
 		errStr := fmt.Sprintf("region_id:%v, region_ver:%v, store_type:%s, peer_addr:%s, error:%s", task.region.GetID(), task.region.GetVer(), task.storeType.Name(), task.storeAddr, regionErr.String())
 		if err := bo.Backoff(tikv.BoRegionMiss(), errors.New(errStr)); err != nil {
 			return nil, errors.Trace(err)
 		}
-		remains, err := buildCopTasks(bo, worker.store.GetRegionCache(), task.ranges, worker.req, task.eventCb)
+		remains, err := buildCopTasks(bo, task.ranges, &buildCopTaskOpt{
+			req:      worker.req,
+			cache:    worker.store.GetRegionCache(),
+			respChan: false,
+			eventCb:  task.eventCb,
+		})
 		if err != nil {
 			return nil, err
 		}
@@ -1814,3 +1855,18 @@ func BuildKeyRanges(keys ...string) []kv.KeyRange {
 	}
 	return ranges
 }
+
+func optRowHint(req *kv.Request) bool {
+	opt := true
+	if req.StoreType == kv.TiDB {
+		return false
+	}
+	if req.RequestSource.RequestSourceInternal || req.Tp != kv.ReqTypeDAG {
+		// disable extra concurrency for internal tasks.
+		return false
+	}
+	failpoint.Inject("disableFixedRowCountHint", func(_ failpoint.Value) {
+		opt = false
+	})
+	return opt
+}
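The origRangeIdx bookkeeping in buildCopTasks is the subtle part of this file: for each task it accumulates the hints of every original range that overlaps the task's key span, rewinding by one range when a range straddles a region boundary. A self-contained restatement of approximately the same quantity (quadratic for clarity, where the patch uses a single forward scan; taskHintSketch is hypothetical, not in the patch):

```go
// taskHintSketch sums the hints of all original ranges overlapping the task
// span [taskStart, taskEnd).
func taskHintSketch(orig []kv.KeyRange, hints []int, taskStart, taskEnd kv.Key) int {
	total := 0
	for i, r := range orig {
		// overlap test: the range starts before the task ends and ends
		// after the task starts.
		if r.StartKey.Cmp(taskEnd) < 0 && r.EndKey.Cmp(taskStart) > 0 {
			total += hints[i]
		}
	}
	return total
}
```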
diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go
index 4f47bf454d270..36ae88758bbc5 100644
--- a/store/copr/coprocessor_test.go
+++ b/store/copr/coprocessor_test.go
@@ -22,11 +22,21 @@ import (
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/store/driver/backoff"
 	"github.com/pingcap/tidb/util/paging"
+	"github.com/pingcap/tidb/util/trxevents"
 	"github.com/stretchr/testify/require"
 	"github.com/tikv/client-go/v2/testutils"
 	"github.com/tikv/client-go/v2/tikv"
 )
 
+func buildTestCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv.Request, eventCb trxevents.EventCallback) ([]*copTask, error) {
+	return buildCopTasks(bo, ranges, &buildCopTaskOpt{
+		req:      req,
+		cache:    cache,
+		eventCb:  eventCb,
+		respChan: true,
+	})
+}
+
 func TestBuildTasksWithoutBuckets(t *testing.T) {
 	// nil --- 'g' --- 'n' --- 't' --- nil
 	// <- 0 -> <- 1 -> <- 2 -> <- 3 ->
@@ -50,49 +60,49 @@ func TestBuildTasksWithoutBuckets(t *testing.T) {
 	req := &kv.Request{}
 	flashReq := &kv.Request{}
 	flashReq.StoreType = kv.TiFlash
-	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil)
+	tasks, err := buildTestCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 1)
 	taskEqual(t, tasks[0], regionIDs[0], 0, "a", "c")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 1)
 	taskEqual(t, tasks[0], regionIDs[0], 0, "a", "c")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("g", "n"), req, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 1)
 	taskEqual(t, tasks[0], regionIDs[1], 0, "g", "n")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 1)
 	taskEqual(t, tasks[0], regionIDs[1], 0, "g", "n")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("m", "n"), req, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 1)
 	taskEqual(t, tasks[0], regionIDs[1], 0, "m", "n")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 1)
 	taskEqual(t, tasks[0], regionIDs[1], 0, "m", "n")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "k"), req, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 2)
 	taskEqual(t, tasks[0], regionIDs[0], 0, "a", "g")
 	taskEqual(t, tasks[1], regionIDs[1], 0, "g", "k")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 2)
 	taskEqual(t, tasks[0], regionIDs[0], 0, "a", "g")
 	taskEqual(t, tasks[1], regionIDs[1], 0, "g", "k")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "x"), req, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 4)
 	taskEqual(t, tasks[0], regionIDs[0], 0, "a", "g")
@@ -100,7 +110,7 @@ func TestBuildTasksWithoutBuckets(t *testing.T) {
 	taskEqual(t, tasks[2], regionIDs[2], 0, "n", "t")
 	taskEqual(t, tasks[3], regionIDs[3], 0, "t", "x")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 4)
 	taskEqual(t, tasks[0], regionIDs[0], 0, "a", "g")
@@ -108,45 +118,45 @@ func TestBuildTasksWithoutBuckets(t *testing.T) {
 	taskEqual(t, tasks[2], regionIDs[2], 0, "n", "t")
 	taskEqual(t, tasks[3], regionIDs[3], 0, "t", "x")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 1)
 	taskEqual(t, tasks[0], regionIDs[0], 0, "a", "b", "b", "c")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 1)
 	taskEqual(t, tasks[0], regionIDs[0], 0, "a", "b", "b", "c")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 1)
 	taskEqual(t, tasks[0], regionIDs[0], 0, "a", "b", "e", "f")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 1)
 	taskEqual(t, tasks[0], regionIDs[0], 0, "a", "b", "e", "f")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 2)
 	taskEqual(t, tasks[0], regionIDs[1], 0, "g", "n")
 	taskEqual(t, tasks[1], regionIDs[2], 0, "o", "p")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 2)
 	taskEqual(t, tasks[0], regionIDs[1], 0, "g", "n")
 	taskEqual(t, tasks[1], regionIDs[2], 0, "o", "p")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 2)
 	taskEqual(t, tasks[0], regionIDs[1], 0, "h", "k", "m", "n")
 	taskEqual(t, tasks[1], regionIDs[2], 0, "n", "p")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 2)
 	taskEqual(t, tasks[0], regionIDs[1], 0, "h", "k", "m", "n")
@@ -191,7 +201,7 @@ func TestBuildTasksByBuckets(t *testing.T) {
 	}
 	for _, regionRange := range regionRanges {
 		regionID, ranges := regionRange.regionID, regionRange.ranges
-		tasks, err := buildCopTasks(bo, cache, buildCopRanges(ranges...), req, nil)
+		tasks, err := buildTestCopTasks(bo, cache, buildCopRanges(ranges...), req, nil)
 		require.NoError(t, err)
 		require.Len(t, tasks, len(ranges)/2)
 		for i, task := range tasks {
@@ -204,7 +214,7 @@ func TestBuildTasksByBuckets(t *testing.T) {
 	for _, regionRange := range regionRanges {
 		allRanges = append(allRanges, regionRange.ranges...)
 	}
-	tasks, err := buildCopTasks(bo, cache, buildCopRanges(allRanges...), req, nil)
+	tasks, err := buildTestCopTasks(bo, cache, buildCopRanges(allRanges...), req, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, len(allRanges)/2)
 	taskIdx := 0
@@ -230,7 +240,7 @@ func TestBuildTasksByBuckets(t *testing.T) {
 		"h", "i", "j", "k",
 		"k", "l", "m", "n",
 	}
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges(keyRanges...), req, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges(keyRanges...), req, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, len(keyRanges)/4)
 	for i, task := range tasks {
@@ -251,7 +261,7 @@ func TestBuildTasksByBuckets(t *testing.T) {
 		{"c", "d", "e", "g"},
 		{"g", "h", "i", "j"},
 	}
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges(keyRanges...), req, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges(keyRanges...), req, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, len(expectedTaskRanges))
 	for i, task := range tasks {
@@ -277,7 +287,7 @@ func TestBuildTasksByBuckets(t *testing.T) {
 	cluster.SplitRegionBuckets(regionIDs[1], [][]byte{{'n'}, {'q'}, {'r'}, {'t'}, {'u'}, {'v'}, {'x'}}, regionIDs[1])
 	cache = NewRegionCache(tikv.NewRegionCache(pdCli))
 	defer cache.Close()
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("n", "o", "p", "q", "s", "w"), req, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("n", "o", "p", "q", "s", "w"), req, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, len(expectedTaskRanges))
 	for i, task := range tasks {
@@ -301,7 +311,7 @@ func TestBuildTasksByBuckets(t *testing.T) {
 	cluster.SplitRegionBuckets(regionIDs[1], [][]byte{{'q'}, {'s'}, {'u'}}, regionIDs[1])
 	cache = NewRegionCache(tikv.NewRegionCache(pdCli))
 	defer cache.Close()
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("n", "o", "p", "s", "t", "v", "w", "x"), req, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("n", "o", "p", "s", "t", "v", "w", "x"), req, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, len(expectedTaskRanges))
 	for i, task := range tasks {
@@ -321,7 +331,7 @@ func TestBuildTasksByBuckets(t *testing.T) {
 	cluster.SplitRegionBuckets(regionIDs[1], [][]byte{{'g'}, {'t'}, {'z'}}, regionIDs[1])
 	cache = NewRegionCache(tikv.NewRegionCache(pdCli))
 	defer cache.Close()
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("o", "p", "u", "w"), req, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("o", "p", "u", "w"), req, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, len(expectedTaskRanges))
 	for i, task := range tasks {
@@ -343,7 +353,7 @@ func TestBuildTasksByBuckets(t *testing.T) {
 	cluster.SplitRegionBuckets(regionIDs[1], [][]byte{{'n'}, {'q'}, {'r'}, {'x'}}, regionIDs[1])
 	cache = NewRegionCache(tikv.NewRegionCache(pdCli))
 	defer cache.Close()
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("n", "x"), req, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("n", "x"), req, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, len(expectedTaskRanges))
 	for i, task := range tasks {
@@ -432,7 +442,7 @@ func TestRebuild(t *testing.T) {
 	bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil)
 
 	req := &kv.Request{}
-	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil)
+	tasks, err := buildTestCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 2)
 	taskEqual(t, tasks[0], regionIDs[0], 0, "a", "m")
@@ -446,7 +456,7 @@ func TestRebuild(t *testing.T) {
 	cache.InvalidateCachedRegion(tasks[1].region)
 
 	req.Desc = true
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil)
+	tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 3)
 	taskEqual(t, tasks[2], regionIDs[0], 0, "a", "m")
@@ -501,7 +511,7 @@ func TestBuildPagingTasks(t *testing.T) {
 	req.Paging.MinPagingSize = paging.MinPagingSize
 	flashReq := &kv.Request{}
 	flashReq.StoreType = kv.TiFlash
-	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil)
+	tasks, err := buildTestCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil)
 	require.NoError(t, err)
 	require.Len(t, tasks, 1)
 	require.Len(t, tasks, 1)
@@ -674,8 +684,12 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) {
 	bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil)
 
 	req := &kv.Request{}
-	req.FixedRowCountHint = []int{1, 1, 3, CopSmallTaskRow}
-	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c", "d", "e", "h", "x", "y", "z"), req, nil)
+	ranges := buildCopRanges("a", "c", "d", "e", "h", "x", "y", "z")
+	tasks, err := buildCopTasks(bo, ranges, &buildCopTaskOpt{
+		req:      req,
+		cache:    cache,
+		rowHints: []int{1, 1, 3, CopSmallTaskRow},
+	})
 	require.Nil(t, err)
 	require.Equal(t, len(tasks), 4)
 	// task[0] ["a"-"c", "d"-"e"]
@@ -689,8 +703,12 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) {
 	_, conc := smallTaskConcurrency(tasks)
 	require.Equal(t, conc, 1)
 
-	req.FixedRowCountHint = []int{1, 1, 3, 3}
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c", "d", "e", "h", "x", "y", "z"), req, nil)
+	ranges = buildCopRanges("a", "c", "d", "e", "h", "x", "y", "z")
+	tasks, err = buildCopTasks(bo, ranges, &buildCopTaskOpt{
+		req:      req,
+		cache:    cache,
+		rowHints: []int{1, 1, 3, 3},
+	})
 	require.Nil(t, err)
 	require.Equal(t, len(tasks), 4)
 	// task[0] ["a"-"c", "d"-"e"]
@@ -705,8 +723,12 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) {
 	require.Equal(t, conc, 2)
 
 	// cross-region long range
-	req.FixedRowCountHint = []int{10}
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil)
+	ranges = buildCopRanges("a", "z")
+	tasks, err = buildCopTasks(bo, ranges, &buildCopTaskOpt{
+		req:      req,
+		cache:    cache,
+		rowHints: []int{10},
+	})
 	require.Nil(t, err)
 	require.Equal(t, len(tasks), 4)
 	// task[0] ["a"-"g"]
diff --git a/store/copr/key_ranges.go b/store/copr/key_ranges.go
index 86dcf036fed4e..67effbbc8b7a1 100644
--- a/store/copr/key_ranges.go
+++ b/store/copr/key_ranges.go
@@ -58,18 +58,23 @@ func (r *KeyRanges) Len() int {
 	return l
 }
 
-// At returns the range at the ith position.
-func (r *KeyRanges) At(i int) kv.KeyRange {
+// RefAt returns a reference to the range at the ith position, without copying it.
+func (r *KeyRanges) RefAt(i int) *kv.KeyRange {
 	if r.first != nil {
 		if i == 0 {
-			return *r.first
+			return r.first
 		}
 		i--
 	}
 	if i < len(r.mid) {
-		return r.mid[i]
+		return &r.mid[i]
 	}
-	return *r.last
+	return r.last
+}
+
+// At returns the range at the ith position.
+func (r *KeyRanges) At(i int) kv.KeyRange {
+	return *r.RefAt(i)
 }
 
 // Slice returns the sub ranges [from, to).
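The motivation for RefAt is the hot loop in buildCopTasks above: At returns a kv.KeyRange by value, copying two slice headers per call, while RefAt aliases the stored element. A small usage sketch (the aliasing is the point — mutating through the pointer changes the stored range):

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/store/copr"
)

func main() {
	r := copr.NewKeyRanges(copr.BuildKeyRanges("a", "b", "c", "d"))
	byValue := r.At(1)  // independent copy of the kv.KeyRange
	byRef := r.RefAt(1) // pointer into the stored element, no copy
	fmt.Println(string(byValue.StartKey), string(byRef.StartKey))
}
```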
diff --git a/store/mockstore/unistore/tikv/server.go b/store/mockstore/unistore/tikv/server.go
index 5b9ce619694b4..d3163d887d6a9 100644
--- a/store/mockstore/unistore/tikv/server.go
+++ b/store/mockstore/unistore/tikv/server.go
@@ -564,7 +564,7 @@ func (svr *Server) RawDeleteRange(context.Context, *kvrpcpb.RawDeleteRangeReques
 // SQL push down commands.
 
 // Coprocessor implements the tikvpb.TikvServer interface.
-func (svr *Server) Coprocessor(_ context.Context, req *coprocessor.Request) (*coprocessor.Response, error) {
+func (svr *Server) Coprocessor(ctx context.Context, req *coprocessor.Request) (*coprocessor.Response, error) {
 	reqCtx, err := newRequestCtx(svr, req.Context, "Coprocessor")
 	if err != nil {
 		return &coprocessor.Response{OtherError: convertToKeyError(err).String()}, nil
@@ -573,7 +573,58 @@ func (svr *Server) Coprocessor(_ context.Context, req *coprocessor.Request) (*co
 	if reqCtx.regErr != nil {
 		return &coprocessor.Response{RegionError: reqCtx.regErr}, nil
 	}
-	return cophandler.HandleCopRequest(reqCtx.getDBReader(), svr.mvccStore.lockStore, req), nil
+	resp := cophandler.HandleCopRequest(reqCtx.getDBReader(), svr.mvccStore.lockStore, req)
+	resp.BatchResponses = svr.StoreBatchCoprocessor(ctx, req)
+	return resp, nil
+}
+
+// StoreBatchCoprocessor handles batched tasks in the same store.
+func (svr *Server) StoreBatchCoprocessor(ctx context.Context, req *coprocessor.Request) []*coprocessor.StoreBatchTaskResponse {
+	if len(req.Tasks) == 0 {
+		return nil
+	}
+	tasks := req.Tasks
+	batchResps := make([]*coprocessor.StoreBatchTaskResponse, 0, len(tasks))
+	handleBatchResp := func(task *coprocessor.StoreBatchTask) {
+		var err error
+		batchResp := &coprocessor.StoreBatchTaskResponse{
+			TaskId: task.TaskId,
+		}
+		defer func() {
+			if err != nil {
+				batchResp.OtherError = err.Error()
+			}
+			batchResps = append(batchResps, batchResp)
+		}()
+		bytes, err := req.Marshal()
+		if err != nil {
+			return
+		}
+		taskReq := &coprocessor.Request{}
+		// deep clone the request through a marshal/unmarshal round trip
+		if err = taskReq.Unmarshal(bytes); err != nil {
+			return
+		}
+		taskReq.Tasks = nil
+		taskReq.IsCacheEnabled = false
+		taskReq.Ranges = task.Ranges
+		taskReq.Context.RegionId = task.RegionId
+		taskReq.Context.RegionEpoch = task.RegionEpoch
+		taskReq.Context.Peer = task.Peer
+		resp, err := svr.Coprocessor(ctx, taskReq)
+		if err != nil {
+			return
+		}
+		batchResp.RegionError = resp.RegionError
+		batchResp.Locked = resp.Locked
+		batchResp.OtherError = resp.OtherError
+		batchResp.ExecDetailsV2 = resp.ExecDetailsV2
+		batchResp.Data = resp.Data
+	}
+	for _, task := range tasks {
+		handleBatchResp(task)
+	}
+	return batchResps
 }
 
 // CoprocessorStream implements the tikvpb.TikvServer interface.
diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go
index b072c5ec6ce83..b3726a414fa26 100644
--- a/tablecodec/tablecodec.go
+++ b/tablecodec/tablecodec.go
@@ -1734,7 +1734,7 @@ func IndexKVIsUnique(value []byte) bool {
 // VerifyTableIDForRanges verifies that all given ranges are valid to decode the table id.
 func VerifyTableIDForRanges(keyRanges *kv.KeyRanges) ([]int64, error) {
 	tids := make([]int64, 0, keyRanges.PartitionNum())
-	collectFunc := func(ranges []kv.KeyRange) error {
+	collectFunc := func(ranges []kv.KeyRange, _ []int) error {
 		if len(ranges) == 0 {
 			return nil
 		}
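For orientation, this is how the pieces fit end to end on the mock store: the primary Coprocessor call answers its own ranges, and each entry of req.Tasks is replayed as an independent request whose result lands in BatchResponses under its TaskId. A hedged sketch of the client-side pairing (resp and handleData are hypothetical placeholders; the retry path is the one handleBatchCopResponse implements above):

```go
// Each StoreBatchTaskResponse answers exactly one batched task; a region
// error on one entry re-splits only that entry's ranges.
for _, br := range resp.GetBatchResponses() {
	if br.GetRegionError() != nil {
		// rebuild cop tasks for this entry and retry
		continue
	}
	handleData(br.GetTaskId(), br.GetData())
}
```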