Skip to content

Commit 1d2a809

Browse files
Add skiplist optimization to auto_date_histogram aggregation (#20057)
Signed-off-by: Asim Mahmood <asim.seng@gmail.com> (cherry picked from commit dfef2c1) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent f791594 commit 1d2a809

File tree

6 files changed

+541
-161
lines changed

6 files changed

+541
-161
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
7171
- Refactor the Cache.CacheStats class to use the Builder pattern instead of constructors ([#20015](https://github.com/opensearch-project/OpenSearch/pull/20015))
7272
- Refactor the HttpStats, ScriptStats, AdaptiveSelectionStats and OsStats class to use the Builder pattern instead of constructors ([#20014](https://github.com/opensearch-project/OpenSearch/pull/20014))
7373
- Bump opensearch-protobufs dependency to 0.24.0 and update transport-grpc module compatibility ([#20059](https://github.com/opensearch-project/OpenSearch/pull/20059))
74-
7574
- Refactor the ShardStats, WarmerStats and IndexingPressureStats class to use the Builder pattern instead of constructors ([#19966](https://github.com/opensearch-project/OpenSearch/pull/19966))
75+
- Add skiplist optimization to auto_date_histogram aggregation ([#20057](https://github.com/opensearch-project/OpenSearch/pull/20057))
76+
- Throw exceptions for currently unsupported GRPC request-side fields ([#20162](https://github.com/opensearch-project/OpenSearch/pull/20162))
7677

7778
### Fixed
7879
- Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012))

server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java

Lines changed: 87 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,25 +13,39 @@
1313
import org.apache.lucene.search.DocIdStream;
1414
import org.apache.lucene.search.Scorable;
1515
import org.opensearch.common.Rounding;
16+
import org.opensearch.search.aggregations.Aggregator;
17+
import org.opensearch.search.aggregations.AggregatorBase;
1618
import org.opensearch.search.aggregations.LeafBucketCollector;
19+
import org.opensearch.search.aggregations.bucket.histogram.LongBounds;
1720
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
1821

1922
import java.io.IOException;
23+
import java.util.function.LongFunction;
24+
import java.util.function.Supplier;
2025

2126
/**
2227
* Histogram collection logic using skip list.
2328
*
29+
* Currently, it can only handle one owningBucketOrd at a time.
30+
*
2431
* @opensearch.internal
2532
*/
2633
public class HistogramSkiplistLeafCollector extends LeafBucketCollector {
2734

2835
private final NumericDocValues values;
2936
private final DocValuesSkipper skipper;
30-
private final Rounding.Prepared preparedRounding;
31-
private final LongKeyedBucketOrds bucketOrds;
3237
private final LeafBucketCollector sub;
38+
private final boolean isSubNoOp;
3339
private final BucketsAggregator aggregator;
3440

41+
/**
42+
* Supplier function to get the current preparedRounding from the parent aggregator.
43+
* This allows detection of rounding changes in AutoDateHistogramAggregator.
44+
*/
45+
private final LongFunction<Rounding.Prepared> preparedRoundingSupplier;
46+
private final Supplier<LongKeyedBucketOrds> bucketOrdsSupplier;
47+
private final IncreaseRoundingIfNeeded increaseRoundingIfNeeded;
48+
3549
/**
3650
* Max doc ID (inclusive) up to which all docs values may map to the same
3751
* bucket.
@@ -48,20 +62,43 @@ public class HistogramSkiplistLeafCollector extends LeafBucketCollector {
4862
*/
4963
private long upToBucketIndex;
5064

65+
/**
66+
* Tracks the last preparedRounding reference to detect rounding changes.
67+
* Used for cache invalidation when AutoDateHistogramAggregator changes rounding.
68+
*/
69+
private Rounding.Prepared lastPreparedRounding;
70+
5171
public HistogramSkiplistLeafCollector(
5272
NumericDocValues values,
5373
DocValuesSkipper skipper,
5474
Rounding.Prepared preparedRounding,
5575
LongKeyedBucketOrds bucketOrds,
5676
LeafBucketCollector sub,
5777
BucketsAggregator aggregator
78+
) {
79+
this(values, skipper, (owningBucketOrd) -> preparedRounding, () -> bucketOrds, sub, aggregator, (owningBucketOrd, rounded) -> {});
80+
}
81+
82+
/**
83+
* Constructor that accepts a supplier for dynamic rounding (used by AutoDateHistogramAggregator).
84+
*/
85+
public HistogramSkiplistLeafCollector(
86+
NumericDocValues values,
87+
DocValuesSkipper skipper,
88+
LongFunction<Rounding.Prepared> preparedRoundingSupplier,
89+
Supplier<LongKeyedBucketOrds> bucketOrdsSupplier,
90+
LeafBucketCollector sub,
91+
BucketsAggregator aggregator,
92+
IncreaseRoundingIfNeeded increaseRoundingIfNeeded
5893
) {
5994
this.values = values;
6095
this.skipper = skipper;
61-
this.preparedRounding = preparedRounding;
62-
this.bucketOrds = bucketOrds;
96+
this.preparedRoundingSupplier = preparedRoundingSupplier;
97+
this.bucketOrdsSupplier = bucketOrdsSupplier;
6398
this.sub = sub;
99+
this.isSubNoOp = (sub == NO_OP_COLLECTOR);
64100
this.aggregator = aggregator;
101+
this.increaseRoundingIfNeeded = increaseRoundingIfNeeded;
65102
}
66103

67104
@Override
@@ -87,17 +124,20 @@ private void advanceSkipper(int doc, long owningBucketOrd) throws IOException {
87124

88125
upToInclusive = skipper.maxDocID(0);
89126

127+
// Get current rounding from supplier
128+
Rounding.Prepared currentRounding = preparedRoundingSupplier.apply(owningBucketOrd);
129+
90130
// Now find the highest level where all docs map to the same bucket.
91131
for (int level = 0; level < skipper.numLevels(); ++level) {
92132
int totalDocsAtLevel = skipper.maxDocID(level) - skipper.minDocID(level) + 1;
93-
long minBucket = preparedRounding.round(skipper.minValue(level));
94-
long maxBucket = preparedRounding.round(skipper.maxValue(level));
133+
long minBucket = currentRounding.round(skipper.minValue(level));
134+
long maxBucket = currentRounding.round(skipper.maxValue(level));
95135

96136
if (skipper.docCount(level) == totalDocsAtLevel && minBucket == maxBucket) {
97137
// All docs at this level have a value, and all values map to the same bucket.
98138
upToInclusive = skipper.maxDocID(level);
99139
upToSameBucket = true;
100-
upToBucketIndex = bucketOrds.add(owningBucketOrd, maxBucket);
140+
upToBucketIndex = bucketOrdsSupplier.get().add(owningBucketOrd, maxBucket);
101141
if (upToBucketIndex < 0) {
102142
upToBucketIndex = -1 - upToBucketIndex;
103143
}
@@ -109,6 +149,16 @@ private void advanceSkipper(int doc, long owningBucketOrd) throws IOException {
109149

110150
@Override
111151
public void collect(int doc, long owningBucketOrd) throws IOException {
152+
Rounding.Prepared currentRounding = preparedRoundingSupplier.apply(owningBucketOrd);
153+
154+
// Check if rounding changed (using reference equality)
155+
// AutoDateHistogramAggregator creates a new Rounding.Prepared instance when rounding changes
156+
if (currentRounding != lastPreparedRounding) {
157+
upToInclusive = -1; // Invalidate
158+
upToSameBucket = false;
159+
lastPreparedRounding = currentRounding;
160+
}
161+
112162
if (doc > upToInclusive) {
113163
advanceSkipper(doc, owningBucketOrd);
114164
}
@@ -118,12 +168,14 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
118168
sub.collect(doc, upToBucketIndex);
119169
} else if (values.advanceExact(doc)) {
120170
final long value = values.longValue();
121-
long bucketIndex = bucketOrds.add(owningBucketOrd, preparedRounding.round(value));
171+
long rounded = currentRounding.round(value);
172+
long bucketIndex = bucketOrdsSupplier.get().add(owningBucketOrd, rounded);
122173
if (bucketIndex < 0) {
123174
bucketIndex = -1 - bucketIndex;
124175
aggregator.collectExistingBucket(sub, doc, bucketIndex);
125176
} else {
126177
aggregator.collectBucket(sub, doc, bucketIndex);
178+
increaseRoundingIfNeeded.accept(owningBucketOrd, rounded);
127179
}
128180
}
129181
}
@@ -136,15 +188,14 @@ public void collect(DocIdStream stream) throws IOException {
136188

137189
@Override
138190
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
139-
// This will only be called if its the sub aggregation
140191
for (;;) {
141192
int upToExclusive = upToInclusive + 1;
142193
if (upToExclusive < 0) { // overflow
143194
upToExclusive = Integer.MAX_VALUE;
144195
}
145196

146197
if (upToSameBucket) {
147-
if (sub == NO_OP_COLLECTOR) {
198+
if (isSubNoOp) {
148199
// stream.count may be faster when we don't need to handle sub-aggs
149200
long count = stream.count(upToExclusive);
150201
aggregator.incrementBucketDocCount(upToBucketIndex, count);
@@ -167,4 +218,30 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException
167218
}
168219
}
169220
}
221+
222+
/**
223+
* Callback for auto date histogram, invoked after a document is collected into a newly created bucket
224+
*
225+
* @opensearch.internal
226+
*/
227+
public interface IncreaseRoundingIfNeeded {
228+
void accept(long owningBucket, long rounded);
229+
}
230+
231+
/**
232+
* Skiplist can be used for a top-level agg (null parent) or under a parent that will execute in sorted order
233+
*
234+
*/
235+
public static boolean canUseSkiplist(LongBounds hardBounds, Aggregator parent, DocValuesSkipper skipper, NumericDocValues singleton) {
236+
if (skipper == null || singleton == null) return false;
237+
// TODO: add hard bounds support
238+
if (hardBounds != null) return false;
239+
240+
if (parent == null) return true;
241+
242+
if (parent instanceof AggregatorBase base) {
243+
return base.getLeafCollectorMode() == AggregatorBase.LeafCollectionMode.FILTER_REWRITE;
244+
}
245+
return false;
246+
}
170247
}

0 commit comments

Comments
 (0)