Skip to content

Commit

Permalink
*: extend the semantic of mem-quota-query to mem-quota-session (#38423)
Browse files Browse the repository at this point in the history
close #38429
  • Loading branch information
XuHuaiyu authored Nov 2, 2022
1 parent e245b84 commit ef95612
Show file tree
Hide file tree
Showing 38 changed files with 188 additions and 168 deletions.
6 changes: 3 additions & 3 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,12 +741,12 @@ func TestStmtHints(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, index idx(a))")
tk.MustExec("create global binding for select * from t using select /*+ MAX_EXECUTION_TIME(100), MEMORY_QUOTA(1 GB) */ * from t use index(idx)")
tk.MustExec("create global binding for select * from t using select /*+ MAX_EXECUTION_TIME(100), MEMORY_QUOTA(2 GB) */ * from t use index(idx)")
tk.MustQuery("select * from t")
require.Equal(t, int64(1073741824), tk.Session().GetSessionVars().StmtCtx.MemQuotaQuery)
require.Equal(t, int64(2147483648), tk.Session().GetSessionVars().MemTracker.GetBytesLimit())
require.Equal(t, uint64(100), tk.Session().GetSessionVars().StmtCtx.MaxExecutionTime)
tk.MustQuery("select a, b from t")
require.Equal(t, int64(0), tk.Session().GetSessionVars().StmtCtx.MemQuotaQuery)
require.Equal(t, int64(1073741824), tk.Session().GetSessionVars().MemTracker.GetBytesLimit())
require.Equal(t, uint64(0), tk.Session().GetSessionVars().StmtCtx.MaxExecutionTime)
}

