Skip to content

Commit

Permalink
executor: Fix tidb crash on index merge reader (#40904) (#40981)
Browse files Browse the repository at this point in the history
close #40877
  • Loading branch information
ti-chi-bot authored Mar 28, 2023
1 parent 0ac1013 commit 6c1e555
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 8 deletions.
24 changes: 16 additions & 8 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,8 +590,13 @@ func (e *IndexMergeReaderExecutor) startIndexMergeTableScanWorker(ctx context.Co
defer trace.StartRegion(ctx, "IndexMergeTableScanWorker").End()
var task *lookupTableTask
util.WithRecovery(
func() { task = worker.pickAndExecTask(ctx1) },
worker.handlePickAndExecTaskPanic(ctx1, task),
// Note we use the address of `task` as the argument of both `pickAndExecTask` and `handlePickAndExecTaskPanic`
// because `task` is expected to be assigned in `pickAndExecTask`, and this assignment should also be visible
// in `handlePickAndExecTaskPanic` since it will get `doneCh` from `task`. Golang always pass argument by value,
// so if we don't use the address of `task` as the argument, the assignment to `task` in `pickAndExecTask` is
// not visible in `handlePickAndExecTaskPanic`
func() { worker.pickAndExecTask(ctx1, &task) },
worker.handlePickAndExecTaskPanic(ctx1, &task),
)
cancel()
e.tblWorkerWg.Done()
Expand Down Expand Up @@ -895,38 +900,41 @@ type indexMergeTableScanWorker struct {
memTracker *memory.Tracker
}

func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context) (task *lookupTableTask) {
func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context, task **lookupTableTask) {
var ok bool
for {
waitStart := time.Now()
select {
case task, ok = <-w.workCh:
case *task, ok = <-w.workCh:
if !ok {
return
}
case <-w.finished:
return
}
execStart := time.Now()
err := w.executeTask(ctx, task)
err := w.executeTask(ctx, *task)
if w.stats != nil {
atomic.AddInt64(&w.stats.WaitTime, int64(execStart.Sub(waitStart)))
atomic.AddInt64(&w.stats.FetchRow, int64(time.Since(execStart)))
atomic.AddInt64(&w.stats.TableTaskNum, 1)
}
task.doneCh <- err
failpoint.Inject("testIndexMergePickAndExecTaskPanic", nil)
(*task).doneCh <- err
}
}

func (w *indexMergeTableScanWorker) handlePickAndExecTaskPanic(ctx context.Context, task *lookupTableTask) func(r interface{}) {
func (w *indexMergeTableScanWorker) handlePickAndExecTaskPanic(ctx context.Context, task **lookupTableTask) func(r interface{}) {
return func(r interface{}) {
if r == nil {
return
}

err4Panic := errors.Errorf("panic in IndexMergeReaderExecutor indexMergeTableWorker: %v", r)
logutil.Logger(ctx).Error(err4Panic.Error())
task.doneCh <- err4Panic
if *task != nil {
(*task).doneCh <- err4Panic
}
}
}

Expand Down
21 changes: 21 additions & 0 deletions executor/index_merge_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -51,6 +52,26 @@ func TestSingleTableRead(t *testing.T) {
tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ sum(a) from t1 where a < 2 or b > 4").Check(testkit.Rows("6"))
}

func TestIndexMergePickAndExecTaskPanic(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1(id int primary key, a int, b int, c int, d int)")
tk.MustExec("create index t1a on t1(a)")
tk.MustExec("create index t1b on t1(b)")
tk.MustExec("insert into t1 values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5)")
tk.MustQuery("select /*+ use_index_merge(t1, primary, t1a) */ * from t1 where id < 2 or a > 4 order by id").Check(testkit.Rows("1 1 1 1 1",
"5 5 5 5 5"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergePickAndExecTaskPanic", "panic(\"pickAndExecTaskPanic\")"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergePickAndExecTaskPanic"))
}()
err := tk.QueryToErr("select /*+ use_index_merge(t1, primary, t1a) */ * from t1 where id < 2 or a > 4 order by id")
require.Contains(t, err.Error(), "pickAndExecTaskPanic")
}

func TestJoin(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
Expand Down

0 comments on commit 6c1e555

Please sign in to comment.