domain: support plan replayer dump workers running in concurrency #39347

Merged · 12 commits · Dec 7, 2022
38 changes: 20 additions & 18 deletions config/config.go
@@ -658,14 +658,15 @@ type Performance struct {
// Deprecated
MemProfileInterval string `toml:"-" json:"-"`

IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
GOGC int `toml:"gogc" json:"gogc"`
EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"`
StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"`
AnalyzePartitionConcurrencyQuota uint `toml:"analyze-partition-concurrency-quota" json:"analyze-partition-concurrency-quota"`
EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"`
IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
GOGC int `toml:"gogc" json:"gogc"`
EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"`
StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"`
AnalyzePartitionConcurrencyQuota uint `toml:"analyze-partition-concurrency-quota" json:"analyze-partition-concurrency-quota"`
PlanReplayerDumpWorkerConcurrency uint `toml:"plan-replayer-dump-worker-concurrency" json:"plan-replayer-dump-worker-concurrency"`

Contributor:
How about using a SQL variable instead of a config item? SQL variables are much easier to use.

Contributor Author:
We can't adjust the worker count at runtime, so we use a config item.
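
For reference, a minimal sketch of how the new `plan-replayer-dump-worker-concurrency` item could be wired into worker-pool creation. The caller-side wiring is not part of this diff, and `newSession` is a placeholder factory, not a real TiDB API:

```go
// Sketch only: sizing the dump worker pool from the new config item.
// Assumes the usual imports: "github.com/pingcap/tidb/config",
// "github.com/pingcap/tidb/domain", "github.com/pingcap/tidb/sessionctx".
func setupPlanReplayer(dom *domain.Domain, collectorSctx sessionctx.Context,
	newSession func() sessionctx.Context /* placeholder session factory */) {
	n := int(config.GetGlobalConfig().Performance.PlanReplayerDumpWorkerConcurrency)
	workerSctxs := make([]sessionctx.Context, 0, n)
	for i := 0; i < n; i++ {
		workerSctxs = append(workerSctxs, newSession())
	}
	dom.SetupPlanReplayerHandle(collectorSctx, workerSctxs)
}
```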

EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"`
// The following items are deprecated. We need to keep them here temporarily
// to support the upgrade process. They can be removed in future.

@@ -923,16 +924,17 @@ var defaultConf = Config{
CommitterConcurrency: defTiKVCfg.CommitterConcurrency,
MaxTxnTTL: defTiKVCfg.MaxTxnTTL, // 1hour
// TODO: set indexUsageSyncLease to 60s.
IndexUsageSyncLease: "0s",
GOGC: 100,
EnforceMPP: false,
PlanReplayerGCLease: "10m",
StatsLoadConcurrency: 5,
StatsLoadQueueSize: 1000,
AnalyzePartitionConcurrencyQuota: 16,
EnableStatsCacheMemQuota: false,
RunAutoAnalyze: true,
EnableLoadFMSketch: false,
IndexUsageSyncLease: "0s",
GOGC: 100,
EnforceMPP: false,
PlanReplayerGCLease: "10m",
StatsLoadConcurrency: 5,
StatsLoadQueueSize: 1000,
AnalyzePartitionConcurrencyQuota: 16,
PlanReplayerDumpWorkerConcurrency: 1,
EnableStatsCacheMemQuota: false,
RunAutoAnalyze: true,
EnableLoadFMSketch: false,
},
ProxyProtocol: ProxyProtocol{
Networks: "",
34 changes: 23 additions & 11 deletions domain/domain.go
@@ -1573,17 +1573,31 @@ func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) {
}

// SetupPlanReplayerHandle setup plan replayer handle
func (do *Domain) SetupPlanReplayerHandle(collectorSctx, dumperSctx sessionctx.Context) {
func (do *Domain) SetupPlanReplayerHandle(collectorSctx sessionctx.Context, workersSctxs []sessionctx.Context) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
do.planReplayerHandle = &planReplayerHandle{}
do.planReplayerHandle.planReplayerTaskCollectorHandle = &planReplayerTaskCollectorHandle{
ctx: ctx,
sctx: collectorSctx,
}
taskCH := make(chan *PlanReplayerDumpTask, 16)
taskStatus := &planReplayerDumpTaskStatus{}
taskStatus.finishedTaskMu.finishedTask = map[PlanReplayerTaskKey]struct{}{}
taskStatus.runningTaskMu.runningTasks = map[PlanReplayerTaskKey]struct{}{}

do.planReplayerHandle.planReplayerTaskDumpHandle = &planReplayerTaskDumpHandle{
ctx: ctx,
sctx: dumperSctx,
taskCH: make(chan *PlanReplayerDumpTask, 16),
taskCH: taskCH,
status: taskStatus,
}
do.planReplayerHandle.planReplayerTaskDumpHandle.workers = make([]*planReplayerTaskDumpWorker, 0)
for i := 0; i < len(workersSctxs); i++ {
worker := &planReplayerTaskDumpWorker{
ctx: ctx,
sctx: workersSctxs[i],
taskCH: taskCH,
status: taskStatus,
}
do.planReplayerHandle.planReplayerTaskDumpHandle.workers = append(do.planReplayerHandle.planReplayerTaskDumpHandle.workers, worker)
}
}

@@ -1598,6 +1612,7 @@ func (do *Domain) SetupHistoricalStatsWorker(ctx sessionctx.Context) {
// SetupDumpFileGCChecker setup sctx
func (do *Domain) SetupDumpFileGCChecker(ctx sessionctx.Context) {
do.dumpFileGcChecker.setupSctx(ctx)
do.dumpFileGcChecker.planReplayerTaskStatus = do.planReplayerHandle.status
}

var planReplayerHandleLease atomic.Uint64
@@ -1650,14 +1665,11 @@ func (do *Domain) StartPlanReplayerHandle() {
logutil.BgLogger().Info("PlanReplayerTaskDumpHandle exited.")
util.Recover(metrics.LabelDomain, "PlanReplayerTaskDumpHandle", nil, false)
}()
for {
select {
case <-do.exit:
return
case task := <-do.planReplayerHandle.planReplayerTaskDumpHandle.taskCH:
do.planReplayerHandle.HandlePlanReplayerDumpTask(task)
}
for _, worker := range do.planReplayerHandle.planReplayerTaskDumpHandle.workers {
go worker.run()
}
<-do.exit
do.planReplayerHandle.planReplayerTaskDumpHandle.Close()

Contributor:
Do we need to also put Close in a defer?

Contributor Author:
Updated.

}()
}
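
Following up on the Close-in-a-defer discussion above, here is a standalone sketch of the intended lifecycle, using simplified stand-in types rather than the real TiDB ones: each worker ranges over the task channel and exits once the channel is closed, and the close happens in a defer so it also runs on early return or panic.

```go
// Standalone lifecycle sketch; the types are simplified stand-ins.
package main

import (
	"fmt"
	"sync"
)

type task struct{ key string }

func main() {
	taskCH := make(chan *task, 16)
	exit := make(chan struct{})
	var wg sync.WaitGroup

	// Start the workers; each exits when taskCH is closed and drained.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for t := range taskCH {
				fmt.Printf("worker %d handled %s\n", id, t.key)
			}
		}(i)
	}

	// Dispatcher: wait for shutdown, then close the task channel from a defer
	// (mirroring the Close call above) so the workers terminate.
	go func() {
		defer close(taskCH)
		<-exit
	}()

	taskCH <- &task{key: "sql-1/plan-1"}
	close(exit)
	wg.Wait()
}
```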

194 changes: 149 additions & 45 deletions domain/plan_replayer.go
@@ -49,9 +49,10 @@ import (
// For now it is used by `plan replayer` and `trace plan` statement
type dumpFileGcChecker struct {
sync.Mutex
gcLease time.Duration
paths []string
sctx sessionctx.Context
gcLease time.Duration
paths []string
sctx sessionctx.Context
planReplayerTaskStatus *planReplayerDumpTaskStatus
}

// GetPlanReplayerDirName returns plan replayer directory path.
@@ -119,34 +120,12 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) {
logutil.BgLogger().Info("dumpFileGcChecker successful", zap.String("filename", fileName))
if isPlanReplayer && p.sctx != nil {
deletePlanReplayerStatus(context.Background(), p.sctx, fileName)
p.planReplayerTaskStatus.clearFinishedTask()
}
}
}
}

type planReplayerHandle struct {
*planReplayerTaskCollectorHandle
*planReplayerTaskDumpHandle
}

// HandlePlanReplayerDumpTask handle dump task
func (h *planReplayerHandle) HandlePlanReplayerDumpTask(task *PlanReplayerDumpTask) bool {
success := h.dumpPlanReplayerDumpTask(task)
if success {
h.removeTask(task.PlanReplayerTaskKey)
}
return success
}

type planReplayerTaskCollectorHandle struct {
taskMu struct {
sync.RWMutex
tasks map[PlanReplayerTaskKey]struct{}
}
ctx context.Context
sctx sessionctx.Context
}

func deletePlanReplayerStatus(ctx context.Context, sctx sessionctx.Context, token string) {
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
exec := sctx.(sqlexec.SQLExecutor)
@@ -198,6 +177,35 @@ func insertPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx.
}
}

type planReplayerHandle struct {
*planReplayerTaskCollectorHandle
*planReplayerTaskDumpHandle
}

// SendTask sends a dump task to the background dump workers.
func (h *planReplayerHandle) SendTask(task *PlanReplayerDumpTask) {
select {
case h.planReplayerTaskDumpHandle.taskCH <- task:
// We remove the task key as soon as the task is enqueued successfully; if the dump
// later fails, the task handle will re-add the task in the next loop.
h.planReplayerTaskCollectorHandle.removeTask(task.PlanReplayerTaskKey)
default:
// TODO: add metrics here
// Discard the task directly if the task channel is full, so that the query path is not blocked.
logutil.BgLogger().Info("discard one plan replayer dump task",
zap.String("sql digest", task.SQLDigest), zap.String("plan digest", task.PlanDigest))
}
}
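
Caller-side usage would look roughly like the sketch below. The `GetPlanReplayerHandle` accessor and the task fields shown are assumptions based on the existing domain API; a real capture populates more of the task than this.

```go
// Rough caller-side sketch: hand a dump task to the domain without blocking.
func capturePlan(dom *domain.Domain, sqlDigest, planDigest string) {
	task := &domain.PlanReplayerDumpTask{
		PlanReplayerTaskKey: domain.PlanReplayerTaskKey{
			SQLDigest:  sqlDigest,
			PlanDigest: planDigest,
		},
	}
	// SendTask never blocks the query path: if the channel is full the task is
	// dropped rather than queued (its key stays in the collector, so it can be
	// captured again in a later round).
	dom.GetPlanReplayerHandle().SendTask(task)
}
```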

type planReplayerTaskCollectorHandle struct {
taskMu struct {
sync.RWMutex
tasks map[PlanReplayerTaskKey]struct{}
}
ctx context.Context
sctx sessionctx.Context
}

// CollectPlanReplayerTask collects all unhandled plan replayer task
func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask() error {
allKeys, err := h.collectAllPlanReplayerTask(h.ctx)
@@ -270,21 +278,100 @@ func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context
return allKeys, nil
}

type planReplayerTaskDumpHandle struct {
type planReplayerDumpTaskStatus struct {
// runningTaskMu records the tasks currently being handled by the workers, so that multiple workers never run the same task key.
runningTaskMu struct {
sync.RWMutex
runningTasks map[PlanReplayerTaskKey]struct{}
}

// finishedTaskMu records finished tasks, so that a task key that has already been dumped is not run again.
finishedTaskMu struct {
sync.RWMutex
finishedTask map[PlanReplayerTaskKey]struct{}
}
}

// GetRunningTaskStatusLen returns the number of running tasks; it is used in unit tests.
func (r *planReplayerDumpTaskStatus) GetRunningTaskStatusLen() int {
r.runningTaskMu.RLock()
defer r.runningTaskMu.RUnlock()
return len(r.runningTaskMu.runningTasks)
}

// GetFinishedTaskStatusLen returns the number of finished tasks; it is used in unit tests.
func (r *planReplayerDumpTaskStatus) GetFinishedTaskStatusLen() int {
r.finishedTaskMu.RLock()
defer r.finishedTaskMu.RUnlock()
return len(r.finishedTaskMu.finishedTask)
}

func (r *planReplayerDumpTaskStatus) occupyRunningTaskKey(task *PlanReplayerDumpTask) bool {
r.runningTaskMu.Lock()
defer r.runningTaskMu.Unlock()
_, ok := r.runningTaskMu.runningTasks[task.PlanReplayerTaskKey]
if ok {
return false
}
r.runningTaskMu.runningTasks[task.PlanReplayerTaskKey] = struct{}{}
return true
}

func (r *planReplayerDumpTaskStatus) releaseRunningTaskKey(task *PlanReplayerDumpTask) {
r.runningTaskMu.Lock()
defer r.runningTaskMu.Unlock()
delete(r.runningTaskMu.runningTasks, task.PlanReplayerTaskKey)
}

func (r *planReplayerDumpTaskStatus) checkTaskKeyFinishedBefore(task *PlanReplayerDumpTask) bool {
r.finishedTaskMu.RLock()
defer r.finishedTaskMu.RUnlock()
_, ok := r.finishedTaskMu.finishedTask[task.PlanReplayerTaskKey]
return ok
}

func (r *planReplayerDumpTaskStatus) setTaskFinished(task *PlanReplayerDumpTask) {
r.finishedTaskMu.Lock()
defer r.finishedTaskMu.Unlock()
r.finishedTaskMu.finishedTask[task.PlanReplayerTaskKey] = struct{}{}
}

func (r *planReplayerDumpTaskStatus) clearFinishedTask() {
r.finishedTaskMu.Lock()
defer r.finishedTaskMu.Unlock()
r.finishedTaskMu.finishedTask = map[PlanReplayerTaskKey]struct{}{}
}
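
To make the dedup protocol concrete, here is a toy, standalone version of the check-finished/occupy/release pattern above. It collapses the two RWMutex-guarded maps into a single mutex for brevity, so it illustrates the idea rather than the real implementation: when two workers race on the same key, exactly one of them dumps it.

```go
// Toy version of the running/finished bookkeeping: one mutex, two maps.
package main

import (
	"fmt"
	"sync"
)

type taskKey struct{ sqlDigest, planDigest string }

type status struct {
	mu       sync.Mutex
	running  map[taskKey]struct{}
	finished map[taskKey]struct{}
}

// tryOccupy reports whether the caller may handle the key right now.
func (s *status) tryOccupy(k taskKey) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, done := s.finished[k]; done {
		return false // already dumped before
	}
	if _, busy := s.running[k]; busy {
		return false // another worker is on it
	}
	s.running[k] = struct{}{}
	return true
}

// release drops the running mark and records success in the finished set.
func (s *status) release(k taskKey, success bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.running, k)
	if success {
		s.finished[k] = struct{}{}
	}
}

func main() {
	s := &status{running: map[taskKey]struct{}{}, finished: map[taskKey]struct{}{}}
	k := taskKey{"sql-1", "plan-1"}
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if s.tryOccupy(k) {
				fmt.Printf("worker %d dumps %v\n", id, k)
				s.release(k, true)
			} else {
				fmt.Printf("worker %d skips %v\n", id, k)
			}
		}(i)
	}
	wg.Wait()
}
```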

type planReplayerTaskDumpWorker struct {
ctx context.Context
sctx sessionctx.Context
taskCH chan *PlanReplayerDumpTask
taskCH <-chan *PlanReplayerDumpTask
status *planReplayerDumpTaskStatus
}

// DrainTask drain a task for unit test
func (h *planReplayerTaskDumpHandle) DrainTask() *PlanReplayerDumpTask {
return <-h.taskCH
func (w *planReplayerTaskDumpWorker) run() {
for task := range w.taskCH {
if w.status.checkTaskKeyFinishedBefore(task) {
continue
}
successOccupy := w.status.occupyRunningTaskKey(task)
if !successOccupy {
continue
}
w.HandleTask(task)
w.status.releaseRunningTaskKey(task)
}
}

// HandlePlanReplayerDumpTask handled the task
func (h *planReplayerTaskDumpHandle) dumpPlanReplayerDumpTask(task *PlanReplayerDumpTask) (success bool) {
// HandleTask handles a dump task and reports whether it succeeded.
func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (success bool) {
defer func() {
if success {
w.status.setTaskFinished(task)
}
}()
taskKey := task.PlanReplayerTaskKey
unhandled, err := checkUnHandledReplayerTask(h.ctx, h.sctx, taskKey)
unhandled, err := checkUnHandledReplayerTask(w.ctx, w.sctx, taskKey)
if err != nil {
logutil.BgLogger().Warn("check plan replayer capture task failed",
zap.String("sqlDigest", taskKey.SQLDigest),
@@ -303,13 +390,13 @@ func (h *planReplayerTaskDumpHandle) dumpPlanReplayerDumpTask(task *PlanReplayer
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
return
return false
}
task.Zf = file
task.FileName = fileName
task.EncodedPlan, _ = task.EncodePlan(task.SessionVars.StmtCtx, false)
jsStats := make(map[int64]*handle.JSONTable)
is := GetDomain(h.sctx).InfoSchema()
is := GetDomain(w.sctx).InfoSchema()
for tblID, stat := range task.TblStats {
tbl, ok := is.TableByID(tblID)
if !ok {
@@ -329,7 +416,8 @@ func (h *planReplayerTaskDumpHandle) dumpPlanReplayerDumpTask(task *PlanReplayer
}
jsStats[tblID] = r
}
err = DumpPlanReplayerInfo(h.ctx, h.sctx, task)
task.JSONTblStats = jsStats
err = DumpPlanReplayerInfo(w.ctx, w.sctx, task)
if err != nil {
logutil.BgLogger().Warn("dump plan replayer capture task result failed",
zap.String("sqlDigest", taskKey.SQLDigest),
Expand All @@ -340,14 +428,30 @@ func (h *planReplayerTaskDumpHandle) dumpPlanReplayerDumpTask(task *PlanReplayer
return true
}

// SendTask send dumpTask in background task handler
func (h *planReplayerTaskDumpHandle) SendTask(task *PlanReplayerDumpTask) {
select {
case h.taskCH <- task:
default:
// TODO: add metrics here
// directly discard the task if the task channel is full in order not to block the query process
}
type planReplayerTaskDumpHandle struct {
taskCH chan *PlanReplayerDumpTask
status *planReplayerDumpTaskStatus
workers []*planReplayerTaskDumpWorker
}

// GetTaskStatus returns the dump task status; it is used in tests.
func (h *planReplayerTaskDumpHandle) GetTaskStatus() *planReplayerDumpTaskStatus {
return h.status
}

// GetWorker returns the first dump worker; it is used in tests.
func (h *planReplayerTaskDumpHandle) GetWorker() *planReplayerTaskDumpWorker {
return h.workers[0]
}

// Close closes the task channel so that the dump workers exit once it is drained.
func (h *planReplayerTaskDumpHandle) Close() {
close(h.taskCH)
}

// DrainTask drains one task from the channel; it is used in unit tests.
func (h *planReplayerTaskDumpHandle) DrainTask() *PlanReplayerDumpTask {
return <-h.taskCH
}

func checkUnHandledReplayerTask(ctx context.Context, sctx sessionctx.Context, task PlanReplayerTaskKey) (bool, error) {
5 changes: 4 additions & 1 deletion domain/plan_replayer_handle_test.go
@@ -89,8 +89,11 @@ func TestPlanReplayerHandleDumpTask(t *testing.T) {
tk.MustQuery("select * from t;")
task := prHandle.DrainTask()
require.NotNil(t, task)
success := prHandle.HandlePlanReplayerDumpTask(task)
worker := prHandle.GetWorker()
success := worker.HandleTask(task)
require.True(t, success)
require.Equal(t, prHandle.GetTaskStatus().GetRunningTaskStatusLen(), 0)
require.Equal(t, prHandle.GetTaskStatus().GetFinishedTaskStatusLen(), 1)
// assert memory task consumed
require.Len(t, prHandle.GetTasks(), 0)
