From a0a16d7c273e023a94e64b831140f16f73bd8a7a Mon Sep 17 00:00:00 2001 From: Song Gao Date: Wed, 7 Dec 2022 13:38:03 +0800 Subject: [PATCH] domain: support plan replayer dump workers running in concurrency (#39347) ref pingcap/tidb#38779 --- config/config.go | 38 +++--- domain/domain.go | 34 +++-- domain/plan_replayer.go | 194 +++++++++++++++++++++------- domain/plan_replayer_handle_test.go | 5 +- executor/compiler.go | 11 +- session/session.go | 17 ++- 6 files changed, 219 insertions(+), 80 deletions(-) diff --git a/config/config.go b/config/config.go index d3962e917285b..4a6b28ed12a1d 100644 --- a/config/config.go +++ b/config/config.go @@ -658,14 +658,15 @@ type Performance struct { // Deprecated MemProfileInterval string `toml:"-" json:"-"` - IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"` - PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"` - GOGC int `toml:"gogc" json:"gogc"` - EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"` - StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"` - StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"` - AnalyzePartitionConcurrencyQuota uint `toml:"analyze-partition-concurrency-quota" json:"analyze-partition-concurrency-quota"` - EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"` + IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"` + PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"` + GOGC int `toml:"gogc" json:"gogc"` + EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"` + StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"` + StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"` + AnalyzePartitionConcurrencyQuota uint `toml:"analyze-partition-concurrency-quota" json:"analyze-partition-concurrency-quota"` + PlanReplayerDumpWorkerConcurrency uint `toml:"plan-replayer-dump-worker-concurrency" json:"plan-replayer-dump-worker-concurrency"` + EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"` // The following items are deprecated. We need to keep them here temporarily // to support the upgrade process. They can be removed in future. @@ -923,16 +924,17 @@ var defaultConf = Config{ CommitterConcurrency: defTiKVCfg.CommitterConcurrency, MaxTxnTTL: defTiKVCfg.MaxTxnTTL, // 1hour // TODO: set indexUsageSyncLease to 60s. 
- IndexUsageSyncLease: "0s", - GOGC: 100, - EnforceMPP: false, - PlanReplayerGCLease: "10m", - StatsLoadConcurrency: 5, - StatsLoadQueueSize: 1000, - AnalyzePartitionConcurrencyQuota: 16, - EnableStatsCacheMemQuota: false, - RunAutoAnalyze: true, - EnableLoadFMSketch: false, + IndexUsageSyncLease: "0s", + GOGC: 100, + EnforceMPP: false, + PlanReplayerGCLease: "10m", + StatsLoadConcurrency: 5, + StatsLoadQueueSize: 1000, + AnalyzePartitionConcurrencyQuota: 16, + PlanReplayerDumpWorkerConcurrency: 1, + EnableStatsCacheMemQuota: false, + RunAutoAnalyze: true, + EnableLoadFMSketch: false, }, ProxyProtocol: ProxyProtocol{ Networks: "", diff --git a/domain/domain.go b/domain/domain.go index c7a470b1bad78..128260c324287 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1573,17 +1573,31 @@ func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) { } // SetupPlanReplayerHandle setup plan replayer handle -func (do *Domain) SetupPlanReplayerHandle(collectorSctx, dumperSctx sessionctx.Context) { +func (do *Domain) SetupPlanReplayerHandle(collectorSctx sessionctx.Context, workersSctxs []sessionctx.Context) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) do.planReplayerHandle = &planReplayerHandle{} do.planReplayerHandle.planReplayerTaskCollectorHandle = &planReplayerTaskCollectorHandle{ ctx: ctx, sctx: collectorSctx, } + taskCH := make(chan *PlanReplayerDumpTask, 16) + taskStatus := &planReplayerDumpTaskStatus{} + taskStatus.finishedTaskMu.finishedTask = map[PlanReplayerTaskKey]struct{}{} + taskStatus.runningTaskMu.runningTasks = map[PlanReplayerTaskKey]struct{}{} + do.planReplayerHandle.planReplayerTaskDumpHandle = &planReplayerTaskDumpHandle{ - ctx: ctx, - sctx: dumperSctx, - taskCH: make(chan *PlanReplayerDumpTask, 16), + taskCH: taskCH, + status: taskStatus, + } + do.planReplayerHandle.planReplayerTaskDumpHandle.workers = make([]*planReplayerTaskDumpWorker, 0) + for i := 0; i < len(workersSctxs); i++ { + worker := &planReplayerTaskDumpWorker{ + ctx: ctx, + sctx: workersSctxs[i], + taskCH: taskCH, + status: taskStatus, + } + do.planReplayerHandle.planReplayerTaskDumpHandle.workers = append(do.planReplayerHandle.planReplayerTaskDumpHandle.workers, worker) } } @@ -1598,6 +1612,7 @@ func (do *Domain) SetupHistoricalStatsWorker(ctx sessionctx.Context) { // SetupDumpFileGCChecker setup sctx func (do *Domain) SetupDumpFileGCChecker(ctx sessionctx.Context) { do.dumpFileGcChecker.setupSctx(ctx) + do.dumpFileGcChecker.planReplayerTaskStatus = do.planReplayerHandle.status } var planReplayerHandleLease atomic.Uint64 @@ -1650,14 +1665,11 @@ func (do *Domain) StartPlanReplayerHandle() { logutil.BgLogger().Info("PlanReplayerTaskDumpHandle exited.") util.Recover(metrics.LabelDomain, "PlanReplayerTaskDumpHandle", nil, false) }() - for { - select { - case <-do.exit: - return - case task := <-do.planReplayerHandle.planReplayerTaskDumpHandle.taskCH: - do.planReplayerHandle.HandlePlanReplayerDumpTask(task) - } + for _, worker := range do.planReplayerHandle.planReplayerTaskDumpHandle.workers { + go worker.run() } + <-do.exit + do.planReplayerHandle.planReplayerTaskDumpHandle.Close() }() } diff --git a/domain/plan_replayer.go b/domain/plan_replayer.go index f20db239a6ca4..d237445f5404d 100644 --- a/domain/plan_replayer.go +++ b/domain/plan_replayer.go @@ -49,9 +49,10 @@ import ( // For now it is used by `plan replayer` and `trace plan` statement type dumpFileGcChecker struct { sync.Mutex - gcLease time.Duration - paths []string - sctx sessionctx.Context + gcLease 
time.Duration + paths []string + sctx sessionctx.Context + planReplayerTaskStatus *planReplayerDumpTaskStatus } // GetPlanReplayerDirName returns plan replayer directory path. @@ -119,34 +120,12 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) { logutil.BgLogger().Info("dumpFileGcChecker successful", zap.String("filename", fileName)) if isPlanReplayer && p.sctx != nil { deletePlanReplayerStatus(context.Background(), p.sctx, fileName) + p.planReplayerTaskStatus.clearFinishedTask() } } } } -type planReplayerHandle struct { - *planReplayerTaskCollectorHandle - *planReplayerTaskDumpHandle -} - -// HandlePlanReplayerDumpTask handle dump task -func (h *planReplayerHandle) HandlePlanReplayerDumpTask(task *PlanReplayerDumpTask) bool { - success := h.dumpPlanReplayerDumpTask(task) - if success { - h.removeTask(task.PlanReplayerTaskKey) - } - return success -} - -type planReplayerTaskCollectorHandle struct { - taskMu struct { - sync.RWMutex - tasks map[PlanReplayerTaskKey]struct{} - } - ctx context.Context - sctx sessionctx.Context -} - func deletePlanReplayerStatus(ctx context.Context, sctx sessionctx.Context, token string) { ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats) exec := sctx.(sqlexec.SQLExecutor) @@ -198,6 +177,35 @@ func insertPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx. } } +type planReplayerHandle struct { + *planReplayerTaskCollectorHandle + *planReplayerTaskDumpHandle +} + +// SendTask send dumpTask in background task handler +func (h *planReplayerHandle) SendTask(task *PlanReplayerDumpTask) { + select { + case h.planReplayerTaskDumpHandle.taskCH <- task: + // we directly remove the task key if we put task in channel successfully, if the task was failed to dump, + // the task handle will re-add the task in next loop + h.planReplayerTaskCollectorHandle.removeTask(task.PlanReplayerTaskKey) + default: + // TODO: add metrics here + // directly discard the task if the task channel is full in order not to block the query process + logutil.BgLogger().Info("discard one plan replayer dump task", + zap.String("sql digest", task.SQLDigest), zap.String("plan digest", task.PlanDigest)) + } +} + +type planReplayerTaskCollectorHandle struct { + taskMu struct { + sync.RWMutex + tasks map[PlanReplayerTaskKey]struct{} + } + ctx context.Context + sctx sessionctx.Context +} + // CollectPlanReplayerTask collects all unhandled plan replayer task func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask() error { allKeys, err := h.collectAllPlanReplayerTask(h.ctx) @@ -270,21 +278,100 @@ func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context return allKeys, nil } -type planReplayerTaskDumpHandle struct { +type planReplayerDumpTaskStatus struct { + // running task records the task running by all workers in order to avoid multi workers running the same task key + runningTaskMu struct { + sync.RWMutex + runningTasks map[PlanReplayerTaskKey]struct{} + } + + // finished task records the finished task in order to avoid running finished task key + finishedTaskMu struct { + sync.RWMutex + finishedTask map[PlanReplayerTaskKey]struct{} + } +} + +// GetRunningTaskStatusLen used for unit test +func (r *planReplayerDumpTaskStatus) GetRunningTaskStatusLen() int { + r.runningTaskMu.RLock() + defer r.runningTaskMu.RUnlock() + return len(r.runningTaskMu.runningTasks) +} + +// GetFinishedTaskStatusLen used for unit test +func (r *planReplayerDumpTaskStatus) GetFinishedTaskStatusLen() int { + r.finishedTaskMu.RLock() + defer 
r.finishedTaskMu.RUnlock() + return len(r.finishedTaskMu.finishedTask) +} + +func (r *planReplayerDumpTaskStatus) occupyRunningTaskKey(task *PlanReplayerDumpTask) bool { + r.runningTaskMu.Lock() + defer r.runningTaskMu.Unlock() + _, ok := r.runningTaskMu.runningTasks[task.PlanReplayerTaskKey] + if ok { + return false + } + r.runningTaskMu.runningTasks[task.PlanReplayerTaskKey] = struct{}{} + return true +} + +func (r *planReplayerDumpTaskStatus) releaseRunningTaskKey(task *PlanReplayerDumpTask) { + r.runningTaskMu.Lock() + defer r.runningTaskMu.Unlock() + delete(r.runningTaskMu.runningTasks, task.PlanReplayerTaskKey) +} + +func (r *planReplayerDumpTaskStatus) checkTaskKeyFinishedBefore(task *PlanReplayerDumpTask) bool { + r.finishedTaskMu.RLock() + defer r.finishedTaskMu.RUnlock() + _, ok := r.finishedTaskMu.finishedTask[task.PlanReplayerTaskKey] + return ok +} + +func (r *planReplayerDumpTaskStatus) setTaskFinished(task *PlanReplayerDumpTask) { + r.finishedTaskMu.Lock() + defer r.finishedTaskMu.Unlock() + r.finishedTaskMu.finishedTask[task.PlanReplayerTaskKey] = struct{}{} +} + +func (r *planReplayerDumpTaskStatus) clearFinishedTask() { + r.finishedTaskMu.Lock() + defer r.finishedTaskMu.Unlock() + r.finishedTaskMu.finishedTask = map[PlanReplayerTaskKey]struct{}{} +} + +type planReplayerTaskDumpWorker struct { ctx context.Context sctx sessionctx.Context - taskCH chan *PlanReplayerDumpTask + taskCH <-chan *PlanReplayerDumpTask + status *planReplayerDumpTaskStatus } -// DrainTask drain a task for unit test -func (h *planReplayerTaskDumpHandle) DrainTask() *PlanReplayerDumpTask { - return <-h.taskCH +func (w *planReplayerTaskDumpWorker) run() { + for task := range w.taskCH { + if w.status.checkTaskKeyFinishedBefore(task) { + continue + } + successOccupy := w.status.occupyRunningTaskKey(task) + if !successOccupy { + continue + } + w.HandleTask(task) + w.status.releaseRunningTaskKey(task) + } } -// HandlePlanReplayerDumpTask handled the task -func (h *planReplayerTaskDumpHandle) dumpPlanReplayerDumpTask(task *PlanReplayerDumpTask) (success bool) { +// HandleTask handled task +func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (success bool) { + defer func() { + if success { + w.status.setTaskFinished(task) + } + }() taskKey := task.PlanReplayerTaskKey - unhandled, err := checkUnHandledReplayerTask(h.ctx, h.sctx, taskKey) + unhandled, err := checkUnHandledReplayerTask(w.ctx, w.sctx, taskKey) if err != nil { logutil.BgLogger().Warn("check plan replayer capture task failed", zap.String("sqlDigest", taskKey.SQLDigest), @@ -303,13 +390,13 @@ func (h *planReplayerTaskDumpHandle) dumpPlanReplayerDumpTask(task *PlanReplayer zap.String("sqlDigest", taskKey.SQLDigest), zap.String("planDigest", taskKey.PlanDigest), zap.Error(err)) - return + return false } task.Zf = file task.FileName = fileName task.EncodedPlan, _ = task.EncodePlan(task.SessionVars.StmtCtx, false) jsStats := make(map[int64]*handle.JSONTable) - is := GetDomain(h.sctx).InfoSchema() + is := GetDomain(w.sctx).InfoSchema() for tblID, stat := range task.TblStats { tbl, ok := is.TableByID(tblID) if !ok { @@ -329,7 +416,8 @@ func (h *planReplayerTaskDumpHandle) dumpPlanReplayerDumpTask(task *PlanReplayer } jsStats[tblID] = r } - err = DumpPlanReplayerInfo(h.ctx, h.sctx, task) + task.JSONTblStats = jsStats + err = DumpPlanReplayerInfo(w.ctx, w.sctx, task) if err != nil { logutil.BgLogger().Warn("dump plan replayer capture task result failed", zap.String("sqlDigest", taskKey.SQLDigest), @@ -340,14 +428,30 @@ func (h 
*planReplayerTaskDumpHandle) dumpPlanReplayerDumpTask(task *PlanReplayer return true } -// SendTask send dumpTask in background task handler -func (h *planReplayerTaskDumpHandle) SendTask(task *PlanReplayerDumpTask) { - select { - case h.taskCH <- task: - default: - // TODO: add metrics here - // directly discard the task if the task channel is full in order not to block the query process - } +type planReplayerTaskDumpHandle struct { + taskCH chan *PlanReplayerDumpTask + status *planReplayerDumpTaskStatus + workers []*planReplayerTaskDumpWorker +} + +// GetTaskStatus used for test +func (h *planReplayerTaskDumpHandle) GetTaskStatus() *planReplayerDumpTaskStatus { + return h.status +} + +// GetWorker used for test +func (h *planReplayerTaskDumpHandle) GetWorker() *planReplayerTaskDumpWorker { + return h.workers[0] +} + +// Close make finished flag ture +func (h *planReplayerTaskDumpHandle) Close() { + close(h.taskCH) +} + +// DrainTask drain a task for unit test +func (h *planReplayerTaskDumpHandle) DrainTask() *PlanReplayerDumpTask { + return <-h.taskCH } func checkUnHandledReplayerTask(ctx context.Context, sctx sessionctx.Context, task PlanReplayerTaskKey) (bool, error) { diff --git a/domain/plan_replayer_handle_test.go b/domain/plan_replayer_handle_test.go index 5a824ef4eeeb6..dccb400ecd5b6 100644 --- a/domain/plan_replayer_handle_test.go +++ b/domain/plan_replayer_handle_test.go @@ -89,8 +89,11 @@ func TestPlanReplayerHandleDumpTask(t *testing.T) { tk.MustQuery("select * from t;") task := prHandle.DrainTask() require.NotNil(t, task) - success := prHandle.HandlePlanReplayerDumpTask(task) + worker := prHandle.GetWorker() + success := worker.HandleTask(task) require.True(t, success) + require.Equal(t, prHandle.GetTaskStatus().GetRunningTaskStatusLen(), 0) + require.Equal(t, prHandle.GetTaskStatus().GetFinishedTaskStatusLen(), 1) // assert memory task consumed require.Len(t, prHandle.GetTasks(), 0) diff --git a/executor/compiler.go b/executor/compiler.go index 6d62b6e3272cf..5d16a4fbea6e7 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -164,12 +164,21 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS } func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode) { - tasks := domain.GetDomain(sctx).GetPlanReplayerHandle().GetTasks() + dom := domain.GetDomain(sctx) + if dom == nil { + return + } + handle := dom.GetPlanReplayerHandle() + if handle == nil { + return + } + tasks := handle.GetTasks() _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() _, planDigest := getPlanDigest(sctx.GetSessionVars().StmtCtx) for _, task := range tasks { if task.SQLDigest == sqlDigest.String() && task.PlanDigest == planDigest.String() { sendPlanReplayerDumpTask(sqlDigest.String(), planDigest.String(), sctx, stmtNode) + return } } } diff --git a/session/session.go b/session/session.go index c0630eed678b2..495f86feb5dc9 100644 --- a/session/session.go +++ b/session/session.go @@ -2973,7 +2973,7 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { analyzeConcurrencyQuota := int(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota) concurrency := int(config.GetGlobalConfig().Performance.StatsLoadConcurrency) - ses, err := createSessions(store, 10) + ses, err := createSessions(store, 9) if err != nil { return nil, err } @@ -3047,14 +3047,23 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { }() } + planReplayerWorkerCnt := 
config.GetGlobalConfig().Performance.PlanReplayerDumpWorkerConcurrency + planReplayerWorkersSctx := make([]sessionctx.Context, planReplayerWorkerCnt) + pworkerSes, err := createSessions(store, int(planReplayerWorkerCnt)) + if err != nil { + return nil, err + } + for i := 0; i < int(planReplayerWorkerCnt); i++ { + planReplayerWorkersSctx[i] = pworkerSes[i] + } // setup plan replayer handle - dom.SetupPlanReplayerHandle(ses[6], ses[7]) + dom.SetupPlanReplayerHandle(ses[6], planReplayerWorkersSctx) dom.StartPlanReplayerHandle() // setup dumpFileGcChecker - dom.SetupDumpFileGCChecker(ses[8]) + dom.SetupDumpFileGCChecker(ses[7]) dom.DumpFileGcCheckerLoop() // setup historical stats worker - dom.SetupHistoricalStatsWorker(ses[9]) + dom.SetupHistoricalStatsWorker(ses[8]) dom.StartHistoricalStatsWorker() // A sub context for update table stats, and other contexts for concurrent stats loading.
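
The change above replaces the single synchronous dump path with a small worker pool: SetupPlanReplayerHandle builds one planReplayerTaskDumpWorker per session context, every worker ranges over a shared buffered channel, and a planReplayerDumpTaskStatus keeps two workers from occupying the same task key and skips keys that already finished. The worker count, and the number of extra sessions created in BootstrapSession, comes from the new performance.plan-replayer-dump-worker-concurrency option, which defaults to 1. Below is a minimal, self-contained sketch of that pattern, assuming placeholder names (taskKey, dumpTask, dumpTaskStatus) rather than TiDB's types, and folding the two RWMutex-protected maps of the real status struct into a single mutex for brevity.

// Illustrative sketch (not TiDB source) of the concurrency pattern in this patch:
// several dump workers drain one task channel, while a shared status struct
// prevents two workers from handling the same task key and skips keys that
// were already dumped. All names here are placeholders.
package main

import (
	"fmt"
	"sync"
)

type taskKey struct{ sqlDigest, planDigest string }

type dumpTask struct{ key taskKey }

type dumpTaskStatus struct {
	mu       sync.Mutex
	running  map[taskKey]struct{}
	finished map[taskKey]struct{}
}

// occupy marks a key as running; it returns false if another worker owns it.
func (s *dumpTaskStatus) occupy(k taskKey) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.running[k]; ok {
		return false
	}
	s.running[k] = struct{}{}
	return true
}

func (s *dumpTaskStatus) release(k taskKey) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.running, k)
}

func (s *dumpTaskStatus) finishedBefore(k taskKey) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	_, ok := s.finished[k]
	return ok
}

func (s *dumpTaskStatus) markFinished(k taskKey) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.finished[k] = struct{}{}
}

func worker(id int, tasks <-chan dumpTask, status *dumpTaskStatus, wg *sync.WaitGroup) {
	defer wg.Done()
	// Ranging over the channel exits cleanly once the sender closes it,
	// mirroring how the domain shuts the dump handle down on exit.
	for t := range tasks {
		if status.finishedBefore(t.key) || !status.occupy(t.key) {
			continue
		}
		fmt.Printf("worker %d dumping %s/%s\n", id, t.key.sqlDigest, t.key.planDigest)
		status.markFinished(t.key)
		status.release(t.key)
	}
}

func main() {
	const workerCnt = 3 // analogous to plan-replayer-dump-worker-concurrency
	tasks := make(chan dumpTask, 16)
	status := &dumpTaskStatus{
		running:  map[taskKey]struct{}{},
		finished: map[taskKey]struct{}{},
	}
	var wg sync.WaitGroup
	for i := 0; i < workerCnt; i++ {
		wg.Add(1)
		go worker(i, tasks, status, &wg)
	}
	// Non-blocking send: drop the task if the buffer is full, as SendTask does,
	// so the query path is never blocked by a slow dump.
	for i := 0; i < 5; i++ {
		t := dumpTask{key: taskKey{fmt.Sprintf("sql-%d", i), fmt.Sprintf("plan-%d", i)}}
		select {
		case tasks <- t:
		default:
		}
	}
	close(tasks)
	wg.Wait()
}

Shutdown relies on closing the channel rather than a per-worker quit signal: StartPlanReplayerHandle launches each worker goroutine, blocks on do.exit, and then calls Close() on the dump handle, so every worker's range loop drains any remaining buffered tasks and returns.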