Skip to content

Commit

Permalink
executor: fix data race in parallel sort test (#52246)
Browse files Browse the repository at this point in the history
close #52238
  • Loading branch information
xzhangxian1008 authored Apr 1, 2024
1 parent 42673cb commit 6f0adb8
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
8 changes: 5 additions & 3 deletions pkg/executor/sortexec/parallel_sort_spill_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ func (p *parallelSortSpillHelper) spillImpl(merger *multiWayMerger) error {
inDisk.GetDiskTracker().AttachTo(p.sortExec.diskTracker)

spilledRowChannel := make(chan chunk.Row, 10000)

// We must wait the finish of the following goroutine,
// or we will exit `spillImpl` function in advance and
// this will cause data race.
defer channel.Clear(spilledRowChannel)
go func() {
defer func() {
if r := recover(); r != nil {
Expand Down Expand Up @@ -207,9 +212,6 @@ func (p *parallelSortSpillHelper) spillImpl(merger *multiWayMerger) error {
for {
select {
case <-p.finishCh:
// We must wait the finish of the above goroutine,
// or p.errOutputChan may be closed in advandce.
channel.Clear(spilledRowChannel)
return nil
case row, ok = <-spilledRowChannel:
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/sortexec/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,6 @@ func (e *SortExec) generateResult(waitGroups ...*util.WaitGroupWrapper) {
processPanicAndLog(e.Parallel.resultChannel, r)
}

close(e.Parallel.resultChannel)
for i := range e.Parallel.sortedRowsIters {
e.Parallel.sortedRowsIters[i].Reset(nil)
}
Expand All @@ -447,6 +446,7 @@ func (e *SortExec) generateResult(waitGroups ...*util.WaitGroupWrapper) {
}
}
e.Parallel.merger = nil
close(e.Parallel.resultChannel)
}()

if !e.Parallel.spillHelper.isSpillTriggered() {
Expand Down

0 comments on commit 6f0adb8

Please sign in to comment.