From f3f6a5b450a088e9b3d8c8738d653f5f19feb58d Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Wed, 28 Mar 2018 16:25:03 +0800 Subject: [PATCH 1/2] executor: track memory usage for merge join --- executor/merge_join.go | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/executor/merge_join.go b/executor/merge_join.go index 05178ee7906af..8ebcfe8b3b81d 100644 --- a/executor/merge_join.go +++ b/executor/merge_join.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/memory" "golang.org/x/net/context" ) @@ -43,6 +44,8 @@ type MergeJoinExec struct { innerRows []chunk.Row innerIter4Row chunk.Iterator + + memTracker *memory.Tracker } type mergeJoinOuterTable struct { @@ -75,6 +78,8 @@ type mergeJoinInnerTable struct { curResultInUse bool resultQueue []*chunk.Chunk resourceQueue []*chunk.Chunk + + memTracker *memory.Tracker } func (t *mergeJoinInnerTable) init(ctx context.Context, chk4Reader *chunk.Chunk) (err error) { @@ -87,6 +92,7 @@ func (t *mergeJoinInnerTable) init(ctx context.Context, chk4Reader *chunk.Chunk) t.curRow = t.curIter.End() t.curResultInUse = false t.resultQueue = append(t.resultQueue, chk4Reader) + t.memTracker.Consume(chk4Reader.MemoryUsage()) t.firstRow4Key, err = t.nextRow() t.compareFuncs = make([]chunk.CompareFunc, 0, len(t.joinKeys)) for i := range t.joinKeys { @@ -125,12 +131,15 @@ func (t *mergeJoinInnerTable) rowsWithSameKey() ([]chunk.Row, error) { func (t *mergeJoinInnerTable) nextRow() (chunk.Row, error) { if t.curRow == t.curIter.End() { t.reallocReaderResult() + oldMemUsage := t.curResult.MemoryUsage() err := t.reader.NextChunk(t.ctx, t.curResult) // error happens or no more data. if err != nil || t.curResult.NumRows() == 0 { t.curRow = t.curIter.End() return t.curRow, errors.Trace(err) } + newMemUsage := t.curResult.MemoryUsage() + t.memTracker.Consume(newMemUsage - oldMemUsage) t.curRow = t.curIter.Begin() } result := t.curRow @@ -151,7 +160,9 @@ func (t *mergeJoinInnerTable) reallocReaderResult() { // Create a new Chunk and append it to "resourceQueue" if there is no more // available chunk in "resourceQueue". if len(t.resourceQueue) == 0 { - t.resourceQueue = append(t.resourceQueue, t.reader.newChunk()) + newChunk := t.reader.newChunk() + t.memTracker.Consume(newChunk.MemoryUsage()) + t.resourceQueue = append(t.resourceQueue, newChunk) } // NOTE: "t.curResult" is always the last element of "resultQueue". @@ -165,10 +176,10 @@ func (t *mergeJoinInnerTable) reallocReaderResult() { // Close implements the Executor Close interface. func (e *MergeJoinExec) Close() error { - if err := e.baseExecutor.Close(); err != nil { - return errors.Trace(err) - } - return nil + e.memTracker.Detach() + e.memTracker = nil + + return errors.Trace(e.baseExecutor.Close()) } // Open implements the Executor Open interface. @@ -176,7 +187,14 @@ func (e *MergeJoinExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return errors.Trace(err) } + e.prepared = false + e.memTracker = memory.NewTracker(e.id, -1) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + + e.innerTable.memTracker = memory.NewTracker("innerTable", -1) + e.innerTable.memTracker.AttachTo(e.memTracker) + return nil } From 0ca81c780884d563d121b000628b051c0541fb38 Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Fri, 30 Mar 2018 11:32:02 +0800 Subject: [PATCH 2/2] add session variable --- executor/merge_join.go | 2 +- sessionctx/variable/session.go | 5 +++++ sessionctx/variable/sysvar.go | 1 + sessionctx/variable/tidb_vars.go | 3 +++ 4 files changed, 10 insertions(+), 1 deletion(-) diff --git a/executor/merge_join.go b/executor/merge_join.go index 8ebcfe8b3b81d..f3ab0a5fed40a 100644 --- a/executor/merge_join.go +++ b/executor/merge_join.go @@ -189,7 +189,7 @@ func (e *MergeJoinExec) Open(ctx context.Context) error { } e.prepared = false - e.memTracker = memory.NewTracker(e.id, -1) + e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaMergeJoin) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) e.innerTable.memTracker = memory.NewTracker("innerTable", -1) diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index e77c4fa4a327b..fba74fac657ea 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -283,6 +283,8 @@ type SessionVars struct { MemQuotaQuery int64 // MemQuotaHashJoin defines the memory quota for a hash join executor. MemQuotaHashJoin int64 + // MemQuotaMergeJoin defines the memory quota for a merge join executor. + MemQuotaMergeJoin int64 // MemQuotaSort defines the memory quota for a sort executor. MemQuotaSort int64 // MemQuotaTopn defines the memory quota for a top n executor. @@ -323,6 +325,7 @@ func NewSessionVars() *SessionVars { DMLBatchSize: DefDMLBatchSize, MemQuotaQuery: DefTiDBMemQuotaQuery, MemQuotaHashJoin: DefTiDBMemQuotaHashJoin, + MemQuotaMergeJoin: DefTiDBMemQuotaMergeJoin, MemQuotaSort: DefTiDBMemQuotaSort, MemQuotaTopn: DefTiDBMemQuotaTopn, MemQuotaIndexLookupReader: DefTiDBMemQuotaIndexLookupReader, @@ -501,6 +504,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.MemQuotaQuery = tidbOptInt64(val, DefTiDBMemQuotaQuery) case TIDBMemQuotaHashJoin: s.MemQuotaHashJoin = tidbOptInt64(val, DefTiDBMemQuotaHashJoin) + case TIDBMemQuotaMergeJoin: + s.MemQuotaMergeJoin = tidbOptInt64(val, DefTiDBMemQuotaMergeJoin) case TIDBMemQuotaSort: s.MemQuotaSort = tidbOptInt64(val, DefTiDBMemQuotaSort) case TIDBMemQuotaTopn: diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 087e1b23ed835..361b9c483cc2d 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -624,6 +624,7 @@ var defaultSysVars = []*SysVar{ {ScopeSession, TiDBMaxChunkSize, strconv.Itoa(DefMaxChunkSize)}, {ScopeSession, TIDBMemQuotaQuery, strconv.FormatInt(DefTiDBMemQuotaQuery, 10)}, {ScopeSession, TIDBMemQuotaHashJoin, strconv.FormatInt(DefTiDBMemQuotaHashJoin, 10)}, + {ScopeSession, TIDBMemQuotaMergeJoin, strconv.FormatInt(DefTiDBMemQuotaMergeJoin, 10)}, {ScopeSession, TIDBMemQuotaSort, strconv.FormatInt(DefTiDBMemQuotaSort, 10)}, {ScopeSession, TIDBMemQuotaTopn, strconv.FormatInt(DefTiDBMemQuotaTopn, 10)}, {ScopeSession, TIDBMemQuotaIndexLookupReader, strconv.FormatInt(DefTiDBMemQuotaIndexLookupReader, 10)}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 1d020782d3aa5..5bba9931742b2 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -116,12 +116,14 @@ const ( // The following session variables controls the memory quota during query execution. // "tidb_mem_quota_query": control the memory quota of a query. // "tidb_mem_quota_hashjoin": control the memory quota of "HashJoinExec". + // "tidb_mem_quota_mergejoin": control the memory quota of "MergeJoinExec". // "tidb_mem_quota_sort": control the memory quota of "SortExec". // "tidb_mem_quota_topn": control the memory quota of "TopNExec". // "tidb_mem_quota_indexlookupreader": control the memory quota of "IndexLookUpExecutor". // "tidb_mem_quota_indexlookupjoin": control the memory quota of "IndexLookUpJoin". TIDBMemQuotaQuery = "tidb_mem_quota_query" // Bytes. TIDBMemQuotaHashJoin = "tidb_mem_quota_hashjoin" // Bytes. + TIDBMemQuotaMergeJoin = "tidb_mem_quota_mergejoin" // Bytes. TIDBMemQuotaSort = "tidb_mem_quota_sort" // Bytes. TIDBMemQuotaTopn = "tidb_mem_quota_topn" // Bytes. TIDBMemQuotaIndexLookupReader = "tidb_mem_quota_indexlookupreader" // Bytes. @@ -153,6 +155,7 @@ const ( DefDMLBatchSize = 20000 DefTiDBMemQuotaQuery = 32 << 30 // 32GB. DefTiDBMemQuotaHashJoin = 32 << 30 // 32GB. + DefTiDBMemQuotaMergeJoin = 32 << 30 // 32GB. DefTiDBMemQuotaSort = 32 << 30 // 32GB. DefTiDBMemQuotaTopn = 32 << 30 // 32GB. DefTiDBMemQuotaIndexLookupReader = 32 << 30 // 32GB.