Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: track memory usage of index lookup join #6169

Merged
merged 8 commits into from
Mar 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 48 additions & 14 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package executor

import (
"fmt"
"runtime"
"sort"
"sync"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mvmap"
"github.com/pingcap/tidb/util/ranger"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -63,6 +65,8 @@ type IndexLookUpJoin struct {
indexRanges []*ranger.NewRange
keyOff2IdxOff []int
innerPtrBytes [][]byte

memTracker *memory.Tracker // track memory usage.
}

type outerCtx struct {
Expand All @@ -88,6 +92,8 @@ type lookUpJoinTask struct {

doneCh chan error
cursor int

memTracker *memory.Tracker // track memory usage.
}

type outerWorker struct {
Expand All @@ -103,6 +109,8 @@ type outerWorker struct {

resultCh chan<- *lookUpJoinTask
innerCh chan<- *lookUpJoinTask

parentMemTracker *memory.Tracker
}

type innerWorker struct {
Expand All @@ -123,6 +131,8 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaIndexLookupJoin)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.innerPtrBytes = make([][]byte, 0, 8)
e.startWorkers(ctx)
return nil
Expand All @@ -145,14 +155,15 @@ func (e *IndexLookUpJoin) startWorkers(ctx context.Context) {

func (e *IndexLookUpJoin) newOuterWorker(resultCh, innerCh chan *lookUpJoinTask) *outerWorker {
ow := &outerWorker{
outerCtx: e.outerCtx,
ctx: e.ctx,
executor: e.children[0],
executorChk: chunk.NewChunk(e.outerCtx.rowTypes),
resultCh: resultCh,
innerCh: innerCh,
batchSize: 32,
maxBatchSize: e.ctx.GetSessionVars().IndexJoinBatchSize,
outerCtx: e.outerCtx,
ctx: e.ctx,
executor: e.children[0],
executorChk: chunk.NewChunk(e.outerCtx.rowTypes),
resultCh: resultCh,
innerCh: innerCh,
batchSize: 32,
maxBatchSize: e.ctx.GetSessionVars().IndexJoinBatchSize,
parentMemTracker: e.memTracker,
}
return ow
}
Expand Down Expand Up @@ -216,6 +227,7 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask,
if task != nil && task.cursor < task.outerResult.NumRows() {
return task, nil
}

select {
case task = <-e.resultCh:
case <-ctx.Done():
Expand All @@ -233,6 +245,10 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask,
case <-ctx.Done():
return nil, nil
}

if e.task != nil {
e.task.memTracker.Detach()
}
e.task = task
return task, nil
}
Expand All @@ -241,6 +257,7 @@ func (e *IndexLookUpJoin) lookUpMatchedInners(task *lookUpJoinTask, rowIdx int)
outerKey := task.encodedLookUpKeys.GetRow(rowIdx).GetBytes(0)
e.innerPtrBytes = task.lookupMap.Get(outerKey, e.innerPtrBytes[:0])
task.matchedInners = task.matchedInners[:0]

for _, b := range e.innerPtrBytes {
ptr := *(*chunk.RowPtr)(unsafe.Pointer(&b[0]))
matchedInner := task.innerResult.GetRow(ptr)
Expand Down Expand Up @@ -297,13 +314,19 @@ func (ow *outerWorker) pushToChan(ctx context.Context, task *lookUpJoinTask, dst
// When err is not nil, task must not be nil to send the error to the main thread via task.
func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
ow.executor.newChunk()
task := new(lookUpJoinTask)
task.doneCh = make(chan error, 1)
task.outerResult = ow.executor.newChunk()
task.encodedLookUpKeys = chunk.NewChunk([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)})
task.lookupMap = mvmap.NewMVMap()

task := &lookUpJoinTask{
doneCh: make(chan error, 1),
outerResult: ow.executor.newChunk(),
encodedLookUpKeys: chunk.NewChunk([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}),
lookupMap: mvmap.NewMVMap(),
}
task.memTracker = memory.NewTracker(fmt.Sprintf("lookup join task %p", task), -1)
task.memTracker.AttachTo(ow.parentMemTracker)

ow.increaseBatchSize()

task.memTracker.Consume(task.outerResult.MemoryUsage())
for task.outerResult.NumRows() < ow.batchSize {
err := ow.executor.NextChunk(ctx, ow.executorChk)
if err != nil {
Expand All @@ -312,7 +335,11 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
if ow.executorChk.NumRows() == 0 {
break
}

oldMemUsage := task.outerResult.MemoryUsage()
task.outerResult.Append(ow.executorChk, 0, ow.executorChk.NumRows())
newMemUsage := task.outerResult.MemoryUsage()
task.memTracker.Consume(newMemUsage - oldMemUsage)
}
if task.outerResult.NumRows() == 0 {
return nil, nil
Expand All @@ -325,6 +352,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
if err != nil {
return task, errors.Trace(err)
}
task.memTracker.Consume(int64(cap(task.outerMatch)))
}
return task, nil
}
Expand Down Expand Up @@ -409,6 +437,8 @@ func (iw *innerWorker) constructDatumLookupKeys(task *lookUpJoinTask) ([][]types
task.encodedLookUpKeys.AppendBytes(0, keyBuf)
dLookUpKeys = append(dLookUpKeys, dLookUpKey)
}

task.memTracker.Consume(task.encodedLookUpKeys.MemoryUsage())
return dLookUpKeys, nil
}

Expand Down Expand Up @@ -481,6 +511,8 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
}
defer terror.Call(innerExec.Close)
innerResult := chunk.NewList(innerExec.retTypes(), iw.ctx.GetSessionVars().MaxChunkSize)
innerResult.GetMemTracker().SetLabel("inner result")
innerResult.GetMemTracker().AttachTo(task.memTracker)
for {
err := innerExec.NextChunk(ctx, iw.executorChk)
if err != nil {
Expand Down Expand Up @@ -526,5 +558,7 @@ func (e *IndexLookUpJoin) Close() error {
e.cancelFunc()
}
e.workerWg.Wait()
return e.children[0].Close()
e.memTracker.Detach()
e.memTracker = nil
return errors.Trace(e.children[0].Close())
}
5 changes: 5 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ type SessionVars struct {
MemQuotaTopn int64
// MemQuotaIndexLookupReader defines the memory quota for a index lookup reader executor.
MemQuotaIndexLookupReader int64
// MemQuotaIndexLookupJoin defines the memory quota for a index lookup join executor.
MemQuotaIndexLookupJoin int64

// EnableStreaming indicates whether the coprocessor request can use streaming API.
// TODO: remove this after tidb-server configuration "enable-streaming' removed.
Expand Down Expand Up @@ -324,6 +326,7 @@ func NewSessionVars() *SessionVars {
MemQuotaSort: DefTiDBMemQuotaSort,
MemQuotaTopn: DefTiDBMemQuotaTopn,
MemQuotaIndexLookupReader: DefTiDBMemQuotaIndexLookupReader,
MemQuotaIndexLookupJoin: DefTiDBMemQuotaIndexLookupJoin,
}
var enableStreaming string
if config.GetGlobalConfig().EnableStreaming {
Expand Down Expand Up @@ -504,6 +507,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.MemQuotaTopn = tidbOptInt64(val, DefTiDBMemQuotaTopn)
case TIDBMemQuotaIndexLookupReader:
s.MemQuotaIndexLookupReader = tidbOptInt64(val, DefTiDBMemQuotaIndexLookupReader)
case TIDBMemQuotaIndexLookupJoin:
s.MemQuotaIndexLookupJoin = tidbOptInt64(val, DefTiDBMemQuotaIndexLookupJoin)
case TiDBGeneralLog:
atomic.StoreUint32(&ProcessGeneralLog, uint32(tidbOptPositiveInt(val, DefTiDBGeneralLog)))
case TiDBEnableStreaming:
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TIDBMemQuotaSort, strconv.FormatInt(DefTiDBMemQuotaSort, 10)},
{ScopeSession, TIDBMemQuotaTopn, strconv.FormatInt(DefTiDBMemQuotaTopn, 10)},
{ScopeSession, TIDBMemQuotaIndexLookupReader, strconv.FormatInt(DefTiDBMemQuotaIndexLookupReader, 10)},
{ScopeSession, TIDBMemQuotaIndexLookupJoin, strconv.FormatInt(DefTiDBMemQuotaIndexLookupJoin, 10)},
{ScopeSession, TiDBEnableStreaming, "0"},
/* The following variable is defined as session scope but is actually server scope. */
{ScopeSession, TiDBGeneralLog, strconv.Itoa(DefTiDBGeneralLog)},
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,13 @@ const (
// "tidb_mem_quota_sort": control the memory quota of "SortExec".
// "tidb_mem_quota_topn": control the memory quota of "TopNExec".
// "tidb_mem_quota_indexlookupreader": control the memory quota of "IndexLookUpExecutor".
// "tidb_mem_quota_indexlookupjoin": control the memory quota of "IndexLookUpJoin".
TIDBMemQuotaQuery = "tidb_mem_quota_query" // Bytes.
TIDBMemQuotaHashJoin = "tidb_mem_quota_hashjoin" // Bytes.
TIDBMemQuotaSort = "tidb_mem_quota_sort" // Bytes.
TIDBMemQuotaTopn = "tidb_mem_quota_topn" // Bytes.
TIDBMemQuotaIndexLookupReader = "tidb_mem_quota_indexlookupreader" // Bytes.
TIDBMemQuotaIndexLookupJoin = "tidb_mem_quota_indexlookupjoin" // Bytes.

// tidb_general_log is used to log every query in the server in info level.
TiDBGeneralLog = "tidb_general_log"
Expand Down Expand Up @@ -154,6 +156,7 @@ const (
DefTiDBMemQuotaSort = 32 << 30 // 32GB.
DefTiDBMemQuotaTopn = 32 << 30 // 32GB.
DefTiDBMemQuotaIndexLookupReader = 32 << 30 // 32GB.
DefTiDBMemQuotaIndexLookupJoin = 32 << 30 // 32GB.
DefTiDBGeneralLog = 0
)

Expand Down