diff --git a/executor/builder.go b/executor/builder.go index 24f2e3e327357..d95f596070080 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -40,7 +40,6 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/admin" - "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/ranger" tipb "github.com/pingcap/tipb/go-tipb" "golang.org/x/net/context" @@ -970,10 +969,7 @@ func (b *executorBuilder) buildApply(apply *plan.PhysicalApply) *NestedLoopApply outer: v.JoinType != plan.InnerJoin, resultGenerator: generator, outerSchema: apply.OuterSchema, - outerChunk: outerExec.newChunk(), - innerChunk: innerExec.newChunk(), } - e.innerList = chunk.NewList(innerExec.retTypes(), e.maxChunkSize) metrics.ExecutorCounter.WithLabelValues("NestedLoopApplyExec").Inc() return e } diff --git a/executor/join.go b/executor/join.go index 12b8932ed9e65..e2ae056ed11e5 100644 --- a/executor/join.go +++ b/executor/join.go @@ -473,6 +473,39 @@ func (e *HashJoinExec) NextChunk(ctx context.Context, chk *chunk.Chunk) (err err return nil } +// buildHashTableForList builds hash table from `list`. +// key of hash table: hash value of key columns +// value of hash table: RowPtr of the corresponded row +func (e *HashJoinExec) buildHashTableForList() error { + e.hashTable = mvmap.NewMVMap() + e.innerKeyColIdx = make([]int, len(e.innerKeys)) + for i := range e.innerKeys { + e.innerKeyColIdx[i] = e.innerKeys[i].Index + } + var ( + hasNull bool + err error + keyBuf = make([]byte, 0, 64) + valBuf = make([]byte, 8) + ) + for i := 0; i < e.innerResult.NumChunks(); i++ { + chk := e.innerResult.GetChunk(i) + for j := 0; j < chk.NumRows(); j++ { + hasNull, keyBuf, err = e.getJoinKeyFromChkRow(false, chk.GetRow(j), keyBuf) + if err != nil { + return errors.Trace(err) + } + if hasNull { + continue + } + rowPtr := chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)} + *(*chunk.RowPtr)(unsafe.Pointer(&valBuf[0])) = rowPtr + e.hashTable.Put(keyBuf, valBuf) + } + } + return nil +} + // NestedLoopApplyExec is the executor for apply. type NestedLoopApplyExec struct { baseExecutor @@ -498,21 +531,40 @@ type NestedLoopApplyExec struct { innerSelected []bool innerIter chunk.Iterator outerRow *chunk.Row + + memTracker *memory.Tracker // track memory usage. } // Close implements the Executor interface. func (e *NestedLoopApplyExec) Close() error { e.resultRows = nil e.innerRows = nil + + e.memTracker.Detach() + e.memTracker = nil return errors.Trace(e.outerExec.Close()) } // Open implements the Executor interface. func (e *NestedLoopApplyExec) Open(ctx context.Context) error { + err := e.outerExec.Open(ctx) + if err != nil { + return errors.Trace(err) + } e.cursor = 0 e.resultRows = e.resultRows[:0] e.innerRows = e.innerRows[:0] - return errors.Trace(e.outerExec.Open(ctx)) + e.outerChunk = e.outerExec.newChunk() + e.innerChunk = e.innerExec.newChunk() + e.innerList = chunk.NewList(e.innerExec.retTypes(), e.maxChunkSize) + + e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaNestedLoopApply) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + + e.innerList.GetMemTracker().SetLabel("innerList") + e.innerList.GetMemTracker().AttachTo(e.memTracker) + + return nil } func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *chunk.Chunk) (*chunk.Row, error) { @@ -576,39 +628,6 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error { } } -// buildHashTableForList builds hash table from `list`. -// key of hash table: hash value of key columns -// value of hash table: RowPtr of the corresponded row -func (e *HashJoinExec) buildHashTableForList() error { - e.hashTable = mvmap.NewMVMap() - e.innerKeyColIdx = make([]int, len(e.innerKeys)) - for i := range e.innerKeys { - e.innerKeyColIdx[i] = e.innerKeys[i].Index - } - var ( - hasNull bool - err error - keyBuf = make([]byte, 0, 64) - valBuf = make([]byte, 8) - ) - for i := 0; i < e.innerResult.NumChunks(); i++ { - chk := e.innerResult.GetChunk(i) - for j := 0; j < chk.NumRows(); j++ { - hasNull, keyBuf, err = e.getJoinKeyFromChkRow(false, chk.GetRow(j), keyBuf) - if err != nil { - return errors.Trace(err) - } - if hasNull { - continue - } - rowPtr := chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)} - *(*chunk.RowPtr)(unsafe.Pointer(&valBuf[0])) = rowPtr - e.hashTable.Put(keyBuf, valBuf) - } - } - return nil -} - // NextChunk implements the Executor interface. func (e *NestedLoopApplyExec) NextChunk(ctx context.Context, chk *chunk.Chunk) (err error) { chk.Reset() diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index fba74fac657ea..1c9d6fb7d77c0 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -293,6 +293,8 @@ type SessionVars struct { MemQuotaIndexLookupReader int64 // MemQuotaIndexLookupJoin defines the memory quota for a index lookup join executor. MemQuotaIndexLookupJoin int64 + // MemQuotaNestedLoopApply defines the memory quota for a nested loop apply executor. + MemQuotaNestedLoopApply int64 // EnableStreaming indicates whether the coprocessor request can use streaming API. // TODO: remove this after tidb-server configuration "enable-streaming' removed. @@ -330,6 +332,7 @@ func NewSessionVars() *SessionVars { MemQuotaTopn: DefTiDBMemQuotaTopn, MemQuotaIndexLookupReader: DefTiDBMemQuotaIndexLookupReader, MemQuotaIndexLookupJoin: DefTiDBMemQuotaIndexLookupJoin, + MemQuotaNestedLoopApply: DefTiDBMemQuotaNestedLoopApply, } var enableStreaming string if config.GetGlobalConfig().EnableStreaming { @@ -514,6 +517,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.MemQuotaIndexLookupReader = tidbOptInt64(val, DefTiDBMemQuotaIndexLookupReader) case TIDBMemQuotaIndexLookupJoin: s.MemQuotaIndexLookupJoin = tidbOptInt64(val, DefTiDBMemQuotaIndexLookupJoin) + case TIDBMemQuotaNestedLoopApply: + s.MemQuotaNestedLoopApply = tidbOptInt64(val, DefTiDBMemQuotaNestedLoopApply) case TiDBGeneralLog: atomic.StoreUint32(&ProcessGeneralLog, uint32(tidbOptPositiveInt(val, DefTiDBGeneralLog))) case TiDBEnableStreaming: diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 361b9c483cc2d..c3cbf0916579f 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -629,6 +629,7 @@ var defaultSysVars = []*SysVar{ {ScopeSession, TIDBMemQuotaTopn, strconv.FormatInt(DefTiDBMemQuotaTopn, 10)}, {ScopeSession, TIDBMemQuotaIndexLookupReader, strconv.FormatInt(DefTiDBMemQuotaIndexLookupReader, 10)}, {ScopeSession, TIDBMemQuotaIndexLookupJoin, strconv.FormatInt(DefTiDBMemQuotaIndexLookupJoin, 10)}, + {ScopeSession, TIDBMemQuotaNestedLoopApply, strconv.FormatInt(DefTiDBMemQuotaNestedLoopApply, 10)}, {ScopeSession, TiDBEnableStreaming, "0"}, /* The following variable is defined as session scope but is actually server scope. */ {ScopeSession, TiDBGeneralLog, strconv.Itoa(DefTiDBGeneralLog)}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 5bba9931742b2..3d4d6df29bd04 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -121,6 +121,7 @@ const ( // "tidb_mem_quota_topn": control the memory quota of "TopNExec". // "tidb_mem_quota_indexlookupreader": control the memory quota of "IndexLookUpExecutor". // "tidb_mem_quota_indexlookupjoin": control the memory quota of "IndexLookUpJoin". + // "tidb_mem_quota_nestedloopapply": control the memory quota of "NestedLoopApplyExec". TIDBMemQuotaQuery = "tidb_mem_quota_query" // Bytes. TIDBMemQuotaHashJoin = "tidb_mem_quota_hashjoin" // Bytes. TIDBMemQuotaMergeJoin = "tidb_mem_quota_mergejoin" // Bytes. @@ -128,6 +129,7 @@ const ( TIDBMemQuotaTopn = "tidb_mem_quota_topn" // Bytes. TIDBMemQuotaIndexLookupReader = "tidb_mem_quota_indexlookupreader" // Bytes. TIDBMemQuotaIndexLookupJoin = "tidb_mem_quota_indexlookupjoin" // Bytes. + TIDBMemQuotaNestedLoopApply = "tidb_mem_quota_nestedloopapply" // Bytes. // tidb_general_log is used to log every query in the server in info level. TiDBGeneralLog = "tidb_general_log" @@ -160,6 +162,7 @@ const ( DefTiDBMemQuotaTopn = 32 << 30 // 32GB. DefTiDBMemQuotaIndexLookupReader = 32 << 30 // 32GB. DefTiDBMemQuotaIndexLookupJoin = 32 << 30 // 32GB. + DefTiDBMemQuotaNestedLoopApply = 32 << 30 // 32GB. DefTiDBGeneralLog = 0 )