CHANGELOG.md (2 changes: 1 addition & 1 deletion)
@@ -71,8 +71,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Refactor the Cache.CacheStats class to use the Builder pattern instead of constructors ([#20015](https://github.com/opensearch-project/OpenSearch/pull/20015))
- Refactor the HttpStats, ScriptStats, AdaptiveSelectionStats and OsStats classes to use the Builder pattern instead of constructors ([#20014](https://github.com/opensearch-project/OpenSearch/pull/20014))
- Bump opensearch-protobufs dependency to 0.24.0 and update transport-grpc module compatibility ([#20059](https://github.com/opensearch-project/OpenSearch/pull/20059))
- Refactor the ShardStats, WarmerStats and IndexingPressureStats classes to use the Builder pattern instead of constructors ([#19966](https://github.com/opensearch-project/OpenSearch/pull/19966))
- Add skiplist optimization to auto_date_histogram aggregation ([#20057](https://github.com/opensearch-project/OpenSearch/pull/20057))
- Throw exceptions for currently unsupported GRPC request-side fields ([#20162](https://github.com/opensearch-project/OpenSearch/pull/20162))

### Fixed
HistogramSkiplistLeafCollector.java
@@ -13,25 +13,39 @@
import org.apache.lucene.search.DocIdStream;
import org.apache.lucene.search.Scorable;
import org.opensearch.common.Rounding;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorBase;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.bucket.histogram.LongBounds;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;

import java.io.IOException;
import java.util.function.LongFunction;
import java.util.function.Supplier;

/**
* Histogram collection logic using skip list.
*
* Currently, it can only handle one owningBucketOrd at a time.
**Member:** Could be better to save the owningBucketOrd into a SetOnce reference.

**Contributor Author:** I plan to extend this to use multiple buckets, like in #20175.
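For reference, a minimal sketch of the SetOnce idea, assuming a hypothetical guard class (only `org.apache.lucene.util.SetOnce` is a real API here): pin the first owningBucketOrd seen and fail fast if a different one arrives.

```java
import org.apache.lucene.util.SetOnce;

// Hypothetical sketch of the suggestion above, not the PR's implementation.
final class OwningBucketOrdGuard {
    private final SetOnce<Long> owningBucketOrd = new SetOnce<>();

    /** Pins the first ord seen; rejects any later, different ord. */
    long pin(long candidate) {
        Long current = owningBucketOrd.get();
        if (current == null) {
            owningBucketOrd.set(candidate); // throws AlreadySetException on a second set
            return candidate;
        }
        if (current != candidate) {
            throw new IllegalStateException("collector handles one owningBucketOrd, got " + candidate);
        }
        return current;
    }
}
```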

*
* @opensearch.internal
*/
public class HistogramSkiplistLeafCollector extends LeafBucketCollector {

private final NumericDocValues values;
private final DocValuesSkipper skipper;
private final Rounding.Prepared preparedRounding;
private final LongKeyedBucketOrds bucketOrds;
private final LeafBucketCollector sub;
private final boolean isSubNoOp;
private final BucketsAggregator aggregator;

/**
* Supplier function to get the current preparedRounding from the parent aggregator.
* This allows detection of rounding changes in AutoDateHistogramAggregator.
*/
private final LongFunction<Rounding.Prepared> preparedRoundingSupplier;
private final Supplier<LongKeyedBucketOrds> bucketOrdsSupplier;
private final IncreaseRoundingIfNeeded increaseRoundingIfNeeded;

/**
* Max doc ID (inclusive) up to which all doc values may map to the same
* bucket.
@@ -48,20 +62,43 @@ public class HistogramSkiplistLeafCollector extends LeafBucketCollector {
*/
private long upToBucketIndex;

/**
* Tracks the last preparedRounding reference to detect rounding changes.
* Used for cache invalidation when AutoDateHistogramAggregator changes rounding.
*/
private Rounding.Prepared lastPreparedRounding;

public HistogramSkiplistLeafCollector(
NumericDocValues values,
DocValuesSkipper skipper,
Rounding.Prepared preparedRounding,
LongKeyedBucketOrds bucketOrds,
LeafBucketCollector sub,
BucketsAggregator aggregator
) {
this(values, skipper, (owningBucketOrd) -> preparedRounding, () -> bucketOrds, sub, aggregator, (owningBucketOrd, rounded) -> {});
}

/**
* Constructor that accepts a supplier for dynamic rounding (used by AutoDateHistogramAggregator).
*/
public HistogramSkiplistLeafCollector(
NumericDocValues values,
DocValuesSkipper skipper,
LongFunction<Rounding.Prepared> preparedRoundingSupplier,
Supplier<LongKeyedBucketOrds> bucketOrdsSupplier,
LeafBucketCollector sub,
BucketsAggregator aggregator,
IncreaseRoundingIfNeeded increaseRoundingIfNeeded
) {
this.values = values;
this.skipper = skipper;
this.preparedRounding = preparedRounding;
this.bucketOrds = bucketOrds;
this.preparedRoundingSupplier = preparedRoundingSupplier;
this.bucketOrdsSupplier = bucketOrdsSupplier;
this.sub = sub;
this.isSubNoOp = (sub == NO_OP_COLLECTOR);
this.aggregator = aggregator;
this.increaseRoundingIfNeeded = increaseRoundingIfNeeded;
}

@Override
@@ -87,17 +124,20 @@ private void advanceSkipper(int doc, long owningBucketOrd) throws IOException {

upToInclusive = skipper.maxDocID(0);

// Get current rounding from supplier
Rounding.Prepared currentRounding = preparedRoundingSupplier.apply(owningBucketOrd);

// Now find the highest level where all docs map to the same bucket.
for (int level = 0; level < skipper.numLevels(); ++level) {
int totalDocsAtLevel = skipper.maxDocID(level) - skipper.minDocID(level) + 1;
long minBucket = preparedRounding.round(skipper.minValue(level));
long maxBucket = preparedRounding.round(skipper.maxValue(level));
long minBucket = currentRounding.round(skipper.minValue(level));
long maxBucket = currentRounding.round(skipper.maxValue(level));

if (skipper.docCount(level) == totalDocsAtLevel && minBucket == maxBucket) {
// All docs at this level have a value, and all values map to the same bucket.
upToInclusive = skipper.maxDocID(level);
upToSameBucket = true;
upToBucketIndex = bucketOrds.add(owningBucketOrd, maxBucket);
upToBucketIndex = bucketOrdsSupplier.get().add(owningBucketOrd, maxBucket);
if (upToBucketIndex < 0) {
// add() returns -1 - ord when the bucket already exists; decode back to the ord
upToBucketIndex = -1 - upToBucketIndex;
}
@@ -109,6 +149,16 @@ private void advanceSkipper(int doc, long owningBucketOrd) throws IOException {

@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
Rounding.Prepared currentRounding = preparedRoundingSupplier.apply(owningBucketOrd);

// Check if rounding changed (using reference equality)
// AutoDateHistogramAggregator creates a new Rounding.Prepared instance when rounding changes
if (currentRounding != lastPreparedRounding) {
upToInclusive = -1; // Invalidate
upToSameBucket = false;
lastPreparedRounding = currentRounding;
}

if (doc > upToInclusive) {
advanceSkipper(doc, owningBucketOrd);
}
@@ -118,12 +168,14 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
sub.collect(doc, upToBucketIndex);
} else if (values.advanceExact(doc)) {
final long value = values.longValue();
long bucketIndex = bucketOrds.add(owningBucketOrd, preparedRounding.round(value));
long rounded = currentRounding.round(value);
long bucketIndex = bucketOrdsSupplier.get().add(owningBucketOrd, rounded);
if (bucketIndex < 0) {
// negative return means the bucket already exists: decode the ord
bucketIndex = -1 - bucketIndex;
aggregator.collectExistingBucket(sub, doc, bucketIndex);
} else {
aggregator.collectBucket(sub, doc, bucketIndex);
// a brand-new bucket may push auto date histogram over its bucket budget
increaseRoundingIfNeeded.accept(owningBucketOrd, rounded);
}
}
}
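The rounding-change check in collect above keys off reference equality rather than equals(). A self-contained mini-sketch of that cache-invalidation pattern (all names here are hypothetical):

```java
// Pattern sketch: detect a swapped-in configuration object by identity. A new
// instance signals that any state derived from the old instance must be dropped.
final class ConfigChangeDetector {
    private Object last;

    /** Returns true when cached state derived from the previous config is stale. */
    boolean changed(Object current) {
        if (current != last) {
            last = current;
            return true;
        }
        return false;
    }
}
```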
@@ -136,15 +188,14 @@ public void collect(DocIdStream stream) throws IOException {

@Override
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
// This will only be called if it's the sub aggregation
for (;;) {
int upToExclusive = upToInclusive + 1;
if (upToExclusive < 0) { // overflow
upToExclusive = Integer.MAX_VALUE;
}

if (upToSameBucket) {
if (sub == NO_OP_COLLECTOR) {
if (isSubNoOp) {
**Member:** Below line 204, does it make sense to use the collect(DocIdStream stream, long owningBucketOrd) API instead of iterating over docs?

**Contributor Author:** That is a good idea. The current DocIdStream API doesn't have a way to specify upTo, e.g.

sub.collect(stream.copy(upToExclusive), upToBucketIndex)

Plus we still need to count them, and the stream is read-once. Will need to think about this more.
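To make the read-once point concrete, a small sketch under stated assumptions (the helper class is hypothetical; `count(int)` is the overload this diff already calls below):

```java
import java.io.IOException;

import org.apache.lucene.search.DocIdStream;

// Hypothetical helper showing why a bounded copy would be needed: counting the
// prefix consumes it, so those same docs can no longer be handed to a sub collector.
final class ReadOnceExample {
    static long countPrefix(DocIdStream stream, int upToExclusive) throws IOException {
        long count = stream.count(upToExclusive); // consumes all docs < upToExclusive
        // stream.forEach(...) here would only visit remaining docs >= upToExclusive
        return count;
    }
}
```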

// stream.count may be faster when we don't need to handle sub-aggs
long count = stream.count(upToExclusive);
aggregator.incrementBucketDocCount(upToBucketIndex, count);
@@ -167,4 +218,30 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException
}
}
}

/**
* Callback that lets auto date histogram increase its rounding when a new bucket is created.
*
* @opensearch.internal
*/
public interface IncreaseRoundingIfNeeded {
void accept(long owningBucket, long rounded);
}
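As a usage illustration, a hedged sketch of one possible callback implementation (the counter and budget are assumptions for the example; the real bookkeeping lives in AutoDateHistogramAggregator):

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: counts new-bucket creations and marks where a real owner
// would swap in a coarser Rounding.Prepared.
final class ExampleRoundingCallback implements HistogramSkiplistLeafCollector.IncreaseRoundingIfNeeded {
    private final AtomicLong bucketsCreated = new AtomicLong();
    private static final long MAX_BUCKETS = 160; // assumed budget for the sketch

    @Override
    public void accept(long owningBucket, long rounded) {
        if (bucketsCreated.incrementAndGet() > MAX_BUCKETS) {
            // a real implementation would switch rounding and rebuild bucket ords here
        }
    }
}
```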

/**
* The skiplist path applies when this is the top-level agg (null parent) or the parent
* executes in sorted order (filter rewrite collection mode).
*/
public static boolean canUseSkiplist(LongBounds hardBounds, Aggregator parent, DocValuesSkipper skipper, NumericDocValues singleton) {
if (skipper == null || singleton == null) return false;
// TODO: add hard bounds support
if (hardBounds != null) return false;

if (parent == null) return true;

if (parent instanceof AggregatorBase base) {
return base.getLeafCollectorMode() == AggregatorBase.LeafCollectionMode.FILTER_REWRITE;
}
return false;
}
}
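A hedged call-site sketch for the gate above (everything except canUseSkiplist and the collector's six-argument constructor is an assumption about the surrounding aggregator):

```java
// Assumed shape of an aggregator helper: gate on canUseSkiplist and fall back to
// the ordinary doc-by-doc collector when the skiplist path is unavailable.
static LeafBucketCollector pickCollector(
    LongBounds hardBounds,
    Aggregator parent,
    DocValuesSkipper skipper,
    NumericDocValues singleton,
    Rounding.Prepared preparedRounding,
    LongKeyedBucketOrds bucketOrds,
    LeafBucketCollector sub,
    BucketsAggregator aggregator,
    LeafBucketCollector fallback
) {
    if (HistogramSkiplistLeafCollector.canUseSkiplist(hardBounds, parent, skipper, singleton)) {
        return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, sub, aggregator);
    }
    return fallback; // assumed non-skiplist path
}
```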