From 5c336fc062d68ef89ad8641b1a59bff8ff2c2f48 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 12 Oct 2022 15:39:45 +0800 Subject: [PATCH 01/19] *: extend the semantic of mem-quota-query to mem-quota-session --- distsql/distsql.go | 2 +- executor/adapter.go | 10 ++++----- executor/aggregate.go | 9 +++----- executor/cte.go | 2 +- executor/executor.go | 25 +++++----------------- executor/explain.go | 2 +- executor/join.go | 2 +- executor/merge_join.go | 2 +- executor/sort.go | 4 ++-- planner/core/common_plans.go | 4 ++-- planner/core/plan_cost_ver1.go | 4 ++-- planner/core/plan_cost_ver2.go | 2 +- server/rpc_server.go | 9 ++++---- session/nontransactional.go | 23 +++----------------- session/session.go | 19 ++++++++++++++++- sessionctx/variable/session.go | 12 +++++++++++ store/copr/coprocessor.go | 5 ----- util/chunk/row_container.go | 6 ------ util/expensivequery/expensivequery.go | 10 ++++----- util/expensivequery/memory_usage_alarm.go | 2 +- util/memory/action.go | 12 +++++------ util/memory/tracker.go | 2 ++ util/memory/tracker_test.go | 3 --- util/mock/context.go | 26 +++++++++++++---------- util/processinfo.go | 12 +++++++---- 25 files changed, 99 insertions(+), 110 deletions(-) diff --git a/distsql/distsql.go b/distsql/distsql.go index 7db99424dc91f..58da33eb12c7f 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -88,7 +88,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx) option := &kv.ClientSendOption{ - SessionMemTracker: sctx.GetSessionVars().StmtCtx.MemTracker, + SessionMemTracker: sctx.GetSessionVars().MemTracker, EnabledRateLimitAction: enabledRateLimitAction, EventCb: eventCb, EnableCollectExecutionInfo: config.GetGlobalConfig().Instance.EnableCollectExecutionInfo, diff --git a/executor/adapter.go b/executor/adapter.go index 27f1824bc07f5..fb87f9058e4f4 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -479,7 +479,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { } if sctx.GetSessionVars().StmtCtx.HasMemQuotaHint { - sctx.GetSessionVars().StmtCtx.MemTracker.SetBytesLimit(sctx.GetSessionVars().StmtCtx.MemQuotaQuery) + sctx.GetSessionVars().MemTracker.SetBytesLimit(sctx.GetSessionVars().StmtCtx.MemQuotaQuery) } e, err := a.buildExecutor() @@ -1324,8 +1324,8 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) { execDetail := stmtCtx.GetExecDetails() copTaskInfo := stmtCtx.CopTasksDetails() statsInfos := plannercore.GetStatsInfoFromFlatPlan(flat) - memMax := stmtCtx.MemTracker.MaxConsumed() - diskMax := stmtCtx.DiskTracker.MaxConsumed() + memMax := sessVars.MemTracker.MaxConsumed() + diskMax := sessVars.DiskTracker.MaxConsumed() _, planDigest := getPlanDigest(stmtCtx) binaryPlan := "" @@ -1595,8 +1595,8 @@ func (a *ExecStmt) SummaryStmt(succ bool) { execDetail := stmtCtx.GetExecDetails() copTaskInfo := stmtCtx.CopTasksDetails() - memMax := stmtCtx.MemTracker.MaxConsumed() - diskMax := stmtCtx.DiskTracker.MaxConsumed() + memMax := sessVars.MemTracker.MaxConsumed() + diskMax := sessVars.DiskTracker.MaxConsumed() sql := a.GetTextToLog() var stmtDetail execdetails.StmtExecDetails stmtDetailRaw := a.GoCtx.Value(execdetails.StmtExecDetailKey) diff --git a/executor/aggregate.go b/executor/aggregate.go index d6ec79412b7f6..8433328eb1e07 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -330,11 +330,11 @@ func (e *HashAggExec) initForUnparallelExec() { e.executed, e.isChildDrained = false, false e.listInDisk = chunk.NewListInDisk(retTypes(e.children[0])) e.tmpChkForSpill = newFirstChunk(e.children[0]) - if e.ctx.GetSessionVars().TrackAggregateMemoryUsage && variable.EnableTmpStorageOnOOM.Load() { + if vars := e.ctx.GetSessionVars(); vars.TrackAggregateMemoryUsage && variable.EnableTmpStorageOnOOM.Load() { e.diskTracker = disk.NewTracker(e.id, -1) - e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker) + e.diskTracker.AttachTo(vars.StmtCtx.DiskTracker) e.listInDisk.GetDiskTracker().AttachTo(e.diskTracker) - e.ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewActionForSoftLimit(e.ActionSpill()) + vars.MemTracker.FallbackOldAndSetNewActionForSoftLimit(e.ActionSpill()) } } @@ -1951,6 +1951,3 @@ func (a *AggSpillDiskAction) Action(t *memory.Tracker) { func (*AggSpillDiskAction) GetPriority() int64 { return memory.DefSpillPriority } - -// SetLogHook sets the hook, it does nothing just to form the memory.ActionOnExceed interface. -func (*AggSpillDiskAction) SetLogHook(_ func(uint64)) {} diff --git a/executor/cte.go b/executor/cte.go index 84389f9439214..b21ddf15c9ccc 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -438,7 +438,7 @@ func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentM actionSpill = tbl.(*cteutil.StorageRC).ActionSpillForTest() } }) - ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewAction(actionSpill) + ctx.GetSessionVars().MemTracker.FallbackOldAndSetNewAction(actionSpill) } return actionSpill } diff --git a/executor/executor.go b/executor/executor.go index 5e13783a489a3..bdee804baec40 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -167,9 +167,6 @@ func init() { schematracker.ConstructResultOfShowCreateTable = ConstructResultOfShowCreateTable } -// SetLogHook sets a hook for PanicOnExceed. -func (a *globalPanicOnExceed) SetLogHook(hook func(uint64)) {} - // Action panics when storage usage exceeds storage quota. func (a *globalPanicOnExceed) Action(t *memory.Tracker) { a.mutex.Lock() @@ -1939,29 +1936,17 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { if _, ok := s.(*ast.AnalyzeTableStmt); ok { sc.InitMemTracker(memory.LabelForAnalyzeMemory, -1) - sc.MemTracker.AttachTo(GlobalAnalyzeMemoryTracker) + sc.MemTracker.AttachTo(vars.MemTracker) } else { sc.InitMemTracker(memory.LabelForSQLText, vars.MemQuotaQuery) - sc.MemTracker.AttachToGlobalTracker(GlobalMemoryUsageTracker) - sc.MemTracker.IsRootTrackerOfSess, sc.MemTracker.SessionID = true, vars.ConnectionID + sc.MemTracker.AttachTo(vars.MemTracker) + sc.MemTracker.SessionID = vars.ConnectionID } sc.InitDiskTracker(memory.LabelForSQLText, -1) globalConfig := config.GetGlobalConfig() - if variable.EnableTmpStorageOnOOM.Load() && GlobalDiskUsageTracker != nil { - sc.DiskTracker.AttachToGlobalTracker(GlobalDiskUsageTracker) - } - switch variable.OOMAction.Load() { - case variable.OOMActionCancel: - action := &memory.PanicOnExceed{ConnID: ctx.GetSessionVars().ConnectionID} - action.SetLogHook(domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota) - sc.MemTracker.SetActionOnExceed(action) - case variable.OOMActionLog: - fallthrough - default: - action := &memory.LogOnExceed{ConnID: ctx.GetSessionVars().ConnectionID} - action.SetLogHook(domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota) - sc.MemTracker.SetActionOnExceed(action) + if variable.EnableTmpStorageOnOOM.Load() && vars.DiskTracker != nil { + sc.DiskTracker.AttachTo(vars.DiskTracker) } if execStmt, ok := s.(*ast.ExecuteStmt); ok { prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars) diff --git a/executor/explain.go b/executor/explain.go index 1fef25865bc59..4de3f49831898 100644 --- a/executor/explain.go +++ b/executor/explain.go @@ -114,7 +114,7 @@ func (e *ExplainExec) executeAnalyzeExec(ctx context.Context) (err error) { minHeapInUse: mathutil.Abs(minHeapInUse), alarmRatio: alarmRatio, autoGC: minHeapInUse > 0, - memTracker: e.ctx.GetSessionVars().StmtCtx.MemTracker, + memTracker: e.ctx.GetSessionVars().MemTracker, wg: &waitGroup, }).run() } diff --git a/executor/join.go b/executor/join.go index 706895e44d06f..656a79c4dfc84 100644 --- a/executor/join.go +++ b/executor/join.go @@ -1204,7 +1204,7 @@ func (e *HashJoinExec) buildHashTableForList(buildSideResultCh <-chan *chunk.Chu defer actionSpill.(*chunk.SpillDiskAction).WaitForTest() } }) - e.ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewAction(actionSpill) + e.ctx.GetSessionVars().MemTracker.FallbackOldAndSetNewAction(actionSpill) } for chk := range buildSideResultCh { if e.finished.Load().(bool) { diff --git a/executor/merge_join.go b/executor/merge_join.go index e8d195e3085ae..233d140ade678 100644 --- a/executor/merge_join.go +++ b/executor/merge_join.go @@ -100,7 +100,7 @@ func (t *mergeJoinTable) init(exec *MergeJoinExec) { actionSpill = t.rowContainer.ActionSpillForTest() } }) - exec.ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewAction(actionSpill) + exec.ctx.GetSessionVars().MemTracker.FallbackOldAndSetNewAction(actionSpill) } t.memTracker = memory.NewTracker(memory.LabelForInnerTable, -1) } else { diff --git a/executor/sort.go b/executor/sort.go index efc56aa058d2a..0574798d12bce 100644 --- a/executor/sort.go +++ b/executor/sort.go @@ -189,7 +189,7 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error { defer e.spillAction.WaitForTest() } }) - e.ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewAction(e.spillAction) + e.ctx.GetSessionVars().MemTracker.FallbackOldAndSetNewAction(e.spillAction) e.rowChunks.GetDiskTracker().AttachTo(e.diskTracker) e.rowChunks.GetDiskTracker().SetLabel(memory.LabelForRowChunks) } @@ -218,7 +218,7 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error { defer e.spillAction.WaitForTest() } }) - e.ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewAction(e.spillAction) + e.ctx.GetSessionVars().MemTracker.FallbackOldAndSetNewAction(e.spillAction) err = e.rowChunks.Add(chk) } if err != nil { diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index a814089ecd2bb..256299b555009 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -721,8 +721,8 @@ func getRuntimeInfo(ctx sessionctx.Context, p Plan, runtimeStatsColl *execdetail if runtimeStatsColl != nil && runtimeStatsColl.ExistsCopStats(explainID) { copStats = runtimeStatsColl.GetCopStats(explainID) } - memTracker = ctx.GetSessionVars().StmtCtx.MemTracker.SearchTrackerWithoutLock(p.ID()) - diskTracker = ctx.GetSessionVars().StmtCtx.DiskTracker.SearchTrackerWithoutLock(p.ID()) + memTracker = ctx.GetSessionVars().MemTracker.SearchTrackerWithoutLock(p.ID()) + diskTracker = ctx.GetSessionVars().DiskTracker.SearchTrackerWithoutLock(p.ID()) return } diff --git a/planner/core/plan_cost_ver1.go b/planner/core/plan_cost_ver1.go index 49a4cd32618c5..9b8808a72843c 100644 --- a/planner/core/plan_cost_ver1.go +++ b/planner/core/plan_cost_ver1.go @@ -848,7 +848,7 @@ func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64, isMPP bool, costFlag uint } sessVars := p.ctx.GetSessionVars() oomUseTmpStorage := variable.EnableTmpStorageOnOOM.Load() - memQuota := sessVars.StmtCtx.MemTracker.GetBytesLimit() // sessVars.MemQuotaQuery && hint + memQuota := sessVars.MemTracker.GetBytesLimit() // sessVars.MemQuotaQuery && hint rowSize := getAvgRowSize(build.statsInfo(), build.Schema()) spill := oomUseTmpStorage && memQuota > 0 && rowSize*buildCnt > float64(memQuota) && p.storeTp != kv.TiFlash // Cost of building hash table. @@ -1045,7 +1045,7 @@ func (p *PhysicalSort) GetCost(count float64, schema *expression.Schema) float64 memoryCost := count * sessVars.GetMemoryFactor() oomUseTmpStorage := variable.EnableTmpStorageOnOOM.Load() - memQuota := sessVars.StmtCtx.MemTracker.GetBytesLimit() // sessVars.MemQuotaQuery && hint + memQuota := sessVars.MemTracker.GetBytesLimit() // sessVars.MemQuotaQuery && hint rowSize := getAvgRowSize(p.statsInfo(), schema) spill := oomUseTmpStorage && memQuota > 0 && rowSize*count > float64(memQuota) diskCost := count * sessVars.GetDiskFactor() * rowSize diff --git a/planner/core/plan_cost_ver2.go b/planner/core/plan_cost_ver2.go index b7f09c32bdce2..736cef38977e0 100644 --- a/planner/core/plan_cost_ver2.go +++ b/planner/core/plan_cost_ver2.go @@ -315,7 +315,7 @@ func (p *PhysicalSort) getPlanCostVer2(taskType property.TaskType, option *PlanC memFactor := getTaskMemFactorVer2(p, taskType) diskFactor := defaultVer2Factors.TiDBDisk oomUseTmpStorage := variable.EnableTmpStorageOnOOM.Load() - memQuota := p.ctx.GetSessionVars().StmtCtx.MemTracker.GetBytesLimit() + memQuota := p.ctx.GetSessionVars().MemTracker.GetBytesLimit() spill := taskType == property.RootTaskType && // only TiDB can spill oomUseTmpStorage && // spill is enabled memQuota > 0 && // mem-quota is set diff --git a/server/rpc_server.go b/server/rpc_server.go index 7053872f19aa7..893b923d41a77 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -192,7 +192,7 @@ func (s *rpcServer) handleCopRequest(ctx context.Context, req *coprocessor.Reque defer func() { sc := se.GetSessionVars().StmtCtx if sc.MemTracker != nil { - sc.MemTracker.DetachFromGlobalTracker() + sc.MemTracker.Detach() } se.Close() }() @@ -222,13 +222,12 @@ func (s *rpcServer) createSession() (session.Session, error) { // TODO: remove this. vars.SetHashAggPartialConcurrency(1) vars.SetHashAggFinalConcurrency(1) - vars.StmtCtx.InitMemTracker(memory.LabelForSQLText, vars.MemQuotaQuery) - vars.StmtCtx.MemTracker.AttachToGlobalTracker(executor.GlobalMemoryUsageTracker) + vars.StmtCtx.InitMemTracker(memory.LabelForSQLText, -1) + vars.StmtCtx.MemTracker.AttachTo(vars.MemTracker) switch variable.OOMAction.Load() { case variable.OOMActionCancel: action := &memory.PanicOnExceed{} - action.SetLogHook(domain.GetDomain(se).ExpensiveQueryHandle().LogOnQueryExceedMemQuota) - vars.StmtCtx.MemTracker.SetActionOnExceed(action) + vars.MemTracker.SetActionOnExceed(action) } se.SetSessionManager(s.sm) return se, nil diff --git a/session/nontransactional.go b/session/nontransactional.go index ef8adb541203e..3606b2aeee297 100644 --- a/session/nontransactional.go +++ b/session/nontransactional.go @@ -94,8 +94,9 @@ func HandleNonTransactionalDelete(ctx context.Context, stmt *ast.NonTransactiona // TODO: choose an appropriate quota. // Use the mem-quota-query as a workaround. As a result, a NT-DML may consume 2x of the memory quota. - memTracker := setMemTracker(se) - defer memTracker.DetachFromGlobalTracker() + memTracker := memory.NewTracker(memory.LabelForNonTransactionalDML, -1) + memTracker.AttachTo(se.GetSessionVars().MemTracker) + defer memTracker.Detach() jobs, err := buildShardJobs(ctx, stmt, se, selectSQL, shardColumnInfo, memTracker) if err != nil { return nil, err @@ -111,24 +112,6 @@ func HandleNonTransactionalDelete(ctx context.Context, stmt *ast.NonTransactiona return buildExecuteResults(ctx, jobs, se.GetSessionVars().BatchSize.MaxChunkSize, se.GetSessionVars().EnableRedactLog) } -func setMemTracker(se Session) *memory.Tracker { - memTracker := memory.NewTracker(memory.LabelForNonTransactionalDML, se.GetSessionVars().MemQuotaQuery) - switch variable.OOMAction.Load() { - case variable.OOMActionCancel: - action := &memory.PanicOnExceed{ConnID: se.GetSessionVars().ConnectionID} - action.SetLogHook(domain.GetDomain(se).ExpensiveQueryHandle().LogOnQueryExceedMemQuota) - memTracker.SetActionOnExceed(action) - case variable.OOMActionLog: - fallthrough - default: - action := &memory.LogOnExceed{ConnID: se.GetSessionVars().ConnectionID} - action.SetLogHook(domain.GetDomain(se).ExpensiveQueryHandle().LogOnQueryExceedMemQuota) - memTracker.SetActionOnExceed(action) - } - memTracker.AttachToGlobalTracker(executor.GlobalMemoryUsageTracker) - return memTracker -} - func checkConstraint(stmt *ast.NonTransactionalDeleteStmt, se Session) error { sessVars := se.GetSessionVars() if !(sessVars.IsAutocommit() && !sessVars.InTxn()) { diff --git a/session/session.go b/session/session.go index e40b82cf07052..d214763adc0a0 100644 --- a/session/session.go +++ b/session/session.go @@ -90,6 +90,7 @@ import ( "github.com/pingcap/tidb/util/kvcache" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/logutil/consistency" + "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/sem" "github.com/pingcap/tidb/util/sli" "github.com/pingcap/tidb/util/sqlexec" @@ -1501,6 +1502,8 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu Info: sql, CurTxnStartTS: curTxnStartTS, StmtCtx: s.sessionVars.StmtCtx, + MemTracker: s.sessionVars.MemTracker, + DiskTracker: s.sessionVars.DiskTracker, StatsInfo: plannercore.GetStatsInfo, MaxExecutionTime: maxExecutionTime, RedactSQL: s.sessionVars.EnableRedactLog, @@ -1994,7 +1997,8 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex return nil, err } - s.sessionVars.StartTime = time.Now() + sessVars := s.sessionVars + sessVars.StartTime = time.Now() // Some executions are done in compile stage, so we reset them before compile. if err := executor.ResetContextOfStmt(s, stmtNode); err != nil { @@ -2947,6 +2951,19 @@ func CreateSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er s.mu.values = make(map[fmt.Stringer]interface{}) s.lockedTables = make(map[int64]model.TableLockTpInfo) domain.BindDomain(s, dom) + logOnQueryExceedMemQuota := dom.ExpensiveQueryHandle().LogOnQueryExceedMemQuota + switch config.GetGlobalConfig().OOMAction { + case variable.OOMActionCancel: + action := &memory.PanicOnExceed{ConnID: s.sessionVars.ConnectionID} + action.SetLogHook(logOnQueryExceedMemQuota) + s.sessionVars.MemTracker.SetActionOnExceed(action) + case variable.OOMActionLog: + fallthrough + default: + action := &memory.LogOnExceed{ConnID: s.sessionVars.ConnectionID} + action.SetLogHook(logOnQueryExceedMemQuota) + s.sessionVars.MemTracker.SetActionOnExceed(action) + } // session implements variable.GlobalVarAccessor. Bind it to ctx. s.sessionVars.GlobalVarsAccessor = s s.txn.init() diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 4d3d16f351965..6e5c86191bbe5 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -45,10 +45,13 @@ import ( "github.com/pingcap/tidb/sessionctx/stmtctx" pumpcli "github.com/pingcap/tidb/tidb-binlog/pump_client" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/disk" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/kvcache" "github.com/pingcap/tidb/util/mathutil" + "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/rowcodec" "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tidb/util/tableutil" @@ -601,6 +604,7 @@ func (s *SessionVars) GetUserVarType(name string) (*types.FieldType, bool) { // HookContext contains the necessary variables for executing set/get hook type HookContext interface { GetStore() kv.Storage + GetSessionManager() util.SessionManager } // SessionVars is to handle user-defined or global variables in the current session. @@ -1272,6 +1276,10 @@ type SessionVars struct { LastPlanReplayerToken string HookContext + + // MemTracker indicates the memory tracker of current session. + MemTracker *memory.Tracker + DiskTracker *memory.Tracker } // GetPreparedStmtByName returns the prepared statement specified by stmtName. @@ -1600,6 +1608,10 @@ func NewSessionVars(hctx HookContext) *SessionVars { vars.enforceMPPExecution = DefTiDBEnforceMPPExecution vars.TiFlashMaxThreads = DefTiFlashMaxThreads vars.MPPStoreFailTTL = DefTiDBMPPStoreFailTTL + vars.DiskTracker = disk.NewTracker(memory.LabelForSession, -1) + vars.DiskTracker.IsRootTrackerOfSess = true + vars.MemTracker = memory.NewTracker(memory.LabelForSession, vars.MemQuotaQuery) + vars.MemTracker.IsRootTrackerOfSess = true for _, engine := range config.GetGlobalConfig().IsolationRead.Engines { switch engine { diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index aa52fa3356c2e..cb7729d33dfcc 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -1308,11 +1308,6 @@ func (e *rateLimitAction) Action(t *memory.Tracker) { }) } -// SetLogHook implements ActionOnExceed.SetLogHook -func (e *rateLimitAction) SetLogHook(hook func(uint64)) { - -} - // GetPriority get the priority of the Action. func (e *rateLimitAction) GetPriority() int64 { return memory.DefRateLimitPriority diff --git a/util/chunk/row_container.go b/util/chunk/row_container.go index b4c73ae8e3cab..2053d08ab90f5 100644 --- a/util/chunk/row_container.go +++ b/util/chunk/row_container.go @@ -411,9 +411,6 @@ func (a *SpillDiskAction) Reset() { a.once = sync.Once{} } -// SetLogHook sets the hook, it does nothing just to form the memory.ActionOnExceed interface. -func (*SpillDiskAction) SetLogHook(_ func(uint64)) {} - // GetPriority get the priority of the Action. func (*SpillDiskAction) GetPriority() int64 { return memory.DefSpillPriority @@ -607,9 +604,6 @@ func (a *SortAndSpillDiskAction) Action(t *memory.Tracker) { } } -// SetLogHook sets the hook, it does nothing just to form the memory.ActionOnExceed interface. -func (*SortAndSpillDiskAction) SetLogHook(_ func(uint64)) {} - // WaitForTest waits all goroutine have gone. func (a *SortAndSpillDiskAction) WaitForTest() { a.testWg.Wait() diff --git a/util/expensivequery/expensivequery.go b/util/expensivequery/expensivequery.go index c2dab7582fa5c..deaf7061639fc 100644 --- a/util/expensivequery/expensivequery.go +++ b/util/expensivequery/expensivequery.go @@ -68,7 +68,7 @@ func (eqh *Handle) Run() { costTime := time.Since(info.Time) if !info.ExceedExpensiveTimeThresh && costTime >= time.Second*time.Duration(threshold) && log.GetLevel() <= zapcore.WarnLevel { - logExpensiveQuery(costTime, info) + logExpensiveQuery(costTime, info, "expensive_query") info.ExceedExpensiveTimeThresh = true } if info.MaxExecutionTime > 0 && costTime > time.Duration(info.MaxExecutionTime)*time.Millisecond { @@ -116,7 +116,7 @@ func (eqh *Handle) LogOnQueryExceedMemQuota(connID uint64) { if !ok { return } - logExpensiveQuery(time.Since(info.Time), info) + logExpensiveQuery(time.Since(info.Time), info, "memory exceeds quota") } func genLogFields(costTime time.Duration, info *util.ProcessInfo) []zap.Field { @@ -165,7 +165,7 @@ func genLogFields(costTime time.Duration, info *util.ProcessInfo) []zap.Field { logFields = append(logFields, zap.String("index_names", indexNames)) } logFields = append(logFields, zap.Uint64("txn_start_ts", info.CurTxnStartTS)) - if memTracker := info.StmtCtx.MemTracker; memTracker != nil { + if memTracker := info.MemTracker; memTracker != nil { logFields = append(logFields, zap.String("mem_max", fmt.Sprintf("%d Bytes (%v)", memTracker.MaxConsumed(), memTracker.FormatBytes(memTracker.MaxConsumed())))) } @@ -185,6 +185,6 @@ func genLogFields(costTime time.Duration, info *util.ProcessInfo) []zap.Field { } // logExpensiveQuery logs the queries which exceed the time threshold or memory threshold. -func logExpensiveQuery(costTime time.Duration, info *util.ProcessInfo) { - logutil.BgLogger().Warn("expensive_query", genLogFields(costTime, info)...) +func logExpensiveQuery(costTime time.Duration, info *util.ProcessInfo, msg string) { + logutil.BgLogger().Warn(msg, genLogFields(costTime, info)...) } diff --git a/util/expensivequery/memory_usage_alarm.go b/util/expensivequery/memory_usage_alarm.go index 675d9cc9e2ef9..527324f2b92e9 100644 --- a/util/expensivequery/memory_usage_alarm.go +++ b/util/expensivequery/memory_usage_alarm.go @@ -210,7 +210,7 @@ func (record *memoryUsageAlarm) recordSQL(sm util.SessionManager) { _, err = f.WriteString("The 10 SQLs with the most memory usage for OOM analysis\n") printTop10(func(i, j *util.ProcessInfo) bool { - return i.StmtCtx.MemTracker.MaxConsumed() > j.StmtCtx.MemTracker.MaxConsumed() + return i.MemTracker.MaxConsumed() > j.MemTracker.MaxConsumed() }) _, err = f.WriteString("The 10 SQLs with the most time usage for OOM analysis\n") diff --git a/util/memory/action.go b/util/memory/action.go index 2ad4f76dcb695..591b4d1d61c73 100644 --- a/util/memory/action.go +++ b/util/memory/action.go @@ -31,9 +31,6 @@ type ActionOnExceed interface { // Action will be called when memory usage exceeds memory quota by the // corresponding Tracker. Action(t *Tracker) - // SetLogHook binds a log hook which will be triggered and log an detailed - // message for the out-of-memory sql. - SetLogHook(hook func(uint64)) // SetFallback sets a fallback action which will be triggered if itself has // already been triggered. SetFallback(a ActionOnExceed) @@ -134,12 +131,15 @@ func (a *PanicOnExceed) SetLogHook(hook func(uint64)) { } // Action panics when memory usage exceeds memory quota. -func (a *PanicOnExceed) Action(_ *Tracker) { +func (a *PanicOnExceed) Action(t *Tracker) { a.mutex.Lock() if !a.acted { - if a.logHook != nil { - a.logHook(a.ConnID) + if a.logHook == nil { + logutil.BgLogger().Warn("memory exceeds quota", + zap.Error(errMemExceedThreshold.GenWithStackByArgs(t.label, t.BytesConsumed(), t.GetBytesLimit(), t.String()))) + return } + a.logHook(a.ConnID) } a.acted = true a.mutex.Unlock() diff --git a/util/memory/tracker.go b/util/memory/tracker.go index c967f47a05509..16c4af44fc44b 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -763,6 +763,8 @@ const ( LabelForAnalyzeMemory int = -24 // LabelForGlobalAnalyzeMemory represents the label of the global memory of all analyze jobs LabelForGlobalAnalyzeMemory int = -25 + // LabelForSession represents the label of a session. + LabelForSession int = -26 ) // MetricsTypes is used to get label for metrics diff --git a/util/memory/tracker_test.go b/util/memory/tracker_test.go index baa6461ac76c5..cdfc4ff353435 100644 --- a/util/memory/tracker_test.go +++ b/util/memory/tracker_test.go @@ -251,9 +251,6 @@ type mockAction struct { priority int64 } -func (a *mockAction) SetLogHook(hook func(uint64)) { -} - func (a *mockAction) Action(t *Tracker) { if a.called && a.fallbackAction != nil { a.fallbackAction.Action(t) diff --git a/util/mock/context.go b/util/mock/context.go index fd4c4d8bef376..b362db8ac4e77 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -443,17 +443,21 @@ func NewContext() *Context { ctx: ctx, cancel: cancel, } - sctx.sessionVars = variable.NewSessionVars(sctx) - sctx.sessionVars.InitChunkSize = 2 - sctx.sessionVars.MaxChunkSize = 32 - sctx.sessionVars.StmtCtx.TimeZone = time.UTC - sctx.sessionVars.StmtCtx.MemTracker = memory.NewTracker(-1, -1) - sctx.sessionVars.StmtCtx.DiskTracker = disk.NewTracker(-1, -1) - sctx.sessionVars.GlobalVarsAccessor = variable.NewMockGlobalAccessor() - sctx.sessionVars.EnablePaging = variable.DefTiDBEnablePaging - sctx.sessionVars.MinPagingSize = variable.DefMinPagingSize - sctx.sessionVars.CostModelVersion = variable.DefTiDBCostModelVer - sctx.sessionVars.EnableChunkRPC = true + vars := variable.NewSessionVars(sctx) + sctx.sessionVars = vars + vars.InitChunkSize = 2 + vars.MaxChunkSize = 32 + vars.StmtCtx.TimeZone = time.UTC + vars.MemTracker.SetBytesLimit(-1) + vars.DiskTracker.SetBytesLimit(-1) + vars.StmtCtx.MemTracker, vars.StmtCtx.DiskTracker = memory.NewTracker(-1, -1), disk.NewTracker(-1, -1) + vars.MemTracker.AttachTo(vars.MemTracker) + vars.DiskTracker.AttachTo(vars.DiskTracker) + vars.GlobalVarsAccessor = variable.NewMockGlobalAccessor() + vars.EnablePaging = variable.DefTiDBEnablePaging + vars.MinPagingSize = variable.DefMinPagingSize + vars.CostModelVersion = variable.DefTiDBCostModelVer + vars.EnableChunkRPC = true if err := sctx.GetSessionVars().SetSystemVar(variable.MaxAllowedPacket, "67108864"); err != nil { panic(err) } diff --git a/util/processinfo.go b/util/processinfo.go index f3ac796bb4dbb..accb96b8fefb7 100644 --- a/util/processinfo.go +++ b/util/processinfo.go @@ -25,7 +25,9 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/util/disk" "github.com/pingcap/tidb/util/execdetails" + "github.com/pingcap/tidb/util/memory" "github.com/tikv/client-go/v2/oracle" ) @@ -34,6 +36,8 @@ type ProcessInfo struct { Time time.Time Plan interface{} StmtCtx *stmtctx.StatementContext + MemTracker *memory.Tracker + DiskTracker *disk.Tracker StatsInfo func(interface{}) map[string]uint64 RuntimeStatsColl *execdetails.RuntimeStatsColl DB string @@ -106,11 +110,11 @@ func (pi *ProcessInfo) ToRow(tz *time.Location) []interface{} { bytesConsumed := int64(0) diskConsumed := int64(0) if pi.StmtCtx != nil { - if pi.StmtCtx.MemTracker != nil { - bytesConsumed = pi.StmtCtx.MemTracker.BytesConsumed() + if pi.MemTracker != nil { + bytesConsumed = pi.MemTracker.BytesConsumed() } - if pi.StmtCtx.DiskTracker != nil { - diskConsumed = pi.StmtCtx.DiskTracker.BytesConsumed() + if pi.DiskTracker != nil { + diskConsumed = pi.DiskTracker.BytesConsumed() } } return append(pi.ToRowForShow(true), pi.Digest, bytesConsumed, diskConsumed, pi.txnStartTs(tz)) From 1efdaeb7a74d08cc89f9e17a298cc9cffdd4d668 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 12 Oct 2022 15:51:50 +0800 Subject: [PATCH 02/19] resolve conflicts --- session/session.go | 1 + util/expensivequery/memory_usage_alarm.go | 262 ---------------------- util/processinfo.go | 2 +- 3 files changed, 2 insertions(+), 263 deletions(-) delete mode 100644 util/expensivequery/memory_usage_alarm.go diff --git a/session/session.go b/session/session.go index 11078567b468e..a01c391dc7561 100644 --- a/session/session.go +++ b/session/session.go @@ -1561,6 +1561,7 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu Command: command, Plan: p, PlanExplainRows: plannercore.GetExplainRowsForPlan(p), + CurrentAnalyzeRows: s.getCurrentAnalyzePlan, RuntimeStatsColl: s.sessionVars.StmtCtx.RuntimeStatsColl, Time: t, State: s.Status(), diff --git a/util/expensivequery/memory_usage_alarm.go b/util/expensivequery/memory_usage_alarm.go deleted file mode 100644 index 527324f2b92e9..0000000000000 --- a/util/expensivequery/memory_usage_alarm.go +++ /dev/null @@ -1,262 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package expensivequery - -import ( - "fmt" - "os" - "path/filepath" - "runtime" - rpprof "runtime/pprof" - "strings" - "time" - - "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/util" - "github.com/pingcap/tidb/util/disk" - "github.com/pingcap/tidb/util/logutil" - "github.com/pingcap/tidb/util/memory" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "golang.org/x/exp/slices" -) - -type memoryUsageAlarm struct { - lastCheckTime time.Time - err error - tmpDir string - lastLogFileName []string - lastProfileFileName [][]string - serverMemoryQuota uint64 - memoryUsageAlarmRatio float64 - initialized bool - isServerMemoryQuotaSet bool -} - -func (record *memoryUsageAlarm) initMemoryUsageAlarmRecord() { - if quota := config.GetGlobalConfig().Performance.ServerMemoryQuota; quota != 0 { - record.serverMemoryQuota = quota - record.isServerMemoryQuotaSet = true - } else { - record.serverMemoryQuota, record.err = memory.MemTotal() - if record.err != nil { - logutil.BgLogger().Error("get system total memory fail", zap.Error(record.err)) - return - } - record.isServerMemoryQuotaSet = false - } - record.lastCheckTime = time.Time{} - record.tmpDir = filepath.Join(config.GetGlobalConfig().TempStoragePath, "record") - if record.err = disk.CheckAndCreateDir(record.tmpDir); record.err != nil { - return - } - record.lastProfileFileName = make([][]string, 2) - // Read last records - files, err := os.ReadDir(record.tmpDir) - if err != nil { - record.err = err - return - } - for _, f := range files { - name := filepath.Join(record.tmpDir, f.Name()) - if strings.Contains(f.Name(), "running_sql") { - record.lastLogFileName = append(record.lastLogFileName, name) - } - if strings.Contains(f.Name(), "heap") { - record.lastProfileFileName[0] = append(record.lastProfileFileName[0], name) - } - if strings.Contains(f.Name(), "goroutine") { - record.lastProfileFileName[1] = append(record.lastProfileFileName[1], name) - } - } - record.initialized = true -} - -// If Performance.ServerMemoryQuota is set, use `ServerMemoryQuota * MemoryUsageAlarmRatio` to check oom risk. -// If Performance.ServerMemoryQuota is not set, use `system total memory size * MemoryUsageAlarmRatio` to check oom risk. -func (record *memoryUsageAlarm) alarm4ExcessiveMemUsage(sm util.SessionManager) { - if record.memoryUsageAlarmRatio <= 0.0 || record.memoryUsageAlarmRatio >= 1.0 { - return - } - if !record.initialized { - record.initMemoryUsageAlarmRecord() - if record.err != nil { - return - } - } - - var memoryUsage uint64 - instanceStats := &runtime.MemStats{} - runtime.ReadMemStats(instanceStats) - if record.isServerMemoryQuotaSet { - memoryUsage = instanceStats.HeapAlloc - } else { - memoryUsage, record.err = memory.MemUsed() - if record.err != nil { - logutil.BgLogger().Error("get system memory usage fail", zap.Error(record.err)) - return - } - } - - // TODO: Consider NextGC to record SQLs. - if float64(memoryUsage) > float64(record.serverMemoryQuota)*record.memoryUsageAlarmRatio { - // At least ten seconds between two recordings that memory usage is less than threshold (default 80% system memory). - // If the memory is still exceeded, only records once. - interval := time.Since(record.lastCheckTime) - record.lastCheckTime = time.Now() - if interval > 10*time.Second { - record.doRecord(memoryUsage, instanceStats.HeapAlloc, sm) - } - } -} - -func (record *memoryUsageAlarm) doRecord(memUsage uint64, instanceMemoryUsage uint64, sm util.SessionManager) { - fields := make([]zap.Field, 0, 6) - fields = append(fields, zap.Bool("is server-memory-quota set", record.isServerMemoryQuotaSet)) - if record.isServerMemoryQuotaSet { - fields = append(fields, zap.Any("server-memory-quota", record.serverMemoryQuota)) - fields = append(fields, zap.Any("tidb-server memory usage", memUsage)) - } else { - fields = append(fields, zap.Any("system memory total", record.serverMemoryQuota)) - fields = append(fields, zap.Any("system memory usage", memUsage)) - fields = append(fields, zap.Any("tidb-server memory usage", instanceMemoryUsage)) - } - fields = append(fields, zap.Any("memory-usage-alarm-ratio", record.memoryUsageAlarmRatio)) - fields = append(fields, zap.Any("record path", record.tmpDir)) - - logutil.BgLogger().Warn("tidb-server has the risk of OOM. Running SQLs and heap profile will be recorded in record path", fields...) - - if record.err = disk.CheckAndCreateDir(record.tmpDir); record.err != nil { - return - } - record.recordSQL(sm) - record.recordProfile() - - tryRemove := func(filename *[]string) { - // Keep the last 5 files - for len(*filename) > 5 { - err := os.Remove((*filename)[0]) - if err != nil { - logutil.BgLogger().Error("remove temp files failed", zap.Error(err)) - } - *filename = (*filename)[1:] - } - } - tryRemove(&record.lastLogFileName) - for i := range record.lastProfileFileName { - tryRemove(&record.lastProfileFileName[i]) - } -} - -func (record *memoryUsageAlarm) recordSQL(sm util.SessionManager) { - processInfo := sm.ShowProcessList() - pinfo := make([]*util.ProcessInfo, 0, len(processInfo)) - for _, info := range processInfo { - if len(info.Info) != 0 { - pinfo = append(pinfo, info) - } - } - - fileName := filepath.Join(record.tmpDir, "running_sql"+record.lastCheckTime.Format(time.RFC3339)) - record.lastLogFileName = append(record.lastLogFileName, fileName) - f, err := os.Create(fileName) - if err != nil { - logutil.BgLogger().Error("create oom record file fail", zap.Error(err)) - return - } - defer func() { - err := f.Close() - if err != nil { - logutil.BgLogger().Error("close oom record file fail", zap.Error(err)) - } - }() - printTop10 := func(cmp func(i, j *util.ProcessInfo) bool) { - slices.SortFunc(pinfo, cmp) - list := pinfo - if len(list) > 10 { - list = list[:10] - } - var buf strings.Builder - for i, info := range list { - buf.WriteString(fmt.Sprintf("SQL %v: \n", i)) - fields := genLogFields(record.lastCheckTime.Sub(info.Time), info) - for _, field := range fields { - switch field.Type { - case zapcore.StringType: - buf.WriteString(fmt.Sprintf("%v: %v", field.Key, field.String)) - case zapcore.Uint8Type, zapcore.Uint16Type, zapcore.Uint32Type, zapcore.Uint64Type: - buf.WriteString(fmt.Sprintf("%v: %v", field.Key, uint64(field.Integer))) - case zapcore.Int8Type, zapcore.Int16Type, zapcore.Int32Type, zapcore.Int64Type: - buf.WriteString(fmt.Sprintf("%v: %v", field.Key, field.Integer)) - } - buf.WriteString("\n") - } - } - buf.WriteString("\n") - _, err = f.WriteString(buf.String()) - } - - _, err = f.WriteString("The 10 SQLs with the most memory usage for OOM analysis\n") - printTop10(func(i, j *util.ProcessInfo) bool { - return i.MemTracker.MaxConsumed() > j.MemTracker.MaxConsumed() - }) - - _, err = f.WriteString("The 10 SQLs with the most time usage for OOM analysis\n") - printTop10(func(i, j *util.ProcessInfo) bool { - return i.Time.Before(j.Time) - }) -} - -type item struct { - Name string - Debug int -} - -func (record *memoryUsageAlarm) recordProfile() { - items := []item{ - {Name: "heap"}, - {Name: "goroutine", Debug: 2}, - } - for i, item := range items { - err := record.write(i, item) - if err != nil { - return - } - } -} - -func (record *memoryUsageAlarm) write(i int, item item) error { - fileName := filepath.Join(record.tmpDir, item.Name+record.lastCheckTime.Format(time.RFC3339)) - record.lastProfileFileName[i] = append(record.lastProfileFileName[i], fileName) - f, err := os.Create(fileName) - if err != nil { - logutil.BgLogger().Error(fmt.Sprintf("create %v profile file fail", item.Name), zap.Error(err)) - return err - } - //nolint: revive - defer func() { - err := f.Close() - if err != nil { - logutil.BgLogger().Error(fmt.Sprintf("close %v profile file fail", item.Name), zap.Error(err)) - } - }() - p := rpprof.Lookup(item.Name) - err = p.WriteTo(f, item.Debug) - if err != nil { - logutil.BgLogger().Error(fmt.Sprintf("write %v profile file fail", item.Name), zap.Error(err)) - return err - } - return nil -} diff --git a/util/processinfo.go b/util/processinfo.go index 3f3cc0bba2310..a5b307841252f 100644 --- a/util/processinfo.go +++ b/util/processinfo.go @@ -15,7 +15,6 @@ package util import ( - "context" "crypto/tls" "errors" "fmt" @@ -54,6 +53,7 @@ type ProcessInfo struct { Info string Port string PlanExplainRows [][]string + CurrentAnalyzeRows func(interface{}, *execdetails.RuntimeStatsColl) [][]string OOMAlarmVariablesInfo OOMAlarmVariablesInfo CurTxnStartTS uint64 ID uint64 From c21b0aab81637c51bc0ed170429d62c21983e767 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 12 Oct 2022 15:54:54 +0800 Subject: [PATCH 03/19] polish --- sessionctx/variable/session.go | 1 - util/processinfo.go | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 6e5c86191bbe5..cd9300e97e42a 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -604,7 +604,6 @@ func (s *SessionVars) GetUserVarType(name string) (*types.FieldType, bool) { // HookContext contains the necessary variables for executing set/get hook type HookContext interface { GetStore() kv.Storage - GetSessionManager() util.SessionManager } // SessionVars is to handle user-defined or global variables in the current session. diff --git a/util/processinfo.go b/util/processinfo.go index a5b307841252f..7034edb3f5fa7 100644 --- a/util/processinfo.go +++ b/util/processinfo.go @@ -42,6 +42,7 @@ type ProcessInfo struct { Time time.Time Plan interface{} StmtCtx *stmtctx.StatementContext + CurrentAnalyzeRows func(interface{}, *execdetails.RuntimeStatsColl) [][]string MemTracker *memory.Tracker DiskTracker *disk.Tracker StatsInfo func(interface{}) map[string]uint64 @@ -53,10 +54,9 @@ type ProcessInfo struct { Info string Port string PlanExplainRows [][]string - CurrentAnalyzeRows func(interface{}, *execdetails.RuntimeStatsColl) [][]string OOMAlarmVariablesInfo OOMAlarmVariablesInfo - CurTxnStartTS uint64 ID uint64 + CurTxnStartTS uint64 // MaxExecutionTime is the timeout for select statement, in milliseconds. // If the query takes too long, kill it. MaxExecutionTime uint64 From bd81b43a5b8f6fecb126796f4707f1945af0d140 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 12 Oct 2022 17:05:46 +0800 Subject: [PATCH 04/19] remove useless code --- sessionctx/variable/session.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index cd9300e97e42a..30e0b5479d1e1 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -45,7 +45,6 @@ import ( "github.com/pingcap/tidb/sessionctx/stmtctx" pumpcli "github.com/pingcap/tidb/tidb-binlog/pump_client" "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/disk" "github.com/pingcap/tidb/util/execdetails" From e46def746bd404937fae376cb2a86dd8086e1cc2 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 12 Oct 2022 17:11:17 +0800 Subject: [PATCH 05/19] remove useless code --- session/nontransactional.go | 1 - 1 file changed, 1 deletion(-) diff --git a/session/nontransactional.go b/session/nontransactional.go index 3606b2aeee297..867e84157c99a 100644 --- a/session/nontransactional.go +++ b/session/nontransactional.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/errno" - "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" From c246165b943c82b2ec2804885fb65eeb26f322b6 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Thu, 13 Oct 2022 11:47:12 +0800 Subject: [PATCH 06/19] fix explain_test --- executor/executor.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/executor/executor.go b/executor/executor.go index bdee804baec40..90b9c2710e2c1 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1938,7 +1938,8 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { sc.InitMemTracker(memory.LabelForAnalyzeMemory, -1) sc.MemTracker.AttachTo(vars.MemTracker) } else { - sc.InitMemTracker(memory.LabelForSQLText, vars.MemQuotaQuery) + sc.InitMemTracker(memory.LabelForSQLText, -1) + vars.MemTracker.SetBytesLimit(vars.MemQuotaQuery) sc.MemTracker.AttachTo(vars.MemTracker) sc.MemTracker.SessionID = vars.ConnectionID } From 6a240ddcf07ad212e92d28bb5bd13775a5af74b4 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Mon, 17 Oct 2022 14:12:57 +0800 Subject: [PATCH 07/19] test --- executor/executor.go | 20 ++++++++++++++++++++ executor/oomtest/oom_test.go | 1 + planner/core/common_plans.go | 2 +- session/nontransactional.go | 1 + session/session.go | 14 -------------- sessionctx/variable/session.go | 3 +++ sessionctx/variable/sysvar.go | 1 + util/expensivequery/expensivequery.go | 1 + util/memory/action.go | 2 ++ util/memory/tracker.go | 7 +++++++ 10 files changed, 37 insertions(+), 15 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 26d65673295b7..77a0407aa29b8 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1938,9 +1938,29 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { sc.InitMemTracker(memory.LabelForAnalyzeMemory, -1) sc.MemTracker.AttachTo(vars.MemTracker) } else { + if vars.ConnectionID != 0 { + logutil.BgLogger().Error("reset context of stmt", zap.Int64("mem_quota_query", vars.MemQuotaQuery)) + } sc.InitMemTracker(memory.LabelForSQLText, -1) vars.MemTracker.SetBytesLimit(vars.MemQuotaQuery) sc.MemTracker.AttachTo(vars.MemTracker) + vars.MemTracker.SessionID = vars.ConnectionID + if vars.ConnectionID != 0 { + vars.MemTracker.SearchTrackerWithoutLock(-1) + } + logOnQueryExceedMemQuota := domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota + switch variable.OOMAction.Load() { + case variable.OOMActionCancel: + action := &memory.PanicOnExceed{ConnID: vars.ConnectionID} + action.SetLogHook(logOnQueryExceedMemQuota) + vars.MemTracker.SetActionOnExceed(action) + case variable.OOMActionLog: + fallthrough + default: + action := &memory.LogOnExceed{ConnID: vars.ConnectionID} + action.SetLogHook(logOnQueryExceedMemQuota) + vars.MemTracker.SetActionOnExceed(action) + } sc.MemTracker.SessionID = vars.ConnectionID } diff --git a/executor/oomtest/oom_test.go b/executor/oomtest/oom_test.go index a8c86388c7bfe..6a1e4c5b46363 100644 --- a/executor/oomtest/oom_test.go +++ b/executor/oomtest/oom_test.go @@ -148,6 +148,7 @@ func TestMemTracker4DeleteExec(t *testing.T) { require.Equal(t, "", oom.GetTracker()) tk.MustExec("insert into MemTracker4DeleteExec1 values (1,1,1), (2,2,2), (3,3,3)") tk.Session().GetSessionVars().MemQuotaQuery = 1 + tk.Session().GetSessionVars().MemTracker.SetBytesLimit(1) tk.MustExec("delete from MemTracker4DeleteExec1") require.Equal(t, "expensive_query during bootstrap phase", oom.GetTracker()) diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index ec98e8669a327..b6273228c5469 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -747,7 +747,7 @@ func (e *Explain) RenderResult() error { e.SCtx().GetSessionVars().MemoryDebugModeMinHeapInUse != 0 && e.SCtx().GetSessionVars().MemoryDebugModeAlarmRatio > 0 { row := e.Rows[0] - tracker := e.SCtx().GetSessionVars().StmtCtx.MemTracker + tracker := e.SCtx().GetSessionVars().MemTracker row[7] = row[7] + "(Total: " + tracker.FormatBytes(tracker.MaxConsumed()) + ")" } } diff --git a/session/nontransactional.go b/session/nontransactional.go index 867e84157c99a..2f85491005625 100644 --- a/session/nontransactional.go +++ b/session/nontransactional.go @@ -95,6 +95,7 @@ func HandleNonTransactionalDelete(ctx context.Context, stmt *ast.NonTransactiona // Use the mem-quota-query as a workaround. As a result, a NT-DML may consume 2x of the memory quota. memTracker := memory.NewTracker(memory.LabelForNonTransactionalDML, -1) memTracker.AttachTo(se.GetSessionVars().MemTracker) + se.GetSessionVars().MemTracker.SetBytesLimit(se.GetSessionVars().MemQuotaQuery) defer memTracker.Detach() jobs, err := buildShardJobs(ctx, stmt, se, selectSQL, shardColumnInfo, memTracker) if err != nil { diff --git a/session/session.go b/session/session.go index a01c391dc7561..6390344db48ab 100644 --- a/session/session.go +++ b/session/session.go @@ -91,7 +91,6 @@ import ( "github.com/pingcap/tidb/util/kvcache" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/logutil/consistency" - "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/sem" "github.com/pingcap/tidb/util/sli" "github.com/pingcap/tidb/util/sqlexec" @@ -3036,19 +3035,6 @@ func CreateSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er s.mu.values = make(map[fmt.Stringer]interface{}) s.lockedTables = make(map[int64]model.TableLockTpInfo) domain.BindDomain(s, dom) - logOnQueryExceedMemQuota := dom.ExpensiveQueryHandle().LogOnQueryExceedMemQuota - switch config.GetGlobalConfig().OOMAction { - case variable.OOMActionCancel: - action := &memory.PanicOnExceed{ConnID: s.sessionVars.ConnectionID} - action.SetLogHook(logOnQueryExceedMemQuota) - s.sessionVars.MemTracker.SetActionOnExceed(action) - case variable.OOMActionLog: - fallthrough - default: - action := &memory.LogOnExceed{ConnID: s.sessionVars.ConnectionID} - action.SetLogHook(logOnQueryExceedMemQuota) - s.sessionVars.MemTracker.SetActionOnExceed(action) - } // session implements variable.GlobalVarAccessor. Bind it to ctx. s.sessionVars.GlobalVarsAccessor = s s.txn.init() diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 30e0b5479d1e1..0c16a3b7701a0 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -20,6 +20,8 @@ import ( "crypto/tls" "encoding/binary" "fmt" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" "math" "math/rand" "net" @@ -1608,6 +1610,7 @@ func NewSessionVars(hctx HookContext) *SessionVars { vars.MPPStoreFailTTL = DefTiDBMPPStoreFailTTL vars.DiskTracker = disk.NewTracker(memory.LabelForSession, -1) vars.DiskTracker.IsRootTrackerOfSess = true + logutil.BgLogger().Error("stack", zap.Stack("stack")) vars.MemTracker = memory.NewTracker(memory.LabelForSession, vars.MemQuotaQuery) vars.MemTracker.IsRootTrackerOfSess = true diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 42690602b32cf..1e9761b8dea9c 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1820,6 +1820,7 @@ var defaultSysVars = []*SysVar{ }}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBMemQuotaQuery, Value: strconv.Itoa(DefTiDBMemQuotaQuery), Type: TypeInt, MinValue: -1, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error { s.MemQuotaQuery = TidbOptInt64(val, DefTiDBMemQuotaQuery) + s.MemTracker.SetBytesLimit(s.MemQuotaQuery) return nil }, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) { intVal := TidbOptInt64(normalizedValue, DefTiDBMemQuotaQuery) diff --git a/util/expensivequery/expensivequery.go b/util/expensivequery/expensivequery.go index e761d98ab05a2..6f782a98b3db2 100644 --- a/util/expensivequery/expensivequery.go +++ b/util/expensivequery/expensivequery.go @@ -111,5 +111,6 @@ func (eqh *Handle) LogOnQueryExceedMemQuota(connID uint64) { // logExpensiveQuery logs the queries which exceed the time threshold or memory threshold. func logExpensiveQuery(costTime time.Duration, info *util.ProcessInfo, msg string) { + logutil.BgLogger().Error("here xhy") logutil.BgLogger().Warn(msg, util.GenLogFields(costTime, info, true)...) } diff --git a/util/memory/action.go b/util/memory/action.go index 591b4d1d61c73..159d7ef2a9576 100644 --- a/util/memory/action.go +++ b/util/memory/action.go @@ -103,6 +103,7 @@ func (a *LogOnExceed) Action(t *Tracker) { if !a.acted { a.acted = true if a.logHook == nil { + logutil.BgLogger().Error("LogOnExceed xhy") logutil.BgLogger().Warn("memory exceeds quota", zap.Error(errMemExceedThreshold.GenWithStackByArgs(t.label, t.BytesConsumed(), t.GetBytesLimit(), t.String()))) return @@ -135,6 +136,7 @@ func (a *PanicOnExceed) Action(t *Tracker) { a.mutex.Lock() if !a.acted { if a.logHook == nil { + logutil.BgLogger().Error("PanicOnExceed xhy") logutil.BgLogger().Warn("memory exceeds quota", zap.Error(errMemExceedThreshold.GenWithStackByArgs(t.label, t.BytesConsumed(), t.GetBytesLimit(), t.String()))) return diff --git a/util/memory/tracker.go b/util/memory/tracker.go index 16c4af44fc44b..3de82218b5e41 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -17,6 +17,8 @@ package memory import ( "bytes" "fmt" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" "runtime" "strconv" "sync" @@ -366,6 +368,9 @@ func (t *Tracker) Consume(bs int64) { if bs == 0 { return } + if t.IsRootTrackerOfSess && t.SessionID != 0 { + logutil.BgLogger().Error("root sess tracker", zap.Int64("bytesConsumed", atomic.LoadInt64(&t.bytesConsumed))) + } var rootExceed, rootExceedForSoftLimit, sessionRootTracker *Tracker for tracker := t; tracker != nil; tracker = tracker.getParent() { if tracker.IsRootTrackerOfSess { @@ -530,11 +535,13 @@ func (t *Tracker) MaxConsumed() int64 { // SearchTrackerWithoutLock searches the specific tracker under this tracker without lock. func (t *Tracker) SearchTrackerWithoutLock(label int) *Tracker { + logutil.BgLogger().Error("t.label", zap.Int("t.label", t.label), zap.Int("label", label)) if t.label == label { return t } children := t.mu.children[label] if len(children) > 0 { + logutil.BgLogger().Error("label") return children[0] } return nil From 4597c2ca66d874052e87b536ee62ffdc0d09f501 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 19 Oct 2022 11:46:50 +0800 Subject: [PATCH 08/19] test --- executor/executor.go | 20 +++++++++++--------- sessionctx/variable/session.go | 4 +--- util/memory/action.go | 1 - util/memory/tracker.go | 17 ++++++++++++++++- 4 files changed, 28 insertions(+), 14 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 77a0407aa29b8..6ac7fc5e187f4 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1934,20 +1934,18 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { sc.SysdateIsNow = ctx.GetSessionVars().SysdateIsNow + vars.MemTracker.UnbindActions() + vars.MemTracker.SetBytesLimit(vars.MemQuotaQuery) + vars.MemTracker.SessionID = vars.ConnectionID + if _, ok := s.(*ast.AnalyzeTableStmt); ok { sc.InitMemTracker(memory.LabelForAnalyzeMemory, -1) - sc.MemTracker.AttachTo(vars.MemTracker) + vars.MemTracker.SetBytesLimit(-1) } else { if vars.ConnectionID != 0 { logutil.BgLogger().Error("reset context of stmt", zap.Int64("mem_quota_query", vars.MemQuotaQuery)) } - sc.InitMemTracker(memory.LabelForSQLText, -1) - vars.MemTracker.SetBytesLimit(vars.MemQuotaQuery) - sc.MemTracker.AttachTo(vars.MemTracker) - vars.MemTracker.SessionID = vars.ConnectionID - if vars.ConnectionID != 0 { - vars.MemTracker.SearchTrackerWithoutLock(-1) - } + sc.InitMemTracker(memory.LabelForSQLText, 128) logOnQueryExceedMemQuota := domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota switch variable.OOMAction.Load() { case variable.OOMActionCancel: @@ -1961,7 +1959,11 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { action.SetLogHook(logOnQueryExceedMemQuota) vars.MemTracker.SetActionOnExceed(action) } - sc.MemTracker.SessionID = vars.ConnectionID + } + sc.MemTracker.SessionID = vars.ConnectionID + sc.MemTracker.AttachTo(vars.MemTracker) + if vars.ConnectionID != 0 { + vars.MemTracker.SearchTrackerWithoutLock(-1) } sc.InitDiskTracker(memory.LabelForSQLText, -1) diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 0c16a3b7701a0..cd28ff7c83250 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -20,8 +20,6 @@ import ( "crypto/tls" "encoding/binary" "fmt" - "github.com/pingcap/tidb/util/logutil" - "go.uber.org/zap" "math" "math/rand" "net" @@ -1610,7 +1608,7 @@ func NewSessionVars(hctx HookContext) *SessionVars { vars.MPPStoreFailTTL = DefTiDBMPPStoreFailTTL vars.DiskTracker = disk.NewTracker(memory.LabelForSession, -1) vars.DiskTracker.IsRootTrackerOfSess = true - logutil.BgLogger().Error("stack", zap.Stack("stack")) + //logutil.BgLogger().Error("stack", zap.Stack("stack")) vars.MemTracker = memory.NewTracker(memory.LabelForSession, vars.MemQuotaQuery) vars.MemTracker.IsRootTrackerOfSess = true diff --git a/util/memory/action.go b/util/memory/action.go index 159d7ef2a9576..38f3a695d63eb 100644 --- a/util/memory/action.go +++ b/util/memory/action.go @@ -103,7 +103,6 @@ func (a *LogOnExceed) Action(t *Tracker) { if !a.acted { a.acted = true if a.logHook == nil { - logutil.BgLogger().Error("LogOnExceed xhy") logutil.BgLogger().Warn("memory exceeds quota", zap.Error(errMemExceedThreshold.GenWithStackByArgs(t.label, t.BytesConsumed(), t.GetBytesLimit(), t.String()))) return diff --git a/util/memory/tracker.go b/util/memory/tracker.go index 3de82218b5e41..e3165cb2eb9de 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -227,6 +227,17 @@ func (t *Tracker) GetFallbackForTest(ignoreFinishedAction bool) ActionOnExceed { return t.actionMuForHardLimit.actionOnExceed } +// UnbindActions unbinds actionForHardLimit and actionForSoftLimit. +func (t *Tracker) UnbindActions() { + t.actionMuForSoftLimit.Lock() + defer t.actionMuForSoftLimit.Unlock() + t.actionMuForSoftLimit.actionOnExceed = nil + + t.actionMuForHardLimit.Lock() + defer t.actionMuForHardLimit.Unlock() + t.actionMuForHardLimit.actionOnExceed = &LogOnExceed{} +} + // reArrangeFallback merge two action chains and rearrange them by priority in descending order. func reArrangeFallback(a ActionOnExceed, b ActionOnExceed) ActionOnExceed { if a == nil { @@ -368,6 +379,10 @@ func (t *Tracker) Consume(bs int64) { if bs == 0 { return } + if t.SessionID != 0 { + //logutil.BgLogger().Error("tracker.Consume", zap.Int64("bytesConsumed", atomic.LoadInt64(&t.bytesConsumed)), zap.Int("label", t.label), zap.Int("parent label", t.getParent().label)) + logutil.BgLogger().Error("tracker.Consume", zap.Int64("bytesConsumed", atomic.LoadInt64(&t.bytesConsumed)), zap.Int("label", t.label)) + } if t.IsRootTrackerOfSess && t.SessionID != 0 { logutil.BgLogger().Error("root sess tracker", zap.Int64("bytesConsumed", atomic.LoadInt64(&t.bytesConsumed))) } @@ -535,7 +550,7 @@ func (t *Tracker) MaxConsumed() int64 { // SearchTrackerWithoutLock searches the specific tracker under this tracker without lock. func (t *Tracker) SearchTrackerWithoutLock(label int) *Tracker { - logutil.BgLogger().Error("t.label", zap.Int("t.label", t.label), zap.Int("label", label)) + //logutil.BgLogger().Error("t.label", zap.Int("t.label", t.label), zap.Int("label", label)) if t.label == label { return t } From 104761f790ea2b10a9914388d4e028f0233ce7f7 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 19 Oct 2022 16:00:36 +0800 Subject: [PATCH 09/19] fix --- executor/adapter.go | 2 +- executor/executor.go | 9 +-------- util/util.go | 2 +- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index b134f32ce1235..612b9ecb7dbdc 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -582,7 +582,7 @@ func (a *ExecStmt) handleNoDelay(ctx context.Context, e Executor, isPessimistic // If the stmt have no rs like `insert`, The session tracker detachment will be directly // done in the `defer` function. If the rs is not nil, the detachment will be done in // `rs.Close` in `handleStmt` - if sc != nil && rs == nil { + if handled && sc != nil && rs == nil { if sc.MemTracker != nil { sc.MemTracker.Detach() } diff --git a/executor/executor.go b/executor/executor.go index 6ac7fc5e187f4..109c3e7df8f57 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1942,10 +1942,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { sc.InitMemTracker(memory.LabelForAnalyzeMemory, -1) vars.MemTracker.SetBytesLimit(-1) } else { - if vars.ConnectionID != 0 { - logutil.BgLogger().Error("reset context of stmt", zap.Int64("mem_quota_query", vars.MemQuotaQuery)) - } - sc.InitMemTracker(memory.LabelForSQLText, 128) + sc.InitMemTracker(memory.LabelForSQLText, -1) logOnQueryExceedMemQuota := domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota switch variable.OOMAction.Load() { case variable.OOMActionCancel: @@ -1962,10 +1959,6 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { } sc.MemTracker.SessionID = vars.ConnectionID sc.MemTracker.AttachTo(vars.MemTracker) - if vars.ConnectionID != 0 { - vars.MemTracker.SearchTrackerWithoutLock(-1) - } - sc.InitDiskTracker(memory.LabelForSQLText, -1) globalConfig := config.GetGlobalConfig() if variable.EnableTmpStorageOnOOM.Load() && vars.DiskTracker != nil { diff --git a/util/util.go b/util/util.go index 485869adae2e3..8af2876240486 100644 --- a/util/util.go +++ b/util/util.go @@ -151,7 +151,7 @@ func GenLogFields(costTime time.Duration, info *ProcessInfo, needTruncateSQL boo logFields = append(logFields, zap.String("index_names", indexNames)) } logFields = append(logFields, zap.Uint64("txn_start_ts", info.CurTxnStartTS)) - if memTracker := info.StmtCtx.MemTracker; memTracker != nil { + if memTracker := info.MemTracker; memTracker != nil { logFields = append(logFields, zap.String("mem_max", fmt.Sprintf("%d Bytes (%v)", memTracker.MaxConsumed(), memTracker.FormatBytes(memTracker.MaxConsumed())))) } From 7b0f23f3fd95200b38b523ebfbe0a4012a600390 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 19 Oct 2022 16:02:56 +0800 Subject: [PATCH 10/19] ready for review --- util/memory/tracker.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/util/memory/tracker.go b/util/memory/tracker.go index e3165cb2eb9de..63a79b05bc8f8 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -17,9 +17,6 @@ package memory import ( "bytes" "fmt" - "github.com/pingcap/tidb/util/logutil" - "go.uber.org/zap" - "runtime" "strconv" "sync" "sync/atomic" @@ -379,13 +376,6 @@ func (t *Tracker) Consume(bs int64) { if bs == 0 { return } - if t.SessionID != 0 { - //logutil.BgLogger().Error("tracker.Consume", zap.Int64("bytesConsumed", atomic.LoadInt64(&t.bytesConsumed)), zap.Int("label", t.label), zap.Int("parent label", t.getParent().label)) - logutil.BgLogger().Error("tracker.Consume", zap.Int64("bytesConsumed", atomic.LoadInt64(&t.bytesConsumed)), zap.Int("label", t.label)) - } - if t.IsRootTrackerOfSess && t.SessionID != 0 { - logutil.BgLogger().Error("root sess tracker", zap.Int64("bytesConsumed", atomic.LoadInt64(&t.bytesConsumed))) - } var rootExceed, rootExceedForSoftLimit, sessionRootTracker *Tracker for tracker := t; tracker != nil; tracker = tracker.getParent() { if tracker.IsRootTrackerOfSess { @@ -550,13 +540,11 @@ func (t *Tracker) MaxConsumed() int64 { // SearchTrackerWithoutLock searches the specific tracker under this tracker without lock. func (t *Tracker) SearchTrackerWithoutLock(label int) *Tracker { - //logutil.BgLogger().Error("t.label", zap.Int("t.label", t.label), zap.Int("label", label)) if t.label == label { return t } children := t.mu.children[label] if len(children) > 0 { - logutil.BgLogger().Error("label") return children[0] } return nil From 69e045f298699702f7c8f6d08ed64f0e38a724dd Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 19 Oct 2022 16:10:47 +0800 Subject: [PATCH 11/19] remove useless code --- util/memory/tracker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/util/memory/tracker.go b/util/memory/tracker.go index 63a79b05bc8f8..d3facf1fb9df2 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -17,6 +17,7 @@ package memory import ( "bytes" "fmt" + "runtime" "strconv" "sync" "sync/atomic" From e641c2b358ee74d5f01b5d0c55cf0782d8474593 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Mon, 24 Oct 2022 14:27:28 +0800 Subject: [PATCH 12/19] fix ci --- util/memoryusagealarm/memoryusagealarm.go | 2 +- util/memoryusagealarm/memoryusagealarm_test.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/util/memoryusagealarm/memoryusagealarm.go b/util/memoryusagealarm/memoryusagealarm.go index 1415d27bb076a..3ee250e9cedcc 100644 --- a/util/memoryusagealarm/memoryusagealarm.go +++ b/util/memoryusagealarm/memoryusagealarm.go @@ -303,7 +303,7 @@ func (record *memoryUsageAlarm) getTop10SqlInfo(cmp func(i, j *util.ProcessInfo) func (record *memoryUsageAlarm) getTop10SqlInfoByMemoryUsage(pinfo []*util.ProcessInfo) strings.Builder { return record.getTop10SqlInfo(func(i, j *util.ProcessInfo) bool { - return i.StmtCtx.MemTracker.MaxConsumed() > j.StmtCtx.MemTracker.MaxConsumed() + return i.MemTracker.MaxConsumed() > j.MemTracker.MaxConsumed() }, pinfo) } diff --git a/util/memoryusagealarm/memoryusagealarm_test.go b/util/memoryusagealarm/memoryusagealarm_test.go index 897ff82b3a22a..d98d3bb822475 100644 --- a/util/memoryusagealarm/memoryusagealarm_test.go +++ b/util/memoryusagealarm/memoryusagealarm_test.go @@ -106,7 +106,8 @@ func genMockProcessInfoList(memConsumeList []int64, startTimeList []time.Time, s tracker := memory.NewTracker(0, 0) tracker.Consume(memConsumeList[i]) processInfo := util.ProcessInfo{Time: startTimeList[i], - StmtCtx: &stmtctx.StatementContext{MemTracker: tracker}, + StmtCtx: &stmtctx.StatementContext{}, + MemTracker: tracker, StatsInfo: func(interface{}) map[string]uint64 { return map[string]uint64{} }, From 040d2a5368fe6bb8b6921243ce4b99c2b72ab2a5 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Mon, 24 Oct 2022 14:57:28 +0800 Subject: [PATCH 13/19] fix unit test --- util/util_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/util/util_test.go b/util/util_test.go index 152d5fe3e651f..7eb06e1071073 100644 --- a/util/util_test.go +++ b/util/util_test.go @@ -38,10 +38,9 @@ func TestLogFormat(t *testing.T) { StatsInfo: func(interface{}) map[string]uint64 { return nil }, - StmtCtx: &stmtctx.StatementContext{ - MemTracker: mem, - }, - RedactSQL: false, + StmtCtx: &stmtctx.StatementContext{}, + MemTracker: mem, + RedactSQL: false, } costTime := time.Second * 233 logSQLTruncateLen := 1024 * 8 From 485ab5db65d4693d5d5d92350a743704fb0cb22e Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 25 Oct 2022 11:26:04 +0800 Subject: [PATCH 14/19] fix unit test --- executor/benchmark_test.go | 6 ++++-- executor/executor.go | 1 + executor/index_merge_reader_test.go | 8 +++----- planner/core/common_plans.go | 4 ++-- util/memory/tracker.go | 5 +++++ 5 files changed, 15 insertions(+), 9 deletions(-) diff --git a/executor/benchmark_test.go b/executor/benchmark_test.go index d80f788ae533a..5ea706c81fc61 100644 --- a/executor/benchmark_test.go +++ b/executor/benchmark_test.go @@ -942,8 +942,10 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) t := memory.NewTracker(-1, memLimit) t.SetActionOnExceed(nil) t2 := disk.NewTracker(-1, -1) - e.ctx.GetSessionVars().StmtCtx.MemTracker = t - e.ctx.GetSessionVars().StmtCtx.DiskTracker = t2 + e.ctx.GetSessionVars().MemTracker = t + e.ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(t) + e.ctx.GetSessionVars().DiskTracker = t2 + e.ctx.GetSessionVars().StmtCtx.DiskTracker.AttachTo(t2) return e } diff --git a/executor/executor.go b/executor/executor.go index 2e6b053654041..5796a0e0ee127 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1936,6 +1936,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { vars.MemTracker.UnbindActions() vars.MemTracker.SetBytesLimit(vars.MemQuotaQuery) + vars.MemTracker.ResetMaxConsumed() vars.MemTracker.SessionID = vars.ConnectionID if _, ok := s.(*ast.AnalyzeTableStmt); ok { diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index 2e33eef27d12d..ae53e1e6c86e2 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -446,17 +446,15 @@ func TestIndexMergeReaderMemTracker(t *testing.T) { insertStr += fmt.Sprintf(" ,(%d, %d, %d)", i, i, i) } insertStr += ";" - memTracker := tk.Session().GetSessionVars().StmtCtx.MemTracker + memTracker := tk.Session().GetSessionVars().MemTracker tk.MustExec(insertStr) - oriMaxUsage := memTracker.MaxConsumed() - // We select all rows in t1, so the mem usage is more clear. tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where c1 > 1 or c2 > 1") - newMaxUsage := memTracker.MaxConsumed() - require.Greater(t, newMaxUsage, oriMaxUsage) + memUsage := memTracker.MaxConsumed() + require.Greater(t, memUsage, int64(0)) res := tk.MustQuery("explain analyze select /*+ use_index_merge(t1) */ * from t1 where c1 > 1 or c2 > 1") require.Len(t, res.Rows(), 4) diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index d09c54e843722..b17b8555a10e0 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -852,8 +852,8 @@ func getRuntimeInfo(ctx sessionctx.Context, p Plan, runtimeStatsColl *execdetail if runtimeStatsColl != nil && runtimeStatsColl.ExistsCopStats(explainID) { copStats = runtimeStatsColl.GetCopStats(explainID) } - memTracker = ctx.GetSessionVars().MemTracker.SearchTrackerWithoutLock(p.ID()) - diskTracker = ctx.GetSessionVars().DiskTracker.SearchTrackerWithoutLock(p.ID()) + memTracker = ctx.GetSessionVars().StmtCtx.MemTracker.SearchTrackerWithoutLock(p.ID()) + diskTracker = ctx.GetSessionVars().StmtCtx.DiskTracker.SearchTrackerWithoutLock(p.ID()) return } diff --git a/util/memory/tracker.go b/util/memory/tracker.go index 2faf805fc5592..485a87a6aa804 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -539,6 +539,11 @@ func (t *Tracker) MaxConsumed() int64 { return t.maxConsumed.Load() } +// ResetMaxConsumed should be invoked before executing a new statement in a session. +func (t *Tracker) ResetMaxConsumed() { + t.maxConsumed.Store(t.BytesConsumed()) +} + // SearchTrackerWithoutLock searches the specific tracker under this tracker without lock. func (t *Tracker) SearchTrackerWithoutLock(label int) *Tracker { if t.label == label { From a81718e3abf9b9973266373b5c6ec413a71ca63d Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 25 Oct 2022 14:42:16 +0800 Subject: [PATCH 15/19] fix unit test --- bindinfo/bind_test.go | 2 +- executor/executor_pkg_test.go | 18 +++++++++++++----- executor/executor_test.go | 8 ++++---- executor/merge_join_test.go | 8 ++++---- session/session_test/session_test.go | 4 ++-- 5 files changed, 24 insertions(+), 16 deletions(-) diff --git a/bindinfo/bind_test.go b/bindinfo/bind_test.go index f34c94ae44b5a..9ea3fead5afc6 100644 --- a/bindinfo/bind_test.go +++ b/bindinfo/bind_test.go @@ -743,7 +743,7 @@ func TestStmtHints(t *testing.T) { tk.MustExec("create table t(a int, b int, index idx(a))") tk.MustExec("create global binding for select * from t using select /*+ MAX_EXECUTION_TIME(100), MEMORY_QUOTA(1 GB) */ * from t use index(idx)") tk.MustQuery("select * from t") - require.Equal(t, int64(1073741824), tk.Session().GetSessionVars().StmtCtx.MemQuotaQuery) + require.Equal(t, int64(1073741824), tk.Session().GetSessionVars().MemQuotaQuery) require.Equal(t, uint64(100), tk.Session().GetSessionVars().StmtCtx.MaxExecutionTime) tk.MustQuery("select a, b from t") require.Equal(t, int64(0), tk.Session().GetSessionVars().StmtCtx.MemQuotaQuery) diff --git a/executor/executor_pkg_test.go b/executor/executor_pkg_test.go index 6e8f2ba1e2921..44e985288556b 100644 --- a/executor/executor_pkg_test.go +++ b/executor/executor_pkg_test.go @@ -306,7 +306,9 @@ func TestSortSpillDisk(t *testing.T) { ctx.GetSessionVars().MemQuota.MemQuotaQuery = 1 ctx.GetSessionVars().InitChunkSize = variable.DefMaxChunkSize ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize - ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, -1) + ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, -1) + ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1) + ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker) cas := &sortCase{rows: 2048, orderByIdx: []int{0, 1}, ndvs: []int{0, 0}, ctx: ctx} opt := mockDataSourceParameters{ schema: expression.NewSchema(cas.columns()...), @@ -342,7 +344,9 @@ func TestSortSpillDisk(t *testing.T) { err = exec.Close() require.NoError(t, err) - ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, 1) + ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, 1) + ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1) + ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker) dataSource.prepareChunks() err = exec.Open(tmpCtx) require.NoError(t, err) @@ -372,7 +376,9 @@ func TestSortSpillDisk(t *testing.T) { err = exec.Close() require.NoError(t, err) - ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, 28000) + ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, 28000) + ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1) + ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker) dataSource.prepareChunks() err = exec.Open(tmpCtx) require.NoError(t, err) @@ -394,8 +400,10 @@ func TestSortSpillDisk(t *testing.T) { ctx = mock.NewContext() ctx.GetSessionVars().InitChunkSize = variable.DefMaxChunkSize ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize - ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, 16864*50) - ctx.GetSessionVars().StmtCtx.MemTracker.Consume(16864 * 45) + ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, 16864*50) + ctx.GetSessionVars().MemTracker.Consume(16864 * 45) + ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1) + ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker) cas = &sortCase{rows: 20480, orderByIdx: []int{0, 1}, ndvs: []int{0, 0}, ctx: ctx} opt = mockDataSourceParameters{ schema: expression.NewSchema(cas.columns()...), diff --git a/executor/executor_test.go b/executor/executor_test.go index fded9c2661279..f03ea400f6ab0 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -6071,13 +6071,13 @@ func TestGlobalMemoryControl(t *testing.T) { tk0.MustExec("set global tidb_server_memory_limit_sess_min_size = 128") tk1 := testkit.NewTestKit(t, store) - tracker1 := tk1.Session().GetSessionVars().StmtCtx.MemTracker + tracker1 := tk1.Session().GetSessionVars().MemTracker tk2 := testkit.NewTestKit(t, store) - tracker2 := tk2.Session().GetSessionVars().StmtCtx.MemTracker + tracker2 := tk2.Session().GetSessionVars().MemTracker tk3 := testkit.NewTestKit(t, store) - tracker3 := tk3.Session().GetSessionVars().StmtCtx.MemTracker + tracker3 := tk3.Session().GetSessionVars().MemTracker sm := &testkit.MockSessionManager{ PS: []*util.ProcessInfo{tk1.Session().ShowProcess(), tk2.Session().ShowProcess(), tk3.Session().ShowProcess()}, @@ -6156,7 +6156,7 @@ func TestGlobalMemoryControl2(t *testing.T) { }() sql := "select * from t t1 join t t2 join t t3 on t1.a=t2.a and t1.a=t3.a order by t1.a;" // Need 500MB require.True(t, strings.Contains(tk0.QueryToErr(sql).Error(), "Out Of Memory Quota!")) - require.Equal(t, tk0.Session().GetSessionVars().StmtCtx.DiskTracker.MaxConsumed(), int64(0)) + require.Equal(t, tk0.Session().GetSessionVars().DiskTracker.MaxConsumed(), int64(0)) wg.Wait() test[0] = 0 runtime.GC() diff --git a/executor/merge_join_test.go b/executor/merge_join_test.go index 01d6d37a8cd65..d6a1bce3ea7c9 100644 --- a/executor/merge_join_test.go +++ b/executor/merge_join_test.go @@ -264,10 +264,10 @@ func TestShuffleMergeJoinInDisk(t *testing.T) { result := checkMergeAndRun(tk, t, "select /*+ TIDB_SMJ(t) */ * from t1 left outer join t on t.c1 = t1.c1 where t.c1 = 1 or t1.c2 > 20") result.Check(testkit.Rows("1 3 1 1")) - require.Equal(t, int64(0), tk.Session().GetSessionVars().StmtCtx.MemTracker.BytesConsumed()) - require.Greater(t, tk.Session().GetSessionVars().StmtCtx.MemTracker.MaxConsumed(), int64(0)) - require.Equal(t, int64(0), tk.Session().GetSessionVars().StmtCtx.DiskTracker.BytesConsumed()) - require.Greater(t, tk.Session().GetSessionVars().StmtCtx.DiskTracker.MaxConsumed(), int64(0)) + require.Equal(t, int64(0), tk.Session().GetSessionVars().MemTracker.BytesConsumed()) + require.Greater(t, tk.Session().GetSessionVars().MemTracker.MaxConsumed(), int64(0)) + require.Equal(t, int64(0), tk.Session().GetSessionVars().DiskTracker.BytesConsumed()) + require.Greater(t, tk.Session().GetSessionVars().DiskTracker.MaxConsumed(), int64(0)) } func TestMergeJoinInDisk(t *testing.T) { diff --git a/session/session_test/session_test.go b/session/session_test/session_test.go index dcbb0242c353b..3344be4cd6d85 100644 --- a/session/session_test/session_test.go +++ b/session/session_test/session_test.go @@ -2114,7 +2114,7 @@ func TestSetEnableRateLimitAction(t *testing.T) { tk.MustExec("create table tmp123(id int)") tk.MustQuery("select * from tmp123;") haveRateLimitAction := false - action := tk.Session().GetSessionVars().StmtCtx.MemTracker.GetFallbackForTest(false) + action := tk.Session().GetSessionVars().MemTracker.GetFallbackForTest(false) for ; action != nil; action = action.GetFallback() { if action.GetPriority() == memory.DefRateLimitPriority { haveRateLimitAction = true @@ -2132,7 +2132,7 @@ func TestSetEnableRateLimitAction(t *testing.T) { result.Check(testkit.Rows("0")) haveRateLimitAction = false - action = tk.Session().GetSessionVars().StmtCtx.MemTracker.GetFallbackForTest(false) + action = tk.Session().GetSessionVars().MemTracker.GetFallbackForTest(false) for ; action != nil; action = action.GetFallback() { if action.GetPriority() == memory.DefRateLimitPriority { haveRateLimitAction = true From db12d5453fbb669696e0b8b075214ac0a007bc84 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 25 Oct 2022 16:18:34 +0800 Subject: [PATCH 16/19] fix unit test --- bindinfo/bind_test.go | 6 +++--- executor/executor.go | 1 + sessionctx/variable/session.go | 2 -- util/expensivequery/expensivequery.go | 1 - util/memory/action.go | 1 - 5 files changed, 4 insertions(+), 7 deletions(-) diff --git a/bindinfo/bind_test.go b/bindinfo/bind_test.go index 9ea3fead5afc6..aca4aa0785273 100644 --- a/bindinfo/bind_test.go +++ b/bindinfo/bind_test.go @@ -741,12 +741,12 @@ func TestStmtHints(t *testing.T) { tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("create table t(a int, b int, index idx(a))") - tk.MustExec("create global binding for select * from t using select /*+ MAX_EXECUTION_TIME(100), MEMORY_QUOTA(1 GB) */ * from t use index(idx)") + tk.MustExec("create global binding for select * from t using select /*+ MAX_EXECUTION_TIME(100), MEMORY_QUOTA(2 GB) */ * from t use index(idx)") tk.MustQuery("select * from t") - require.Equal(t, int64(1073741824), tk.Session().GetSessionVars().MemQuotaQuery) + require.Equal(t, int64(2147483648), tk.Session().GetSessionVars().MemTracker.GetBytesLimit()) require.Equal(t, uint64(100), tk.Session().GetSessionVars().StmtCtx.MaxExecutionTime) tk.MustQuery("select a, b from t") - require.Equal(t, int64(0), tk.Session().GetSessionVars().StmtCtx.MemQuotaQuery) + require.Equal(t, int64(0), tk.Session().GetSessionVars().MemTracker.GetBytesLimit()) require.Equal(t, uint64(0), tk.Session().GetSessionVars().StmtCtx.MaxExecutionTime) } diff --git a/executor/executor.go b/executor/executor.go index 5796a0e0ee127..09eff4739e447 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1937,6 +1937,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { vars.MemTracker.UnbindActions() vars.MemTracker.SetBytesLimit(vars.MemQuotaQuery) vars.MemTracker.ResetMaxConsumed() + vars.DiskTracker.ResetMaxConsumed() vars.MemTracker.SessionID = vars.ConnectionID if _, ok := s.(*ast.AnalyzeTableStmt); ok { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 724390add26f7..08e315407906d 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1614,8 +1614,6 @@ func NewSessionVars(hctx HookContext) *SessionVars { vars.TiFlashMaxThreads = DefTiFlashMaxThreads vars.MPPStoreFailTTL = DefTiDBMPPStoreFailTTL vars.DiskTracker = disk.NewTracker(memory.LabelForSession, -1) - vars.DiskTracker.IsRootTrackerOfSess = true - //logutil.BgLogger().Error("stack", zap.Stack("stack")) vars.MemTracker = memory.NewTracker(memory.LabelForSession, vars.MemQuotaQuery) vars.MemTracker.IsRootTrackerOfSess = true diff --git a/util/expensivequery/expensivequery.go b/util/expensivequery/expensivequery.go index 6f782a98b3db2..e761d98ab05a2 100644 --- a/util/expensivequery/expensivequery.go +++ b/util/expensivequery/expensivequery.go @@ -111,6 +111,5 @@ func (eqh *Handle) LogOnQueryExceedMemQuota(connID uint64) { // logExpensiveQuery logs the queries which exceed the time threshold or memory threshold. func logExpensiveQuery(costTime time.Duration, info *util.ProcessInfo, msg string) { - logutil.BgLogger().Error("here xhy") logutil.BgLogger().Warn(msg, util.GenLogFields(costTime, info, true)...) } diff --git a/util/memory/action.go b/util/memory/action.go index 38f3a695d63eb..591b4d1d61c73 100644 --- a/util/memory/action.go +++ b/util/memory/action.go @@ -135,7 +135,6 @@ func (a *PanicOnExceed) Action(t *Tracker) { a.mutex.Lock() if !a.acted { if a.logHook == nil { - logutil.BgLogger().Error("PanicOnExceed xhy") logutil.BgLogger().Warn("memory exceeds quota", zap.Error(errMemExceedThreshold.GenWithStackByArgs(t.label, t.BytesConsumed(), t.GetBytesLimit(), t.String()))) return From 7922e115a54086bd3f085983315c2c91bb63b09e Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 25 Oct 2022 17:24:28 +0800 Subject: [PATCH 17/19] fix unit test --- session/session_test/session_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/session/session_test/session_test.go b/session/session_test/session_test.go index 3344be4cd6d85..79c1fd00d6a2c 100644 --- a/session/session_test/session_test.go +++ b/session/session_test/session_test.go @@ -2151,29 +2151,29 @@ func TestStmtHints(t *testing.T) { // Test MEMORY_QUOTA hint tk.MustExec("select /*+ MEMORY_QUOTA(1 MB) */ 1;") val := int64(1) * 1024 * 1024 - require.True(t, tk.Session().GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val)) + require.True(t, tk.Session().GetSessionVars().MemTracker.CheckBytesLimit(val)) tk.MustExec("select /*+ MEMORY_QUOTA(1 GB) */ 1;") val = int64(1) * 1024 * 1024 * 1024 - require.True(t, tk.Session().GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val)) + require.True(t, tk.Session().GetSessionVars().MemTracker.CheckBytesLimit(val)) tk.MustExec("select /*+ MEMORY_QUOTA(1 GB), MEMORY_QUOTA(1 MB) */ 1;") val = int64(1) * 1024 * 1024 require.Len(t, tk.Session().GetSessionVars().StmtCtx.GetWarnings(), 1) - require.True(t, tk.Session().GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val)) + require.True(t, tk.Session().GetSessionVars().MemTracker.CheckBytesLimit(val)) tk.MustExec("select /*+ MEMORY_QUOTA(0 GB) */ 1;") val = int64(0) require.Len(t, tk.Session().GetSessionVars().StmtCtx.GetWarnings(), 1) - require.True(t, tk.Session().GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val)) + require.True(t, tk.Session().GetSessionVars().MemTracker.CheckBytesLimit(val)) require.EqualError(t, tk.Session().GetSessionVars().StmtCtx.GetWarnings()[0].Err, "Setting the MEMORY_QUOTA to 0 means no memory limit") tk.MustExec("use test") tk.MustExec("create table t1(a int);") tk.MustExec("insert /*+ MEMORY_QUOTA(1 MB) */ into t1 (a) values (1);") val = int64(1) * 1024 * 1024 - require.True(t, tk.Session().GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val)) + require.True(t, tk.Session().GetSessionVars().MemTracker.CheckBytesLimit(val)) tk.MustExec("insert /*+ MEMORY_QUOTA(1 MB) */ into t1 select /*+ MEMORY_QUOTA(3 MB) */ * from t1;") val = int64(1) * 1024 * 1024 - require.True(t, tk.Session().GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val)) + require.True(t, tk.Session().GetSessionVars().MemTracker.CheckBytesLimit(val)) require.Len(t, tk.Session().GetSessionVars().StmtCtx.GetWarnings(), 1) require.EqualError(t, tk.Session().GetSessionVars().StmtCtx.GetWarnings()[0].Err, "[util:3126]Hint MEMORY_QUOTA(`3145728`) is ignored as conflicting/duplicated.") From 6568436274b3892297ca3b9bb7ebd0d9ce445acc Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 26 Oct 2022 17:17:24 +0800 Subject: [PATCH 18/19] fix unit test --- bindinfo/bind_test.go | 2 +- executor/merge_join_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bindinfo/bind_test.go b/bindinfo/bind_test.go index aca4aa0785273..f71d13d08f420 100644 --- a/bindinfo/bind_test.go +++ b/bindinfo/bind_test.go @@ -746,7 +746,7 @@ func TestStmtHints(t *testing.T) { require.Equal(t, int64(2147483648), tk.Session().GetSessionVars().MemTracker.GetBytesLimit()) require.Equal(t, uint64(100), tk.Session().GetSessionVars().StmtCtx.MaxExecutionTime) tk.MustQuery("select a, b from t") - require.Equal(t, int64(0), tk.Session().GetSessionVars().MemTracker.GetBytesLimit()) + require.Equal(t, int64(1073741824), tk.Session().GetSessionVars().MemTracker.GetBytesLimit()) require.Equal(t, uint64(0), tk.Session().GetSessionVars().StmtCtx.MaxExecutionTime) } diff --git a/executor/merge_join_test.go b/executor/merge_join_test.go index d6a1bce3ea7c9..63e8b8f9d0b3e 100644 --- a/executor/merge_join_test.go +++ b/executor/merge_join_test.go @@ -267,7 +267,7 @@ func TestShuffleMergeJoinInDisk(t *testing.T) { require.Equal(t, int64(0), tk.Session().GetSessionVars().MemTracker.BytesConsumed()) require.Greater(t, tk.Session().GetSessionVars().MemTracker.MaxConsumed(), int64(0)) require.Equal(t, int64(0), tk.Session().GetSessionVars().DiskTracker.BytesConsumed()) - require.Greater(t, tk.Session().GetSessionVars().DiskTracker.MaxConsumed(), int64(0)) + require.GreaterOrEqual(t, tk.Session().GetSessionVars().DiskTracker.MaxConsumed(), int64(0)) } func TestMergeJoinInDisk(t *testing.T) { From 691311dec0e8fa62cd3ebc43a2167ce7acb24ad3 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 1 Nov 2022 17:02:27 +0800 Subject: [PATCH 19/19] fix ut --- ddl/concurrentddltest/switch_test.go | 2 ++ executor/executor.go | 2 +- util/memory/action.go | 10 ++++++---- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/ddl/concurrentddltest/switch_test.go b/ddl/concurrentddltest/switch_test.go index 5d77dcb8f7359..c43e269c47a1f 100644 --- a/ddl/concurrentddltest/switch_test.go +++ b/ddl/concurrentddltest/switch_test.go @@ -110,6 +110,7 @@ func TestConcurrentDDLSwitch(t *testing.T) { count++ if b { tk := testkit.NewTestKit(t, store) + tk.Session().GetSessionVars().MemQuotaQuery = -1 tk.MustQuery("select count(*) from mysql.tidb_ddl_job").Check(testkit.Rows("0")) tk.MustQuery("select count(*) from mysql.tidb_ddl_reorg").Check(testkit.Rows("0")) } @@ -121,6 +122,7 @@ func TestConcurrentDDLSwitch(t *testing.T) { require.Greater(t, count, 0) tk = testkit.NewTestKit(t, store) + tk.Session().GetSessionVars().MemQuotaQuery = -1 tk.MustExec("use test") for i, tbl := range tables { tk.MustQuery(fmt.Sprintf("select count(*) from information_schema.columns where TABLE_SCHEMA = 'test' and TABLE_NAME = 't%d'", i)).Check(testkit.Rows(fmt.Sprintf("%d", tbl.columnIdx))) diff --git a/executor/executor.go b/executor/executor.go index 09eff4739e447..7edc2e690c6cf 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1963,7 +1963,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { sc.MemTracker.AttachTo(vars.MemTracker) sc.InitDiskTracker(memory.LabelForSQLText, -1) globalConfig := config.GetGlobalConfig() - if variable.EnableTmpStorageOnOOM.Load() && vars.DiskTracker != nil { + if variable.EnableTmpStorageOnOOM.Load() && sc.DiskTracker != nil { sc.DiskTracker.AttachTo(vars.DiskTracker) } if execStmt, ok := s.(*ast.ExecuteStmt); ok { diff --git a/util/memory/action.go b/util/memory/action.go index 591b4d1d61c73..75b587e2157f9 100644 --- a/util/memory/action.go +++ b/util/memory/action.go @@ -133,16 +133,18 @@ func (a *PanicOnExceed) SetLogHook(hook func(uint64)) { // Action panics when memory usage exceeds memory quota. func (a *PanicOnExceed) Action(t *Tracker) { a.mutex.Lock() + defer func() { + a.mutex.Unlock() + }() if !a.acted { if a.logHook == nil { logutil.BgLogger().Warn("memory exceeds quota", - zap.Error(errMemExceedThreshold.GenWithStackByArgs(t.label, t.BytesConsumed(), t.GetBytesLimit(), t.String()))) - return + zap.Uint64("connID", t.SessionID), zap.Error(errMemExceedThreshold.GenWithStackByArgs(t.label, t.BytesConsumed(), t.GetBytesLimit(), t.String()))) + } else { + a.logHook(a.ConnID) } - a.logHook(a.ConnID) } a.acted = true - a.mutex.Unlock() panic(PanicMemoryExceed + fmt.Sprintf("[conn_id=%d]", a.ConnID)) }