Skip to content

Commit 452be96

Browse files
Release memory held by aggs on failure (#72966)
If consuming a query result were disrupted by circuit breaker we would leak memory for aggs in buffered query results, fixed. Relates #62439 and #72309 Closes #72923
1 parent 0bf1601 commit 452be96

File tree

1 file changed

+15
-5
lines changed

1 file changed

+15
-5
lines changed

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,8 @@ public void consume(QuerySearchResult result, Runnable next) {
311311
addEstimateAndMaybeBreak(aggsSize);
312312
} catch (Exception exc) {
313313
result.releaseAggs();
314+
buffer.forEach(QuerySearchResult::releaseAggs);
315+
buffer.clear();
314316
onMergeFailure(exc);
315317
next.run();
316318
return;
@@ -402,17 +404,20 @@ protected void doRun() {
402404
final MergeResult thisMergeResult = mergeResult;
403405
long estimatedTotalSize = (thisMergeResult != null ? thisMergeResult.estimatedSize : 0) + task.aggsBufferSize;
404406
final MergeResult newMerge;
407+
final QuerySearchResult[] toConsume = task.consumeBuffer();
408+
if (toConsume == null) {
409+
return;
410+
}
405411
try {
406-
final QuerySearchResult[] toConsume = task.consumeBuffer();
407-
if (toConsume == null) {
408-
return;
409-
}
410412
long estimatedMergeSize = estimateRamBytesUsedForReduce(estimatedTotalSize);
411413
addEstimateAndMaybeBreak(estimatedMergeSize);
412414
estimatedTotalSize += estimatedMergeSize;
413415
++ numReducePhases;
414416
newMerge = partialReduce(toConsume, task.emptyResults, topDocsStats, thisMergeResult, numReducePhases);
415417
} catch (Exception t) {
418+
for (QuerySearchResult result : toConsume) {
419+
result.releaseAggs();
420+
}
416421
onMergeFailure(t);
417422
return;
418423
}
@@ -507,7 +512,12 @@ public void consumeListener() {
507512
}
508513

509514
public synchronized void cancel() {
510-
consumeBuffer();
515+
QuerySearchResult[] buffer = consumeBuffer();
516+
if (buffer != null) {
517+
for (QuerySearchResult result : buffer) {
518+
result.releaseAggs();
519+
}
520+
}
511521
consumeListener();
512522
}
513523
}

0 commit comments

Comments
 (0)