1313import org .apache .lucene .search .DocIdStream ;
1414import org .apache .lucene .search .Scorable ;
1515import org .opensearch .common .Rounding ;
16+ import org .opensearch .search .aggregations .Aggregator ;
17+ import org .opensearch .search .aggregations .AggregatorBase ;
1618import org .opensearch .search .aggregations .LeafBucketCollector ;
19+ import org .opensearch .search .aggregations .bucket .histogram .LongBounds ;
1720import org .opensearch .search .aggregations .bucket .terms .LongKeyedBucketOrds ;
1821
1922import java .io .IOException ;
23+ import java .util .function .LongFunction ;
24+ import java .util .function .Supplier ;
2025
2126/**
2227 * Histogram collection logic using skip list.
2328 *
29+ * Currently, it can only handle one owningBucketOrd at a time.
30+ *
2431 * @opensearch.internal
2532 */
2633public class HistogramSkiplistLeafCollector extends LeafBucketCollector {
2734
2835 private final NumericDocValues values ;
2936 private final DocValuesSkipper skipper ;
30- private final Rounding .Prepared preparedRounding ;
31- private final LongKeyedBucketOrds bucketOrds ;
3237 private final LeafBucketCollector sub ;
38+ private final boolean isSubNoOp ;
3339 private final BucketsAggregator aggregator ;
3440
41+ /**
42+ * Supplier function to get the current preparedRounding from the parent aggregator.
43+ * This allows detection of rounding changes in AutoDateHistogramAggregator.
44+ */
45+ private final LongFunction <Rounding .Prepared > preparedRoundingSupplier ;
46+ private final Supplier <LongKeyedBucketOrds > bucketOrdsSupplier ;
47+ private final IncreaseRoundingIfNeeded increaseRoundingIfNeeded ;
48+
3549 /**
3650 * Max doc ID (inclusive) up to which all docs values may map to the same
3751 * bucket.
@@ -48,20 +62,43 @@ public class HistogramSkiplistLeafCollector extends LeafBucketCollector {
4862 */
4963 private long upToBucketIndex ;
5064
65+ /**
66+ * Tracks the last preparedRounding reference to detect rounding changes.
67+ * Used for cache invalidation when AutoDateHistogramAggregator changes rounding.
68+ */
69+ private Rounding .Prepared lastPreparedRounding ;
70+
5171 public HistogramSkiplistLeafCollector (
5272 NumericDocValues values ,
5373 DocValuesSkipper skipper ,
5474 Rounding .Prepared preparedRounding ,
5575 LongKeyedBucketOrds bucketOrds ,
5676 LeafBucketCollector sub ,
5777 BucketsAggregator aggregator
78+ ) {
79+ this (values , skipper , (owningBucketOrd ) -> preparedRounding , () -> bucketOrds , sub , aggregator , (owningBucketOrd , rounded ) -> {});
80+ }
81+
82+ /**
83+ * Constructor that accepts a supplier for dynamic rounding (used by AutoDateHistogramAggregator).
84+ */
85+ public HistogramSkiplistLeafCollector (
86+ NumericDocValues values ,
87+ DocValuesSkipper skipper ,
88+ LongFunction <Rounding .Prepared > preparedRoundingSupplier ,
89+ Supplier <LongKeyedBucketOrds > bucketOrdsSupplier ,
90+ LeafBucketCollector sub ,
91+ BucketsAggregator aggregator ,
92+ IncreaseRoundingIfNeeded increaseRoundingIfNeeded
5893 ) {
5994 this .values = values ;
6095 this .skipper = skipper ;
61- this .preparedRounding = preparedRounding ;
62- this .bucketOrds = bucketOrds ;
96+ this .preparedRoundingSupplier = preparedRoundingSupplier ;
97+ this .bucketOrdsSupplier = bucketOrdsSupplier ;
6398 this .sub = sub ;
99+ this .isSubNoOp = (sub == NO_OP_COLLECTOR );
64100 this .aggregator = aggregator ;
101+ this .increaseRoundingIfNeeded = increaseRoundingIfNeeded ;
65102 }
66103
67104 @ Override
@@ -87,17 +124,20 @@ private void advanceSkipper(int doc, long owningBucketOrd) throws IOException {
87124
88125 upToInclusive = skipper .maxDocID (0 );
89126
127+ // Get current rounding from supplier
128+ Rounding .Prepared currentRounding = preparedRoundingSupplier .apply (owningBucketOrd );
129+
90130 // Now find the highest level where all docs map to the same bucket.
91131 for (int level = 0 ; level < skipper .numLevels (); ++level ) {
92132 int totalDocsAtLevel = skipper .maxDocID (level ) - skipper .minDocID (level ) + 1 ;
93- long minBucket = preparedRounding .round (skipper .minValue (level ));
94- long maxBucket = preparedRounding .round (skipper .maxValue (level ));
133+ long minBucket = currentRounding .round (skipper .minValue (level ));
134+ long maxBucket = currentRounding .round (skipper .maxValue (level ));
95135
96136 if (skipper .docCount (level ) == totalDocsAtLevel && minBucket == maxBucket ) {
97137 // All docs at this level have a value, and all values map to the same bucket.
98138 upToInclusive = skipper .maxDocID (level );
99139 upToSameBucket = true ;
100- upToBucketIndex = bucketOrds .add (owningBucketOrd , maxBucket );
140+ upToBucketIndex = bucketOrdsSupplier . get () .add (owningBucketOrd , maxBucket );
101141 if (upToBucketIndex < 0 ) {
102142 upToBucketIndex = -1 - upToBucketIndex ;
103143 }
@@ -109,6 +149,16 @@ private void advanceSkipper(int doc, long owningBucketOrd) throws IOException {
109149
110150 @ Override
111151 public void collect (int doc , long owningBucketOrd ) throws IOException {
152+ Rounding .Prepared currentRounding = preparedRoundingSupplier .apply (owningBucketOrd );
153+
154+ // Check if rounding changed (using reference equality)
155+ // AutoDateHistogramAggregator creates a new Rounding.Prepared instance when rounding changes
156+ if (currentRounding != lastPreparedRounding ) {
157+ upToInclusive = -1 ; // Invalidate
158+ upToSameBucket = false ;
159+ lastPreparedRounding = currentRounding ;
160+ }
161+
112162 if (doc > upToInclusive ) {
113163 advanceSkipper (doc , owningBucketOrd );
114164 }
@@ -118,12 +168,14 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
118168 sub .collect (doc , upToBucketIndex );
119169 } else if (values .advanceExact (doc )) {
120170 final long value = values .longValue ();
121- long bucketIndex = bucketOrds .add (owningBucketOrd , preparedRounding .round (value ));
171+ long rounded = currentRounding .round (value );
172+ long bucketIndex = bucketOrdsSupplier .get ().add (owningBucketOrd , rounded );
122173 if (bucketIndex < 0 ) {
123174 bucketIndex = -1 - bucketIndex ;
124175 aggregator .collectExistingBucket (sub , doc , bucketIndex );
125176 } else {
126177 aggregator .collectBucket (sub , doc , bucketIndex );
178+ increaseRoundingIfNeeded .accept (owningBucketOrd , rounded );
127179 }
128180 }
129181 }
@@ -136,15 +188,14 @@ public void collect(DocIdStream stream) throws IOException {
136188
137189 @ Override
138190 public void collect (DocIdStream stream , long owningBucketOrd ) throws IOException {
139- // This will only be called if its the sub aggregation
140191 for (;;) {
141192 int upToExclusive = upToInclusive + 1 ;
142193 if (upToExclusive < 0 ) { // overflow
143194 upToExclusive = Integer .MAX_VALUE ;
144195 }
145196
146197 if (upToSameBucket ) {
147- if (sub == NO_OP_COLLECTOR ) {
198+ if (isSubNoOp ) {
148199 // stream.count maybe faster when we don't need to handle sub-aggs
149200 long count = stream .count (upToExclusive );
150201 aggregator .incrementBucketDocCount (upToBucketIndex , count );
@@ -167,4 +218,30 @@ public void collect(DocIdStream stream, long owningBucketOrd) throws IOException
167218 }
168219 }
169220 }
221+
222+ /**
223+ * Call back for auto date histogram
224+ *
225+ * @opensearch.internal
226+ */
227+ public interface IncreaseRoundingIfNeeded {
228+ void accept (long owningBucket , long rounded );
229+ }
230+
231+ /**
232+ * Skiplist is based as top level agg (null parent) or parent that will execute in sorted order
233+ *
234+ */
235+ public static boolean canUseSkiplist (LongBounds hardBounds , Aggregator parent , DocValuesSkipper skipper , NumericDocValues singleton ) {
236+ if (skipper == null || singleton == null ) return false ;
237+ // TODO: add hard bounds support
238+ if (hardBounds != null ) return false ;
239+
240+ if (parent == null ) return true ;
241+
242+ if (parent instanceof AggregatorBase base ) {
243+ return base .getLeafCollectorMode () == AggregatorBase .LeafCollectionMode .FILTER_REWRITE ;
244+ }
245+ return false ;
246+ }
170247}
0 commit comments