diff --git a/CHANGELOG.md b/CHANGELOG.md index 0dbc434d1d18a..fc52b4493706f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Handle deleted documents for filter rewrite sub-aggregation optimization ([#19643](https://github.com/opensearch-project/OpenSearch/pull/19643)) - Add bulk collect API for filter rewrite sub-aggregation optimization ([#19933](https://github.com/opensearch-project/OpenSearch/pull/19933)) - Allow collectors take advantage of preaggregated data using collectRange API ([#20009](https://github.com/opensearch-project/OpenSearch/pull/20009)) +- Add bulk collection logic for metrics and cardinality aggregations ([#20067](https://github.com/opensearch-project/OpenSearch/pull/20067)) - Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635)) - Introduced internal API for retrieving metadata about requested indices from transport actions ([#18523](https://github.com/opensearch-project/OpenSearch/pull/18523)) - Add cluster defaults for merge autoThrottle, maxMergeThreads, and maxMergeCount; Add segment size filter to the merged segment warmer ([#19629](https://github.com/opensearch-project/OpenSearch/pull/19629)) diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java index 32cf8953c1542..1031a0370e57d 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java @@ -134,38 +134,64 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long bucket) throws IOException { - counts = bigArrays.grow(counts, bucket + 1); - sums = bigArrays.grow(sums, bucket + 1); - compensations = bigArrays.grow(compensations, bucket + 1); - if (values.advanceExact(doc)) { - final int valueCount = values.docValueCount(); + int valueCount = values.docValueCount(); + setKahanSummation(bucket); counts.increment(bucket, valueCount); - // Compute the sum of double values with Kahan summation algorithm which is more - // accurate than naive summation.
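Note for reviewers on the algorithm these hunks keep referring to: Kahan (compensated) summation carries a correction term alongside the running sum so that low-order bits lost when adding a small value to a large total are recovered on later additions. A minimal, self-contained sketch in plain Java (illustrative only, independent of the CompensatedSum class changed in this diff):

```java
// Kahan summation: "c" accumulates the low-order bits that plain
// floating-point addition would silently drop.
static double kahanSum(double[] values) {
    double sum = 0.0;
    double c = 0.0;               // running compensation for lost low-order bits
    for (double v : values) {
        double y = v - c;         // re-apply previously lost bits
        double t = sum + y;       // low-order bits of y may be lost here
        c = (t - sum) - y;        // recover exactly what was lost
        sum = t;                  // the next iteration corrects via c
    }
    return sum;
}
```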
- double sum = sums.get(bucket); - double compensation = compensations.get(bucket); - - kahanSummation.reset(sum, compensation); - for (int i = 0; i < valueCount; i++) { double value = values.nextValue(); kahanSummation.add(value); } - sums.set(bucket, kahanSummation.value()); compensations.set(bucket, kahanSummation.delta()); } } @Override - public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { - super.collect(stream, owningBucketOrd); + public void collect(DocIdStream stream, long bucket) throws IOException { + setKahanSummation(bucket); + final int[] count = { 0 }; + stream.forEach((doc) -> { + if (values.advanceExact(doc)) { + int valueCount = values.docValueCount(); + count[0] += valueCount; + for (int i = 0; i < valueCount; i++) { + kahanSummation.add(values.nextValue()); + } + } + }); + counts.increment(bucket, count[0]); + sums.set(bucket, kahanSummation.value()); + compensations.set(bucket, kahanSummation.delta()); } @Override public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + setKahanSummation(0); + int count = 0; + for (int docId = min; docId < max; docId++) { + if (values.advanceExact(docId)) { + int valueCount = values.docValueCount(); + count += valueCount; + for (int i = 0; i < valueCount; i++) { + kahanSummation.add(values.nextValue()); + } + } + } + counts.increment(0, count); + sums.set(0, kahanSummation.value()); + compensations.set(0, kahanSummation.delta()); + } + + private void setKahanSummation(long bucket) { + counts = bigArrays.grow(counts, bucket + 1); + sums = bigArrays.grow(sums, bucket + 1); + compensations = bigArrays.grow(compensations, bucket + 1); + // Compute the sum of double values with Kahan summation algorithm which is more + // accurate than naive summation. 
+ double sum = sums.get(bucket); + double compensation = compensations.get(bucket); + kahanSummation.reset(sum, compensation); } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java index 8cb21c5b6effb..5e1617001c6ac 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java @@ -579,12 +579,34 @@ public static long memoryOverhead(long maxOrd) { @Override public void collect(int doc, long bucketOrd) throws IOException { - visitedOrds = bigArrays.grow(visitedOrds, bucketOrd + 1); - BitArray bits = visitedOrds.get(bucketOrd); + collect(doc, getBitArray(bucketOrd)); + } + + @Override + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + final BitArray bits = getBitArray(owningBucketOrd); + stream.forEach((doc) -> collect(doc, bits)); + } + + @Override + public void collectRange(int minDoc, int maxDoc) throws IOException { + final BitArray bits = getBitArray(0); + for (int doc = minDoc; doc < maxDoc; ++doc) { + collect(doc, bits); + } + } + + private BitArray getBitArray(long bucket) { + visitedOrds = bigArrays.grow(visitedOrds, bucket + 1); + BitArray bits = visitedOrds.get(bucket); if (bits == null) { bits = new BitArray(maxOrd, bigArrays); - visitedOrds.set(bucketOrd, bits); + visitedOrds.set(bucket, bits); } + return bits; + } + + private void collect(final int doc, final BitArray bits) throws IOException { if (values.advanceExact(doc)) { int count = values.docValueCount(); long ord; @@ -594,16 +616,6 @@ public void collect(int doc, long bucketOrd) throws IOException { } } - @Override - public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { - super.collect(stream, owningBucketOrd); - } - - @Override - public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); - } - @Override public void postCollect() throws IOException { try (BitArray allVisitedOrds = new BitArray(maxOrd, bigArrays)) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CompensatedSum.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CompensatedSum.java index 4d6d6d880da2e..bae482c9c0726 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/CompensatedSum.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/CompensatedSum.java @@ -112,6 +112,26 @@ public CompensatedSum add(double value, double delta) { return this; } + /** + * Increments the Kahan sum by adding {@code count} values from the given array, updating the correction term that reduces numeric error. + */ + public void add(double[] values, int count) { + // Standard Kahan loop over the buffered values; note that, unlike add(double), + // this path does not special-case Inf/NaN inputs.
+ double sum = value; + double c = delta; // Compensation for lost low-order bits + + for (int i = 0; i < count; i++) { + double y = values[i] - c; + double t = sum + y; + c = (t - sum) - y; // Calculate the lost part + sum = t; + } + + this.value = sum; + this.delta = c; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java index 8475a7509e6e0..8a656d768cee2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java @@ -157,14 +157,9 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx); final NumericDoubleValues values = MultiValueMode.MAX.select(allValues); return new LeafBucketCollectorBase(sub, allValues) { - @Override public void collect(int doc, long bucket) throws IOException { - if (bucket >= maxes.size()) { - long from = maxes.size(); - maxes = bigArrays.grow(maxes, bucket + 1); - maxes.fill(from, maxes.size(), Double.NEGATIVE_INFINITY); - } + growMaxes(bucket); if (values.advanceExact(doc)) { final double value = values.doubleValue(); double max = maxes.get(bucket); @@ -174,13 +169,35 @@ public void collect(int doc, long bucket) throws IOException { } @Override - public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { - super.collect(stream, owningBucketOrd); + public void collect(DocIdStream stream, long bucket) throws IOException { + growMaxes(bucket); + final double[] max = { maxes.get(bucket) }; + stream.forEach((doc) -> { + if (values.advanceExact(doc)) { + max[0] = Math.max(max[0], values.doubleValue()); + } + }); + maxes.set(bucket, max[0]); } @Override public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + growMaxes(0); + double maximum = maxes.get(0); + for (int doc = min; doc < max; doc++) { + if (values.advanceExact(doc)) { + maximum = Math.max(maximum, values.doubleValue()); + } + } + maxes.set(0, maximum); + } + + private void growMaxes(long bucket) { + if (bucket >= maxes.size()) { + long from = maxes.size(); + maxes = bigArrays.grow(maxes, bucket + 1); + maxes.fill(from, maxes.size(), Double.NEGATIVE_INFINITY); + } } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java index eb3bc0bd4ee34..cb4b530b5bda2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java @@ -157,14 +157,9 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx); final NumericDoubleValues values = MultiValueMode.MIN.select(allValues); return new LeafBucketCollectorBase(sub, allValues) { - @Override public void collect(int doc, long bucket) throws IOException { - if (bucket >= mins.size()) { - long from = mins.size(); - mins = bigArrays.grow(mins, bucket + 1); - mins.fill(from, mins.size(), Double.POSITIVE_INFINITY); - } + growMins(bucket); if (values.advanceExact(doc)) { final double
value = values.doubleValue(); double min = mins.get(bucket); @@ -174,13 +169,35 @@ public void collect(int doc, long bucket) throws IOException { } @Override - public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { - super.collect(stream, owningBucketOrd); + public void collect(DocIdStream stream, long bucket) throws IOException { + growMins(bucket); + final double[] min = { mins.get(bucket) }; + stream.forEach((doc) -> { + if (values.advanceExact(doc)) { + min[0] = Math.min(min[0], values.doubleValue()); + } + }); + mins.set(bucket, min[0]); } @Override public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + growMins(0); + double minimum = mins.get(0); + for (int doc = min; doc < max; doc++) { + if (values.advanceExact(doc)) { + minimum = Math.min(minimum, values.doubleValue()); + } + } + mins.set(0, minimum); + } + + private void growMins(long bucket) { + if (bucket >= mins.size()) { + long from = mins.size(); + mins = bigArrays.grow(mins, bucket + 1); + mins.fill(from, mins.size(), Double.POSITIVE_INFINITY); + } } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregator.java index dbfa4641b1733..98fc5cc4d6d42 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/StatsAggregator.java @@ -107,28 +107,13 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long bucket) throws IOException { - if (bucket >= counts.size()) { - final long from = counts.size(); - final long overSize = BigArrays.overSize(bucket + 1); - counts = bigArrays.resize(counts, overSize); - sums = bigArrays.resize(sums, overSize); - compensations = bigArrays.resize(compensations, overSize); - mins = bigArrays.resize(mins, overSize); - maxes = bigArrays.resize(maxes, overSize); - mins.fill(from, overSize, Double.POSITIVE_INFINITY); - maxes.fill(from, overSize, Double.NEGATIVE_INFINITY); - } + growStats(bucket); if (values.advanceExact(doc)) { final int valuesCount = values.docValueCount(); counts.increment(bucket, valuesCount); double min = mins.get(bucket); double max = maxes.get(bucket); - // Compute the sum of double values with Kahan summation algorithm which is more - // accurate than naive summation. 
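The one-element arrays in the DocIdStream overrides above and below (final int[] count, final double[] min/max) are the usual workaround for Java's rule that a lambda may only capture effectively final locals: the array reference stays final while its single slot is mutated. A tiny standalone illustration, using an ordinary IntStream stand-in since DocIdStream is Lucene-specific:

```java
import java.util.stream.IntStream;

public class CaptureIdiom {
    public static void main(String[] args) {
        // A lambda cannot reassign a captured local, but it may mutate the
        // contents of an effectively final one-element array it captures.
        final int[] count = { 0 };
        final double[] max = { Double.NEGATIVE_INFINITY };
        IntStream.of(3, 7, 5).forEach(v -> {
            count[0]++;
            max[0] = Math.max(max[0], v);
        });
        System.out.println(count[0] + " values, max " + max[0]); // 3 values, max 7.0
    }
}
```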
- double sum = sums.get(bucket); - double compensation = compensations.get(bucket); - kahanSummation.reset(sum, compensation); for (int i = 0; i < valuesCount; i++) { double value = values.nextValue(); @@ -144,13 +129,73 @@ public void collect(int doc, long bucket) throws IOException { } @Override - public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { - super.collect(stream, owningBucketOrd); + public void collect(DocIdStream stream, long bucket) throws IOException { + growStats(bucket); + + double[] min = { mins.get(bucket) }; + double[] max = { maxes.get(bucket) }; + stream.forEach((doc) -> { + if (values.advanceExact(doc)) { + final int valuesCount = values.docValueCount(); + counts.increment(bucket, valuesCount); + + for (int i = 0; i < valuesCount; i++) { + double value = values.nextValue(); + kahanSummation.add(value); + min[0] = Math.min(min[0], value); + max[0] = Math.max(max[0], value); + } + } + }); + sums.set(bucket, kahanSummation.value()); + compensations.set(bucket, kahanSummation.delta()); + mins.set(bucket, min[0]); + maxes.set(bucket, max[0]); } @Override public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + growStats(0); + + double minimum = mins.get(0); + double maximum = maxes.get(0); + for (int doc = min; doc < max; doc++) { + if (values.advanceExact(doc)) { + final int valuesCount = values.docValueCount(); + counts.increment(0, valuesCount); + + for (int i = 0; i < valuesCount; i++) { + double value = values.nextValue(); + kahanSummation.add(value); + minimum = Math.min(minimum, value); + maximum = Math.max(maximum, value); + } + } + } + sums.set(0, kahanSummation.value()); + compensations.set(0, kahanSummation.delta()); + mins.set(0, minimum); + maxes.set(0, maximum); + } + + private void growStats(long bucket) { + if (bucket >= counts.size()) { + final long from = counts.size(); + final long overSize = BigArrays.overSize(bucket + 1); + counts = bigArrays.resize(counts, overSize); + sums = bigArrays.resize(sums, overSize); + compensations = bigArrays.resize(compensations, overSize); + mins = bigArrays.resize(mins, overSize); + maxes = bigArrays.resize(maxes, overSize); + mins.fill(from, overSize, Double.POSITIVE_INFINITY); + maxes.fill(from, overSize, Double.NEGATIVE_INFINITY); + } + + // Compute the sum of double values with Kahan summation algorithm which is more + // accurate than naive summation. + double sum = sums.get(bucket); + double compensation = compensations.get(bucket); + kahanSummation.reset(sum, compensation); } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java index bf450388a14da..29228afb8ce8e 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java @@ -123,35 +123,53 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long bucket) throws IOException { - sums = bigArrays.grow(sums, bucket + 1); - compensations = bigArrays.grow(compensations, bucket + 1); - if (values.advanceExact(doc)) { - final int valuesCount = values.docValueCount(); - // Compute the sum of double values with Kahan summation algorithm which is more - // accurate than naive summation.
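To make the "more accurate than naive summation" comment concrete, here is a small worked example one can run: adding 1.0 ten times to 1e16 is entirely absorbed by rounding under naive summation, while the compensated loop recovers the true total.

```java
// ulp(1e16) is 2.0, so 1e16 + 1.0 rounds back to 1e16 every time.
double naive = 1e16;
for (int i = 0; i < 10; i++) {
    naive += 1.0;                 // each increment is lost to rounding
}

double sum = 1e16, c = 0.0;       // compensated (Kahan) version
for (int i = 0; i < 10; i++) {
    double y = 1.0 - c;
    double t = sum + y;
    c = (t - sum) - y;
    sum = t;
}
// naive == 1.0E16, whereas sum == 1.000000000000001E16, the exact total.
```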
- double sum = sums.get(bucket); - double compensation = compensations.get(bucket); - kahanSummation.reset(sum, compensation); - - for (int i = 0; i < valuesCount; i++) { + setKahanSummation(bucket); + for (int i = 0; i < values.docValueCount(); i++) { double value = values.nextValue(); kahanSummation.add(value); } - compensations.set(bucket, kahanSummation.delta()); sums.set(bucket, kahanSummation.value()); } } @Override - public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { - super.collect(stream, owningBucketOrd); + public void collect(DocIdStream stream, long bucket) throws IOException { + setKahanSummation(bucket); + stream.forEach((doc) -> { + if (values.advanceExact(doc)) { + for (int i = 0; i < values.docValueCount(); i++) { + kahanSummation.add(values.nextValue()); + } + } + }); + compensations.set(bucket, kahanSummation.delta()); + sums.set(bucket, kahanSummation.value()); } @Override public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + setKahanSummation(0); + for (int docId = min; docId < max; docId++) { + if (values.advanceExact(docId)) { + for (int i = 0; i < values.docValueCount(); i++) { + kahanSummation.add(values.nextValue()); + } + } + } + sums.set(0, kahanSummation.value()); + compensations.set(0, kahanSummation.delta()); + } + + private void setKahanSummation(long bucket) { + sums = bigArrays.grow(sums, bucket + 1); + compensations = bigArrays.grow(compensations, bucket + 1); + // Compute the sum of double values with Kahan summation algorithm which is more + // accurate than naive summation. + double sum = sums.get(bucket); + double compensation = compensations.get(bucket); + kahanSummation.reset(sum, compensation); } }; } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java index be5add530b406..c59fadee03633 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java @@ -9,6 +9,7 @@ package org.opensearch.search.aggregations.bucket.filterrewrite; import org.apache.lucene.document.Field; +import org.apache.lucene.document.KeywordField; import org.apache.lucene.document.LongField; import org.apache.lucene.document.LongPoint; import org.apache.lucene.index.DirectoryReader; @@ -25,6 +26,7 @@ import org.opensearch.core.common.breaker.CircuitBreaker; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; import org.opensearch.index.mapper.DateFieldMapper; +import org.opensearch.index.mapper.KeywordFieldMapper; import org.opensearch.index.mapper.NumberFieldMapper; import org.opensearch.index.mapper.ParseContext; import org.opensearch.search.aggregations.AggregationBuilder; @@ -41,7 +43,12 @@ import org.opensearch.search.aggregations.bucket.histogram.InternalDateHistogram; import org.opensearch.search.aggregations.bucket.range.InternalRange; import org.opensearch.search.aggregations.bucket.range.RangeAggregationBuilder; +import org.opensearch.search.aggregations.metrics.InternalAvg; +import org.opensearch.search.aggregations.metrics.InternalCardinality; +import org.opensearch.search.aggregations.metrics.InternalMax; +import org.opensearch.search.aggregations.metrics.InternalMin; import org.opensearch.search.aggregations.metrics.InternalStats; 
+import org.opensearch.search.aggregations.metrics.InternalSum; import org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.search.internal.SearchContext; @@ -59,27 +66,34 @@ public class FilterRewriteSubAggTests extends AggregatorTestCase { private final String longFieldName = "metric"; private final String dateFieldName = "timestamp"; + private final String nameFieldName = "name"; private final Query matchAllQuery = new MatchAllDocsQuery(); private final NumberFieldMapper.NumberFieldType longFieldType = new NumberFieldMapper.NumberFieldType( longFieldName, NumberFieldMapper.NumberType.LONG ); private final DateFieldMapper.DateFieldType dateFieldType = aggregableDateFieldType(false, true); + private final KeywordFieldMapper.KeywordFieldType nameFieldType = new KeywordFieldMapper.KeywordFieldType(nameFieldName); private final NumberFieldMapper.NumberType numberType = longFieldType.numberType(); private final String rangeAggName = "range"; private final String autoDateAggName = "auto"; private final String dateAggName = "date"; private final String statsAggName = "stats"; + private final String avgAggName = "avg"; + private final String sumAggName = "sum"; + private final String minAggName = "min"; + private final String maxAggName = "max"; + private final String cardinalityAggName = "cardinality"; private final List DEFAULT_DATA = List.of( - new TestDoc(0, Instant.parse("2020-03-01T00:00:00Z")), - new TestDoc(1, Instant.parse("2020-03-01T00:00:00Z")), - new TestDoc(1, Instant.parse("2020-03-01T00:00:01Z")), - new TestDoc(2, Instant.parse("2020-03-01T01:00:00Z")), - new TestDoc(3, Instant.parse("2020-03-01T02:00:00Z")), - new TestDoc(4, Instant.parse("2020-03-01T03:00:00Z")), - new TestDoc(4, Instant.parse("2020-03-01T04:00:00Z"), true), - new TestDoc(5, Instant.parse("2020-03-01T04:00:00Z")), - new TestDoc(6, Instant.parse("2020-03-01T04:00:00Z")) + new TestDoc(0, Instant.parse("2020-03-01T00:00:00Z"), "abc"), + new TestDoc(1, Instant.parse("2020-03-01T00:00:00Z"), "def"), + new TestDoc(1, Instant.parse("2020-03-01T00:00:01Z"), "ghi"), + new TestDoc(2, Instant.parse("2020-03-01T01:00:00Z"), "jkl"), + new TestDoc(3, Instant.parse("2020-03-01T02:00:00Z"), "jkl"), + new TestDoc(4, Instant.parse("2020-03-01T03:00:00Z"), "mno"), + new TestDoc(4, Instant.parse("2020-03-01T04:00:00Z"), "prq", true), + new TestDoc(5, Instant.parse("2020-03-01T04:00:00Z"), "stu"), + new TestDoc(6, Instant.parse("2020-03-01T04:00:00Z"), "stu") ); public void testRange() throws IOException { @@ -111,6 +125,151 @@ public void testRange() throws IOException { assertEquals(3, thirdAuto.getBuckets().size()); } + public void testRangeWithAvgAndSum() throws IOException { + // Test for sum metric aggregation + RangeAggregationBuilder rangeAggregationBuilder = new RangeAggregationBuilder(rangeAggName).field(longFieldName) + .addRange(1, 2) + .addRange(2, 4) + .addRange(4, 6) + .subAggregation(AggregationBuilders.sum(sumAggName).field(longFieldName)); + + InternalRange result = executeAggregation(DEFAULT_DATA, rangeAggregationBuilder, true); + + // Verify results + List buckets = result.getBuckets(); + assertEquals(3, buckets.size()); + + InternalRange.Bucket firstBucket = buckets.get(0); + assertEquals(2, firstBucket.getDocCount()); + InternalSum firstSum = firstBucket.getAggregations().get(sumAggName); + assertEquals(2, firstSum.getValue(), 0); + + InternalRange.Bucket secondBucket = buckets.get(1); + assertEquals(2, secondBucket.getDocCount()); + InternalSum secondSum = 
secondBucket.getAggregations().get(sumAggName); + assertEquals(5, secondSum.getValue(), 0); + + InternalRange.Bucket thirdBucket = buckets.get(2); + assertEquals(2, thirdBucket.getDocCount()); + InternalSum thirdSum = thirdBucket.getAggregations().get(sumAggName); + assertEquals(9, thirdSum.getValue(), 0); + + // Test for average metric aggregation now + rangeAggregationBuilder = new RangeAggregationBuilder(rangeAggName).field(longFieldName) + .addRange(1, 2) + .addRange(2, 4) + .addRange(4, 6) + .subAggregation(AggregationBuilders.avg(avgAggName).field(longFieldName)); + + result = executeAggregation(DEFAULT_DATA, rangeAggregationBuilder, true); + + // Verify results + buckets = result.getBuckets(); + assertEquals(3, buckets.size()); + + firstBucket = buckets.get(0); + assertEquals(2, firstBucket.getDocCount()); + InternalAvg firstAvg = firstBucket.getAggregations().get(avgAggName); + assertEquals(1, firstAvg.getValue(), 0); + + secondBucket = buckets.get(1); + assertEquals(2, secondBucket.getDocCount()); + InternalAvg secondAvg = secondBucket.getAggregations().get(avgAggName); + assertEquals(2.5, secondAvg.getValue(), 0); + + thirdBucket = buckets.get(2); + assertEquals(2, thirdBucket.getDocCount()); + InternalAvg thirdAvg = thirdBucket.getAggregations().get(avgAggName); + assertEquals(4.5, thirdAvg.getValue(), 0); + } + + public void testRangeWithMinAndMax() throws IOException { + // Test for min metric aggregation + RangeAggregationBuilder rangeAggregationBuilder = new RangeAggregationBuilder(rangeAggName).field(longFieldName) + .addRange(1, 2) + .addRange(2, 4) + .addRange(4, 6) + .subAggregation(AggregationBuilders.min(minAggName).field(longFieldName)); + + InternalRange result = executeAggregation(DEFAULT_DATA, rangeAggregationBuilder, true); + + // Verify results + List buckets = result.getBuckets(); + assertEquals(3, buckets.size()); + + InternalRange.Bucket firstBucket = buckets.get(0); + assertEquals(2, firstBucket.getDocCount()); + InternalMin firstMin = firstBucket.getAggregations().get(minAggName); + assertEquals(1, firstMin.getValue(), 0); + + InternalRange.Bucket secondBucket = buckets.get(1); + assertEquals(2, secondBucket.getDocCount()); + InternalMin secondMin = secondBucket.getAggregations().get(minAggName); + assertEquals(2, secondMin.getValue(), 0); + + InternalRange.Bucket thirdBucket = buckets.get(2); + assertEquals(2, thirdBucket.getDocCount()); + InternalMin thirdMin = thirdBucket.getAggregations().get(minAggName); + assertEquals(4, thirdMin.getValue(), 0); + + // Test for max metric aggregation now + rangeAggregationBuilder = new RangeAggregationBuilder(rangeAggName).field(longFieldName) + .addRange(1, 2) + .addRange(2, 4) + .addRange(4, 6) + .subAggregation(AggregationBuilders.max(maxAggName).field(longFieldName)); + + result = executeAggregation(DEFAULT_DATA, rangeAggregationBuilder, true); + + // Verify results + buckets = result.getBuckets(); + assertEquals(3, buckets.size()); + + firstBucket = buckets.get(0); + assertEquals(2, firstBucket.getDocCount()); + InternalMax firstMax = firstBucket.getAggregations().get(maxAggName); + assertEquals(1, firstMax.getValue(), 0); + + secondBucket = buckets.get(1); + assertEquals(2, secondBucket.getDocCount()); + InternalMax secondMax = secondBucket.getAggregations().get(maxAggName); + assertEquals(3, secondMax.getValue(), 0); + + thirdBucket = buckets.get(2); + assertEquals(2, thirdBucket.getDocCount()); + InternalMax thirdMax = thirdBucket.getAggregations().get(maxAggName); + assertEquals(5, thirdMax.getValue(), 0); + } 
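For reviewers cross-checking the cardinality test that follows, the expected values fall out of DEFAULT_DATA directly; sketched here as comments (the doc carrying the name "prq" is flagged deleted and must never be collected):

```java
// range [1, 2): metric 1 twice  -> names {"def", "ghi"} -> cardinality 2
// range [2, 4): metrics 2 and 3 -> names {"jkl", "jkl"} -> cardinality 1
// range [4, 6): metrics 4 and 5 -> names {"mno", "stu"} -> cardinality 2
//               (the deleted metric-4 doc carrying "prq" is skipped)
```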
+ + public void testRangeWithCard() throws IOException { + RangeAggregationBuilder rangeAggregationBuilder = new RangeAggregationBuilder(rangeAggName).field(longFieldName) + .addRange(1, 2) + .addRange(2, 4) + .addRange(4, 6) + .subAggregation(AggregationBuilders.cardinality(cardinalityAggName).field(nameFieldName).executionHint("ordinals")); + + InternalRange result = executeAggregation(DEFAULT_DATA, rangeAggregationBuilder, true); + + // Verify results + List buckets = result.getBuckets(); + assertEquals(3, buckets.size()); + + InternalRange.Bucket firstBucket = buckets.get(0); + assertEquals(2, firstBucket.getDocCount()); + InternalCardinality firstCardinality = firstBucket.getAggregations().get(cardinalityAggName); + assertEquals(2, firstCardinality.getValue(), 0); + + InternalRange.Bucket secondBucket = buckets.get(1); + assertEquals(2, secondBucket.getDocCount()); + InternalCardinality secondCardinality = secondBucket.getAggregations().get(cardinalityAggName); + assertEquals(1, secondCardinality.getValue(), 0); + + InternalRange.Bucket thirdBucket = buckets.get(2); + assertEquals(2, thirdBucket.getDocCount()); + InternalCardinality thirdCardinality = thirdBucket.getAggregations().get(cardinalityAggName); + assertEquals(2, thirdCardinality.getValue(), 0); + } + public void testDateHisto() throws IOException { DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder(dateAggName).field( dateFieldName @@ -129,6 +288,7 @@ public void testDateHisto() throws IOException { assertEquals(3, firstStats.getCount()); assertEquals(1, firstStats.getMax(), 0); assertEquals(0, firstStats.getMin(), 0); + assertEquals(2, firstStats.getSum(), 0); InternalDateHistogram.Bucket secondBucket = buckets.get(1); assertEquals("2020-03-01T01:00:00.000Z", secondBucket.getKeyAsString()); @@ -137,6 +297,7 @@ public void testDateHisto() throws IOException { assertEquals(1, secondStats.getCount()); assertEquals(2, secondStats.getMax(), 0); assertEquals(2, secondStats.getMin(), 0); + assertEquals(2, secondStats.getSum(), 0); InternalDateHistogram.Bucket thirdBucket = buckets.get(2); assertEquals("2020-03-01T02:00:00.000Z", thirdBucket.getKeyAsString()); @@ -145,6 +306,7 @@ public void testDateHisto() throws IOException { assertEquals(1, thirdStats.getCount()); assertEquals(3, thirdStats.getMax(), 0); assertEquals(3, thirdStats.getMin(), 0); + assertEquals(3, thirdStats.getSum(), 0); InternalDateHistogram.Bucket fourthBucket = buckets.get(3); assertEquals("2020-03-01T03:00:00.000Z", fourthBucket.getKeyAsString()); @@ -153,6 +315,7 @@ public void testDateHisto() throws IOException { assertEquals(1, fourthStats.getCount()); assertEquals(4, fourthStats.getMax(), 0); assertEquals(4, fourthStats.getMin(), 0); + assertEquals(4, fourthStats.getSum(), 0); InternalDateHistogram.Bucket fifthBucket = buckets.get(4); assertEquals("2020-03-01T04:00:00.000Z", fifthBucket.getKeyAsString()); @@ -161,6 +324,7 @@ public void testDateHisto() throws IOException { assertEquals(2, fifthStats.getCount()); assertEquals(6, fifthStats.getMax(), 0); assertEquals(5, fifthStats.getMin(), 0); + assertEquals(11, fifthStats.getSum(), 0); } public void testAutoDateHisto() throws IOException { @@ -389,7 +553,8 @@ private IA executeAggregationOnReader( matchAllQuery, bucketConsumer, longFieldType, - dateFieldType + dateFieldType, + nameFieldType ); Aggregator aggregator = createAggregator(aggregationBuilder, searchContext); CountingAggregator countingAggregator = new CountingAggregator(new AtomicInteger(), 
aggregator); @@ -441,15 +606,21 @@ private InternalAggregation.ReduceContext createReduceContext( private class TestDoc { private final long metric; private final Instant timestamp; + private final String name; private final boolean deleted; public TestDoc(long metric, Instant timestamp) { - this(metric, timestamp, false); + this(metric, timestamp, "abc", false); + } + + public TestDoc(long metric, Instant timestamp, String name) { + this(metric, timestamp, name, false); } - public TestDoc(long metric, Instant timestamp, boolean deleted) { + public TestDoc(long metric, Instant timestamp, String name, boolean deleted) { this.metric = metric; this.timestamp = timestamp; + this.name = name; this.deleted = deleted; } @@ -460,6 +631,7 @@ public ParseContext.Document toDocument() { for (Field fld : fieldList) doc.add(fld); doc.add(new LongField(dateFieldName, dateFieldType.parse(timestamp.toString()), Field.Store.NO)); + doc.add(new KeywordField(nameFieldName, name, Field.Store.NO)); return doc; }
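Stepping back, every aggregator touched above follows the same three-step shape in its bulk paths: grow per-bucket state once, accumulate into locals across the whole stream or range, then write state back once. A condensed, hypothetical skeleton of that pattern, shown here for a max-style metric (the class name and abstract value accessors are illustrative stand-ins, not the real OpenSearch API):

```java
import java.io.IOException;

// Sketch of the bulk-collection pattern this diff applies to each metric
// aggregator; assumed accessors stand in for Lucene doc-value iteration.
abstract class BulkCollectSketch {
    abstract boolean advanceExact(int doc) throws IOException; // doc has a value?
    abstract double doubleValue() throws IOException;          // value for that doc

    double bucketState = Double.NEGATIVE_INFINITY; // stands in for a BigArrays slot

    // Analogue of collectRange(min, max): bucket state is read once,
    // accumulated in a local, and written back once after the loop,
    // instead of being re-read and re-written per document.
    void collectRange(int min, int max) throws IOException {
        double acc = bucketState;
        for (int doc = min; doc < max; doc++) {
            if (advanceExact(doc)) {
                acc = Math.max(acc, doubleValue());
            }
        }
        bucketState = acc;
    }
}
```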