diff --git a/domain/plan_replayer.go b/domain/plan_replayer.go index c0f3231223c74..fc54d30759057 100644 --- a/domain/plan_replayer.go +++ b/domain/plan_replayer.go @@ -174,7 +174,7 @@ type planReplayerHandle struct { } // SendTask send dumpTask in background task handler -func (h *planReplayerHandle) SendTask(task *PlanReplayerDumpTask) { +func (h *planReplayerHandle) SendTask(task *PlanReplayerDumpTask) bool { select { case h.planReplayerTaskDumpHandle.taskCH <- task: // we directly remove the task key if we put task in channel successfully, if the task was failed to dump, @@ -182,11 +182,13 @@ func (h *planReplayerHandle) SendTask(task *PlanReplayerDumpTask) { if !task.IsContinuesCapture { h.planReplayerTaskCollectorHandle.removeTask(task.PlanReplayerTaskKey) } + return true default: // TODO: add metrics here // directly discard the task if the task channel is full in order not to block the query process - logutil.BgLogger().Info("discard one plan replayer dump task", - zap.String("sql digest", task.SQLDigest), zap.String("plan digest", task.PlanDigest)) + logutil.BgLogger().Warn("discard one plan replayer dump task", + zap.String("sql-digest", task.SQLDigest), zap.String("plan-digest", task.PlanDigest)) + return false } } @@ -209,9 +211,13 @@ func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask() error { for _, key := range allKeys { unhandled, err := checkUnHandledReplayerTask(h.ctx, h.sctx, key) if err != nil { + logutil.BgLogger().Warn("[plan-replayer-task] collect plan replayer task failed", zap.Error(err)) return err } if unhandled { + logutil.BgLogger().Debug("[plan-replayer-task] collect plan replayer task success", + zap.String("sql-digest", key.SQLDigest), + zap.String("plan-digest", key.PlanDigest)) tasks = append(tasks, key) } } @@ -351,16 +357,36 @@ type planReplayerTaskDumpWorker struct { func (w *planReplayerTaskDumpWorker) run() { for task := range w.taskCH { + w.handleTask(task) + } +} + +func (w *planReplayerTaskDumpWorker) handleTask(task *PlanReplayerDumpTask) { + sqlDigest := task.SQLDigest + planDigest := task.PlanDigest + check := true + occupy := true + handleTask := true + defer func() { + logutil.BgLogger().Debug("[plan-replayer-capture] handle task", + zap.String("sql-digest", sqlDigest), + zap.String("plan-digest", planDigest), + zap.Bool("check", check), + zap.Bool("occupy", occupy), + zap.Bool("handle", handleTask)) + }() + if task.IsContinuesCapture { if w.status.checkTaskKeyFinishedBefore(task) { - continue + check = false + return } - successOccupy := w.status.occupyRunningTaskKey(task) - if !successOccupy { - continue - } - w.HandleTask(task) - w.status.releaseRunningTaskKey(task) } + occupy = w.status.occupyRunningTaskKey(task) + if !occupy { + return + } + handleTask = w.HandleTask(task) + w.status.releaseRunningTaskKey(task) } // HandleTask handled task @@ -373,7 +399,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc taskKey := task.PlanReplayerTaskKey unhandled, err := checkUnHandledReplayerTask(w.ctx, w.sctx, taskKey) if err != nil { - logutil.BgLogger().Warn("check plan replayer capture task failed", + logutil.BgLogger().Warn("[plan-replayer-capture] check task failed", zap.String("sqlDigest", taskKey.SQLDigest), zap.String("planDigest", taskKey.PlanDigest), zap.Error(err)) @@ -386,7 +412,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc file, fileName, err := replayer.GeneratePlanReplayerFile(task.IsContinuesCapture) if err != nil { - logutil.BgLogger().Warn("generate plan replayer capture task file failed", + logutil.BgLogger().Warn("[plan-replayer-capture] generate task file failed", zap.String("sqlDigest", taskKey.SQLDigest), zap.String("planDigest", taskKey.PlanDigest), zap.Error(err)) @@ -409,7 +435,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc } r, err := handle.GenJSONTableFromStats(schema.Name.String(), tbl.Meta(), stat.(*statistics.Table)) if err != nil { - logutil.BgLogger().Warn("generate plan replayer capture task json stats failed", + logutil.BgLogger().Warn("[plan-replayer-capture] generate task json stats failed", zap.String("sqlDigest", taskKey.SQLDigest), zap.String("planDigest", taskKey.PlanDigest), zap.Error(err)) @@ -421,7 +447,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc } err = DumpPlanReplayerInfo(w.ctx, w.sctx, task) if err != nil { - logutil.BgLogger().Warn("dump plan replayer capture task result failed", + logutil.BgLogger().Warn("[plan-replayer-capture] dump task result failed", zap.String("sqlDigest", taskKey.SQLDigest), zap.String("planDigest", taskKey.PlanDigest), zap.Error(err)) diff --git a/domain/plan_replayer_dump.go b/domain/plan_replayer_dump.go index 264631b6fbdb3..0dd4945873e58 100644 --- a/domain/plan_replayer_dump.go +++ b/domain/plan_replayer_dump.go @@ -66,6 +66,10 @@ const ( PlanReplayerTaskMetaIsCapture = "isCapture" // PlanReplayerTaskMetaIsContinues indicates whether this task is continues task PlanReplayerTaskMetaIsContinues = "isContinues" + // PlanReplayerTaskMetaSQLDigest indicates the sql digest of this task + PlanReplayerTaskMetaSQLDigest = "sqlDigest" + // PlanReplayerTaskMetaPlanDigest indicates the plan digest of this task + PlanReplayerTaskMetaPlanDigest = "planDigest" ) type tableNamePair struct { @@ -180,25 +184,53 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context, execStmts := task.ExecStmts zw := zip.NewWriter(zf) var records []PlanReplayerStatusRecord + sqls := make([]string, 0) + for _, execStmt := range task.ExecStmts { + sqls = append(sqls, execStmt.Text()) + } + if task.IsCapture { + logutil.BgLogger().Info("[plan-replayer-dump] start to dump plan replayer result", + zap.String("sql-digest", task.SQLDigest), + zap.String("plan-digest", task.PlanDigest), + zap.Strings("sql", sqls), + zap.Bool("isContinues", task.IsContinuesCapture)) + } else { + logutil.BgLogger().Info("[plan-replayer-dump] start to dump plan replayer result", + zap.Strings("sqls", sqls)) + } defer func() { + errMsg := "" if err != nil { - logutil.BgLogger().Error("dump plan replayer failed", zap.Error(err)) + if task.IsCapture { + logutil.BgLogger().Info("[plan-replayer-dump] dump file failed", + zap.String("sql-digest", task.SQLDigest), + zap.String("plan-digest", task.PlanDigest), + zap.Strings("sql", sqls), + zap.Bool("isContinues", task.IsContinuesCapture)) + } else { + logutil.BgLogger().Info("[plan-replayer-dump] start to dump plan replayer result", + zap.Strings("sqls", sqls)) + } + errMsg = err.Error() } - err = zw.Close() - if err != nil { - logutil.BgLogger().Error("Closing zip writer failed", zap.Error(err), zap.String("filename", fileName)) + err1 := zw.Close() + if err1 != nil { + logutil.BgLogger().Error("[plan-replayer-dump] Closing zip writer failed", zap.Error(err), zap.String("filename", fileName)) + errMsg = errMsg + "," + err1.Error() } - err = zf.Close() - if err != nil { - logutil.BgLogger().Error("Closing zip file failed", zap.Error(err), zap.String("filename", fileName)) + err2 := zf.Close() + if err2 != nil { + logutil.BgLogger().Error("[plan-replayer-dump] Closing zip file failed", zap.Error(err), zap.String("filename", fileName)) + errMsg = errMsg + "," + err2.Error() + } + if len(errMsg) > 0 { for i, record := range records { - record.FailedReason = err.Error() + record.FailedReason = errMsg records[i] = record } } insertPlanReplayerStatus(ctx, sctx, records) }() - // Dump SQLMeta if err = dumpSQLMeta(zw, task); err != nil { return err @@ -299,6 +331,8 @@ func dumpSQLMeta(zw *zip.Writer, task *PlanReplayerDumpTask) error { varMap[PlanReplayerSQLMetaStartTS] = strconv.FormatUint(task.StartTS, 10) varMap[PlanReplayerTaskMetaIsCapture] = strconv.FormatBool(task.IsCapture) varMap[PlanReplayerTaskMetaIsContinues] = strconv.FormatBool(task.IsContinuesCapture) + varMap[PlanReplayerTaskMetaSQLDigest] = task.SQLDigest + varMap[PlanReplayerTaskMetaPlanDigest] = task.PlanDigest if err := toml.NewEncoder(cf).Encode(varMap); err != nil { return errors.AddStack(err) } diff --git a/executor/compiler.go b/executor/compiler.go index e2c2a29794d1d..e000a22ba633e 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -158,19 +158,16 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS } } if c.Ctx.GetSessionVars().IsPlanReplayerCaptureEnabled() && !c.Ctx.GetSessionVars().InRestrictedSQL { - if _, ok := stmtNode.(*ast.SelectStmt); ok { - startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS() - if err != nil { - return nil, err - } - if c.Ctx.GetSessionVars().EnablePlanReplayedContinuesCapture { - checkPlanReplayerContinuesCapture(c.Ctx, stmtNode, startTS) - } else { - checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS) - } + startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS() + if err != nil { + return nil, err + } + if c.Ctx.GetSessionVars().EnablePlanReplayedContinuesCapture { + checkPlanReplayerContinuesCapture(c.Ctx, stmtNode, startTS) + } else { + checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS) } } - return stmt, nil } @@ -183,9 +180,17 @@ func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode if handle == nil { return } + captured := false tasks := handle.GetTasks() _, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest() _, planDigest := getPlanDigest(sctx.GetSessionVars().StmtCtx) + defer func() { + logutil.BgLogger().Debug("[plan-replayer-capture] check capture task", + zap.String("sql-digest", sqlDigest.String()), + zap.String("plan-digest", planDigest.String()), + zap.Int("tasks", len(tasks)), + zap.Bool("captured", captured)) + }() key := replayer.PlanReplayerTaskKey{ SQLDigest: sqlDigest.String(), PlanDigest: planDigest.String(), @@ -193,7 +198,7 @@ func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode for _, task := range tasks { if task.SQLDigest == sqlDigest.String() { if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() { - sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false) + captured = sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false) return } } @@ -215,16 +220,26 @@ func checkPlanReplayerContinuesCapture(sctx sessionctx.Context, stmtNode ast.Stm SQLDigest: sqlDigest.String(), PlanDigest: planDigest.String(), } + captured := false + defer func() { + logutil.BgLogger().Debug("[plan-replayer-capture] check continues capture task", + zap.String("sql-digest", sqlDigest.String()), + zap.String("plan-digest", planDigest.String()), + zap.Bool("captured", captured)) + }() + existed := sctx.GetSessionVars().CheckPlanReplayerFinishedTaskKey(key) if existed { return } - sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true) - sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key) + captured = sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true) + if captured { + sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key) + } } func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.Context, stmtNode ast.StmtNode, - startTS uint64, isContinuesCapture bool) { + startTS uint64, isContinuesCapture bool) bool { stmtCtx := sctx.GetSessionVars().StmtCtx handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle) dumpTask := &domain.PlanReplayerDumpTask{ @@ -239,7 +254,7 @@ func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx. IsCapture: true, IsContinuesCapture: isContinuesCapture, } - domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask) + return domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask) } // needLowerPriority checks whether it's needed to lower the execution priority diff --git a/statistics/handle/gc.go b/statistics/handle/gc.go index 46a2a9fc691e3..f16e2c9719088 100644 --- a/statistics/handle/gc.go +++ b/statistics/handle/gc.go @@ -22,7 +22,9 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/mathutil" "github.com/pingcap/tidb/util/sqlexec" @@ -153,15 +155,32 @@ func (h *Handle) ClearOutdatedHistoryStats() error { h.mu.Lock() defer h.mu.Unlock() exec := h.mu.ctx.(sqlexec.SQLExecutor) - sql := "delete from mysql.stats_meta_history where NOW() - create_time >= %?" - _, err := exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds()) + sql := "select count(*) from mysql.stats_meta_history where NOW() - create_time >= %?" + rs, err := exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds()) if err != nil { return err } - sql = "delete from mysql.stats_history where NOW() - create_time >= %? " - _, err = exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds()) - logutil.BgLogger().Info("clear outdated historical stats") - return err + if rs == nil { + return nil + } + var rows []chunk.Row + defer terror.Call(rs.Close) + if rows, err = sqlexec.DrainRecordSet(ctx, rs, 8); err != nil { + return errors.Trace(err) + } + count := rows[0].GetInt64(0) + if count > 0 { + sql = "delete from mysql.stats_meta_history where NOW() - create_time >= %?" + _, err = exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds()) + if err != nil { + return err + } + sql = "delete from mysql.stats_history where NOW() - create_time >= %? " + _, err = exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds()) + logutil.BgLogger().Info("clear outdated historical stats") + return err + } + return nil } func (h *Handle) gcHistoryStatsFromKV(physicalID int64) error {