From af96f19db3f61efb314cde5759d69b13f6330768 Mon Sep 17 00:00:00 2001 From: HuaiyuXu <391585975@qq.com> Date: Wed, 26 Jun 2024 19:10:22 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #54095 Signed-off-by: ti-chi-bot --- pkg/executor/aggregate/agg_hash_executor.go | 12 ++++++++++-- pkg/executor/distsql.go | 7 +++++-- pkg/executor/join.go | 18 ++++++++++++++++-- pkg/util/chunk/list.go | 11 ++++++++--- pkg/util/chunk/row_container.go | 3 +++ pkg/util/memory/tracker.go | 21 +++++++++++++++++++++ 6 files changed, 63 insertions(+), 9 deletions(-) diff --git a/pkg/executor/aggregate/agg_hash_executor.go b/pkg/executor/aggregate/agg_hash_executor.go index a8c5fda843448..793eecc3e98b6 100644 --- a/pkg/executor/aggregate/agg_hash_executor.go +++ b/pkg/executor/aggregate/agg_hash_executor.go @@ -273,7 +273,11 @@ func (e *HashAggExec) initForUnparallelExec() { e.tmpChkForSpill = exec.TryNewCacheChunk(e.Children(0)) if vars := e.Ctx().GetSessionVars(); vars.TrackAggregateMemoryUsage && variable.EnableTmpStorageOnOOM.Load() { - e.diskTracker = disk.NewTracker(e.ID(), -1) + if e.diskTracker != nil { + e.diskTracker.Reset() + } else { + e.diskTracker = disk.NewTracker(e.ID(), -1) + } e.diskTracker.AttachTo(vars.StmtCtx.DiskTracker) e.dataInDisk.GetDiskTracker().AttachTo(e.diskTracker) vars.MemTracker.FallbackOldAndSetNewActionForSoftLimit(e.ActionSpill()) @@ -396,7 +400,11 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) error { }, spillChunkFieldTypes) if isTrackerEnabled && isParallelHashAggSpillEnabled { - e.diskTracker = disk.NewTracker(e.ID(), -1) + if e.diskTracker != nil { + e.diskTracker.Reset() + } else { + e.diskTracker = disk.NewTracker(e.ID(), -1) + } e.diskTracker.AttachTo(sessionVars.StmtCtx.DiskTracker) e.spillHelper.diskTracker = e.diskTracker sessionVars.MemTracker.FallbackOldAndSetNewActionForSoftLimit(e.ActionSpill()) diff --git a/pkg/executor/distsql.go b/pkg/executor/distsql.go index a1348e1db20c1..7e1031e2ac74f 100644 --- a/pkg/executor/distsql.go +++ b/pkg/executor/distsql.go @@ -574,7 +574,11 @@ func (e *IndexLookUpExecutor) open(_ context.Context) error { // constructed by a "IndexLookUpJoin" and "Open" will not be called in that // situation. e.initRuntimeStats() - e.memTracker = memory.NewTracker(e.ID(), -1) + if e.memTracker != nil { + e.memTracker.Reset() + } else { + e.memTracker = memory.NewTracker(e.ID(), -1) + } e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker) e.finished = make(chan struct{}) @@ -858,7 +862,6 @@ func (e *IndexLookUpExecutor) Close() error { e.tblWorkerWg.Wait() e.finished = nil e.workerStarted = false - e.memTracker = nil e.resultCurr = nil return nil } diff --git a/pkg/executor/join.go b/pkg/executor/join.go index 3c47feceabe7d..722e5a872bfd6 100644 --- a/pkg/executor/join.go +++ b/pkg/executor/join.go @@ -174,8 +174,14 @@ func (e *HashJoinExec) Close() error { close(e.probeWorkers[i].joinChkResourceCh) channel.Clear(e.probeWorkers[i].joinChkResourceCh) } +<<<<<<< HEAD:pkg/executor/join.go e.probeSideTupleFetcher.probeChkResourceCh = nil terror.Call(e.rowContainer.Close) +======= + e.ProbeSideTupleFetcher.probeChkResourceCh = nil + terror.Call(e.RowContainer.Close) + e.HashJoinCtxV1.SessCtx.GetSessionVars().MemTracker.UnbindActionFromHardLimit(e.RowContainer.ActionSpill()) +>>>>>>> 374f7b0a5ee (*: support memTracker.detach for HashJoin, Apply and IndexLookUp in Close func (#54095)):pkg/executor/join/hash_join_v1.go e.waiterWg.Wait() } e.outerMatchedStatus = e.outerMatchedStatus[:0] @@ -214,8 +220,12 @@ func (e *HashJoinExec) Open(ctx context.Context) error { } e.hashJoinCtx.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker) - e.diskTracker = disk.NewTracker(e.ID(), -1) - e.diskTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.DiskTracker) + if e.HashJoinCtxV1.diskTracker != nil { + e.HashJoinCtxV1.diskTracker.Reset() + } else { + e.HashJoinCtxV1.diskTracker = disk.NewTracker(e.ID(), -1) + } + e.HashJoinCtxV1.diskTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.DiskTracker) e.workerWg = util.WaitGroupWrapper{} e.waiterWg = util.WaitGroupWrapper{} @@ -1468,7 +1478,11 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error { if e.canUseCache { // create a new one since it may be in the cache +<<<<<<< HEAD:pkg/executor/join.go e.innerList = chunk.NewList(exec.RetTypes(e.innerExec), e.InitCap(), e.MaxChunkSize()) +======= + e.InnerList = chunk.NewListWithMemTracker(exec.RetTypes(e.InnerExec), e.InitCap(), e.MaxChunkSize(), e.InnerList.GetMemTracker()) +>>>>>>> 374f7b0a5ee (*: support memTracker.detach for HashJoin, Apply and IndexLookUp in Close func (#54095)):pkg/executor/join/hash_join_v1.go } else { e.innerList.Reset() } diff --git a/pkg/util/chunk/list.go b/pkg/util/chunk/list.go index f8246850ecd6b..4dea40d1ec646 100644 --- a/pkg/util/chunk/list.go +++ b/pkg/util/chunk/list.go @@ -40,18 +40,23 @@ type RowPtr struct { RowIdx uint32 } -// NewList creates a new List with field types, init chunk size and max chunk size. -func NewList(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int) *List { +// NewListWithMemTracker creates a new List with field types, init chunk size, max chunk size and memory tracker. +func NewListWithMemTracker(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int, tracker *memory.Tracker) *List { l := &List{ fieldTypes: fieldTypes, initChunkSize: initChunkSize, maxChunkSize: maxChunkSize, - memTracker: memory.NewTracker(memory.LabelForChunkList, -1), + memTracker: tracker, consumedIdx: -1, } return l } +// NewList creates a new List with field types, init chunk size and max chunk size. +func NewList(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int) *List { + return NewListWithMemTracker(fieldTypes, initChunkSize, maxChunkSize, memory.NewTracker(memory.LabelForChunkList, -1)) +} + // GetMemTracker returns the memory tracker of this List. func (l *List) GetMemTracker() *memory.Tracker { return l.memTracker diff --git a/pkg/util/chunk/row_container.go b/pkg/util/chunk/row_container.go index 899109b6f4ec4..9b443569fb05c 100644 --- a/pkg/util/chunk/row_container.go +++ b/pkg/util/chunk/row_container.go @@ -340,11 +340,14 @@ func (c *RowContainer) Close() (err error) { c.actionSpill.cond.Broadcast() c.actionSpill.SetFinished() } + c.memTracker.Detach() + c.diskTracker.Detach() if c.alreadySpilled() { err = c.m.records.inDisk.Close() c.m.records.inDisk = nil } c.m.records.inMemory.Clear() + c.m.records.inMemory = nil return } diff --git a/pkg/util/memory/tracker.go b/pkg/util/memory/tracker.go index ec5a84c7b8ec5..a2863b5ef6d2f 100644 --- a/pkg/util/memory/tracker.go +++ b/pkg/util/memory/tracker.go @@ -245,6 +245,27 @@ func (t *Tracker) UnbindActions() { t.actionMuForHardLimit.actionOnExceed = &LogOnExceed{} } +// UnbindActionFromHardLimit unbinds action from hardLimit. +func (t *Tracker) UnbindActionFromHardLimit(actionToUnbind ActionOnExceed) { + t.actionMuForHardLimit.Lock() + defer t.actionMuForHardLimit.Unlock() + + var prev ActionOnExceed + for current := t.actionMuForHardLimit.actionOnExceed; current != nil; current = current.GetFallback() { + if current == actionToUnbind { + if prev == nil { + // actionToUnbind is the first element + t.actionMuForHardLimit.actionOnExceed = current.GetFallback() + } else { + // actionToUnbind is not the first element + prev.SetFallback(current.GetFallback()) + } + break + } + prev = current + } +} + // reArrangeFallback merge two action chains and rearrange them by priority in descending order. func reArrangeFallback(a ActionOnExceed, b ActionOnExceed) ActionOnExceed { if a == nil { From 47af79411ee92a52f8a0970077c727680a37f31a Mon Sep 17 00:00:00 2001 From: XuHuaiyu <391585975@qq.com> Date: Thu, 27 Jun 2024 12:02:25 +0800 Subject: [PATCH 2/2] resolve conflicts --- pkg/executor/join.go | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/pkg/executor/join.go b/pkg/executor/join.go index 722e5a872bfd6..f9e745b3b8c25 100644 --- a/pkg/executor/join.go +++ b/pkg/executor/join.go @@ -174,14 +174,9 @@ func (e *HashJoinExec) Close() error { close(e.probeWorkers[i].joinChkResourceCh) channel.Clear(e.probeWorkers[i].joinChkResourceCh) } -<<<<<<< HEAD:pkg/executor/join.go e.probeSideTupleFetcher.probeChkResourceCh = nil terror.Call(e.rowContainer.Close) -======= - e.ProbeSideTupleFetcher.probeChkResourceCh = nil - terror.Call(e.RowContainer.Close) - e.HashJoinCtxV1.SessCtx.GetSessionVars().MemTracker.UnbindActionFromHardLimit(e.RowContainer.ActionSpill()) ->>>>>>> 374f7b0a5ee (*: support memTracker.detach for HashJoin, Apply and IndexLookUp in Close func (#54095)):pkg/executor/join/hash_join_v1.go + e.hashJoinCtx.sessCtx.GetSessionVars().MemTracker.UnbindActionFromHardLimit(e.rowContainer.ActionSpill()) e.waiterWg.Wait() } e.outerMatchedStatus = e.outerMatchedStatus[:0] @@ -220,12 +215,12 @@ func (e *HashJoinExec) Open(ctx context.Context) error { } e.hashJoinCtx.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker) - if e.HashJoinCtxV1.diskTracker != nil { - e.HashJoinCtxV1.diskTracker.Reset() + if e.hashJoinCtx.diskTracker != nil { + e.hashJoinCtx.diskTracker.Reset() } else { - e.HashJoinCtxV1.diskTracker = disk.NewTracker(e.ID(), -1) + e.hashJoinCtx.diskTracker = disk.NewTracker(e.ID(), -1) } - e.HashJoinCtxV1.diskTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.DiskTracker) + e.hashJoinCtx.diskTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.DiskTracker) e.workerWg = util.WaitGroupWrapper{} e.waiterWg = util.WaitGroupWrapper{} @@ -1478,11 +1473,7 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error { if e.canUseCache { // create a new one since it may be in the cache -<<<<<<< HEAD:pkg/executor/join.go - e.innerList = chunk.NewList(exec.RetTypes(e.innerExec), e.InitCap(), e.MaxChunkSize()) -======= - e.InnerList = chunk.NewListWithMemTracker(exec.RetTypes(e.InnerExec), e.InitCap(), e.MaxChunkSize(), e.InnerList.GetMemTracker()) ->>>>>>> 374f7b0a5ee (*: support memTracker.detach for HashJoin, Apply and IndexLookUp in Close func (#54095)):pkg/executor/join/hash_join_v1.go + e.innerList = chunk.NewListWithMemTracker(exec.RetTypes(e.innerExec), e.InitCap(), e.MaxChunkSize(), e.innerList.GetMemTracker()) } else { e.innerList.Reset() }