
Commit 17b762a

Add sub aggregation support for histogram aggregation using skiplist (#19438)
Signed-off-by: Asim Mahmood <asim.seng@gmail.com>
Signed-off-by: Ankit Jain <jainankitk@apache.org>
Co-authored-by: Ankit Jain <jainankitk@apache.org>
1 parent 08ed9ee commit 17b762a
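
This change lets the skip-list (DocValuesSkipper) based collector in DateHistogramAggregator serve requests that carry sub-aggregations; previously that collector was only chosen when there was no sub-aggregation. Below is a minimal sketch of the kind of request that now qualifies, built with the standard OpenSearch Java aggregation builders. The index, the "@timestamp" date field, and the "price" field are hypothetical; the segment still needs a doc-values skipper on the date field, hard bounds must be unset, and the histogram must be a top-level aggregation.

import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.opensearch.search.builder.SearchSourceBuilder;

public class HistogramWithSubAggExample {

    // Builds a size-0 search whose date_histogram carries an avg sub-aggregation.
    // With this commit, such a request can still be served by the skip-list
    // collector when the segment exposes a DocValuesSkipper for "@timestamp",
    // hardBounds is unset and the histogram has no parent aggregation.
    public static SearchSourceBuilder dailyHistogramWithAvg() {
        DateHistogramAggregationBuilder histogram = AggregationBuilders.dateHistogram("per_day")
            .field("@timestamp")                                                   // hypothetical date field
            .calendarInterval(DateHistogramInterval.DAY)
            .subAggregation(AggregationBuilders.avg("avg_price").field("price")); // hypothetical sub-aggregation

        return new SearchSourceBuilder().size(0).aggregation(histogram);
    }
}

The counters added to collectDebugInfo below (no_op_collectors_used, single_valued_collectors_used, multi_valued_collectors_used, skip_list_collectors_used) should make it possible to confirm from profiling debug output which leaf collector actually handled each segment.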

File tree

3 files changed: +208 -68 lines changed


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -43,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Implement GRPC GeoBoundingBox, GeoDistance queries ([#19451](https://github.com/opensearch-project/OpenSearch/pull/19451))
 - Implement GRPC Ids, Range, and Terms Set queries ([#19448](https://github.com/opensearch-project/OpenSearch/pull/19448))
 - Implement GRPC Nested query ([#19453](https://github.com/opensearch-project/OpenSearch/pull/19453))
+- Add sub aggregation support for histogram aggregation using skiplist ([#19438](https://github.com/opensearch-project/OpenSearch/pull/19438))
 - Optimization in String Terms Aggregation query for Large Bucket Counts([#18732](https://github.com/opensearch-project/OpenSearch/pull/18732))
 
 ### Changed

server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java

Lines changed: 59 additions & 23 deletions
@@ -115,6 +115,12 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
     private final String fieldName;
     private final boolean fieldIndexSort;
 
+    // Collector usage tracking fields
+    private int noOpCollectorsUsed;
+    private int singleValuedCollectorsUsed;
+    private int multiValuedCollectorsUsed;
+    private int skipListCollectorsUsed;
+
     DateHistogramAggregator(
         String name,
         AggregatorFactories factories,
@@ -215,6 +221,7 @@ protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws
     @Override
     public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
         if (valuesSource == null) {
+            noOpCollectorsUsed++;
             return LeafBucketCollector.NO_OP_COLLECTOR;
         }
 
@@ -225,17 +232,17 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
         final SortedNumericDocValues values = valuesSource.longValues(ctx);
         final NumericDocValues singleton = DocValues.unwrapSingleton(values);
 
-        // If no subaggregations and index sorted on given field, we can use skip list based collector
-        logger.trace("Index sort field found: {}, skipper: {}", fieldIndexSort, skipper);
-        if (fieldIndexSort && skipper != null && singleton != null) {
+        if (skipper != null && singleton != null) {
             // TODO: add hard bounds support
-            if (hardBounds != null || sub == null || sub == LeafBucketCollector.NO_OP_COLLECTOR) {
-                return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, this::incrementBucketDocCount);
+            if (hardBounds == null && parent == null) {
+                skipListCollectorsUsed++;
+                return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, sub, this);
             }
         }
 
         if (singleton != null) {
             // Optimized path for single-valued fields
+            singleValuedCollectorsUsed++;
             return new LeafBucketCollectorBase(sub, values) {
                 @Override
                 public void collect(int doc, long owningBucketOrd) throws IOException {
@@ -248,6 +255,7 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
         }
 
         // Original path for multi-valued fields
+        multiValuedCollectorsUsed++;
         return new LeafBucketCollectorBase(sub, values) {
             @Override
             public void collect(int doc, long owningBucketOrd) throws IOException {
@@ -404,8 +412,13 @@ public void doClose() {
 
     @Override
     public void collectDebugInfo(BiConsumer<String, Object> add) {
+        super.collectDebugInfo(add);
         add.accept("total_buckets", bucketOrds.size());
         filterRewriteOptimizationContext.populateDebugInfo(add);
+        add.accept("no_op_collectors_used", noOpCollectorsUsed);
+        add.accept("single_valued_collectors_used", singleValuedCollectorsUsed);
+        add.accept("multi_valued_collectors_used", multiValuedCollectorsUsed);
+        add.accept("skip_list_collectors_used", skipListCollectorsUsed);
     }
 
     /**
@@ -426,7 +439,8 @@ private static class HistogramSkiplistLeafCollector extends LeafBucketCollector
         private final DocValuesSkipper skipper;
         private final Rounding.Prepared preparedRounding;
         private final LongKeyedBucketOrds bucketOrds;
-        private final BiConsumer<Long, Long> incrementDocCount;
+        private final LeafBucketCollector sub;
+        private final BucketsAggregator aggregator;
 
         /**
          * Max doc ID (inclusive) up to which all docs values may map to the same bucket.
@@ -448,19 +462,25 @@ private static class HistogramSkiplistLeafCollector extends LeafBucketCollector
             DocValuesSkipper skipper,
             Rounding.Prepared preparedRounding,
             LongKeyedBucketOrds bucketOrds,
-            BiConsumer<Long, Long> incrementDocCount
+            LeafBucketCollector sub,
+            BucketsAggregator aggregator
         ) {
             this.values = values;
             this.skipper = skipper;
             this.preparedRounding = preparedRounding;
             this.bucketOrds = bucketOrds;
-            this.incrementDocCount = incrementDocCount;
+            this.sub = sub;
+            this.aggregator = aggregator;
         }
 
         @Override
-        public void setScorer(Scorable scorer) throws IOException {}
+        public void setScorer(Scorable scorer) throws IOException {
+            if (sub != null) {
+                sub.setScorer(scorer);
+            }
+        }
 
-        private void advanceSkipper(int doc) throws IOException {
+        private void advanceSkipper(int doc, long owningBucketOrd) throws IOException {
             if (doc > skipper.maxDocID(0)) {
                 skipper.advance(doc);
             }
@@ -485,7 +505,7 @@ private void advanceSkipper(int doc) throws IOException {
                     // All docs at this level have a value, and all values map to the same bucket.
                     upToInclusive = skipper.maxDocID(level);
                     upToSameBucket = true;
-                    upToBucketIndex = bucketOrds.add(0, maxBucket);
+                    upToBucketIndex = bucketOrds.add(owningBucketOrd, maxBucket);
                     if (upToBucketIndex < 0) {
                         upToBucketIndex = -1 - upToBucketIndex;
                     }
@@ -497,48 +517,64 @@ private void advanceSkipper(int doc) throws IOException {
 
         @Override
         public void collect(int doc, long owningBucketOrd) throws IOException {
-            collect(doc);
-        }
-
-        @Override
-        public void collect(int doc) throws IOException {
             if (doc > upToInclusive) {
-                advanceSkipper(doc);
+                advanceSkipper(doc, owningBucketOrd);
             }
 
             if (upToSameBucket) {
-                incrementDocCount.accept(upToBucketIndex, 1L);
+                aggregator.incrementBucketDocCount(upToBucketIndex, 1L);
+                sub.collect(doc, upToBucketIndex);
             } else if (values.advanceExact(doc)) {
                 final long value = values.longValue();
-                long bucketIndex = bucketOrds.add(0, preparedRounding.round(value));
+                long bucketIndex = bucketOrds.add(owningBucketOrd, preparedRounding.round(value));
                 if (bucketIndex < 0) {
                     bucketIndex = -1 - bucketIndex;
+                    aggregator.collectExistingBucket(sub, doc, bucketIndex);
+                } else {
+                    aggregator.collectBucket(sub, doc, bucketIndex);
                 }
-                incrementDocCount.accept(bucketIndex, 1L);
             }
         }
 
+        @Override
+        public void collect(int doc) throws IOException {
+            collect(doc, 0);
+        }
+
         @Override
         public void collect(DocIdStream stream) throws IOException {
+            // This will only be called if its the top agg
             for (;;) {
                 int upToExclusive = upToInclusive + 1;
                 if (upToExclusive < 0) { // overflow
                     upToExclusive = Integer.MAX_VALUE;
                 }
 
                 if (upToSameBucket) {
-                    long count = stream.count(upToExclusive);
-                    incrementDocCount.accept(upToBucketIndex, count);
+                    if (sub == NO_OP_COLLECTOR) {
+                        // stream.count maybe faster when we don't need to handle sub-aggs
+                        long count = stream.count(upToExclusive);
+                        aggregator.incrementBucketDocCount(upToBucketIndex, count);
+                    } else {
+                        final int[] count = { 0 };
+                        stream.forEach(upToExclusive, doc -> {
+                            sub.collect(doc, upToBucketIndex);
+                            count[0]++;
+                        });
+                        aggregator.incrementBucketDocCount(upToBucketIndex, count[0]);
+                    }
+
                 } else {
                     stream.forEach(upToExclusive, this::collect);
                 }
 
                 if (stream.mayHaveRemaining()) {
-                    advanceSkipper(upToExclusive);
+                    advanceSkipper(upToExclusive, 0);
                 } else {
                     break;
                 }
             }
         }
+
     }
 }
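
For readers who want the intuition behind the collector without the Lucene plumbing, here is a simplified, self-contained sketch. It is plain Java: the Block record, the rounding operator, and the hasSubAgg flag are stand-ins for DocValuesSkipper metadata, Rounding.Prepared, and the sub == NO_OP_COLLECTOR check above, not the actual implementation. It illustrates the two behaviours the DocIdStream branch distinguishes: when every document in a block rounds to the same bucket and no sub-aggregation is attached, the bucket count can be bumped by the block size in one step; when a sub-aggregation is present, each document still has to be visited so it can be forwarded to the sub collector, but per-document rounding can be skipped.

import java.util.HashMap;
import java.util.Map;
import java.util.function.LongUnaryOperator;

public class SkipListHistogramSketch {

    /** A contiguous block of docs with precomputed min/max values, mimicking skipper metadata. */
    record Block(int firstDoc, int lastDoc, long minValue, long maxValue) {}

    /** Per-bucket doc counts; this is an analogue of the collector, not the OpenSearch code. */
    static Map<Long, Long> histogram(long[] values, Block[] blocks, LongUnaryOperator round, boolean hasSubAgg) {
        Map<Long, Long> counts = new HashMap<>();
        for (Block block : blocks) {
            long minBucket = round.applyAsLong(block.minValue());
            long maxBucket = round.applyAsLong(block.maxValue());
            int blockSize = block.lastDoc() - block.firstDoc() + 1;
            if (minBucket == maxBucket && !hasSubAgg) {
                // Fast path (analogue of stream.count): the whole block lands in one bucket and
                // nobody needs the individual docs, so count it in a single step.
                counts.merge(minBucket, (long) blockSize, Long::sum);
            } else if (minBucket == maxBucket) {
                // One bucket, but a sub-aggregation needs every doc (analogue of stream.forEach
                // with sub.collect): visit each doc, though re-rounding each value is unnecessary.
                for (int doc = block.firstDoc(); doc <= block.lastDoc(); doc++) {
                    counts.merge(minBucket, 1L, Long::sum);
                    // a call like sub.collect(doc, bucket) would go here in the real collector
                }
            } else {
                // Mixed block: round each value to find its bucket.
                for (int doc = block.firstDoc(); doc <= block.lastDoc(); doc++) {
                    counts.merge(round.applyAsLong(values[doc]), 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] values = { 3, 7, 9, 12, 21, 23, 25, 29 };      // sorted, as with an index sort on the field
        Block[] blocks = { new Block(0, 3, 3, 12), new Block(4, 7, 21, 29) };
        LongUnaryOperator roundToTens = v -> (v / 10) * 10;   // stand-in for Rounding.Prepared#round
        System.out.println(histogram(values, blocks, roundToTens, false));
        // Buckets: 0 -> 3 docs, 10 -> 1 doc, 20 -> 4 docs; the second block took the fast path.
    }
}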
