diff --git a/executor/executor_required_rows_test.go b/executor/executor_required_rows_test.go
index 437d8deb2bc22..111583c6ee379 100644
--- a/executor/executor_required_rows_test.go
+++ b/executor/executor_required_rows_test.go
@@ -16,16 +16,19 @@ package executor
 import (
 	"context"
 	"fmt"
+	"math"
 	"math/rand"
 
 	"github.com/cznic/mathutil"
 	. "github.com/pingcap/check"
 	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/tidb/expression"
+	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/chunk"
+	"github.com/pingcap/tidb/util/memory"
 	"github.com/pingcap/tidb/util/mock"
 )
 
@@ -184,5 +187,185 @@ func defaultCtx() sessionctx.Context {
 	ctx := mock.NewContext()
 	ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
 	ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
+	ctx.GetSessionVars().MemQuotaSort = variable.DefTiDBMemQuotaSort
+	ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker("", ctx.GetSessionVars().MemQuotaQuery)
 	return ctx
 }
+
+func (s *testExecSuite) TestSortRequiredRows(c *C) {
+	maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize
+	testCases := []struct {
+		totalRows      int
+		groupBy        []int
+		requiredRows   []int
+		expectedRows   []int
+		expectedRowsDS []int
+	}{
+		{
+			totalRows:      10,
+			groupBy:        []int{0},
+			requiredRows:   []int{1, 5, 3, 10},
+			expectedRows:   []int{1, 5, 3, 1},
+			expectedRowsDS: []int{10, 0},
+		},
+		{
+			totalRows:      10,
+			groupBy:        []int{0, 1},
+			requiredRows:   []int{1, 5, 3, 10},
+			expectedRows:   []int{1, 5, 3, 1},
+			expectedRowsDS: []int{10, 0},
+		},
+		{
+			totalRows:      maxChunkSize + 1,
+			groupBy:        []int{0},
+			requiredRows:   []int{1, 5, 3, 10, maxChunkSize},
+			expectedRows:   []int{1, 5, 3, 10, (maxChunkSize + 1) - 1 - 5 - 3 - 10},
+			expectedRowsDS: []int{maxChunkSize, 1, 0},
+		},
+		{
+			totalRows:      3*maxChunkSize + 1,
+			groupBy:        []int{0},
+			requiredRows:   []int{1, 5, 3, 10, maxChunkSize},
+			expectedRows:   []int{1, 5, 3, 10, maxChunkSize},
+			expectedRowsDS: []int{maxChunkSize, maxChunkSize, maxChunkSize, 1, 0},
+		},
+	}
+
+	for _, testCase := range testCases {
+		sctx := defaultCtx()
+		ctx := context.Background()
+		ds := newRequiredRowsDataSource(sctx, testCase.totalRows, testCase.expectedRowsDS)
+		byItems := make([]*plannercore.ByItems, 0, len(testCase.groupBy))
+		for _, groupBy := range testCase.groupBy {
+			col := ds.Schema().Columns[groupBy]
+			byItems = append(byItems, &plannercore.ByItems{Expr: col})
+		}
+		exec := buildSortExec(sctx, byItems, ds)
+		c.Assert(exec.Open(ctx), IsNil)
+		chk := exec.newFirstChunk()
+		for i := range testCase.requiredRows {
+			chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize)
+			c.Assert(exec.Next(ctx, chunk.NewRecordBatch(chk)), IsNil)
+			c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])
+		}
+		c.Assert(ds.checkNumNextCalled(), IsNil)
+	}
+}
+
+func buildSortExec(sctx sessionctx.Context, byItems []*plannercore.ByItems, src Executor) Executor {
+	sortExec := SortExec{
+		baseExecutor: newBaseExecutor(sctx, src.Schema(), "", src),
+		ByItems:      byItems,
+		schema:       src.Schema(),
+	}
+	return &sortExec
+}
+
+func (s *testExecSuite) TestTopNRequiredRows(c *C) {
+	maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize
+	testCases := []struct {
+		totalRows      int
+		topNOffset     int
+		topNCount      int
+		groupBy        []int
+		requiredRows   []int
+		expectedRows   []int
+		expectedRowsDS []int
+	}{
+		{
+			totalRows:      10,
+			topNOffset:     0,
+			topNCount:      10,
+			groupBy:        []int{0},
+			requiredRows:   []int{1, 1, 1, 1, 10},
+			expectedRows:   []int{1, 1, 1, 1, 6},
+			expectedRowsDS: []int{10, 0},
+		},
+		{
+			totalRows:      100,
+			topNOffset:     15,
+			topNCount:      11,
+			groupBy:        []int{0},
+			requiredRows:   []int{1, 1, 1, 1, 10},
+			expectedRows:   []int{1, 1, 1, 1, 7},
+			expectedRowsDS: []int{26, 100 - 26, 0},
+		},
+		{
+			totalRows:      100,
+			topNOffset:     95,
+			topNCount:      10,
+			groupBy:        []int{0},
+			requiredRows:   []int{1, 2, 3, 10},
+			expectedRows:   []int{1, 2, 2, 0},
+			expectedRowsDS: []int{100, 0, 0},
+		},
+		{
+			totalRows:      maxChunkSize + 20,
+			topNOffset:     1,
+			topNCount:      5,
+			groupBy:        []int{0, 1},
+			requiredRows:   []int{1, 3, 7, 10},
+			expectedRows:   []int{1, 3, 1, 0},
+			expectedRowsDS: []int{6, maxChunkSize, 14, 0},
+		},
+		{
+			totalRows:      maxChunkSize + maxChunkSize + 20,
+			topNOffset:     maxChunkSize + 10,
+			topNCount:      8,
+			groupBy:        []int{0, 1},
+			requiredRows:   []int{1, 2, 3, 5, 7},
+			expectedRows:   []int{1, 2, 3, 2, 0},
+			expectedRowsDS: []int{maxChunkSize, 18, maxChunkSize, 2, 0},
+		},
+		{
+			totalRows:      maxChunkSize*5 + 10,
+			topNOffset:     maxChunkSize*5 + 20,
+			topNCount:      10,
+			groupBy:        []int{0, 1},
+			requiredRows:   []int{1, 2, 3},
+			expectedRows:   []int{0, 0, 0},
+			expectedRowsDS: []int{maxChunkSize, maxChunkSize, maxChunkSize, maxChunkSize, maxChunkSize, 10, 0, 0},
+		},
+		{
+			totalRows:      maxChunkSize + maxChunkSize + 10,
+			topNOffset:     10,
+			topNCount:      math.MaxInt64,
+			groupBy:        []int{0, 1},
+			requiredRows:   []int{1, 2, 3, maxChunkSize, maxChunkSize},
+			expectedRows:   []int{1, 2, 3, maxChunkSize, maxChunkSize - 1 - 2 - 3},
+			expectedRowsDS: []int{maxChunkSize, maxChunkSize, 10, 0, 0},
+		},
+	}
+
+	for _, testCase := range testCases {
+		sctx := defaultCtx()
+		ctx := context.Background()
+		ds := newRequiredRowsDataSource(sctx, testCase.totalRows, testCase.expectedRowsDS)
+		byItems := make([]*plannercore.ByItems, 0, len(testCase.groupBy))
+		for _, groupBy := range testCase.groupBy {
+			col := ds.Schema().Columns[groupBy]
+			byItems = append(byItems, &plannercore.ByItems{Expr: col})
+		}
+		exec := buildTopNExec(sctx, testCase.topNOffset, testCase.topNCount, byItems, ds)
+		c.Assert(exec.Open(ctx), IsNil)
+		chk := exec.newFirstChunk()
+		for i := range testCase.requiredRows {
+			chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize)
+			c.Assert(exec.Next(ctx, chunk.NewRecordBatch(chk)), IsNil)
+			c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])
+		}
+		c.Assert(ds.checkNumNextCalled(), IsNil)
+	}
+}
+
+func buildTopNExec(ctx sessionctx.Context, offset, count int, byItems []*plannercore.ByItems, src Executor) Executor {
+	sortExec := SortExec{
+		baseExecutor: newBaseExecutor(ctx, src.Schema(), "", src),
+		ByItems:      byItems,
+		schema:       src.Schema(),
+	}
+	return &TopNExec{
+		SortExec: sortExec,
+		limit:    &plannercore.PhysicalLimit{Count: uint64(count), Offset: uint64(offset)},
+	}
+}
diff --git a/executor/sort.go b/executor/sort.go
index bb1d56adf57a0..54c33dbad06ea 100644
--- a/executor/sort.go
+++ b/executor/sort.go
@@ -19,7 +19,7 @@ import (
 	"sort"
 	"time"
 
-	opentracing "github.com/opentracing/opentracing-go"
+	"github.com/opentracing/opentracing-go"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/expression"
 	plannercore "github.com/pingcap/tidb/planner/core"
@@ -93,10 +93,7 @@ func (e *SortExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
 		sort.Slice(e.rowPtrs, e.keyColumnsLess)
 		e.fetched = true
 	}
-	for req.NumRows() < e.maxChunkSize {
-		if e.Idx >= len(e.rowPtrs) {
-			return nil
-		}
+	for !req.IsFull() && e.Idx < len(e.rowPtrs) {
 		rowPtr := e.rowPtrs[e.Idx]
 		req.AppendRow(e.rowChunks.GetRow(rowPtr))
 		e.Idx++
@@ -265,7 +262,7 @@ func (e *TopNExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
 	if e.Idx >= len(e.rowPtrs) {
 		return nil
 	}
-	for req.NumRows() < e.maxChunkSize && e.Idx < len(e.rowPtrs) {
+	for !req.IsFull() && e.Idx < len(e.rowPtrs) {
 		row := e.rowChunks.GetRow(e.rowPtrs[e.Idx])
 		req.AppendRow(row)
 		e.Idx++
@@ -280,6 +277,8 @@ func (e *TopNExec) loadChunksUntilTotalLimit(ctx context.Context) error {
 	e.rowChunks.GetMemTracker().SetLabel("rowChunks")
 	for uint64(e.rowChunks.Len()) < e.totalLimit {
 		srcChk := e.children[0].newFirstChunk()
+		// adjust required rows by total limit
+		srcChk.SetRequiredRows(int(e.totalLimit-uint64(e.rowChunks.Len())), e.maxChunkSize)
 		err := e.children[0].Next(ctx, chunk.NewRecordBatch(srcChk))
 		if err != nil {
 			return errors.Trace(err)
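The two `!req.IsFull()` loops and the `SetRequiredRows` call above all lean on the same chunk-level contract: the parent tells the child how many rows it actually needs, and the child stops filling the chunk once that soft limit is reached instead of always producing `maxChunkSize` rows. Below is a minimal, self-contained Go sketch of that contract, not TiDB code: `chunkBuf`, `sliceSource`, and their methods are hypothetical stand-ins for `chunk.Chunk.SetRequiredRows`/`IsFull` and the sorted-row drain in `SortExec.Next`.

package main

import "fmt"

// chunkBuf mimics the part of chunk.Chunk that matters here: a hard
// capacity (maxRows) plus a soft limit (requiredRows) that callers can
// lower before each Next call.
type chunkBuf struct {
	rows         []int
	requiredRows int
	maxRows      int
}

// setRequiredRows clamps the soft limit into (0, maxRows], in the same
// spirit as Chunk.SetRequiredRows in the patch.
func (c *chunkBuf) setRequiredRows(required, maxRows int) {
	if required <= 0 || required > maxRows {
		required = maxRows
	}
	c.requiredRows = required
	c.maxRows = maxRows
}

// isFull reports whether the soft limit is reached; executors test this
// instead of comparing NumRows against maxChunkSize directly.
func (c *chunkBuf) isFull() bool { return len(c.rows) >= c.requiredRows }

// sliceSource plays the role of the sorted row buffer in SortExec.Next:
// it drains pre-sorted rows into the caller's chunk across calls.
type sliceSource struct {
	sorted []int
	idx    int
}

func (s *sliceSource) next(chk *chunkBuf) {
	chk.rows = chk.rows[:0]
	// Same shape as the patched loop:
	// for !req.IsFull() && e.Idx < len(e.rowPtrs) { ... }
	for !chk.isFull() && s.idx < len(s.sorted) {
		chk.rows = append(chk.rows, s.sorted[s.idx])
		s.idx++
	}
}

func main() {
	src := &sliceSource{sorted: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}}
	chk := &chunkBuf{}
	// Mirrors the first Sort test case: ask for 1, 5, 3, then 10 rows.
	for _, required := range []int{1, 5, 3, 10} {
		chk.setRequiredRows(required, 32)
		src.next(chk)
		fmt.Println(len(chk.rows)) // prints 1, 5, 3, 1
	}
}

Running the sketch prints 1, 5, 3, 1, matching `expectedRows` in the first `TestSortRequiredRows` case: the first three requests are served exactly, and only one row remains for the final request. The `loadChunksUntilTotalLimit` hunk applies the same idea in the other direction, shrinking the required rows of the child's chunk so a TopN with a small `totalLimit` does not ask its child for full chunks it will discard.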