From 76f9e80bf3e8ddf74cacaf8cecbf93fdb0dd7878 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Mon, 6 Feb 2023 22:55:38 +0800 Subject: [PATCH 1/5] x --- distsql/request_builder.go | 1 + kv/kv.go | 2 ++ store/copr/coprocessor.go | 5 +++++ 3 files changed, 8 insertions(+) diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 439c6ecd8e7fe..8d0fae3a464f1 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -159,6 +159,7 @@ func (builder *RequestBuilder) SetDAGRequest(dag *tipb.DAGRequest) *RequestBuild if limit != nil && limit.Limit < estimatedRegionRowCount { builder.Request.Concurrency = 1 } + builder.Request.LimitSize = limit.GetLimit() } return builder } diff --git a/kv/kv.go b/kv/kv.go index 9239cc514c7b2..057c22fb7312b 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -573,6 +573,8 @@ type Request struct { StoreBatchSize int // ResourceGroupName is the name of the bind resource group. ResourceGroupName string + // LimitSize indicates whether the request is scan and limit + LimitSize uint64 } // CoprRequestAdjuster is used to check and adjust a copr request according to specific rules. diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index eca0b8037daa6..64a4d2a720c06 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -402,6 +402,11 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c } i = nextI if req.Paging.Enable { + if req.LimitSize != 0 && req.LimitSize < pagingSize { + // disable paging for small limit. + task.paging = false + task.pagingSize = 0 + } pagingSize = paging.GrowPagingSize(pagingSize, req.Paging.MaxPagingSize) } } From eed1f5c3d04126f9a68ea333223c1ea7d3ba80fa Mon Sep 17 00:00:00 2001 From: Zhigao Tong Date: Tue, 7 Feb 2023 09:09:54 +0800 Subject: [PATCH 2/5] more tests Signed-off-by: Zhigao Tong --- store/copr/coprocessor_test.go | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go index f7b15ebfd682d..35c6238bac277 100644 --- a/store/copr/coprocessor_test.go +++ b/store/copr/coprocessor_test.go @@ -514,8 +514,6 @@ func TestBuildPagingTasks(t *testing.T) { req := &kv.Request{} req.Paging.Enable = true req.Paging.MinPagingSize = paging.MinPagingSize - flashReq := &kv.Request{} - flashReq.StoreType = kv.TiFlash tasks, err := buildTestCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil) require.NoError(t, err) require.Len(t, tasks, 1) @@ -525,6 +523,36 @@ func TestBuildPagingTasks(t *testing.T) { require.Equal(t, tasks[0].pagingSize, paging.MinPagingSize) } +func TestBuildPagingTasksDisablePagingForSmallLimit(t *testing.T) { + mockClient, cluster, pdClient, err := testutils.NewMockTiKV("", nil) + require.NoError(t, err) + defer func() { + pdClient.Close() + err = mockClient.Close() + require.NoError(t, err) + }() + testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t")) + + pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient) + defer pdCli.Close() + + cache := NewRegionCache(tikv.NewRegionCache(pdCli)) + defer cache.Close() + + bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil) + + req := &kv.Request{} + req.Paging.Enable = true + req.Paging.MinPagingSize = paging.MinPagingSize + req.LimitSize = 1 + tasks, err := buildTestCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil) + require.NoError(t, err) + require.Len(t, tasks, 1) + require.Len(t, tasks, 1) + require.False(t, tasks[0].paging) + require.Equal(t, tasks[0].pagingSize, uint64(0)) +} + func toCopRange(r kv.KeyRange) *coprocessor.KeyRange { coprRange := coprocessor.KeyRange{} coprRange.Start = r.StartKey From d3e15711b437184975000df284d7551d2dd304e2 Mon Sep 17 00:00:00 2001 From: Zhigao Tong Date: Tue, 7 Feb 2023 09:20:29 +0800 Subject: [PATCH 3/5] 1 Signed-off-by: Zhigao Tong --- store/copr/coprocessor_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go index 35c6238bac277..c94d441932d8c 100644 --- a/store/copr/coprocessor_test.go +++ b/store/copr/coprocessor_test.go @@ -531,7 +531,7 @@ func TestBuildPagingTasksDisablePagingForSmallLimit(t *testing.T) { err = mockClient.Close() require.NoError(t, err) }() - testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t")) + _, regionIDs, _ := testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t")) pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient) defer pdCli.Close() @@ -549,6 +549,7 @@ func TestBuildPagingTasksDisablePagingForSmallLimit(t *testing.T) { require.NoError(t, err) require.Len(t, tasks, 1) require.Len(t, tasks, 1) + taskEqual(t, tasks[0], regionIDs[0], 0, "a", "c") require.False(t, tasks[0].paging) require.Equal(t, tasks[0].pagingSize, uint64(0)) } From 33911b2234481eafaa01c02ae0120828bf68ad8d Mon Sep 17 00:00:00 2001 From: Zhigao Tong Date: Tue, 7 Feb 2023 09:25:53 +0800 Subject: [PATCH 4/5] 2 Signed-off-by: Zhigao Tong --- store/copr/coprocessor.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 64a4d2a720c06..cf1018bce84f7 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -406,8 +406,9 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c // disable paging for small limit. task.paging = false task.pagingSize = 0 + } else { + pagingSize = paging.GrowPagingSize(pagingSize, req.Paging.MaxPagingSize) } - pagingSize = paging.GrowPagingSize(pagingSize, req.Paging.MaxPagingSize) } } } From 869fbd998c5b5581c81be297d363870f00117e11 Mon Sep 17 00:00:00 2001 From: Zhigao Tong Date: Tue, 7 Feb 2023 09:44:54 +0800 Subject: [PATCH 5/5] 3 Signed-off-by: Zhigao Tong --- distsql/request_builder_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go index 74bdf723216f1..1de6b11dd5244 100644 --- a/distsql/request_builder_test.go +++ b/distsql/request_builder_test.go @@ -676,6 +676,7 @@ func TestScanLimitConcurrency(t *testing.T) { Build() require.NoError(t, err) require.Equal(t, tt.concurrency, actual.Concurrency) + require.Equal(t, actual.LimitSize, tt.limit) }) } }