Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain: refactor plan replayer implemention #39084

Merged
merged 20 commits into from
Nov 14, 2022
Merged
3 changes: 3 additions & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"domainctx.go",
"optimize_trace.go",
"plan_replayer.go",
"plan_replayer_dump.go",
"schema_checker.go",
"schema_validator.go",
"sysvar_cache.go",
Expand Down Expand Up @@ -55,8 +56,10 @@ go_library(
"//util/logutil",
"//util/memory",
"//util/memoryusagealarm",
"//util/printer",
"//util/servermemorylimit",
"//util/sqlexec",
"@com_github_burntsushi_toml//:toml",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
13 changes: 10 additions & 3 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1533,9 +1533,16 @@ func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) {

// SetupPlanReplayerHandle setup plan replayer handle
func (do *Domain) SetupPlanReplayerHandle(ctx sessionctx.Context) {
do.planReplayerHandle = &planReplayerHandle{}
do.planReplayerHandle.sctxMu.sctx = ctx
do.dumpFileGcChecker.setupPlanReplayerHandle(do.planReplayerHandle)
do.planReplayerHandle = &planReplayerHandle{
planReplayerTaskCollectorHandle: &planReplayerTaskCollectorHandle{
sctx: ctx,
},
}
}

// SetupDumpFileGCChecker setup sctx
func (do *Domain) SetupDumpFileGCChecker(ctx sessionctx.Context) {
do.dumpFileGcChecker.setupSctx(ctx)
}

var planReplayerHandleLease = 10 * time.Second
Expand Down
69 changes: 29 additions & 40 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ import (
// For now it is used by `plan replayer` and `trace plan` statement
type dumpFileGcChecker struct {
sync.Mutex
gcLease time.Duration
paths []string
planReplayerHandle *planReplayerHandle
gcLease time.Duration
paths []string
sctx sessionctx.Context
}

// GetPlanReplayerDirName returns plan replayer directory path.
Expand Down Expand Up @@ -85,8 +85,8 @@ func (p *dumpFileGcChecker) gcDumpFiles(t time.Duration) {
}
}

func (p *dumpFileGcChecker) setupPlanReplayerHandle(handle *planReplayerHandle) {
p.planReplayerHandle = handle
func (p *dumpFileGcChecker) setupSctx(sctx sessionctx.Context) {
p.sctx = sctx
}

func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) {
Expand All @@ -113,39 +113,36 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) {
continue
}
logutil.BgLogger().Info("dumpFileGcChecker successful", zap.String("filename", fileName))
if isPlanReplayer && p.planReplayerHandle != nil {
p.planReplayerHandle.deletePlanReplayerStatus(context.Background(), fileName)
if isPlanReplayer && p.sctx != nil {
deletePlanReplayerStatus(context.Background(), p.sctx, fileName)
}
}
}
}

type planReplayerHandle struct {
sctxMu struct {
sync.Mutex
sctx sessionctx.Context
}
*planReplayerTaskCollectorHandle
}

type planReplayerTaskCollectorHandle struct {
taskMu struct {
sync.RWMutex
tasks map[PlanReplayerTaskKey]struct{}
}
sctx sessionctx.Context
}

// DeletePlanReplayerStatus delete mysql.plan_replayer_status record
func (h *planReplayerHandle) deletePlanReplayerStatus(ctx context.Context, token string) {
func deletePlanReplayerStatus(ctx context.Context, sctx sessionctx.Context, token string) {
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
h.sctxMu.Lock()
defer h.sctxMu.Unlock()
exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
exec := sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx1, fmt.Sprintf("delete from mysql.plan_replayer_status where token = %v", token))
if err != nil {
logutil.BgLogger().Warn("delete mysql.plan_replayer_status record failed", zap.String("token", token), zap.Error(err))
}
}

