Skip to content

Commit

Permalink
*: support memTracker.detach for HashJoin, Apply and IndexLookUp in C…
Browse files Browse the repository at this point in the history
…lose func (#54095) (#54264)

close #54005
  • Loading branch information
ti-chi-bot authored Jul 18, 2024
1 parent 431781f commit 4ba6bdd
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 9 deletions.
6 changes: 5 additions & 1 deletion pkg/executor/aggregate/agg_hash_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,11 @@ func (e *HashAggExec) initForUnparallelExec() {

e.tmpChkForSpill = exec.TryNewCacheChunk(e.Children(0))
if vars := e.Ctx().GetSessionVars(); vars.TrackAggregateMemoryUsage && variable.EnableTmpStorageOnOOM.Load() {
e.diskTracker = disk.NewTracker(e.ID(), -1)
if e.diskTracker != nil {
e.diskTracker.Reset()
} else {
e.diskTracker = disk.NewTracker(e.ID(), -1)
}
e.diskTracker.AttachTo(vars.StmtCtx.DiskTracker)
e.listInDisk.GetDiskTracker().AttachTo(e.diskTracker)
vars.MemTracker.FallbackOldAndSetNewActionForSoftLimit(e.ActionSpill())
Expand Down
7 changes: 5 additions & 2 deletions pkg/executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,11 @@ func (e *IndexLookUpExecutor) open(_ context.Context) error {
// constructed by a "IndexLookUpJoin" and "Open" will not be called in that
// situation.
e.initRuntimeStats()
e.memTracker = memory.NewTracker(e.ID(), -1)
if e.memTracker != nil {
e.memTracker.Reset()
} else {
e.memTracker = memory.NewTracker(e.ID(), -1)
}
e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker)

e.finished = make(chan struct{})
Expand Down Expand Up @@ -846,7 +850,6 @@ func (e *IndexLookUpExecutor) Close() error {
e.tblWorkerWg.Wait()
e.finished = nil
e.workerStarted = false
e.memTracker = nil
e.resultCurr = nil
return nil
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func (e *HashJoinExec) Close() error {
}
e.probeSideTupleFetcher.probeChkResourceCh = nil
terror.Call(e.rowContainer.Close)
e.hashJoinCtx.sessCtx.GetSessionVars().MemTracker.UnbindActionFromHardLimit(e.rowContainer.ActionSpill())
e.waiterWg.Wait()
}
e.outerMatchedStatus = e.outerMatchedStatus[:0]
Expand Down Expand Up @@ -214,8 +215,12 @@ func (e *HashJoinExec) Open(ctx context.Context) error {
}
e.hashJoinCtx.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker)

e.diskTracker = disk.NewTracker(e.ID(), -1)
e.diskTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.DiskTracker)
if e.hashJoinCtx.diskTracker != nil {
e.hashJoinCtx.diskTracker.Reset()
} else {
e.hashJoinCtx.diskTracker = disk.NewTracker(e.ID(), -1)
}
e.hashJoinCtx.diskTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.DiskTracker)

e.workerWg = util.WaitGroupWrapper{}
e.waiterWg = util.WaitGroupWrapper{}
Expand Down Expand Up @@ -1423,7 +1428,7 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error {

if e.canUseCache {
// create a new one since it may be in the cache
e.innerList = chunk.NewList(exec.RetTypes(e.innerExec), e.InitCap(), e.MaxChunkSize())
e.innerList = chunk.NewListWithMemTracker(exec.RetTypes(e.innerExec), e.InitCap(), e.MaxChunkSize(), e.innerList.GetMemTracker())
} else {
e.innerList.Reset()
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/util/chunk/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,23 @@ type RowPtr struct {
RowIdx uint32
}

// NewList creates a new List with field types, init chunk size and max chunk size.
func NewList(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int) *List {
// NewListWithMemTracker creates a new List with field types, init chunk size, max chunk size and memory tracker.
func NewListWithMemTracker(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int, tracker *memory.Tracker) *List {
l := &List{
fieldTypes: fieldTypes,
initChunkSize: initChunkSize,
maxChunkSize: maxChunkSize,
memTracker: memory.NewTracker(memory.LabelForChunkList, -1),
memTracker: tracker,
consumedIdx: -1,
}
return l
}

// NewList creates a new List with field types, init chunk size and max chunk size.
func NewList(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int) *List {
return NewListWithMemTracker(fieldTypes, initChunkSize, maxChunkSize, memory.NewTracker(memory.LabelForChunkList, -1))
}

// GetMemTracker returns the memory tracker of this List.
func (l *List) GetMemTracker() *memory.Tracker {
return l.memTracker
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/chunk/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,14 @@ func (c *RowContainer) Close() (err error) {
c.actionSpill.cond.Broadcast()
c.actionSpill.SetFinished()
}
c.memTracker.Detach()
c.diskTracker.Detach()
if c.alreadySpilled() {
err = c.m.records.inDisk.Close()
c.m.records.inDisk = nil
}
c.m.records.inMemory.Clear()
c.m.records.inMemory = nil
return
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,27 @@ func (t *Tracker) UnbindActions() {
t.actionMuForHardLimit.actionOnExceed = &LogOnExceed{}
}

// UnbindActionFromHardLimit unbinds action from hardLimit.
func (t *Tracker) UnbindActionFromHardLimit(actionToUnbind ActionOnExceed) {
t.actionMuForHardLimit.Lock()
defer t.actionMuForHardLimit.Unlock()

var prev ActionOnExceed
for current := t.actionMuForHardLimit.actionOnExceed; current != nil; current = current.GetFallback() {
if current == actionToUnbind {
if prev == nil {
// actionToUnbind is the first element
t.actionMuForHardLimit.actionOnExceed = current.GetFallback()
} else {
// actionToUnbind is not the first element
prev.SetFallback(current.GetFallback())
}
break
}
prev = current
}
}

// reArrangeFallback merge two action chains and rearrange them by priority in descending order.
func reArrangeFallback(a ActionOnExceed, b ActionOnExceed) ActionOnExceed {
if a == nil {
Expand Down

0 comments on commit 4ba6bdd

Please sign in to comment.