From ca6457ca230512ba3fc2a6ad45920267ed9f2474 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Thu, 18 Jul 2019 19:49:57 +0800 Subject: [PATCH 01/13] show memory in explain analyze --- executor/distsql.go | 8 ++------ executor/index_lookup_join.go | 1 - executor/join.go | 2 -- executor/merge_join.go | 1 - executor/sort.go | 1 - executor/table_reader.go | 5 +---- planner/core/common_plans.go | 9 ++++++++- util/memory/tracker.go | 32 ++++++++++++++++++++++---------- 8 files changed, 33 insertions(+), 26 deletions(-) diff --git a/executor/distsql.go b/executor/distsql.go index fcfbc110fcad3..8b521248b8357 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -261,8 +261,6 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error { return e.open(ctx, kvRanges) } -var indexReaderDistSQLTrackerLabel fmt.Stringer = stringutil.StringerStr("IndexReaderDistSQLTracker") - func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error { var err error if e.corColInFilter { @@ -284,7 +282,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) SetKeepOrder(e.keepOrder). SetStreaming(e.streaming). SetFromSessionVars(e.ctx.GetSessionVars()). - SetMemTracker(e.ctx, indexReaderDistSQLTrackerLabel). + SetMemTracker(e.ctx, e.id). Build() if err != nil { e.feedback.Invalidate() @@ -471,8 +469,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k return nil } -var tableWorkerLabel fmt.Stringer = stringutil.StringerStr("tableWorker") - // startTableWorker launchs some background goroutines which pick tasks from workCh and execute the task. func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-chan *lookupTableTask) { lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency @@ -486,7 +482,7 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha keepOrder: e.keepOrder, handleIdx: e.handleIdx, isCheckOp: e.isCheckOp, - memTracker: memory.NewTracker(tableWorkerLabel, -1), + memTracker: memory.NewTracker(stringutil.StringerStr(fmt.Sprintf("worker_%v", i)), -1), } worker.memTracker.AttachTo(e.memTracker) ctx1, cancel := context.WithCancel(ctx) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 7f90e85303fe5..3ea1f62e7d6d3 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -650,7 +650,6 @@ func (e *IndexLookUpJoin) Close() error { e.cancelFunc() } e.workerWg.Wait() - e.memTracker.Detach() e.memTracker = nil return e.children[0].Close() } diff --git a/executor/join.go b/executor/join.go index 12cd377271ade..d6f8e9042ca4a 100644 --- a/executor/join.go +++ b/executor/join.go @@ -134,7 +134,6 @@ func (e *HashJoinExec) Close() error { e.outerChkResourceCh = nil e.joinChkResourceCh = nil } - e.memTracker.Detach() e.memTracker = nil err := e.baseExecutor.Close() @@ -633,7 +632,6 @@ type NestedLoopApplyExec struct { func (e *NestedLoopApplyExec) Close() error { e.innerRows = nil - e.memTracker.Detach() e.memTracker = nil return e.outerExec.Close() } diff --git a/executor/merge_join.go b/executor/merge_join.go index 9a7a09912d440..f7415d6bf1444 100644 --- a/executor/merge_join.go +++ b/executor/merge_join.go @@ -200,7 +200,6 @@ func (t *mergeJoinInnerTable) reallocReaderResult() { // Close implements the Executor Close interface. func (e *MergeJoinExec) Close() error { - e.memTracker.Detach() e.childrenResults = nil e.memTracker = nil diff --git a/executor/sort.go b/executor/sort.go index 4209dafd91d4c..aa895015b1eec 100644 --- a/executor/sort.go +++ b/executor/sort.go @@ -54,7 +54,6 @@ type SortExec struct { // Close implements the Executor Close interface. func (e *SortExec) Close() error { - e.memTracker.Detach() e.memTracker = nil return e.children[0].Close() } diff --git a/executor/table_reader.go b/executor/table_reader.go index 3f6bea659295a..26fba0b54f008 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/ranger" - "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tipb/go-tipb" ) @@ -148,8 +147,6 @@ func (e *TableReaderExecutor) Close() error { return err } -var tableReaderDistSQLTrackerLabel fmt.Stringer = stringutil.StringerStr("TableReaderDistSQLTracker") - // buildResp first builds request and sends it to tikv using distsql.Select. It uses SelectResut returned by the callee // to fetch all results. func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) { @@ -160,7 +157,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra SetKeepOrder(e.keepOrder). SetStreaming(e.streaming). SetFromSessionVars(e.ctx.GetSessionVars()). - SetMemTracker(e.ctx, tableReaderDistSQLTrackerLabel). + SetMemTracker(e.ctx, e.id). Build() if err != nil { return nil, err diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 96a33507287f2..17ce710ca3146 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -563,7 +563,7 @@ func (e *Explain) prepareSchema() error { case ast.ExplainFormatROW: retFields := []string{"id", "count", "task", "operator info"} if e.Analyze { - retFields = append(retFields, "execution info") + retFields = append(retFields, "execution info", "memory") } schema := expression.NewSchema(make([]*expression.Column, 0, len(retFields))...) for _, fieldName := range retFields { @@ -643,6 +643,13 @@ func (e *Explain) prepareOperatorInfo(p PhysicalPlan, taskType string, indent st } else { row = append(row, "time:0ns, loops:0, rows:0") } + + tracker := e.ctx.GetSessionVars().StmtCtx.MemTracker.SearchTracker(p.ExplainID().String()) + if tracker != nil { + row = append(row, tracker.BytesToString(tracker.MaxConsumed())) + } else { + row = append(row, "-") + } } e.Rows = append(e.Rows, row) } diff --git a/util/memory/tracker.go b/util/memory/tracker.go index befbab5ce3e76..07b5f7d850140 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -144,17 +144,13 @@ func (t *Tracker) Consume(bytes int64) { rootExceed = tracker } - if tracker.parent == nil { - // since we only need a total memory usage during execution, - // we only record max consumed bytes in root(statement-level) for performance. - for { - maxNow := atomic.LoadInt64(&tracker.maxConsumed) - consumed := atomic.LoadInt64(&tracker.bytesConsumed) - if consumed > maxNow && !atomic.CompareAndSwapInt64(&tracker.maxConsumed, maxNow, consumed) { - continue - } - break + for { + maxNow := atomic.LoadInt64(&tracker.maxConsumed) + consumed := atomic.LoadInt64(&tracker.bytesConsumed) + if consumed > maxNow && !atomic.CompareAndSwapInt64(&tracker.maxConsumed, maxNow, consumed) { + continue } + break } } if rootExceed != nil { @@ -172,6 +168,21 @@ func (t *Tracker) MaxConsumed() int64 { return atomic.LoadInt64(&t.maxConsumed) } +// SearchTracker searches the specific tracker under this tracker. +func (t *Tracker) SearchTracker(label string) *Tracker { + if t.label.String() == label { + return t + } + t.mu.Lock() + defer t.mu.Unlock() + for _, child := range t.mu.children { + if result := child.SearchTracker(label); result != nil { + return result + } + } + return nil +} + // String returns the string representation of this Tracker tree. func (t *Tracker) String() string { buffer := bytes.NewBufferString("\n") @@ -185,6 +196,7 @@ func (t *Tracker) toString(indent string, buffer *bytes.Buffer) { fmt.Fprintf(buffer, "%s \"quota\": %s\n", indent, t.BytesToString(t.bytesLimit)) } fmt.Fprintf(buffer, "%s \"consumed\": %s\n", indent, t.BytesToString(t.BytesConsumed())) + fmt.Fprintf(buffer, "%s \"max-consumed\": %s\n", indent, t.BytesToString(t.MaxConsumed())) t.mu.Lock() for i := range t.mu.children { From 37d1be2a660805be1af71014fa38df6d6ff47990 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Thu, 18 Jul 2019 20:03:06 +0800 Subject: [PATCH 02/13] format --- util/memory/tracker.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/util/memory/tracker.go b/util/memory/tracker.go index 07b5f7d850140..5d44e7c3f3dce 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -212,17 +212,17 @@ func (t *Tracker) toString(indent string, buffer *bytes.Buffer) { func (t *Tracker) BytesToString(numBytes int64) string { GB := float64(numBytes) / float64(1<<30) if GB > 1 { - return fmt.Sprintf("%v GB", GB) + return fmt.Sprintf("%.3f GB", GB) } MB := float64(numBytes) / float64(1<<20) if MB > 1 { - return fmt.Sprintf("%v MB", MB) + return fmt.Sprintf("%.3f MB", MB) } KB := float64(numBytes) / float64(1<<10) if KB > 1 { - return fmt.Sprintf("%v KB", KB) + return fmt.Sprintf("%.3f KB", KB) } return fmt.Sprintf("%v Bytes", numBytes) From ca0beecca30d92a8634f40ebd6aca8ca0465a500 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Thu, 18 Jul 2019 20:31:41 +0800 Subject: [PATCH 03/13] add UT --- executor/explain_test.go | 48 ++++++++++++++++++++++++++++++++++++++++ util/memory/tracker.go | 1 - 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/executor/explain_test.go b/executor/explain_test.go index c36179a283e52..87fd715e12f2e 100644 --- a/executor/explain_test.go +++ b/executor/explain_test.go @@ -19,6 +19,7 @@ import ( plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/util/testkit" + "strings" ) func (s *testSuite1) TestExplainPriviliges(c *C) { @@ -74,3 +75,50 @@ func (s *testSuite1) TestExplainWrite(c *C) { tk.MustExec("explain analyze insert into t select 1") tk.MustQuery("select * from t order by a").Check(testkit.Rows("1", "2")) } + +func (s *testSuite1) TestExplainAnalyzeMemory(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (v int, k int, key(k))") + tk.MustExec("insert into t values (1, 1), (1, 1), (1, 1), (1, 1), (1, 1)") + + s.checkMemoryInfo(c, tk, "explain analyze select * from t order by v") + s.checkMemoryInfo(c, tk, "explain analyze select * from t order by v limit 5") + s.checkMemoryInfo(c, tk, "explain analyze select /*+ TIDB_HJ(t1, t2) */ t1.k from t t1, t t2 where t1.v = t2.v+1") + s.checkMemoryInfo(c, tk, "explain analyze select /*+ TIDB_SMJ(t1, t2) */ t1.k from t t1, t t2 where t1.k = t2.k+1") + s.checkMemoryInfo(c, tk, "explain analyze select /*+ TIDB_INLJ(t1, t2) */ t1.k from t t1, t t2 where t1.k = t2.k and t1.v=1") + s.checkMemoryInfo(c, tk, "explain analyze select sum(k) from t group by v") + s.checkMemoryInfo(c, tk, "explain analyze select sum(v) from t group by k") + s.checkMemoryInfo(c, tk, "explain analyze select * from t") + s.checkMemoryInfo(c, tk, "explain analyze select k from t use index(k)") + s.checkMemoryInfo(c, tk, "explain analyze select * from t use index(k)") +} + +func (s *testSuite1) checkMemoryInfo(c *C, tk *testkit.TestKit, sql string) { + memCol := 5 + ops := []string{"Join", "MergeAgg", "Reader", "Top", "Sort"} + rows := tk.MustQuery(sql).Rows() + for _, row := range rows { + strs := make([]string, len(row)) + for i, c := range row { + strs[i] = c.(string) + } + if strings.Contains(strs[2], "cop") { + continue + } + + shouldHasMem := false + for _, op := range ops { + if strings.Contains(strs[0], op) { + shouldHasMem = true + break + } + } + + if shouldHasMem { + c.Assert(strs[memCol], Not(Equals), "-") + } else { + c.Assert(strs[memCol], Equals, "-") + } + } +} diff --git a/util/memory/tracker.go b/util/memory/tracker.go index 5d44e7c3f3dce..0b3eba17bb815 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -196,7 +196,6 @@ func (t *Tracker) toString(indent string, buffer *bytes.Buffer) { fmt.Fprintf(buffer, "%s \"quota\": %s\n", indent, t.BytesToString(t.bytesLimit)) } fmt.Fprintf(buffer, "%s \"consumed\": %s\n", indent, t.BytesToString(t.BytesConsumed())) - fmt.Fprintf(buffer, "%s \"max-consumed\": %s\n", indent, t.BytesToString(t.MaxConsumed())) t.mu.Lock() for i := range t.mu.children { From 8d5f9c2f22fb7260bc4333b0c482247d6804238d Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Thu, 18 Jul 2019 20:34:25 +0800 Subject: [PATCH 04/13] update UT --- executor/explain_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/explain_test.go b/executor/explain_test.go index 87fd715e12f2e..75ec67e472d08 100644 --- a/executor/explain_test.go +++ b/executor/explain_test.go @@ -96,7 +96,7 @@ func (s *testSuite1) TestExplainAnalyzeMemory(c *C) { func (s *testSuite1) checkMemoryInfo(c *C, tk *testkit.TestKit, sql string) { memCol := 5 - ops := []string{"Join", "MergeAgg", "Reader", "Top", "Sort"} + ops := []string{"Join", "Reader", "Top", "Sort"} rows := tk.MustQuery(sql).Rows() for _, row := range rows { strs := make([]string, len(row)) From 003b73823f3204e677a54fa3d19524d3b8c24b31 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Thu, 18 Jul 2019 20:48:28 +0800 Subject: [PATCH 05/13] fix UT --- planner/core/cbo_test.go | 4 ++-- util/memory/tracker.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/planner/core/cbo_test.go b/planner/core/cbo_test.go index cdce14d6e8f73..222512c2419c0 100644 --- a/planner/core/cbo_test.go +++ b/planner/core/cbo_test.go @@ -80,7 +80,7 @@ func (s *testAnalyzeSuite) TestExplainAnalyze(c *C) { rs := tk.MustQuery("explain analyze select t1.a, t1.b, sum(t1.c) from t1 join t2 on t1.a = t2.b where t1.a > 1") c.Assert(len(rs.Rows()), Equals, 10) for _, row := range rs.Rows() { - c.Assert(len(row), Equals, 5) + c.Assert(len(row), Equals, 6) execInfo := row[4].(string) c.Assert(strings.Contains(execInfo, "time"), Equals, true) c.Assert(strings.Contains(execInfo, "loops"), Equals, true) @@ -977,7 +977,7 @@ func (s *testAnalyzeSuite) TestIssue9805(c *C) { c.Assert(rs.Rows(), HasLen, 10) hasIndexLookUp12 := false for _, row := range rs.Rows() { - c.Assert(row, HasLen, 5) + c.Assert(row, HasLen, 6) if strings.HasSuffix(row[0].(string), "IndexLookUp_12") { hasIndexLookUp12 = true c.Assert(row[4], Equals, "time:0ns, loops:0, rows:0") diff --git a/util/memory/tracker.go b/util/memory/tracker.go index 0b3eba17bb815..56761f4f4c700 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -211,17 +211,17 @@ func (t *Tracker) toString(indent string, buffer *bytes.Buffer) { func (t *Tracker) BytesToString(numBytes int64) string { GB := float64(numBytes) / float64(1<<30) if GB > 1 { - return fmt.Sprintf("%.3f GB", GB) + return fmt.Sprintf("%v GB", GB) } MB := float64(numBytes) / float64(1<<20) if MB > 1 { - return fmt.Sprintf("%.3f MB", MB) + return fmt.Sprintf("%v MB", MB) } KB := float64(numBytes) / float64(1<<10) if KB > 1 { - return fmt.Sprintf("%.3f KB", KB) + return fmt.Sprintf("%v KB", KB) } return fmt.Sprintf("%v Bytes", numBytes) From bcd46138f6d8860349ba9df6c59d2078516d128a Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Thu, 18 Jul 2019 20:52:04 +0800 Subject: [PATCH 06/13] format --- executor/explain_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/executor/explain_test.go b/executor/explain_test.go index 75ec67e472d08..ae00b402a4284 100644 --- a/executor/explain_test.go +++ b/executor/explain_test.go @@ -14,12 +14,13 @@ package executor_test import ( + "strings" + . "github.com/pingcap/check" "github.com/pingcap/parser/auth" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/util/testkit" - "strings" ) func (s *testSuite1) TestExplainPriviliges(c *C) { From 6f2c5a2ad12b35813f55046050457bc58ca5c4f2 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Fri, 19 Jul 2019 13:18:15 +0800 Subject: [PATCH 07/13] remove Tracker.Detach --- executor/distsql.go | 1 - executor/index_lookup_join.go | 3 --- util/memory/tracker.go | 5 ----- 3 files changed, 9 deletions(-) diff --git a/executor/distsql.go b/executor/distsql.go index 8b521248b8357..e20fb92c5fba9 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -527,7 +527,6 @@ func (e *IndexLookUpExecutor) Close() error { e.tblWorkerWg.Wait() e.finished = nil e.workerStarted = false - e.memTracker.Detach() e.memTracker = nil if e.runtimeStats != nil { copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.idxPlans[0].ExplainID().String()) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 3ea1f62e7d6d3..5f596a69b168d 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -295,9 +295,6 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, return nil, nil } - if e.task != nil { - e.task.memTracker.Detach() - } e.task = task return task, nil } diff --git a/util/memory/tracker.go b/util/memory/tracker.go index 56761f4f4c700..3b935360f0fce 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -87,11 +87,6 @@ func (t *Tracker) AttachTo(parent *Tracker) { t.parent.Consume(t.BytesConsumed()) } -// Detach detaches this Tracker from its parent. -func (t *Tracker) Detach() { - t.parent.remove(t) -} - func (t *Tracker) remove(oldChild *Tracker) { t.mu.Lock() defer t.mu.Unlock() From 95c8e49a40296452108b726d85da61f56e0158df Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Fri, 19 Jul 2019 13:31:09 +0800 Subject: [PATCH 08/13] fixup --- distsql/distsql_test.go | 1 - distsql/request_builder.go | 8 ++------ executor/distsql.go | 9 +++++++-- executor/table_reader.go | 8 +++++++- 4 files changed, 16 insertions(+), 10 deletions(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 470f2e78bab67..a4bc06661cca3 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -42,7 +42,6 @@ func (s *testSuite) createSelectNormal(batch, totalRows int, c *C, planIDs []str SetDesc(false). SetKeepOrder(false). SetFromSessionVars(variable.NewSessionVars()). - SetMemTracker(s.sctx, stringutil.StringerStr("testSuite.createSelectNormal")). Build() c.Assert(err, IsNil) diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 9e1ef6c624f05..10c91e93598d1 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -14,12 +14,10 @@ package distsql import ( - "fmt" "math" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" @@ -44,10 +42,8 @@ func (builder *RequestBuilder) Build() (*kv.Request, error) { } // SetMemTracker sets a memTracker for this request. -func (builder *RequestBuilder) SetMemTracker(sctx sessionctx.Context, label fmt.Stringer) *RequestBuilder { - t := memory.NewTracker(label, sctx.GetSessionVars().MemQuotaDistSQL) - t.AttachTo(sctx.GetSessionVars().StmtCtx.MemTracker) - builder.Request.MemTracker = t +func (builder *RequestBuilder) SetMemTracker(tracker *memory.Tracker) *RequestBuilder { + builder.Request.MemTracker = tracker return builder } diff --git a/executor/distsql.go b/executor/distsql.go index e20fb92c5fba9..f2a6c393f2233 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -220,6 +220,8 @@ type IndexReaderExecutor struct { colLens []int plans []plannercore.PhysicalPlan + memTracker *memory.Tracker + selectResultHook // for testing } @@ -275,6 +277,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) e.dagPB.CollectExecutionSummaries = &collExec } + e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaDistSQL) var builder distsql.RequestBuilder kvReq, err := builder.SetKeyRanges(kvRanges). SetDAGRequest(e.dagPB). @@ -282,7 +285,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) SetKeepOrder(e.keepOrder). SetStreaming(e.streaming). SetFromSessionVars(e.ctx.GetSessionVars()). - SetMemTracker(e.ctx, e.id). + SetMemTracker(e.memTracker). Build() if err != nil { e.feedback.Invalidate() @@ -413,6 +416,8 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k e.dagPB.CollectExecutionSummaries = &collExec } + tracker := memory.NewTracker(stringutil.StringerStr("InnerWorker"), e.ctx.GetSessionVars().MemQuotaDistSQL) + tracker.AttachTo(e.memTracker) var builder distsql.RequestBuilder kvReq, err := builder.SetKeyRanges(kvRanges). SetDAGRequest(e.dagPB). @@ -420,7 +425,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k SetKeepOrder(e.keepOrder). SetStreaming(e.indexStreaming). SetFromSessionVars(e.ctx.GetSessionVars()). - SetMemTracker(e.ctx, indexLookupDistSQLTrackerLabel). + SetMemTracker(tracker). Build() if err != nil { return err diff --git a/executor/table_reader.go b/executor/table_reader.go index 26fba0b54f008..8f80c0bced49c 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -16,6 +16,7 @@ package executor import ( "context" "fmt" + "github.com/pingcap/tidb/util/memory" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/distsql" @@ -71,11 +72,16 @@ type TableReaderExecutor struct { corColInAccess bool plans []plannercore.PhysicalPlan + memTracker *memory.Tracker + selectResultHook // for testing } // Open initialzes necessary variables for using this executor. func (e *TableReaderExecutor) Open(ctx context.Context) error { + e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaDistSQL) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + var err error if e.corColInFilter { e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans) @@ -157,7 +163,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra SetKeepOrder(e.keepOrder). SetStreaming(e.streaming). SetFromSessionVars(e.ctx.GetSessionVars()). - SetMemTracker(e.ctx, e.id). + SetMemTracker(e.memTracker). Build() if err != nil { return nil, err From ce45ed5b8113467a394c9d56fea974877efc2022 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Fri, 19 Jul 2019 14:00:36 +0800 Subject: [PATCH 09/13] fix UT --- executor/distsql.go | 1 + executor/explain_test.go | 4 ++-- executor/table_reader.go | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/executor/distsql.go b/executor/distsql.go index f2a6c393f2233..29aa9657e6b75 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -278,6 +278,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) } e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaDistSQL) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) var builder distsql.RequestBuilder kvReq, err := builder.SetKeyRanges(kvRanges). SetDAGRequest(e.dagPB). diff --git a/executor/explain_test.go b/executor/explain_test.go index ae00b402a4284..277cf3df96af5 100644 --- a/executor/explain_test.go +++ b/executor/explain_test.go @@ -15,7 +15,7 @@ package executor_test import ( "strings" - + . "github.com/pingcap/check" "github.com/pingcap/parser/auth" plannercore "github.com/pingcap/tidb/planner/core" @@ -97,7 +97,7 @@ func (s *testSuite1) TestExplainAnalyzeMemory(c *C) { func (s *testSuite1) checkMemoryInfo(c *C, tk *testkit.TestKit, sql string) { memCol := 5 - ops := []string{"Join", "Reader", "Top", "Sort"} + ops := []string{"Join", "Reader", "Top", "Sort", "LookUp"} rows := tk.MustQuery(sql).Rows() for _, row := range rows { strs := make([]string, len(row)) diff --git a/executor/table_reader.go b/executor/table_reader.go index 8f80c0bced49c..019957e678247 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -16,7 +16,6 @@ package executor import ( "context" "fmt" - "github.com/pingcap/tidb/util/memory" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/distsql" @@ -27,6 +26,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/ranger" "github.com/pingcap/tipb/go-tipb" ) From 61e9f2b95e988e8f6f1d3fe925ba620544b7613a Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Fri, 19 Jul 2019 14:29:03 +0800 Subject: [PATCH 10/13] fix CI --- distsql/distsql_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index a4bc06661cca3..4a145717d0fce 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/execdetails" + "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tipb/go-tipb" ) @@ -42,6 +43,8 @@ func (s *testSuite) createSelectNormal(batch, totalRows int, c *C, planIDs []str SetDesc(false). SetKeepOrder(false). SetFromSessionVars(variable.NewSessionVars()). + SetMemTracker(memory.NewTracker(stringutil.StringerStr("testSuite.createSelectNormal"), + s.sctx.GetSessionVars().MemQuotaDistSQL)). Build() c.Assert(err, IsNil) From e7288f4d32c8744e1b765a2d26544019d8d0c381 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Fri, 19 Jul 2019 15:46:50 +0800 Subject: [PATCH 11/13] address comments --- executor/distsql.go | 5 +++-- executor/explain_test.go | 4 ++-- planner/core/common_plans.go | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/executor/distsql.go b/executor/distsql.go index 29aa9657e6b75..2f09acddf19ca 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -417,7 +417,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k e.dagPB.CollectExecutionSummaries = &collExec } - tracker := memory.NewTracker(stringutil.StringerStr("InnerWorker"), e.ctx.GetSessionVars().MemQuotaDistSQL) + tracker := memory.NewTracker(stringutil.StringerStr("InnerWorker"), e.ctx.GetSessionVars().MemQuotaIndexLookupReader) tracker.AttachTo(e.memTracker) var builder distsql.RequestBuilder kvReq, err := builder.SetKeyRanges(kvRanges). @@ -488,7 +488,8 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha keepOrder: e.keepOrder, handleIdx: e.handleIdx, isCheckOp: e.isCheckOp, - memTracker: memory.NewTracker(stringutil.StringerStr(fmt.Sprintf("worker_%v", i)), -1), + memTracker: memory.NewTracker(stringutil.StringerStr(fmt.Sprintf("worker_%v", i)), + e.ctx.GetSessionVars().MemQuotaIndexLookupReader), } worker.memTracker.AttachTo(e.memTracker) ctx1, cancel := context.WithCancel(ctx) diff --git a/executor/explain_test.go b/executor/explain_test.go index 277cf3df96af5..47c634f3f454e 100644 --- a/executor/explain_test.go +++ b/executor/explain_test.go @@ -117,9 +117,9 @@ func (s *testSuite1) checkMemoryInfo(c *C, tk *testkit.TestKit, sql string) { } if shouldHasMem { - c.Assert(strs[memCol], Not(Equals), "-") + c.Assert(strs[memCol], Not(Equals), "N/A") } else { - c.Assert(strs[memCol], Equals, "-") + c.Assert(strs[memCol], Equals, "N/A") } } } diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 17ce710ca3146..c43f03bf67b50 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -648,7 +648,7 @@ func (e *Explain) prepareOperatorInfo(p PhysicalPlan, taskType string, indent st if tracker != nil { row = append(row, tracker.BytesToString(tracker.MaxConsumed())) } else { - row = append(row, "-") + row = append(row, "N/A") } } e.Rows = append(e.Rows, row) From 13efd9243360788e7cb0be516e7d6ca743e50ef5 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Tue, 23 Jul 2019 21:02:01 +0800 Subject: [PATCH 12/13] rename tacker name --- executor/distsql.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/distsql.go b/executor/distsql.go index 2f09acddf19ca..e00b3f7177cda 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -417,7 +417,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k e.dagPB.CollectExecutionSummaries = &collExec } - tracker := memory.NewTracker(stringutil.StringerStr("InnerWorker"), e.ctx.GetSessionVars().MemQuotaIndexLookupReader) + tracker := memory.NewTracker(stringutil.StringerStr("IndexWorker"), e.ctx.GetSessionVars().MemQuotaIndexLookupReader) tracker.AttachTo(e.memTracker) var builder distsql.RequestBuilder kvReq, err := builder.SetKeyRanges(kvRanges). @@ -488,7 +488,7 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha keepOrder: e.keepOrder, handleIdx: e.handleIdx, isCheckOp: e.isCheckOp, - memTracker: memory.NewTracker(stringutil.StringerStr(fmt.Sprintf("worker_%v", i)), + memTracker: memory.NewTracker(stringutil.StringerStr(fmt.Sprintf("TableWorker_%v", i)), e.ctx.GetSessionVars().MemQuotaIndexLookupReader), } worker.memTracker.AttachTo(e.memTracker) From f276b088b23c7785efe96d09d5930c07d1c5d178 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Tue, 23 Jul 2019 21:04:01 +0800 Subject: [PATCH 13/13] address comments --- executor/distsql.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/executor/distsql.go b/executor/distsql.go index e00b3f7177cda..c4cf91124d696 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -19,6 +19,7 @@ import ( "math" "runtime" "sort" + "strconv" "sync" "sync/atomic" "time" @@ -488,7 +489,7 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha keepOrder: e.keepOrder, handleIdx: e.handleIdx, isCheckOp: e.isCheckOp, - memTracker: memory.NewTracker(stringutil.StringerStr(fmt.Sprintf("TableWorker_%v", i)), + memTracker: memory.NewTracker(stringutil.MemoizeStr(func() string { return "TableWorker_" + strconv.Itoa(i) }), e.ctx.GetSessionVars().MemQuotaIndexLookupReader), } worker.memTracker.AttachTo(e.memTracker)