Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add a reference count for StmtCtx (#39368) #39426

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1394,21 +1394,22 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu
p = explain.TargetPlan
}
pi := util.ProcessInfo{
ID: s.sessionVars.ConnectionID,
Port: s.sessionVars.Port,
DB: s.sessionVars.CurrentDB,
Command: command,
Plan: p,
PlanExplainRows: plannercore.GetExplainRowsForPlan(p),
RuntimeStatsColl: s.sessionVars.StmtCtx.RuntimeStatsColl,
Time: t,
State: s.Status(),
Info: sql,
CurTxnStartTS: curTxnStartTS,
StmtCtx: s.sessionVars.StmtCtx,
StatsInfo: plannercore.GetStatsInfo,
MaxExecutionTime: maxExecutionTime,
RedactSQL: s.sessionVars.EnableRedactLog,
ID: s.sessionVars.ConnectionID,
Port: s.sessionVars.Port,
DB: s.sessionVars.CurrentDB,
Command: command,
Plan: p,
PlanExplainRows: plannercore.GetExplainRowsForPlan(p),
RuntimeStatsColl: s.sessionVars.StmtCtx.RuntimeStatsColl,
Time: t,
State: s.Status(),
Info: sql,
CurTxnStartTS: curTxnStartTS,
StmtCtx: s.sessionVars.StmtCtx,
RefCountOfStmtCtx: &s.sessionVars.RefCountOfStmtCtx,
StatsInfo: plannercore.GetStatsInfo,
MaxExecutionTime: maxExecutionTime,
RedactSQL: s.sessionVars.EnableRedactLog,
}
oldPi := s.ShowProcess()
if p == nil {
Expand Down
36 changes: 36 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,42 @@ type SQLWarn struct {
Err error
}

// ReferenceCount indicates the reference count of StmtCtx.
type ReferenceCount int32

const (
// ReferenceCountIsFrozen indicates the current StmtCtx is resetting, it'll refuse all the access from other sessions.
ReferenceCountIsFrozen int32 = -1
// ReferenceCountNoReference indicates the current StmtCtx is not accessed by other sessions.
ReferenceCountNoReference int32 = 0
)

// TryIncrease tries to increase the reference count.
// There is a small chance that TryIncrease returns true while TryFreeze and
// UnFreeze are invoked successfully during the execution of TryIncrease.
func (rf *ReferenceCount) TryIncrease() bool {
refCnt := atomic.LoadInt32((*int32)(rf))
for ; refCnt != ReferenceCountIsFrozen && !atomic.CompareAndSwapInt32((*int32)(rf), refCnt, refCnt+1); refCnt = atomic.LoadInt32((*int32)(rf)) {
}
return refCnt != ReferenceCountIsFrozen
}

// Decrease decreases the reference count.
func (rf *ReferenceCount) Decrease() {
for refCnt := atomic.LoadInt32((*int32)(rf)); !atomic.CompareAndSwapInt32((*int32)(rf), refCnt, refCnt-1); refCnt = atomic.LoadInt32((*int32)(rf)) {
}
}

// TryFreeze tries to freeze the StmtCtx to frozen before resetting the old StmtCtx.
func (rf *ReferenceCount) TryFreeze() bool {
return atomic.LoadInt32((*int32)(rf)) == ReferenceCountNoReference && atomic.CompareAndSwapInt32((*int32)(rf), ReferenceCountNoReference, ReferenceCountIsFrozen)
}

// UnFreeze unfreeze the frozen StmtCtx thus the other session can access this StmtCtx.
func (rf *ReferenceCount) UnFreeze() {
atomic.StoreInt32((*int32)(rf), ReferenceCountNoReference)
}

// StatementContext contains variables for a statement.
// It should be reset before executing a statement.
type StatementContext struct {
Expand Down
26 changes: 18 additions & 8 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,11 @@ type SessionVars struct {
// StmtCtx holds variables for current executing statement.
StmtCtx *stmtctx.StatementContext

// RefCountOfStmtCtx indicates the reference count of StmtCtx. When the
// StmtCtx is accessed by other sessions, e.g. oom-alarm-handler/expensive-query-handler, add one first.
// Note: this variable should be accessed and updated by atomic operations.
RefCountOfStmtCtx stmtctx.ReferenceCount

// AllowAggPushDown can be set to false to forbid aggregation push down.
AllowAggPushDown bool

Expand Down Expand Up @@ -1001,11 +1006,8 @@ type SessionVars struct {
// ReadStaleness indicates the staleness duration for the following query
ReadStaleness time.Duration

// cached is used to optimze the object allocation.
cached struct {
curr int8
data [2]stmtctx.StatementContext
}
// cachedStmtCtx is used to optimze the object allocation.
cachedStmtCtx [2]stmtctx.StatementContext

// Rng stores the rand_seed1 and rand_seed2 for Rand() function
Rng *mathutil.MysqlRng
Expand Down Expand Up @@ -1049,9 +1051,17 @@ type SessionVars struct {

// InitStatementContext initializes a StatementContext, the object is reused to reduce allocation.
func (s *SessionVars) InitStatementContext() *stmtctx.StatementContext {
s.cached.curr = (s.cached.curr + 1) % 2
s.cached.data[s.cached.curr] = stmtctx.StatementContext{}
return &s.cached.data[s.cached.curr]
sc := &s.cachedStmtCtx[0]
if sc == s.StmtCtx {
sc = &s.cachedStmtCtx[1]
}
if s.RefCountOfStmtCtx.TryFreeze() {
*sc = stmtctx.StatementContext{}
s.RefCountOfStmtCtx.UnFreeze()
} else {
sc = &stmtctx.StatementContext{}
}
return sc
}

// AllocMPPTaskID allocates task id for mpp tasks. It will reset the task id if the query's
Expand Down
4 changes: 3 additions & 1 deletion util/expensivequery/expensivequerey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestMain(m *testing.M) {
func TestLogFormat(t *testing.T) {
mem := memory.NewTracker(-1, -1)
mem.Consume(1<<30 + 1<<29 + 1<<28 + 1<<27)
var refCount stmtctx.ReferenceCount = 0
info := &util.ProcessInfo{
ID: 233,
User: "PingCAP",
Expand All @@ -52,7 +53,8 @@ func TestLogFormat(t *testing.T) {
StmtCtx: &stmtctx.StatementContext{
MemTracker: mem,
},
RedactSQL: false,
RefCountOfStmtCtx: &refCount,
RedactSQL: false,
}
costTime := time.Second * 233
logFields := genLogFields(costTime, info)
Expand Down
10 changes: 9 additions & 1 deletion util/expensivequery/expensivequery.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ func (eqh *Handle) LogOnQueryExceedMemQuota(connID uint64) {
}

func genLogFields(costTime time.Duration, info *util.ProcessInfo) []zap.Field {
if info.RefCountOfStmtCtx != nil && !info.RefCountOfStmtCtx.TryIncrease() {
return nil
}
defer info.RefCountOfStmtCtx.Decrease()
logFields := make([]zap.Field, 0, 20)
logFields = append(logFields, zap.String("cost_time", strconv.FormatFloat(costTime.Seconds(), 'f', -1, 64)+"s"))
execDetail := info.StmtCtx.GetExecDetails()
Expand Down Expand Up @@ -185,5 +189,9 @@ func genLogFields(costTime time.Duration, info *util.ProcessInfo) []zap.Field {

// logExpensiveQuery logs the queries which exceed the time threshold or memory threshold.
func logExpensiveQuery(costTime time.Duration, info *util.ProcessInfo) {
logutil.BgLogger().Warn("expensive_query", genLogFields(costTime, info)...)
fields := genLogFields(costTime, info)
if fields == nil {
return
}
logutil.BgLogger().Warn("expensive_query", fields...)
}
29 changes: 15 additions & 14 deletions util/processinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,21 @@ import (

// ProcessInfo is a struct used for show processlist statement.
type ProcessInfo struct {
ID uint64
User string
Host string
Port string
DB string
Digest string
Plan interface{}
PlanExplainRows [][]string
RuntimeStatsColl *execdetails.RuntimeStatsColl
Time time.Time
Info string
CurTxnStartTS uint64
StmtCtx *stmtctx.StatementContext
StatsInfo func(interface{}) map[string]uint64
ID uint64
User string
Host string
Port string
DB string
Digest string
Plan interface{}
PlanExplainRows [][]string
RuntimeStatsColl *execdetails.RuntimeStatsColl
Time time.Time
Info string
CurTxnStartTS uint64
StmtCtx *stmtctx.StatementContext
RefCountOfStmtCtx *stmtctx.ReferenceCount
StatsInfo func(interface{}) map[string]uint64
// MaxExecutionTime is the timeout for select statement, in milliseconds.
// If the query takes too long, kill it.
MaxExecutionTime uint64
Expand Down