// InsertPlanReplayerStatus insert mysql.plan_replayer_status record
func (h *planReplayerHandle) InsertPlanReplayerStatus(ctx context.Context, records []PlanReplayerStatusRecord) {
// insertPlanReplayerStatus insert mysql.plan_replayer_status record
func insertPlanReplayerStatus(ctx context.Context, sctx sessionctx.Context, records []PlanReplayerStatusRecord) {
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
var instance string
serverInfo, err := infosync.GetServerInfo()
Expand All @@ -158,18 +155,16 @@ func (h *planReplayerHandle) InsertPlanReplayerStatus(ctx context.Context, recor
for _, record := range records {
if !record.Internal {
if len(record.FailedReason) > 0 {
h.insertExternalPlanReplayerErrorStatusRecord(ctx1, instance, record)
insertExternalPlanReplayerErrorStatusRecord(ctx1, sctx, instance, record)
} else {
h.insertExternalPlanReplayerSuccessStatusRecord(ctx1, instance, record)
insertExternalPlanReplayerSuccessStatusRecord(ctx1, sctx, instance, record)
}
}
}
}

func (h *planReplayerHandle) insertExternalPlanReplayerErrorStatusRecord(ctx context.Context, instance string, record PlanReplayerStatusRecord) {
h.sctxMu.Lock()
defer h.sctxMu.Unlock()
exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
func insertExternalPlanReplayerErrorStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) {
exec := sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx, fmt.Sprintf(
"insert into mysql.plan_replayer_status (origin_sql, fail_reason, instance) values ('%s','%s','%s')",
record.OriginSQL, record.FailedReason, instance))
Expand All @@ -179,10 +174,8 @@ func (h *planReplayerHandle) insertExternalPlanReplayerErrorStatusRecord(ctx con
}
}

func (h *planReplayerHandle) insertExternalPlanReplayerSuccessStatusRecord(ctx context.Context, instance string, record PlanReplayerStatusRecord) {
h.sctxMu.Lock()
defer h.sctxMu.Unlock()
exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
func insertExternalPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) {
exec := sctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx, fmt.Sprintf(
"insert into mysql.plan_replayer_status (origin_sql, token, instance) values ('%s','%s','%s')",
record.OriginSQL, record.Token, instance))
Expand All @@ -193,15 +186,15 @@ func (h *planReplayerHandle) insertExternalPlanReplayerSuccessStatusRecord(ctx c
}

// CollectPlanReplayerTask collects all unhandled plan replayer task
func (h *planReplayerHandle) CollectPlanReplayerTask(ctx context.Context) error {
func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask(ctx context.Context) error {
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
allKeys, err := h.collectAllPlanReplayerTask(ctx1)
if err != nil {
return err
}
tasks := make([]PlanReplayerTaskKey, 0)
for _, key := range allKeys {
unhandled, err := h.checkUnHandledReplayerTask(ctx1, key)
unhandled, err := checkUnHandledReplayerTask(ctx1, h.sctx, key)
if err != nil {
return err
}
Expand All @@ -214,7 +207,7 @@ func (h *planReplayerHandle) CollectPlanReplayerTask(ctx context.Context) error
}

// GetTasks get all tasks
func (h *planReplayerHandle) GetTasks() []PlanReplayerTaskKey {
func (h *planReplayerTaskCollectorHandle) GetTasks() []PlanReplayerTaskKey {
tasks := make([]PlanReplayerTaskKey, 0)
h.taskMu.RLock()
defer h.taskMu.RUnlock()
Expand All @@ -224,7 +217,7 @@ func (h *planReplayerHandle) GetTasks() []PlanReplayerTaskKey {
return tasks
}

func (h *planReplayerHandle) setupTasks(tasks []PlanReplayerTaskKey) {
func (h *planReplayerTaskCollectorHandle) setupTasks(tasks []PlanReplayerTaskKey) {
r := make(map[PlanReplayerTaskKey]struct{})
for _, task := range tasks {
r[task] = struct{}{}
Expand All @@ -234,10 +227,8 @@ func (h *planReplayerHandle) setupTasks(tasks []PlanReplayerTaskKey) {
h.taskMu.tasks = r
}

func (h *planReplayerHandle) collectAllPlanReplayerTask(ctx context.Context) ([]PlanReplayerTaskKey, error) {
h.sctxMu.Lock()
defer h.sctxMu.Unlock()
exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context.Context) ([]PlanReplayerTaskKey, error) {
exec := h.sctx.(sqlexec.SQLExecutor)
rs, err := exec.ExecuteInternal(ctx, "select sql_digest, plan_digest from mysql.plan_replayer_task")
if err != nil {
return nil, err
Expand All @@ -261,10 +252,8 @@ func (h *planReplayerHandle) collectAllPlanReplayerTask(ctx context.Context) ([]
return allKeys, nil
}

func (h *planReplayerHandle) checkUnHandledReplayerTask(ctx context.Context, task PlanReplayerTaskKey) (bool, error) {
h.sctxMu.Lock()
defer h.sctxMu.Unlock()
exec := h.sctxMu.sctx.(sqlexec.SQLExecutor)
func checkUnHandledReplayerTask(ctx context.Context, sctx sessionctx.Context, task PlanReplayerTaskKey) (bool, error) {
exec := sctx.(sqlexec.SQLExecutor)
rs, err := exec.ExecuteInternal(ctx, fmt.Sprintf("select * from mysql.plan_replayer_status where sql_digest = '%v' and plan_digest = '%v' and fail_reason is null", task.sqlDigest, task.planDigest))
if err != nil {
return false, err
Expand Down
Loading