Skip to content

Commit 21538c2

Browse files
committed
add graceful handling of failures in QueryPhaseResultConsumer
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
1 parent 9908294 commit 21538c2

File tree

2 files changed

+25 -16 lines changed

server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,9 @@ private MergeResult partialReduce(
217217
int numReducePhases
218218
) {
219219
checkCancellation();
220+
if (pendingMerges.hasFailure()) {
221+
return lastMerge;
222+
}
220223
// ensure consistent ordering
221224
Arrays.sort(toConsume, Comparator.comparingInt(QuerySearchResult::getShardIndex));
222225

@@ -450,6 +453,10 @@ private synchronized void onMergeFailure(Exception exc) {
450453
MergeTask task = runningTask.get();
451454
runningTask.compareAndSet(task, null);
452455
onPartialMergeFailure.accept(exc);
456+
clearPendingMerges(task);
457+
}
458+
459+
void clearPendingMerges(MergeTask task) {
453460
List<MergeTask> toCancels = new ArrayList<>();
454461
if (task != null) {
455462
toCancels.add(task);
@@ -471,10 +478,11 @@ private void resetCircuitBreakerForCurrentRequest() {
471478

472479
private void onAfterMerge(MergeTask task, MergeResult newResult, long estimatedSize) {
473480
synchronized (this) {
481+
runningTask.compareAndSet(task, null);
474482
if (hasFailure()) {
483+
task.cancel();
475484
return;
476485
}
477-
runningTask.compareAndSet(task, null);
478486
mergeResult = newResult;
479487
if (hasAggs) {
480488
// Update the circuit breaker to remove the size of the source aggregations
@@ -495,7 +503,11 @@ private void onAfterMerge(MergeTask task, MergeResult newResult, long estimatedS
495503
private void tryExecuteNext() {
496504
final MergeTask task;
497505
synchronized (this) {
498-
if (queue.isEmpty() || hasFailure() || runningTask.get() != null) {
506+
if (hasFailure()) {
507+
clearPendingMerges(null);
508+
return;
509+
}
510+
if (queue.isEmpty() || runningTask.get() != null) {
499511
return;
500512
}
501513
task = queue.poll();
@@ -511,6 +523,7 @@ protected void doRun() {
511523
try {
512524
final QuerySearchResult[] toConsume = task.consumeBuffer();
513525
if (toConsume == null) {
526+
task.cancel();
514527
return;
515528
}
516529
long estimatedMergeSize = estimateRamBytesUsedForReduce(estimatedTotalSize);

server/src/test/java/org/opensearch/action/search/SearchPhaseControllerTests.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1747,8 +1747,11 @@ public void testCancellationWithoutCircuitBreaker() throws Exception {
17471747
int batchedReduceSize = randomIntBetween(2, expectedNumResults - 1);
17481748
SearchRequest request = getAggregationSearchRequestWithBatchedReduceSize(batchedReduceSize);
17491749
AssertingCircuitBreaker circuitBreaker = new AssertingCircuitBreaker(CircuitBreaker.REQUEST);
1750-
AtomicInteger checkCount = new AtomicInteger(0);
1751-
int cancelAfter = expectedNumResults / 2;
1750+
// To make the test deterministic, we count the number of times partialReduce and reduce are called.
1751+
// The exception is only thrown during the call to reduce, which happens once all shard-level
1752+
// results have arrived.
1753+
int partialReduceMethodCallCount = expectedNumResults / batchedReduceSize;
1754+
AtomicInteger checkCount = new AtomicInteger(expectedNumResults + partialReduceMethodCallCount);
17521755

17531756
QueryPhaseResultConsumer consumer = searchPhaseController.newSearchPhaseResults(
17541757
fixedExecutor,
@@ -1758,7 +1761,7 @@ public void testCancellationWithoutCircuitBreaker() throws Exception {
17581761
expectedNumResults,
17591762
exc -> {},
17601763
() -> {
1761-
return checkCount.incrementAndGet() > cancelAfter;
1764+
return checkCount.decrementAndGet() <= 0;
17621765
}
17631766
);
17641767

@@ -1775,9 +1778,8 @@ public void testCancellationDoesNotMaskCircuitBreakerException() throws Exceptio
17751778

17761779
// making sure circuit breaker trips first
17771780
circuitBreaker.shouldBreak.set(true);
1778-
AtomicInteger checkCount = new AtomicInteger(0);
1779-
int cancelAfter = expectedNumResults + 1;
1780-
1781+
int partialReduceMethodCallCount = expectedNumResults / batchedReduceSize;
1782+
AtomicInteger checkCount = new AtomicInteger(expectedNumResults + partialReduceMethodCallCount);
17811783
QueryPhaseResultConsumer consumer = searchPhaseController.newSearchPhaseResults(
17821784
fixedExecutor,
17831785
circuitBreaker,
@@ -1786,7 +1788,7 @@ public void testCancellationDoesNotMaskCircuitBreakerException() throws Exceptio
17861788
expectedNumResults,
17871789
exc -> {},
17881790
() -> {
1789-
return checkCount.incrementAndGet() > cancelAfter;
1791+
return checkCount.decrementAndGet() <= 0;
17901792
}
17911793
);
17921794

@@ -1826,13 +1828,7 @@ private static void consumeShardLevelQueryPhaseResultsAsync(int expectedNumResul
18261828
result.setShardIndex(index);
18271829
result.size(1);
18281830

1829-
try {
1830-
consumer.consumeResult(result, latch::countDown);
1831-
} catch (Exception e) {
1832-
// Ensure latch counts down even on cancellation
1833-
latch.countDown();
1834-
// Don't rethrow - let the thread complete normally
1835-
}
1831+
consumer.consumeResult(result, latch::countDown);
18361832
});
18371833
threads[index].start();
18381834
}

0 commit comments

Comments
 (0)