Skip to content

Commit 18cbfa0

Browse files
committed
rename all "merge" tasks to "reduce"
Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
1 parent 0cba011 commit 18cbfa0

File tree

2 files changed

+79
-69
lines changed

2 files changed

+79
-69
lines changed

server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 78 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
8686
private final boolean hasAggs;
8787
private final boolean performFinalReduce;
8888

89-
final PendingMerges pendingMerges;
89+
final PendingReduces pendingReduces;
9090
private final Consumer<Exception> cancelTaskOnFailure;
9191
private final BooleanSupplier isTaskCancelled;
9292

@@ -143,7 +143,7 @@ public QueryPhaseResultConsumer(
143143
this.hasTopDocs = source == null || source.size() != 0;
144144
this.hasAggs = source != null && source.aggregations() != null;
145145
int batchReduceSize = getBatchReduceSize(request.getBatchedReduceSize(), expectedResultSize);
146-
this.pendingMerges = new PendingMerges(batchReduceSize, request.resolveTrackTotalHitsUpTo());
146+
this.pendingReduces = new PendingReduces(batchReduceSize, request.resolveTrackTotalHitsUpTo());
147147
this.isTaskCancelled = isTaskCancelled;
148148
}
149149

@@ -153,52 +153,56 @@ int getBatchReduceSize(int requestBatchedReduceSize, int minBatchReduceSize) {
153153

154154
@Override
155155
public void close() {
156-
Releasables.close(pendingMerges);
156+
Releasables.close(pendingReduces);
157157
}
158158

159159
@Override
160160
public void consumeResult(SearchPhaseResult result, Runnable next) {
161161
super.consumeResult(result, () -> {});
162162
QuerySearchResult querySearchResult = result.queryResult();
163163
progressListener.notifyQueryResult(querySearchResult.getShardIndex());
164-
pendingMerges.consume(querySearchResult, next);
164+
pendingReduces.consume(querySearchResult, next);
165165
}
166166

167167
@Override
168168
public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
169-
if (pendingMerges.hasPendingMerges()) {
169+
if (pendingReduces.hasPendingReduceTask()) {
170170
throw new AssertionError("partial reduce in-flight");
171171
}
172172
checkCancellation();
173-
if (pendingMerges.hasFailure()) {
174-
throw pendingMerges.failure.get();
173+
if (pendingReduces.hasFailure()) {
174+
throw pendingReduces.failure.get();
175175
}
176176

177177
// ensure consistent ordering
178-
pendingMerges.sortBuffer();
179-
final SearchPhaseController.TopDocsStats topDocsStats = pendingMerges.consumeTopDocsStats();
180-
final List<TopDocs> topDocsList = pendingMerges.consumeTopDocs();
181-
final List<InternalAggregations> aggsList = pendingMerges.consumeAggs();
182-
long breakerSize = pendingMerges.circuitBreakerBytes;
178+
pendingReduces.sortBuffer();
179+
final SearchPhaseController.TopDocsStats topDocsStats = pendingReduces.consumeTopDocsStats();
180+
final List<TopDocs> topDocsList = pendingReduces.consumeTopDocs();
181+
final List<InternalAggregations> aggsList = pendingReduces.consumeAggs();
182+
long breakerSize = pendingReduces.circuitBreakerBytes;
183183
if (hasAggs) {
184184
// Add an estimate of the final reduce size
185-
breakerSize = pendingMerges.addEstimateAndMaybeBreak(pendingMerges.estimateRamBytesUsedForReduce(breakerSize));
185+
breakerSize = pendingReduces.addEstimateAndMaybeBreak(pendingReduces.estimateRamBytesUsedForReduce(breakerSize));
186186
}
187187
SearchPhaseController.ReducedQueryPhase reducePhase = controller.reducedQueryPhase(
188188
results.asList(),
189189
aggsList,
190190
topDocsList,
191191
topDocsStats,
192-
pendingMerges.numReducePhases,
192+
pendingReduces.numReducePhases,
193193
false,
194194
aggReduceContextBuilder,
195195
performFinalReduce
196196
);
197197
if (hasAggs) {
198198
// Update the circuit breaker to replace the estimation with the serialized size of the newly reduced result
199199
long finalSize = reducePhase.aggregations.getSerializedSize() - breakerSize;
200-
pendingMerges.addWithoutBreaking(finalSize);
201-
logger.trace("aggs final reduction [{}] max [{}]", pendingMerges.aggsCurrentBufferSize, pendingMerges.maxAggsCurrentBufferSize);
200+
pendingReduces.addWithoutBreaking(finalSize);
201+
logger.trace(
202+
"aggs final reduction [{}] max [{}]",
203+
pendingReduces.aggsCurrentBufferSize,
204+
pendingReduces.maxAggsCurrentBufferSize
205+
);
202206
}
203207
progressListener.notifyFinalReduce(
204208
SearchProgressListener.buildSearchShards(results.asList()),
@@ -209,16 +213,16 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
209213
return reducePhase;
210214
}
211215

212-
private MergeResult partialReduce(
216+
private ReduceResult partialReduce(
213217
QuerySearchResult[] toConsume,
214218
List<SearchShard> emptyResults,
215219
SearchPhaseController.TopDocsStats topDocsStats,
216-
MergeResult lastMerge,
220+
ReduceResult lastReduceResult,
217221
int numReducePhases
218222
) {
219223
checkCancellation();
220-
if (pendingMerges.hasFailure()) {
221-
return lastMerge;
224+
if (pendingReduces.hasFailure()) {
225+
return lastReduceResult;
222226
}
223227
// ensure consistent ordering
224228
Arrays.sort(toConsume, Comparator.comparingInt(QuerySearchResult::getShardIndex));
@@ -230,8 +234,8 @@ private MergeResult partialReduce(
230234
final TopDocs newTopDocs;
231235
if (hasTopDocs) {
232236
List<TopDocs> topDocsList = new ArrayList<>();
233-
if (lastMerge != null) {
234-
topDocsList.add(lastMerge.reducedTopDocs);
237+
if (lastReduceResult != null) {
238+
topDocsList.add(lastReduceResult.reducedTopDocs);
235239
}
236240
for (QuerySearchResult result : toConsume) {
237241
TopDocsAndMaxScore topDocs = result.consumeTopDocs();
@@ -251,8 +255,8 @@ private MergeResult partialReduce(
251255
final InternalAggregations newAggs;
252256
if (hasAggs) {
253257
List<InternalAggregations> aggsList = new ArrayList<>();
254-
if (lastMerge != null) {
255-
aggsList.add(lastMerge.reducedAggs);
258+
if (lastReduceResult != null) {
259+
aggsList.add(lastReduceResult.reducedAggs);
256260
}
257261
for (QuerySearchResult result : toConsume) {
258262
aggsList.add(result.consumeAggs().expand());
@@ -262,8 +266,8 @@ private MergeResult partialReduce(
262266
newAggs = null;
263267
}
264268
List<SearchShard> processedShards = new ArrayList<>(emptyResults);
265-
if (lastMerge != null) {
266-
processedShards.addAll(lastMerge.processedShards);
269+
if (lastReduceResult != null) {
270+
processedShards.addAll(lastReduceResult.processedShards);
267271
}
268272
for (QuerySearchResult result : toConsume) {
269273
SearchShardTarget target = result.getSearchShardTarget();
@@ -273,25 +277,31 @@ private MergeResult partialReduce(
273277
// we leave the results un-serialized because serializing is slow but we compute the serialized
274278
// size as an estimate of the memory used by the newly reduced aggregations.
275279
long serializedSize = hasAggs ? newAggs.getSerializedSize() : 0;
276-
return new MergeResult(processedShards, newTopDocs, newAggs, hasAggs ? serializedSize : 0);
280+
return new ReduceResult(processedShards, newTopDocs, newAggs, hasAggs ? serializedSize : 0);
277281
}
278282

279283
private void checkCancellation() {
280284
if (isTaskCancelled.getAsBoolean()) {
281-
pendingMerges.onFailure(new TaskCancelledException("request has been terminated"));
285+
pendingReduces.onFailure(new TaskCancelledException("request has been terminated"));
282286
}
283287
}
284288

285289
public int getNumReducePhases() {
286-
return pendingMerges.numReducePhases;
290+
return pendingReduces.numReducePhases;
287291
}
288292

289293
/**
290-
* Class representing pending merges
294+
* Manages incremental query result reduction by buffering incoming results and
295+
* triggering partial reduce operations when the threshold is reached.
296+
* <ul>
297+
* <li>Handles circuit breaker memory accounting</li>
298+
* <li>Coordinates reduce task execution to be one at a time</li>
299+
* <li>Provides thread-safe failure handling with cleanup</li>
300+
* </ul>
291301
*
292302
* @opensearch.internal
293303
*/
294-
class PendingMerges implements Releasable {
304+
class PendingReduces implements Releasable {
295305
private final int batchReduceSize;
296306
private final List<QuerySearchResult> buffer = new ArrayList<>();
297307
private final List<SearchShard> emptyResults = new ArrayList<>();
@@ -301,23 +311,23 @@ class PendingMerges implements Releasable {
301311
private volatile long aggsCurrentBufferSize;
302312
private volatile long maxAggsCurrentBufferSize = 0;
303313

304-
private final ArrayDeque<MergeTask> queue = new ArrayDeque<>();
305-
private final AtomicReference<MergeTask> runningTask = new AtomicReference<>(); // ensure only one task is running
314+
private final ArrayDeque<ReduceTask> queue = new ArrayDeque<>();
315+
private final AtomicReference<ReduceTask> runningTask = new AtomicReference<>(); // ensure only one task is running
306316
private final AtomicReference<Exception> failure = new AtomicReference<>();
307317

308318
private final SearchPhaseController.TopDocsStats topDocsStats;
309-
private volatile MergeResult mergeResult;
319+
private volatile ReduceResult reduceResult;
310320
private volatile boolean hasPartialReduce;
311321
private volatile int numReducePhases;
312322

313-
PendingMerges(int batchReduceSize, int trackTotalHitsUpTo) {
323+
PendingReduces(int batchReduceSize, int trackTotalHitsUpTo) {
314324
this.batchReduceSize = batchReduceSize;
315325
this.topDocsStats = new SearchPhaseController.TopDocsStats(trackTotalHitsUpTo);
316326
}
317327

318328
@Override
319329
public synchronized void close() {
320-
assert hasPendingMerges() == false : "cannot close with partial reduce in-flight";
330+
assert hasPendingReduceTask() == false : "cannot close with partial reduce in-flight";
321331
if (hasFailure()) {
322332
assert circuitBreakerBytes == 0;
323333
return;
@@ -331,7 +341,7 @@ private boolean hasFailure() {
331341
return failure.get() != null;
332342
}
333343

334-
private boolean hasPendingMerges() {
344+
private boolean hasPendingReduceTask() {
335345
return queue.isEmpty() == false || runningTask.get() != null;
336346
}
337347

@@ -424,23 +434,23 @@ private synchronized boolean consumeResult(QuerySearchResult result, Runnable ca
424434
int size = buffer.size() + (hasPartialReduce ? 1 : 0);
425435
if (size >= batchReduceSize) {
426436
hasPartialReduce = true;
427-
// the callback must wait for the new merge task to complete to maintain proper result processing order
437+
// the callback must wait for the new reduce task to complete to maintain proper result processing order
428438
QuerySearchResult[] clone = buffer.toArray(QuerySearchResult[]::new);
429-
MergeTask task = new MergeTask(clone, aggsCurrentBufferSize, new ArrayList<>(emptyResults), callback);
439+
ReduceTask task = new ReduceTask(clone, aggsCurrentBufferSize, new ArrayList<>(emptyResults), callback);
430440
aggsCurrentBufferSize = 0;
431441
buffer.clear();
432442
emptyResults.clear();
433443
queue.add(task);
434444
tryExecuteNext();
435445
buffer.add(result);
436-
return false; // callback will be run by merge task
446+
return false; // callback will be run by reduce task
437447
}
438448
buffer.add(result);
439449
return true;
440450
}
441451

442452
private void tryExecuteNext() {
443-
final MergeTask task;
453+
final ReduceTask task;
444454
synchronized (this) {
445455
if (hasFailure()) {
446456
return;
@@ -455,51 +465,51 @@ private void tryExecuteNext() {
455465
executor.execute(new AbstractRunnable() {
456466
@Override
457467
protected void doRun() {
458-
final MergeResult thisMergeResult = mergeResult;
459-
long estimatedTotalSize = (thisMergeResult != null ? thisMergeResult.estimatedSize : 0) + task.aggsBufferSize;
460-
final MergeResult newMerge;
468+
final ReduceResult thisReduceResult = reduceResult;
469+
long estimatedTotalSize = (thisReduceResult != null ? thisReduceResult.estimatedSize : 0) + task.aggsBufferSize;
470+
final ReduceResult newReduceResult;
461471
try {
462472
final QuerySearchResult[] toConsume = task.consumeBuffer();
463473
if (toConsume == null) {
464-
onAfterMerge(task, null, 0);
474+
onAfterReduce(task, null, 0);
465475
return;
466476
}
467477
long estimateRamBytesUsedForReduce = estimateRamBytesUsedForReduce(estimatedTotalSize);
468478
addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce);
469479
estimatedTotalSize += estimateRamBytesUsedForReduce;
470480
++numReducePhases;
471-
newMerge = partialReduce(toConsume, task.emptyResults, topDocsStats, thisMergeResult, numReducePhases);
481+
newReduceResult = partialReduce(toConsume, task.emptyResults, topDocsStats, thisReduceResult, numReducePhases);
472482
} catch (Exception t) {
473-
PendingMerges.this.onFailure(t);
483+
PendingReduces.this.onFailure(t);
474484
return;
475485
}
476-
onAfterMerge(task, newMerge, estimatedTotalSize);
486+
onAfterReduce(task, newReduceResult, estimatedTotalSize);
477487
}
478488

479489
@Override
480490
public void onFailure(Exception exc) {
481-
PendingMerges.this.onFailure(exc);
491+
PendingReduces.this.onFailure(exc);
482492
}
483493
});
484494
}
485495

486-
private void onAfterMerge(MergeTask task, MergeResult newResult, long estimatedSize) {
496+
private void onAfterReduce(ReduceTask task, ReduceResult newResult, long estimatedSize) {
487497
if (newResult != null) {
488498
synchronized (this) {
489499
if (hasFailure()) {
490500
return;
491501
}
492502
runningTask.compareAndSet(task, null);
493-
mergeResult = newResult;
503+
reduceResult = newResult;
494504
if (hasAggs) {
495505
// Update the circuit breaker to remove the size of the source aggregations
496506
// and replace the estimation with the serialized size of the newly reduced result.
497-
long newSize = mergeResult.estimatedSize - estimatedSize;
507+
long newSize = reduceResult.estimatedSize - estimatedSize;
498508
addWithoutBreaking(newSize);
499509
logger.trace(
500510
"aggs partial reduction [{}->{}] max [{}]",
501511
estimatedSize,
502-
mergeResult.estimatedSize,
512+
reduceResult.estimatedSize,
503513
maxAggsCurrentBufferSize
504514
);
505515
}
@@ -518,21 +528,21 @@ private synchronized void onFailure(Exception exc) {
518528
assert circuitBreakerBytes >= 0;
519529
resetCircuitBreaker();
520530
failure.compareAndSet(null, exc);
521-
clearMergeTaskQueue();
531+
clearReduceTaskQueue();
522532
cancelTaskOnFailure.accept(exc);
523533
}
524534

525-
private synchronized void clearMergeTaskQueue() {
526-
MergeTask task = runningTask.get();
535+
private synchronized void clearReduceTaskQueue() {
536+
ReduceTask task = runningTask.get();
527537
runningTask.compareAndSet(task, null);
528-
List<MergeTask> toCancels = new ArrayList<>();
538+
List<ReduceTask> toCancels = new ArrayList<>();
529539
if (task != null) {
530540
toCancels.add(task);
531541
}
532542
toCancels.addAll(queue);
533543
queue.clear();
534-
mergeResult = null;
535-
for (MergeTask toCancel : toCancels) {
544+
reduceResult = null;
545+
for (ReduceTask toCancel : toCancels) {
536546
toCancel.cancel();
537547
}
538548
}
@@ -549,8 +559,8 @@ private synchronized List<TopDocs> consumeTopDocs() {
549559
return Collections.emptyList();
550560
}
551561
List<TopDocs> topDocsList = new ArrayList<>();
552-
if (mergeResult != null) {
553-
topDocsList.add(mergeResult.reducedTopDocs);
562+
if (reduceResult != null) {
563+
topDocsList.add(reduceResult.reducedTopDocs);
554564
}
555565
for (QuerySearchResult result : buffer) {
556566
TopDocsAndMaxScore topDocs = result.consumeTopDocs();
@@ -565,8 +575,8 @@ private synchronized List<InternalAggregations> consumeAggs() {
565575
return Collections.emptyList();
566576
}
567577
List<InternalAggregations> aggsList = new ArrayList<>();
568-
if (mergeResult != null) {
569-
aggsList.add(mergeResult.reducedAggs);
578+
if (reduceResult != null) {
579+
aggsList.add(reduceResult.reducedAggs);
570580
}
571581
for (QuerySearchResult result : buffer) {
572582
aggsList.add(result.consumeAggs().expand());
@@ -576,26 +586,26 @@ private synchronized List<InternalAggregations> consumeAggs() {
576586
}
577587

578588
/**
579-
* A single merge result
589+
* Immutable container holding the outcome of a partial reduce operation
580590
*
581591
* @opensearch.internal
582592
*/
583-
private record MergeResult(List<SearchShard> processedShards, TopDocs reducedTopDocs, InternalAggregations reducedAggs,
593+
private record ReduceResult(List<SearchShard> processedShards, TopDocs reducedTopDocs, InternalAggregations reducedAggs,
584594
long estimatedSize) {
585595
}
586596

587597
/**
588-
* A single merge task
598+
* ReduceTask is created to reduce buffered query results when buffer size hits threshold
589599
*
590600
* @opensearch.internal
591601
*/
592-
private static class MergeTask {
602+
private static class ReduceTask {
593603
private final List<SearchShard> emptyResults;
594604
private QuerySearchResult[] buffer;
595605
private final long aggsBufferSize;
596606
private Runnable next;
597607

598-
private MergeTask(QuerySearchResult[] buffer, long aggsBufferSize, List<SearchShard> emptyResults, Runnable next) {
608+
private ReduceTask(QuerySearchResult[] buffer, long aggsBufferSize, List<SearchShard> emptyResults, Runnable next) {
599609
this.buffer = buffer;
600610
this.aggsBufferSize = aggsBufferSize;
601611
this.emptyResults = emptyResults;

server/src/main/java/org/opensearch/action/search/StreamQueryPhaseResultConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,6 @@ void consumeStreamResult(SearchPhaseResult result, Runnable next) {
5959
// For streaming, we skip the ArraySearchPhaseResults.consumeResult() call
6060
// since it doesn't support multiple results from the same shard.
6161
QuerySearchResult querySearchResult = result.queryResult();
62-
pendingMerges.consume(querySearchResult, next);
62+
pendingReduces.consume(querySearchResult, next);
6363
}
6464
}

0 commit comments

Comments (0)