From f60a5891474c5631de44119a3691555534b18603 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 22 Feb 2022 16:27:43 +0800 Subject: [PATCH] executor: tiny refactor code to improve CheckTableExec (#32295) close pingcap/tidb#32294 --- executor/executor.go | 69 +++++++++++++++++++++++++------------------- 1 file changed, 40 insertions(+), 29 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index b912873917a26..df7099b18b469 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -68,6 +68,7 @@ import ( tikverr "github.com/tikv/client-go/v2/error" tikvstore "github.com/tikv/client-go/v2/kv" tikvutil "github.com/tikv/client-go/v2/util" + atomicutil "go.uber.org/atomic" "go.uber.org/zap" ) @@ -715,6 +716,7 @@ func (e *CheckTableExec) Open(ctx context.Context) error { // Close implements the Executor Close interface. func (e *CheckTableExec) Close() error { var firstErr error + close(e.exitCh) for _, src := range e.srcs { if err := src.Close(); err != nil && firstErr == nil { firstErr = err @@ -748,19 +750,13 @@ func (e *CheckTableExec) checkIndexHandle(ctx context.Context, src *IndexLookUpE for { err = Next(ctx, src, chk) if err != nil { + e.retCh <- errors.Trace(err) break } if chk.NumRows() == 0 { break } - - select { - case <-e.exitCh: - return nil - default: - } } - e.retCh <- errors.Trace(err) return errors.Trace(err) } @@ -797,32 +793,47 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error { // The number of table rows is equal to the number of index rows. // TODO: Make the value of concurrency adjustable. And we can consider the number of records. - concurrency := 3 - wg := sync.WaitGroup{} - for i := range e.srcs { - wg.Add(1) - go func(num int) { - defer wg.Done() + if len(e.srcs) == 1 { + return e.checkIndexHandle(ctx, e.srcs[0]) + } + taskCh := make(chan *IndexLookUpExecutor, len(e.srcs)) + failure := atomicutil.NewBool(false) + concurrency := mathutil.Min(3, len(e.srcs)) + var wg util.WaitGroupWrapper + for _, src := range e.srcs { + taskCh <- src + } + for i := 0; i < concurrency; i++ { + wg.Run(func() { util.WithRecovery(func() { - err1 := e.checkIndexHandle(ctx, e.srcs[num]) - if err1 != nil { - logutil.Logger(ctx).Info("check index handle failed", zap.Error(err1)) + for { + if fail := failure.Load(); fail { + return + } + select { + case src := <-taskCh: + err1 := e.checkIndexHandle(ctx, src) + if err1 != nil { + failure.Store(true) + logutil.Logger(ctx).Info("check index handle failed", zap.Error(err1)) + return + } + case <-e.exitCh: + return + default: + return + } } }, e.handlePanic) - }(i) - - if (i+1)%concurrency == 0 { - wg.Wait() - } + }) } - - for i := 0; i < len(e.srcs); i++ { - err = <-e.retCh - if err != nil { - return errors.Trace(err) - } + wg.Wait() + select { + case err := <-e.retCh: + return errors.Trace(err) + default: + return nil } - return nil } func (e *CheckTableExec) checkTableRecord(ctx context.Context, idxOffset int) error { @@ -1776,7 +1787,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { sc.OriginalSQL = s.Text() if explainStmt, ok := s.(*ast.ExplainStmt); ok { sc.InExplainStmt = true - sc.IgnoreExplainIDSuffix = (strings.ToLower(explainStmt.Format) == types.ExplainFormatBrief) + sc.IgnoreExplainIDSuffix = strings.ToLower(explainStmt.Format) == types.ExplainFormatBrief sc.InVerboseExplain = strings.ToLower(explainStmt.Format) == types.ExplainFormatVerbose s = explainStmt.Stmt }