Skip to content

Commit e4f47d3

Browse files
executor: fix hang in hash agg when exceeding memory limit leads to panic (#57641)
close #57546
1 parent cfa16a3 commit e4f47d3

File tree

1 file changed

+17
-16
lines changed

1 file changed

+17
-16
lines changed

pkg/executor/aggregate/agg_hash_partial_worker.go

+17-16
Original file line numberDiff line numberDiff line change
@@ -71,25 +71,16 @@ type HashAggPartialWorker struct {
7171
inflightChunkSync *sync.WaitGroup
7272
}
7373

74-
func (w *HashAggPartialWorker) getChildInput() bool {
74+
func (w *HashAggPartialWorker) getChildInput() (*chunk.Chunk, bool) {
7575
select {
7676
case <-w.finishCh:
77-
return false
77+
return nil, false
7878
case chk, ok := <-w.inputCh:
7979
if !ok {
80-
return false
81-
}
82-
83-
sizeBefore := w.chk.MemoryUsage()
84-
w.chk.SwapColumns(chk)
85-
w.memTracker.Consume(w.chk.MemoryUsage() - sizeBefore)
86-
87-
w.giveBackCh <- &HashAggInput{
88-
chk: chk,
89-
giveBackCh: w.inputCh,
80+
return nil, false
9081
}
82+
return chk, true
9183
}
92-
return true
9384
}
9485

9586
func (w *HashAggPartialWorker) fetchChunkAndProcess(ctx sessionctx.Context, hasError *bool, needShuffle *bool) bool {
@@ -99,14 +90,24 @@ func (w *HashAggPartialWorker) fetchChunkAndProcess(ctx sessionctx.Context, hasE
9990
}
10091

10192
waitStart := time.Now()
102-
ok := w.getChildInput()
103-
updateWaitTime(w.stats, waitStart)
104-
93+
chk, ok := w.getChildInput()
10594
if !ok {
10695
return false
10796
}
10897

10998
defer w.inflightChunkSync.Done()
99+
updateWaitTime(w.stats, waitStart)
100+
101+
w.intestDuringPartialWorkerRun()
102+
103+
sizeBefore := w.chk.MemoryUsage()
104+
w.chk.SwapColumns(chk)
105+
w.memTracker.Consume(w.chk.MemoryUsage() - sizeBefore)
106+
107+
w.giveBackCh <- &HashAggInput{
108+
chk: chk,
109+
giveBackCh: w.inputCh,
110+
}
110111

111112
execStart := time.Now()
112113
if err := w.updatePartialResult(ctx, w.chk, len(w.partialResultsMap)); err != nil {

0 commit comments

Comments
 (0)