Expand Down
2 changes: 2 additions & 0 deletions ddl/concurrentddltest/switch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func TestConcurrentDDLSwitch(t *testing.T) {
count++
if b {
tk := testkit.NewTestKit(t, store)
tk.Session().GetSessionVars().MemQuotaQuery = -1
tk.MustQuery("select count(*) from mysql.tidb_ddl_job").Check(testkit.Rows("0"))
tk.MustQuery("select count(*) from mysql.tidb_ddl_reorg").Check(testkit.Rows("0"))
}
Expand All @@ -121,6 +122,7 @@ func TestConcurrentDDLSwitch(t *testing.T) {
require.Greater(t, count, 0)

tk = testkit.NewTestKit(t, store)
tk.Session().GetSessionVars().MemQuotaQuery = -1
tk.MustExec("use test")
for i, tbl := range tables {
tk.MustQuery(fmt.Sprintf("select count(*) from information_schema.columns where TABLE_SCHEMA = 'test' and TABLE_NAME = 't%d'", i)).Check(testkit.Rows(fmt.Sprintf("%d", tbl.columnIdx)))
Expand Down
2 changes: 1 addition & 1 deletion distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie

ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx)
option := &kv.ClientSendOption{
SessionMemTracker: sctx.GetSessionVars().StmtCtx.MemTracker,
SessionMemTracker: sctx.GetSessionVars().MemTracker,
EnabledRateLimitAction: enabledRateLimitAction,
EventCb: eventCb,
EnableCollectExecutionInfo: config.GetGlobalConfig().Instance.EnableCollectExecutionInfo.Load(),
Expand Down
12 changes: 6 additions & 6 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
}

if sctx.GetSessionVars().StmtCtx.HasMemQuotaHint {
sctx.GetSessionVars().StmtCtx.MemTracker.SetBytesLimit(sctx.GetSessionVars().StmtCtx.MemQuotaQuery)
sctx.GetSessionVars().MemTracker.SetBytesLimit(sctx.GetSessionVars().StmtCtx.MemQuotaQuery)
}

e, err := a.buildExecutor()
Expand Down Expand Up @@ -697,7 +697,7 @@ func (a *ExecStmt) handleNoDelay(ctx context.Context, e Executor, isPessimistic
// If the stmt have no rs like `insert`, The session tracker detachment will be directly
// done in the `defer` function. If the rs is not nil, the detachment will be done in
// `rs.Close` in `handleStmt`
if sc != nil && rs == nil {
if handled && sc != nil && rs == nil {
if sc.MemTracker != nil {
sc.MemTracker.Detach()
}
Expand Down Expand Up @@ -1443,8 +1443,8 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
execDetail := stmtCtx.GetExecDetails()
copTaskInfo := stmtCtx.CopTasksDetails()
statsInfos := plannercore.GetStatsInfoFromFlatPlan(flat)
memMax := stmtCtx.MemTracker.MaxConsumed()
diskMax := stmtCtx.DiskTracker.MaxConsumed()
memMax := sessVars.MemTracker.MaxConsumed()
diskMax := sessVars.DiskTracker.MaxConsumed()
_, planDigest := getPlanDigest(stmtCtx)

binaryPlan := ""
Expand Down Expand Up @@ -1714,8 +1714,8 @@ func (a *ExecStmt) SummaryStmt(succ bool) {

execDetail := stmtCtx.GetExecDetails()
copTaskInfo := stmtCtx.CopTasksDetails()
memMax := stmtCtx.MemTracker.MaxConsumed()
diskMax := stmtCtx.DiskTracker.MaxConsumed()
memMax := sessVars.MemTracker.MaxConsumed()
diskMax := sessVars.DiskTracker.MaxConsumed()
sql := a.GetTextToLog()
var stmtDetail execdetails.StmtExecDetails
stmtDetailRaw := a.GoCtx.Value(execdetails.StmtExecDetailKey)
Expand Down
9 changes: 3 additions & 6 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,11 +330,11 @@ func (e *HashAggExec) initForUnparallelExec() {
e.executed, e.isChildDrained = false, false
e.listInDisk = chunk.NewListInDisk(retTypes(e.children[0]))
e.tmpChkForSpill = newFirstChunk(e.children[0])
if e.ctx.GetSessionVars().TrackAggregateMemoryUsage && variable.EnableTmpStorageOnOOM.Load() {
if vars := e.ctx.GetSessionVars(); vars.TrackAggregateMemoryUsage && variable.EnableTmpStorageOnOOM.Load() {
e.diskTracker = disk.NewTracker(e.id, -1)
e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker)
e.diskTracker.AttachTo(vars.StmtCtx.DiskTracker)
e.listInDisk.GetDiskTracker().AttachTo(e.diskTracker)
e.ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewActionForSoftLimit(e.ActionSpill())
vars.MemTracker.FallbackOldAndSetNewActionForSoftLimit(e.ActionSpill())
}
}

Expand Down Expand Up @@ -1952,6 +1952,3 @@ func (a *AggSpillDiskAction) Action(t *memory.Tracker) {
func (*AggSpillDiskAction) GetPriority() int64 {
return memory.DefSpillPriority
}

// SetLogHook sets the hook, it does nothing just to form the memory.ActionOnExceed interface.
func (*AggSpillDiskAction) SetLogHook(_ func(uint64)) {}
6 changes: 4 additions & 2 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,8 +942,10 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor)
t := memory.NewTracker(-1, memLimit)
t.SetActionOnExceed(nil)
t2 := disk.NewTracker(-1, -1)
e.ctx.GetSessionVars().StmtCtx.MemTracker = t
e.ctx.GetSessionVars().StmtCtx.DiskTracker = t2
e.ctx.GetSessionVars().MemTracker = t
e.ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(t)
e.ctx.GetSessionVars().DiskTracker = t2
e.ctx.GetSessionVars().StmtCtx.DiskTracker.AttachTo(t2)
return e
}

Expand Down
2 changes: 1 addition & 1 deletion executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentM
actionSpill = tbl.(*cteutil.StorageRC).ActionSpillForTest()
}
})
ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewAction(actionSpill)
ctx.GetSessionVars().MemTracker.FallbackOldAndSetNewAction(actionSpill)
}
return actionSpill
}
Expand Down
47 changes: 25 additions & 22 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,6 @@ func init() {
schematracker.ConstructResultOfShowCreateTable = ConstructResultOfShowCreateTable
}

// SetLogHook sets a hook for PanicOnExceed.
func (a *globalPanicOnExceed) SetLogHook(hook func(uint64)) {}

