Skip to content

Commit 6c58e5f

Browse files
committed
Fixed issues in reduce
Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
1 parent c563b43 commit 6c58e5f

File tree

1 file changed

+25
-23
lines changed

1 file changed

+25
-23
lines changed

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -459,9 +459,9 @@ For backward compatibility, we disable the merge sort and use ({@link InternalTe
459459
final B[] list;
460460
if (reduceContext.isFinalReduce() || reduceContext.isSliceLevel()) {
461461
final int size = Math.min(localBucketCountThresholds.getRequiredSize(), reducedBuckets.size());
462-
463-
Comparator<MultiBucketsAggregation.Bucket> cmp = order.comparator();
464-
int countOfBucketsAboveMinCount = 0;
462+
final Comparator<MultiBucketsAggregation.Bucket> cmp = order.comparator();
463+
int validBucketCount = 0;
464+
// Process buckets and update doc count errors and count valid buckets
465465
for (B bucket : reducedBuckets) {
466466
if (sumDocCountError == -1) {
467467
bucket.setDocCountError(-1);
@@ -473,36 +473,38 @@ For backward compatibility, we disable the merge sort and use ({@link InternalTe
473473
if (bucket.getDocCount() < localBucketCountThresholds.getMinDocCount()) {
474474
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket));
475475
} else {
476-
countOfBucketsAboveMinCount++;
476+
validBucketCount++;
477477
}
478478
}
479-
480-
B[] reducedBucketsFinal = createBucketsArray(countOfBucketsAboveMinCount);
481-
int reducedBucketsFinalCount = 0;
479+
// Create array and populate with valid buckets
480+
B[] validBuckets = createBucketsArray(validBucketCount);
481+
int arrayIndex = 0;
482482
for (B bucket : reducedBuckets) {
483483
if (bucket.getDocCount() >= localBucketCountThresholds.getMinDocCount()) {
484-
reducedBucketsFinal[reducedBucketsFinalCount++] = bucket;
484+
validBuckets[arrayIndex++] = bucket;
485485
}
486486
}
487-
488-
if (size < reducedBucketsFinalCount) {
489-
ArrayUtil.select(reducedBucketsFinal, 0, reducedBucketsFinalCount, size, cmp);
487+
// Select top buckets if needed
488+
if (size < validBuckets.length) {
489+
ArrayUtil.select(validBuckets, 0, validBuckets.length, size, cmp);
490490
}
491-
492-
int selectSize = 0;
493-
for (B bucket : reducedBucketsFinal) {
494-
B removed = ((selectSize == size) ? bucket : null);
495-
if (removed != null) {
496-
otherDocCount += removed.getDocCount();
497-
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
498-
} else {
499-
selectSize++;
491+
// Process selected buckets and calculate otherDocCount
492+
int finalSize = Math.min(size, validBuckets.length);
493+
for (int i = 0; i < validBuckets.length; i++) {
494+
B bucket = validBuckets[i];
495+
if (i < finalSize) {
500496
reduceContext.consumeBucketsAndMaybeBreak(1);
497+
} else {
498+
otherDocCount += bucket.getDocCount();
499+
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket));
501500
}
502501
}
503-
504-
list = createBucketsArray(selectSize);
505-
if (selectSize >= 0) System.arraycopy(reducedBucketsFinal, 0, list, 0, selectSize);
502+
if (finalSize != validBuckets.length) {
503+
list = createBucketsArray(finalSize);
504+
System.arraycopy(validBuckets, 0, list, 0, finalSize);
505+
} else {
506+
list = validBuckets;
507+
}
506508
Arrays.sort(list, cmp);
507509
} else {
508510
// we can prune the list on partial reduce if the aggregation is ordered by key

0 commit comments

Comments
 (0)