diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 09ab4094ab732..ff35135fd08b2 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -160,6 +160,7 @@ func (builder *RequestBuilder) SetDAGRequest(dag *tipb.DAGRequest) *RequestBuild if limit != nil && limit.Limit < estimatedRegionRowCount { builder.Request.Concurrency = 1 } + builder.Request.LimitSize = limit.GetLimit() } return builder } diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go index fa55229e36fa5..8e35bb85d5f57 100644 --- a/distsql/request_builder_test.go +++ b/distsql/request_builder_test.go @@ -674,6 +674,7 @@ func TestScanLimitConcurrency(t *testing.T) { Build() require.NoError(t, err) require.Equal(t, tt.concurrency, actual.Concurrency) + require.Equal(t, actual.LimitSize, tt.limit) }) } } diff --git a/kv/kv.go b/kv/kv.go index 4c855c0938308..d84c34d1b89c4 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -543,6 +543,8 @@ type Request struct { FixedRowCountHint []int // StoreBatchSize indicates the batch size of coprocessor in the same store. StoreBatchSize int + // LimitSize indicates whether the request is scan and limit + LimitSize uint64 } // CoprRequestAdjuster is used to check and adjust a copr request according to specific rules. diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index b24140b86d368..8e42ed60ccb66 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -396,7 +396,13 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv } i = nextI if req.Paging.Enable { - pagingSize = paging.GrowPagingSize(pagingSize, req.Paging.MaxPagingSize) + if req.LimitSize != 0 && req.LimitSize < pagingSize { + // disable paging for small limit. + task.paging = false + task.pagingSize = 0 + } else { + pagingSize = paging.GrowPagingSize(pagingSize, req.Paging.MaxPagingSize) + } } taskID++ } diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go index 7790e8f7661fc..ee6f79ed3dc82 100644 --- a/store/copr/coprocessor_test.go +++ b/store/copr/coprocessor_test.go @@ -510,6 +510,37 @@ func TestBuildPagingTasks(t *testing.T) { require.Equal(t, tasks[0].pagingSize, paging.MinPagingSize) } +func TestBuildPagingTasksDisablePagingForSmallLimit(t *testing.T) { + mockClient, cluster, pdClient, err := testutils.NewMockTiKV("", nil) + require.NoError(t, err) + defer func() { + pdClient.Close() + err = mockClient.Close() + require.NoError(t, err) + }() + _, regionIDs, _ := testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t")) + + pdCli := &tikv.CodecPDClient{Client: pdClient} + defer pdCli.Close() + + cache := NewRegionCache(tikv.NewRegionCache(pdCli)) + defer cache.Close() + + bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil) + + req := &kv.Request{} + req.Paging.Enable = true + req.Paging.MinPagingSize = paging.MinPagingSize + req.LimitSize = 1 + tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil) + require.NoError(t, err) + require.Len(t, tasks, 1) + require.Len(t, tasks, 1) + taskEqual(t, tasks[0], regionIDs[0], 0, "a", "c") + require.False(t, tasks[0].paging) + require.Equal(t, tasks[0].pagingSize, uint64(0)) +} + func toCopRange(r kv.KeyRange) *coprocessor.KeyRange { coprRange := coprocessor.KeyRange{} coprRange.Start = r.StartKey