Skip to content

Commit

Permalink
executor: fix the fail of triggering topn fallback action (#56187)
Browse files Browse the repository at this point in the history
close #56185
  • Loading branch information
xzhangxian1008 authored Sep 23, 2024
1 parent a70cc19 commit 7202c38
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 10 deletions.
8 changes: 1 addition & 7 deletions pkg/executor/aggregate/agg_spill.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func (p *ParallelAggSpillDiskAction) Action(t *memory.Tracker) {
return
}

p.triggerFallBackAction(t)
p.TriggerFallBackAction(t)
}

// Return true if we successfully set flag
Expand All @@ -386,12 +386,6 @@ func (p *ParallelAggSpillDiskAction) actionImpl(t *memory.Tracker) bool {
return p.spillHelper.setNeedSpill(p.e.memTracker, t)
}

func (p *ParallelAggSpillDiskAction) triggerFallBackAction(t *memory.Tracker) {
if fallback := p.GetFallback(); fallback != nil {
fallback.Action(t)
}
}

// GetPriority get the priority of the Action
func (*ParallelAggSpillDiskAction) GetPriority() int64 {
return memory.DefSpillPriority
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/sortexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ go_test(
timeout = "short",
srcs = ["sort_test.go"],
flaky = True,
shard_count = 15,
shard_count = 16,
deps = [
"//pkg/config",
"//pkg/sessionctx/variable",
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/sortexec/sort_spill.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (s *parallelSortSpillAction) Action(t *memory.Tracker) {
}

if t.CheckExceed() && !hasEnoughDataToSpill(s.spillHelper.sortExec.memTracker, t) {
s.GetFallback().Action(t)
s.TriggerFallBackAction(t)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/sortexec/topn_spill.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,6 @@ func (t *topNSpillAction) Action(tracker *memory.Tracker) {
}

if tracker.CheckExceed() && !hasEnoughData {
t.GetFallback()
t.TriggerFallBackAction(tracker)
}
}
30 changes: 30 additions & 0 deletions pkg/executor/sortexec/topn_spill_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var totalRowNum = 10000
var noSpillCaseHardLimit = hardLimit2
var spillCase1HardLimit = hardLimit1
var spillCase2HardLimit = hardLimit1
var fallBackHardLimit = hardLimit1
var inMemoryThenSpillHardLimit = hardLimit1 * 2

// Test is successful if there is no hang
Expand Down Expand Up @@ -492,3 +493,32 @@ func TestIssue54206(t *testing.T) {
tk.MustExec("insert into t1 values(1, 1);")
tk.MustQuery("select t1.a+t1.b as result from t1 left join t2 on 1 = 0 order by result limit 1;")
}

func TestTopNFallBackAction(t *testing.T) {
sortexec.SetSmallSpillChunkSizeForTest()
ctx := mock.NewContext()
topNCase := &testutil.SortCase{Rows: totalRowNum, OrderByIdx: []int{0, 1}, Ndvs: []int{0, 0}, Ctx: ctx}
newRootExceedAction := new(testutil.MockActionOnExceed)

ctx.GetSessionVars().InitChunkSize = 32
ctx.GetSessionVars().MaxChunkSize = 32
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, fallBackHardLimit)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
ctx.GetSessionVars().MemTracker.SetActionOnExceed(newRootExceedAction)
ctx.GetSessionVars().MemTracker.Consume(int64(float64(fallBackHardLimit) * 0.99999))

schema := expression.NewSchema(topNCase.Columns()...)
dataSource := buildDataSource(topNCase, schema)

count := uint64(totalRowNum / 5)
offset := count / 5

exe := buildTopNExec(topNCase, dataSource, offset, count)

dataSource.PrepareChunks()
_ = executeTopNExecutor(t, exe)
err := exe.Close()
require.NoError(t, err)

require.Less(t, 0, newRootExceedAction.GetTriggeredNum())
}
7 changes: 7 additions & 0 deletions pkg/util/memory/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ func (b *BaseOOMAction) GetFallback() ActionOnExceed {
return b.fallbackAction
}

// TriggerFallBackAction triggers the fallback action of the current action
func (b *BaseOOMAction) TriggerFallBackAction(tracker *Tracker) {
if fallback := b.GetFallback(); fallback != nil {
fallback.Action(tracker)
}
}

// Default OOM Action priority.
const (
DefPanicPriority = iota
Expand Down

0 comments on commit 7202c38

Please sign in to comment.