Skip to content

Commit

Permalink
executor: track memory usage of index lookup join (#6169)
Browse files Browse the repository at this point in the history
  • Loading branch information
zz-jason authored Mar 30, 2018
1 parent 0d78547 commit 564e889
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 14 deletions.
62 changes: 48 additions & 14 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package executor

import (
"fmt"
"runtime"
"sort"
"sync"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mvmap"
"github.com/pingcap/tidb/util/ranger"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -63,6 +65,8 @@ type IndexLookUpJoin struct {
indexRanges []*ranger.NewRange
keyOff2IdxOff []int
innerPtrBytes [][]byte

memTracker *memory.Tracker // track memory usage.
}

type outerCtx struct {
Expand All @@ -88,6 +92,8 @@ type lookUpJoinTask struct {

doneCh chan error
cursor int

memTracker *memory.Tracker // track memory usage.
}

type outerWorker struct {
Expand All @@ -103,6 +109,8 @@ type outerWorker struct {

resultCh chan<- *lookUpJoinTask
innerCh chan<- *lookUpJoinTask

parentMemTracker *memory.Tracker
}

type innerWorker struct {
Expand All @@ -123,6 +131,8 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaIndexLookupJoin)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.innerPtrBytes = make([][]byte, 0, 8)
e.startWorkers(ctx)
return nil
Expand All @@ -145,14 +155,15 @@ func (e *IndexLookUpJoin) startWorkers(ctx context.Context) {

func (e *IndexLookUpJoin) newOuterWorker(resultCh, innerCh chan *lookUpJoinTask) *outerWorker {
ow := &outerWorker{
outerCtx: e.outerCtx,
ctx: e.ctx,
executor: e.children[0],
executorChk: chunk.NewChunk(e.outerCtx.rowTypes),
resultCh: resultCh,
innerCh: innerCh,
batchSize: 32,
maxBatchSize: e.ctx.GetSessionVars().IndexJoinBatchSize,
outerCtx: e.outerCtx,
ctx: e.ctx,
executor: e.children[0],
executorChk: chunk.NewChunk(e.outerCtx.rowTypes),
resultCh: resultCh,
innerCh: innerCh,
batchSize: 32,
maxBatchSize: e.ctx.GetSessionVars().IndexJoinBatchSize,
parentMemTracker: e.memTracker,
}
return ow
}
Expand Down Expand Up @@ -216,6 +227,7 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask,
if task != nil && task.cursor < task.outerResult.NumRows() {
return task, nil
}

select {
case task = <-e.resultCh:
case <-ctx.Done():
Expand All @@ -233,6 +245,10 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask,
case <-ctx.Done():
return nil, nil
}

if e.task != nil {
e.task.memTracker.Detach()
}
e.task = task
return task, nil
}
Expand All @@ -241,6 +257,7 @@ func (e *IndexLookUpJoin) lookUpMatchedInners(task *lookUpJoinTask, rowIdx int)
outerKey := task.encodedLookUpKeys.GetRow(rowIdx).GetBytes(0)
e.innerPtrBytes = task.lookupMap.Get(outerKey, e.innerPtrBytes[:0])
task.matchedInners = task.matchedInners[:0]

for _, b := range e.innerPtrBytes {
ptr := *(*chunk.RowPtr)(unsafe.Pointer(&b[0]))
matchedInner := task.innerResult.GetRow(ptr)
Expand Down Expand Up @@ -297,13 +314,19 @@ func (ow *outerWorker) pushToChan(ctx context.Context, task *lookUpJoinTask, dst
// When err is not nil, task must not be nil to send the error to the main thread via task.
func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
ow.executor.newChunk()
task := new(lookUpJoinTask)
task.doneCh = make(chan error, 1)
task.outerResult = ow.executor.newChunk()
task.encodedLookUpKeys = chunk.NewChunk([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)})
task.lookupMap = mvmap.NewMVMap()

task := &lookUpJoinTask{
doneCh: make(chan error, 1),
outerResult: ow.executor.newChunk(),
encodedLookUpKeys: chunk.NewChunk([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}),
lookupMap: mvmap.NewMVMap(),
}
task.memTracker = memory.NewTracker(fmt.Sprintf("lookup join task %p", task), -1)
task.memTracker.AttachTo(ow.parentMemTracker)

ow.increaseBatchSize()

task.memTracker.Consume(task.outerResult.MemoryUsage())
for task.outerResult.NumRows() < ow.batchSize {
err := ow.executor.NextChunk(ctx, ow.executorChk)
if err != nil {
Expand All @@ -312,7 +335,11 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
if ow.executorChk.NumRows() == 0 {
break
}

oldMemUsage := task.outerResult.MemoryUsage()
task.outerResult.Append(ow.executorChk, 0, ow.executorChk.NumRows())
newMemUsage := task.outerResult.MemoryUsage()
task.memTracker.Consume(newMemUsage - oldMemUsage)
}
if task.outerResult.NumRows() == 0 {
return nil, nil
Expand All @@ -325,6 +352,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
if err != nil {
return task, errors.Trace(err)
}
task.memTracker.Consume(int64(cap(task.outerMatch)))
}
return task, nil
}
Expand Down Expand Up @@ -409,6 +437,8 @@ func (iw *innerWorker) constructDatumLookupKeys(task *lookUpJoinTask) ([][]types
task.encodedLookUpKeys.AppendBytes(0, keyBuf)
dLookUpKeys = append(dLookUpKeys, dLookUpKey)
}

task.memTracker.Consume(task.encodedLookUpKeys.MemoryUsage())
return dLookUpKeys, nil
}

Expand Down Expand Up @@ -481,6 +511,8 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
}
defer terror.Call(innerExec.Close)
innerResult := chunk.NewList(innerExec.retTypes(), iw.ctx.GetSessionVars().MaxChunkSize)
innerResult.GetMemTracker().SetLabel("inner result")
innerResult.GetMemTracker().AttachTo(task.memTracker)
for {
err := innerExec.NextChunk(ctx, iw.executorChk)
if err != nil {
Expand Down Expand Up @@ -526,5 +558,7 @@ func (e *IndexLookUpJoin) Close() error {
e.cancelFunc()
}
e.workerWg.Wait()
return e.children[0].Close()
e.memTracker.Detach()
e.memTracker = nil
return errors.Trace(e.children[0].Close())
}
5 changes: 5 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ type SessionVars struct {
MemQuotaTopn int64
// MemQuotaIndexLookupReader defines the memory quota for a index lookup reader executor.
MemQuotaIndexLookupReader int64
// MemQuotaIndexLookupJoin defines the memory quota for a index lookup join executor.
MemQuotaIndexLookupJoin int64

// EnableStreaming indicates whether the coprocessor request can use streaming API.
// TODO: remove this after tidb-server configuration "enable-streaming' removed.
Expand Down Expand Up @@ -324,6 +326,7 @@ func NewSessionVars() *SessionVars {
MemQuotaSort: DefTiDBMemQuotaSort,
MemQuotaTopn: DefTiDBMemQuotaTopn,
MemQuotaIndexLookupReader: DefTiDBMemQuotaIndexLookupReader,
MemQuotaIndexLookupJoin: DefTiDBMemQuotaIndexLookupJoin,
}
var enableStreaming string
if config.GetGlobalConfig().EnableStreaming {
Expand Down Expand Up @@ -504,6 +507,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.MemQuotaTopn = tidbOptInt64(val, DefTiDBMemQuotaTopn)
case TIDBMemQuotaIndexLookupReader:
s.MemQuotaIndexLookupReader = tidbOptInt64(val, DefTiDBMemQuotaIndexLookupReader)
case TIDBMemQuotaIndexLookupJoin:
s.MemQuotaIndexLookupJoin = tidbOptInt64(val, DefTiDBMemQuotaIndexLookupJoin)
case TiDBGeneralLog:
atomic.StoreUint32(&ProcessGeneralLog, uint32(tidbOptPositiveInt(val, DefTiDBGeneralLog)))
case TiDBEnableStreaming:
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TIDBMemQuotaSort, strconv.FormatInt(DefTiDBMemQuotaSort, 10)},
{ScopeSession, TIDBMemQuotaTopn, strconv.FormatInt(DefTiDBMemQuotaTopn, 10)},
{ScopeSession, TIDBMemQuotaIndexLookupReader, strconv.FormatInt(DefTiDBMemQuotaIndexLookupReader, 10)},
{ScopeSession, TIDBMemQuotaIndexLookupJoin, strconv.FormatInt(DefTiDBMemQuotaIndexLookupJoin, 10)},
{ScopeSession, TiDBEnableStreaming, "0"},
/* The following variable is defined as session scope but is actually server scope. */
{ScopeSession, TiDBGeneralLog, strconv.Itoa(DefTiDBGeneralLog)},
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,13 @@ const (
// "tidb_mem_quota_sort": control the memory quota of "SortExec".
// "tidb_mem_quota_topn": control the memory quota of "TopNExec".
// "tidb_mem_quota_indexlookupreader": control the memory quota of "IndexLookUpExecutor".
// "tidb_mem_quota_indexlookupjoin": control the memory quota of "IndexLookUpJoin".
TIDBMemQuotaQuery = "tidb_mem_quota_query" // Bytes.
TIDBMemQuotaHashJoin = "tidb_mem_quota_hashjoin" // Bytes.
TIDBMemQuotaSort = "tidb_mem_quota_sort" // Bytes.
TIDBMemQuotaTopn = "tidb_mem_quota_topn" // Bytes.
TIDBMemQuotaIndexLookupReader = "tidb_mem_quota_indexlookupreader" // Bytes.
TIDBMemQuotaIndexLookupJoin = "tidb_mem_quota_indexlookupjoin" // Bytes.

// tidb_general_log is used to log every query in the server in info level.
TiDBGeneralLog = "tidb_general_log"
Expand Down Expand Up @@ -154,6 +156,7 @@ const (
DefTiDBMemQuotaSort = 32 << 30 // 32GB.
DefTiDBMemQuotaTopn = 32 << 30 // 32GB.
DefTiDBMemQuotaIndexLookupReader = 32 << 30 // 32GB.
DefTiDBMemQuotaIndexLookupJoin = 32 << 30 // 32GB.
DefTiDBGeneralLog = 0
)

Expand Down

0 comments on commit 564e889

Please sign in to comment.