From 73c01b540863f0e3d8263a80ac601775889c3758 Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Wed, 28 Mar 2018 14:32:45 +0800 Subject: [PATCH 1/5] executor: track memory usage of index lookup join --- executor/index_lookup_join.go | 61 +++++++++++++++++++++++++++-------- 1 file changed, 48 insertions(+), 13 deletions(-) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 7ada7c0d6273d..edd5de4ded223 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -14,6 +14,7 @@ package executor import ( + "fmt" "runtime" "sort" "sync" @@ -28,6 +29,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/mvmap" "github.com/pingcap/tidb/util/ranger" log "github.com/sirupsen/logrus" @@ -63,6 +65,8 @@ type IndexLookUpJoin struct { indexRanges []*ranger.NewRange keyOff2IdxOff []int innerPtrBytes [][]byte + + memTracker *memory.Tracker // track memory usage. } type outerCtx struct { @@ -88,6 +92,8 @@ type lookUpJoinTask struct { doneCh chan error cursor int + + memTracker *memory.Tracker // track memory usage. } type outerWorker struct { @@ -103,6 +109,8 @@ type outerWorker struct { resultCh chan<- *lookUpJoinTask innerCh chan<- *lookUpJoinTask + + parentMemTracker *memory.Tracker // only pass it. 
} type innerWorker struct { @@ -123,6 +131,8 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error { if err != nil { return errors.Trace(err) } + e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaHashJoin) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) e.innerPtrBytes = make([][]byte, 0, 8) e.startWorkers(ctx) return nil @@ -145,14 +155,15 @@ func (e *IndexLookUpJoin) startWorkers(ctx context.Context) { func (e *IndexLookUpJoin) newOuterWorker(resultCh, innerCh chan *lookUpJoinTask) *outerWorker { ow := &outerWorker{ - outerCtx: e.outerCtx, - ctx: e.ctx, - executor: e.children[0], - executorChk: chunk.NewChunk(e.outerCtx.rowTypes), - resultCh: resultCh, - innerCh: innerCh, - batchSize: 32, - maxBatchSize: e.ctx.GetSessionVars().IndexJoinBatchSize, + outerCtx: e.outerCtx, + ctx: e.ctx, + executor: e.children[0], + executorChk: chunk.NewChunk(e.outerCtx.rowTypes), + resultCh: resultCh, + innerCh: innerCh, + batchSize: 32, + maxBatchSize: e.ctx.GetSessionVars().IndexJoinBatchSize, + parentMemTracker: e.memTracker, } return ow } @@ -216,6 +227,12 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, if task != nil && task.cursor < task.outerResult.NumRows() { return task, nil } + if task != nil { + // TODO: jianzhang.zj detach this memory tracker after PR: + // https://github.com/pingcap/tidb/pull/6148 merged. 
+ // + // task.memTracker.Detach() + } select { case task = <-e.resultCh: case <-ctx.Done(): @@ -241,11 +258,15 @@ func (e *IndexLookUpJoin) lookUpMatchedInners(task *lookUpJoinTask, rowIdx int) outerKey := task.encodedLookUpKeys.GetRow(rowIdx).GetBytes(0) e.innerPtrBytes = task.lookupMap.Get(outerKey, e.innerPtrBytes[:0]) task.matchedInners = task.matchedInners[:0] + + oldMemUsage := int64(cap(task.matchedInners)) * int64(unsafe.Sizeof(chunk.Row{})) for _, b := range e.innerPtrBytes { ptr := *(*chunk.RowPtr)(unsafe.Pointer(&b[0])) matchedInner := task.innerResult.GetRow(ptr) task.matchedInners = append(task.matchedInners, matchedInner) } + newMemUsage := int64(cap(task.matchedInners)) * int64(unsafe.Sizeof(chunk.Row{})) + task.memTracker.Consume(newMemUsage - oldMemUsage) } func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) { @@ -297,11 +318,16 @@ func (ow *outerWorker) pushToChan(ctx context.Context, task *lookUpJoinTask, dst // When err is not nil, task must not be nil to send the error to the main thread via task. 
func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) { ow.executor.newChunk() - task := new(lookUpJoinTask) - task.doneCh = make(chan error, 1) - task.outerResult = ow.executor.newChunk() - task.encodedLookUpKeys = chunk.NewChunk([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}) - task.lookupMap = mvmap.NewMVMap() + + task := &lookUpJoinTask{ + doneCh: make(chan error, 1), + outerResult: ow.executor.newChunk(), + encodedLookUpKeys: chunk.NewChunk([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}), + lookupMap: mvmap.NewMVMap(), + } + task.memTracker = memory.NewTracker(fmt.Sprintf("lookup join task %p", task), -1) + task.memTracker.AttachTo(ow.parentMemTracker) + ow.increaseBatchSize() for task.outerResult.NumRows() < ow.batchSize { @@ -312,7 +338,11 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) { if ow.executorChk.NumRows() == 0 { break } + + oldMemUsage := task.outerResult.MemoryUsage() task.outerResult.Append(ow.executorChk, 0, ow.executorChk.NumRows()) + newMemUsage := task.outerResult.MemoryUsage() + task.memTracker.Consume(newMemUsage - oldMemUsage) } if task.outerResult.NumRows() == 0 { return nil, nil @@ -325,6 +355,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) { if err != nil { return task, errors.Trace(err) } + task.memTracker.Consume(int64(cap(task.outerMatch))) } return task, nil } @@ -409,6 +440,8 @@ func (iw *innerWorker) constructDatumLookupKeys(task *lookUpJoinTask) ([][]types task.encodedLookUpKeys.AppendBytes(0, keyBuf) dLookUpKeys = append(dLookUpKeys, dLookUpKey) } + + task.memTracker.Consume(task.encodedLookUpKeys.MemoryUsage()) return dLookUpKeys, nil } @@ -481,6 +514,8 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa } defer terror.Call(innerExec.Close) innerResult := chunk.NewList(innerExec.retTypes(), iw.ctx.GetSessionVars().MaxChunkSize) + innerResult.GetMemTracker().SetLabel("inner result") + 
innerResult.GetMemTracker().AttachTo(task.memTracker) for { err := innerExec.NextChunk(ctx, iw.executorChk) if err != nil { From cdac75709e3f043470ad09f32bb296498c47fbb6 Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Wed, 28 Mar 2018 17:04:19 +0800 Subject: [PATCH 2/5] address comment --- executor/index_lookup_join.go | 13 ++++++------- sessionctx/variable/session.go | 5 +++++ sessionctx/variable/sysvar.go | 1 + sessionctx/variable/tidb_vars.go | 3 +++ 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index edd5de4ded223..c8bc708094ea9 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -110,7 +110,7 @@ type outerWorker struct { resultCh chan<- *lookUpJoinTask innerCh chan<- *lookUpJoinTask - parentMemTracker *memory.Tracker // only pass it. + parentMemTracker *memory.Tracker } type innerWorker struct { @@ -131,7 +131,7 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error { if err != nil { return errors.Trace(err) } - e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaHashJoin) + e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaIndexLookupJoin) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) e.innerPtrBytes = make([][]byte, 0, 8) e.startWorkers(ctx) @@ -228,10 +228,7 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, return task, nil } if task != nil { - // TODO: jianzhang.zj detach this memory tracker after PR: - // https://github.com/pingcap/tidb/pull/6148 merged. 
- // - // task.memTracker.Detach() + task.memTracker.Detach() } select { case task = <-e.resultCh: @@ -561,5 +558,7 @@ func (e *IndexLookUpJoin) Close() error { e.cancelFunc() } e.workerWg.Wait() - return e.children[0].Close() + e.memTracker.Detach() + e.memTracker = nil + return errors.Trace(e.children[0].Close()) } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index cfac03a7ab2c9..e77c4fa4a327b 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -289,6 +289,8 @@ type SessionVars struct { MemQuotaTopn int64 // MemQuotaIndexLookupReader defines the memory quota for a index lookup reader executor. MemQuotaIndexLookupReader int64 + // MemQuotaIndexLookupJoin defines the memory quota for an index lookup join executor. + MemQuotaIndexLookupJoin int64 // EnableStreaming indicates whether the coprocessor request can use streaming API. // TODO: remove this after tidb-server configuration "enable-streaming' removed. @@ -324,6 +326,7 @@ func NewSessionVars() *SessionVars { MemQuotaSort: DefTiDBMemQuotaSort, MemQuotaTopn: DefTiDBMemQuotaTopn, MemQuotaIndexLookupReader: DefTiDBMemQuotaIndexLookupReader, + MemQuotaIndexLookupJoin: DefTiDBMemQuotaIndexLookupJoin, } var enableStreaming string if config.GetGlobalConfig().EnableStreaming { @@ -504,6 +507,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.MemQuotaTopn = tidbOptInt64(val, DefTiDBMemQuotaTopn) case TIDBMemQuotaIndexLookupReader: s.MemQuotaIndexLookupReader = tidbOptInt64(val, DefTiDBMemQuotaIndexLookupReader) + case TIDBMemQuotaIndexLookupJoin: + s.MemQuotaIndexLookupJoin = tidbOptInt64(val, DefTiDBMemQuotaIndexLookupJoin) case TiDBGeneralLog: atomic.StoreUint32(&ProcessGeneralLog, uint32(tidbOptPositiveInt(val, DefTiDBGeneralLog))) case TiDBEnableStreaming: diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index c5fc184904bae..087e1b23ed835 100644 --- a/sessionctx/variable/sysvar.go +++ 
b/sessionctx/variable/sysvar.go @@ -627,6 +627,7 @@ var defaultSysVars = []*SysVar{ {ScopeSession, TIDBMemQuotaSort, strconv.FormatInt(DefTiDBMemQuotaSort, 10)}, {ScopeSession, TIDBMemQuotaTopn, strconv.FormatInt(DefTiDBMemQuotaTopn, 10)}, {ScopeSession, TIDBMemQuotaIndexLookupReader, strconv.FormatInt(DefTiDBMemQuotaIndexLookupReader, 10)}, + {ScopeSession, TIDBMemQuotaIndexLookupJoin, strconv.FormatInt(DefTiDBMemQuotaIndexLookupJoin, 10)}, {ScopeSession, TiDBEnableStreaming, "0"}, /* The following variable is defined as session scope but is actually server scope. */ {ScopeSession, TiDBGeneralLog, strconv.Itoa(DefTiDBGeneralLog)}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 0c3996d151d19..1d020782d3aa5 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -119,11 +119,13 @@ const ( // "tidb_mem_quota_sort": control the memory quota of "SortExec". // "tidb_mem_quota_topn": control the memory quota of "TopNExec". // "tidb_mem_quota_indexlookupreader": control the memory quota of "IndexLookUpExecutor". + // "tidb_mem_quota_indexlookupjoin": control the memory quota of "IndexLookUpJoin". TIDBMemQuotaQuery = "tidb_mem_quota_query" // Bytes. TIDBMemQuotaHashJoin = "tidb_mem_quota_hashjoin" // Bytes. TIDBMemQuotaSort = "tidb_mem_quota_sort" // Bytes. TIDBMemQuotaTopn = "tidb_mem_quota_topn" // Bytes. TIDBMemQuotaIndexLookupReader = "tidb_mem_quota_indexlookupreader" // Bytes. + TIDBMemQuotaIndexLookupJoin = "tidb_mem_quota_indexlookupjoin" // Bytes. // tidb_general_log is used to log every query in the server in info level. TiDBGeneralLog = "tidb_general_log" @@ -154,6 +156,7 @@ const ( DefTiDBMemQuotaSort = 32 << 30 // 32GB. DefTiDBMemQuotaTopn = 32 << 30 // 32GB. DefTiDBMemQuotaIndexLookupReader = 32 << 30 // 32GB. + DefTiDBMemQuotaIndexLookupJoin = 32 << 30 // 32GB. 
DefTiDBGeneralLog = 0 ) From 781d73f4b38f50b6c724ade8ec0975ab7fab1b98 Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Wed, 28 Mar 2018 17:16:28 +0800 Subject: [PATCH 3/5] tiny refine --- executor/index_lookup_join.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index c8bc708094ea9..73a869520c31c 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -327,6 +327,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) { ow.increaseBatchSize() + task.memTracker.Consume(task.outerResult.MemoryUsage()) for task.outerResult.NumRows() < ow.batchSize { err := ow.executor.NextChunk(ctx, ow.executorChk) if err != nil { @@ -354,6 +355,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) { } task.memTracker.Consume(int64(cap(task.outerMatch))) } + task.memTracker.Consume(int64(cap(task.matchedInners)) * int64(unsafe.Sizeof(chunk.Row{}))) return task, nil } From ab11086ffeabebdb871a4562af39072063780d73 Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Wed, 28 Mar 2018 17:43:39 +0800 Subject: [PATCH 4/5] fix ci --- executor/index_lookup_join.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 73a869520c31c..3276a8e329107 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -227,9 +227,7 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, if task != nil && task.cursor < task.outerResult.NumRows() { return task, nil } - if task != nil { - task.memTracker.Detach() - } + select { case task = <-e.resultCh: case <-ctx.Done(): @@ -247,6 +245,10 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, case <-ctx.Done(): return nil, nil } + + if e.task != nil { + e.task.memTracker.Detach() + } e.task = task return task, nil } From 
61ae85f2cedad491e036ac377cfa1678fcb4460d Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Thu, 29 Mar 2018 16:32:23 +0800 Subject: [PATCH 5/5] address comment --- executor/index_lookup_join.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 3276a8e329107..970ea14c4c3f3 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -258,14 +258,11 @@ func (e *IndexLookUpJoin) lookUpMatchedInners(task *lookUpJoinTask, rowIdx int) e.innerPtrBytes = task.lookupMap.Get(outerKey, e.innerPtrBytes[:0]) task.matchedInners = task.matchedInners[:0] - oldMemUsage := int64(cap(task.matchedInners)) * int64(unsafe.Sizeof(chunk.Row{})) for _, b := range e.innerPtrBytes { ptr := *(*chunk.RowPtr)(unsafe.Pointer(&b[0])) matchedInner := task.innerResult.GetRow(ptr) task.matchedInners = append(task.matchedInners, matchedInner) } - newMemUsage := int64(cap(task.matchedInners)) * int64(unsafe.Sizeof(chunk.Row{})) - task.memTracker.Consume(newMemUsage - oldMemUsage) } func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) { @@ -357,7 +354,6 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) { } task.memTracker.Consume(int64(cap(task.outerMatch))) } - task.memTracker.Consume(int64(cap(task.matchedInners)) * int64(unsafe.Sizeof(chunk.Row{}))) return task, nil }