Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/copr: support keep-order store batch coprocessor #40071

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,3 +633,53 @@ func TestCoprocessorPagingReqKeyRangeSorted(t *testing.T) {
tk.MustExec(`set @a=0x61219F79C90D3541F70E, @b=5501707547099269248, @c=0xEC43EFD30131DEA2CB8B, @d="呣丼蒢咿卻鹻铴础湜僂頃dž縍套衞陀碵碼幓9", @e="鹹楞睕堚尛鉌翡佾搁紟精廬姆燵藝潐楻翇慸嵊";`)
tk.MustExec(`execute stmt using @a,@b,@c,@d,@e;`)
}

func TestCoprocessorBatchByStore(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id int primary key, c1 int, c2 int, key i(c1))")
for i := 0; i < 10; i++ {
tk.MustExec("insert into t values(?, ?, ?)", i*10000, i*10000, i%2)
}
tk.MustQuery("split table t between (0) and (100000) regions 2").Check(testkit.Rows("2 1"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/setRangesPerTask", "return(1)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/setRangesPerTask"))
}()
ranges := []string{
"(c1 >= 0 and c1 < 5000)",
"(c1 >= 10000 and c1 < 15000)",
"(c1 >= 20000 and c1 < 25000)",
"(c1 >= 30000 and c1 < 35000)",
"(c1 >= 40000 and c1 < 45000)",
"(c1 >= 50000 and c1 < 55000)",
"(c1 >= 60000 and c1 < 65000)",
"(c1 >= 70000 and c1 < 75000)",
"(c1 >= 80000 and c1 < 85000)",
"(c1 >= 90000 and c1 < 95000)",
}
baseSQL := fmt.Sprintf("select * from t force index(i) where id < 100000 and (%s)", strings.Join(ranges, " or "))
evenRows := testkit.Rows("0 0 0", "20000 20000 0", "40000 40000 0", "60000 60000 0", "80000 80000 0")
oddRows := testkit.Rows("10000 10000 1", "30000 30000 1", "50000 50000 1", "70000 70000 1", "90000 90000 1")
reverseOddRows := testkit.Rows("90000 90000 1", "70000 70000 1", "50000 50000 1", "30000 30000 1", "10000 10000 1")
for _, paging := range []string{"on", "off"} {
tk.MustExec("set session tidb_enable_paging=?", paging)
for size := 0; size < 10; size++ {
tk.MustExec("set session tidb_store_batch_size=?", size)
tk.MustQuery(baseSQL + " and c2 = 0").Sort().Check(evenRows)
tk.MustQuery(baseSQL + " and c2 = 1").Sort().Check(oddRows)
tk.MustQuery(baseSQL + " and c2 = 0 order by c1 asc").Check(evenRows)
tk.MustQuery(baseSQL + " and c2 = 1 order by c1 desc").Check(reverseOddRows)
// every batched task will get region error and fallback.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/batchCopRegionError", "return"))
tk.MustQuery(baseSQL + " and c2 = 0").Sort().Check(evenRows)
tk.MustQuery(baseSQL + " and c2 = 1").Sort().Check(oddRows)
tk.MustQuery(baseSQL + " and c2 = 0 order by c1 asc").Check(evenRows)
tk.MustQuery(baseSQL + " and c2 = 1 order by c1 desc").Check(reverseOddRows)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/batchCopRegionError"))
}
}
}
1 change: 1 addition & 0 deletions store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/mpp",
Expand Down
73 changes: 73 additions & 0 deletions store/copr/copr_test/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ import (
"github.com/tikv/client-go/v2/testutils"
)

func flattenKeyRanges(ranges *copr.KeyRanges) []kv.KeyRange {
ret := make([]kv.KeyRange, 0, ranges.Len())
ranges.Do(func(ran *kv.KeyRange) {
ret = append(ret, *ran)
})
return ret
}

func TestBuildCopIteratorWithRowCountHint(t *testing.T) {
// nil --- 'g' --- 'n' --- 't' --- nil
// <- 0 -> <- 1 -> <- 2 -> <- 3 ->
Expand Down Expand Up @@ -130,6 +138,7 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) {
require.Equal(t, tasks[0].RowCountHint, 5)
require.Equal(t, len(tasks[1].ToPBBatchTasks()), 1)
require.Equal(t, tasks[1].RowCountHint, 9)
require.Nil(t, it.GetHeadTask())

req = &kv.Request{
Tp: kv.ReqTypeDAG,
Expand All @@ -144,6 +153,7 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) {
require.Equal(t, len(tasks), 1)
require.Equal(t, len(tasks[0].ToPBBatchTasks()), 3)
require.Equal(t, tasks[0].RowCountHint, 14)
require.Nil(t, it.GetHeadTask())

// paging will disable store batch.
req = &kv.Request{
Expand All @@ -166,4 +176,67 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) {
require.Nil(t, errRes)
tasks = it.GetTasks()
require.Equal(t, len(tasks), 4)
require.Nil(t, it.GetHeadTask())

// keep order
req = &kv.Request{
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
FixedRowCountHint: []int{1, 1, 3, 3},
Concurrency: 15,
StoreBatchSize: 1,
KeepOrder: true,
Desc: false,
}
it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt)
require.Nil(t, errRes)
tasks = it.GetTasks()
require.Equal(t, len(tasks), 2)
require.Equal(t, len(tasks[0].ToPBBatchTasks()), 1)
require.Equal(t, tasks[0].RowCountHint, 5)
require.Equal(t, len(tasks[1].ToPBBatchTasks()), 1)
require.Equal(t, tasks[1].RowCountHint, 9)
taskRanges := [][]kv.KeyRange{
copr.BuildKeyRanges("a", "c", "d", "e"),
copr.BuildKeyRanges("h", "n"),
copr.BuildKeyRanges("n", "t"),
copr.BuildKeyRanges("t", "x", "y", "z"),
}
curTask := it.GetHeadTask()
for i := 0; i < len(taskRanges); i++ {
require.Equal(t, flattenKeyRanges(curTask.Ranges()), taskRanges[i])
curTask = curTask.GetNextTask()
}
require.Nil(t, curTask)

// keep order & desc
req = &kv.Request{
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
FixedRowCountHint: []int{1, 1, 3, 3},
Concurrency: 15,
StoreBatchSize: 1,
KeepOrder: true,
Desc: true,
}
it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt)
require.Nil(t, errRes)
tasks = it.GetTasks()
require.Equal(t, len(tasks), 2)
require.Equal(t, len(tasks[0].ToPBBatchTasks()), 1)
require.Equal(t, tasks[0].RowCountHint, 9)
require.Equal(t, len(tasks[1].ToPBBatchTasks()), 1)
require.Equal(t, tasks[1].RowCountHint, 5)
taskRanges = [][]kv.KeyRange{
copr.BuildKeyRanges("t", "x", "y", "z"),
copr.BuildKeyRanges("n", "t"),
copr.BuildKeyRanges("h", "n"),
copr.BuildKeyRanges("a", "c", "d", "e"),
}
curTask = it.GetHeadTask()
for i := 0; i < len(taskRanges); i++ {
require.Equal(t, flattenKeyRanges(curTask.Ranges()), taskRanges[i])
curTask = curTask.GetNextTask()
}
require.Nil(t, curTask)
}
Loading