Skip to content

Commit

Permalink
executor: tiny refactor code to improve CheckTableExec (#32295)
Browse files Browse the repository at this point in the history
close #32294
  • Loading branch information
hawkingrei authored Feb 22, 2022
1 parent 964a940 commit f60a589
Showing 1 changed file with 40 additions and 29 deletions.
69 changes: 40 additions & 29 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import (
tikverr "github.com/tikv/client-go/v2/error"
tikvstore "github.com/tikv/client-go/v2/kv"
tikvutil "github.com/tikv/client-go/v2/util"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -715,6 +716,7 @@ func (e *CheckTableExec) Open(ctx context.Context) error {
// Close implements the Executor Close interface.
func (e *CheckTableExec) Close() error {
var firstErr error
close(e.exitCh)
for _, src := range e.srcs {
if err := src.Close(); err != nil && firstErr == nil {
firstErr = err
Expand Down Expand Up @@ -748,19 +750,13 @@ func (e *CheckTableExec) checkIndexHandle(ctx context.Context, src *IndexLookUpE
for {
err = Next(ctx, src, chk)
if err != nil {
e.retCh <- errors.Trace(err)
break
}
if chk.NumRows() == 0 {
break
}

select {
case <-e.exitCh:
return nil
default:
}
}
e.retCh <- errors.Trace(err)
return errors.Trace(err)
}

Expand Down Expand Up @@ -797,32 +793,47 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {

// The number of table rows is equal to the number of index rows.
// TODO: Make the value of concurrency adjustable. And we can consider the number of records.
concurrency := 3
wg := sync.WaitGroup{}
for i := range e.srcs {
wg.Add(1)
go func(num int) {
defer wg.Done()
if len(e.srcs) == 1 {
return e.checkIndexHandle(ctx, e.srcs[0])
}
taskCh := make(chan *IndexLookUpExecutor, len(e.srcs))
failure := atomicutil.NewBool(false)
concurrency := mathutil.Min(3, len(e.srcs))
var wg util.WaitGroupWrapper
for _, src := range e.srcs {
taskCh <- src
}
for i := 0; i < concurrency; i++ {
wg.Run(func() {
util.WithRecovery(func() {
err1 := e.checkIndexHandle(ctx, e.srcs[num])
if err1 != nil {
logutil.Logger(ctx).Info("check index handle failed", zap.Error(err1))
for {
if fail := failure.Load(); fail {
return
}
select {
case src := <-taskCh:
err1 := e.checkIndexHandle(ctx, src)
if err1 != nil {
failure.Store(true)
logutil.Logger(ctx).Info("check index handle failed", zap.Error(err1))
return
}
case <-e.exitCh:
return
default:
return
}
}
}, e.handlePanic)
}(i)

if (i+1)%concurrency == 0 {
wg.Wait()
}
})
}

for i := 0; i < len(e.srcs); i++ {
err = <-e.retCh
if err != nil {
return errors.Trace(err)
}
wg.Wait()
select {
case err := <-e.retCh:
return errors.Trace(err)
default:
return nil
}
return nil
}

func (e *CheckTableExec) checkTableRecord(ctx context.Context, idxOffset int) error {
Expand Down Expand Up @@ -1776,7 +1787,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
sc.OriginalSQL = s.Text()
if explainStmt, ok := s.(*ast.ExplainStmt); ok {
sc.InExplainStmt = true
sc.IgnoreExplainIDSuffix = (strings.ToLower(explainStmt.Format) == types.ExplainFormatBrief)
sc.IgnoreExplainIDSuffix = strings.ToLower(explainStmt.Format) == types.ExplainFormatBrief
sc.InVerboseExplain = strings.ToLower(explainStmt.Format) == types.ExplainFormatVerbose
s = explainStmt.Stmt
}
Expand Down

0 comments on commit f60a589

Please sign in to comment.