Skip to content

Commit

Permalink
*: add a reference count for StmtCtx (#39368) (#39426)
Browse files Browse the repository at this point in the history
close #27725
  • Loading branch information
ti-chi-bot committed Jan 10, 2023
1 parent 1d0122a commit fc46865
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 39 deletions.
31 changes: 16 additions & 15 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1394,21 +1394,22 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu
p = explain.TargetPlan
}
pi := util.ProcessInfo{
ID: s.sessionVars.ConnectionID,
Port: s.sessionVars.Port,
DB: s.sessionVars.CurrentDB,
Command: command,
Plan: p,
PlanExplainRows: plannercore.GetExplainRowsForPlan(p),
RuntimeStatsColl: s.sessionVars.StmtCtx.RuntimeStatsColl,
Time: t,
State: s.Status(),
Info: sql,
CurTxnStartTS: curTxnStartTS,
StmtCtx: s.sessionVars.StmtCtx,
StatsInfo: plannercore.GetStatsInfo,
MaxExecutionTime: maxExecutionTime,
RedactSQL: s.sessionVars.EnableRedactLog,
ID: s.sessionVars.ConnectionID,
Port: s.sessionVars.Port,
DB: s.sessionVars.CurrentDB,
Command: command,
Plan: p,
PlanExplainRows: plannercore.GetExplainRowsForPlan(p),
RuntimeStatsColl: s.sessionVars.StmtCtx.RuntimeStatsColl,
Time: t,
State: s.Status(),
Info: sql,
CurTxnStartTS: curTxnStartTS,
StmtCtx: s.sessionVars.StmtCtx,
RefCountOfStmtCtx: &s.sessionVars.RefCountOfStmtCtx,
StatsInfo: plannercore.GetStatsInfo,
MaxExecutionTime: maxExecutionTime,
RedactSQL: s.sessionVars.EnableRedactLog,
}
oldPi := s.ShowProcess()
if p == nil {
Expand Down
36 changes: 36 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,42 @@ type SQLWarn struct {
Err error
}

// ReferenceCount indicates the reference count of StmtCtx.
type ReferenceCount int32

const (
// ReferenceCountIsFrozen indicates the current StmtCtx is resetting, it'll refuse all the access from other sessions.
ReferenceCountIsFrozen int32 = -1
// ReferenceCountNoReference indicates the current StmtCtx is not accessed by other sessions.
ReferenceCountNoReference int32 = 0
)

// TryIncrease tries to increase the reference count.
// There is a small chance that TryIncrease returns true while TryFreeze and
// UnFreeze are invoked successfully during the execution of TryIncrease.
func (rf *ReferenceCount) TryIncrease() bool {
refCnt := atomic.LoadInt32((*int32)(rf))
for ; refCnt != ReferenceCountIsFrozen && !atomic.CompareAndSwapInt32((*int32)(rf), refCnt, refCnt+1); refCnt = atomic.LoadInt32((*int32)(rf)) {
}
return refCnt != ReferenceCountIsFrozen
}

// Decrease decreases the reference count.
func (rf *ReferenceCount) Decrease() {
for refCnt := atomic.LoadInt32((*int32)(rf)); !atomic.CompareAndSwapInt32((*int32)(rf), refCnt, refCnt-1); refCnt = atomic.LoadInt32((*int32)(rf)) {
}
}

// TryFreeze tries to freeze the StmtCtx to frozen before resetting the old StmtCtx.
func (rf *ReferenceCount) TryFreeze() bool {
return atomic.LoadInt32((*int32)(rf)) == ReferenceCountNoReference && atomic.CompareAndSwapInt32((*int32)(rf), ReferenceCountNoReference, ReferenceCountIsFrozen)
}

// UnFreeze unfreeze the frozen StmtCtx thus the other session can access this StmtCtx.
func (rf *ReferenceCount) UnFreeze() {
atomic.StoreInt32((*int32)(rf), ReferenceCountNoReference)
}

// StatementContext contains variables for a statement.
// It should be reset before executing a statement.
type StatementContext struct {
Expand Down
26 changes: 18 additions & 8 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,11 @@ type SessionVars struct {
// StmtCtx holds variables for current executing statement.
StmtCtx *stmtctx.StatementContext

// RefCountOfStmtCtx indicates the reference count of StmtCtx. When the
// StmtCtx is accessed by other sessions, e.g. oom-alarm-handler/expensive-query-handler, add one first.
// Note: this variable should be accessed and updated by atomic operations.
RefCountOfStmtCtx stmtctx.ReferenceCount

// AllowAggPushDown can be set to false to forbid aggregation push down.
AllowAggPushDown bool

Expand Down Expand Up @@ -1001,11 +1006,8 @@ type SessionVars struct {
// ReadStaleness indicates the staleness duration for the following query
ReadStaleness time.Duration

// cached is used to optimze the object allocation.
cached struct {
curr int8
data [2]stmtctx.StatementContext
}
// cachedStmtCtx is used to optimze the object allocation.
cachedStmtCtx [2]stmtctx.StatementContext

// Rng stores the rand_seed1 and rand_seed2 for Rand() function
Rng *mathutil.MysqlRng
Expand Down Expand Up @@ -1049,9 +1051,17 @@ type SessionVars struct {

// InitStatementContext initializes a StatementContext, the object is reused to reduce allocation.
func (s *SessionVars) InitStatementContext() *stmtctx.StatementContext {
s.cached.curr = (s.cached.curr + 1) % 2
s.cached.data[s.cached.curr] = stmtctx.StatementContext{}
return &s.cached.data[s.cached.curr]
sc := &s.cachedStmtCtx[0]
if sc == s.StmtCtx {
sc = &s.cachedStmtCtx[1]
}
if s.RefCountOfStmtCtx.TryFreeze() {
*sc = stmtctx.StatementContext{}
s.RefCountOfStmtCtx.UnFreeze()
} else {
sc = &stmtctx.StatementContext{}
}
return sc
}

// AllocMPPTaskID allocates task id for mpp tasks. It will reset the task id if the query's
Expand Down
4 changes: 3 additions & 1 deletion util/expensivequery/expensivequerey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestMain(m *testing.M) {
func TestLogFormat(t *testing.T) {
mem := memory.NewTracker(-1, -1)
mem.Consume(1<<30 + 1<<29 + 1<<28 + 1<<27)
var refCount stmtctx.ReferenceCount = 0
info := &util.ProcessInfo{
ID: 233,
User: "PingCAP",
Expand All @@ -52,7 +53,8 @@ func TestLogFormat(t *testing.T) {
StmtCtx: &stmtctx.StatementContext{
MemTracker: mem,
},
RedactSQL: false,
RefCountOfStmtCtx: &refCount,
RedactSQL: false,
}
costTime := time.Second * 233
logFields := genLogFields(costTime, info)
Expand Down
10 changes: 9 additions & 1 deletion util/expensivequery/expensivequery.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ func (eqh *Handle) LogOnQueryExceedMemQuota(connID uint64) {
}

func genLogFields(costTime time.Duration, info *util.ProcessInfo) []zap.Field {
if info.RefCountOfStmtCtx != nil && !info.RefCountOfStmtCtx.TryIncrease() {
return nil
}
defer info.RefCountOfStmtCtx.Decrease()
logFields := make([]zap.Field, 0, 20)
logFields = append(logFields, zap.String("cost_time", strconv.FormatFloat(costTime.Seconds(), 'f', -1, 64)+"s"))
execDetail := info.StmtCtx.GetExecDetails()
Expand Down Expand Up @@ -185,5 +189,9 @@ func genLogFields(costTime time.Duration, info *util.ProcessInfo) []zap.Field {

// logExpensiveQuery logs the queries which exceed the time threshold or memory threshold.
func logExpensiveQuery(costTime time.Duration, info *util.ProcessInfo) {
logutil.BgLogger().Warn("expensive_query", genLogFields(costTime, info)...)
fields := genLogFields(costTime, info)
if fields == nil {
return
}
logutil.BgLogger().Warn("expensive_query", fields...)
}
29 changes: 15 additions & 14 deletions util/processinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,21 @@ import (

// ProcessInfo is a struct used for show processlist statement.
type ProcessInfo struct {
ID uint64
User string
Host string
Port string
DB string
Digest string
Plan interface{}
PlanExplainRows [][]string
RuntimeStatsColl *execdetails.RuntimeStatsColl
Time time.Time
Info string
CurTxnStartTS uint64
StmtCtx *stmtctx.StatementContext
StatsInfo func(interface{}) map[string]uint64
ID uint64
User string
Host string
Port string
DB string
Digest string
Plan interface{}
PlanExplainRows [][]string
RuntimeStatsColl *execdetails.RuntimeStatsColl
Time time.Time
Info string
CurTxnStartTS uint64
StmtCtx *stmtctx.StatementContext
RefCountOfStmtCtx *stmtctx.ReferenceCount
StatsInfo func(interface{}) map[string]uint64
// MaxExecutionTime is the timeout for select statement, in milliseconds.
// If the query takes too long, kill it.
MaxExecutionTime uint64
Expand Down

0 comments on commit fc46865

Please sign in to comment.