Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: change record analyze to explain in oom alarm #39099

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,8 +883,8 @@ func getRuntimeInfo(ctx sessionctx.Context, p Plan, runtimeStatsColl *execdetail
if runtimeStatsColl != nil && runtimeStatsColl.ExistsCopStats(explainID) {
copStats = runtimeStatsColl.GetCopStats(explainID)
}
memTracker = ctx.GetSessionVars().StmtCtx.MemTracker.SearchTrackerWithLock(p.ID())
diskTracker = ctx.GetSessionVars().StmtCtx.DiskTracker.SearchTrackerWithLock(p.ID())
memTracker = ctx.GetSessionVars().StmtCtx.MemTracker.SearchTrackerWithoutLock(p.ID())
diskTracker = ctx.GetSessionVars().StmtCtx.DiskTracker.SearchTrackerWithoutLock(p.ID())
return
}

Expand Down
5 changes: 0 additions & 5 deletions planner/core/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,11 +881,6 @@ func (p *basePlan) SCtx() sessionctx.Context {
return p.ctx
}

// SetSCtx Context implements Plan Set Context interface.
func (p *basePlan) SetSCtx(ctx sessionctx.Context) {
p.ctx = ctx
}

// buildPlanTrace implements Plan
func (p *basePhysicalPlan) buildPlanTrace() *tracing.PlanTrace {
planTrace := &tracing.PlanTrace{ID: p.ID(), TP: p.self.TP(), ExplainInfo: p.self.ExplainInfo()}
Expand Down
19 changes: 0 additions & 19 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ import (
"github.com/pingcap/tidb/table/temptable"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/telemetry"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
Expand Down Expand Up @@ -1573,18 +1572,13 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu
if explain, ok := p.(*plannercore.Explain); ok && explain.Analyze && explain.TargetPlan != nil {
p = explain.TargetPlan
}
canExplainAnalyze := false
if _, ok := p.(plannercore.PhysicalPlan); ok {
canExplainAnalyze = true
}
pi := util.ProcessInfo{
ID: s.sessionVars.ConnectionID,
Port: s.sessionVars.Port,
DB: s.sessionVars.CurrentDB,
Command: command,
Plan: p,
PlanExplainRows: plannercore.GetExplainRowsForPlan(p),
CurrentAnalyzeRows: s.getCurrentAnalyzePlan,
RuntimeStatsColl: s.sessionVars.StmtCtx.RuntimeStatsColl,
Time: t,
State: s.Status(),
Expand All @@ -1597,7 +1591,6 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu
OOMAlarmVariablesInfo: s.getOomAlarmVariablesInfo(),
MaxExecutionTime: maxExecutionTime,
RedactSQL: s.sessionVars.EnableRedactLog,
CanExplainAnalyze: canExplainAnalyze,
}
oldPi := s.ShowProcess()
if p == nil {
Expand All @@ -1607,7 +1600,6 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu
pi.Plan = oldPi.Plan
pi.PlanExplainRows = oldPi.PlanExplainRows
pi.RuntimeStatsColl = oldPi.RuntimeStatsColl
_, pi.CanExplainAnalyze = pi.Plan.(plannercore.PhysicalPlan)
}
}
// We set process info before building plan, so we extended execution time.
Expand Down Expand Up @@ -1636,17 +1628,6 @@ func (s *session) getOomAlarmVariablesInfo() util.OOMAlarmVariablesInfo {
}
}

func (s *session) getCurrentAnalyzePlan(p interface{}, runtimeStatsColl *execdetails.RuntimeStatsColl) [][]string {
explain := &plannercore.Explain{
TargetPlan: p.(plannercore.Plan),
Format: types.ExplainFormatROW,
Analyze: false,
RuntimeStatsColl: runtimeStatsColl,
}
explain.SetSCtx(s)
return plannercore.GetExplainAnalyzeRowsForPlan(explain)
}

func (s *session) SetDiskFullOpt(level kvrpcpb.DiskFullOpt) {
s.diskFullOpt = level
}
Expand Down
25 changes: 12 additions & 13 deletions util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ import (

// ExecDetails contains execution detail information.
type ExecDetails struct {
BackoffSleep map[string]time.Duration
BackoffTimes map[string]int
CommitDetail *util.CommitDetails
LockKeysDetail *util.LockKeysDetails
ScanDetail *util.ScanDetail
BackoffSleep map[string]time.Duration
BackoffTimes map[string]int
CalleeAddress string
TimeDetail util.TimeDetail
RequestCount int
CopTime time.Duration
BackoffTime time.Duration
LockKeysDuration time.Duration
RequestCount int
}

type stmtExecDetailKeyType struct{}
Expand Down Expand Up @@ -368,36 +368,33 @@ type CopRuntimeStats struct {
scanDetail *util.ScanDetail
// do not use kv.StoreType because it will meet cycle import error
storeType string
// count CopRuntimeStats total rows
totalRows int64
sync.Mutex
}

// RecordOneCopTask records a specific cop tasks's execution detail.
func (crs *CopRuntimeStats) RecordOneCopTask(address string, summary *tipb.ExecutorExecutionSummary) {
crs.Lock()
defer crs.Unlock()
currentRows := int64(*summary.NumProducedRows)
crs.totalRows += currentRows
crs.stats[address] = append(crs.stats[address],
&basicCopRuntimeStats{BasicRuntimeStats: BasicRuntimeStats{loop: int32(*summary.NumIterations),
consume: int64(*summary.TimeProcessedNs),
rows: currentRows},
rows: int64(*summary.NumProducedRows)},
threads: int32(summary.GetConcurrency()),
storeType: crs.storeType})
}

