From d4500b81eef57b2539a0c244267174580e6605cc Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Thu, 13 Jul 2023 11:57:44 +0800 Subject: [PATCH] executor: fix query hang in IndexMerge Executor when it's killed (#45284) close pingcap/tidb#45279 --- executor/index_merge_reader.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 2a8f14f46533e..3e76ae47deb57 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -807,7 +807,7 @@ func (e *IndexMergeReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) e req.Reset() for { - resultTask, err := e.getResultTask() + resultTask, err := e.getResultTask(ctx) if err != nil { return errors.Trace(err) } @@ -825,7 +825,7 @@ func (e *IndexMergeReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) e } } -func (e *IndexMergeReaderExecutor) getResultTask() (*indexMergeTableTask, error) { +func (e *IndexMergeReaderExecutor) getResultTask(ctx context.Context) (*indexMergeTableTask, error) { failpoint.Inject("testIndexMergeMainReturnEarly", func(_ failpoint.Value) { // To make sure processWorker make resultCh to be full. // When main goroutine close finished, processWorker may be stuck when writing resultCh. @@ -839,8 +839,14 @@ func (e *IndexMergeReaderExecutor) getResultTask() (*indexMergeTableTask, error) if !ok { return nil, nil } - if err := <-task.doneCh; err != nil { - return nil, errors.Trace(err) + + select { + case <-ctx.Done(): + return nil, errors.Trace(ctx.Err()) + case err := <-task.doneCh: + if err != nil { + return nil, errors.Trace(err) + } } // Release the memory usage of last task before we handle a new task.