Skip to content

Commit

Permalink
executor: fix query hang in IndexMerge Executor when it's killed (#45284
Browse files Browse the repository at this point in the history
)

close #45279
  • Loading branch information
xzhangxian1008 committed Jul 13, 2023
1 parent 19e7eaa commit d4500b8
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ func (e *IndexMergeReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) e

req.Reset()
for {
resultTask, err := e.getResultTask()
resultTask, err := e.getResultTask(ctx)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -825,7 +825,7 @@ func (e *IndexMergeReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) e
}
}

func (e *IndexMergeReaderExecutor) getResultTask() (*indexMergeTableTask, error) {
func (e *IndexMergeReaderExecutor) getResultTask(ctx context.Context) (*indexMergeTableTask, error) {
failpoint.Inject("testIndexMergeMainReturnEarly", func(_ failpoint.Value) {
// To make sure processWorker make resultCh to be full.
// When main goroutine close finished, processWorker may be stuck when writing resultCh.
Expand All @@ -839,8 +839,14 @@ func (e *IndexMergeReaderExecutor) getResultTask() (*indexMergeTableTask, error)
if !ok {
return nil, nil
}
if err := <-task.doneCh; err != nil {
return nil, errors.Trace(err)

select {
case <-ctx.Done():
return nil, errors.Trace(ctx.Err())
case err := <-task.doneCh:
if err != nil {
return nil, errors.Trace(err)
}
}

// Release the memory usage of last task before we handle a new task.
Expand Down

0 comments on commit d4500b8

Please sign in to comment.