diff --git a/pkg/executor/sortexec/parallel_sort_spill_helper.go b/pkg/executor/sortexec/parallel_sort_spill_helper.go index 05d3ba22f0fdb..b322e590e7f29 100644 --- a/pkg/executor/sortexec/parallel_sort_spill_helper.go +++ b/pkg/executor/sortexec/parallel_sort_spill_helper.go @@ -177,6 +177,11 @@ func (p *parallelSortSpillHelper) spillImpl(merger *multiWayMerger) error { inDisk.GetDiskTracker().AttachTo(p.sortExec.diskTracker) spilledRowChannel := make(chan chunk.Row, 10000) + + // We must wait the finish of the following goroutine, + // or we will exit `spillImpl` function in advance and + // this will cause data race. + defer channel.Clear(spilledRowChannel) go func() { defer func() { if r := recover(); r != nil { @@ -207,9 +212,6 @@ func (p *parallelSortSpillHelper) spillImpl(merger *multiWayMerger) error { for { select { case <-p.finishCh: - // We must wait the finish of the above goroutine, - // or p.errOutputChan may be closed in advandce. - channel.Clear(spilledRowChannel) return nil case row, ok = <-spilledRowChannel: if !ok { diff --git a/pkg/executor/sortexec/sort.go b/pkg/executor/sortexec/sort.go index 3fe45afe62c73..26c17c1979992 100644 --- a/pkg/executor/sortexec/sort.go +++ b/pkg/executor/sortexec/sort.go @@ -437,7 +437,6 @@ func (e *SortExec) generateResult(waitGroups ...*util.WaitGroupWrapper) { processPanicAndLog(e.Parallel.resultChannel, r) } - close(e.Parallel.resultChannel) for i := range e.Parallel.sortedRowsIters { e.Parallel.sortedRowsIters[i].Reset(nil) } @@ -447,6 +446,7 @@ func (e *SortExec) generateResult(waitGroups ...*util.WaitGroupWrapper) { } } e.Parallel.merger = nil + close(e.Parallel.resultChannel) }() if !e.Parallel.spillHelper.isSpillTriggered() {