Skip to content

Commit

Permalink
executor: show operators' disk consumption in results of `EXPLAIN ANA…
Browse files Browse the repository at this point in the history
…LYZE` (pingcap#13764)
  • Loading branch information
Reminiscent authored and XiaTianliang committed Dec 21, 2019
1 parent 5758a36 commit ec93466
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 14 deletions.
4 changes: 3 additions & 1 deletion distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
Expand Down Expand Up @@ -55,7 +56,8 @@ type testSuite struct {
func (s *testSuite) SetUpSuite(c *C) {
ctx := mock.NewContext()
ctx.GetSessionVars().StmtCtx = &stmtctx.StatementContext{
MemTracker: memory.NewTracker(stringutil.StringerStr("testSuite"), variable.DefTiDBMemQuotaDistSQL),
MemTracker: memory.NewTracker(stringutil.StringerStr("testSuite"), variable.DefTiDBMemQuotaDistSQL),
DiskTracker: disk.NewTracker(stringutil.StringerStr("testSuite"), -1),
}
ctx.Store = &mock.Store{
Client: &mock.Client{
Expand Down
5 changes: 5 additions & 0 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/stringutil"
Expand Down Expand Up @@ -558,6 +559,7 @@ func defaultHashJoinTestCase(cols []*types.FieldType, joinType core.JoinType, us
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(nil, -1)
ctx.GetSessionVars().IndexLookupJoinConcurrency = 4
tc := &hashJoinTestCase{rows: 100000, concurrency: 4, ctx: ctx, keyIdx: []int{0, 1}}
tc.cols = cols
Expand Down Expand Up @@ -603,7 +605,9 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor)
}
t := memory.NewTracker(stringutil.StringerStr("root of prepare4HashJoin"), memLimit)
t.SetActionOnExceed(nil)
t2 := disk.NewTracker(stringutil.StringerStr("root of prepare4HashJoin"), -1)
e.ctx.GetSessionVars().StmtCtx.MemTracker = t
e.ctx.GetSessionVars().StmtCtx.DiskTracker = t2
return e
}

Expand Down Expand Up @@ -865,6 +869,7 @@ func defaultIndexJoinTestCase() *indexJoinTestCase {
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().SnapshotTS = 1
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(nil, -1)
tc := &indexJoinTestCase{
outerRows: 100000,
innerRows: variable.DefMaxChunkSize * 100,
Expand Down
8 changes: 5 additions & 3 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
Expand Down Expand Up @@ -1493,9 +1494,10 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
memQuota = stmtHints.MemQuotaQuery
}
sc := &stmtctx.StatementContext{
StmtHints: stmtHints,
TimeZone: vars.Location(),
MemTracker: memory.NewTracker(stringutil.MemoizeStr(s.Text), memQuota),
StmtHints: stmtHints,
TimeZone: vars.Location(),
MemTracker: memory.NewTracker(stringutil.MemoizeStr(s.Text), memQuota),
DiskTracker: disk.NewTracker(stringutil.MemoizeStr(s.Text), -1),
}
switch config.GetGlobalConfig().OOMAction {
case config.OOMActionCancel:
Expand Down
2 changes: 2 additions & 0 deletions executor/executor_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
)
Expand Down Expand Up @@ -206,6 +207,7 @@ func defaultCtx() sessionctx.Context {
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().MemQuotaSort = variable.DefTiDBMemQuotaSort
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, ctx.GetSessionVars().MemQuotaQuery)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(nil, -1)
ctx.GetSessionVars().SnapshotTS = uint64(1)
return ctx
}
Expand Down
14 changes: 9 additions & 5 deletions executor/hash_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/stringutil"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -88,7 +89,8 @@ type hashRowContainer struct {
// memTracker is the reference of records.GetMemTracker().
// records would be set to nil for garbage collection when spilling is activated
// so we need this reference.
memTracker *memory.Tracker
memTracker *memory.Tracker
diskTracker *disk.Tracker

// records stores the chunks in memory.
records *chunk.List
Expand Down Expand Up @@ -122,9 +124,10 @@ func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContex
sc: sCtx.GetSessionVars().StmtCtx,
hCtx: hCtx,

hashTable: newRowHashMap(estCount),
memTracker: initList.GetMemTracker(),
records: initList,
hashTable: newRowHashMap(estCount),
memTracker: initList.GetMemTracker(),
diskTracker: disk.NewTracker(stringutil.StringerStr("hashRowContainer"), -1),
records: initList,
}

return c
Expand Down Expand Up @@ -174,6 +177,7 @@ func (c *hashRowContainer) matchJoinKey(buildRow, probeRow chunk.Row, probeHCtx
func (c *hashRowContainer) spillToDisk() (err error) {
N := c.records.NumChunks()
c.recordsInDisk = chunk.NewListInDisk(c.hCtx.allTypes)
c.recordsInDisk.GetDiskTracker().AttachTo(c.diskTracker)
for i := 0; i < N; i++ {
chk := c.records.GetChunk(i)
err = c.recordsInDisk.Add(chk)
Expand Down Expand Up @@ -271,7 +275,7 @@ func (c *hashRowContainer) Close() error {
func (c *hashRowContainer) GetMemTracker() *memory.Tracker { return c.memTracker }

// GetDiskTracker returns the underlying disk usage tracker in hashRowContainer.
func (c *hashRowContainer) GetDiskTracker() *disk.Tracker { return c.recordsInDisk.GetDiskTracker() }
func (c *hashRowContainer) GetDiskTracker() *disk.Tracker { return c.diskTracker }

// ActionSpill returns a memory.ActionOnExceed for spilling over to disk.
func (c *hashRowContainer) ActionSpill() memory.ActionOnExceed {
Expand Down
7 changes: 7 additions & 0 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/util/bitmap"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/stringutil"
)
Expand Down Expand Up @@ -70,6 +71,7 @@ type HashJoinExec struct {
joinResultCh chan *hashjoinWorkerResult

memTracker *memory.Tracker // track memory usage.
diskTracker *disk.Tracker // track disk usage.
prepared bool
isOuterJoin bool

Expand Down Expand Up @@ -145,6 +147,9 @@ func (e *HashJoinExec) Open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

e.diskTracker = disk.NewTracker(e.id, -1)
e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker)

e.closeCh = make(chan struct{})
e.finished.Store(false)
e.joinWorkerWaitGroup = sync.WaitGroup{}
Expand Down Expand Up @@ -677,6 +682,8 @@ func (e *HashJoinExec) buildHashTableForList(buildSideResultCh <-chan *chunk.Chu
e.rowContainer = newHashRowContainer(e.ctx, int(e.buildSideEstCount), hCtx)
e.rowContainer.GetMemTracker().AttachTo(e.memTracker)
e.rowContainer.GetMemTracker().SetLabel(buildSideResultLabel)
e.rowContainer.GetDiskTracker().AttachTo(e.diskTracker)
e.rowContainer.GetDiskTracker().SetLabel(buildSideResultLabel)
if config.GetGlobalConfig().OOMUseTmpStorage {
actionSpill := e.rowContainer.ActionSpill()
e.ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewAction(actionSpill)
Expand Down
2 changes: 1 addition & 1 deletion planner/core/cbo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (s *testAnalyzeSuite) TestExplainAnalyze(c *C) {
rs := tk.MustQuery("explain analyze select t1.a, t1.b, sum(t1.c) from t1 join t2 on t1.a = t2.b where t1.a > 1")
c.Assert(len(rs.Rows()), Equals, 10)
for _, row := range rs.Rows() {
c.Assert(len(row), Equals, 6)
c.Assert(len(row), Equals, 7)
execInfo := row[4].(string)
c.Assert(strings.Contains(execInfo, "time"), Equals, true)
c.Assert(strings.Contains(execInfo, "loops"), Equals, true)
Expand Down
15 changes: 11 additions & 4 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ func (e *Explain) prepareSchema() error {
case format == ast.ExplainFormatROW && !e.Analyze:
fieldNames = []string{"id", "count", "task", "operator info"}
case format == ast.ExplainFormatROW && e.Analyze:
fieldNames = []string{"id", "count", "task", "operator info", "execution info", "memory"}
fieldNames = []string{"id", "count", "task", "operator info", "execution info", "memory", "disk"}
case format == ast.ExplainFormatDOT:
fieldNames = []string{"dot contents"}
default:
Expand Down Expand Up @@ -815,9 +815,16 @@ func (e *Explain) prepareOperatorInfo(p Plan, taskType string, indent string, is
}
row = append(row, analyzeInfo)

tracker := e.ctx.GetSessionVars().StmtCtx.MemTracker.SearchTracker(p.ExplainID().String())
if tracker != nil {
row = append(row, tracker.BytesToString(tracker.MaxConsumed()))
memTracker := e.ctx.GetSessionVars().StmtCtx.MemTracker.SearchTracker(p.ExplainID().String())
if memTracker != nil {
row = append(row, memTracker.BytesToString(memTracker.MaxConsumed()))
} else {
row = append(row, "N/A")
}

diskTracker := e.ctx.GetSessionVars().StmtCtx.DiskTracker.SearchTracker(p.ExplainID().String())
if diskTracker != nil {
row = append(row, diskTracker.BytesToString(diskTracker.MaxConsumed()))
} else {
row = append(row, "N/A")
}
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
Expand Down Expand Up @@ -126,6 +127,7 @@ type StatementContext struct {
Priority mysql.PriorityEnum
NotFillCache bool
MemTracker *memory.Tracker
DiskTracker *disk.Tracker
RuntimeStatsColl *execdetails.RuntimeStatsColl
TableIDs []int64
IndexNames []string
Expand Down
2 changes: 2 additions & 0 deletions util/mock/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -267,6 +268,7 @@ func NewContext() *Context {
sctx.sessionVars.MaxChunkSize = 32
sctx.sessionVars.StmtCtx.TimeZone = time.UTC
sctx.sessionVars.StmtCtx.MemTracker = memory.NewTracker(stringutil.StringerStr("mock.NewContext"), -1)
sctx.sessionVars.StmtCtx.DiskTracker = disk.NewTracker(stringutil.StringerStr("mock.NewContext"), -1)
sctx.sessionVars.GlobalVarsAccessor = variable.NewMockGlobalAccessor()
if err := sctx.GetSessionVars().SetSystemVar(variable.MaxAllowedPacket, "67108864"); err != nil {
panic(err)
Expand Down

0 comments on commit ec93466

Please sign in to comment.