From fc4686598e8e7a84a65f8c15af6864e8420f22e3 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 10 Jan 2023 14:18:22 +0800 Subject: [PATCH] *: add a reference count for StmtCtx (#39368) (#39426) close pingcap/tidb#27725 --- session/session.go | 31 +++++++++--------- sessionctx/stmtctx/stmtctx.go | 36 +++++++++++++++++++++ sessionctx/variable/session.go | 26 ++++++++++----- util/expensivequery/expensivequerey_test.go | 4 ++- util/expensivequery/expensivequery.go | 10 +++++- util/processinfo.go | 29 +++++++++-------- 6 files changed, 97 insertions(+), 39 deletions(-) diff --git a/session/session.go b/session/session.go index 91b9c97ff765a..999215977d26a 100644 --- a/session/session.go +++ b/session/session.go @@ -1394,21 +1394,22 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu p = explain.TargetPlan } pi := util.ProcessInfo{ - ID: s.sessionVars.ConnectionID, - Port: s.sessionVars.Port, - DB: s.sessionVars.CurrentDB, - Command: command, - Plan: p, - PlanExplainRows: plannercore.GetExplainRowsForPlan(p), - RuntimeStatsColl: s.sessionVars.StmtCtx.RuntimeStatsColl, - Time: t, - State: s.Status(), - Info: sql, - CurTxnStartTS: curTxnStartTS, - StmtCtx: s.sessionVars.StmtCtx, - StatsInfo: plannercore.GetStatsInfo, - MaxExecutionTime: maxExecutionTime, - RedactSQL: s.sessionVars.EnableRedactLog, + ID: s.sessionVars.ConnectionID, + Port: s.sessionVars.Port, + DB: s.sessionVars.CurrentDB, + Command: command, + Plan: p, + PlanExplainRows: plannercore.GetExplainRowsForPlan(p), + RuntimeStatsColl: s.sessionVars.StmtCtx.RuntimeStatsColl, + Time: t, + State: s.Status(), + Info: sql, + CurTxnStartTS: curTxnStartTS, + StmtCtx: s.sessionVars.StmtCtx, + RefCountOfStmtCtx: &s.sessionVars.RefCountOfStmtCtx, + StatsInfo: plannercore.GetStatsInfo, + MaxExecutionTime: maxExecutionTime, + RedactSQL: s.sessionVars.EnableRedactLog, } oldPi := s.ShowProcess() if p == nil { diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 97c589d365164..d5f19e8fe6dd5 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -60,6 +60,42 @@ type SQLWarn struct { Err error } +// ReferenceCount indicates the reference count of StmtCtx. +type ReferenceCount int32 + +const ( + // ReferenceCountIsFrozen indicates the current StmtCtx is resetting, it'll refuse all the access from other sessions. + ReferenceCountIsFrozen int32 = -1 + // ReferenceCountNoReference indicates the current StmtCtx is not accessed by other sessions. + ReferenceCountNoReference int32 = 0 +) + +// TryIncrease tries to increase the reference count. +// There is a small chance that TryIncrease returns true while TryFreeze and +// UnFreeze are invoked successfully during the execution of TryIncrease. +func (rf *ReferenceCount) TryIncrease() bool { + refCnt := atomic.LoadInt32((*int32)(rf)) + for ; refCnt != ReferenceCountIsFrozen && !atomic.CompareAndSwapInt32((*int32)(rf), refCnt, refCnt+1); refCnt = atomic.LoadInt32((*int32)(rf)) { + } + return refCnt != ReferenceCountIsFrozen +} + +// Decrease decreases the reference count. +func (rf *ReferenceCount) Decrease() { + for refCnt := atomic.LoadInt32((*int32)(rf)); !atomic.CompareAndSwapInt32((*int32)(rf), refCnt, refCnt-1); refCnt = atomic.LoadInt32((*int32)(rf)) { + } +} + +// TryFreeze tries to freeze the StmtCtx to frozen before resetting the old StmtCtx. +func (rf *ReferenceCount) TryFreeze() bool { + return atomic.LoadInt32((*int32)(rf)) == ReferenceCountNoReference && atomic.CompareAndSwapInt32((*int32)(rf), ReferenceCountNoReference, ReferenceCountIsFrozen) +} + +// UnFreeze unfreeze the frozen StmtCtx thus the other session can access this StmtCtx. +func (rf *ReferenceCount) UnFreeze() { + atomic.StoreInt32((*int32)(rf), ReferenceCountNoReference) +} + // StatementContext contains variables for a statement. // It should be reset before executing a statement. type StatementContext struct { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index bcc8820d9aa00..d77d14e351556 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -586,6 +586,11 @@ type SessionVars struct { // StmtCtx holds variables for current executing statement. StmtCtx *stmtctx.StatementContext + // RefCountOfStmtCtx indicates the reference count of StmtCtx. When the + // StmtCtx is accessed by other sessions, e.g. oom-alarm-handler/expensive-query-handler, add one first. + // Note: this variable should be accessed and updated by atomic operations. + RefCountOfStmtCtx stmtctx.ReferenceCount + // AllowAggPushDown can be set to false to forbid aggregation push down. AllowAggPushDown bool @@ -1001,11 +1006,8 @@ type SessionVars struct { // ReadStaleness indicates the staleness duration for the following query ReadStaleness time.Duration - // cached is used to optimze the object allocation. - cached struct { - curr int8 - data [2]stmtctx.StatementContext - } + // cachedStmtCtx is used to optimze the object allocation. + cachedStmtCtx [2]stmtctx.StatementContext // Rng stores the rand_seed1 and rand_seed2 for Rand() function Rng *mathutil.MysqlRng @@ -1049,9 +1051,17 @@ type SessionVars struct { // InitStatementContext initializes a StatementContext, the object is reused to reduce allocation. func (s *SessionVars) InitStatementContext() *stmtctx.StatementContext { - s.cached.curr = (s.cached.curr + 1) % 2 - s.cached.data[s.cached.curr] = stmtctx.StatementContext{} - return &s.cached.data[s.cached.curr] + sc := &s.cachedStmtCtx[0] + if sc == s.StmtCtx { + sc = &s.cachedStmtCtx[1] + } + if s.RefCountOfStmtCtx.TryFreeze() { + *sc = stmtctx.StatementContext{} + s.RefCountOfStmtCtx.UnFreeze() + } else { + sc = &stmtctx.StatementContext{} + } + return sc } // AllocMPPTaskID allocates task id for mpp tasks. It will reset the task id if the query's diff --git a/util/expensivequery/expensivequerey_test.go b/util/expensivequery/expensivequerey_test.go index c1a8464fdfeb9..7f64a8406122e 100644 --- a/util/expensivequery/expensivequerey_test.go +++ b/util/expensivequery/expensivequerey_test.go @@ -39,6 +39,7 @@ func TestMain(m *testing.M) { func TestLogFormat(t *testing.T) { mem := memory.NewTracker(-1, -1) mem.Consume(1<<30 + 1<<29 + 1<<28 + 1<<27) + var refCount stmtctx.ReferenceCount = 0 info := &util.ProcessInfo{ ID: 233, User: "PingCAP", @@ -52,7 +53,8 @@ func TestLogFormat(t *testing.T) { StmtCtx: &stmtctx.StatementContext{ MemTracker: mem, }, - RedactSQL: false, + RefCountOfStmtCtx: &refCount, + RedactSQL: false, } costTime := time.Second * 233 logFields := genLogFields(costTime, info) diff --git a/util/expensivequery/expensivequery.go b/util/expensivequery/expensivequery.go index 035e90aec0190..ce7cc44980240 100644 --- a/util/expensivequery/expensivequery.go +++ b/util/expensivequery/expensivequery.go @@ -119,6 +119,10 @@ func (eqh *Handle) LogOnQueryExceedMemQuota(connID uint64) { } func genLogFields(costTime time.Duration, info *util.ProcessInfo) []zap.Field { + if info.RefCountOfStmtCtx != nil && !info.RefCountOfStmtCtx.TryIncrease() { + return nil + } + defer info.RefCountOfStmtCtx.Decrease() logFields := make([]zap.Field, 0, 20) logFields = append(logFields, zap.String("cost_time", strconv.FormatFloat(costTime.Seconds(), 'f', -1, 64)+"s")) execDetail := info.StmtCtx.GetExecDetails() @@ -185,5 +189,9 @@ func genLogFields(costTime time.Duration, info *util.ProcessInfo) []zap.Field { // logExpensiveQuery logs the queries which exceed the time threshold or memory threshold. func logExpensiveQuery(costTime time.Duration, info *util.ProcessInfo) { - logutil.BgLogger().Warn("expensive_query", genLogFields(costTime, info)...) + fields := genLogFields(costTime, info) + if fields == nil { + return + } + logutil.BgLogger().Warn("expensive_query", fields...) } diff --git a/util/processinfo.go b/util/processinfo.go index e9f496f73f60a..996136d77619e 100644 --- a/util/processinfo.go +++ b/util/processinfo.go @@ -31,20 +31,21 @@ import ( // ProcessInfo is a struct used for show processlist statement. type ProcessInfo struct { - ID uint64 - User string - Host string - Port string - DB string - Digest string - Plan interface{} - PlanExplainRows [][]string - RuntimeStatsColl *execdetails.RuntimeStatsColl - Time time.Time - Info string - CurTxnStartTS uint64 - StmtCtx *stmtctx.StatementContext - StatsInfo func(interface{}) map[string]uint64 + ID uint64 + User string + Host string + Port string + DB string + Digest string + Plan interface{} + PlanExplainRows [][]string + RuntimeStatsColl *execdetails.RuntimeStatsColl + Time time.Time + Info string + CurTxnStartTS uint64 + StmtCtx *stmtctx.StatementContext + RefCountOfStmtCtx *stmtctx.ReferenceCount + StatsInfo func(interface{}) map[string]uint64 // MaxExecutionTime is the timeout for select statement, in milliseconds. // If the query takes too long, kill it. MaxExecutionTime uint64