diff --git a/config/config.go b/config/config.go
index 4deb0d9c9c0ad..8bb9cf1776920 100644
--- a/config/config.go
+++ b/config/config.go
@@ -278,6 +278,10 @@ type Config struct {
 	Plugin               Plugin `toml:"plugin" json:"plugin"`
 	MaxServerConnections uint32 `toml:"max-server-connections" json:"max-server-connections"`
 	RunDDL               bool   `toml:"run-ddl" json:"run-ddl"`
+	// TiDBMaxReuseChunk indicates the maximum number of cached chunks
+	TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"`
+	// TiDBMaxReuseColumn indicates the maximum number of cached columns per type
+	TiDBMaxReuseColumn uint32 `toml:"tidb-max-reuse-column" json:"tidb-max-reuse-column"`
 }
 
 // UpdateTempStoragePath is to update the `TempStoragePath` if port/statusPort was changed
@@ -975,6 +979,8 @@ var defaultConf = Config{
 	NewCollationsEnabledOnFirstBootstrap: true,
 	EnableGlobalKill:                     true,
 	TrxSummary:                           DefaultTrxSummary(),
+	TiDBMaxReuseChunk:                    64,
+	TiDBMaxReuseColumn:                   256,
 }
 
 var (
diff --git a/config/config_test.go b/config/config_test.go
index 50e3227de049c..9a6d12a284817 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -730,6 +730,8 @@ enable-enum-length-limit = false
 stores-refresh-interval = 30
 enable-forwarding = true
 enable-global-kill = true
+tidb-max-reuse-chunk = 10
+tidb-max-reuse-column = 20
 [performance]
 txn-total-size-limit=2000
 tcp-no-delay = false
@@ -798,6 +800,8 @@ max_connections = 200
 	require.True(t, conf.RepairMode)
 	require.Equal(t, uint64(16), conf.TiKVClient.ResolveLockLiteThreshold)
 	require.Equal(t, uint32(200), conf.Instance.MaxConnections)
+	require.Equal(t, uint32(10), conf.TiDBMaxReuseChunk)
+	require.Equal(t, uint32(20), conf.TiDBMaxReuseColumn)
 	require.Equal(t, []string{"tiflash"}, conf.IsolationRead.Engines)
 	require.Equal(t, 3080, conf.MaxIndexLength)
 	require.Equal(t, 70, conf.IndexLimit)
diff --git a/distsql/select_result.go b/distsql/select_result.go
index 0e807b360d0ad..a2d6215987a32 100644
--- a/distsql/select_result.go
+++ b/distsql/select_result.go
@@ -311,7 +311,7 @@ func (r *selectResult) readFromDefault(ctx context.Context, chk *chunk.Chunk) er
 func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) error {
 	if r.respChunkDecoder == nil {
 		r.respChunkDecoder = chunk.NewDecoder(
-			chunk.NewChunkWithCapacity(r.fieldTypes, 0),
+			r.ctx.GetSessionVars().GetNewChunk(r.fieldTypes, 0),
 			r.fieldTypes,
 		)
 	}
diff --git a/executor/adapter.go b/executor/adapter.go
index c4007a09213e4..b638b28690c8b 100644
--- a/executor/adapter.go
+++ b/executor/adapter.go
@@ -840,7 +840,7 @@ func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e Executor
 	}()
 	var rows []chunk.Row
 	var err error
-	req := newFirstChunk(e)
+	req := tryNewCacheChunk(e)
 	for {
 		err = a.next(ctx, e, req)
 		if err != nil {
@@ -887,7 +887,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
 		}
 	}
 
-	err = a.next(ctx, e, newFirstChunk(e))
+	err = a.next(ctx, e, tryNewCacheChunk(e))
 	if err != nil {
 		return nil, err
 	}
diff --git a/executor/admin.go b/executor/admin.go
index ba219b70b6db3..6e549e246da42 100644
--- a/executor/admin.go
+++ b/executor/admin.go
@@ -111,7 +111,7 @@ func (e *CheckIndexRangeExec) Open(ctx context.Context) error {
 		FieldType: *colTypeForHandle,
 	})
 
-	e.srcChunk = newFirstChunk(e)
+	e.srcChunk = tryNewCacheChunk(e)
 	dagPB, err := e.buildDAGPB()
 	if err != nil {
 		return err
diff --git a/executor/aggregate.go b/executor/aggregate.go
index b407ce80a5353..771d928c9bbad 100644
--- a/executor/aggregate.go
+++ b/executor/aggregate.go
@@ -323,13 +323,14 @@ func (e *HashAggExec) initForUnparallelExec() {
 	failpoint.Inject("ConsumeRandomPanic", nil)
 	e.memTracker.Consume(hack.DefBucketMemoryUsageForMapStrToSlice * (1 << e.BInMap))
diff --git a/executor/delete.go b/executor/delete.go
--- a/executor/delete.go
+++ b/executor/delete.go
@@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
 		config.GetGlobalConfig().EnableBatchDML && batchDMLSize > 0
 	fields := retTypes(e.children[0])
-	chk := newFirstChunk(e.children[0])
+	chk := tryNewCacheChunk(e.children[0])
 	columns := e.children[0].Schema().Columns
 	if len(columns) != len(fields) {
 		logutil.BgLogger().Error("schema columns and fields mismatch",
@@ -190,7 +190,7 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
 	colPosInfos := e.tblColPosInfos
 	tblRowMap := make(tableRowMapType)
 	fields := retTypes(e.children[0])
-	chk := newFirstChunk(e.children[0])
+	chk := tryNewCacheChunk(e.children[0])
 	memUsageOfChk := int64(0)
 	for {
 		e.memTracker.Consume(-memUsageOfChk)
diff --git a/executor/distsql.go b/executor/distsql.go
index 6121d5fcaa4cd..182831bc90021 100644
--- a/executor/distsql.go
+++ b/executor/distsql.go
@@ -866,7 +866,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
 		}
 	}()
 	retTps := w.idxLookup.getRetTpsByHandle()
-	chk := chunk.NewChunkWithCapacity(retTps, w.idxLookup.maxChunkSize)
+	chk := w.idxLookup.ctx.GetSessionVars().GetNewChunk(retTps, w.idxLookup.maxChunkSize)
 	idxID := w.idxLookup.getIndexPlanRootID()
 	if w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
 		if idxID != w.idxLookup.id && w.idxLookup.stats != nil {
@@ -1161,7 +1161,7 @@ func (e *IndexLookUpRunTimeStats) Tp() int {
 }
 
 func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, tableReader Executor) error {
-	chk := newFirstChunk(tableReader)
+	chk := tryNewCacheChunk(tableReader)
 	tblInfo := w.idxLookup.table.Meta()
 	vals := make([]types.Datum, 0, len(w.idxTblCols))
@@ -1317,7 +1317,7 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
 	handleCnt := len(task.handles)
 	task.rows = make([]chunk.Row, 0, handleCnt)
 	for {
-		chk := newFirstChunk(tableReader)
+		chk := tryNewCacheChunk(tableReader)
 		err = Next(ctx, tableReader, chk)
 		if err != nil {
 			logutil.Logger(ctx).Error("table reader fetch next chunk failed", zap.Error(err))
diff --git a/executor/executor.go b/executor/executor.go
index 21523159e7a9a..ec662c00d8a63 100644
--- a/executor/executor.go
+++ b/executor/executor.go
@@ -229,6 +229,13 @@ func newFirstChunk(e Executor) *chunk.Chunk {
 	return chunk.New(base.retFieldTypes, base.initCap, base.maxChunkSize)
 }
 
+// tryNewCacheChunk tries to allocate the chunk from the session-level chunk pool.
+func tryNewCacheChunk(e Executor) *chunk.Chunk {
+	base := e.base()
+	s := base.ctx.GetSessionVars()
+	return s.GetNewChunkWithCapacity(base.retFieldTypes, base.initCap, base.maxChunkSize)
+}
+
 // newList creates a new List to buffer current executor's result.
 func newList(e Executor) *chunk.List {
 	base := e.base()
@@ -1387,7 +1393,7 @@ func (e *LimitExec) Open(ctx context.Context) error {
 	if err := e.baseExecutor.Open(ctx); err != nil {
 		return err
 	}
-	e.childResult = newFirstChunk(e.children[0])
+	e.childResult = tryNewCacheChunk(e.children[0])
 	e.cursor = 0
 	e.meetFirstBatch = e.begin == 0
 	return nil
@@ -1444,8 +1450,7 @@ func init() {
 		if err != nil {
 			return nil, err
 		}
-		chk := newFirstChunk(exec)
-
+		chk := tryNewCacheChunk(exec)
 		err = Next(ctx, exec, chk)
 		if err != nil {
 			return nil, err
 		}
@@ -1520,7 +1525,7 @@ func (e *SelectionExec) Open(ctx context.Context) error {
 func (e *SelectionExec) open(ctx context.Context) error {
 	e.memTracker = memory.NewTracker(e.id, -1)
 	e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
-	e.childResult = newFirstChunk(e.children[0])
+	e.childResult = tryNewCacheChunk(e.children[0])
 	e.memTracker.Consume(e.childResult.MemoryUsage())
 	e.batched = expression.Vectorizable(e.filters)
 	if e.batched {
@@ -1700,7 +1705,7 @@ func (e *MaxOneRowExec) Next(ctx context.Context, req *chunk.Chunk) error {
 		return ErrSubqueryMoreThan1Row
 	}
 
-	childChunk := newFirstChunk(e.children[0])
+	childChunk := tryNewCacheChunk(e.children[0])
 	err = Next(ctx, e.children[0], childChunk)
 	if err != nil {
 		return err
@@ -2134,6 +2139,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
 	errCount, warnCount := vars.StmtCtx.NumErrorWarnings()
 	vars.SysErrorCount = errCount
 	vars.SysWarningCount = warnCount
+	vars.ExchangeChunkStatus()
 	vars.StmtCtx = sc
 	vars.PrevFoundInPlanCache = vars.FoundInPlanCache
 	vars.FoundInPlanCache = false
@@ -2172,6 +2178,10 @@ func ResetUpdateStmtCtx(sc *stmtctx.StatementContext, stmt *ast.UpdateStmt, vars
 // expression using rows from a chunk, and then fill this value into the chunk
 func FillVirtualColumnValue(virtualRetTypes []*types.FieldType, virtualColumnIndex []int,
 	schema *expression.Schema, columns []*model.ColumnInfo, sctx sessionctx.Context, req *chunk.Chunk) error {
+	if len(virtualColumnIndex) == 0 {
+		return nil
+	}
+
 	virCols := chunk.NewChunkWithCapacity(virtualRetTypes, req.Capacity())
 	iter := chunk.NewIterator4Chunk(req)
 	for i, idx := range virtualColumnIndex {
diff --git a/executor/explain.go b/executor/explain.go
index 288fcc6b16b88..3f9f1eec6704e 100644
--- a/executor/explain.go
+++ b/executor/explain.go
@@ -120,7 +120,7 @@ func (e *ExplainExec) executeAnalyzeExec(ctx context.Context) (err error) {
 		}).run()
 	}
 	e.executed = true
-	chk := newFirstChunk(e.analyzeExec)
+	chk := tryNewCacheChunk(e.analyzeExec)
 	for {
 		err = Next(ctx, e.analyzeExec, chk)
 		if err != nil || chk.NumRows() == 0 {
diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go
index bdda0ab3535bc..9601dffc77900 100644
--- a/executor/index_lookup_hash_join.go
+++ b/executor/index_lookup_hash_join.go
@@ -418,7 +418,7 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask,
 		innerCtx:      e.innerCtx,
 		outerCtx:      e.outerCtx,
 		ctx:           e.ctx,
-		executorChk:   chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize),
+		executorChk:   e.ctx.GetSessionVars().GetNewChunk(e.innerCtx.rowTypes, e.maxChunkSize),
 		indexRanges:   copiedRanges,
 		keyOff2IdxOff: e.keyOff2IdxOff,
 		stats:         innerStats,
diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go
index 62ee5cea0bdd2..92f195985a191 100644
--- a/executor/index_lookup_join.go
+++ b/executor/index_lookup_join.go
@@ -226,7 +226,7 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork
 		outerCtx:      e.outerCtx,
 		taskCh:        taskCh,
 		ctx:           e.ctx,
-		executorChk:   chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize),
+		executorChk:   e.ctx.GetSessionVars().GetNewChunk(e.innerCtx.rowTypes, e.maxChunkSize),
 		indexRanges:   copiedRanges,
 		keyOff2IdxOff: e.keyOff2IdxOff,
 		stats:         innerStats,
@@ -431,7 +431,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
 	}
 	maxChunkSize := ow.ctx.GetSessionVars().MaxChunkSize
 	for requiredRows > task.outerResult.Len() {
-		chk := chunk.NewChunkWithCapacity(ow.outerCtx.rowTypes, maxChunkSize)
+		chk := ow.ctx.GetSessionVars().GetNewChunk(ow.outerCtx.rowTypes, maxChunkSize)
 		chk = chk.SetRequiredRows(requiredRows, maxChunkSize)
 		err := Next(ctx, ow.executor, chk)
 		if err != nil {
@@ -462,7 +462,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
 	}
 	task.encodedLookUpKeys = make([]*chunk.Chunk, task.outerResult.NumChunks())
 	for i := range task.encodedLookUpKeys {
-		task.encodedLookUpKeys[i] = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}, task.outerResult.GetChunk(i).NumRows())
+		task.encodedLookUpKeys[i] = ow.ctx.GetSessionVars().GetNewChunk([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}, task.outerResult.GetChunk(i).NumRows())
 	}
 	return task, nil
 }
@@ -714,7 +714,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
 			break
 		}
 		innerResult.Add(iw.executorChk)
-		iw.executorChk = newFirstChunk(innerExec)
+		iw.executorChk = tryNewCacheChunk(innerExec)
 	}
 	task.innerResult = innerResult
 	return nil
diff --git a/executor/index_lookup_join_test.go b/executor/index_lookup_join_test.go
index d37f3f8a1f743..2f251761b71c2 100644
--- a/executor/index_lookup_join_test.go
+++ b/executor/index_lookup_join_test.go
@@ -352,13 +352,16 @@ func TestIssue23722(t *testing.T) {
 	tk.MustExec("insert into t values (20301,'Charlie',x'7a');")
 	tk.MustQuery("select * from t;").Check(testkit.Rows("20301 Charlie z"))
 	tk.MustQuery("select * from t where c in (select c from t where t.c >= 'a');").Check(testkit.Rows("20301 Charlie z"))
+	tk.MustQuery("select @@last_sql_use_alloc").Check(testkit.Rows("1"))
 
 	// Test lookup content exceeds primary key prefix.
 	tk.MustExec("drop table if exists t;")
 	tk.MustExec("create table t (a int, b char(10), c varchar(255), primary key (c(5)) clustered);")
 	tk.MustExec("insert into t values (20301,'Charlie','aaaaaaa');")
+	tk.MustQuery("select @@last_sql_use_alloc").Check(testkit.Rows("1"))
 	tk.MustQuery("select * from t;").Check(testkit.Rows("20301 Charlie aaaaaaa"))
 	tk.MustQuery("select * from t where c in (select c from t where t.c >= 'a');").Check(testkit.Rows("20301 Charlie aaaaaaa"))
+	tk.MustQuery("select @@last_sql_use_alloc").Check(testkit.Rows("1"))
 
 	// Test the original case.
tk.MustExec("drop table if exists t;") @@ -452,7 +455,9 @@ func TestIssue27893(t *testing.T) { tk.MustExec("insert into t1 values('x')") tk.MustExec("insert into t2 values(1)") tk.MustQuery("select /*+ inl_join(t2) */ count(*) from t1 join t2 on t1.a = t2.a").Check(testkit.Rows("1")) + tk.MustQuery("select @@last_sql_use_alloc").Check(testkit.Rows("1")) tk.MustQuery("select /*+ inl_hash_join(t2) */ count(*) from t1 join t2 on t1.a = t2.a").Check(testkit.Rows("1")) + tk.MustQuery("select @@last_sql_use_alloc").Check(testkit.Rows("1")) } func TestPartitionTableIndexJoinAndIndexReader(t *testing.T) { diff --git a/executor/index_lookup_merge_join.go b/executor/index_lookup_merge_join.go index 369c18716dbc3..1ba2c2940c3fd 100644 --- a/executor/index_lookup_merge_join.go +++ b/executor/index_lookup_merge_join.go @@ -357,7 +357,7 @@ func (omw *outerMergeWorker) buildTask(ctx context.Context) (*lookUpMergeJoinTas requiredRows = omw.maxBatchSize } for requiredRows > 0 { - execChk := newFirstChunk(omw.executor) + execChk := tryNewCacheChunk(omw.executor) err := Next(ctx, omw.executor, execChk) if err != nil { return task, err @@ -706,7 +706,7 @@ func (imw *innerMergeWorker) dedupDatumLookUpKeys(lookUpContents []*indexJoinLoo // fetchNextInnerResult collects a chunk of inner results from inner child executor. func (imw *innerMergeWorker) fetchNextInnerResult(ctx context.Context, task *lookUpMergeJoinTask) (beginRow chunk.Row, err error) { - task.innerResult = chunk.NewChunkWithCapacity(retTypes(imw.innerExec), imw.ctx.GetSessionVars().MaxChunkSize) + task.innerResult = imw.ctx.GetSessionVars().GetNewChunk(retTypes(imw.innerExec), imw.ctx.GetSessionVars().MaxChunkSize) err = Next(ctx, imw.innerExec, task.innerResult) task.innerIter = chunk.NewIterator4Chunk(task.innerResult) beginRow = task.innerIter.Begin() diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index bc29199a2c2b7..82c6ab2f50817 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -508,7 +508,7 @@ func (w *partialTableWorker) syncErr(resultCh chan<- *lookupTableTask, err error func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, resultCh chan<- *lookupTableTask, finished <-chan struct{}, handleCols plannercore.HandleCols) (count int64, err error) { - chk := chunk.NewChunkWithCapacity(retTypes(w.tableReader), w.maxChunkSize) + chk := w.sc.GetSessionVars().GetNewChunk(retTypes(w.tableReader), w.maxChunkSize) var basic *execdetails.BasicRuntimeStats if be := w.tableReader.base(); be != nil && be.runtimeStats != nil { basic = be.runtimeStats @@ -817,7 +817,7 @@ func (w *partialIndexWorker) fetchHandles( resultCh chan<- *lookupTableTask, finished <-chan struct{}, handleCols plannercore.HandleCols) (count int64, err error) { - chk := chunk.NewChunkWithCapacity(handleCols.GetFieldsTypes(), w.maxChunkSize) + chk := w.sc.GetSessionVars().GetNewChunk(handleCols.GetFieldsTypes(), w.maxChunkSize) var basicStats *execdetails.BasicRuntimeStats if w.stats != nil { if w.idxID != 0 { @@ -961,7 +961,7 @@ func (w *indexMergeTableScanWorker) executeTask(ctx context.Context, task *looku handleCnt := len(task.handles) task.rows = make([]chunk.Row, 0, handleCnt) for { - chk := newFirstChunk(tableReader) + chk := tryNewCacheChunk(tableReader) err = Next(ctx, tableReader, chk) if err != nil { logutil.Logger(ctx).Error("table reader fetch next chunk failed", zap.Error(err)) diff --git a/executor/insert_common.go b/executor/insert_common.go 
index d4e3dab1bf839..5bb7feb2441da 100644
--- a/executor/insert_common.go
+++ b/executor/insert_common.go
@@ -460,7 +460,7 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error {
 	e := base.insertCommon()
 	selectExec := e.children[0]
 	fields := retTypes(selectExec)
-	chk := newFirstChunk(selectExec)
+	chk := tryNewCacheChunk(selectExec)
 	iter := chunk.NewIterator4Chunk(chk)
 	rows := make([][]types.Datum, 0, chk.Capacity())
 
diff --git a/executor/join.go b/executor/join.go
index c97543b4cb3e2..48d3e5d5a56f8 100644
--- a/executor/join.go
+++ b/executor/join.go
@@ -297,7 +297,7 @@ func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chu
 		if e.finished.Load().(bool) {
 			return
 		}
-		chk := chunk.NewChunkWithCapacity(e.buildSideExec.base().retFieldTypes, e.ctx.GetSessionVars().MaxChunkSize)
+		chk := e.ctx.GetSessionVars().GetNewChunk(e.buildSideExec.base().retFieldTypes, e.ctx.GetSessionVars().MaxChunkSize)
 		err = Next(ctx, e.buildSideExec, chk)
 		if err != nil {
 			e.buildFinished <- errors.Trace(err)
@@ -1307,8 +1307,8 @@ func (e *NestedLoopApplyExec) Open(ctx context.Context) error {
 	}
 	e.cursor = 0
 	e.innerRows = e.innerRows[:0]
-	e.outerChunk = newFirstChunk(e.outerExec)
-	e.innerChunk = newFirstChunk(e.innerExec)
+	e.outerChunk = tryNewCacheChunk(e.outerExec)
+	e.innerChunk = tryNewCacheChunk(e.innerExec)
 	e.innerList = chunk.NewList(retTypes(e.innerExec), e.initCap, e.maxChunkSize)
 
 	e.memTracker = memory.NewTracker(e.id, -1)
diff --git a/executor/joiner.go b/executor/joiner.go
index 842135802444f..5fe4d92eba2a2 100644
--- a/executor/joiner.go
+++ b/executor/joiner.go
@@ -192,7 +192,7 @@ func newJoiner(ctx sessionctx.Context, joinType plannercore.JoinType,
 		return &antiLeftOuterSemiJoiner{base}
 	case plannercore.LeftOuterJoin, plannercore.RightOuterJoin, plannercore.InnerJoin:
 		if len(base.conditions) > 0 {
-			base.chk = chunk.NewChunkWithCapacity(shallowRowType, ctx.GetSessionVars().MaxChunkSize)
+			base.chk = ctx.GetSessionVars().GetNewChunk(shallowRowType, ctx.GetSessionVars().MaxChunkSize)
 		}
 		switch joinType {
 		case plannercore.LeftOuterJoin:
diff --git a/executor/merge_join.go b/executor/merge_join.go
index 233d140ade678..a64a9fa0c33dc 100644
--- a/executor/merge_join.go
+++ b/executor/merge_join.go
@@ -77,7 +77,7 @@ type mergeJoinTable struct {
 
 func (t *mergeJoinTable) init(exec *MergeJoinExec) {
 	child := exec.children[t.childIndex]
-	t.childChunk = newFirstChunk(child)
+	t.childChunk = tryNewCacheChunk(child)
 	t.childChunkIter = chunk.NewIterator4Chunk(t.childChunk)
 
 	items := make([]expression.Expression, 0, len(t.joinKeys))
diff --git a/executor/parallel_apply.go b/executor/parallel_apply.go
index d0aa68af87d2b..2c4499b14818c 100644
--- a/executor/parallel_apply.go
+++ b/executor/parallel_apply.go
@@ -107,7 +107,7 @@ func (e *ParallelNestedLoopApplyExec) Open(ctx context.Context) error {
 	e.hasMatch = make([]bool, e.concurrency)
 	e.hasNull = make([]bool, e.concurrency)
 	for i := 0; i < e.concurrency; i++ {
-		e.innerChunk[i] = newFirstChunk(e.innerExecs[i])
+		e.innerChunk[i] = tryNewCacheChunk(e.innerExecs[i])
 		e.innerList[i] = chunk.NewList(retTypes(e.innerExecs[i]), e.initCap, e.maxChunkSize)
 		e.innerList[i].GetMemTracker().SetLabel(memory.LabelForInnerList)
 		e.innerList[i].GetMemTracker().AttachTo(e.memTracker)
@@ -206,7 +206,7 @@ func (e *ParallelNestedLoopApplyExec) outerWorker(ctx context.Context) {
 	var err error
 	for {
 		failpoint.Inject("parallelApplyOuterWorkerPanic", nil)
-		chk := newFirstChunk(e.outerExec)
+		chk := tryNewCacheChunk(e.outerExec)
 		if err := Next(ctx, e.outerExec, chk); err != nil {
 			e.putResult(nil, err)
 			return
diff --git a/executor/pipelined_window.go b/executor/pipelined_window.go
index 1a72864bcf225..505cf09f415d7 100644
--- a/executor/pipelined_window.go
+++ b/executor/pipelined_window.go
@@ -205,7 +205,7 @@ func (e *PipelinedWindowExec) getRowsInPartition(ctx context.Context) (err error
 
 func (e *PipelinedWindowExec) fetchChild(ctx context.Context) (EOF bool, err error) {
 	// TODO: reuse chunks
-	childResult := newFirstChunk(e.children[0])
+	childResult := tryNewCacheChunk(e.children[0])
 	err = Next(ctx, e.children[0], childResult)
 	if err != nil {
 		return false, errors.Trace(err)
@@ -217,7 +217,7 @@ func (e *PipelinedWindowExec) fetchChild(ctx context.Context) (EOF bool, err err
 	}
 
 	// TODO: reuse chunks
-	resultChk := chunk.New(e.retFieldTypes, 0, numRows)
+	resultChk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.retFieldTypes, 0, numRows)
 	err = e.copyChk(childResult, resultChk)
 	if err != nil {
 		return false, err
diff --git a/executor/projection.go b/executor/projection.go
index ac060e7e4a391..27ce1bafc0b8a 100644
--- a/executor/projection.go
+++ b/executor/projection.go
@@ -109,7 +109,7 @@ func (e *ProjectionExec) open(ctx context.Context) error {
 	}
 
 	if e.isUnparallelExec() {
-		e.childResult = newFirstChunk(e.children[0])
+		e.childResult = tryNewCacheChunk(e.children[0])
 		e.memTracker.Consume(e.childResult.MemoryUsage())
 	}
 
diff --git a/executor/select_into.go b/executor/select_into.go
index d526edec874f4..556edc58c7cbe 100644
--- a/executor/select_into.go
+++ b/executor/select_into.go
@@ -60,7 +60,7 @@ func (s *SelectIntoExec) Open(ctx context.Context) error {
 	s.started = true
 	s.dstFile = f
 	s.writer = bufio.NewWriter(s.dstFile)
-	s.chk = newFirstChunk(s.children[0])
+	s.chk = tryNewCacheChunk(s.children[0])
 	s.lineBuf = make([]byte, 0, 1024)
 	s.fieldBuf = make([]byte, 0, 64)
 	s.escapeBuf = make([]byte, 0, 64)
diff --git a/executor/set_test.go b/executor/set_test.go
index 72190fc4422af..a8d4a0b059246 100644
--- a/executor/set_test.go
+++ b/executor/set_test.go
@@ -2041,3 +2041,20 @@ func TestSetPlanCacheMemoryMonitor(t *testing.T) {
 	tk.MustExec("set @@global.tidb_enable_prepared_plan_cache_memory_monitor=off;")
 	tk.MustQuery("select @@global.tidb_enable_prepared_plan_cache_memory_monitor").Check(testkit.Rows("0"))
 }
+
+func TestSetChunkReuseVariable(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("set @@tidb_enable_reuse_chunk=ON;")
+	tk.MustQuery("select @@session.tidb_enable_reuse_chunk").Check(testkit.Rows("1"))
+	tk.MustExec("set GLOBAL tidb_enable_reuse_chunk=ON;")
+	tk.MustQuery("select @@global.tidb_enable_reuse_chunk").Check(testkit.Rows("1"))
+
+	tk.MustExec("set @@tidb_enable_reuse_chunk=OFF;")
+	tk.MustQuery("select @@session.tidb_enable_reuse_chunk").Check(testkit.Rows("0"))
+	tk.MustExec("set GLOBAL tidb_enable_reuse_chunk=OFF;")
+	tk.MustQuery("select @@global.tidb_enable_reuse_chunk").Check(testkit.Rows("0"))
+
+	// Invalid value.
+	tk.MustGetErrCode("set @@tidb_enable_reuse_chunk=s;", errno.ErrWrongValueForVar)
+}
diff --git a/executor/shuffle.go b/executor/shuffle.go
index 2827d0e1cece7..7596d2cef1970 100644
--- a/executor/shuffle.go
+++ b/executor/shuffle.go
@@ -262,7 +262,7 @@ func (e *ShuffleExec) fetchDataAndSplit(ctx context.Context, dataSourceIndex int
 		workerIndices []int
 	)
 	results := make([]*chunk.Chunk, len(e.workers))
-	chk := newFirstChunk(e.dataSources[dataSourceIndex])
+	chk := tryNewCacheChunk(e.dataSources[dataSourceIndex])
 	defer func() {
 		if r := recover(); r != nil {
diff --git a/executor/sort.go b/executor/sort.go
index e82ffddf3f4ae..06241993e05f3 100644
--- a/executor/sort.go
+++ b/executor/sort.go
@@ -194,7 +194,7 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
 		e.rowChunks.GetDiskTracker().SetLabel(memory.LabelForRowChunks)
 	}
 	for {
-		chk := newFirstChunk(e.children[0])
+		chk := tryNewCacheChunk(e.children[0])
 		err := Next(ctx, e.children[0], chk)
 		if err != nil {
 			return err
@@ -434,7 +434,7 @@ func (e *TopNExec) loadChunksUntilTotalLimit(ctx context.Context) error {
 	e.rowChunks.GetMemTracker().AttachTo(e.memTracker)
 	e.rowChunks.GetMemTracker().SetLabel(memory.LabelForRowChunks)
 	for uint64(e.rowChunks.Len()) < e.totalLimit {
-		srcChk := newFirstChunk(e.children[0])
+		srcChk := tryNewCacheChunk(e.children[0])
 		// adjust required rows by total limit
 		srcChk.SetRequiredRows(int(e.totalLimit-uint64(e.rowChunks.Len())), e.maxChunkSize)
 		err := Next(ctx, e.children[0], srcChk)
@@ -460,7 +460,7 @@ func (e *TopNExec) executeTopN(ctx context.Context) error {
 		// The number of rows we loaded may exceeds total limit, remove greatest rows by Pop.
 		heap.Pop(e.chkHeap)
 	}
-	childRowChk := newFirstChunk(e.children[0])
+	childRowChk := tryNewCacheChunk(e.children[0])
 	for {
 		err := Next(ctx, e.children[0], childRowChk)
 		if err != nil {
diff --git a/executor/union_scan.go b/executor/union_scan.go
index 9c0483d974f1c..a23cd8b8c7873 100644
--- a/executor/union_scan.go
+++ b/executor/union_scan.go
@@ -129,7 +129,7 @@ func (us *UnionScanExec) open(ctx context.Context) error {
 	if err != nil {
 		return err
 	}
-	us.snapshotChunkBuffer = newFirstChunk(us)
+	us.snapshotChunkBuffer = tryNewCacheChunk(us)
 	return nil
 }
 
diff --git a/executor/update.go b/executor/update.go
index d9cffabd08355..cf0a6ae2e33f4 100644
--- a/executor/update.go
+++ b/executor/update.go
@@ -250,7 +250,7 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) {
 	fields := retTypes(e.children[0])
 	colsInfo := plannercore.GetUpdateColumnsInfo(e.tblID2table, e.tblColPosInfos, len(fields))
 	globalRowIdx := 0
-	chk := newFirstChunk(e.children[0])
+	chk := tryNewCacheChunk(e.children[0])
 	if !e.allAssignmentsAreConstant {
 		e.evalBuffer = chunk.MutRowFromTypes(fields)
 	}
diff --git a/executor/window.go b/executor/window.go
index 3ce26a03e59cf..ef284344d0c8c 100644
--- a/executor/window.go
+++ b/executor/window.go
@@ -151,7 +151,7 @@ func (e *WindowExec) consumeGroupRows(groupRows []chunk.Row) (err error) {
 }
 
 func (e *WindowExec) fetchChild(ctx context.Context) (EOF bool, err error) {
-	childResult := newFirstChunk(e.children[0])
+	childResult := tryNewCacheChunk(e.children[0])
 	err = Next(ctx, e.children[0], childResult)
 	if err != nil {
 		return false, errors.Trace(err)
@@ -162,7 +162,7 @@ func (e *WindowExec) fetchChild(ctx context.Context) (EOF bool, err err
 	}
 
-	resultChk := chunk.New(e.retFieldTypes, 0, numRows)
+	resultChk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.retFieldTypes, 0, numRows)
 	err = e.copyChk(childResult, resultChk)
 	if err != nil {
 		return false, err
diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go
index 1c6ab1490f9bb..64b38de1c9db3 100644
--- a/infoschema/tables_test.go
+++ b/infoschema/tables_test.go
@@ -1543,6 +1543,7 @@ func TestVariablesInfo(t *testing.T) {
 	// See session/bootstrap.go:doDMLWorks() for where the exceptions are defined.
 	stmt := tk.MustQuery(`SELECT variable_name, default_value, current_value FROM information_schema.variables_info WHERE current_value != default_value and default_value != '' ORDER BY variable_name`)
 	stmt.Check(testkit.Rows(
+		"last_sql_use_alloc OFF ON",                // for test stability
 		"tidb_enable_auto_analyze ON OFF",          // always changed for tests
 		"tidb_enable_collect_execution_info ON OFF", // for test stability
 		"tidb_enable_mutation_checker OFF ON",      // for new installs
diff --git a/server/conn.go b/server/conn.go
index 4912ab030a927..54e29c566c7eb 100644
--- a/server/conn.go
+++ b/server/conn.go
@@ -1122,6 +1122,7 @@ func (cc *clientConn) Run(ctx context.Context) {
 		startTime := time.Now()
 		err = cc.dispatch(ctx, data)
 		cc.chunkAlloc.Reset()
+		cc.ctx.GetSessionVars().ClearAlloc()
 		if err != nil {
 			cc.audit(plugin.Error) // tell the plugin API there was a dispatch error
 			if terror.ErrorEqual(err, io.EOF) {
@@ -1866,6 +1867,7 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
 	sc := cc.ctx.GetSessionVars().StmtCtx
 	prevWarns := sc.GetWarnings()
 	var stmts []ast.StmtNode
+	cc.ctx.GetSessionVars().SetAlloc(cc.chunkAlloc)
 	if stmts, err = cc.ctx.Parse(ctx, sql); err != nil {
 		return err
 	}
diff --git a/server/conn_stmt.go b/server/conn_stmt.go
index ef92ea85abaf0..65ad113d6ad5a 100644
--- a/server/conn_stmt.go
+++ b/server/conn_stmt.go
@@ -158,6 +158,10 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
 		return mysql.NewErrf(mysql.ErrUnknown, "unsupported flag: CursorTypeScrollable", nil)
 	}
 
+	if !useCursor {
+		// Not using streaming, so the chunks can be reused.
+		cc.ctx.GetSessionVars().SetAlloc(cc.chunkAlloc)
+	}
 	// skip iteration-count, always 1
 	pos += 4
 
@@ -295,6 +299,7 @@ const (
 
 func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err error) {
 	cc.ctx.GetSessionVars().StartTime = time.Now()
+	cc.ctx.GetSessionVars().ClearAlloc()
 	stmtID, fetchSize, err := parseStmtFetchCmd(data)
 	if err != nil {
diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go
index 9d5fc86387cee..86e3b36dc1f73 100644
--- a/sessionctx/stmtctx/stmtctx.go
+++ b/sessionctx/stmtctx/stmtctx.go
@@ -339,6 +339,9 @@ type StatementContext struct {
 		SavepointName string
 		HasFKCascades bool
 	}
+
+	// useChunkAlloc indicates whether the statement allocates chunks through the chunk allocator
+	useChunkAlloc bool
 }
 
 // StmtHints are SessionVars related sql hints.
@@ -502,6 +505,21 @@ func (sc *StatementContext) GetResourceGroupTagger() tikvrpc.ResourceGroupTagger
 	}
 }
 
+// SetUseChunkAlloc sets the useChunkAlloc status
+func (sc *StatementContext) SetUseChunkAlloc() {
+	sc.useChunkAlloc = true
+}
+
+// ClearUseChunkAlloc clears the useChunkAlloc status
+func (sc *StatementContext) ClearUseChunkAlloc() {
+	sc.useChunkAlloc = false
+}
+
+// GetUseChunkAllocStatus returns the useChunkAlloc status
+func (sc *StatementContext) GetUseChunkAllocStatus() bool {
+	return sc.useChunkAlloc
+}
+
 // SetPlanDigest sets the normalized plan and plan digest.
 func (sc *StatementContext) SetPlanDigest(normalized string, planDigest *parser.Digest) {
 	if planDigest != nil {
diff --git a/sessionctx/variable/BUILD.bazel b/sessionctx/variable/BUILD.bazel
index 10ecc7d92b030..fa4865079e8bf 100644
--- a/sessionctx/variable/BUILD.bazel
+++ b/sessionctx/variable/BUILD.bazel
@@ -97,6 +97,7 @@ go_test(
        "//testkit",
        "//testkit/testsetup",
        "//types",
+       "//util/chunk",
        "//util/execdetails",
        "//util/memory",
        "//util/mock",
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index 06a1ac9010823..3e8c9ed0008cc 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -1293,6 +1293,72 @@ type SessionVars struct {
 	// OptPrefixIndexSingleScan indicates whether to do some optimizations to avoid double scan for prefix index.
 	// When set to true, `col is (not) null`(`col` is index prefix column) is regarded as index filter rather than table filter.
 	OptPrefixIndexSingleScan bool
+
+	// ChunkPool caches a number of chunks and columns for reuse
+	ChunkPool struct {
+		Lock  sync.Mutex
+		Alloc chunk.Allocator
+	}
+	// EnableReuseCheck indicates whether chunk requests go through the chunk allocator
+	EnableReuseCheck bool
+
+	// preUseChunkAlloc indicates whether the previous statement used chunk alloc,
+	// as reported by `select @@last_sql_use_alloc`
+	preUseChunkAlloc bool
+}
+
+// GetNewChunk attempts to allocate a chunk from the session chunk pool.
+// It is thread-safe.
+func (s *SessionVars) GetNewChunk(fields []*types.FieldType, capacity int) *chunk.Chunk {
+	// The chunk memory pool is not set.
+	if s.ChunkPool.Alloc == nil {
+		return chunk.NewChunkWithCapacity(fields, capacity)
+	}
+	s.ChunkPool.Lock.Lock()
+	defer s.ChunkPool.Lock.Unlock()
+	if s.ChunkPool.Alloc.CheckReuseAllocSize() && (!s.GetUseChunkAlloc()) {
+		s.StmtCtx.SetUseChunkAlloc()
+	}
+	chk := s.ChunkPool.Alloc.Alloc(fields, capacity, capacity)
+	return chk
+}
+
+// GetNewChunkWithCapacity attempts to allocate a chunk from the session chunk pool.
+// It is thread-safe.
+func (s *SessionVars) GetNewChunkWithCapacity(fields []*types.FieldType, capacity int, maxCachesize int) *chunk.Chunk {
+	if s.ChunkPool.Alloc == nil {
+		return chunk.New(fields, capacity, maxCachesize)
+	}
+	s.ChunkPool.Lock.Lock()
+	defer s.ChunkPool.Lock.Unlock()
+	if s.ChunkPool.Alloc.CheckReuseAllocSize() && (!s.GetUseChunkAlloc()) {
+		s.StmtCtx.SetUseChunkAlloc()
+	}
+	chk := s.ChunkPool.Alloc.Alloc(fields, capacity, maxCachesize)
+	return chk
+}
+
+// ExchangeChunkStatus saves the current useChunkAlloc status into preUseChunkAlloc
+func (s *SessionVars) ExchangeChunkStatus() {
+	s.preUseChunkAlloc = s.GetUseChunkAlloc()
+}
+
+// GetUseChunkAlloc returns the useChunkAlloc status
+func (s *SessionVars) GetUseChunkAlloc() bool {
+	return s.StmtCtx.GetUseChunkAllocStatus()
+}
+
+// SetAlloc attempts to attach the chunk allocator; it is a no-op unless chunk reuse is enabled
+func (s *SessionVars) SetAlloc(alloc chunk.Allocator) {
+	if !s.EnableReuseCheck {
+		return
+	}
+	s.ChunkPool.Alloc = alloc
+}
+
+// ClearAlloc detaches the chunk allocator, which stops chunk reuse
+func (s *SessionVars) ClearAlloc() {
+	s.ChunkPool.Alloc = nil
+}
 
 // GetPreparedStmtByName returns the prepared statement specified by stmtName.
@@ -1587,6 +1653,12 @@ func NewSessionVars(hctx HookContext) *SessionVars {
 		EnableTiFlashReadForWriteStmt: DefTiDBEnableTiFlashReadForWriteStmt,
 		ForeignKeyChecks:              DefTiDBForeignKeyChecks,
 		HookContext:                   hctx,
+		EnableReuseCheck:              DefTiDBEnableReusechunk,
+		preUseChunkAlloc:              DefTiDBUseAlloc,
+		ChunkPool: struct {
+			Lock  sync.Mutex
+			Alloc chunk.Allocator
+		}{Alloc: nil},
 	}
 	vars.KVVars = tikvstore.NewVariables(&vars.Killed)
 	vars.Concurrency = Concurrency{
diff --git a/sessionctx/variable/session_test.go b/sessionctx/variable/session_test.go
index 91f1394499c97..903ecf1bf4fa8 100644
--- a/sessionctx/variable/session_test.go
+++ b/sessionctx/variable/session_test.go
@@ -23,10 +23,13 @@ import (
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/parser"
 	"github.com/pingcap/tidb/parser/auth"
+	"github.com/pingcap/tidb/parser/mysql"
 	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/testkit"
+	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/execdetails"
 	"github.com/pingcap/tidb/util/mock"
 	"github.com/stretchr/testify/require"
@@ -422,3 +425,71 @@ func TestHookContext(t *testing.T) {
 
 	ctx.GetSessionVars().SetSystemVar("testhooksysvar", "test")
}
+
+func TestGetReuseChunk(t *testing.T) {
+	fieldTypes := []*types.FieldType{
+		types.NewFieldTypeBuilder().SetType(mysql.TypeVarchar).BuildP(),
+		types.NewFieldTypeBuilder().SetType(mysql.TypeJSON).BuildP(),
+		types.NewFieldTypeBuilder().SetType(mysql.TypeFloat).BuildP(),
+		types.NewFieldTypeBuilder().SetType(mysql.TypeNewDecimal).BuildP(),
+		types.NewFieldTypeBuilder().SetType(mysql.TypeDouble).BuildP(),
+		types.NewFieldTypeBuilder().SetType(mysql.TypeLonglong).BuildP(),
+		types.NewFieldTypeBuilder().SetType(mysql.TypeDatetime).BuildP(),
+	}
+
+	sessVars := variable.NewSessionVars(nil)
+
+	// SetAlloc(nil) leaves the pool unset.
+	sessVars.SetAlloc(nil)
+	require.Nil(t, sessVars.ChunkPool.Alloc)
+	require.False(t, sessVars.GetUseChunkAlloc())
+	// The allocator is nil, so memory is allocated from the system.
+	chk1 := sessVars.GetNewChunk(fieldTypes, 10)
+	require.NotNil(t, chk1)
+	chk2 := sessVars.GetNewChunkWithCapacity(fieldTypes, 10, 10)
+	require.NotNil(t, chk2)
+
+	chunkReuseMap := make(map[*chunk.Chunk]struct{}, 14)
+	columnReuseMap := make(map[*chunk.Column]struct{}, 14)
+
+	alloc := chunk.NewAllocator()
+	sessVars.EnableReuseCheck = true
+	sessVars.SetAlloc(alloc)
+	require.NotNil(t, sessVars.ChunkPool.Alloc)
+	require.Equal(t, alloc, sessVars.ChunkPool.Alloc)
+	require.False(t, sessVars.GetUseChunkAlloc())
+
+	// Try to allocate from the cache.
+	initCap := 10
+	chk1 = sessVars.GetNewChunk(fieldTypes, initCap)
+	require.NotNil(t, chk1)
+	chunkReuseMap[chk1] = struct{}{}
+	for i := 0; i < chk1.NumCols(); i++ {
+		columnReuseMap[chk1.Column(i)] = struct{}{}
+	}
+	chk2 = sessVars.GetNewChunkWithCapacity(fieldTypes, initCap, initCap)
+	require.NotNil(t, chk2)
+	chunkReuseMap[chk2] = struct{}{}
+	for i := 0; i < chk2.NumCols(); i++ {
+		columnReuseMap[chk2.Column(i)] = struct{}{}
+	}
+
+	alloc.Reset()
+	chkres1 := sessVars.GetNewChunk(fieldTypes, 10)
+	_, exist := chunkReuseMap[chkres1]
+	require.True(t, exist)
+	for i := 0; i < chkres1.NumCols(); i++ {
+		_, exist := columnReuseMap[chkres1.Column(i)]
+		require.True(t, exist)
+	}
+	chkres2 := sessVars.GetNewChunkWithCapacity(fieldTypes, 10, 10)
+	require.NotNil(t, chkres2)
+	_, exist = chunkReuseMap[chkres2]
+	require.True(t, exist)
+	for i := 0; i < chkres2.NumCols(); i++ {
+		_, exist := columnReuseMap[chkres2.Column(i)]
+		require.True(t, exist)
+	}
+	sessVars.ClearAlloc()
+	require.Nil(t, sessVars.ChunkPool.Alloc)
+}
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index c032824110f1d..a7acc03734611 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -347,6 +347,9 @@ var defaultSysVars = []*SysVar{
 		return s.LastPlanReplayerToken, nil
 	},
 	},
+	{Scope: ScopeSession, Name: TiDBUseAlloc, Value: BoolToOnOff(DefTiDBUseAlloc), Type: TypeBool, ReadOnly: true, GetSession: func(s *SessionVars) (string, error) {
+		return BoolToOnOff(s.preUseChunkAlloc), nil
+	}},
 	/* The system variables below have INSTANCE scope */
 	{Scope: ScopeInstance, Name: TiDBLogFileMaxDays, Value: strconv.Itoa(config.GetGlobalConfig().Log.File.MaxDays), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt32, SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
 		maxAge, err := strconv.ParseInt(val, 10, 32)
@@ -1968,6 +1971,7 @@ var defaultSysVars = []*SysVar{
 		return nil
 	},
 	},
+	{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptPrefixIndexSingleScan, Value: BoolToOnOff(DefTiDBOptPrefixIndexSingleScan), Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
 		s.OptPrefixIndexSingleScan = TiDBOptOn(val)
 		return nil
@@ -1989,6 +1993,11 @@ var defaultSysVars = []*SysVar{
 		s.EnableExternalTSRead = TiDBOptOn(val)
 		return nil
 	}},
+	{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableReusechunk, Value: BoolToOnOff(DefTiDBEnableReusechunk), Type: TypeBool,
+		SetSession: func(s *SessionVars, val string) error {
+			s.EnableReuseCheck = TiDBOptOn(val)
+			return nil
+		}},
 }
 
 // FeedbackProbability points to the FeedbackProbability in statistics package.
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index 97dc4bda88456..db6a39e3280a1 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -252,6 +252,9 @@ const (
 
 	// TiDBEnableTiFlashReadForWriteStmt indicates whether to enable TiFlash to read for write statements.
 	TiDBEnableTiFlashReadForWriteStmt = "tidb_enable_tiflash_read_for_write_stmt"
+
+	// TiDBUseAlloc indicates whether the last statement used chunk alloc
+	TiDBUseAlloc = "last_sql_use_alloc"
 )
 
 // TiDB system variable names that both in session and global scope.
@@ -764,6 +767,9 @@ const (
 
 	// TiDBEnableExternalTSRead indicates whether to enable read through an external ts
 	TiDBEnableExternalTSRead = "tidb_enable_external_ts_read"
+
+	// TiDBEnableReusechunk indicates whether to enable chunk reuse
+	TiDBEnableReusechunk = "tidb_enable_reuse_chunk"
 )
 
 // TiDB vars that have only global scope
@@ -1082,6 +1088,8 @@ const (
 	DefTiDBOptPrefixIndexSingleScan = true
 	DefTiDBExternalTS               = 0
 	DefTiDBEnableExternalTSRead     = false
+	DefTiDBEnableReusechunk         = true
+	DefTiDBUseAlloc                 = false
 )
 
 // Process global variables.
diff --git a/testkit/BUILD.bazel b/testkit/BUILD.bazel
index 2ab1fb67ada3e..39cedb2ce0b53 100644
--- a/testkit/BUILD.bazel
+++ b/testkit/BUILD.bazel
@@ -28,6 +28,7 @@ go_library(
        "//store/mockstore",
        "//util",
        "//util/breakpoint",
+       "//util/chunk",
        "//util/gctuner",
        "//util/sqlexec",
        "@com_github_pingcap_errors//:errors",
diff --git a/testkit/testkit.go b/testkit/testkit.go
index 345746e6c7ef2..6952f4c36d484 100644
--- a/testkit/testkit.go
+++ b/testkit/testkit.go
@@ -31,6 +31,7 @@ import (
 	"github.com/pingcap/tidb/parser/terror"
 	"github.com/pingcap/tidb/session"
 	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/sqlexec"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -48,6 +49,7 @@ type TestKit struct {
 	t       testing.TB
 	store   kv.Storage
 	session session.Session
+	alloc   chunk.Allocator
 }
 
 // NewTestKit returns a new *TestKit.
@@ -57,6 +59,7 @@ func NewTestKit(t testing.TB, store kv.Storage) *TestKit {
 		assert: assert.New(t),
 		t:      t,
 		store:  store,
+		alloc:  chunk.NewAllocator(),
 	}
 	tk.RefreshSession()
@@ -86,6 +89,7 @@ func NewTestKitWithSession(t testing.TB, store kv.Storage, se session.Session) *
 		t:       t,
 		store:   store,
 		session: se,
+		alloc:   chunk.NewAllocator(),
 	}
 }
 
@@ -110,6 +114,12 @@ func (tk *TestKit) Session() session.Session {
 
 // MustExec executes a sql statement and asserts nil error.
 func (tk *TestKit) MustExec(sql string, args ...interface{}) {
+	defer func() {
+		tk.Session().GetSessionVars().ClearAlloc()
+		if tk.alloc != nil {
+			tk.alloc.Reset()
+		}
+	}()
 	tk.MustExecWithContext(context.Background(), sql, args...)
 }
 
@@ -127,6 +137,12 @@ func (tk *TestKit) MustExecWithContext(ctx context.Context, sql string, args ...
 // MustQuery query the statements and returns result rows.
 // If expected result is set it asserts the query result equals expected result.
 func (tk *TestKit) MustQuery(sql string, args ...interface{}) *Result {
+	defer func() {
+		tk.Session().GetSessionVars().ClearAlloc()
+		if tk.alloc != nil {
+			tk.alloc.Reset()
+		}
+	}()
 	return tk.MustQueryWithContext(context.Background(), sql, args...)
 }
 
@@ -269,6 +285,7 @@ func (tk *TestKit) ExecWithContext(ctx context.Context, sql string, args ...inte
 	}
 	warns := sc.GetWarnings()
 	parserWarns := warns[len(prevWarns):]
+	tk.Session().GetSessionVars().SetAlloc(tk.alloc)
 	var rs0 sqlexec.RecordSet
 	for i, stmt := range stmts {
 		var rs sqlexec.RecordSet
@@ -297,6 +314,7 @@ func (tk *TestKit) ExecWithContext(ctx context.Context, sql string, args ...inte
 		return nil, errors.Trace(err)
 	}
 	params := expression.Args2Expressions4Test(args...)
+	tk.Session().GetSessionVars().SetAlloc(tk.alloc)
 	rs, err := tk.session.ExecutePreparedStmt(ctx, stmtID, params)
 	if err != nil {
 		return rs, errors.Trace(err)
 	}
diff --git a/tidb-server/BUILD.bazel b/tidb-server/BUILD.bazel
index b1ee1d700d8bc..960f68a2b597d 100644
--- a/tidb-server/BUILD.bazel
+++ b/tidb-server/BUILD.bazel
@@ -33,6 +33,7 @@ go_library(
        "//store/mockstore/unistore/metrics",
        "//tidb-binlog/pump_client",
        "//util",
+       "//util/chunk",
        "//util/cpuprofile",
        "//util/deadlockhistory",
        "//util/disk",
diff --git a/tidb-server/main.go b/tidb-server/main.go
index 056814222775e..a6e80cbbfe4b6 100644
--- a/tidb-server/main.go
+++ b/tidb-server/main.go
@@ -58,6 +58,7 @@ import (
 	uni_metrics "github.com/pingcap/tidb/store/mockstore/unistore/metrics"
 	pumpcli "github.com/pingcap/tidb/tidb-binlog/pump_client"
 	"github.com/pingcap/tidb/util"
+	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/cpuprofile"
 	"github.com/pingcap/tidb/util/deadlockhistory"
 	"github.com/pingcap/tidb/util/disk"
@@ -715,6 +716,7 @@ func setGlobalVars() {
 	deadlockhistory.GlobalDeadlockHistory.Resize(cfg.PessimisticTxn.DeadlockHistoryCapacity)
 	txninfo.Recorder.ResizeSummaries(cfg.TrxSummary.TransactionSummaryCapacity)
 	txninfo.Recorder.SetMinDuration(time.Duration(cfg.TrxSummary.TransactionIDDigestMinDuration) * time.Millisecond)
+	chunk.InitChunkAllocSize(cfg.TiDBMaxReuseChunk, cfg.TiDBMaxReuseColumn)
 }
 
 func setupLog() {
diff --git a/util/chunk/alloc.go b/util/chunk/alloc.go
index 4b706e6297431..44fbb126a4989 100644
--- a/util/chunk/alloc.go
+++ b/util/chunk/alloc.go
@@ -15,6 +15,8 @@
 package chunk
 
 import (
+	"math"
+
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/mathutil"
 )
@@ -24,18 +26,37 @@ import (
 // and Alloc() allocates from the pool.
 type Allocator interface {
 	Alloc(fields []*types.FieldType, capacity, maxChunkSize int) *Chunk
+	CheckReuseAllocSize() bool
 	Reset()
 }
 
+var maxFreeChunks = 64
+var maxFreeColumnsPerType = 256
+
+// InitChunkAllocSize initializes the maximum cache sizes
+func InitChunkAllocSize(setMaxFreeChunks, setMaxFreeColumns uint32) {
+	if setMaxFreeChunks > math.MaxInt32 {
+		setMaxFreeChunks = math.MaxInt32
+	}
+	if setMaxFreeColumns > math.MaxInt32 {
+		setMaxFreeColumns = math.MaxInt32
+	}
+	maxFreeChunks = int(setMaxFreeChunks)
+	maxFreeColumnsPerType = int(setMaxFreeColumns)
+}
+
 // NewAllocator creates an Allocator.
 func NewAllocator() *allocator {
-	ret := &allocator{}
+	ret := &allocator{freeChunk: maxFreeChunks}
 	ret.columnAlloc.init()
 	return ret
}
 
 var _ Allocator = &allocator{}
 
+// MaxCachedLen is the maximum column data length that can be cached
+var MaxCachedLen = 16 * 1024
+
 // allocator try to reuse objects.
 // It uses `poolColumnAllocator` to alloc chunk column objects.
 // The allocated chunks are recorded in the `allocated` array.
@@ -45,6 +66,27 @@ type allocator struct {
 	allocated   []*Chunk
 	free        []*Chunk
 	columnAlloc poolColumnAllocator
+	freeChunk   int
 }
+
+// columnList keeps the free and allocated columns of one type size
+type columnList struct {
+	freeColumns  []*Column
+	allocColumns []*Column
+}
+
+func (cList *columnList) add(col *Column) {
+	cList.freeColumns = append(cList.freeColumns, col)
+}
+
+// Len returns the number of columns held by the list
+func (cList *columnList) Len() int {
+	return len(cList.freeColumns) + len(cList.allocColumns)
+}
+
+// CheckReuseAllocSize returns whether the allocator is allowed to cache objects
+func (a *allocator) CheckReuseAllocSize() bool {
+	return a.freeChunk > 0 || a.columnAlloc.freeColumnsPerType > 0
+}
 
 // Alloc implements the Allocator interface.
@@ -66,53 +108,80 @@ func (a *allocator) Alloc(fields []*types.FieldType, capacity, maxChunkSize int)
 		chk.columns = append(chk.columns, a.columnAlloc.NewColumn(f, chk.capacity))
 	}
 
-	a.allocated = append(a.allocated, chk)
+	// Avoid OOM: track at most freeChunk chunks for recycling.
+	if a.freeChunk > len(a.allocated) {
+		a.allocated = append(a.allocated, chk)
+	}
+
 	return chk
 }
 
-const (
-	maxFreeChunks         = 64
-	maxFreeColumnsPerType = 256
-)
-
 // Reset implements the Allocator interface.
 func (a *allocator) Reset() {
-	a.free = a.free[:0]
 	for i, chk := range a.allocated {
 		a.allocated[i] = nil
-		// Decouple chunk into chunk column objects and put them back to the column allocator for reuse.
-		for _, col := range chk.columns {
-			a.columnAlloc.put(col)
-		}
-		// Reset the chunk and put it to the free list for reuse.
 		chk.resetForReuse()
-		if len(a.free) < maxFreeChunks { // Don't cache too much data.
+		if len(a.free) < a.freeChunk { // Don't cache too much data.
 			a.free = append(a.free, chk)
 		}
 	}
 	a.allocated = a.allocated[:0]
+
+	// Decouple the column objects and put them back to the column allocator for reuse.
+	for _, pool := range a.columnAlloc.pool {
+		for _, col := range pool.allocColumns {
+			if (len(pool.freeColumns) < a.columnAlloc.freeColumnsPerType) && (!col.avoidReusing) && (cap(col.data) < MaxCachedLen) {
+				col.reset()
+				pool.freeColumns = append(pool.freeColumns, col)
+			}
+		}
+		pool.allocColumns = pool.allocColumns[:0]
+	}
 }
 
 var _ ColumnAllocator = &poolColumnAllocator{}
 
 type poolColumnAllocator struct {
-	pool map[int]freeList
+	pool               map[int]*columnList
+	freeColumnsPerType int
 }
 
 // poolColumnAllocator implements the ColumnAllocator interface.
 func (alloc *poolColumnAllocator) NewColumn(ft *types.FieldType, count int) *Column {
 	typeSize := getFixedLen(ft)
+	col := alloc.NewSizeColumn(typeSize, count)
+
+	// Record the column in the allocated list so that Reset can recycle it.
+	alloc.put(col)
+	return col
+}
+
+// NewSizeColumn creates a column of the given type size, reusing a cached column when possible.
+func (alloc *poolColumnAllocator) NewSizeColumn(typeSize int, count int) *Column {
 	l := alloc.pool[typeSize]
 	if l != nil && !l.empty() {
 		col := l.pop()
+
+		if cap(col.data) < count {
+			col = newColumn(typeSize, count)
+		}
 		return col
 	}
 	return newColumn(typeSize, count)
 }
 
+func (cList *columnList) pop() *Column {
+	if len(cList.freeColumns) == 0 {
+		return nil
+	}
+	col := cList.freeColumns[len(cList.freeColumns)-1]
+	cList.freeColumns = cList.freeColumns[:len(cList.freeColumns)-1]
+	return col
+}
+
 func (alloc *poolColumnAllocator) init() {
-	alloc.pool = make(map[int]freeList)
+	alloc.pool = make(map[int]*columnList)
+	alloc.freeColumnsPerType = maxFreeColumnsPerType
 }
 
 func (alloc *poolColumnAllocator) put(col *Column) {
@@ -120,16 +189,20 @@ func (alloc *poolColumnAllocator) put(col *Column) {
 		return
 	}
 	typeSize := col.typeSize()
-	if typeSize <= 0 {
+	if typeSize <= 0 && typeSize != varElemLen {
 		return
 	}
 	l := alloc.pool[typeSize]
 	if l == nil {
-		l = make(map[*Column]struct{}, 8)
+		l = &columnList{freeColumns: nil, allocColumns: nil}
+		l.freeColumns = make([]*Column, 0, alloc.freeColumnsPerType)
+		l.allocColumns = make([]*Column, 0, alloc.freeColumnsPerType)
 		alloc.pool[typeSize] = l
 	}
-	l.push(col)
+	if len(l.allocColumns) < alloc.freeColumnsPerType {
+		l.push(col)
+	}
 }
 
 // freeList is defined as a map, rather than a list, because when recycling chunk
 // columns, there could be duplicated one: some of the chunk columns are just the
 // reference to the others.
 type freeList map[*Column]struct{}
 
-func (l freeList) empty() bool {
-	return len(l) == 0
+func (cList *columnList) empty() bool {
+	return len(cList.freeColumns) == 0
 }
 
-func (l freeList) pop() *Column {
-	for k := range l {
-		delete(l, k)
-		return k
-	}
-	return nil
-}
-
-func (l freeList) push(c *Column) {
-	if len(l) >= maxFreeColumnsPerType {
-		// Don't cache too much to save memory.
-		return
+func (cList *columnList) push(col *Column) {
+	if cap(col.data) < MaxCachedLen {
+		cList.allocColumns = append(cList.allocColumns, col)
 	}
-	l[c] = struct{}{}
 }
diff --git a/util/chunk/alloc_test.go b/util/chunk/alloc_test.go
index 33b52590eab7c..edad5e3008e77 100644
--- a/util/chunk/alloc_test.go
+++ b/util/chunk/alloc_test.go
@@ -114,7 +114,7 @@ func TestColumnAllocator(t *testing.T) {
 	// Check max column size.
 	freeList := alloc1.pool[getFixedLen(ft)]
 	require.NotNil(t, freeList)
-	require.Len(t, freeList, maxFreeColumnsPerType)
+	require.Equal(t, freeList.Len(), maxFreeColumnsPerType)
 }
 
 func TestNoDuplicateColumnReuse(t *testing.T) {
@@ -202,3 +202,71 @@ func TestAvoidColumnReuse(t *testing.T) {
 		require.True(t, col.avoidReusing)
 	}
 }
+
+func TestColumnAllocatorLimit(t *testing.T) {
+	fieldTypes := []*types.FieldType{
+		types.NewFieldTypeBuilder().SetType(mysql.TypeVarchar).BuildP(),
+		types.NewFieldTypeBuilder().SetType(mysql.TypeJSON).BuildP(),
+		types.NewFieldTypeBuilder().SetType(mysql.TypeFloat).BuildP(),
+		types.NewFieldTypeBuilder().SetType(mysql.TypeNewDecimal).BuildP(),
+		types.NewFieldTypeBuilder().SetType(mysql.TypeDouble).BuildP(),
+		types.NewFieldTypeBuilder().SetType(mysql.TypeLonglong).BuildP(),
+		types.NewFieldTypeBuilder().SetType(mysql.TypeDatetime).BuildP(),
+	}
+
+	// Set the cache size.
+	InitChunkAllocSize(10, 20)
+	alloc := NewAllocator()
+	require.True(t, alloc.CheckReuseAllocSize())
+	for i := 0; i < maxFreeChunks+10; i++ {
+		alloc.Alloc(fieldTypes, 5, 10)
+	}
+	alloc.Reset()
+	require.Equal(t, len(alloc.free), 10)
+	for _, p := range alloc.columnAlloc.pool {
+		require.True(t, (p.Len() <= 20))
+	}
+
+	// Reduce the capacity.
+	InitChunkAllocSize(5, 10)
+	alloc = NewAllocator()
+	for i := 0; i < maxFreeChunks+10; i++ {
+		alloc.Alloc(fieldTypes, 5, 10)
+	}
+	alloc.Reset()
+	require.Equal(t, len(alloc.free), 5)
+	for _, p := range alloc.columnAlloc.pool {
+		require.True(t, (p.Len() <= 10))
+	}
+
+	// Increase the capacity.
+	InitChunkAllocSize(50, 100)
+	alloc = NewAllocator()
+	for i := 0; i < maxFreeChunks+10; i++ {
+		alloc.Alloc(fieldTypes, 5, 10)
+	}
+	alloc.Reset()
+	require.Equal(t, len(alloc.free), 50)
+	for _, p := range alloc.columnAlloc.pool {
+		require.True(t, (p.Len() <= 100))
+	}
+
+	// Long data is not cached.
+	alloc = NewAllocator()
+	rs := alloc.Alloc([]*types.FieldType{types.NewFieldTypeBuilder().SetType(mysql.TypeVarchar).BuildP()}, 1024, 1024)
+	nu := len(alloc.columnAlloc.pool[varElemLen].allocColumns)
+	require.Equal(t, nu, 1)
+	for _, col := range rs.columns {
+		for i := 0; i < 20480; i++ {
+			col.data = append(col.data, byte('a'))
+		}
+	}
+	alloc.Reset()
+	for _, p := range alloc.columnAlloc.pool {
+		require.True(t, (p.Len() == 0))
+	}
+
+	InitChunkAllocSize(0, 0)
+	alloc = NewAllocator()
+	require.False(t, alloc.CheckReuseAllocSize())
+}
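Editor's note: for readers following the util/chunk changes above, here is a minimal sketch of the allocator lifecycle this patch introduces. It is not part of the patch; it assumes the modified util/chunk package, and uses the same field-type builder as the tests above. Alloc hands out chunks and records them (up to the configured cap), and Reset recycles them into the free lists so later allocations can be served from the cache.

package main

import (
	"fmt"

	"github.com/pingcap/tidb/parser/mysql"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/chunk"
)

func main() {
	// Cap the cache at 10 chunks and 20 columns per type size, the way
	// setGlobalVars wires tidb-max-reuse-chunk / tidb-max-reuse-column.
	chunk.InitChunkAllocSize(10, 20)

	alloc := chunk.NewAllocator()
	fields := []*types.FieldType{
		types.NewFieldTypeBuilder().SetType(mysql.TypeLonglong).BuildP(),
	}

	// The first allocation comes from the system; the allocator records the
	// chunk so that Reset can recycle it later.
	chk := alloc.Alloc(fields, 32, 1024)
	fmt.Println(chk.Capacity())

	// Reset moves the recorded chunks and their columns to the free lists;
	// the next Alloc with compatible types can return the cached objects.
	alloc.Reset()
	chk2 := alloc.Alloc(fields, 32, 1024)
	fmt.Println(chk2.NumCols())
}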
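And a sketch of the session-level gating added in sessionctx/variable/session.go and server/conn.go (again not part of the patch; it mirrors TestGetReuseChunk above). The tidb_enable_reuse_chunk variable controls EnableReuseCheck, which gates SetAlloc; executors allocate through GetNewChunk / GetNewChunkWithCapacity, and a statement that actually hit the pool is reported by `select @@last_sql_use_alloc` afterwards.

package main

import (
	"github.com/pingcap/tidb/parser/mysql"
	"github.com/pingcap/tidb/sessionctx/variable"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/chunk"
)

func main() {
	sessVars := variable.NewSessionVars(nil)
	fieldTypes := []*types.FieldType{
		types.NewFieldTypeBuilder().SetType(mysql.TypeLonglong).BuildP(),
	}

	// With tidb_enable_reuse_chunk=ON (EnableReuseCheck), SetAlloc attaches a
	// connection-level allocator, the way handleQuery attaches cc.chunkAlloc.
	sessVars.EnableReuseCheck = true
	sessVars.SetAlloc(chunk.NewAllocator())

	// Executors call GetNewChunk/GetNewChunkWithCapacity instead of chunk.New.
	// When an allocator is attached and has room, the statement is flagged so
	// that `select @@last_sql_use_alloc` reports ON for the next statement.
	_ = sessVars.GetNewChunk(fieldTypes, 32)

	// Cursor fetches and the end of dispatch detach the pool again.
	sessVars.ClearAlloc()
}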