Skip to content

Commit

Permalink
domain: do not use fmt.Sprint to format SQL (#49620)
Browse files Browse the repository at this point in the history
close #49619
  • Loading branch information
Rustin170506 authored Dec 21, 2023
1 parent e4489f7 commit 11aefee
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pkg/domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ go_test(
],
embed = [":domain"],
flaky = True,
shard_count = 24,
shard_count = 25,
deps = [
"//pkg/config",
"//pkg/ddl",
Expand Down
43 changes: 32 additions & 11 deletions pkg/domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,33 +168,54 @@ func insertPlanReplayerStatus(ctx context.Context, sctx sessionctx.Context, reco

func insertPlanReplayerErrorStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) {
exec := sctx.(sqlexec.RestrictedSQLExecutor)
_, _, err := exec.ExecRestrictedSQL(ctx, nil, fmt.Sprintf(
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, origin_sql, fail_reason, instance) values ('%s','%s','%s','%s','%s')",
record.SQLDigest, record.PlanDigest, record.OriginSQL, record.FailedReason, instance))
_, _, err := exec.ExecRestrictedSQL(
ctx, nil,
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, origin_sql, fail_reason, instance) values (%?,%?,%?,%?,%?)",
record.SQLDigest, record.PlanDigest, record.OriginSQL, record.FailedReason, instance,
)
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.String("sqlDigest", record.SQLDigest),
zap.String("planDigest", record.PlanDigest),
zap.String("sql", record.OriginSQL),
zap.String("failReason", record.FailedReason),
zap.String("instance", instance),
zap.Error(err))
}
}

func insertPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) {
exec := sctx.(sqlexec.RestrictedSQLExecutor)
_, _, err := exec.ExecRestrictedSQL(ctx, nil, fmt.Sprintf(
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, origin_sql, token, instance) values ('%s','%s','%s','%s','%s')",
record.SQLDigest, record.PlanDigest, record.OriginSQL, record.Token, instance))
_, _, err := exec.ExecRestrictedSQL(
ctx,
nil,
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, origin_sql, token, instance) values (%?,%?,%?,%?,%?)",
record.SQLDigest, record.PlanDigest, record.OriginSQL, record.Token, instance,
)
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.String("sqlDigest", record.SQLDigest),
zap.String("planDigest", record.PlanDigest),
zap.String("sql", record.OriginSQL),
zap.Error(err))
zap.String("token", record.Token),
zap.String("instance", instance),
zap.Error(err),
)
// try insert record without original sql
_, _, err = exec.ExecRestrictedSQL(ctx, nil, fmt.Sprintf(
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, token, instance) values ('%s','%s','%s','%s')",
record.SQLDigest, record.PlanDigest, record.Token, instance))
_, _, err = exec.ExecRestrictedSQL(
ctx,
nil,
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, token, instance) values (%?,%?,%?,%?)",
record.SQLDigest, record.PlanDigest, record.Token, instance,
)
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.String("sqlDigest", record.SQLDigest),
zap.String("planDigest", record.PlanDigest),
zap.Error(err))
zap.String("token", record.Token),
zap.String("instance", instance),
zap.Error(err),
)
}
}
}
Expand Down
56 changes: 56 additions & 0 deletions pkg/domain/plan_replayer_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,59 @@ func TestPlanReplayerGC(t *testing.T) {
require.NotNil(t, err)
require.True(t, os.IsNotExist(err))
}

func TestInsertPlanReplayerStatus(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
prHandle := dom.GetPlanReplayerHandle()
tk.MustExec("use test")
tk.MustExec(`
CREATE TABLE tableA (
columnA VARCHAR(255),
columnB DATETIME,
columnC VARCHAR(255)
)`)

// This is a single quote in the sql.
// We should escape it correctly.
sql := `
SELECT * from tableA where SUBSTRING_INDEX(tableA.columnC, '_', 1) = tableA.columnA
`

tk.MustQuery(sql)
_, d := tk.Session().GetSessionVars().StmtCtx.SQLDigest()
_, pd := tk.Session().GetSessionVars().StmtCtx.GetPlanDigest()
sqlDigest := d.String()
planDigest := pd.String()

// Register task
tk.MustExec("delete from mysql.plan_replayer_task")
tk.MustExec("delete from mysql.plan_replayer_status")
tk.MustExec(fmt.Sprintf("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('%v','%v');", sqlDigest, planDigest))
err := prHandle.CollectPlanReplayerTask()
require.NoError(t, err)
require.Len(t, prHandle.GetTasks(), 1)

tk.MustExec("SET @@tidb_enable_plan_replayer_capture = ON;")

// Capture task and dump
tk.MustQuery(sql)
task := prHandle.DrainTask()
require.NotNil(t, task)
worker := prHandle.GetWorker()
success := worker.HandleTask(task)
defer os.RemoveAll(replayer.GetPlanReplayerDirName())
require.True(t, success)
require.Equal(t, prHandle.GetTaskStatus().GetRunningTaskStatusLen(), 0)
// assert memory task consumed
require.Len(t, prHandle.GetTasks(), 0)

// Check the plan_replayer_status.
// We should store the origin sql correctly.
rows := tk.MustQuery(
"select * from mysql.plan_replayer_status where sql_digest = ? and plan_digest = ? and origin_sql is not null",
sqlDigest,
planDigest,
).Rows()
require.Len(t, rows, 1)
}

0 comments on commit 11aefee

Please sign in to comment.