diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go index 62c25cfc3e5f2..59546860fbfec 100644 --- a/distsql/request_builder_test.go +++ b/distsql/request_builder_test.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/disk" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/mock" @@ -55,7 +56,8 @@ type testSuite struct { func (s *testSuite) SetUpSuite(c *C) { ctx := mock.NewContext() ctx.GetSessionVars().StmtCtx = &stmtctx.StatementContext{ - MemTracker: memory.NewTracker(stringutil.StringerStr("testSuite"), variable.DefTiDBMemQuotaDistSQL), + MemTracker: memory.NewTracker(stringutil.StringerStr("testSuite"), variable.DefTiDBMemQuotaDistSQL), + DiskTracker: disk.NewTracker(stringutil.StringerStr("testSuite"), -1), } ctx.Store = &mock.Store{ Client: &mock.Client{ diff --git a/executor/benchmark_test.go b/executor/benchmark_test.go index a120c2b6d1850..3260ce2cac7b0 100644 --- a/executor/benchmark_test.go +++ b/executor/benchmark_test.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/disk" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/stringutil" @@ -558,6 +559,7 @@ func defaultHashJoinTestCase(cols []*types.FieldType, joinType core.JoinType, us ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1) + ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(nil, -1) ctx.GetSessionVars().IndexLookupJoinConcurrency = 4 tc := &hashJoinTestCase{rows: 100000, concurrency: 4, ctx: ctx, keyIdx: []int{0, 1}} tc.cols = cols @@ -603,7 +605,9 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) } t := memory.NewTracker(stringutil.StringerStr("root of prepare4HashJoin"), memLimit) t.SetActionOnExceed(nil) + t2 := disk.NewTracker(stringutil.StringerStr("root of prepare4HashJoin"), -1) e.ctx.GetSessionVars().StmtCtx.MemTracker = t + e.ctx.GetSessionVars().StmtCtx.DiskTracker = t2 return e } @@ -865,6 +869,7 @@ func defaultIndexJoinTestCase() *indexJoinTestCase { ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize ctx.GetSessionVars().SnapshotTS = 1 ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1) + ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(nil, -1) tc := &indexJoinTestCase{ outerRows: 100000, innerRows: variable.DefMaxChunkSize * 100, diff --git a/executor/executor.go b/executor/executor.go index ff4acd00a981d..6a10c3620642d 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -46,6 +46,7 @@ import ( "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/admin" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/disk" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" @@ -1493,9 +1494,10 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { memQuota = stmtHints.MemQuotaQuery } sc := &stmtctx.StatementContext{ - StmtHints: stmtHints, - TimeZone: vars.Location(), - MemTracker: memory.NewTracker(stringutil.MemoizeStr(s.Text), memQuota), + StmtHints: stmtHints, + TimeZone: vars.Location(), + MemTracker: memory.NewTracker(stringutil.MemoizeStr(s.Text), memQuota), + DiskTracker: disk.NewTracker(stringutil.MemoizeStr(s.Text), -1), } switch config.GetGlobalConfig().OOMAction { case config.OOMActionCancel: diff --git a/executor/executor_required_rows_test.go b/executor/executor_required_rows_test.go index 348b043f883de..76dcb742c1796 100644 --- a/executor/executor_required_rows_test.go +++ b/executor/executor_required_rows_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/disk" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/mock" ) @@ -206,6 +207,7 @@ func defaultCtx() sessionctx.Context { ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize ctx.GetSessionVars().MemQuotaSort = variable.DefTiDBMemQuotaSort ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, ctx.GetSessionVars().MemQuotaQuery) + ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(nil, -1) ctx.GetSessionVars().SnapshotTS = uint64(1) return ctx } diff --git a/executor/hash_table.go b/executor/hash_table.go index 77e11576004c3..c453e781efd21 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/util/disk" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/stringutil" "go.uber.org/zap" ) @@ -88,7 +89,8 @@ type hashRowContainer struct { // memTracker is the reference of records.GetMemTracker(). // records would be set to nil for garbage collection when spilling is activated // so we need this reference. - memTracker *memory.Tracker + memTracker *memory.Tracker + diskTracker *disk.Tracker // records stores the chunks in memory. records *chunk.List @@ -122,9 +124,10 @@ func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContex sc: sCtx.GetSessionVars().StmtCtx, hCtx: hCtx, - hashTable: newRowHashMap(estCount), - memTracker: initList.GetMemTracker(), - records: initList, + hashTable: newRowHashMap(estCount), + memTracker: initList.GetMemTracker(), + diskTracker: disk.NewTracker(stringutil.StringerStr("hashRowContainer"), -1), + records: initList, } return c @@ -174,6 +177,7 @@ func (c *hashRowContainer) matchJoinKey(buildRow, probeRow chunk.Row, probeHCtx func (c *hashRowContainer) spillToDisk() (err error) { N := c.records.NumChunks() c.recordsInDisk = chunk.NewListInDisk(c.hCtx.allTypes) + c.recordsInDisk.GetDiskTracker().AttachTo(c.diskTracker) for i := 0; i < N; i++ { chk := c.records.GetChunk(i) err = c.recordsInDisk.Add(chk) @@ -271,7 +275,7 @@ func (c *hashRowContainer) Close() error { func (c *hashRowContainer) GetMemTracker() *memory.Tracker { return c.memTracker } // GetDiskTracker returns the underlying disk usage tracker in hashRowContainer. -func (c *hashRowContainer) GetDiskTracker() *disk.Tracker { return c.recordsInDisk.GetDiskTracker() } +func (c *hashRowContainer) GetDiskTracker() *disk.Tracker { return c.diskTracker } // ActionSpill returns a memory.ActionOnExceed for spilling over to disk. func (c *hashRowContainer) ActionSpill() memory.ActionOnExceed { diff --git a/executor/join.go b/executor/join.go index d9d7c921dd226..8e6c95f339b2d 100644 --- a/executor/join.go +++ b/executor/join.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/util/bitmap" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/disk" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/stringutil" ) @@ -70,6 +71,7 @@ type HashJoinExec struct { joinResultCh chan *hashjoinWorkerResult memTracker *memory.Tracker // track memory usage. + diskTracker *disk.Tracker // track disk usage. prepared bool isOuterJoin bool @@ -145,6 +147,9 @@ func (e *HashJoinExec) Open(ctx context.Context) error { e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + e.diskTracker = disk.NewTracker(e.id, -1) + e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker) + e.closeCh = make(chan struct{}) e.finished.Store(false) e.joinWorkerWaitGroup = sync.WaitGroup{} @@ -677,6 +682,8 @@ func (e *HashJoinExec) buildHashTableForList(buildSideResultCh <-chan *chunk.Chu e.rowContainer = newHashRowContainer(e.ctx, int(e.buildSideEstCount), hCtx) e.rowContainer.GetMemTracker().AttachTo(e.memTracker) e.rowContainer.GetMemTracker().SetLabel(buildSideResultLabel) + e.rowContainer.GetDiskTracker().AttachTo(e.diskTracker) + e.rowContainer.GetDiskTracker().SetLabel(buildSideResultLabel) if config.GetGlobalConfig().OOMUseTmpStorage { actionSpill := e.rowContainer.ActionSpill() e.ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewAction(actionSpill) diff --git a/planner/core/cbo_test.go b/planner/core/cbo_test.go index bf3f591dd8c63..b4663605fd0c4 100644 --- a/planner/core/cbo_test.go +++ b/planner/core/cbo_test.go @@ -94,7 +94,7 @@ func (s *testAnalyzeSuite) TestExplainAnalyze(c *C) { rs := tk.MustQuery("explain analyze select t1.a, t1.b, sum(t1.c) from t1 join t2 on t1.a = t2.b where t1.a > 1") c.Assert(len(rs.Rows()), Equals, 10) for _, row := range rs.Rows() { - c.Assert(len(row), Equals, 6) + c.Assert(len(row), Equals, 7) execInfo := row[4].(string) c.Assert(strings.Contains(execInfo, "time"), Equals, true) c.Assert(strings.Contains(execInfo, "loops"), Equals, true) diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 3687edab2d0ac..8436eeda994af 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -675,7 +675,7 @@ func (e *Explain) prepareSchema() error { case format == ast.ExplainFormatROW && !e.Analyze: fieldNames = []string{"id", "count", "task", "operator info"} case format == ast.ExplainFormatROW && e.Analyze: - fieldNames = []string{"id", "count", "task", "operator info", "execution info", "memory"} + fieldNames = []string{"id", "count", "task", "operator info", "execution info", "memory", "disk"} case format == ast.ExplainFormatDOT: fieldNames = []string{"dot contents"} default: @@ -815,9 +815,16 @@ func (e *Explain) prepareOperatorInfo(p Plan, taskType string, indent string, is } row = append(row, analyzeInfo) - tracker := e.ctx.GetSessionVars().StmtCtx.MemTracker.SearchTracker(p.ExplainID().String()) - if tracker != nil { - row = append(row, tracker.BytesToString(tracker.MaxConsumed())) + memTracker := e.ctx.GetSessionVars().StmtCtx.MemTracker.SearchTracker(p.ExplainID().String()) + if memTracker != nil { + row = append(row, memTracker.BytesToString(memTracker.MaxConsumed())) + } else { + row = append(row, "N/A") + } + + diskTracker := e.ctx.GetSessionVars().StmtCtx.DiskTracker.SearchTracker(p.ExplainID().String()) + if diskTracker != nil { + row = append(row, diskTracker.BytesToString(diskTracker.MaxConsumed())) } else { row = append(row, "N/A") } diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index b3b9807ec98d4..f7074ad392ced 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/parser" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/util/disk" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" "go.uber.org/zap" @@ -126,6 +127,7 @@ type StatementContext struct { Priority mysql.PriorityEnum NotFillCache bool MemTracker *memory.Tracker + DiskTracker *disk.Tracker RuntimeStatsColl *execdetails.RuntimeStatsColl TableIDs []int64 IndexNames []string diff --git a/util/mock/context.go b/util/mock/context.go index de79e16dcc92c..41f5625cca6cd 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/disk" "github.com/pingcap/tidb/util/kvcache" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/sqlexec" @@ -267,6 +268,7 @@ func NewContext() *Context { sctx.sessionVars.MaxChunkSize = 32 sctx.sessionVars.StmtCtx.TimeZone = time.UTC sctx.sessionVars.StmtCtx.MemTracker = memory.NewTracker(stringutil.StringerStr("mock.NewContext"), -1) + sctx.sessionVars.StmtCtx.DiskTracker = disk.NewTracker(stringutil.StringerStr("mock.NewContext"), -1) sctx.sessionVars.GlobalVarsAccessor = variable.NewMockGlobalAccessor() if err := sctx.GetSessionVars().SetSystemVar(variable.MaxAllowedPacket, "67108864"); err != nil { panic(err)