executor: fix goroutine leak in IndexMerge.Close (#41762)
close #41545
guo-shaoge authored Feb 28, 2023
1 parent ef42ca8 commit 58fcf7b
Showing 2 changed files with 142 additions and 39 deletions.
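
The substance of the fix: IndexMerge worker goroutines communicate over bounded channels (fetchCh, workCh, resultCh). If IndexMerge.Close returned before the workers finished, an unguarded send such as `resultCh <- task` could never observe the shutdown and blocked forever, leaking the goroutine. The commit wraps every such send and receive in a select that also watches ctx.Done() and the finished channel, as seen throughout the diff below. A minimal, self-contained sketch of the pattern (illustrative names, not TiDB code):

// worker sends results until either the context is cancelled or the consumer
// signals shutdown by closing `finished`. An unguarded "resultCh <- i" here
// would block forever once the consumer stops reading.
package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context, finished <-chan struct{}, resultCh chan<- int) {
    for i := 0; ; i++ {
        select {
        case <-ctx.Done(): // query context cancelled
            return
        case <-finished: // executor Close() was called
            return
        case resultCh <- i: // send only while a reader may still exist
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    finished := make(chan struct{})
    resultCh := make(chan int, 2) // small buffer, like resultCh in the executor

    go worker(ctx, finished, resultCh)
    fmt.Println(<-resultCh)           // consume one result, then stop early
    close(finished)                   // with the guard, the worker exits instead of leaking
    time.Sleep(50 * time.Millisecond) // give the worker a moment to observe the close
}

The same guard recurs below in fetchLoopUnion, doIntersectionPerPartition, fetchLoopIntersection, and pickAndExecTask.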
115 changes: 97 additions & 18 deletions executor/index_merge_reader.go
@@ -290,7 +290,7 @@ func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Cont
 func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *indexMergeTableTask, workID int) error {
     failpoint.Inject("testIndexMergeResultChCloseEarly", func(_ failpoint.Value) {
         // Wait for processWorker to close resultCh.
-        time.Sleep(2)
+        time.Sleep(time.Second * 2)
         // Should use fetchCh instead of resultCh to send error.
         syncErr(ctx, e.finished, fetchCh, errors.New("testIndexMergeResultChCloseEarly"))
     })
@@ -371,8 +371,10 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
     for parTblIdx, keyRange := range keyRanges {
         // check if this executor is closed
         select {
+        case <-ctx.Done():
+            return
         case <-e.finished:
-            break
+            return
         default:
         }
 
@@ -495,8 +497,10 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
     for parTblIdx, tbl := range tbls {
         // check if this executor is closed
         select {
+        case <-ctx.Done():
+            return
         case <-e.finished:
-            break
+            return
         default:
         }
 
@@ -747,6 +751,12 @@ func (e *IndexMergeReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) e
 }
 
 func (e *IndexMergeReaderExecutor) getResultTask() (*indexMergeTableTask, error) {
+    failpoint.Inject("testIndexMergeMainReturnEarly", func(_ failpoint.Value) {
+        // Make sure processWorker fills resultCh to capacity first.
+        // After the main goroutine closes `finished`, processWorker may get stuck writing to resultCh.
+        time.Sleep(time.Second * 20)
+        failpoint.Return(nil, errors.New("failpoint testIndexMergeMainReturnEarly"))
+    })
     if e.resultCurr != nil && e.resultCurr.cursor < len(e.resultCurr.rows) {
         return e.resultCurr, nil
     }
@@ -774,6 +784,7 @@ func handleWorkerPanic(ctx context.Context, finished <-chan struct{}, ch chan<-
         defer close(ch)
     }
     if r == nil {
+        logutil.BgLogger().Info("worker finish without panic", zap.Any("worker", worker))
         return
     }
 
@@ -836,7 +847,20 @@ func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-
     failpoint.Inject("testIndexMergePanicProcessWorkerUnion", nil)
 
     distinctHandles := make(map[int64]*kv.HandleMap)
-    for task := range fetchCh {
+    for {
+        var ok bool
+        var task *indexMergeTableTask
+        select {
+        case <-ctx.Done():
+            return
+        case <-finished:
+            return
+        case task, ok = <-fetchCh:
+            if !ok {
+                return
+            }
+        }
+
         select {
         case err := <-task.doneCh:
             // If got error from partialIndexWorker/partialTableWorker, stop processing.
@@ -872,7 +896,7 @@ func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-
         if len(fhs) == 0 {
             continue
         }
-        task := &indexMergeTableTask{
+        task = &indexMergeTableTask{
             lookupTableTask: lookupTableTask{
                 handles: fhs,
                 doneCh:  make(chan error, 1),
@@ -883,13 +907,27 @@ func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-
         if w.stats != nil {
             w.stats.IndexMergeProcess += time.Since(start)
         }
+        failpoint.Inject("testIndexMergeProcessWorkerUnionHang", func(_ failpoint.Value) {
+            for i := 0; i < cap(resultCh); i++ {
+                select {
+                case resultCh <- &indexMergeTableTask{}:
+                default:
+                }
+            }
+        })
         select {
         case <-ctx.Done():
             return
         case <-finished:
             return
         case workCh <- task:
-            resultCh <- task
+            select {
+            case <-ctx.Done():
+                return
+            case <-finished:
+                return
+            case resultCh <- task:
+            }
         }
     }
 }
@@ -988,14 +1026,28 @@ func (w *intersectionProcessWorker) doIntersectionPerPartition(ctx context.Conte
                 zap.Int("parTblIdx", parTblIdx), zap.Int("task.handles", len(task.handles)))
         }
     }
+    failpoint.Inject("testIndexMergeProcessWorkerIntersectionHang", func(_ failpoint.Value) {
+        for i := 0; i < cap(resultCh); i++ {
+            select {
+            case resultCh <- &indexMergeTableTask{}:
+            default:
+            }
+        }
+    })
     for _, task := range tasks {
         select {
         case <-ctx.Done():
             return
         case <-finished:
             return
         case workCh <- task:
-            resultCh <- task
+            select {
+            case <-ctx.Done():
+                return
+            case <-finished:
+                return
+            case resultCh <- task:
+            }
         }
     }
 }
@@ -1054,29 +1106,47 @@ func (w *indexMergeProcessWorker) fetchLoopIntersection(ctx context.Context, fet
         }, handleWorkerPanic(ctx, finished, resultCh, errCh, partTblIntersectionWorkerType))
         workers = append(workers, worker)
     }
-loop:
-    for task := range fetchCh {
+    defer func() {
+        for _, processWorker := range workers {
+            close(processWorker.workerCh)
+        }
+        wg.Wait()
+    }()
+    for {
+        var ok bool
+        var task *indexMergeTableTask
+        select {
+        case <-ctx.Done():
+            return
+        case <-finished:
+            return
+        case task, ok = <-fetchCh:
+            if !ok {
+                return
+            }
+        }
+
         select {
         case err := <-task.doneCh:
             // If got error from partialIndexWorker/partialTableWorker, stop processing.
             if err != nil {
                 syncErr(ctx, finished, resultCh, err)
-                break loop
+                return
             }
         default:
         }
 
         select {
+        case <-ctx.Done():
+            return
+        case <-finished:
+            return
         case workers[task.parTblIdx%workerCnt].workerCh <- task:
         case <-errCh:
             // If got error from intersectionProcessWorker, stop processing.
-            break loop
+            return
         }
     }
-    for _, processWorker := range workers {
-        close(processWorker.workerCh)
-    }
-    wg.Wait()
 }
 
 type partialIndexWorker struct {
@@ -1225,12 +1295,14 @@ func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context, task **
     for {
         waitStart := time.Now()
         select {
+        case <-ctx.Done():
+            return
+        case <-w.finished:
+            return
         case *task, ok = <-w.workCh:
             if !ok {
                 return
             }
-        case <-w.finished:
-            return
         }
         // Make sure panic failpoint is after fetch task from workCh.
         // Otherwise cannot send error to task.doneCh.
@@ -1251,13 +1323,20 @@ func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context, task **
             atomic.AddInt64(&w.stats.TableTaskNum, 1)
         }
         failpoint.Inject("testIndexMergePickAndExecTaskPanic", nil)
-        (*task).doneCh <- err
+        select {
+        case <-ctx.Done():
+            return
+        case <-w.finished:
+            return
+        case (*task).doneCh <- err:
+        }
     }
 }
 
 func (w *indexMergeTableScanWorker) handleTableScanWorkerPanic(ctx context.Context, finished <-chan struct{}, task **indexMergeTableTask, worker string) func(r interface{}) {
     return func(r interface{}) {
         if r == nil {
+            logutil.BgLogger().Info("worker finish without panic", zap.Any("worker", worker))
             return
         }
 
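
A note on the two new *Hang failpoints above: they make the old deadlock deterministic by pre-filling resultCh to capacity with non-blocking sends, so the very next plain send is guaranteed to block unless it is guarded. A standalone sketch of that trick (illustrative names, not TiDB code):

package main

import "fmt"

// fillToCapacity occupies every buffer slot with non-blocking sends; once it
// returns, any further plain "ch <- v" blocks until a reader drains a slot.
func fillToCapacity(ch chan int) {
    for i := 0; i < cap(ch); i++ {
        select {
        case ch <- 0: // claim one free slot
        default: // already full, nothing to do
        }
    }
}

func main() {
    ch := make(chan int, 4)
    fillToCapacity(ch)
    fmt.Println(len(ch) == cap(ch)) // true: the next send would block
}
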
66 changes: 45 additions & 21 deletions executor/index_merge_reader_test.go
@@ -798,6 +798,49 @@ func TestIntersectionMemQuota(t *testing.T) {
     require.Contains(t, err.Error(), "Out Of Memory Quota!")
 }
 
+func setupPartitionTableHelper(tk *testkit.TestKit) {
+    tk.MustExec("use test")
+    tk.MustExec("drop table if exists t1")
+    tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));")
+    insertStr := "insert into t1 values(0, 0, 0)"
+    for i := 1; i < 1000; i++ {
+        insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i)
+    }
+    tk.MustExec(insertStr)
+    tk.MustExec("analyze table t1;")
+    tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")
+}
+
+func TestIndexMergeProcessWorkerHang(t *testing.T) {
+    store := testkit.CreateMockStore(t)
+    tk := testkit.NewTestKit(t, store)
+    setupPartitionTableHelper(tk)
+
+    var err error
+    sql := "select /*+ use_index_merge(t1) */ c1 from t1 where c1 < 900 or c2 < 1000;"
+    res := tk.MustQuery("explain " + sql).Rows()
+    require.Contains(t, res[1][0], "IndexMerge")
+
+    require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeMainReturnEarly", "return()"))
+    require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeProcessWorkerUnionHang", "return(true)"))
+    err = tk.QueryToErr(sql)
+    require.Contains(t, err.Error(), "testIndexMergeMainReturnEarly")
+    require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeMainReturnEarly"))
+    require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeProcessWorkerUnionHang"))
+
+    sql = "select /*+ use_index_merge(t1, c2, c3) */ c1 from t1 where c2 < 900 and c3 < 1000;"
+    res = tk.MustQuery("explain " + sql).Rows()
+    require.Contains(t, res[1][0], "IndexMerge")
+    require.Contains(t, res[1][4], "intersection")
+
+    require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeMainReturnEarly", "return()"))
+    require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeProcessWorkerIntersectionHang", "return(true)"))
+    err = tk.QueryToErr(sql)
+    require.Contains(t, err.Error(), "testIndexMergeMainReturnEarly")
+    require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeMainReturnEarly"))
+    require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeProcessWorkerIntersectionHang"))
+}
+
 func TestIndexMergePanic(t *testing.T) {
     store := testkit.CreateMockStore(t)
     tk := testkit.NewTestKit(t, store)
@@ -811,16 +854,7 @@ func TestIndexMergePanic(t *testing.T) {
     tk.MustExec("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c1 < 100 or c2 < 100")
     require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeResultChCloseEarly"))
 
-    tk.MustExec("use test")
-    tk.MustExec("drop table if exists t1")
-    tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;")
-    insertStr := "insert into t1 values(0, 0, 0)"
-    for i := 1; i < 1000; i++ {
-        insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i)
-    }
-    tk.MustExec(insertStr)
-    tk.MustExec("analyze table t1;")
-    tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")
+    setupPartitionTableHelper(tk)
 
     minV := 200
     maxV := 1000
@@ -885,17 +919,7 @@ func TestIndexMergeCoprGoroutinesLeak(t *testing.T) {
 func TestIndexMergeCoprGoroutinesLeak(t *testing.T) {
     store := testkit.CreateMockStore(t)
     tk := testkit.NewTestKit(t, store)
-
-    tk.MustExec("use test")
-    tk.MustExec("drop table if exists t1")
-    tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));")
-    insertStr := "insert into t1 values(0, 0, 0)"
-    for i := 1; i < 1000; i++ {
-        insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i)
-    }
-    tk.MustExec(insertStr)
-    tk.MustExec("analyze table t1;")
-    tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")
+    setupPartitionTableHelper(tk)
 
     var err error
     sql := "select /*+ use_index_merge(t1) */ c1 from t1 where c1 < 900 or c2 < 1000;"
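
For background, the tests drive these named failpoints through the github.com/pingcap/failpoint package: failpoint.Enable activates a failpoint with a term such as "return(true)" before the query runs, and failpoint.Disable restores normal behavior. The injected branches only execute in builds where failpoints have been rewritten into the source (in the TiDB repo via a make target; treat that build detail as an assumption). A trimmed sketch of the test idiom, with a hypothetical test name and a defer-based Disable as a variation on the explicit calls above:

package executor_test

import (
    "testing"

    "github.com/pingcap/failpoint"
    "github.com/stretchr/testify/require"
)

func TestWithFailpointSketch(t *testing.T) {
    fp := "github.com/pingcap/tidb/executor/testIndexMergeMainReturnEarly"
    require.NoError(t, failpoint.Enable(fp, "return()"))
    // Disabling in a defer keeps the failpoint from leaking into later
    // tests even if an assertion fails first.
    defer func() {
        require.NoError(t, failpoint.Disable(fp))
    }()
    // ... run the query that should now fail with the injected error ...
}
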