// GetActRows return total rows of CopRuntimeStats.
func (crs *CopRuntimeStats) GetActRows() (totalRows int64) {
crs.Lock()
defer crs.Unlock()
return crs.totalRows
for _, instanceStats := range crs.stats {
for _, stat := range instanceStats {
totalRows += stat.rows
}
}
return totalRows
}

// MergeBasicStats traverses basicCopRuntimeStats in the CopRuntimeStats and collects some useful information.
func (crs *CopRuntimeStats) MergeBasicStats() (procTimes []time.Duration, totalTime time.Duration, totalTasks, totalLoops, totalThreads int32) {
crs.Lock()
defer crs.Unlock()
procTimes = make([]time.Duration, 0, 32)
for _, instanceStats := range crs.stats {
for _, stat := range instanceStats {
Expand Down Expand Up @@ -788,7 +785,9 @@ func NewConcurrencyInfo(name string, num int) *ConcurrencyInfo {

// RuntimeStatsWithConcurrencyInfo is the BasicRuntimeStats with ConcurrencyInfo.
type RuntimeStatsWithConcurrencyInfo struct {
// executor concurrency information
concurrency []*ConcurrencyInfo
// protect concurrency
sync.Mutex
}

Expand Down
14 changes: 0 additions & 14 deletions util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,20 +573,6 @@ func (t *Tracker) SearchTrackerWithoutLock(label int) *Tracker {
return nil
}

// SearchTrackerWithLock searches the specific tracker under this tracker with lock.
func (t *Tracker) SearchTrackerWithLock(label int) *Tracker {
t.mu.Lock()
defer t.mu.Unlock()
if t.label == label {
return t
}
children := t.mu.children[label]
if len(children) > 0 {
return children[0]
}
return nil
}

// SearchTrackerConsumedMoreThanNBytes searches the specific tracker that consumes more than NBytes.
func (t *Tracker) SearchTrackerConsumedMoreThanNBytes(limit int64) (res []*Tracker) {
t.mu.Lock()
Expand Down
1 change: 0 additions & 1 deletion util/memoryusagealarm/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ go_test(
"//sessionctx/stmtctx",
"//sessionctx/variable",
"//util",
"//util/execdetails",
"//util/memory",
"@com_github_stretchr_testify//assert",
],
Expand Down
12 changes: 6 additions & 6 deletions util/memoryusagealarm/memoryusagealarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,12 @@ func (record *memoryUsageAlarm) tryRemoveRedundantRecords() {
}
}

func getCurrentAnalyzePlan(info *util.ProcessInfo) string {
func getPlanString(info *util.ProcessInfo) string {
var buf strings.Builder
rows := info.CurrentAnalyzeRows(info.Plan, info.RuntimeStatsColl)
buf.WriteString(fmt.Sprintf("|%v|%v|%v|%v|%v|%v|%v|%v|%v|", "id", "estRows", "actRows", "task", "access object", "execution info", "operator info", "memory", "disk"))
rows := info.PlanExplainRows
buf.WriteString(fmt.Sprintf("|%v|%v|%v|%v|%v|", "id", "estRows", "task", "access object", "operator info"))
for _, row := range rows {
buf.WriteString(fmt.Sprintf("\n|%v|%v|%v|%v|%v|%v|%v|%v|%v|", row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8]))
buf.WriteString(fmt.Sprintf("\n|%v|%v|%v|%v|%v|", row[0], row[1], row[2], row[3], row[4]))
}
return buf.String()
}
Expand Down Expand Up @@ -280,7 +280,7 @@ func (record *memoryUsageAlarm) getTop10SqlInfo(cmp func(i, j *util.ProcessInfo)
fields = append(fields, zap.Int64("tidb_mem_quota_query", info.OOMAlarmVariablesInfo.SessionMemQuotaQuery))
fields = append(fields, zap.Int("tidb_analyze_version", info.OOMAlarmVariablesInfo.SessionAnalyzeVersion))
fields = append(fields, zap.Bool("tidb_enable_rate_limit_action", info.OOMAlarmVariablesInfo.SessionEnabledRateLimitAction))
fields = append(fields, zap.String("current_analyze_plan", getCurrentAnalyzePlan(info)))
fields = append(fields, zap.String("current_analyze_plan", getPlanString(info)))
for _, field := range fields {
switch field.Type {
case zapcore.StringType:
Expand Down Expand Up @@ -315,7 +315,7 @@ func (record *memoryUsageAlarm) recordSQL(sm util.SessionManager, recordDir stri
processInfo := sm.ShowProcessList()
pinfo := make([]*util.ProcessInfo, 0, len(processInfo))
for _, info := range processInfo {
if len(info.Info) != 0 && info.CanExplainAnalyze {
if len(info.Info) != 0 {
pinfo = append(pinfo, info)
}
}
Expand Down
Loading