infoschema: add plan field to the statement summary tables (#14182) (#…
sre-bot authored and bb7133 committed Jan 2, 2020
1 parent d84aab8 commit e1d8b41

Showing 8 changed files with 142 additions and 62 deletions.
2 changes: 1 addition & 1 deletion config/config.go
@@ -435,7 +435,7 @@ var defaultConf = Config{
},
StmtSummary: StmtSummary{
Enable: false,
MaxStmtCount: 100,
MaxStmtCount: 200,
MaxSQLLength: 4096,
RefreshInterval: 1800,
HistorySize: 24,
2 changes: 1 addition & 1 deletion config/config.toml.example
@@ -343,7 +343,7 @@ max-retry-count = 256
enable = false

# max number of statements kept in memory.
max-stmt-count = 100
max-stmt-count = 200

# max length of displayed normalized sql and sample sql.
max-sql-length = 4096
8 changes: 8 additions & 0 deletions executor/adapter.go
@@ -787,6 +787,12 @@ func (a *ExecStmt) SummaryStmt() {
}
sessVars.SetPrevStmtDigest(digest)

// No need to encode every time, so encode lazily.
planGenerator := func() string {
return plannercore.EncodePlan(a.Plan)
}
_, planDigest := getPlanDigest(a.Ctx, a.Plan)

execDetail := stmtCtx.GetExecDetails()
copTaskInfo := stmtCtx.CopTasksDetails()
memMax := stmtCtx.MemTracker.MaxConsumed()
@@ -802,6 +808,8 @@ func (a *ExecStmt) SummaryStmt() {
Digest: digest,
PrevSQL: prevSQL,
PrevSQLDigest: prevSQLDigest,
PlanGenerator: planGenerator,
PlanDigest: planDigest,
User: userString,
TotalLatency: costTime,
ParseLatency: sessVars.DurationParse,
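The planGenerator closure above exists so that plannercore.EncodePlan, which is comparatively expensive, runs only if the statement summary decides it needs a sample plan, not on every statement. A self-contained sketch of the deferral pattern (stand-in types; only EncodePlan's role is taken from this commit):

    package main

    import "fmt"

    type plan struct{ rows int }

    // encodePlan stands in for plannercore.EncodePlan.
    func encodePlan(p plan) string {
        fmt.Println("encoding ran")
        return fmt.Sprintf("plan(rows=%d)", p.rows)
    }

    func main() {
        p := plan{rows: 100}
        // Capture now, encode later; this mirrors planGenerator above.
        gen := func() string { return encodePlan(p) }

        // If no sample is needed, gen is never called and the
        // encoding cost is never paid.
        if needSample := true; needSample {
            fmt.Println(gen())
        }
    }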
4 changes: 3 additions & 1 deletion infoschema/perfschema/const.go
@@ -445,7 +445,9 @@ const fieldsInEventsStatementsSummary = " (" +
"FIRST_SEEN TIMESTAMP(6) NOT NULL," +
"LAST_SEEN TIMESTAMP(6) NOT NULL," +
"QUERY_SAMPLE_TEXT LONGTEXT DEFAULT NULL," +
"PREV_SAMPLE_TEXT LONGTEXT DEFAULT NULL);"
"PREV_SAMPLE_TEXT LONGTEXT DEFAULT NULL," +
"PLAN_DIGEST VARCHAR(64) DEFAULT NULL," +
"PLAN LONGTEXT DEFAULT NULL);"

// tableEventsStatementsSummaryByDigest contains the column name definitions for table
// events_statements_summary_by_digest, same as MySQL.
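With PLAN_DIGEST and PLAN added to the column list, the plan's digest and a decoded sample plan become queryable alongside the existing summary fields. A hedged sketch in the style of the tests below (the table and column names are from this commit; the rows returned depend on the workload):

    tk.MustExec("set global tidb_enable_stmt_summary = on")
    tk.MustQuery(`select digest_text, plan_digest, plan
        from performance_schema.events_statements_summary_by_digest
        where digest_text like 'select%'`)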
60 changes: 41 additions & 19 deletions infoschema/perfschema/tables_test.go
@@ -17,6 +17,7 @@ import (
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
@@ -95,27 +96,44 @@ func (s *testTableSuite) TestStmtSummaryTable(c *C) {
tk.MustExec("/**/insert into t values(4, 'd')")
tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, cop_task_num, avg_total_keys,
max_total_keys, avg_processed_keys, max_processed_keys, avg_write_keys, max_write_keys, avg_prewrite_regions,
max_prewrite_regions, avg_affected_rows, query_sample_text
max_prewrite_regions, avg_affected_rows, query_sample_text, plan
from performance_schema.events_statements_summary_by_digest
where digest_text like 'insert into t%'`,
).Check(testkit.Rows("insert test test.t <nil> 4 0 0 0 0 0 2 2 1 1 1 /**/insert into t values(4, 'd')"))
).Check(testkit.Rows("insert test test.t <nil> 4 0 0 0 0 0 2 2 1 1 1 insert into t values(1, 'a') "))

// Test SELECT.
const failpointName = "github.com/pingcap/tidb/planner/core/mockPlanRowCount"
c.Assert(failpoint.Enable(failpointName, "return(100)"), IsNil)
defer func() { c.Assert(failpoint.Disable(failpointName), IsNil) }()
tk.MustQuery("select * from t where a=2")
tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, cop_task_num, avg_total_keys,
max_total_keys, avg_processed_keys, max_processed_keys, avg_write_keys, max_write_keys, avg_prewrite_regions,
max_prewrite_regions, avg_affected_rows, query_sample_text
max_prewrite_regions, avg_affected_rows, query_sample_text, plan
from performance_schema.events_statements_summary_by_digest
where digest_text like 'select * from t%'`,
).Check(testkit.Rows("select test test.t t:k 1 2 0 0 0 0 0 0 0 0 0 select * from t where a=2"))
).Check(testkit.Rows("select test test.t t:k 1 2 0 0 0 0 0 0 0 0 0 select * from t where a=2 \tIndexLookUp_10\troot\t100\t\n" +
"\t├─IndexScan_8 \tcop \t100\ttable:t, index:a, range:[2,2], keep order:false, stats:pseudo\n" +
"\t└─TableScan_9 \tcop \t100\ttable:t, keep order:false, stats:pseudo"))

// select ... order by
tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, cop_task_num, avg_total_keys,
max_total_keys, avg_processed_keys, max_processed_keys, avg_write_keys, max_write_keys, avg_prewrite_regions,
max_prewrite_regions, avg_affected_rows, query_sample_text
max_prewrite_regions, avg_affected_rows, query_sample_text, plan
from performance_schema.events_statements_summary_by_digest
order by exec_count desc limit 1`,
).Check(testkit.Rows("insert test test.t <nil> 4 0 0 0 0 0 2 2 1 1 1 /**/insert into t values(4, 'd')"))
).Check(testkit.Rows("insert test test.t <nil> 4 0 0 0 0 0 2 2 1 1 1 insert into t values(1, 'a') "))

// Test different plans with same digest.
c.Assert(failpoint.Enable(failpointName, "return(1000)"), IsNil)
tk.MustQuery("select * from t where a=3")
tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, cop_task_num, avg_total_keys,
max_total_keys, avg_processed_keys, max_processed_keys, avg_write_keys, max_write_keys, avg_prewrite_regions,
max_prewrite_regions, avg_affected_rows, query_sample_text, plan
from performance_schema.events_statements_summary_by_digest
where digest_text like 'select * from t%'`,
).Check(testkit.Rows("select test test.t t:k 2 4 0 0 0 0 0 0 0 0 0 select * from t where a=2 \tIndexLookUp_10\troot\t100\t\n" +
"\t├─IndexScan_8 \tcop \t100\ttable:t, index:a, range:[2,2], keep order:false, stats:pseudo\n" +
"\t└─TableScan_9 \tcop \t100\ttable:t, keep order:false, stats:pseudo"))

// Disable it again.
tk.MustExec("set global tidb_enable_stmt_summary = false")
@@ -130,7 +148,7 @@ func (s *testTableSuite) TestStmtSummaryTable(c *C) {
// The table should be cleared.
tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, cop_task_num, avg_total_keys,
max_total_keys, avg_processed_keys, max_processed_keys, avg_write_keys, max_write_keys, avg_prewrite_regions,
max_prewrite_regions, avg_affected_rows, query_sample_text
max_prewrite_regions, avg_affected_rows, query_sample_text, plan
from performance_schema.events_statements_summary_by_digest`,
).Check(testkit.Rows())

@@ -142,24 +160,26 @@ func (s *testTableSuite) TestStmtSummaryTable(c *C) {
tk.MustExec("commit")
tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, cop_task_num, avg_total_keys,
max_total_keys, avg_processed_keys, max_processed_keys, avg_write_keys, max_write_keys, avg_prewrite_regions,
max_prewrite_regions, avg_affected_rows, query_sample_text, prev_sample_text
max_prewrite_regions, avg_affected_rows, query_sample_text, prev_sample_text, plan
from performance_schema.events_statements_summary_by_digest
where digest_text like 'insert into t%'`,
).Check(testkit.Rows("insert test test.t <nil> 1 0 0 0 0 0 0 0 0 0 1 insert into t values(1, 'a') "))
).Check(testkit.Rows("insert test test.t <nil> 1 0 0 0 0 0 0 0 0 0 1 insert into t values(1, 'a') "))
tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, cop_task_num, avg_total_keys,
max_total_keys, avg_processed_keys, max_processed_keys, avg_write_keys, max_write_keys, avg_prewrite_regions,
max_prewrite_regions, avg_affected_rows, query_sample_text, prev_sample_text
max_prewrite_regions, avg_affected_rows, query_sample_text, prev_sample_text, plan
from performance_schema.events_statements_summary_by_digest
where digest_text='commit'`,
).Check(testkit.Rows("commit test <nil> <nil> 1 0 0 0 0 0 2 2 1 1 0 commit insert into t values(1, 'a')"))
).Check(testkit.Rows("commit test <nil> <nil> 1 0 0 0 0 0 2 2 1 1 0 commit insert into t values(1, 'a') "))

tk.MustQuery("select * from t where a=2")
tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, cop_task_num, avg_total_keys,
max_total_keys, avg_processed_keys, max_processed_keys, avg_write_keys, max_write_keys, avg_prewrite_regions,
max_prewrite_regions, avg_affected_rows, query_sample_text
max_prewrite_regions, avg_affected_rows, query_sample_text, plan
from performance_schema.events_statements_summary_by_digest
where digest_text like 'select * from t%'`,
).Check(testkit.Rows("select test test.t t:k 1 2 0 0 0 0 0 0 0 0 0 select * from t where a=2"))
).Check(testkit.Rows("select test test.t t:k 1 2 0 0 0 0 0 0 0 0 0 select * from t where a=2 \tIndexLookUp_10\troot\t1000\t\n" +
"\t├─IndexScan_8 \tcop \t1000\ttable:t, index:a, range:[2,2], keep order:false, stats:pseudo\n" +
"\t└─TableScan_9 \tcop \t1000\ttable:t, keep order:false, stats:pseudo"))

// Disable it in global scope.
tk.MustExec("set global tidb_enable_stmt_summary = off")
@@ -172,10 +192,12 @@ func (s *testTableSuite) TestStmtSummaryTable(c *C) {
// Statement summary is still enabled.
tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, cop_task_num, avg_total_keys,
max_total_keys, avg_processed_keys, max_processed_keys, avg_write_keys, max_write_keys, avg_prewrite_regions,
max_prewrite_regions, avg_affected_rows, query_sample_text
max_prewrite_regions, avg_affected_rows, query_sample_text, plan
from performance_schema.events_statements_summary_by_digest
where digest_text like 'select * from t%'`,
).Check(testkit.Rows("select test test.t t:k 2 4 0 0 0 0 0 0 0 0 0 select * from t where a=2"))
).Check(testkit.Rows("select test test.t t:k 2 4 0 0 0 0 0 0 0 0 0 select * from t where a=2 \tIndexLookUp_10\troot\t1000\t\n" +
"\t├─IndexScan_8 \tcop \t1000\ttable:t, index:a, range:[2,2], keep order:false, stats:pseudo\n" +
"\t└─TableScan_9 \tcop \t1000\ttable:t, keep order:false, stats:pseudo"))

// Unset session variable.
tk.MustExec("set session tidb_enable_stmt_summary = ''")
@@ -184,7 +206,7 @@ func (s *testTableSuite) TestStmtSummaryTable(c *C) {
// Statement summary is disabled.
tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, cop_task_num, avg_total_keys,
max_total_keys, avg_processed_keys, max_processed_keys, avg_write_keys, max_write_keys, avg_prewrite_regions,
max_prewrite_regions, avg_affected_rows, query_sample_text
max_prewrite_regions, avg_affected_rows, query_sample_text, plan
from performance_schema.events_statements_summary_by_digest`,
).Check(testkit.Rows())
}
@@ -221,15 +243,15 @@ func (s *testTableSuite) TestStmtSummaryHistoryTable(c *C) {
tk.MustExec("/**/insert into t values(4, 'd')")
tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, cop_task_num, avg_total_keys,
max_total_keys, avg_processed_keys, max_processed_keys, avg_write_keys, max_write_keys, avg_prewrite_regions,
max_prewrite_regions, avg_affected_rows, query_sample_text
max_prewrite_regions, avg_affected_rows, query_sample_text, plan
from performance_schema.events_statements_summary_by_digest_history
where digest_text like 'insert into t%'`,
).Check(testkit.Rows("insert test test.t <nil> 4 0 0 0 0 0 2 2 1 1 1 /**/insert into t values(4, 'd')"))
).Check(testkit.Rows("insert test test.t <nil> 4 0 0 0 0 0 2 2 1 1 1 insert into t values(1, 'a') "))

tk.MustExec("set global tidb_stmt_summary_history_size = 0")
tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, cop_task_num, avg_total_keys,
max_total_keys, avg_processed_keys, max_processed_keys, avg_write_keys, max_write_keys, avg_prewrite_regions,
max_prewrite_regions, avg_affected_rows, query_sample_text
max_prewrite_regions, avg_affected_rows, query_sample_text, plan
from performance_schema.events_statements_summary_by_digest_history`,
).Check(testkit.Rows())
}
4 changes: 4 additions & 0 deletions planner/core/encode.go
@@ -20,6 +20,7 @@ import (
"hash"
"sync"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/util/plancodec"
)

@@ -42,6 +43,9 @@ func EncodePlan(p Plan) string {
if selectPlan == nil {
return ""
}
failpoint.Inject("mockPlanRowCount", func(val failpoint.Value) {
selectPlan.statsInfo().RowCount = float64(val.(int))
})
return pn.encodePlanTree(selectPlan)
}

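failpoint.Inject is compiled in but stays inert until a test enables the named failpoint; once enabled, the callback runs with the supplied value, here overriding the plan's row count so that encoded plans are deterministic. This is exactly how TestStmtSummaryTable above drives row counts of 100 and then 1000:

    const failpointName = "github.com/pingcap/tidb/planner/core/mockPlanRowCount"
    // While enabled, every EncodePlan call reports the mocked row count.
    c.Assert(failpoint.Enable(failpointName, "return(100)"), IsNil)
    // Always pair Enable with Disable so later tests see real stats.
    defer func() { c.Assert(failpoint.Disable(failpointName), IsNil) }()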
71 changes: 43 additions & 28 deletions util/stmtsummary/statement_summary.go
@@ -16,6 +16,7 @@ package stmtsummary
import (
"bytes"
"container/list"
"context"
"fmt"
"sort"
"strconv"
@@ -31,6 +32,9 @@ import (
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/plancodec"
"go.uber.org/zap"
)

// There're many types of statement summary tables in MySQL, but we have
@@ -43,6 +47,8 @@ type stmtSummaryByDigestKey struct {
Expand All @@ -43,6 +47,8 @@ type stmtSummaryByDigestKey struct {
digest string
// The digest of the previous statement.
prevDigest string
// The digest of the plan of this SQL.
planDigest string
// `hash` is the hash value of this object.
hash []byte
}
@@ -52,10 +58,11 @@ type stmtSummaryByDigestKey struct {
// `prevSQL` is included in the key to distinguish different transactions.
func (key *stmtSummaryByDigestKey) Hash() []byte {
if len(key.hash) == 0 {
key.hash = make([]byte, 0, len(key.schemaName)+len(key.digest)+len(key.prevDigest))
key.hash = make([]byte, 0, len(key.schemaName)+len(key.digest)+len(key.prevDigest)+len(key.planDigest))
key.hash = append(key.hash, hack.Slice(key.digest)...)
key.hash = append(key.hash, hack.Slice(key.schemaName)...)
key.hash = append(key.hash, hack.Slice(key.prevDigest)...)
key.hash = append(key.hash, hack.Slice(key.planDigest)...)
}
return key.hash
}
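Because planDigest is now folded into the hash, two executions of the same normalized SQL that produce different plans get different cache keys, and therefore separate summary rows. A sketch of the effect, assuming test code in the same package (the struct and Hash are as defined above):

    k1 := &stmtSummaryByDigestKey{schemaName: "test", digest: "d1", planDigest: "p1"}
    k2 := &stmtSummaryByDigestKey{schemaName: "test", digest: "d1", planDigest: "p2"}
    // Before this change these two hashes were equal; now they differ.
    fmt.Println(bytes.Equal(k1.Hash(), k2.Hash())) // false

Conversely, when the plan digest is unchanged, as in the mocked row-count test above, both executions still aggregate into one row, and the stored sample plan remains the first one seen.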
@@ -106,6 +113,7 @@ type stmtSummaryByDigest struct {
// They won't change once this object is created, so locking is not needed.
schemaName string
digest string
planDigest string
stmtType string
normalizedSQL string
tableNames string
@@ -120,6 +128,7 @@ type stmtSummaryByDigestElement struct {
// basic
sampleSQL string
prevSQL string
samplePlan string
sampleUser string
indexNames []string
execCount int64
@@ -192,6 +201,8 @@ type StmtExecInfo struct {
Digest string
PrevSQL string
PrevSQLDigest string
PlanGenerator func() string
PlanDigest string
User string
TotalLatency time.Duration
ParseLatency time.Duration
@@ -231,6 +242,7 @@ func (ssMap *stmtSummaryByDigestMap) AddStatement(sei *StmtExecInfo) {
schemaName: sei.SchemaName,
digest: sei.Digest,
prevDigest: sei.PrevSQLDigest,
planDigest: sei.PlanDigest,
}

// Enclose the block in a function to ensure the lock will always be released.
@@ -476,14 +488,6 @@ func (ssMap *stmtSummaryByDigestMap) historySize() int {

// newStmtSummaryByDigest creates a stmtSummaryByDigest from StmtExecInfo.
func newStmtSummaryByDigest(sei *StmtExecInfo, beginTime int64, intervalSeconds int64, historySize int) *stmtSummaryByDigest {
// Trim SQL to size MaxSQLLength.
maxSQLLength := config.GetGlobalConfig().StmtSummary.MaxSQLLength
normalizedSQL := sei.NormalizedSQL
if len(normalizedSQL) > int(maxSQLLength) {
// Make sure the memory of original `normalizedSQL` will be released.
normalizedSQL = string([]byte(normalizedSQL[:maxSQLLength]))
}

// Use "," to separate table names to support FIND_IN_SET.
var buffer bytes.Buffer
for i, value := range sei.StmtCtx.Tables {
@@ -499,8 +503,9 @@ func newStmtSummaryByDigest(sei *StmtExecInfo, beginTime int64, intervalSeconds
ssbd := &stmtSummaryByDigest{
schemaName: sei.SchemaName,
digest: sei.Digest,
planDigest: sei.PlanDigest,
stmtType: strings.ToLower(sei.StmtCtx.StmtType),
normalizedSQL: normalizedSQL,
normalizedSQL: formatSQL(sei.NormalizedSQL),
tableNames: tableNames,
history: list.New(),
}
@@ -589,8 +594,17 @@ func (ssbd *stmtSummaryByDigest) collectHistorySummaries(historySize int) []*stm
}

func newStmtSummaryByDigestElement(sei *StmtExecInfo, beginTime int64, intervalSeconds int64) *stmtSummaryByDigestElement {
// sampleSQL / sampleUser / samplePlan / prevSQL / indexNames store the values seen on the first execution,
// because updating them on every execution would hurt performance.
ssElement := &stmtSummaryByDigestElement{
beginTime: beginTime,
beginTime: beginTime,
sampleSQL: formatSQL(sei.OriginalSQL),
// PrevSQL is already truncated to cfg.Log.QueryLogMaxLen.
prevSQL: sei.PrevSQL,
// samplePlan must remain decodable, so it can't be truncated.
samplePlan: sei.PlanGenerator(),
sampleUser: sei.User,
indexNames: sei.StmtCtx.IndexNames,
minLatency: sei.TotalLatency,
firstSeen: sei.StartTime,
lastSeen: sei.StartTime,
@@ -619,28 +633,11 @@ func (ssElement *stmtSummaryByDigestElement) onExpire(intervalSeconds int64) {
}

func (ssElement *stmtSummaryByDigestElement) add(sei *StmtExecInfo, intervalSeconds int64) {
maxSQLLength := config.GetGlobalConfig().StmtSummary.MaxSQLLength
sampleSQL := sei.OriginalSQL
if len(sampleSQL) > int(maxSQLLength) {
// Make sure the memory of original `sampleSQL` will be released.
sampleSQL = string([]byte(sampleSQL[:maxSQLLength]))
}
prevSQL := sei.PrevSQL
if len(prevSQL) > int(maxSQLLength) {
prevSQL = string([]byte(prevSQL[:maxSQLLength]))
}

ssElement.Lock()
defer ssElement.Unlock()

// refreshInterval may change anytime, update endTime ASAP.
ssElement.endTime = ssElement.beginTime + intervalSeconds
if sei.User != "" {
ssElement.sampleUser = sei.User
}
ssElement.sampleSQL = sampleSQL
ssElement.prevSQL = prevSQL
ssElement.indexNames = sei.StmtCtx.IndexNames
ssElement.execCount++

// latency
@@ -768,6 +765,12 @@ func (ssElement *stmtSummaryByDigestElement) toDatum(ssbd *stmtSummaryByDigest)
ssElement.Lock()
defer ssElement.Unlock()

plan, err := plancodec.DecodePlan(ssElement.samplePlan)
if err != nil {
logutil.Logger(context.Background()).Error("decode plan in statement summary failed", zap.String("plan", ssElement.samplePlan), zap.Error(err))
plan = ""
}

// Actually, there's a small chance that endTime is out of date, but it's hard to keep it up to date all the time.
return types.MakeDatums(
types.Time{Time: types.FromGoTime(time.Unix(ssElement.beginTime, 0)), Type: mysql.TypeTimestamp},
@@ -834,9 +837,21 @@ func (ssElement *stmtSummaryByDigestElement) toDatum(ssbd *stmtSummaryByDigest)
types.Time{Time: types.FromGoTime(ssElement.lastSeen), Type: mysql.TypeTimestamp},
ssElement.sampleSQL,
ssElement.prevSQL,
ssbd.planDigest,
plan,
)
}
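Only the compact string produced by plannercore.EncodePlan is kept in memory as samplePlan; the readable tree is materialized by plancodec.DecodePlan just above, and only when the table is actually read. A decode failure degrades to an empty PLAN column instead of failing the scan. Round-trip sketch (p is a hypothetical physical plan):

    encoded := plannercore.EncodePlan(p) // what gets stored as samplePlan
    tree, err := plancodec.DecodePlan(encoded)
    if err != nil {
        tree = "" // logged and degraded, as in toDatum above
    }
    fmt.Println(tree) // the text surfaced in the PLAN column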

// Truncate SQL to maxSQLLength.
func formatSQL(sql string) string {
maxSQLLength := config.GetGlobalConfig().StmtSummary.MaxSQLLength
length := len(sql)
if length > int(maxSQLLength) {
sql = fmt.Sprintf("%.*s(len:%d)", maxSQLLength, sql, length)
}
return sql
}
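A worked example of the truncation format, assuming MaxSQLLength were 10 (the default configured above is 4096):

    sql := "select * from t where a=2" // 25 bytes
    fmt.Printf("%.*s(len:%d)\n", 10, sql, len(sql))
    // Output: select * f(len:25)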

// Format the backoffType map to a string or nil.
func formatBackoffTypes(backoffMap map[fmt.Stringer]int) interface{} {
type backoffStat struct {