// Action panics when storage usage exceeds storage quota.
func (a *globalPanicOnExceed) Action(t *memory.Tracker) {
a.mutex.Lock()
Expand Down Expand Up @@ -1947,31 +1944,37 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {

sc.SysdateIsNow = ctx.GetSessionVars().SysdateIsNow

vars.MemTracker.UnbindActions()
vars.MemTracker.SetBytesLimit(vars.MemQuotaQuery)
vars.MemTracker.ResetMaxConsumed()
vars.DiskTracker.ResetMaxConsumed()
vars.MemTracker.SessionID = vars.ConnectionID

if _, ok := s.(*ast.AnalyzeTableStmt); ok {
sc.InitMemTracker(memory.LabelForAnalyzeMemory, -1)
sc.MemTracker.AttachTo(GlobalAnalyzeMemoryTracker)
vars.MemTracker.SetBytesLimit(-1)
} else {
sc.InitMemTracker(memory.LabelForSQLText, vars.MemQuotaQuery)
sc.MemTracker.AttachToGlobalTracker(GlobalMemoryUsageTracker)
sc.MemTracker.IsRootTrackerOfSess, sc.MemTracker.SessionID = true, vars.ConnectionID
sc.InitMemTracker(memory.LabelForSQLText, -1)
logOnQueryExceedMemQuota := domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota
switch variable.OOMAction.Load() {
case variable.OOMActionCancel:
action := &memory.PanicOnExceed{ConnID: vars.ConnectionID}
action.SetLogHook(logOnQueryExceedMemQuota)
vars.MemTracker.SetActionOnExceed(action)
case variable.OOMActionLog:
fallthrough
default:
action := &memory.LogOnExceed{ConnID: vars.ConnectionID}
action.SetLogHook(logOnQueryExceedMemQuota)
vars.MemTracker.SetActionOnExceed(action)
}
}

sc.MemTracker.SessionID = vars.ConnectionID
sc.MemTracker.AttachTo(vars.MemTracker)
sc.InitDiskTracker(memory.LabelForSQLText, -1)
globalConfig := config.GetGlobalConfig()
if variable.EnableTmpStorageOnOOM.Load() && GlobalDiskUsageTracker != nil {
sc.DiskTracker.AttachToGlobalTracker(GlobalDiskUsageTracker)
}
switch variable.OOMAction.Load() {
case variable.OOMActionCancel:
action := &memory.PanicOnExceed{ConnID: ctx.GetSessionVars().ConnectionID}
action.SetLogHook(domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota)
sc.MemTracker.SetActionOnExceed(action)
case variable.OOMActionLog:
fallthrough
default:
action := &memory.LogOnExceed{ConnID: ctx.GetSessionVars().ConnectionID}
action.SetLogHook(domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota)
sc.MemTracker.SetActionOnExceed(action)
if variable.EnableTmpStorageOnOOM.Load() && sc.DiskTracker != nil {
sc.DiskTracker.AttachTo(vars.DiskTracker)
}
if execStmt, ok := s.(*ast.ExecuteStmt); ok {
prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars)
Expand Down
18 changes: 13 additions & 5 deletions executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,9 @@ func TestSortSpillDisk(t *testing.T) {
ctx.GetSessionVars().MemQuota.MemQuotaQuery = 1
ctx.GetSessionVars().InitChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, -1)
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, -1)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
cas := &sortCase{rows: 2048, orderByIdx: []int{0, 1}, ndvs: []int{0, 0}, ctx: ctx}
opt := mockDataSourceParameters{
schema: expression.NewSchema(cas.columns()...),
Expand Down Expand Up @@ -342,7 +344,9 @@ func TestSortSpillDisk(t *testing.T) {
err = exec.Close()
require.NoError(t, err)

ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, 1)
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, 1)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
dataSource.prepareChunks()
err = exec.Open(tmpCtx)
require.NoError(t, err)
Expand Down Expand Up @@ -372,7 +376,9 @@ func TestSortSpillDisk(t *testing.T) {
err = exec.Close()
require.NoError(t, err)

ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, 28000)
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, 28000)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
dataSource.prepareChunks()
err = exec.Open(tmpCtx)
require.NoError(t, err)
Expand All @@ -394,8 +400,10 @@ func TestSortSpillDisk(t *testing.T) {
ctx = mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, 16864*50)
ctx.GetSessionVars().StmtCtx.MemTracker.Consume(16864 * 45)
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, 16864*50)
ctx.GetSessionVars().MemTracker.Consume(16864 * 45)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
cas = &sortCase{rows: 20480, orderByIdx: []int{0, 1}, ndvs: []int{0, 0}, ctx: ctx}
opt = mockDataSourceParameters{
schema: expression.NewSchema(cas.columns()...),
Expand Down
8 changes: 4 additions & 4 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6071,13 +6071,13 @@ func TestGlobalMemoryControl(t *testing.T) {
tk0.MustExec("set global tidb_server_memory_limit_sess_min_size = 128")

tk1 := testkit.NewTestKit(t, store)
tracker1 := tk1.Session().GetSessionVars().StmtCtx.MemTracker
tracker1 := tk1.Session().GetSessionVars().MemTracker

tk2 := testkit.NewTestKit(t, store)
tracker2 := tk2.Session().GetSessionVars().StmtCtx.MemTracker
tracker2 := tk2.Session().GetSessionVars().MemTracker

tk3 := testkit.NewTestKit(t, store)
tracker3 := tk3.Session().GetSessionVars().StmtCtx.MemTracker
tracker3 := tk3.Session().GetSessionVars().MemTracker

sm := &testkit.MockSessionManager{
PS: []*util.ProcessInfo{tk1.Session().ShowProcess(), tk2.Session().ShowProcess(), tk3.Session().ShowProcess()},
Expand Down Expand Up @@ -6156,7 +6156,7 @@ func TestGlobalMemoryControl2(t *testing.T) {
}()
sql := "select * from t t1 join t t2 join t t3 on t1.a=t2.a and t1.a=t3.a order by t1.a;" // Need 500MB
require.True(t, strings.Contains(tk0.QueryToErr(sql).Error(), "Out Of Memory Quota!"))
require.Equal(t, tk0.Session().GetSessionVars().StmtCtx.DiskTracker.MaxConsumed(), int64(0))
require.Equal(t, tk0.Session().GetSessionVars().DiskTracker.MaxConsumed(), int64(0))
wg.Wait()
test[0] = 0
runtime.GC()
Expand Down
2 changes: 1 addition & 1 deletion executor/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (e *ExplainExec) executeAnalyzeExec(ctx context.Context) (err error) {
minHeapInUse: mathutil.Abs(minHeapInUse),
alarmRatio: alarmRatio,
autoGC: minHeapInUse > 0,
memTracker: e.ctx.GetSessionVars().StmtCtx.MemTracker,
memTracker: e.ctx.GetSessionVars().MemTracker,
wg: &waitGroup,
}).run()
}
Expand Down
8 changes: 3 additions & 5 deletions executor/index_merge_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,17 +446,15 @@ func TestIndexMergeReaderMemTracker(t *testing.T) {
insertStr += fmt.Sprintf(" ,(%d, %d, %d)", i, i, i)
}
insertStr += ";"
memTracker := tk.Session().GetSessionVars().StmtCtx.MemTracker
memTracker := tk.Session().GetSessionVars().MemTracker

tk.MustExec(insertStr)

oriMaxUsage := memTracker.MaxConsumed()

// We select all rows in t1, so the mem usage is more clear.
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where c1 > 1 or c2 > 1")

newMaxUsage := memTracker.MaxConsumed()
require.Greater(t, newMaxUsage, oriMaxUsage)
memUsage := memTracker.MaxConsumed()
require.Greater(t, memUsage, int64(0))

res := tk.MustQuery("explain analyze select /*+ use_index_merge(t1) */ * from t1 where c1 > 1 or c2 > 1")
require.Len(t, res.Rows(), 4)
Expand Down
2 changes: 1 addition & 1 deletion executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -1211,7 +1211,7 @@ func (e *HashJoinExec) buildHashTableForList(buildSideResultCh <-chan *chunk.Chu
defer actionSpill.(*chunk.SpillDiskAction).WaitForTest()
}
})
e.ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewAction(actionSpill)
e.ctx.GetSessionVars().MemTracker.FallbackOldAndSetNewAction(actionSpill)
}
for chk := range buildSideResultCh {
if e.finished.Load().(bool) {
Expand Down
2 changes: 1 addition & 1 deletion executor/merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (t *mergeJoinTable) init(exec *MergeJoinExec) {
actionSpill = t.rowContainer.ActionSpillForTest()
}
})
exec.ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewAction(actionSpill)
exec.ctx.GetSessionVars().MemTracker.FallbackOldAndSetNewAction(actionSpill)
}
t.memTracker = memory.NewTracker(memory.LabelForInnerTable, -1)
} else {
Expand Down
8 changes: 4 additions & 4 deletions executor/merge_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,10 @@ func TestShuffleMergeJoinInDisk(t *testing.T) {

result := checkMergeAndRun(tk, t, "select /*+ TIDB_SMJ(t) */ * from t1 left outer join t on t.c1 = t1.c1 where t.c1 = 1 or t1.c2 > 20")
result.Check(testkit.Rows("1 3 1 1"))
require.Equal(t, int64(0), tk.Session().GetSessionVars().StmtCtx.MemTracker.BytesConsumed())
require.Greater(t, tk.Session().GetSessionVars().StmtCtx.MemTracker.MaxConsumed(), int64(0))
require.Equal(t, int64(0), tk.Session().GetSessionVars().StmtCtx.DiskTracker.BytesConsumed())
require.Greater(t, tk.Session().GetSessionVars().StmtCtx.DiskTracker.MaxConsumed(), int64(0))
require.Equal(t, int64(0), tk.Session().GetSessionVars().MemTracker.BytesConsumed())
require.Greater(t, tk.Session().GetSessionVars().MemTracker.MaxConsumed(), int64(0))
require.Equal(t, int64(0), tk.Session().GetSessionVars().DiskTracker.BytesConsumed())
require.GreaterOrEqual(t, tk.Session().GetSessionVars().DiskTracker.MaxConsumed(), int64(0))
}

func TestMergeJoinInDisk(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions executor/oomtest/oom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func TestMemTracker4DeleteExec(t *testing.T) {
require.Equal(t, "", oom.GetTracker())
tk.MustExec("insert into MemTracker4DeleteExec1 values (1,1,1), (2,2,2), (3,3,3)")
tk.Session().GetSessionVars().MemQuotaQuery = 1
tk.Session().GetSessionVars().MemTracker.SetBytesLimit(1)
tk.MustExec("delete from MemTracker4DeleteExec1")
require.Equal(t, "expensive_query during bootstrap phase", oom.GetTracker())

Expand Down
4 changes: 2 additions & 2 deletions executor/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
defer e.spillAction.WaitForTest()
}
})
e.ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewAction(e.spillAction)
e.ctx.GetSessionVars().MemTracker.FallbackOldAndSetNewAction(e.spillAction)
e.rowChunks.GetDiskTracker().AttachTo(e.diskTracker)
e.rowChunks.GetDiskTracker().SetLabel(memory.LabelForRowChunks)
}
Expand Down Expand Up @@ -218,7 +218,7 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
defer e.spillAction.WaitForTest()
}
})
e.ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewAction(e.spillAction)
e.ctx.GetSessionVars().MemTracker.FallbackOldAndSetNewAction(e.spillAction)
err = e.rowChunks.Add(chk)
}
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ func (e *Explain) RenderResult() error {
e.SCtx().GetSessionVars().MemoryDebugModeMinHeapInUse != 0 &&
e.SCtx().GetSessionVars().MemoryDebugModeAlarmRatio > 0 {
row := e.Rows[0]
tracker := e.SCtx().GetSessionVars().StmtCtx.MemTracker
tracker := e.SCtx().GetSessionVars().MemTracker
row[7] = row[7] + "(Total: " + tracker.FormatBytes(tracker.MaxConsumed()) + ")"
}
}
Expand Down
4 changes: 2 additions & 2 deletions planner/core/plan_cost_ver1.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64, isMPP bool, costFlag uint
}
sessVars := p.ctx.GetSessionVars()
oomUseTmpStorage := variable.EnableTmpStorageOnOOM.Load()
memQuota := sessVars.StmtCtx.MemTracker.GetBytesLimit() // sessVars.MemQuotaQuery && hint
memQuota := sessVars.MemTracker.GetBytesLimit() // sessVars.MemQuotaQuery && hint
rowSize := getAvgRowSize(build.statsInfo(), build.Schema())
spill := oomUseTmpStorage && memQuota > 0 && rowSize*buildCnt > float64(memQuota) && p.storeTp != kv.TiFlash
// Cost of building hash table.
Expand Down Expand Up @@ -1048,7 +1048,7 @@ func (p *PhysicalSort) GetCost(count float64, schema *expression.Schema) float64
memoryCost := count * sessVars.GetMemoryFactor()

