Skip to content

Commit

Permalink
*: support memTracker.detach for HashJoin, Apply and IndexLookUp in C…
Browse files Browse the repository at this point in the history
…lose func (#54095) (#54330)
  • Loading branch information
XuHuaiyu authored Jul 1, 2024
1 parent 1e563fc commit 6cf87c4
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 7 deletions.
7 changes: 5 additions & 2 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,11 @@ func (e *IndexLookUpExecutor) open(ctx context.Context) error {
// constructed by a "IndexLookUpJoin" and "Open" will not be called in that
// situation.
e.initRuntimeStats()
e.memTracker = memory.NewTracker(e.id, -1)
if e.memTracker != nil {
e.memTracker.Reset()
} else {
e.memTracker = memory.NewTracker(e.id, -1)
}
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

e.finished = make(chan struct{})
Expand Down Expand Up @@ -759,7 +763,6 @@ func (e *IndexLookUpExecutor) Close() error {
e.tblWorkerWg.Wait()
e.finished = nil
e.workerStarted = false
e.memTracker = nil
e.resultCurr = nil
return nil
}
Expand Down
9 changes: 7 additions & 2 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func (e *HashJoinExec) Close() error {
}
e.probeSideTupleFetcher.probeChkResourceCh = nil
terror.Call(e.rowContainer.Close)
e.sessCtx.GetSessionVars().MemTracker.UnbindActionFromHardLimit(e.rowContainer.ActionSpill())
e.waiterWg.Wait()
}
e.outerMatchedStatus = e.outerMatchedStatus[:0]
Expand Down Expand Up @@ -210,7 +211,11 @@ func (e *HashJoinExec) Open(ctx context.Context) error {
}
e.hashJoinCtx.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

e.diskTracker = disk.NewTracker(e.id, -1)
if e.diskTracker != nil {
e.diskTracker.Reset()
} else {
e.diskTracker = disk.NewTracker(e.id, -1)
}
e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker)

e.workerWg = util.WaitGroupWrapper{}
Expand Down Expand Up @@ -1419,7 +1424,7 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error {

if e.canUseCache {
// create a new one since it may be in the cache
e.innerList = chunk.NewList(retTypes(e.innerExec), e.initCap, e.maxChunkSize)
e.innerList = chunk.NewListWithMemTracker(retTypes(e.innerExec), e.initCap, e.maxChunkSize, e.innerList.GetMemTracker())
} else {
e.innerList.Reset()
}
Expand Down
11 changes: 8 additions & 3 deletions util/chunk/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,23 @@ type RowPtr struct {
RowIdx uint32
}

// NewList creates a new List with field types, init chunk size and max chunk size.
func NewList(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int) *List {
// NewListWithMemTracker creates a new List with field types, init chunk size, max chunk size and memory tracker.
func NewListWithMemTracker(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int, tracker *memory.Tracker) *List {
l := &List{
fieldTypes: fieldTypes,
initChunkSize: initChunkSize,
maxChunkSize: maxChunkSize,
memTracker: memory.NewTracker(memory.LabelForChunkList, -1),
memTracker: tracker,
consumedIdx: -1,
}
return l
}

// NewList creates a new List with field types, init chunk size and max chunk size.
func NewList(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int) *List {
return NewListWithMemTracker(fieldTypes, initChunkSize, maxChunkSize, memory.NewTracker(memory.LabelForChunkList, -1))
}

// GetMemTracker returns the memory tracker of this List.
func (l *List) GetMemTracker() *memory.Tracker {
return l.memTracker
Expand Down
3 changes: 3 additions & 0 deletions util/chunk/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,11 +318,14 @@ func (c *RowContainer) Close() (err error) {
c.actionSpill.cond.Broadcast()
c.actionSpill.SetFinished()
}
c.memTracker.Detach()
c.diskTracker.Detach()
if c.alreadySpilled() {
err = c.m.records.inDisk.Close()
c.m.records.inDisk = nil
}
c.m.records.inMemory.Clear()
c.m.records.inMemory = nil
return
}

Expand Down
21 changes: 21 additions & 0 deletions util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,27 @@ func (t *Tracker) UnbindActions() {
t.actionMuForHardLimit.actionOnExceed = &LogOnExceed{}
}

// UnbindActionFromHardLimit unbinds action from hardLimit.
func (t *Tracker) UnbindActionFromHardLimit(actionToUnbind ActionOnExceed) {
t.actionMuForHardLimit.Lock()
defer t.actionMuForHardLimit.Unlock()

var prev ActionOnExceed
for current := t.actionMuForHardLimit.actionOnExceed; current != nil; current = current.GetFallback() {
if current == actionToUnbind {
if prev == nil {
// actionToUnbind is the first element
t.actionMuForHardLimit.actionOnExceed = current.GetFallback()
} else {
// actionToUnbind is not the first element
prev.SetFallback(current.GetFallback())
}
break
}
prev = current
}
}

// reArrangeFallback merge two action chains and rearrange them by priority in descending order.
func reArrangeFallback(a ActionOnExceed, b ActionOnExceed) ActionOnExceed {
if a == nil {
Expand Down

0 comments on commit 6cf87c4

Please sign in to comment.