Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: support plan replayer multi sqls #37867

Merged
merged 17 commits into from
Sep 16, 2022
Merged
12 changes: 10 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,8 +977,16 @@ func (b *executorBuilder) buildPlanReplayer(v *plannercore.PlanReplayer) Executo
}
e := &PlanReplayerExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
ExecStmts: []ast.StmtNode{v.ExecStmt},
Analyze: v.Analyze,
DumpInfo: &PlanReplayerDumpInfo{
Analyze: v.Analyze,
Path: v.File,
ctx: b.ctx,
},
}
if v.ExecStmt != nil {
e.DumpInfo.ExecStmts = []ast.StmtNode{v.ExecStmt}
} else {
e.baseExecutor = newBaseExecutor(b.ctx, nil, v.ID())
}
return e
}
Expand Down
148 changes: 107 additions & 41 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,18 @@ const (
// PlanReplayerExec represents a plan replayer executor.
type PlanReplayerExec struct {
baseExecutor
DumpInfo *PlanReplayerDumpInfo
endFlag bool
}

// PlanReplayerDumpInfo indicates dump info
type PlanReplayerDumpInfo struct {
ExecStmts []ast.StmtNode
Analyze bool

endFlag bool
Path string
File *os.File
FileName string
ctx sessionctx.Context
}

type tableNamePair struct {
Expand Down Expand Up @@ -146,16 +154,55 @@ func (e *PlanReplayerExec) Next(ctx context.Context, req *chunk.Chunk) error {
if e.endFlag {
return nil
}
if e.ExecStmts == nil {
err := e.createFile(domain.GetPlanReplayerDirName())
if err != nil {
return err
}
if len(e.DumpInfo.Path) > 0 {
err = e.prepare()
if err != nil {
return err
}
// As we can only read file from handleSpecialQuery, thus we store the file token in the session var during `dump`
// and return nil here.
e.endFlag = true
return nil
}
if e.DumpInfo.ExecStmts == nil {
return errors.New("plan replayer: sql is empty")
}
res, err := e.dump(ctx, domain.GetPlanReplayerDirName())
err = e.DumpInfo.dump(ctx)
if err != nil {
return err
}
req.AppendString(0, res)
req.AppendString(0, e.DumpInfo.FileName)
e.endFlag = true
e.ctx.GetSessionVars().LastPlanReplayerToken = res
return nil
}

func (e *PlanReplayerExec) createFile(path string) error {
// Create path
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
return errors.AddStack(err)
}

// Generate key and create zip file
time := time.Now().UnixNano()
b := make([]byte, 16)
//nolint: gosec
_, err = rand.Read(b)
if err != nil {
return err
}
key := base64.URLEncoding.EncodeToString(b)
fileName := fmt.Sprintf("replayer_%v_%v.zip", key, time)
zf, err := os.Create(filepath.Join(path, fileName))
if err != nil {
return errors.AddStack(err)
}
e.DumpInfo.File = zf
e.DumpInfo.FileName = fileName
return nil
}

Expand Down Expand Up @@ -188,28 +235,9 @@ func (e *PlanReplayerExec) Next(ctx context.Context, req *chunk.Chunk) error {
|-explain2.txt
|-....
*/
func (e *PlanReplayerExec) dump(ctx context.Context, path string) (fileName string, err error) {
// Create path
err = os.MkdirAll(path, os.ModePerm)
if err != nil {
return "", errors.AddStack(err)
}

// Generate key and create zip file
time := time.Now().UnixNano()
b := make([]byte, 16)
//nolint: gosec
_, err = rand.Read(b)
if err != nil {
return "", err
}
key := base64.URLEncoding.EncodeToString(b)
fileName = fmt.Sprintf("replayer_%v_%v.zip", key, time)
zf, err := os.Create(filepath.Join(path, fileName))
if err != nil {
return "", errors.AddStack(err)
}

func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) {
fileName := e.FileName
zf := e.File
// Create zip writer
zw := zip.NewWriter(zf)
defer func() {
Expand All @@ -225,12 +253,12 @@ func (e *PlanReplayerExec) dump(ctx context.Context, path string) (fileName stri

// Dump config
if err = dumpConfig(zw); err != nil {
return "", err
return err
}

// Dump meta
if err = dumpMeta(zw); err != nil {
return "", err
return err
}

// Retrieve current DB
Expand All @@ -241,50 +269,51 @@ func (e *PlanReplayerExec) dump(ctx context.Context, path string) (fileName stri
// Retrieve all tables
pairs, err := e.extractTableNames(ctx, e.ExecStmts, dbName)
if err != nil {
return "", errors.AddStack(fmt.Errorf("plan replayer: invalid SQL text, err: %v", err))
return errors.AddStack(fmt.Errorf("plan replayer: invalid SQL text, err: %v", err))
}

// Dump Schema and View
if err = dumpSchemas(e.ctx, zw, pairs); err != nil {
return "", err
return err
}

// Dump tables tiflash replicas
if err = dumpTiFlashReplica(e.ctx, zw, pairs); err != nil {
return "", err
return err
}

// Dump stats
if err = dumpStats(zw, pairs, do); err != nil {
return "", err
return err
}

// Dump variables
if err = dumpVariables(e.ctx, zw); err != nil {
return "", err
return err
}

// Dump sql
if err = dumpSQLs(e.ExecStmts, zw); err != nil {
return "", err
return err
}

// Dump session bindings
if err = dumpSessionBindings(e.ctx, zw); err != nil {
return "", err
return err
}

// Dump global bindings
if err = dumpGlobalBindings(e.ctx, zw); err != nil {
return "", err
return err
}

// Dump explain
if err = dumpExplain(e.ctx, zw, e.ExecStmts, e.Analyze); err != nil {
return "", err
return err
}

return fileName, nil
e.ctx.GetSessionVars().LastPlanReplayerToken = e.FileName
return nil
}

func dumpConfig(zw *zip.Writer) error {
Expand Down Expand Up @@ -498,7 +527,7 @@ func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, execStmts []ast.StmtNod
return nil
}

func (e *PlanReplayerExec) extractTableNames(ctx context.Context,
func (e *PlanReplayerDumpInfo) extractTableNames(ctx context.Context,
ExecStmts []ast.StmtNode, curDB model.CIStr) (map[tableNamePair]struct{}, error) {
tableExtractor := &tableNameExtractor{
ctx: ctx,
Expand Down Expand Up @@ -529,6 +558,34 @@ func (e *PlanReplayerExec) extractTableNames(ctx context.Context,
return r, nil
}

func (e *PlanReplayerExec) prepare() error {
val := e.ctx.Value(PlanReplayerDumpVarKey)
if val != nil {
e.ctx.SetValue(PlanReplayerDumpVarKey, nil)
return errors.New("plan replayer: previous plan replayer dump option isn't closed normally, please try again")
}
e.ctx.SetValue(PlanReplayerDumpVarKey, e.DumpInfo)
return nil
}

// DumpSQLsFromFile dumps plan replayer results for sqls from file
func (e *PlanReplayerDumpInfo) DumpSQLsFromFile(ctx context.Context, b []byte) error {
sqls := strings.Split(string(b), ";")
e.ExecStmts = make([]ast.StmtNode, 0)
for _, sql := range sqls {
s := strings.Trim(sql, "\n")
if len(s) < 1 {
continue
}
node, err := e.ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(ctx, s)
if err != nil {
return fmt.Errorf("parse sql error, sql:%v, err:%v", s, err)
}
e.ExecStmts = append(e.ExecStmts, node)
}
return e.dump(ctx)
}

func getStatsForTable(do *domain.Domain, pair tableNamePair) (*handle.JSONTable, error) {
is := do.InfoSchema()
h := do.StatsHandle()
Expand Down Expand Up @@ -644,6 +701,12 @@ type PlanReplayerLoadInfo struct {
Ctx sessionctx.Context
}

type planReplayerDumpKeyType int

func (k planReplayerDumpKeyType) String() string {
return "plan_replayer_dump_var"
}

type planReplayerLoadKeyType int

func (k planReplayerLoadKeyType) String() string {
Expand All @@ -653,6 +716,9 @@ func (k planReplayerLoadKeyType) String() string {
// PlanReplayerLoadVarKey is a variable key for plan replayer load.
const PlanReplayerLoadVarKey planReplayerLoadKeyType = 0

// PlanReplayerDumpVarKey is a variable key for plan replayer dump.
const PlanReplayerDumpVarKey planReplayerDumpKeyType = 1

// Next implements the Executor Next interface.
func (e *PlanReplayerLoadExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.GrowAndReset(e.maxChunkSize)
Expand Down
27 changes: 27 additions & 0 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1815,6 +1815,24 @@ func (cc *clientConn) handlePlanReplayerLoad(ctx context.Context, planReplayerLo
return planReplayerLoadInfo.Update(data)
}

func (cc *clientConn) handlePlanReplayerDump(ctx context.Context, e *executor.PlanReplayerDumpInfo) error {
if cc.capability&mysql.ClientLocalFiles == 0 {
return errNotAllowedCommand
}
if e == nil {
return errors.New("plan replayer dump: executor is empty")
}
data, err := cc.getDataFromPath(ctx, e.Path)
if err != nil {
logutil.BgLogger().Error(err.Error())
return err
}
if len(data) == 0 {
return nil
}
return e.DumpSQLsFromFile(ctx, data)
}

func (cc *clientConn) audit(eventType plugin.GeneralEvent) {
err := plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error {
audit := plugin.DeclareAuditManifest(p.Manifest)
Expand Down Expand Up @@ -2117,6 +2135,15 @@ func (cc *clientConn) handleQuerySpecial(ctx context.Context, status uint16) (bo
}
}

planReplayerDump := cc.ctx.Value(executor.PlanReplayerDumpVarKey)
if planReplayerDump != nil {
handled = true
defer cc.ctx.SetValue(executor.PlanReplayerDumpVarKey, nil)
//nolint:forcetypeassert
if err := cc.handlePlanReplayerDump(ctx, planReplayerDump.(*executor.PlanReplayerDumpInfo)); err != nil {
return handled, err
}
}
return handled, cc.writeOkWith(ctx, mysql.OKHeader, true, status)
}

Expand Down