Skip to content

Commit

Permalink
session: make plan replayer capture support prepared stmt (#40167)
Browse files Browse the repository at this point in the history
close #40161
  • Loading branch information
Yisaer authored Jan 3, 2023
1 parent 702a559 commit 25a2479
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 155 deletions.
2 changes: 1 addition & 1 deletion domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_library(
"//meta",
"//metrics",
"//owner",
"//parser",
"//parser/ast",
"//parser/model",
"//parser/mysql",
Expand All @@ -45,7 +46,6 @@ go_library(
"//sessionctx/sessionstates",
"//sessionctx/stmtctx",
"//sessionctx/variable",
"//statistics",
"//statistics/handle",
"//telemetry",
"//ttl/ttlworker",
Expand Down
47 changes: 18 additions & 29 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@ import (
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/replayer"
Expand Down Expand Up @@ -101,7 +100,7 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) {
logutil.BgLogger().Error("[dumpFileGcChecker] parseTime failed", zap.Error(err), zap.String("filename", fileName))
continue
}
isPlanReplayer := parseType(fileName) == "replayer"
isPlanReplayer := strings.Contains(fileName, "replayer")
if !createTime.After(gcTime) {
err := os.Remove(filepath.Join(path, f.Name()))
if err != nil {
Expand Down Expand Up @@ -410,7 +409,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
return true
}

file, fileName, err := replayer.GeneratePlanReplayerFile(task.IsContinuesCapture)
file, fileName, err := replayer.GeneratePlanReplayerFile(task.IsCapture)
if err != nil {
logutil.BgLogger().Warn("[plan-replayer-capture] generate task file failed",
zap.String("sqlDigest", taskKey.SQLDigest),
Expand All @@ -421,29 +420,18 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
task.Zf = file
task.FileName = fileName
task.EncodedPlan, _ = task.EncodePlan(task.SessionVars.StmtCtx, false)
jsStats := make(map[int64]*handle.JSONTable)
is := GetDomain(w.sctx).InfoSchema()
if task.IsCapture && !task.IsContinuesCapture {
for tblID, stat := range task.TblStats {
tbl, ok := is.TableByID(tblID)
if !ok {
return false
}
schema, ok := is.SchemaByTable(tbl.Meta())
if !ok {
return false
}
r, err := handle.GenJSONTableFromStats(schema.Name.String(), tbl.Meta(), stat.(*statistics.Table))
if err != nil {
logutil.BgLogger().Warn("[plan-replayer-capture] generate task json stats failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
return false
}
jsStats[tblID] = r
if task.InExecute && len(task.NormalizedSQL) > 0 {
p := parser.New()
stmts, _, err := p.ParseSQL(task.NormalizedSQL)
if err != nil {
logutil.BgLogger().Warn("[plan-replayer-capture] parse normalized sql failed",
zap.String("sql", task.NormalizedSQL),
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
return false
}
task.JSONTblStats = jsStats
task.ExecStmts = stmts
}
err = DumpPlanReplayerInfo(w.ctx, w.sctx, task)
if err != nil {
Expand Down Expand Up @@ -538,15 +526,16 @@ type PlanReplayerDumpTask struct {
replayer.PlanReplayerTaskKey

// tmp variables stored during the query
EncodePlan func(*stmtctx.StatementContext, bool) (string, string)
TblStats map[int64]interface{}
EncodePlan func(*stmtctx.StatementContext, bool) (string, string)
TblStats map[int64]interface{}
InExecute bool
NormalizedSQL string

// variables used to dump the plan
StartTS uint64
SessionBindings []*bindinfo.BindRecord
EncodedPlan string
SessionVars *variable.SessionVars
JSONTblStats map[int64]*handle.JSONTable
ExecStmts []ast.StmtNode
Analyze bool

Expand Down
19 changes: 7 additions & 12 deletions domain/plan_replayer_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,10 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
return err
}

// For continues capture, we don't dump stats
if !task.IsContinuesCapture {
// For capture task, we don't dump stats
if !task.IsCapture {
// Dump stats
if err = dumpStats(zw, pairs, task.JSONTblStats, do); err != nil {
if err = dumpStats(zw, pairs, do); err != nil {
return err
}
}
Expand Down Expand Up @@ -415,12 +415,12 @@ func dumpSchemaMeta(zw *zip.Writer, tables map[tableNamePair]struct{}) error {
return nil
}

func dumpStats(zw *zip.Writer, pairs map[tableNamePair]struct{}, tblJSONStats map[int64]*handle.JSONTable, do *Domain) error {
func dumpStats(zw *zip.Writer, pairs map[tableNamePair]struct{}, do *Domain) error {
for pair := range pairs {
if pair.IsView {
continue
}
jsonTbl, err := getStatsForTable(do, tblJSONStats, pair)
jsonTbl, err := getStatsForTable(do, pair)
if err != nil {
return err
}
Expand Down Expand Up @@ -653,19 +653,14 @@ func extractTableNames(ctx context.Context, sctx sessionctx.Context,
return r, nil
}

func getStatsForTable(do *Domain, tblJSONStats map[int64]*handle.JSONTable, pair tableNamePair) (*handle.JSONTable, error) {
func getStatsForTable(do *Domain, pair tableNamePair) (*handle.JSONTable, error) {
is := do.InfoSchema()
h := do.StatsHandle()
tbl, err := is.TableByName(model.NewCIStr(pair.DBName), model.NewCIStr(pair.TableName))
if err != nil {
return nil, err
}
js, ok := tblJSONStats[tbl.Meta().ID]
if ok && js != nil {
return js, nil
}
js, err = h.DumpStatsToJSON(pair.DBName, tbl.Meta(), nil, true)
return js, err
return h.DumpStatsToJSON(pair.DBName, tbl.Meta(), nil, true)
}

func getShowCreateTable(pair tableNamePair, zw *zip.Writer, ctx sessionctx.Context) error {
Expand Down
97 changes: 97 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/domain"
Expand Down Expand Up @@ -58,6 +59,7 @@ import (
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/replayer"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stmtsummary"
"github.com/pingcap/tidb/util/stringutil"
Expand Down Expand Up @@ -1365,6 +1367,18 @@ func (a *ExecStmt) observePhaseDurations(internal bool, commitDetails *util.Comm
// 4. update the `PrevStmt` in session variable.
// 5. reset `DurationParse` in session variable.
func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults bool) {
se := a.Ctx
if !se.GetSessionVars().InRestrictedSQL && se.GetSessionVars().IsPlanReplayerCaptureEnabled() {
stmtNode := a.GetStmtNode()
if se.GetSessionVars().EnablePlanReplayedContinuesCapture {
if checkPlanReplayerContinuesCaptureValidStmt(stmtNode) {
checkPlanReplayerContinuesCapture(se, stmtNode, txnTS)
}
} else {
checkPlanReplayerCaptureTask(se, stmtNode, txnTS)
}
}

sessVars := a.Ctx.GetSessionVars()
execDetail := sessVars.StmtCtx.GetExecDetails()
// Attach commit/lockKeys runtime stats to executor runtime stats.
Expand Down Expand Up @@ -1953,3 +1967,86 @@ func convertStatusIntoString(sctx sessionctx.Context, statsLoadStatus map[model.
}
return r
}

// only allow select/delete/update/insert/execute stmt captured by continues capture
func checkPlanReplayerContinuesCaptureValidStmt(stmtNode ast.StmtNode) bool {
switch stmtNode.(type) {
case *ast.SelectStmt, *ast.DeleteStmt, *ast.UpdateStmt, *ast.InsertStmt, *ast.ExecuteStmt:
return true
default:
return false
}
}

func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) {
dom := domain.GetDomain(sctx)
if dom == nil {
return
}
handle := dom.GetPlanReplayerHandle()
if handle == nil {
return
}
tasks := handle.GetTasks()
_, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest()
_, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest()
key := replayer.PlanReplayerTaskKey{
SQLDigest: sqlDigest.String(),
PlanDigest: planDigest.String(),
}
for _, task := range tasks {
if task.SQLDigest == sqlDigest.String() {
if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() {
sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false)
return
}
}
}
}

func checkPlanReplayerContinuesCapture(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) {
dom := domain.GetDomain(sctx)
if dom == nil {
return
}
handle := dom.GetPlanReplayerHandle()
if handle == nil {
return
}
_, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest()
_, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest()
key := replayer.PlanReplayerTaskKey{
SQLDigest: sqlDigest.String(),
PlanDigest: planDigest.String(),
}
existed := sctx.GetSessionVars().CheckPlanReplayerFinishedTaskKey(key)
if existed {
return
}
sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true)
sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key)
}

func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.Context, stmtNode ast.StmtNode,
startTS uint64, isContinuesCapture bool) {
stmtCtx := sctx.GetSessionVars().StmtCtx
handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle)
dumpTask := &domain.PlanReplayerDumpTask{
PlanReplayerTaskKey: key,
StartTS: startTS,
EncodePlan: GetEncodedPlan,
TblStats: stmtCtx.TableStats,
SessionBindings: handle.GetAllBindRecord(),
SessionVars: sctx.GetSessionVars(),
ExecStmts: []ast.StmtNode{stmtNode},
Analyze: false,
IsCapture: true,
IsContinuesCapture: isContinuesCapture,
}
if _, ok := stmtNode.(*ast.ExecuteStmt); ok {
nsql, _ := sctx.GetSessionVars().StmtCtx.SQLDigest()
dumpTask.InExecute = true
dumpTask.NormalizedSQL = nsql
}
domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask)
}
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1724,7 +1724,7 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu

// `getSnapshotTS` returns for-update-ts if in insert/update/delete/lock statement otherwise the isolation read ts
// Please notice that in RC isolation, the above two ts are the same
func (b *executorBuilder) getSnapshotTS() (uint64, error) {
func (b *executorBuilder) getSnapshotTS() (ts uint64, err error) {
if b.forDataReaderBuilder {
return b.dataReaderTS, nil
}
Expand Down
Loading

0 comments on commit 25a2479

Please sign in to comment.