Skip to content

Commit 2aef327

Browse files
authored
*: support memTracker.detach for HashJoin, Apply and IndexLookUp in Close func (#54095) (#54260)
close #54005
1 parent 1e48813 commit 2aef327

File tree

5 files changed

+45
-8
lines changed

5 files changed

+45
-8
lines changed

executor/distsql.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,11 @@ func (e *IndexLookUpExecutor) open(ctx context.Context) error {
547547
// constructed by a "IndexLookUpJoin" and "Open" will not be called in that
548548
// situation.
549549
e.initRuntimeStats()
550-
e.memTracker = memory.NewTracker(e.id, -1)
550+
if e.memTracker != nil {
551+
e.memTracker.Reset()
552+
} else {
553+
e.memTracker = memory.NewTracker(e.id, -1)
554+
}
551555
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
552556

553557
e.finished = make(chan struct{})
@@ -805,7 +809,6 @@ func (e *IndexLookUpExecutor) Close() error {
805809
e.tblWorkerWg.Wait()
806810
e.finished = nil
807811
e.workerStarted = false
808-
e.memTracker = nil
809812
e.resultCurr = nil
810813
return nil
811814
}

executor/join.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ func (e *HashJoinExec) Close() error {
173173
}
174174
e.probeSideTupleFetcher.probeChkResourceCh = nil
175175
terror.Call(e.rowContainer.Close)
176+
e.hashJoinCtx.sessCtx.GetSessionVars().MemTracker.UnbindActionFromHardLimit(e.rowContainer.ActionSpill())
176177
e.waiterWg.Wait()
177178
}
178179
e.outerMatchedStatus = e.outerMatchedStatus[:0]
@@ -211,8 +212,12 @@ func (e *HashJoinExec) Open(ctx context.Context) error {
211212
}
212213
e.hashJoinCtx.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
213214

214-
e.diskTracker = disk.NewTracker(e.id, -1)
215-
e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker)
215+
if e.hashJoinCtx.diskTracker != nil {
216+
e.hashJoinCtx.diskTracker.Reset()
217+
} else {
218+
e.hashJoinCtx.diskTracker = disk.NewTracker(e.id, -1)
219+
}
220+
e.hashJoinCtx.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker)
216221

217222
e.workerWg = util.WaitGroupWrapper{}
218223
e.waiterWg = util.WaitGroupWrapper{}
@@ -1420,7 +1425,7 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error {
14201425

14211426
if e.canUseCache {
14221427
// create a new one since it may be in the cache
1423-
e.innerList = chunk.NewList(retTypes(e.innerExec), e.initCap, e.maxChunkSize)
1428+
e.innerList = chunk.NewListWithMemTracker(retTypes(e.innerExec), e.initCap, e.maxChunkSize, e.innerList.GetMemTracker())
14241429
} else {
14251430
e.innerList.Reset()
14261431
}

util/chunk/list.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -40,18 +40,23 @@ type RowPtr struct {
4040
RowIdx uint32
4141
}
4242

43-
// NewList creates a new List with field types, init chunk size and max chunk size.
44-
func NewList(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int) *List {
43+
// NewListWithMemTracker creates a new List with field types, init chunk size, max chunk size and memory tracker.
44+
func NewListWithMemTracker(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int, tracker *memory.Tracker) *List {
4545
l := &List{
4646
fieldTypes: fieldTypes,
4747
initChunkSize: initChunkSize,
4848
maxChunkSize: maxChunkSize,
49-
memTracker: memory.NewTracker(memory.LabelForChunkList, -1),
49+
memTracker: tracker,
5050
consumedIdx: -1,
5151
}
5252
return l
5353
}
5454

55+
// NewList creates a new List with field types, init chunk size and max chunk size.
56+
func NewList(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int) *List {
57+
return NewListWithMemTracker(fieldTypes, initChunkSize, maxChunkSize, memory.NewTracker(memory.LabelForChunkList, -1))
58+
}
59+
5560
// GetMemTracker returns the memory tracker of this List.
5661
func (l *List) GetMemTracker() *memory.Tracker {
5762
return l.memTracker

util/chunk/row_container.go

+3
Original file line numberDiff line numberDiff line change
@@ -318,11 +318,14 @@ func (c *RowContainer) Close() (err error) {
318318
c.actionSpill.cond.Broadcast()
319319
c.actionSpill.SetFinished()
320320
}
321+
c.memTracker.Detach()
322+
c.diskTracker.Detach()
321323
if c.alreadySpilled() {
322324
err = c.m.records.inDisk.Close()
323325
c.m.records.inDisk = nil
324326
}
325327
c.m.records.inMemory.Clear()
328+
c.m.records.inMemory = nil
326329
return
327330
}
328331

util/memory/tracker.go

+21
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,27 @@ func (t *Tracker) UnbindActions() {
247247
t.actionMuForHardLimit.actionOnExceed = &LogOnExceed{}
248248
}
249249

250+
// UnbindActionFromHardLimit unbinds action from hardLimit.
251+
func (t *Tracker) UnbindActionFromHardLimit(actionToUnbind ActionOnExceed) {
252+
t.actionMuForHardLimit.Lock()
253+
defer t.actionMuForHardLimit.Unlock()
254+
255+
var prev ActionOnExceed
256+
for current := t.actionMuForHardLimit.actionOnExceed; current != nil; current = current.GetFallback() {
257+
if current == actionToUnbind {
258+
if prev == nil {
259+
// actionToUnbind is the first element
260+
t.actionMuForHardLimit.actionOnExceed = current.GetFallback()
261+
} else {
262+
// actionToUnbind is not the first element
263+
prev.SetFallback(current.GetFallback())
264+
}
265+
break
266+
}
267+
prev = current
268+
}
269+
}
270+
250271
// reArrangeFallback merge two action chains and rearrange them by priority in descending order.
251272
func reArrangeFallback(a ActionOnExceed, b ActionOnExceed) ActionOnExceed {
252273
if a == nil {

0 commit comments

Comments
 (0)