Skip to content

Commit 7edf16f

Browse files
committed
Updated the final reduce part to select topN buckets
Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
1 parent f0da440 commit 7edf16f

File tree

1 file changed

+43
-31
lines changed

1 file changed

+43
-31
lines changed

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java

Lines changed: 43 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -459,68 +459,80 @@ For backward compatibility, we disable the merge sort and use ({@link InternalTe
459459
final B[] list;
460460
if (reduceContext.isFinalReduce() || reduceContext.isSliceLevel()) {
461461
final int size = Math.min(localBucketCountThresholds.getRequiredSize(), reducedBuckets.size());
462-
if (size < reducedBuckets.size()) {
463-
Comparator<MultiBucketsAggregation.Bucket> cmp = order.comparator();
464-
B[] reducedBucketsArr = createBucketsArray(reducedBuckets.size());;
465-
for (int i = 0; i < reducedBuckets.size(); i++) {
466-
reducedBucketsArr[i] = reducedBuckets.get(i);
467-
}
468-
ArrayUtil.select(
469-
reducedBucketsArr,
470-
0,
471-
reducedBuckets.size(),
472-
size,
473-
cmp
474-
);
475-
int sz = 0;
476-
for (B bucket : reducedBucketsArr) {
462+
463+
if (size <= 1000) {
464+
final BucketPriorityQueue<B> ordered = new BucketPriorityQueue<>(size, order.comparator());
465+
for (B bucket : reducedBuckets) {
477466
if (sumDocCountError == -1) {
478467
bucket.setDocCountError(-1);
479468
} else {
480469
final long finalSumDocCountError = sumDocCountError;
481470
bucket.setDocCountError(docCountError -> docCountError + finalSumDocCountError);
482471
}
483472
if (bucket.getDocCount() >= localBucketCountThresholds.getMinDocCount()) {
484-
B removed = ((sz == size) ? bucket : null);
473+
B removed = ordered.insertWithOverflow(bucket);
485474
if (removed != null) {
486475
otherDocCount += removed.getDocCount();
487476
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
488477
} else {
489-
sz++;
490478
reduceContext.consumeBucketsAndMaybeBreak(1);
491479
}
492480
} else {
493481
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket));
494482
}
495483
}
496-
list = createBucketsArray(sz);
497-
if (sz >= 0) System.arraycopy(reducedBucketsArr, 0, list, 0, sz);
498-
Arrays.sort(list, cmp);
484+
list = createBucketsArray(ordered.size());
485+
for (int i = ordered.size() - 1; i >= 0; i--) {
486+
list[i] = ordered.pop();
487+
}
499488
} else {
500-
final BucketPriorityQueue<B> ordered = new BucketPriorityQueue<>(size, order.comparator());
489+
final Comparator<MultiBucketsAggregation.Bucket> cmp = order.comparator();
490+
int validBucketCount = 0;
491+
// Process buckets and update doc count errors and count valid buckets
501492
for (B bucket : reducedBuckets) {
502493
if (sumDocCountError == -1) {
503494
bucket.setDocCountError(-1);
504495
} else {
505496
final long finalSumDocCountError = sumDocCountError;
506497
bucket.setDocCountError(docCountError -> docCountError + finalSumDocCountError);
507498
}
499+
500+
if (bucket.getDocCount() < localBucketCountThresholds.getMinDocCount()) {
501+
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket));
502+
} else {
503+
validBucketCount++;
504+
}
505+
}
506+
// Create array and populate with valid buckets
507+
B[] validBuckets = createBucketsArray(validBucketCount);
508+
int arrayIndex = 0;
509+
for (B bucket : reducedBuckets) {
508510
if (bucket.getDocCount() >= localBucketCountThresholds.getMinDocCount()) {
509-
B removed = ordered.insertWithOverflow(bucket);
510-
if (removed != null) {
511-
otherDocCount += removed.getDocCount();
512-
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
513-
} else {
514-
reduceContext.consumeBucketsAndMaybeBreak(1);
515-
}
511+
validBuckets[arrayIndex++] = bucket;
512+
}
513+
}
514+
// Select top buckets if needed
515+
if (size < validBuckets.length) {
516+
ArrayUtil.select(validBuckets, 0, validBuckets.length, size, cmp);
517+
}
518+
// Process selected buckets and calculate otherDocCount
519+
int finalSize = Math.min(size, validBuckets.length);
520+
for (int i = 0; i < validBuckets.length; i++) {
521+
B bucket = validBuckets[i];
522+
if (i < finalSize) {
523+
reduceContext.consumeBucketsAndMaybeBreak(1);
516524
} else {
525+
otherDocCount += bucket.getDocCount();
517526
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket));
518527
}
519528
}
520-
list = createBucketsArray(ordered.size());
521-
for (int i = ordered.size() - 1; i >= 0; i--) {
522-
list[i] = ordered.pop();
529+
if (finalSize != validBuckets.length) {
530+
list = createBucketsArray(finalSize);
531+
System.arraycopy(validBuckets, 0, list, 0, finalSize);
532+
} else {
533+
list = validBuckets;
523534
}
535+
Arrays.sort(list, cmp);
524536
}
525537
} else {
526538
// we can prune the list on partial reduce if the aggregation is ordered by key

0 commit comments

Comments
 (0)