domain: support dump plan replayer capture task during query (#39125)
ref #38779
Yisaer authored Nov 16, 2022
1 parent 3e8899e commit 5aa66cb
Showing 15 changed files with 305 additions and 97 deletions.
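
At a glance, the new flow: while a query runs, the executor builds a PlanReplayerDumpTask keyed by the statement's SQL and plan digests and hands it to a background dump goroutine through a non-blocking channel. Below is a hypothetical caller-side sketch, not part of this commit; only SendTask, GetPlanReplayerHandle, PlanReplayerTaskKey and the PlanReplayerDumpTask fields come from the diff, the function name and its parameters are assumptions.

```go
package domain

import (
	"github.com/pingcap/tidb/parser/ast"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/sessionctx/stmtctx"
)

// sendCaptureTaskDuringQuery is an illustrative (assumed) helper showing how a
// query-time caller could hand a capture task to the background dumper.
func sendCaptureTaskDuringQuery(dom *Domain, sctx sessionctx.Context, stmt ast.StmtNode,
	sqlDigest, planDigest string,
	encodePlan func(*stmtctx.StatementContext, bool) (string, string),
	tblStats map[int64]interface{}) {
	task := &PlanReplayerDumpTask{
		PlanReplayerTaskKey: PlanReplayerTaskKey{
			SQLDigest:  sqlDigest,
			PlanDigest: planDigest,
		},
		EncodePlan:  encodePlan,            // plan encoding is deferred to the dump goroutine
		TblStats:    tblStats,              // values are *statistics.Table, keyed by table ID
		SessionVars: sctx.GetSessionVars(),
		ExecStmts:   []ast.StmtNode{stmt},
	}
	// Non-blocking hand-off: SendTask drops the task when the channel is full,
	// so the foreground query is never stalled by the dumper.
	dom.GetPlanReplayerHandle().SendTask(task)
}
```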
2 changes: 2 additions & 0 deletions domain/BUILD.bazel
@@ -42,7 +42,9 @@ go_library(
"//privilege/privileges",
"//sessionctx",
"//sessionctx/sessionstates",
"//sessionctx/stmtctx",
"//sessionctx/variable",
"//statistics",
"//statistics/handle",
"//telemetry",
"//types",
39 changes: 30 additions & 9 deletions domain/domain.go
@@ -1532,11 +1532,17 @@ func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) {
}

// SetupPlanReplayerHandle setup plan replayer handle
func (do *Domain) SetupPlanReplayerHandle(ctx sessionctx.Context) {
do.planReplayerHandle = &planReplayerHandle{
planReplayerTaskCollectorHandle: &planReplayerTaskCollectorHandle{
sctx: ctx,
},
func (do *Domain) SetupPlanReplayerHandle(collectorSctx, dumperSctx sessionctx.Context) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
do.planReplayerHandle = &planReplayerHandle{}
do.planReplayerHandle.planReplayerTaskCollectorHandle = &planReplayerTaskCollectorHandle{
ctx: ctx,
sctx: collectorSctx,
}
do.planReplayerHandle.planReplayerTaskDumpHandle = &planReplayerTaskDumpHandle{
ctx: ctx,
sctx: dumperSctx,
taskCH: make(chan *PlanReplayerDumpTask, 16),
}
}

@@ -1557,27 +1563,42 @@ func (do *Domain) StartPlanReplayerHandle() {
if planReplayerHandleLease < 1 {
return
}
do.wg.Add(1)
do.wg.Add(2)
go func() {
ticker := time.NewTicker(planReplayerHandleLease)
defer func() {
ticker.Stop()
do.wg.Done()
logutil.BgLogger().Info("PlanReplayerHandle exited.")
util.Recover(metrics.LabelDomain, "PlanReplayerHandle", nil, false)
logutil.BgLogger().Info("PlanReplayerTaskCollectHandle exited.")
util.Recover(metrics.LabelDomain, "PlanReplayerTaskCollectHandle", nil, false)
}()
for {
select {
case <-do.exit:
return
case <-ticker.C:
err := do.planReplayerHandle.CollectPlanReplayerTask(context.Background())
err := do.planReplayerHandle.CollectPlanReplayerTask()
if err != nil {
logutil.BgLogger().Warn("plan replayer handle collect tasks failed", zap.Error(err))
}
}
}
}()
go func() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("PlanReplayerTaskDumpHandle exited.")
util.Recover(metrics.LabelDomain, "PlanReplayerTaskDumpHandle", nil, false)
}()
for {
select {
case <-do.exit:
return
case task := <-do.planReplayerHandle.planReplayerTaskDumpHandle.taskCH:
do.planReplayerHandle.HandlePlanReplayerDumpTask(task)
}
}
}()
}

// GetPlanReplayerHandle returns plan replayer handle
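
The handle setup now takes two session contexts because the collector and the dumper run in separate goroutines and should not share one internal session. A minimal wiring sketch under that assumption; the actual bootstrap call sites are outside the hunks shown here.

```go
package domain

import "github.com/pingcap/tidb/sessionctx"

// setupAndStartPlanReplayer is an assumed wiring helper, not part of this diff.
func setupAndStartPlanReplayer(do *Domain, collectorSctx, dumperSctx sessionctx.Context) {
	// One session feeds the ticker-driven collector, the other the channel-driven dumper.
	do.SetupPlanReplayerHandle(collectorSctx, dumperSctx)
	// Spawns the two goroutines above: one polls mysql.plan_replayer_task every
	// planReplayerHandleLease, the other drains taskCH and dumps each task.
	do.StartPlanReplayerHandle()
}
```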
188 changes: 163 additions & 25 deletions domain/plan_replayer.go
@@ -16,8 +16,10 @@ package domain

import (
"context"
"encoding/base64"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"strconv"
@@ -33,7 +35,9 @@ import (
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
@@ -122,13 +126,24 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) {

type planReplayerHandle struct {
*planReplayerTaskCollectorHandle
*planReplayerTaskDumpHandle
}

// HandlePlanReplayerDumpTask handles a dump task
func (h *planReplayerHandle) HandlePlanReplayerDumpTask(task *PlanReplayerDumpTask) bool {
success := h.dumpPlanReplayerDumpTask(task)
if success {
h.removeTask(task.PlanReplayerTaskKey)
}
return success
}

type planReplayerTaskCollectorHandle struct {
taskMu struct {
sync.RWMutex
tasks map[PlanReplayerTaskKey]struct{}
}
ctx context.Context
sctx sessionctx.Context
}

@@ -153,48 +168,45 @@ func insertPlanReplayerStatus(ctx context.Context, sctx sessionctx.Context, reco
instance = fmt.Sprintf("%s:%d", serverInfo.IP, serverInfo.Port)
}
for _, record := range records {
if !record.Internal {
if len(record.FailedReason) > 0 {
insertExternalPlanReplayerErrorStatusRecord(ctx1, sctx, instance, record)
} else {
insertExternalPlanReplayerSuccessStatusRecord(ctx1, sctx, instance, record)
}
if len(record.FailedReason) > 0 {
insertPlanReplayerErrorStatusRecord(ctx1, sctx, instance, record)
} else {
insertPlanReplayerSuccessStatusRecord(ctx1, sctx, instance, record)
}
}
}

func insertExternalPlanReplayerErrorStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) {
func insertPlanReplayerErrorStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) {
exec := sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx, fmt.Sprintf(
"insert into mysql.plan_replayer_status (origin_sql, fail_reason, instance) values ('%s','%s','%s')",
record.OriginSQL, record.FailedReason, instance))
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, origin_sql, fail_reason, instance) values ('%s','%s','%s','%s','%s')",
record.SQLDigest, record.PlanDigest, record.OriginSQL, record.FailedReason, instance))
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.Error(err))
}
}

func insertExternalPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) {
func insertPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) {
exec := sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx, fmt.Sprintf(
"insert into mysql.plan_replayer_status (origin_sql, token, instance) values ('%s','%s','%s')",
record.OriginSQL, record.Token, instance))
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, origin_sql, token, instance) values ('%s','%s','%s','%s','%s')",
record.SQLDigest, record.PlanDigest, record.OriginSQL, record.Token, instance))
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.Error(err))
}
}

// CollectPlanReplayerTask collects all unhandled plan replayer tasks
func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask(ctx context.Context) error {
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
allKeys, err := h.collectAllPlanReplayerTask(ctx1)
func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask() error {
allKeys, err := h.collectAllPlanReplayerTask(h.ctx)
if err != nil {
return err
}
tasks := make([]PlanReplayerTaskKey, 0)
for _, key := range allKeys {
unhandled, err := checkUnHandledReplayerTask(ctx1, h.sctx, key)
unhandled, err := checkUnHandledReplayerTask(h.ctx, h.sctx, key)
if err != nil {
return err
}
@@ -227,6 +239,12 @@ func (h *planReplayerTaskCollectorHandle) setupTasks(tasks []PlanReplayerTaskKey
h.taskMu.tasks = r
}

func (h *planReplayerTaskCollectorHandle) removeTask(taskKey PlanReplayerTaskKey) {
h.taskMu.Lock()
defer h.taskMu.Unlock()
delete(h.taskMu.tasks, taskKey)
}

func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context.Context) ([]PlanReplayerTaskKey, error) {
exec := h.sctx.(sqlexec.SQLExecutor)
rs, err := exec.ExecuteInternal(ctx, "select sql_digest, plan_digest from mysql.plan_replayer_task")
@@ -245,16 +263,96 @@ func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context
for _, row := range rows {
sqlDigest, planDigest := row.GetString(0), row.GetString(1)
allKeys = append(allKeys, PlanReplayerTaskKey{
sqlDigest: sqlDigest,
planDigest: planDigest,
SQLDigest: sqlDigest,
PlanDigest: planDigest,
})
}
return allKeys, nil
}

type planReplayerTaskDumpHandle struct {
ctx context.Context
sctx sessionctx.Context
taskCH chan *PlanReplayerDumpTask
}

// DrainTask drains a task; used by unit tests
func (h *planReplayerTaskDumpHandle) DrainTask() *PlanReplayerDumpTask {
return <-h.taskCH
}

// dumpPlanReplayerDumpTask handles the dump task and reports whether it succeeded
func (h *planReplayerTaskDumpHandle) dumpPlanReplayerDumpTask(task *PlanReplayerDumpTask) (success bool) {
taskKey := task.PlanReplayerTaskKey
unhandled, err := checkUnHandledReplayerTask(h.ctx, h.sctx, taskKey)
if err != nil {
logutil.BgLogger().Warn("check plan replayer capture task failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
return false
}
// the task has already been processed, so skip it directly.
if !unhandled {
return true
}

file, fileName, err := GeneratePlanReplayerFile()
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task file failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
return
}
task.Zf = file
task.FileName = fileName
task.EncodedPlan, _ = task.EncodePlan(task.SessionVars.StmtCtx, false)
jsStats := make(map[int64]*handle.JSONTable)
is := GetDomain(h.sctx).InfoSchema()
for tblID, stat := range task.TblStats {
tbl, ok := is.TableByID(tblID)
if !ok {
return false
}
schema, ok := is.SchemaByTable(tbl.Meta())
if !ok {
return false
}
r, err := handle.GenJSONTableFromStats(schema.Name.String(), tbl.Meta(), stat.(*statistics.Table))
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task json stats failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
return false
}
jsStats[tblID] = r
}
task.JSONTblStats = jsStats
err = DumpPlanReplayerInfo(h.ctx, h.sctx, task)
if err != nil {
logutil.BgLogger().Warn("dump plan replayer capture task result failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
return false
}
return true
}

// SendTask sends a dump task to the background task handler
func (h *planReplayerTaskDumpHandle) SendTask(task *PlanReplayerDumpTask) {
select {
case h.taskCH <- task:
default:
// TODO: add metrics here
// directly discard the task if the task channel is full in order not to block the query process
}
}

func checkUnHandledReplayerTask(ctx context.Context, sctx sessionctx.Context, task PlanReplayerTaskKey) (bool, error) {
exec := sctx.(sqlexec.SQLExecutor)
rs, err := exec.ExecuteInternal(ctx, fmt.Sprintf("select * from mysql.plan_replayer_status where sql_digest = '%v' and plan_digest = '%v' and fail_reason is null", task.sqlDigest, task.planDigest))
rs, err := exec.ExecuteInternal(ctx, fmt.Sprintf("select * from mysql.plan_replayer_status where sql_digest = '%v' and plan_digest = '%v' and fail_reason is null", task.SQLDigest, task.PlanDigest))
if err != nil {
return false, err
}
@@ -274,26 +372,66 @@ func checkUnHandledReplayerTask(ctx context.Context, sctx sessionctx.Context, ta

// PlanReplayerStatusRecord indicates record in mysql.plan_replayer_status
type PlanReplayerStatusRecord struct {
Internal bool
SQLDigest string
PlanDigest string
OriginSQL string
Token string
FailedReason string
}

// PlanReplayerTaskKey indicates key of a plan replayer task
type PlanReplayerTaskKey struct {
sqlDigest string
planDigest string
SQLDigest string
PlanDigest string
}

// PlanReplayerDumpTask wrap the params for plan replayer dump
type PlanReplayerDumpTask struct {
PlanReplayerTaskKey

// tmp variables stored during the query
EncodePlan func(*stmtctx.StatementContext, bool) (string, string)
TblStats map[int64]interface{}

// variables used to dump the plan
SessionBindings []*bindinfo.BindRecord
EncodedPlan string
FileName string
Zf *os.File
SessionVars *variable.SessionVars
TblStats map[int64]*handle.JSONTable
JSONTblStats map[int64]*handle.JSONTable
ExecStmts []ast.StmtNode
Analyze bool

FileName string
Zf *os.File
}

// GeneratePlanReplayerFile generates plan replayer file
func GeneratePlanReplayerFile() (*os.File, string, error) {
path := GetPlanReplayerDirName()
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
return nil, "", errors.AddStack(err)
}
fileName, err := generatePlanReplayerFileName()
if err != nil {
return nil, "", errors.AddStack(err)
}
zf, err := os.Create(filepath.Join(path, fileName))
if err != nil {
return nil, "", errors.AddStack(err)
}
return zf, fileName, err
}

func generatePlanReplayerFileName() (string, error) {
// Generate key and create zip file
time := time.Now().UnixNano()
b := make([]byte, 16)
//nolint: gosec
_, err := rand.Read(b)
if err != nil {
return "", err
}
key := base64.URLEncoding.EncodeToString(b)
return fmt.Sprintf("replayer_%v_%v.zip", key, time), nil
}
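
For completeness, a short usage sketch of the new file helper; the caller-side names and error handling here are illustrative assumptions, not part of the diff. GeneratePlanReplayerFile creates the zip under GetPlanReplayerDirName() with a name of the form "replayer_<random key>_<unix-nano>.zip", and the caller owns and closes the returned file.

```go
package domain

import "github.com/pingcap/tidb/util/logutil"

// examplePlanReplayerFile is an assumed usage sketch of GeneratePlanReplayerFile.
func examplePlanReplayerFile() {
	zf, fileName, err := GeneratePlanReplayerFile()
	if err != nil {
		logutil.BgLogger().Warn("create plan replayer file failed: " + err.Error())
		return
	}
	// The file lives under GetPlanReplayerDirName(), where the dump-file GC
	// checker can later remove it; close it when done.
	defer func() { _ = zf.Close() }()
	logutil.BgLogger().Info("created " + fileName) // e.g. replayer_<key>_<nano>.zip
}
```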
9 changes: 5 additions & 4 deletions domain/plan_replayer_dump.go
@@ -210,7 +210,7 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
}

// Dump stats
if err = dumpStats(zw, pairs, task.TblStats, do); err != nil {
if err = dumpStats(zw, pairs, task.JSONTblStats, do); err != nil {
return err
}

@@ -252,9 +252,10 @@ func generateRecords(task *PlanReplayerDumpTask) []PlanReplayerStatusRecord {
if len(task.ExecStmts) > 0 {
for _, execStmt := range task.ExecStmts {
records = append(records, PlanReplayerStatusRecord{
OriginSQL: execStmt.Text(),
Token: task.FileName,
Internal: false,
SQLDigest: task.SQLDigest,
PlanDigest: task.PlanDigest,
OriginSQL: execStmt.Text(),
Token: task.FileName,
})
}
}
(diffs for the remaining 11 changed files not loaded)
