Skip to content

Commit 4583e16

Browse files
committed
callback should be outside synchronized block
Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
1 parent 89f9dde commit 4583e16

File tree

1 file changed

+42
-46
lines changed

1 file changed

+42
-46
lines changed

server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 42 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
* as shard results are consumed.
6767
* This implementation adds the memory that it used to save and reduce the results of shard aggregations
6868
* in the {@link CircuitBreaker#REQUEST} circuit breaker. Before any partial or final reduce, the memory
69-
* needed to reduce the aggregations is estimated and a {@link CircuitBreakingException} is thrown if it
69+
* needed to reduce the aggregations is estimated and a {@link CircuitBreakingException} is handled if it
7070
* exceeds the maximum memory allowed in this breaker.
7171
*
7272
* @opensearch.internal
@@ -367,9 +367,6 @@ private void resetCircuitBreaker() {
367367
* provided {@link QuerySearchResult}.
368368
*/
369369
private long ramBytesUsedQueryResult(QuerySearchResult result) {
370-
if (result.aggregations() == null) {
371-
return 0;
372-
}
373370
if (hasAggs == false) {
374371
return 0;
375372
}
@@ -388,53 +385,52 @@ private long estimateRamBytesUsedForReduce(long size) {
388385
return Math.round(0.5d * size);
389386
}
390387

391-
void consume(QuerySearchResult result, Runnable callback) throws CircuitBreakingException {
388+
void consume(QuerySearchResult result, Runnable callback) {
392389
checkCancellation();
393-
boolean callbackWaitsForMerge = false;
394390

395-
synchronized (this) {
396-
if (hasFailure()) {
397-
result.consumeAll();
398-
callback.run();
399-
return;
400-
}
401-
if (result.isNull()) {
402-
SearchShardTarget target = result.getSearchShardTarget();
403-
emptyResults.add(new SearchShard(target.getClusterAlias(), target.getShardId()));
404-
callback.run();
405-
return;
406-
}
407-
// Check circuit breaker before consuming
408-
if (hasAggs) {
409-
long aggsSize = ramBytesUsedQueryResult(result);
410-
try {
411-
addEstimateAndMaybeBreak(aggsSize);
412-
aggsCurrentBufferSize += aggsSize;
413-
} catch (CircuitBreakingException e) {
414-
onMergeFailure(e);
415-
callback.run();
416-
return;
417-
}
418-
}
419-
// Process non-empty, valid results
420-
int size = buffer.size() + (hasPartialReduce ? 1 : 0);
421-
if (size >= batchReduceSize) {
422-
hasPartialReduce = true;
423-
// new result's callback must wait for this merge task to complete to maintain proper result processing order
424-
callbackWaitsForMerge = true;
425-
QuerySearchResult[] clone = buffer.toArray(QuerySearchResult[]::new);
426-
MergeTask task = new MergeTask(clone, aggsCurrentBufferSize, new ArrayList<>(emptyResults), callback);
427-
aggsCurrentBufferSize = 0;
428-
buffer.clear();
429-
emptyResults.clear();
430-
queue.add(task);
431-
tryExecuteNext();
391+
if (processResult(result, callback)) {
392+
callback.run();
393+
}
394+
}
395+
396+
private synchronized boolean processResult(QuerySearchResult result, Runnable callback) {
397+
if (hasFailure()) {
398+
result.consumeAll(); // release memory
399+
return true;
400+
}
401+
if (result.isNull()) {
402+
SearchShardTarget target = result.getSearchShardTarget();
403+
emptyResults.add(new SearchShard(target.getClusterAlias(), target.getShardId()));
404+
return true;
405+
}
406+
// Check circuit breaker before consuming
407+
if (hasAggs) {
408+
long aggsSize = ramBytesUsedQueryResult(result);
409+
try {
410+
addEstimateAndMaybeBreak(aggsSize);
411+
aggsCurrentBufferSize += aggsSize;
412+
} catch (CircuitBreakingException e) {
413+
onMergeFailure(e);
414+
return true;
432415
}
433-
buffer.add(result);
434416
}
435-
if (!callbackWaitsForMerge) {
436-
callback.run();
417+
// Process non-empty results
418+
int size = buffer.size() + (hasPartialReduce ? 1 : 0);
419+
if (size >= batchReduceSize) {
420+
hasPartialReduce = true;
421+
// the callback must wait for the new merge task to complete to maintain proper result processing order
422+
QuerySearchResult[] clone = buffer.toArray(QuerySearchResult[]::new);
423+
MergeTask task = new MergeTask(clone, aggsCurrentBufferSize, new ArrayList<>(emptyResults), callback);
424+
aggsCurrentBufferSize = 0;
425+
buffer.clear();
426+
emptyResults.clear();
427+
queue.add(task);
428+
tryExecuteNext();
429+
buffer.add(result);
430+
return false; // callback will be run by merge task
437431
}
432+
buffer.add(result);
433+
return true;
438434
}
439435

440436
private void tryExecuteNext() {

0 commit comments

Comments
 (0)