diff --git a/executor/executor.go b/executor/executor.go
index aa7e2235fbd50..00e4e692c5eb4 100644
--- a/executor/executor.go
+++ b/executor/executor.go
@@ -14,6 +14,7 @@
 package executor
 
 import (
+	"context"
 	"fmt"
 	"runtime"
 	"sync"
@@ -43,7 +44,6 @@ import (
 	"github.com/pingcap/tidb/util/execdetails"
 	"github.com/pingcap/tidb/util/memory"
 	log "github.com/sirupsen/logrus"
-	"golang.org/x/net/context"
 )
 
 var (
@@ -689,7 +689,9 @@ func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 		return nil
 	}
 	for !e.meetFirstBatch {
-		err := e.children[0].Next(ctx, e.childResult)
+		// transfer chk's required rows to childResult and then adjust them in childResult
+		e.childResult = e.childResult.SetRequiredRows(chk.RequiredRows(), e.maxChunkSize)
+		err := e.children[0].Next(ctx, e.adjustRequiredRows(e.childResult))
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -713,6 +715,7 @@ func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 		}
 		e.cursor += batchSize
 	}
+	e.adjustRequiredRows(chk)
 	err := e.children[0].Next(ctx, chk)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -747,6 +750,24 @@ func (e *LimitExec) Close() error {
 	return errors.Trace(e.baseExecutor.Close())
 }
 
+func (e *LimitExec) adjustRequiredRows(chk *chunk.Chunk) *chunk.Chunk {
+	// limitTotal is the maximum number of rows this LimitExec can still read.
+	limitTotal := int(e.end - e.cursor)
+
+	var limitRequired int
+	if e.cursor < e.begin {
+		// If the cursor is before begin, it has to read (begin-cursor) rows to skip,
+		// and then chk.RequiredRows() rows to return,
+		// so the limit is (begin-cursor)+chk.RequiredRows().
+		limitRequired = int(e.begin) - int(e.cursor) + chk.RequiredRows()
+	} else {
+		// If the cursor is at or past begin, just read chk.RequiredRows() rows to return.
+		limitRequired = chk.RequiredRows()
+	}
+
+	return chk.SetRequiredRows(mathutil.Min(limitTotal, limitRequired), e.maxChunkSize)
+}
+
 func init() {
 	// While doing optimization in the plan package, we need to execute uncorrelated subquery,
 	// but the plan package cannot import the executor package because of the dependency cycle.
diff --git a/executor/executor_required_rows_test.go b/executor/executor_required_rows_test.go
new file mode 100644
index 0000000000000..03f5cb9b7ff5a
--- /dev/null
+++ b/executor/executor_required_rows_test.go
@@ -0,0 +1,186 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package executor
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+
+	"github.com/cznic/mathutil"
+	. "github.com/pingcap/check"
+	"github.com/pingcap/parser/mysql"
+	"github.com/pingcap/tidb/expression"
+	"github.com/pingcap/tidb/sessionctx"
+	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util/chunk"
+	"github.com/pingcap/tidb/util/mock"
+)
+
+type requiredRowsDataSource struct {
+	baseExecutor
+	totalRows int
+	count     int
+	ctx       sessionctx.Context
+
+	expectedRowsRet []int
+	numNextCalled   int
+}
+
+func newRequiredRowsDataSource(ctx sessionctx.Context, totalRows int, expectedRowsRet []int) *requiredRowsDataSource {
+	// the output schema is fixed for now: [Double, Long]
+	retTypes := []*types.FieldType{types.NewFieldType(mysql.TypeDouble), types.NewFieldType(mysql.TypeLonglong)}
+	cols := make([]*expression.Column, len(retTypes))
+	for i := range retTypes {
+		cols[i] = &expression.Column{Index: i, RetType: retTypes[i]}
+	}
+	schema := expression.NewSchema(cols...)
+	baseExec := newBaseExecutor(ctx, schema, "")
+	return &requiredRowsDataSource{baseExec, totalRows, 0, ctx, expectedRowsRet, 0}
+}
+
+func (r *requiredRowsDataSource) Next(ctx context.Context, req *chunk.Chunk) error {
+	defer func() {
+		rowsRet := req.NumRows()
+		expected := r.expectedRowsRet[r.numNextCalled]
+		if rowsRet != expected {
+			panic(fmt.Sprintf("unexpected number of rows returned, got: %v, expected: %v", rowsRet, expected))
+		}
+		r.numNextCalled++
+	}()
+
+	req.Reset()
+	if r.count > r.totalRows {
+		return nil
+	}
+	required := mathutil.Min(req.RequiredRows(), r.totalRows-r.count)
+	for i := 0; i < required; i++ {
+		req.AppendRow(r.genOneRow())
+	}
+	r.count += required
+	return nil
+}
+
+func (r *requiredRowsDataSource) genOneRow() chunk.Row {
+	row := chunk.MutRowFromTypes(r.retTypes())
+	for i := range r.retTypes() {
+		row.SetValue(i, r.genValue(r.retTypes()[i]))
+	}
+	return row.ToRow()
+}
+
+func (r *requiredRowsDataSource) genValue(valType *types.FieldType) interface{} {
+	switch valType.Tp {
+	case mysql.TypeLong, mysql.TypeLonglong:
+		return int64(rand.Int())
+	case mysql.TypeDouble:
+		return rand.Float64()
+	default:
+		panic("not implemented")
+	}
+}
+
+func (r *requiredRowsDataSource) checkNumNextCalled() error {
+	if r.numNextCalled != len(r.expectedRowsRet) {
+		return fmt.Errorf("unexpected number of calls to Next, got: %v, expected: %v",
+			r.numNextCalled, len(r.expectedRowsRet))
+	}
+	return nil
+}
+
+func (s *testExecSuite) TestLimitRequiredRows(c *C) {
+	maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize
+	testCases := []struct {
+		totalRows      int
+		limitOffset    int
+		limitCount     int
+		requiredRows   []int
+		expectedRows   []int
+		expectedRowsDS []int
+	}{
+		{
+			totalRows:      20,
+			limitOffset:    0,
+			limitCount:     10,
+			requiredRows:   []int{3, 5, 1, 500, 500},
+			expectedRows:   []int{3, 5, 1, 1, 0},
+			expectedRowsDS: []int{3, 5, 1, 1},
+		},
+		{
+			totalRows:      20,
+			limitOffset:    0,
+			limitCount:     25,
+			requiredRows:   []int{9, 500},
+			expectedRows:   []int{9, 11},
+			expectedRowsDS: []int{9, 11},
+		},
+		{
+			totalRows:      100,
+			limitOffset:    50,
+			limitCount:     30,
+			requiredRows:   []int{10, 5, 10, 20},
+			expectedRows:   []int{10, 5, 10, 5},
+			expectedRowsDS: []int{60, 5, 10, 5},
+		},
+		{
+			totalRows:      100,
+			limitOffset:    101,
+			limitCount:     10,
+			requiredRows:   []int{10},
+			expectedRows:   []int{0},
+			expectedRowsDS: []int{100, 0},
+		},
+		{
+			totalRows:      maxChunkSize + 20,
+			limitOffset:    maxChunkSize + 1,
+			limitCount:     10,
+			requiredRows:   []int{3, 3, 3, 100},
+			expectedRows:   []int{3, 3, 3, 1},
+			expectedRowsDS: []int{maxChunkSize, 4, 3, 3, 1},
+		},
+	}
+
+	for _, testCase := range testCases {
+		sctx := defaultCtx()
+		ctx := context.Background()
+		ds := newRequiredRowsDataSource(sctx, testCase.totalRows, testCase.expectedRowsDS)
+		exec := buildLimitExec(sctx, ds, testCase.limitOffset, testCase.limitCount)
+		c.Assert(exec.Open(ctx), IsNil)
+		chk := exec.newFirstChunk()
+		for i := range testCase.requiredRows {
+			chk.SetRequiredRows(testCase.requiredRows[i], sctx.GetSessionVars().MaxChunkSize)
+			c.Assert(exec.Next(ctx, chk), IsNil)
+			c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])
+		}
+		c.Assert(ds.checkNumNextCalled(), IsNil)
+	}
+}
+
+func buildLimitExec(ctx sessionctx.Context, src Executor, offset, count int) Executor {
+	n := mathutil.Min(count, ctx.GetSessionVars().MaxChunkSize)
+	base := newBaseExecutor(ctx, src.Schema(), "", src)
+	base.initCap = n
+	limitExec := &LimitExec{
+		baseExecutor: base,
+		begin:        uint64(offset),
+		end:          uint64(offset + count),
+	}
+	return limitExec
+}
+
+func defaultCtx() sessionctx.Context {
+	ctx := mock.NewContext()
+	ctx.GetSessionVars().MaxChunkSize = 1024
+	return ctx
+}
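The requiredRowsDataSource mock above is what makes the expectedRowsDS assertions possible: each call to Next checks the number of rows it returned against the next expected value. A minimal usage sketch follows, hypothetical and not part of the patch (it would have to live in the same package, since it uses the unexported helpers):

// Hypothetical sketch: drive requiredRowsDataSource directly and observe that
// it honors chk.RequiredRows(), clamped to the rows it has left.
func sketchRequiredRowsDataSource() {
	sctx := defaultCtx()
	// expect Next to be called twice, returning 4 rows and then 6 rows
	ds := newRequiredRowsDataSource(sctx, 10, []int{4, 6})
	chk := ds.newFirstChunk()

	chk.SetRequiredRows(4, sctx.GetSessionVars().MaxChunkSize)
	_ = ds.Next(context.Background(), chk) // fills exactly 4 of the 10 rows

	chk.SetRequiredRows(100, sctx.GetSessionVars().MaxChunkSize)
	_ = ds.Next(context.Background(), chk) // only 6 rows remain, so 6 come back
}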