domain: support plan replayer dump workers running in concurrency (#3…
Yisaer authored Dec 7, 2022
1 parent b33ff62 commit a0a16d7
Showing 6 changed files with 219 additions and 80 deletions.
38 changes: 20 additions & 18 deletions config/config.go
@@ -658,14 +658,15 @@ type Performance struct {
// Deprecated
MemProfileInterval string `toml:"-" json:"-"`

IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
GOGC int `toml:"gogc" json:"gogc"`
EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"`
StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"`
AnalyzePartitionConcurrencyQuota uint `toml:"analyze-partition-concurrency-quota" json:"analyze-partition-concurrency-quota"`
EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"`
IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
GOGC int `toml:"gogc" json:"gogc"`
EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"`
StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"`
AnalyzePartitionConcurrencyQuota uint `toml:"analyze-partition-concurrency-quota" json:"analyze-partition-concurrency-quota"`
PlanReplayerDumpWorkerConcurrency uint `toml:"plan-replayer-dump-worker-concurrency" json:"plan-replayer-dump-worker-concurrency"`
EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"`
// The following items are deprecated. We need to keep them here temporarily
// to support the upgrade process. They can be removed in future.

@@ -923,16 +924,17 @@ var defaultConf = Config{
CommitterConcurrency: defTiKVCfg.CommitterConcurrency,
MaxTxnTTL: defTiKVCfg.MaxTxnTTL, // 1hour
// TODO: set indexUsageSyncLease to 60s.
IndexUsageSyncLease: "0s",
GOGC: 100,
EnforceMPP: false,
PlanReplayerGCLease: "10m",
StatsLoadConcurrency: 5,
StatsLoadQueueSize: 1000,
AnalyzePartitionConcurrencyQuota: 16,
EnableStatsCacheMemQuota: false,
RunAutoAnalyze: true,
EnableLoadFMSketch: false,
IndexUsageSyncLease: "0s",
GOGC: 100,
EnforceMPP: false,
PlanReplayerGCLease: "10m",
StatsLoadConcurrency: 5,
StatsLoadQueueSize: 1000,
AnalyzePartitionConcurrencyQuota: 16,
PlanReplayerDumpWorkerConcurrency: 1,
EnableStatsCacheMemQuota: false,
RunAutoAnalyze: true,
EnableLoadFMSketch: false,
},
ProxyProtocol: ProxyProtocol{
Networks: "",
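The new plan-replayer-dump-worker-concurrency entry (default 1) decides how many dump-worker sessions the domain receives. A minimal sketch of how a caller could feed the knob into the new SetupPlanReplayerHandle signature follows; the setupPlanReplayerWorkers function and the newSessionCtx factory are assumptions for illustration, not part of this commit, which only changes the config struct and its default here.

package sketch

import (
	"github.com/pingcap/tidb/config"
	"github.com/pingcap/tidb/domain"
	"github.com/pingcap/tidb/sessionctx"
)

// setupPlanReplayerWorkers wires the configured concurrency into the domain:
// one internal session per dump worker, plus one for the task collector.
func setupPlanReplayerWorkers(dom *domain.Domain, collectorSctx sessionctx.Context,
	newSessionCtx func() sessionctx.Context) {
	conc := int(config.GetGlobalConfig().Performance.PlanReplayerDumpWorkerConcurrency)
	workerSctxs := make([]sessionctx.Context, 0, conc)
	for i := 0; i < conc; i++ {
		workerSctxs = append(workerSctxs, newSessionCtx()) // hypothetical session factory
	}
	dom.SetupPlanReplayerHandle(collectorSctx, workerSctxs)
	dom.StartPlanReplayerHandle()
}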
34 changes: 23 additions & 11 deletions domain/domain.go
@@ -1573,17 +1573,31 @@ func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) {
}

// SetupPlanReplayerHandle setup plan replayer handle
func (do *Domain) SetupPlanReplayerHandle(collectorSctx, dumperSctx sessionctx.Context) {
func (do *Domain) SetupPlanReplayerHandle(collectorSctx sessionctx.Context, workersSctxs []sessionctx.Context) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
do.planReplayerHandle = &planReplayerHandle{}
do.planReplayerHandle.planReplayerTaskCollectorHandle = &planReplayerTaskCollectorHandle{
ctx: ctx,
sctx: collectorSctx,
}
taskCH := make(chan *PlanReplayerDumpTask, 16)
taskStatus := &planReplayerDumpTaskStatus{}
taskStatus.finishedTaskMu.finishedTask = map[PlanReplayerTaskKey]struct{}{}
taskStatus.runningTaskMu.runningTasks = map[PlanReplayerTaskKey]struct{}{}

do.planReplayerHandle.planReplayerTaskDumpHandle = &planReplayerTaskDumpHandle{
ctx: ctx,
sctx: dumperSctx,
taskCH: make(chan *PlanReplayerDumpTask, 16),
taskCH: taskCH,
status: taskStatus,
}
do.planReplayerHandle.planReplayerTaskDumpHandle.workers = make([]*planReplayerTaskDumpWorker, 0)
for i := 0; i < len(workersSctxs); i++ {
worker := &planReplayerTaskDumpWorker{
ctx: ctx,
sctx: workersSctxs[i],
taskCH: taskCH,
status: taskStatus,
}
do.planReplayerHandle.planReplayerTaskDumpHandle.workers = append(do.planReplayerHandle.planReplayerTaskDumpHandle.workers, worker)
}
}

@@ -1598,6 +1612,7 @@ func (do *Domain) SetupHistoricalStatsWorker(ctx sessionctx.Context) {
// SetupDumpFileGCChecker setup sctx
func (do *Domain) SetupDumpFileGCChecker(ctx sessionctx.Context) {
do.dumpFileGcChecker.setupSctx(ctx)
do.dumpFileGcChecker.planReplayerTaskStatus = do.planReplayerHandle.status
}

var planReplayerHandleLease atomic.Uint64
@@ -1650,14 +1665,11 @@ func (do *Domain) StartPlanReplayerHandle() {
logutil.BgLogger().Info("PlanReplayerTaskDumpHandle exited.")
util.Recover(metrics.LabelDomain, "PlanReplayerTaskDumpHandle", nil, false)
}()
for {
select {
case <-do.exit:
return
case task := <-do.planReplayerHandle.planReplayerTaskDumpHandle.taskCH:
do.planReplayerHandle.HandlePlanReplayerDumpTask(task)
}
for _, worker := range do.planReplayerHandle.planReplayerTaskDumpHandle.workers {
go worker.run()
}
<-do.exit
do.planReplayerHandle.planReplayerTaskDumpHandle.Close()
}()
}
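StartPlanReplayerHandle now fans one shared task channel out to all configured workers and stops them by closing that channel after the domain exits. The self-contained sketch below models the same lifecycle in plain Go (no TiDB types; the worker count, buffer size, and WaitGroup are illustrative): each worker's range loop ends once the channel is closed and drained.

package main

import (
	"fmt"
	"sync"
)

type dumpTask struct{ key string }

func main() {
	taskCh := make(chan *dumpTask, 16) // same buffer size as the real handle

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ { // 3 stands in for plan-replayer-dump-worker-concurrency
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for t := range taskCh { // returns once taskCh is closed and drained
				fmt.Printf("worker %d handled %s\n", id, t.key)
			}
		}(i)
	}

	for _, k := range []string{"a", "b", "c", "d"} {
		taskCh <- &dumpTask{key: k}
	}

	close(taskCh) // the Close() step in the diff: every worker exits its loop
	wg.Wait()
}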

194 changes: 149 additions & 45 deletions domain/plan_replayer.go
@@ -49,9 +49,10 @@ import (
// For now it is used by `plan replayer` and `trace plan` statement
type dumpFileGcChecker struct {
sync.Mutex
gcLease time.Duration
paths []string
sctx sessionctx.Context
gcLease time.Duration
paths []string
sctx sessionctx.Context
planReplayerTaskStatus *planReplayerDumpTaskStatus
}

// GetPlanReplayerDirName returns plan replayer directory path.
@@ -119,34 +120,12 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) {
logutil.BgLogger().Info("dumpFileGcChecker successful", zap.String("filename", fileName))
if isPlanReplayer && p.sctx != nil {
deletePlanReplayerStatus(context.Background(), p.sctx, fileName)
p.planReplayerTaskStatus.clearFinishedTask()
}
}
}
}

type planReplayerHandle struct {
*planReplayerTaskCollectorHandle
*planReplayerTaskDumpHandle
}

// HandlePlanReplayerDumpTask handle dump task
func (h *planReplayerHandle) HandlePlanReplayerDumpTask(task *PlanReplayerDumpTask) bool {
success := h.dumpPlanReplayerDumpTask(task)
if success {
h.removeTask(task.PlanReplayerTaskKey)
}
return success
}

type planReplayerTaskCollectorHandle struct {
taskMu struct {
sync.RWMutex
tasks map[PlanReplayerTaskKey]struct{}
}
ctx context.Context
sctx sessionctx.Context
}

func deletePlanReplayerStatus(ctx context.Context, sctx sessionctx.Context, token string) {
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
exec := sctx.(sqlexec.SQLExecutor)
@@ -198,6 +177,35 @@ func insertPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx.
}
}

type planReplayerHandle struct {
*planReplayerTaskCollectorHandle
*planReplayerTaskDumpHandle
}

// SendTask sends a dump task to the background task handler
func (h *planReplayerHandle) SendTask(task *PlanReplayerDumpTask) {
select {
case h.planReplayerTaskDumpHandle.taskCH <- task:
// We remove the task key right away if the task is put into the channel successfully; if the dump later fails,
// the task handle will re-add the task in the next loop
h.planReplayerTaskCollectorHandle.removeTask(task.PlanReplayerTaskKey)
default:
// TODO: add metrics here
// directly discard the task if the task channel is full, so as not to block the query process
logutil.BgLogger().Info("discard one plan replayer dump task",
zap.String("sql digest", task.SQLDigest), zap.String("plan digest", task.PlanDigest))
}
}
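SendTask removes the key from the collector only when the task actually enters the channel, and drops the task without blocking when the channel is full; a dropped key simply stays unhandled and is picked up by a later collection round. A stripped-down model of that policy follows (the dropped counter stands in for the metric the TODO mentions; the types are illustrative, not TiDB's):

package main

import "fmt"

type taskKey struct{ sqlDigest, planDigest string }

type sender struct {
	taskCh  chan *taskKey
	pending map[taskKey]struct{} // collector-side set of unhandled keys
	dropped int                  // placeholder for the metric mentioned in the TODO
}

// send enqueues without blocking: on success the key leaves the pending set,
// on a full channel the task is discarded and will be collected again later.
func (s *sender) send(k *taskKey) {
	select {
	case s.taskCh <- k:
		delete(s.pending, *k)
	default:
		s.dropped++
		fmt.Println("discard plan replayer dump task", k.sqlDigest, k.planDigest)
	}
}

func main() {
	s := &sender{taskCh: make(chan *taskKey, 1), pending: map[taskKey]struct{}{}}
	k1, k2 := taskKey{"sql-1", "plan-1"}, taskKey{"sql-2", "plan-2"}
	s.pending[k1], s.pending[k2] = struct{}{}, struct{}{}
	s.send(&k1)                            // fits in the buffer, k1 leaves the pending set
	s.send(&k2)                            // channel full: discarded, k2 stays pending
	fmt.Println(len(s.pending), s.dropped) // 1 1
}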

type planReplayerTaskCollectorHandle struct {
taskMu struct {
sync.RWMutex
tasks map[PlanReplayerTaskKey]struct{}
}
ctx context.Context
sctx sessionctx.Context
}

// CollectPlanReplayerTask collects all unhandled plan replayer task
func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask() error {
allKeys, err := h.collectAllPlanReplayerTask(h.ctx)
@@ -270,21 +278,100 @@ func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context
return allKeys, nil
}

type planReplayerTaskDumpHandle struct {
type planReplayerDumpTaskStatus struct {
// runningTasks records the tasks currently being run by the workers, so that multiple workers never run the same task key
runningTaskMu struct {
sync.RWMutex
runningTasks map[PlanReplayerTaskKey]struct{}
}

// finishedTask records the finished tasks, so that an already-finished task key is not run again
finishedTaskMu struct {
sync.RWMutex
finishedTask map[PlanReplayerTaskKey]struct{}
}
}

// GetRunningTaskStatusLen is used for unit tests
func (r *planReplayerDumpTaskStatus) GetRunningTaskStatusLen() int {
r.runningTaskMu.RLock()
defer r.runningTaskMu.RUnlock()
return len(r.runningTaskMu.runningTasks)
}

// GetFinishedTaskStatusLen is used for unit tests
func (r *planReplayerDumpTaskStatus) GetFinishedTaskStatusLen() int {
r.finishedTaskMu.RLock()
defer r.finishedTaskMu.RUnlock()
return len(r.finishedTaskMu.finishedTask)
}

func (r *planReplayerDumpTaskStatus) occupyRunningTaskKey(task *PlanReplayerDumpTask) bool {
r.runningTaskMu.Lock()
defer r.runningTaskMu.Unlock()
_, ok := r.runningTaskMu.runningTasks[task.PlanReplayerTaskKey]
if ok {
return false
}
r.runningTaskMu.runningTasks[task.PlanReplayerTaskKey] = struct{}{}
return true
}

func (r *planReplayerDumpTaskStatus) releaseRunningTaskKey(task *PlanReplayerDumpTask) {
r.runningTaskMu.Lock()
defer r.runningTaskMu.Unlock()
delete(r.runningTaskMu.runningTasks, task.PlanReplayerTaskKey)
}

func (r *planReplayerDumpTaskStatus) checkTaskKeyFinishedBefore(task *PlanReplayerDumpTask) bool {
r.finishedTaskMu.RLock()
defer r.finishedTaskMu.RUnlock()
_, ok := r.finishedTaskMu.finishedTask[task.PlanReplayerTaskKey]
return ok
}

func (r *planReplayerDumpTaskStatus) setTaskFinished(task *PlanReplayerDumpTask) {
r.finishedTaskMu.Lock()
defer r.finishedTaskMu.Unlock()
r.finishedTaskMu.finishedTask[task.PlanReplayerTaskKey] = struct{}{}
}

func (r *planReplayerDumpTaskStatus) clearFinishedTask() {
r.finishedTaskMu.Lock()
defer r.finishedTaskMu.Unlock()
r.finishedTaskMu.finishedTask = map[PlanReplayerTaskKey]struct{}{}
}
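Together these methods give each worker two guards: an occupy/release pair so the same key is never dumped by two workers at once, and a finished set so a key that already succeeded is skipped until the GC clears it. A condensed standalone sketch of that bookkeeping follows (one mutex instead of the two RWMutexes above, and plain types instead of TiDB's):

package main

import (
	"fmt"
	"sync"
)

type key struct{ sqlDigest, planDigest string }

type taskStatus struct {
	mu       sync.Mutex
	running  map[key]struct{}
	finished map[key]struct{}
}

// tryStart reports whether the caller may work on k: not if another worker is
// already running it, and not if it finished since the last GC sweep.
func (s *taskStatus) tryStart(k key) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, done := s.finished[k]; done {
		return false
	}
	if _, busy := s.running[k]; busy {
		return false
	}
	s.running[k] = struct{}{}
	return true
}

// finish releases the running slot and remembers the key as done.
func (s *taskStatus) finish(k key) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.running, k)
	s.finished[k] = struct{}{}
}

func main() {
	s := &taskStatus{running: map[key]struct{}{}, finished: map[key]struct{}{}}
	k := key{"sql-1", "plan-1"}
	fmt.Println(s.tryStart(k)) // true: the first worker takes the key
	fmt.Println(s.tryStart(k)) // false: a second worker must skip it
	s.finish(k)
	fmt.Println(s.tryStart(k)) // false: already finished, skipped until GC clears the set
}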

type planReplayerTaskDumpWorker struct {
ctx context.Context
sctx sessionctx.Context
taskCH chan *PlanReplayerDumpTask
taskCH <-chan *PlanReplayerDumpTask
status *planReplayerDumpTaskStatus
}

// DrainTask drain a task for unit test
func (h *planReplayerTaskDumpHandle) DrainTask() *PlanReplayerDumpTask {
return <-h.taskCH
func (w *planReplayerTaskDumpWorker) run() {
for task := range w.taskCH {
if w.status.checkTaskKeyFinishedBefore(task) {
continue
}
successOccupy := w.status.occupyRunningTaskKey(task)
if !successOccupy {
continue
}
w.HandleTask(task)
w.status.releaseRunningTaskKey(task)
}
}

// HandlePlanReplayerDumpTask handled the task
func (h *planReplayerTaskDumpHandle) dumpPlanReplayerDumpTask(task *PlanReplayerDumpTask) (success bool) {
// HandleTask handles a single dump task
func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (success bool) {
defer func() {
if success {
w.status.setTaskFinished(task)
}
}()
taskKey := task.PlanReplayerTaskKey
unhandled, err := checkUnHandledReplayerTask(h.ctx, h.sctx, taskKey)
unhandled, err := checkUnHandledReplayerTask(w.ctx, w.sctx, taskKey)
if err != nil {
logutil.BgLogger().Warn("check plan replayer capture task failed",
zap.String("sqlDigest", taskKey.SQLDigest),
@@ -303,13 +390,13 @@ func (h *planReplayerTaskDumpHandle) dumpPlanReplayerDumpTask(task *PlanReplayer
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
return
return false
}
task.Zf = file
task.FileName = fileName
task.EncodedPlan, _ = task.EncodePlan(task.SessionVars.StmtCtx, false)
jsStats := make(map[int64]*handle.JSONTable)
is := GetDomain(h.sctx).InfoSchema()
is := GetDomain(w.sctx).InfoSchema()
for tblID, stat := range task.TblStats {
tbl, ok := is.TableByID(tblID)
if !ok {
@@ -329,7 +416,8 @@ func (h *planReplayerTaskDumpHandle) dumpPlanReplayerDumpTask(task *PlanReplayer
}
jsStats[tblID] = r
}
err = DumpPlanReplayerInfo(h.ctx, h.sctx, task)
task.JSONTblStats = jsStats
err = DumpPlanReplayerInfo(w.ctx, w.sctx, task)
if err != nil {
logutil.BgLogger().Warn("dump plan replayer capture task result failed",
zap.String("sqlDigest", taskKey.SQLDigest),
@@ -340,14 +428,30 @@ func (h *planReplayerTaskDumpHandle) dumpPlanReplayerDumpTask(task *PlanReplayer
return true
}

// SendTask send dumpTask in background task handler
func (h *planReplayerTaskDumpHandle) SendTask(task *PlanReplayerDumpTask) {
select {
case h.taskCH <- task:
default:
// TODO: add metrics here
// directly discard the task if the task channel is full in order not to block the query process
}
type planReplayerTaskDumpHandle struct {
taskCH chan *PlanReplayerDumpTask
status *planReplayerDumpTaskStatus
workers []*planReplayerTaskDumpWorker
}

// GetTaskStatus is used for tests
func (h *planReplayerTaskDumpHandle) GetTaskStatus() *planReplayerDumpTaskStatus {
return h.status
}

// GetWorker is used for tests
func (h *planReplayerTaskDumpHandle) GetWorker() *planReplayerTaskDumpWorker {
return h.workers[0]
}

// Close closes the task channel so that every dump worker exits its loop
func (h *planReplayerTaskDumpHandle) Close() {
close(h.taskCH)
}

// DrainTask drain a task for unit test
func (h *planReplayerTaskDumpHandle) DrainTask() *PlanReplayerDumpTask {
return <-h.taskCH
}

func checkUnHandledReplayerTask(ctx context.Context, sctx sessionctx.Context, task PlanReplayerTaskKey) (bool, error) {
5 changes: 4 additions & 1 deletion domain/plan_replayer_handle_test.go
@@ -89,8 +89,11 @@ func TestPlanReplayerHandleDumpTask(t *testing.T) {
tk.MustQuery("select * from t;")
task := prHandle.DrainTask()
require.NotNil(t, task)
success := prHandle.HandlePlanReplayerDumpTask(task)
worker := prHandle.GetWorker()
success := worker.HandleTask(task)
require.True(t, success)
require.Equal(t, prHandle.GetTaskStatus().GetRunningTaskStatusLen(), 0)
require.Equal(t, prHandle.GetTaskStatus().GetFinishedTaskStatusLen(), 1)
// assert memory task consumed
require.Len(t, prHandle.GetTasks(), 0)