oomUseTmpStorage := variable.EnableTmpStorageOnOOM.Load()
memQuota := sessVars.StmtCtx.MemTracker.GetBytesLimit() // sessVars.MemQuotaQuery && hint
memQuota := sessVars.MemTracker.GetBytesLimit() // sessVars.MemQuotaQuery && hint
rowSize := getAvgRowSize(p.statsInfo(), schema)
spill := oomUseTmpStorage && memQuota > 0 && rowSize*count > float64(memQuota)
diskCost := count * sessVars.GetDiskFactor() * rowSize
Expand Down
2 changes: 1 addition & 1 deletion planner/core/plan_cost_ver2.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (p *PhysicalSort) getPlanCostVer2(taskType property.TaskType, option *PlanC
memFactor := getTaskMemFactorVer2(p, taskType)
diskFactor := defaultVer2Factors.TiDBDisk
oomUseTmpStorage := variable.EnableTmpStorageOnOOM.Load()
memQuota := p.ctx.GetSessionVars().StmtCtx.MemTracker.GetBytesLimit()
memQuota := p.ctx.GetSessionVars().MemTracker.GetBytesLimit()
spill := taskType == property.RootTaskType && // only TiDB can spill
oomUseTmpStorage && // spill is enabled
memQuota > 0 && // mem-quota is set
Expand Down
Loading

0 comments on commit ef95612

Please sign in to comment.