diff --git a/ddl/reorg.go b/ddl/reorg.go index 66730b5a826d9..10cf2d85b9064 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -282,7 +282,7 @@ func (d *ddlCtx) GetTableMaxRowID(startTS uint64, tbl table.PhysicalTable) (maxR } defer terror.Call(result.Close) - chk := chunk.NewChunkWithCapacity(getColumnsTypes(columns), 1) + chk := chunk.New(getColumnsTypes(columns), 1, 1) err = result.Next(ctx, chk) if err != nil { return maxRowID, false, errors.Trace(err) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index e914382417a8a..b06dd9afa6c56 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -70,7 +70,7 @@ func (s *testSuite) TestSelectNormal(c *C) { response.Fetch(context.TODO()) // Test Next. - chk := chunk.NewChunkWithCapacity(colTypes, 32) + chk := chunk.New(colTypes, 32, 32) numAllRows := 0 for { err = response.Next(context.TODO(), chk) @@ -122,7 +122,7 @@ func (s *testSuite) TestSelectStreaming(c *C) { response.Fetch(context.TODO()) // Test Next. - chk := chunk.NewChunkWithCapacity(colTypes, 32) + chk := chunk.New(colTypes, 32, 32) numAllRows := 0 for { err = response.Next(context.TODO(), chk) @@ -259,7 +259,7 @@ func BenchmarkReadRowsData(b *testing.B) { for i := 0; i < numCols; i++ { colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} } - chk := chunk.NewChunkWithCapacity(colTypes, numRows) + chk := chunk.New(colTypes, numRows, numRows) buffer := populateBuffer() @@ -277,7 +277,7 @@ func BenchmarkDecodeToChunk(b *testing.B) { for i := 0; i < numCols; i++ { colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} } - chk := chunk.NewChunkWithCapacity(colTypes, numRows) + chk := chunk.New(colTypes, numRows, numRows) for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { diff --git a/executor/adapter.go b/executor/adapter.go index 8bd9cf3242011..e877dd6e126e7 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -112,7 +112,7 @@ func (a *recordSet) Next(ctx context.Context, chk *chunk.Chunk) error { // NewChunk create a new chunk using NewChunk function in chunk package. func (a *recordSet) NewChunk() *chunk.Chunk { - return a.executor.newChunk() + return a.executor.newFirstChunk() } func (a *recordSet) Close() error { @@ -270,7 +270,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co a.logSlowQuery(txnTS, err == nil) }() - err = e.Next(ctx, e.newChunk()) + err = e.Next(ctx, e.newFirstChunk()) if err != nil { return nil, errors.Trace(err) } diff --git a/executor/admin.go b/executor/admin.go index 2a571b878ce7f..1fe645ae15de3 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -102,7 +102,7 @@ func (e *CheckIndexRangeExec) Open(ctx context.Context) error { FieldType: *colTypeForHandle, }) - e.srcChunk = e.newChunk() + e.srcChunk = e.newFirstChunk() dagPB, err := e.buildDAGPB() if err != nil { return errors.Trace(err) @@ -197,7 +197,7 @@ func (e *RecoverIndexExec) Open(ctx context.Context) error { return errors.Trace(err) } - e.srcChunk = chunk.NewChunkWithCapacity(e.columnsTypes(), e.maxChunkSize) + e.srcChunk = chunk.New(e.columnsTypes(), e.initCap, e.maxChunkSize) e.batchSize = 2048 e.recoverRows = make([]recoverRows, 0, e.batchSize) e.idxValsBufs = make([][]types.Datum, e.batchSize) @@ -636,7 +636,7 @@ func (e *CleanupIndexExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return errors.Trace(err) } - e.idxChunk = chunk.NewChunkWithCapacity(e.getIdxColTypes(), e.maxChunkSize) + e.idxChunk = chunk.New(e.getIdxColTypes(), e.initCap, e.maxChunkSize) e.idxValues = make(map[int64][][]types.Datum, e.batchSize) e.batchKeys = make([]kv.Key, 0, e.batchSize) e.idxValsBufs = make([][]types.Datum, e.batchSize) diff --git a/executor/aggregate.go b/executor/aggregate.go index b2c6753539f3e..d28d5a4c404f7 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -235,7 +235,7 @@ func (e *HashAggExec) initForUnparallelExec() { e.partialResultMap = make(aggPartialResultMapper, 0) e.groupKeyBuffer = make([]byte, 0, 8) e.groupValDatums = make([]types.Datum, 0, len(e.groupKeyBuffer)) - e.childResult = e.children[0].newChunk() + e.childResult = e.children[0].newFirstChunk() } func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) { @@ -270,12 +270,12 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) { partialResultsMap: make(aggPartialResultMapper, 0), groupByItems: e.GroupByItems, groupValDatums: make([]types.Datum, 0, len(e.GroupByItems)), - chk: e.children[0].newChunk(), + chk: e.children[0].newFirstChunk(), } e.partialWorkers[i] = w e.inputCh <- &HashAggInput{ - chk: e.children[0].newChunk(), + chk: e.children[0].newFirstChunk(), giveBackCh: w.inputCh, } } @@ -292,7 +292,7 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) { rowBuffer: make([]types.Datum, 0, e.Schema().Len()), mutableRow: chunk.MutRowFromTypes(e.retTypes()), } - e.finalWorkers[i].finalResultHolderCh <- e.newChunk() + e.finalWorkers[i].finalResultHolderCh <- e.newFirstChunk() } } @@ -734,7 +734,7 @@ func (e *StreamAggExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return errors.Trace(err) } - e.childResult = e.children[0].newChunk() + e.childResult = e.children[0].newFirstChunk() e.executed = false e.isChildReturnEmpty = true e.inputIter = chunk.NewIterator4Chunk(e.childResult) diff --git a/executor/builder.go b/executor/builder.go index 8ac333c223372..1f9ef0e0cb96a 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "github.com/cznic/mathutil" "github.com/cznic/sortutil" "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/distsql" @@ -398,8 +399,10 @@ func (b *executorBuilder) buildChecksumTable(v *plannercore.ChecksumTable) Execu } func (b *executorBuilder) buildDeallocate(v *plannercore.Deallocate) Executor { + base := newBaseExecutor(b.ctx, nil, v.ExplainID()) + base.initCap = chunk.ZeroCapacity e := &DeallocateExec{ - baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID()), + baseExecutor: base, Name: v.Name, } return e @@ -431,8 +434,11 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor { b.err = errors.Trace(b.err) return nil } + n := int(mathutil.MinUint64(v.Count, uint64(b.ctx.GetSessionVars().MaxChunkSize))) + base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec) + base.initCap = n e := &LimitExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec), + baseExecutor: base, begin: v.Offset, end: v.Offset + v.Count, } @@ -440,8 +446,10 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor { } func (b *executorBuilder) buildPrepare(v *plannercore.Prepare) Executor { + base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()) + base.initCap = chunk.ZeroCapacity e := &PrepareExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), + baseExecutor: base, is: b.is, name: v.Name, sqlText: v.SQLText, @@ -495,8 +503,10 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor { case *ast.RevokeStmt: return b.buildRevoke(s) } + base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()) + base.initCap = chunk.ZeroCapacity e := &SimpleExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), + baseExecutor: base, Statement: v.Statement, is: b.is, } @@ -504,8 +514,10 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor { } func (b *executorBuilder) buildSet(v *plannercore.Set) Executor { + base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()) + base.initCap = chunk.ZeroCapacity e := &SetExecutor{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), + baseExecutor: base, vars: v.VarAssigns, } return e @@ -523,6 +535,7 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor { } else { baseExec = newBaseExecutor(b.ctx, nil, v.ExplainID()) } + baseExec.initCap = chunk.ZeroCapacity ivs := &InsertValues{ baseExecutor: baseExec, @@ -614,12 +627,13 @@ func (b *executorBuilder) buildGrant(grant *ast.GrantStmt) Executor { func (b *executorBuilder) buildRevoke(revoke *ast.RevokeStmt) Executor { e := &RevokeExec{ - ctx: b.ctx, - Privs: revoke.Privs, - ObjectType: revoke.ObjectType, - Level: revoke.Level, - Users: revoke.Users, - is: b.is, + baseExecutor: newBaseExecutor(b.ctx, nil, "RevokeStmt"), + ctx: b.ctx, + Privs: revoke.Privs, + ObjectType: revoke.ObjectType, + Level: revoke.Level, + Users: revoke.Users, + is: b.is, } return e } @@ -1091,8 +1105,10 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu b.err = errors.Errorf("buildTableDual failed, invalid row count for dual table: %v", v.RowCount) return nil } + base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()) + base.initCap = v.RowCount e := &TableDualExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), + baseExecutor: base, numDualRows: v.RowCount, } // Init the startTS for later use. @@ -1209,9 +1225,10 @@ func (b *executorBuilder) buildMaxOneRow(v *plannercore.PhysicalMaxOneRow) Execu b.err = errors.Trace(b.err) return nil } - e := &MaxOneRowExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec), - } + base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec) + base.initCap = 2 + base.maxChunkSize = 2 + e := &MaxOneRowExec{baseExecutor: base} return e } @@ -1241,8 +1258,10 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor { return nil } columns2Handle := buildColumns2Handle(v.SelectPlan.Schema(), tblID2table) + base := newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec) + base.initCap = chunk.ZeroCapacity updateExec := &UpdateExec{ - baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec), + baseExecutor: base, SelectExec: selExec, OrderedList: v.OrderedList, tblID2table: tblID2table, @@ -1320,8 +1339,10 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor { b.err = errors.Trace(b.err) return nil } + base := newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec) + base.initCap = chunk.ZeroCapacity deleteExec := &DeleteExec{ - baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID(), selExec), + baseExecutor: base, SelectExec: selExec, Tables: v.Tables, IsMultiTable: v.IsMultiTable, @@ -1542,7 +1563,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin) innerKeyCols[i] = v.InnerJoinKeys[i].Index } e.innerCtx.keyCols = innerKeyCols - e.joinResult = e.newChunk() + e.joinResult = e.newFirstChunk() metrics.ExecutorCounter.WithLabelValues("IndexLookUpJoin").Inc() return e } diff --git a/executor/delete.go b/executor/delete.go index c60e8b9200018..59c9d1eee5fc7 100644 --- a/executor/delete.go +++ b/executor/delete.go @@ -104,8 +104,8 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error { batchDelete := e.ctx.GetSessionVars().BatchDelete && !e.ctx.GetSessionVars().InTxn() batchDMLSize := e.ctx.GetSessionVars().DMLBatchSize fields := e.children[0].retTypes() + chk := e.children[0].newFirstChunk() for { - chk := e.children[0].newChunk() iter := chunk.NewIterator4Chunk(chk) err := e.children[0].Next(ctx, chk) @@ -133,6 +133,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error { } rowCount++ } + chk = chunk.Renew(chk, e.maxChunkSize) } return nil @@ -184,10 +185,9 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error { colPosInfos := e.getColPosInfos(e.children[0].Schema()) tblRowMap := make(tableRowMapType) fields := e.children[0].retTypes() + chk := e.children[0].newFirstChunk() for { - chk := e.children[0].newChunk() iter := chunk.NewIterator4Chunk(chk) - err := e.children[0].Next(ctx, chk) if err != nil { return errors.Trace(err) @@ -200,6 +200,7 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error { joinedDatumRow := joinedChunkRow.GetDatumRow(fields) e.composeTblRowMap(tblRowMap, colPosInfos, joinedDatumRow) } + chk = chunk.Renew(chk, e.maxChunkSize) } return errors.Trace(e.removeRowsInTblRowMap(tblRowMap)) diff --git a/executor/distsql.go b/executor/distsql.go index 19bb507317f24..d262d8b57bbb0 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -711,7 +711,7 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er handleCnt := len(task.handles) task.rows = make([]chunk.Row, 0, handleCnt) for { - chk := tableReader.newChunk() + chk := tableReader.newFirstChunk() err = tableReader.Next(ctx, chk) if err != nil { log.Error(err) diff --git a/executor/executor.go b/executor/executor.go index 9b14a7fee8828..b6e2580a0d659 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -67,6 +67,7 @@ type baseExecutor struct { ctx sessionctx.Context id string schema *expression.Schema + initCap int maxChunkSize int children []Executor retFieldTypes []*types.FieldType @@ -102,9 +103,9 @@ func (e *baseExecutor) Schema() *expression.Schema { return e.schema } -// newChunk creates a new chunk to buffer current executor's result. -func (e *baseExecutor) newChunk() *chunk.Chunk { - return chunk.NewChunkWithCapacity(e.retTypes(), e.maxChunkSize) +// newFirstChunk creates a new chunk to buffer current executor's result. +func (e *baseExecutor) newFirstChunk() *chunk.Chunk { + return chunk.New(e.retTypes(), e.initCap, e.maxChunkSize) } // retTypes returns all output column types. @@ -123,6 +124,7 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id strin ctx: ctx, id: id, schema: schema, + initCap: ctx.GetSessionVars().MaxChunkSize, maxChunkSize: ctx.GetSessionVars().MaxChunkSize, } if schema != nil { @@ -152,7 +154,7 @@ type Executor interface { Schema() *expression.Schema retTypes() []*types.FieldType - newChunk() *chunk.Chunk + newFirstChunk() *chunk.Chunk } // CancelDDLJobsExec represents a cancel DDL jobs executor. @@ -166,11 +168,11 @@ type CancelDDLJobsExec struct { // Next implements the Executor Next interface. func (e *CancelDDLJobsExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if e.cursor >= len(e.jobIDs) { return nil } - numCurBatch := mathutil.Min(e.maxChunkSize, len(e.jobIDs)-e.cursor) + numCurBatch := mathutil.Min(chk.Capacity(), len(e.jobIDs)-e.cursor) for i := e.cursor; i < e.cursor+numCurBatch; i++ { chk.AppendString(0, fmt.Sprintf("%d", e.jobIDs[i])) if e.errs[i] != nil { @@ -259,14 +261,14 @@ func (e *ShowDDLJobQueriesExec) Open(ctx context.Context) error { // Next implements the Executor Next interface. func (e *ShowDDLJobQueriesExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if e.cursor >= len(e.jobs) { return nil } if len(e.jobIDs) >= len(e.jobs) { return nil } - numCurBatch := mathutil.Min(e.maxChunkSize, len(e.jobs)-e.cursor) + numCurBatch := mathutil.Min(chk.Capacity(), len(e.jobs)-e.cursor) for _, id := range e.jobIDs { for i := e.cursor; i < e.cursor+numCurBatch; i++ { if id == e.jobs[i].ID { @@ -302,11 +304,11 @@ func (e *ShowDDLJobsExec) Open(ctx context.Context) error { // Next implements the Executor Next interface. func (e *ShowDDLJobsExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if e.cursor >= len(e.jobs) { return nil } - numCurBatch := mathutil.Min(e.maxChunkSize, len(e.jobs)-e.cursor) + numCurBatch := mathutil.Min(chk.Capacity(), len(e.jobs)-e.cursor) for i := e.cursor; i < e.cursor+numCurBatch; i++ { chk.AppendInt64(0, e.jobs[i].ID) chk.AppendString(1, getSchemaName(e.is, e.jobs[i].SchemaID)) @@ -461,7 +463,7 @@ func (e *CheckIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error { if err != nil { return errors.Trace(err) } - chk = e.src.newChunk() + chk = e.src.newFirstChunk() for { err := e.src.Next(ctx, chk) if err != nil { @@ -564,7 +566,7 @@ func (e *SelectLockExec) Open(ctx context.Context) error { // Next implements the Executor Next interface. func (e *SelectLockExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) err := e.children[0].Next(ctx, chk) if err != nil { return errors.Trace(err) @@ -659,7 +661,7 @@ func (e *LimitExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return errors.Trace(err) } - e.childResult = e.children[0].newChunk() + e.childResult = e.children[0].newFirstChunk() e.cursor = 0 e.meetFirstBatch = e.begin == 0 return nil @@ -691,8 +693,8 @@ func init() { if err != nil { return rows, errors.Trace(err) } + chk := exec.newFirstChunk() for { - chk := exec.newChunk() err = exec.Next(ctx, chk) if err != nil { return rows, errors.Trace(err) @@ -705,6 +707,7 @@ func init() { row := r.GetDatumRow(exec.retTypes()) rows = append(rows, row) } + chk = chunk.Renew(chk, sctx.GetSessionVars().MaxChunkSize) } } } @@ -758,7 +761,7 @@ func (e *SelectionExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return errors.Trace(err) } - e.childResult = e.children[0].newChunk() + e.childResult = e.children[0].newFirstChunk() e.batched = expression.Vectorizable(e.filters) if e.batched { e.selected = make([]bool, 0, chunk.InitialCapacity) @@ -777,7 +780,7 @@ func (e *SelectionExec) Close() error { // Next implements the Executor Next interface. func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if !e.batched { return errors.Trace(e.unBatchedNext(ctx, chk)) @@ -788,7 +791,7 @@ func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error { if !e.selected[e.inputRow.Idx()] { continue } - if chk.NumRows() == e.maxChunkSize { + if chk.NumRows() >= chk.Capacity() { return nil } chk.AppendRow(e.inputRow) @@ -852,7 +855,7 @@ type TableScanExec struct { // Next implements the Executor Next interface. func (e *TableScanExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if e.isVirtualTable { return errors.Trace(e.nextChunk4InfoSchema(ctx, chk)) } @@ -862,7 +865,7 @@ func (e *TableScanExec) Next(ctx context.Context, chk *chunk.Chunk) error { } mutableRow := chunk.MutRowFromTypes(e.retTypes()) - for chk.NumRows() < e.maxChunkSize { + for chk.NumRows() < chk.Capacity() { row, err := e.getRow(handle) if err != nil { return errors.Trace(err) @@ -875,9 +878,9 @@ func (e *TableScanExec) Next(ctx context.Context, chk *chunk.Chunk) error { } func (e *TableScanExec) nextChunk4InfoSchema(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if e.virtualTableChunkList == nil { - e.virtualTableChunkList = chunk.NewList(e.retTypes(), e.maxChunkSize) + e.virtualTableChunkList = chunk.NewList(e.retTypes(), e.initCap, e.maxChunkSize) columns := make([]*table.Column, e.schema.Len()) for i, colInfo := range e.columns { columns[i] = table.ToColumn(colInfo) @@ -971,7 +974,7 @@ func (e *MaxOneRowExec) Next(ctx context.Context, chk *chunk.Chunk) error { return errors.New("subquery returns more than 1 row") } - childChunk := e.children[0].newChunk() + childChunk := e.children[0].newFirstChunk() err = e.children[0].Next(ctx, childChunk) if childChunk.NumRows() != 0 { return errors.New("subquery returns more than 1 row") @@ -1033,7 +1036,7 @@ func (e *UnionExec) Open(ctx context.Context) error { return errors.Trace(err) } for _, child := range e.children { - e.childrenResults = append(e.childrenResults, child.newChunk()) + e.childrenResults = append(e.childrenResults, child.newFirstChunk()) } e.stopFetchData.Store(false) e.initialized = false @@ -1094,7 +1097,7 @@ func (e *UnionExec) resultPuller(ctx context.Context, childID int) { // Next implements the Executor Next interface. func (e *UnionExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if !e.initialized { e.initialize(ctx) e.initialized = true diff --git a/executor/executor_pkg_test.go b/executor/executor_pkg_test.go index c399d4671cb77..ef1a158fc0801 100644 --- a/executor/executor_pkg_test.go +++ b/executor/executor_pkg_test.go @@ -89,7 +89,7 @@ func (s *testExecSuite) TestShowProcessList(c *C) { err := e.Open(ctx) c.Assert(err, IsNil) - chk := e.newChunk() + chk := e.newFirstChunk() it := chunk.NewIterator4Chunk(chk) // Run test and check results. for _, p := range ps { diff --git a/executor/explain.go b/executor/explain.go index 781f026c736af..afc3f871e0883 100644 --- a/executor/explain.go +++ b/executor/explain.go @@ -35,12 +35,12 @@ func (e *ExplainExec) Close() error { // Next implements the Executor Next interface. func (e *ExplainExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if e.cursor >= len(e.rows) { return nil } - numCurRows := mathutil.Min(e.maxChunkSize, len(e.rows)-e.cursor) + numCurRows := mathutil.Min(chk.Capacity(), len(e.rows)-e.cursor) for i := e.cursor; i < e.cursor+numCurRows; i++ { for j := range e.rows[i] { chk.AppendString(j, e.rows[i][j]) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index cf380e9534494..a0bc75994bc92 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -317,11 +317,11 @@ func (ow *outerWorker) pushToChan(ctx context.Context, task *lookUpJoinTask, dst // buildTask builds a lookUpJoinTask and read outer rows. // When err is not nil, task must not be nil to send the error to the main thread via task. func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) { - ow.executor.newChunk() + ow.executor.newFirstChunk() task := &lookUpJoinTask{ doneCh: make(chan error, 1), - outerResult: ow.executor.newChunk(), + outerResult: ow.executor.newFirstChunk(), encodedLookUpKeys: chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}, ow.ctx.GetSessionVars().MaxChunkSize), lookupMap: mvmap.NewMVMap(), } @@ -511,7 +511,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa return errors.Trace(err) } defer terror.Call(innerExec.Close) - innerResult := chunk.NewList(innerExec.retTypes(), iw.ctx.GetSessionVars().MaxChunkSize) + innerResult := chunk.NewList(innerExec.retTypes(), iw.ctx.GetSessionVars().MaxChunkSize, iw.ctx.GetSessionVars().MaxChunkSize) innerResult.GetMemTracker().SetLabel("inner result") innerResult.GetMemTracker().AttachTo(task.memTracker) for { @@ -523,7 +523,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa break } innerResult.Add(iw.executorChk) - iw.executorChk = innerExec.newChunk() + iw.executorChk = innerExec.newFirstChunk() } task.innerResult = innerResult return nil diff --git a/executor/insert_common.go b/executor/insert_common.go index 8a4d7856dc93a..628bead17cfbe 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -250,9 +250,9 @@ func (e *InsertValues) insertRowsFromSelect(ctx context.Context, cols []*table.C // process `insert|replace into ... select ... from ...` selectExec := e.children[0] fields := selectExec.retTypes() - chk := selectExec.newChunk() + chk := selectExec.newFirstChunk() iter := chunk.NewIterator4Chunk(chk) - rows := make([][]types.Datum, 0, e.ctx.GetSessionVars().MaxChunkSize) + rows := make([][]types.Datum, 0, chk.Capacity()) sessVars := e.ctx.GetSessionVars() batchInsert := sessVars.BatchInsert && !sessVars.InTxn() diff --git a/executor/join.go b/executor/join.go index a8a720feb9513..a979790fe2af7 100644 --- a/executor/join.go +++ b/executor/join.go @@ -250,7 +250,7 @@ func (e *HashJoinExec) wait4Inner() (finished bool, err error) { // and append them to e.innerResult. func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh chan struct{}) { defer close(chkCh) - e.innerResult = chunk.NewList(e.innerExec.retTypes(), e.maxChunkSize) + e.innerResult = chunk.NewList(e.innerExec.retTypes(), e.initCap, e.maxChunkSize) e.innerResult.GetMemTracker().AttachTo(e.memTracker) e.innerResult.GetMemTracker().SetLabel("innerResult") var err error @@ -262,7 +262,7 @@ func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.C if e.finished.Load().(bool) { return } - chk := e.children[e.innerIdx].newChunk() + chk := e.children[e.innerIdx].newFirstChunk() err = e.innerExec.Next(ctx, chk) if err != nil { e.innerFinished <- errors.Trace(err) @@ -289,7 +289,7 @@ func (e *HashJoinExec) initializeForProbe() { e.outerChkResourceCh = make(chan *outerChkResource, e.concurrency) for i := uint(0); i < e.concurrency; i++ { e.outerChkResourceCh <- &outerChkResource{ - chk: e.outerExec.newChunk(), + chk: e.outerExec.newFirstChunk(), dest: e.outerResultChs[i], } } @@ -299,7 +299,7 @@ func (e *HashJoinExec) initializeForProbe() { e.joinChkResourceCh = make([]chan *chunk.Chunk, e.concurrency) for i := uint(0); i < e.concurrency; i++ { e.joinChkResourceCh[i] = make(chan *chunk.Chunk, 1) - e.joinChkResourceCh[i] <- e.newChunk() + e.joinChkResourceCh[i] <- e.newFirstChunk() } // e.joinResultCh is for transmitting the join result chunks to the main thread. @@ -620,9 +620,9 @@ func (e *NestedLoopApplyExec) Open(ctx context.Context) error { } e.cursor = 0 e.innerRows = e.innerRows[:0] - e.outerChunk = e.outerExec.newChunk() - e.innerChunk = e.innerExec.newChunk() - e.innerList = chunk.NewList(e.innerExec.retTypes(), e.maxChunkSize) + e.outerChunk = e.outerExec.newFirstChunk() + e.innerChunk = e.innerExec.newFirstChunk() + e.innerList = chunk.NewList(e.innerExec.retTypes(), e.initCap, e.maxChunkSize) e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaNestedLoopApply) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) diff --git a/executor/load_data.go b/executor/load_data.go index 2d5b2b018f66b..652ad68d1aa09 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -50,7 +50,7 @@ func NewLoadDataInfo(ctx sessionctx.Context, row []types.Datum, tbl table.Table, // Next implements the Executor Next interface. func (e *LoadDataExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) // TODO: support load data without local field. if !e.IsLocal { return errors.New("Load Data: don't support load data without local field") diff --git a/executor/load_stats.go b/executor/load_stats.go index edad12113f5bc..968d3b10e0601 100644 --- a/executor/load_stats.go +++ b/executor/load_stats.go @@ -51,7 +51,7 @@ const LoadStatsVarKey loadStatsVarKeyType = 0 // Next implements the Executor Next interface. func (e *LoadStatsExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if len(e.info.Path) == 0 { return errors.New("Load Stats: file path is empty") } diff --git a/executor/merge_join.go b/executor/merge_join.go index 134be86d410c8..56f2102e763e2 100644 --- a/executor/merge_join.go +++ b/executor/merge_join.go @@ -179,7 +179,7 @@ func (t *mergeJoinInnerTable) reallocReaderResult() { // Create a new Chunk and append it to "resourceQueue" if there is no more // available chunk in "resourceQueue". if len(t.resourceQueue) == 0 { - newChunk := t.reader.newChunk() + newChunk := t.reader.newFirstChunk() t.memTracker.Consume(newChunk.MemoryUsage()) t.resourceQueue = append(t.resourceQueue, newChunk) } @@ -214,7 +214,7 @@ func (e *MergeJoinExec) Open(ctx context.Context) error { e.childrenResults = make([]*chunk.Chunk, 0, len(e.children)) for _, child := range e.children { - e.childrenResults = append(e.childrenResults, child.newChunk()) + e.childrenResults = append(e.childrenResults, child.newFirstChunk()) } e.innerTable.memTracker = memory.NewTracker("innerTable", -1) diff --git a/executor/pkg_test.go b/executor/pkg_test.go index ac75608b846cf..e75e5fade8a55 100644 --- a/executor/pkg_test.go +++ b/executor/pkg_test.go @@ -29,7 +29,7 @@ type MockExec struct { func (m *MockExec) Next(ctx context.Context, chk *chunk.Chunk) error { chk.Reset() colTypes := m.retTypes() - for ; m.curRowIdx < len(m.Rows) && chk.NumRows() < m.maxChunkSize; m.curRowIdx++ { + for ; m.curRowIdx < len(m.Rows) && chk.NumRows() < chk.Capacity(); m.curRowIdx++ { curRow := m.Rows[m.curRowIdx] for i := 0; i < curRow.Len(); i++ { curDatum := curRow.ToRow().GetDatum(i, colTypes[i]) @@ -91,10 +91,10 @@ func (s *pkgTestSuite) TestNestedLoopApply(c *C) { innerFilter: []expression.Expression{innerFilter}, joiner: joiner, } - join.innerList = chunk.NewList(innerExec.retTypes(), innerExec.maxChunkSize) - join.innerChunk = innerExec.newChunk() - join.outerChunk = outerExec.newChunk() - joinChk := join.newChunk() + join.innerList = chunk.NewList(innerExec.retTypes(), innerExec.initCap, innerExec.maxChunkSize) + join.innerChunk = innerExec.newFirstChunk() + join.outerChunk = outerExec.newFirstChunk() + joinChk := join.newFirstChunk() it := chunk.NewIterator4Chunk(joinChk) for rowIdx := 1; ; { err := join.Next(ctx, joinChk) diff --git a/executor/point_get.go b/executor/point_get.go index a5c743a8405ba..d2db04d830350 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -195,6 +195,6 @@ func (e *PointGetExecutor) retTypes() []*types.FieldType { return e.tps } -func (e *PointGetExecutor) newChunk() *chunk.Chunk { - return chunk.NewChunkWithCapacity(e.retTypes(), 1) +func (e *PointGetExecutor) newFirstChunk() *chunk.Chunk { + return chunk.New(e.retTypes(), 1, 1) } diff --git a/executor/prepared.go b/executor/prepared.go index bab33f295089c..2e629c7971661 100644 --- a/executor/prepared.go +++ b/executor/prepared.go @@ -89,8 +89,10 @@ type PrepareExec struct { // NewPrepareExec creates a new PrepareExec. func NewPrepareExec(ctx sessionctx.Context, is infoschema.InfoSchema, sqlTxt string) *PrepareExec { + base := newBaseExecutor(ctx, nil, "PrepareStmt") + base.initCap = chunk.ZeroCapacity return &PrepareExec{ - baseExecutor: newBaseExecutor(ctx, nil, "PrepareStmt"), + baseExecutor: base, is: is, sqlText: sqlTxt, } diff --git a/executor/projection.go b/executor/projection.go index 83609ed6f1893..168ce32f39914 100644 --- a/executor/projection.go +++ b/executor/projection.go @@ -75,7 +75,7 @@ func (e *ProjectionExec) Open(ctx context.Context) error { } if e.isUnparallelExec() { - e.childResult = e.children[0].newChunk() + e.childResult = e.children[0].newFirstChunk() } return nil @@ -139,7 +139,7 @@ func (e *ProjectionExec) Open(ctx context.Context) error { // +------------------------------+ +----------------------+ // func (e *ProjectionExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if e.isUnparallelExec() { return errors.Trace(e.unParallelExecute(ctx, chk)) } @@ -207,11 +207,11 @@ func (e *ProjectionExec) prepare(ctx context.Context) { }) e.fetcher.inputCh <- &projectionInput{ - chk: e.children[0].newChunk(), + chk: e.children[0].newFirstChunk(), targetWorker: e.workers[i], } e.fetcher.outputCh <- &projectionOutput{ - chk: e.newChunk(), + chk: e.newFirstChunk(), done: make(chan error, 1), } } diff --git a/executor/show.go b/executor/show.go index 156dc486d9d04..0c10de649234e 100644 --- a/executor/show.go +++ b/executor/show.go @@ -64,9 +64,9 @@ type ShowExec struct { // Next implements the Executor Next interface. func (e *ShowExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(e.maxChunkSize) if e.result == nil { - e.result = e.newChunk() + e.result = e.newFirstChunk() err := e.fetchAll() if err != nil { return errors.Trace(err) @@ -87,7 +87,7 @@ func (e *ShowExec) Next(ctx context.Context, chk *chunk.Chunk) error { if e.cursor >= e.result.NumRows() { return nil } - numCurBatch := mathutil.Min(e.maxChunkSize, e.result.NumRows()-e.cursor) + numCurBatch := mathutil.Min(chk.Capacity(), e.result.NumRows()-e.cursor) chk.Append(e.result, e.cursor, e.cursor+numCurBatch) e.cursor += numCurBatch return nil diff --git a/executor/sort.go b/executor/sort.go index 603f525b59264..95b9ecac29d38 100644 --- a/executor/sort.go +++ b/executor/sort.go @@ -107,11 +107,11 @@ func (e *SortExec) Next(ctx context.Context, chk *chunk.Chunk) error { func (e *SortExec) fetchRowChunks(ctx context.Context) error { fields := e.retTypes() - e.rowChunks = chunk.NewList(fields, e.maxChunkSize) + e.rowChunks = chunk.NewList(fields, e.initCap, e.maxChunkSize) e.rowChunks.GetMemTracker().AttachTo(e.memTracker) e.rowChunks.GetMemTracker().SetLabel("rowChunks") for { - chk := e.children[0].newChunk() + chk := e.children[0].newFirstChunk() err := e.children[0].Next(ctx, chk) if err != nil { return errors.Trace(err) @@ -171,7 +171,7 @@ func (e *SortExec) buildKeyExprsAndTypes() { } func (e *SortExec) buildKeyChunks() error { - e.keyChunks = chunk.NewList(e.keyTypes, e.maxChunkSize) + e.keyChunks = chunk.NewList(e.keyTypes, e.initCap, e.maxChunkSize) e.keyChunks.GetMemTracker().SetLabel("keyChunks") e.keyChunks.GetMemTracker().AttachTo(e.memTracker) @@ -323,11 +323,11 @@ func (e *TopNExec) Next(ctx context.Context, chk *chunk.Chunk) error { func (e *TopNExec) loadChunksUntilTotalLimit(ctx context.Context) error { e.chkHeap = &topNChunkHeap{e} - e.rowChunks = chunk.NewList(e.retTypes(), e.maxChunkSize) + e.rowChunks = chunk.NewList(e.retTypes(), e.initCap, e.maxChunkSize) e.rowChunks.GetMemTracker().AttachTo(e.memTracker) e.rowChunks.GetMemTracker().SetLabel("rowChunks") for e.rowChunks.Len() < e.totalLimit { - srcChk := e.children[0].newChunk() + srcChk := e.children[0].newFirstChunk() err := e.children[0].Next(ctx, srcChk) if err != nil { return errors.Trace(err) @@ -362,7 +362,7 @@ func (e *TopNExec) executeTopN(ctx context.Context) error { if e.keyChunks != nil { childKeyChk = chunk.NewChunkWithCapacity(e.keyTypes, e.maxChunkSize) } - childRowChk := e.children[0].newChunk() + childRowChk := e.children[0].newFirstChunk() for { err := e.children[0].Next(ctx, childRowChk) if err != nil { @@ -425,7 +425,7 @@ func (e *TopNExec) processChildChk(childRowChk, childKeyChk *chunk.Chunk) error // but we want descending top N, then we will keep all data in memory. // But if data is distributed randomly, this function will be called log(n) times. func (e *TopNExec) doCompaction() error { - newRowChunks := chunk.NewList(e.retTypes(), e.maxChunkSize) + newRowChunks := chunk.NewList(e.retTypes(), e.initCap, e.maxChunkSize) newRowPtrs := make([]chunk.RowPtr, 0, e.rowChunks.Len()) for _, rowPtr := range e.rowPtrs { newRowPtr := newRowChunks.AppendRow(e.rowChunks.GetRow(rowPtr)) @@ -436,7 +436,7 @@ func (e *TopNExec) doCompaction() error { e.rowChunks = newRowChunks if e.keyChunks != nil { - newKeyChunks := chunk.NewList(e.keyTypes, e.maxChunkSize) + newKeyChunks := chunk.NewList(e.keyTypes, e.initCap, e.maxChunkSize) for _, rowPtr := range e.rowPtrs { newKeyChunks.AppendRow(e.keyChunks.GetRow(rowPtr)) } diff --git a/executor/trace.go b/executor/trace.go index 85204f53a04d0..588df29065c1b 100644 --- a/executor/trace.go +++ b/executor/trace.go @@ -72,7 +72,7 @@ func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error { if err != nil { return errors.Trace(err) } - stmtExecChk := stmtExec.newChunk() + stmtExecChk := stmtExec.newFirstChunk() // store span into context ctx = opentracing.ContextWithSpan(ctx, e.rootTrace) diff --git a/executor/union_scan.go b/executor/union_scan.go index 474a02819f9bf..3c1682936d71d 100644 --- a/executor/union_scan.go +++ b/executor/union_scan.go @@ -119,15 +119,15 @@ func (us *UnionScanExec) Open(ctx context.Context) error { if err := us.baseExecutor.Open(ctx); err != nil { return errors.Trace(err) } - us.snapshotChunkBuffer = us.newChunk() + us.snapshotChunkBuffer = us.newFirstChunk() return nil } // Next implements the Executor Next interface. func (us *UnionScanExec) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() + chk.GrowAndReset(us.maxChunkSize) mutableRow := chunk.MutRowFromTypes(us.retTypes()) - for i, batchSize := 0, us.ctx.GetSessionVars().MaxChunkSize; i < batchSize; i++ { + for i, batchSize := 0, chk.Capacity(); i < batchSize; i++ { row, err := us.getOneRow(ctx) if err != nil { return errors.Trace(err) diff --git a/executor/update.go b/executor/update.go index 4ab7fc345e0fb..de60b92ab8a01 100644 --- a/executor/update.go +++ b/executor/update.go @@ -140,8 +140,8 @@ func (e *UpdateExec) Next(ctx context.Context, chk *chunk.Chunk) error { func (e *UpdateExec) fetchChunkRows(ctx context.Context) error { fields := e.children[0].retTypes() globalRowIdx := 0 + chk := e.children[0].newFirstChunk() for { - chk := e.children[0].newChunk() err := e.children[0].Next(ctx, chk) if err != nil { return errors.Trace(err) @@ -162,6 +162,7 @@ func (e *UpdateExec) fetchChunkRows(ctx context.Context) error { e.newRowsData = append(e.newRowsData, newRow) globalRowIdx++ } + chk = chunk.Renew(chk, e.maxChunkSize) } return nil } diff --git a/privilege/privileges/cache.go b/privilege/privileges/cache.go index 100e86efba061..08974a2374ec9 100644 --- a/privilege/privileges/cache.go +++ b/privilege/privileges/cache.go @@ -277,11 +277,8 @@ func (p *MySQLPrivilege) loadTable(sctx sessionctx.Context, sql string, defer terror.Call(rs.Close) fs := rs.Fields() + chk := rs.NewChunk() for { - // NOTE: decodeTableRow decodes data from a chunk Row, that is a shallow copy. - // The result will reference memory in the chunk, so the chunk must not be reused - // here, otherwise some werid bug will happen! - chk := rs.NewChunk() err = rs.Next(context.TODO(), chk) if err != nil { return errors.Trace(err) @@ -296,6 +293,10 @@ func (p *MySQLPrivilege) loadTable(sctx sessionctx.Context, sql string, return errors.Trace(err) } } + // NOTE: decodeTableRow decodes data from a chunk Row, that is a shallow copy. + // The result will reference memory in the chunk, so the chunk must not be reused + // here, otherwise some werid bug will happen! + chk = chunk.Renew(chk, sctx.GetSessionVars().MaxChunkSize) } } diff --git a/server/conn.go b/server/conn.go index 2275f7170897f..df2fa182ebe93 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1033,8 +1033,8 @@ func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet fetchedRows := rs.GetFetchedRows() // if fetchedRows is not enough, getting data from recordSet. + chk := rs.NewChunk() for len(fetchedRows) < fetchSize { - chk := rs.NewChunk() // Here server.tidbResultSet implements Next method. err := rs.Next(ctx, chk) if err != nil { @@ -1048,6 +1048,7 @@ func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet for i := 0; i < rowCount; i++ { fetchedRows = append(fetchedRows, chk.GetRow(i)) } + chk = chunk.Renew(chk, cc.ctx.GetSessionVars().MaxChunkSize) } // tell the client COM_STMT_FETCH has finished by setting proper serverStatus, diff --git a/server/driver.go b/server/driver.go index 905d8aec3f959..ebc2d7278ccc0 100644 --- a/server/driver.go +++ b/server/driver.go @@ -17,6 +17,7 @@ import ( "crypto/tls" "fmt" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/auth" "github.com/pingcap/tidb/util/chunk" @@ -82,6 +83,9 @@ type QueryCtx interface { // ShowProcess shows the information about the session. ShowProcess() util.ProcessInfo + // GetSessionVars return SessionVars. + GetSessionVars() *variable.SessionVars + SetSessionManager(util.SessionManager) } diff --git a/server/driver_tidb.go b/server/driver_tidb.go index 6a583bd357c6f..103556477765c 100644 --- a/server/driver_tidb.go +++ b/server/driver_tidb.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/terror" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" @@ -324,6 +325,11 @@ func (tc *TiDBContext) ShowProcess() util.ProcessInfo { return tc.session.ShowProcess() } +// GetSessionVars return SessionVars. +func (tc *TiDBContext) GetSessionVars() *variable.SessionVars { + return tc.session.GetSessionVars() +} + type tidbResultSet struct { recordSet ast.RecordSet columns []*ColumnInfo diff --git a/session/session.go b/session/session.go index 04a0d0cc78f22..cf44975b59e15 100644 --- a/session/session.go +++ b/session/session.go @@ -555,7 +555,7 @@ func (s *session) ExecRestrictedSQL(sctx sessionctx.Context, sql string) ([]chun ) // Execute all recordset, take out the first one as result. for i, rs := range recordSets { - tmp, err := drainRecordSet(ctx, rs) + tmp, err := drainRecordSet(ctx, se, rs) if err != nil { return nil, nil, errors.Trace(err) } @@ -604,10 +604,10 @@ func createSessionWithDomainFunc(store kv.Storage) func(*domain.Domain) (pools.R } } -func drainRecordSet(ctx context.Context, rs ast.RecordSet) ([]chunk.Row, error) { +func drainRecordSet(ctx context.Context, se *session, rs ast.RecordSet) ([]chunk.Row, error) { var rows []chunk.Row + chk := rs.NewChunk() for { - chk := rs.NewChunk() err := rs.Next(ctx, chk) if err != nil || chk.NumRows() == 0 { return rows, errors.Trace(err) @@ -616,6 +616,7 @@ func drainRecordSet(ctx context.Context, rs ast.RecordSet) ([]chunk.Row, error) for r := iter.Begin(); r != iter.End(); r = iter.Next() { rows = append(rows, r) } + chk = chunk.Renew(chk, se.sessionVars.MaxChunkSize) } } diff --git a/session/tidb.go b/session/tidb.go index 26ed871736e73..1357c3e753f01 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -199,9 +199,9 @@ func GetRows4Test(ctx context.Context, sctx sessionctx.Context, rs ast.RecordSet return nil, nil } var rows []chunk.Row + chk := rs.NewChunk() for { // Since we collect all the rows, we can not reuse the chunk. - chk := rs.NewChunk() iter := chunk.NewIterator4Chunk(chk) err := rs.Next(ctx, chk) @@ -215,6 +215,7 @@ func GetRows4Test(ctx context.Context, sctx sessionctx.Context, rs ast.RecordSet for row := iter.Begin(); row != iter.End(); row = iter.Next() { rows = append(rows, row) } + chk = chunk.Renew(chk, sctx.GetSessionVars().MaxChunkSize) } return rows, nil } diff --git a/util/chunk/chunk.go b/util/chunk/chunk.go index a9211cb79a416..a5f2cddc57fa1 100644 --- a/util/chunk/chunk.go +++ b/util/chunk/chunk.go @@ -38,6 +38,7 @@ type Chunk struct { // Capacity constants. const ( InitialCapacity = 32 + ZeroCapacity = 0 ) // NewChunkWithCapacity creates a new chunk with field types and capacity. @@ -70,8 +71,11 @@ func New(fields []*types.FieldType, cap, maxChunkSize int) *Chunk { // chk: old chunk(often used in previous call). // maxChunkSize: the limit for the max number of rows. func Renew(chk *Chunk, maxChunkSize int) *Chunk { - newCap := reCalcCapacity(chk, maxChunkSize) newChk := new(Chunk) + if chk.columns == nil { + return newChk + } + newCap := reCalcCapacity(chk, maxChunkSize) newChk.columns = renewColumns(chk.columns, newCap) newChk.numVirtualRows = 0 newChk.capacity = newCap @@ -153,6 +157,9 @@ func (c *Chunk) SetNumVirtualRows(numVirtualRows int) { // Reset resets the chunk, so the memory it allocated can be reused. // Make sure all the data in the chunk is not used anymore before you reuse this chunk. func (c *Chunk) Reset() { + if c.columns == nil { + return + } for _, col := range c.columns { col.reset() } diff --git a/util/chunk/iterator_test.go b/util/chunk/iterator_test.go index ca0b69273014d..eb4f9d7c04cf7 100644 --- a/util/chunk/iterator_test.go +++ b/util/chunk/iterator_test.go @@ -21,7 +21,7 @@ import ( func (s *testChunkSuite) TestIterator(c *check.C) { fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} - chk := NewChunkWithCapacity(fields, 32) + chk := New(fields, 32, 1024) n := 10 var expected []int64 for i := 0; i < n; i++ { @@ -29,8 +29,8 @@ func (s *testChunkSuite) TestIterator(c *check.C) { expected = append(expected, int64(i)) } var rows []Row - li := NewList(fields, 1) - li2 := NewList(fields, 5) + li := NewList(fields, 1, 2) + li2 := NewList(fields, 8, 16) var ptrs []RowPtr var ptrs2 []RowPtr for i := 0; i < n; i++ { diff --git a/util/chunk/list.go b/util/chunk/list.go index 9c89ebbce6ad1..da789211d5a0d 100644 --- a/util/chunk/list.go +++ b/util/chunk/list.go @@ -21,11 +21,12 @@ import ( // List holds a slice of chunks, use to append rows with max chunk size properly handled. type List struct { - fieldTypes []*types.FieldType - maxChunkSize int - length int - chunks []*Chunk - freelist []*Chunk + fieldTypes []*types.FieldType + initChunkSize int + maxChunkSize int + length int + chunks []*Chunk + freelist []*Chunk memTracker *memory.Tracker // track memory usage. consumedIdx int // chunk index in "chunks", has been consumed. @@ -38,13 +39,14 @@ type RowPtr struct { RowIdx uint32 } -// NewList creates a new List with field types and max chunk size. -func NewList(fieldTypes []*types.FieldType, maxChunkSize int) *List { +// NewList creates a new List with field types, init chunk size and max chunk size. +func NewList(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int) *List { l := &List{ - fieldTypes: fieldTypes, - maxChunkSize: maxChunkSize, - memTracker: memory.NewTracker("chunk.List", -1), - consumedIdx: -1, + fieldTypes: fieldTypes, + initChunkSize: initChunkSize, + maxChunkSize: maxChunkSize, + memTracker: memory.NewTracker("chunk.List", -1), + consumedIdx: -1, } return l } @@ -72,7 +74,7 @@ func (l *List) GetChunk(chkIdx int) *Chunk { // AppendRow appends a row to the List, the row is copied to the List. func (l *List) AppendRow(row Row) RowPtr { chkIdx := len(l.chunks) - 1 - if chkIdx == -1 || l.chunks[chkIdx].NumRows() >= l.maxChunkSize || chkIdx == l.consumedIdx { + if chkIdx == -1 || l.chunks[chkIdx].NumRows() >= l.chunks[chkIdx].Capacity() || chkIdx == l.consumedIdx { newChk := l.allocChunk() l.chunks = append(l.chunks, newChk) if chkIdx != l.consumedIdx { @@ -115,7 +117,10 @@ func (l *List) allocChunk() (chk *Chunk) { chk.Reset() return } - return NewChunkWithCapacity(l.fieldTypes, l.maxChunkSize) + if len(l.chunks) > 0 { + return Renew(l.chunks[len(l.chunks)-1], l.maxChunkSize) + } + return New(l.fieldTypes, l.initChunkSize, l.maxChunkSize) } // GetRow gets a Row from the list by RowPtr. diff --git a/util/chunk/list_test.go b/util/chunk/list_test.go index 8468876c6d826..646812331ceb8 100644 --- a/util/chunk/list_test.go +++ b/util/chunk/list_test.go @@ -28,7 +28,7 @@ func (s *testChunkSuite) TestList(c *check.C) { fields := []*types.FieldType{ types.NewFieldType(mysql.TypeLonglong), } - l := NewList(fields, 2) + l := NewList(fields, 2, 2) srcChunk := NewChunkWithCapacity(fields, 32) srcChunk.AppendInt64(0, 1) srcRow := srcChunk.GetRow(0) @@ -100,7 +100,7 @@ func (s *testChunkSuite) TestListMemoryUsage(c *check.C) { srcChk.AppendTime(3, timeObj) srcChk.AppendDuration(4, durationObj) - list := NewList(fieldTypes, maxChunkSize) + list := NewList(fieldTypes, maxChunkSize, maxChunkSize*2) c.Assert(list.GetMemTracker().BytesConsumed(), check.Equals, int64(0)) list.AppendRow(srcChk.GetRow(0)) @@ -131,7 +131,7 @@ func BenchmarkListMemoryUsage(b *testing.B) { row := chk.GetRow(0) initCap := 50 - list := NewList(fieldTypes, 2) + list := NewList(fieldTypes, 2, 8) for i := 0; i < initCap; i++ { list.AppendRow(row) } diff --git a/util/codec/bench_test.go b/util/codec/bench_test.go index d7e837fa92b69..4929458dcf994 100644 --- a/util/codec/bench_test.go +++ b/util/codec/bench_test.go @@ -82,7 +82,7 @@ func BenchmarkDecodeOneToChunk(b *testing.B) { raw = EncodeBytes(raw, str.GetBytes()) intType := types.NewFieldType(mysql.TypeLonglong) b.ResetTimer() - decoder := NewDecoder(chunk.NewChunkWithCapacity([]*types.FieldType{intType}, 32), nil) + decoder := NewDecoder(chunk.New([]*types.FieldType{intType}, 32, 32), nil) for i := 0; i < b.N; i++ { decoder.DecodeOne(raw, 0, intType) } diff --git a/util/codec/codec_test.go b/util/codec/codec_test.go index 667213a4e6524..07b38e79c5cbe 100644 --- a/util/codec/codec_test.go +++ b/util/codec/codec_test.go @@ -935,7 +935,7 @@ func (s *testCodecSuite) TestDecodeOneToChunk(c *C) { datums = append(datums, types.NewDatum(t.value)) } rowCount := 3 - decoder := NewDecoder(chunk.NewChunkWithCapacity(tps, 32), time.Local) + decoder := NewDecoder(chunk.New(tps, 32, 32), time.Local) for rowIdx := 0; rowIdx < rowCount; rowIdx++ { encoded, err := EncodeValue(sc, nil, datums...) c.Assert(err, IsNil)