diff --git a/docs/reference/aggregations/metrics/rate-aggregation.asciidoc b/docs/reference/aggregations/metrics/rate-aggregation.asciidoc index 9f6de2562cc64..5d2be2520fa19 100644 --- a/docs/reference/aggregations/metrics/rate-aggregation.asciidoc +++ b/docs/reference/aggregations/metrics/rate-aggregation.asciidoc @@ -4,7 +4,8 @@ === Rate Aggregation A `rate` metrics aggregation can be used only inside a `date_histogram` and calculates a rate of documents or a field in each -`date_histogram` bucket. +`date_histogram` bucket. The field values can be generated by a provided script or extracted from specific numeric or +<<histogram,histogram fields>> in the documents. ==== Syntax diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/AbstractRateAggregator.java similarity index 62% rename from x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregator.java rename to x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/AbstractRateAggregator.java index 5dde711b268be..937e925703066 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregator.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/AbstractRateAggregator.java @@ -5,39 +5,33 @@ */ package org.elasticsearch.xpack.analytics.rate; -import org.apache.lucene.index.LeafReaderContext; +import java.io.IOException; +import java.util.Map; + import org.apache.lucene.search.ScoreMode; import org.elasticsearch.common.Rounding; import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.DoubleArray; -import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.Aggregator; import 
org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.LeafBucketCollector; -import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.bucket.histogram.SizedBucketAggregator; -import org.elasticsearch.search.aggregations.metrics.CompensatedSum; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator; import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; import org.elasticsearch.search.internal.SearchContext; -import java.io.IOException; -import java.util.Map; - -public class RateAggregator extends NumericMetricsAggregator.SingleValue { +public abstract class AbstractRateAggregator extends NumericMetricsAggregator.SingleValue { - private final ValuesSource.Numeric valuesSource; + protected final ValuesSource valuesSource; private final DocValueFormat format; private final Rounding.DateTimeUnit rateUnit; private final SizedBucketAggregator sizedBucketAggregator; - private DoubleArray sums; - private DoubleArray compensations; + protected DoubleArray sums; + protected DoubleArray compensations; - public RateAggregator( + public AbstractRateAggregator( String name, ValuesSourceConfig valuesSourceConfig, Rounding.DateTimeUnit rateUnit, @@ -46,7 +40,7 @@ public RateAggregator( Map metadata ) throws IOException { super(name, context, parent, metadata); - this.valuesSource = (ValuesSource.Numeric) valuesSourceConfig.getValuesSource(); + this.valuesSource = valuesSourceConfig.getValuesSource(); this.format = valuesSourceConfig.format(); if (valuesSource != null) { sums = context.bigArrays().newDoubleArray(1, true); @@ -75,38 +69,6 @@ public ScoreMode scoreMode() { return valuesSource != null && valuesSource.needsScores() ? 
ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES; } - @Override - public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { - final BigArrays bigArrays = context.bigArrays(); - final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx); - final CompensatedSum kahanSummation = new CompensatedSum(0, 0); - - return new LeafBucketCollectorBase(sub, values) { - @Override - public void collect(int doc, long bucket) throws IOException { - sums = bigArrays.grow(sums, bucket + 1); - compensations = bigArrays.grow(compensations, bucket + 1); - - if (values.advanceExact(doc)) { - final int valuesCount = values.docValueCount(); - // Compute the sum of double values with Kahan summation algorithm which is more - // accurate than naive summation. - double sum = sums.get(bucket); - double compensation = compensations.get(bucket); - kahanSummation.reset(sum, compensation); - - for (int i = 0; i < valuesCount; i++) { - double value = values.nextValue(); - kahanSummation.add(value); - } - - compensations.set(bucket, kahanSummation.delta()); - sums.set(bucket, kahanSummation.value()); - } - } - }; - } - @Override public double metric(long owningBucketOrd) { if (sizedBucketAggregator == null || valuesSource == null || owningBucketOrd >= sums.size()) { diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/HistogramRateAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/HistogramRateAggregator.java new file mode 100644 index 0000000000000..033549a90e0da --- /dev/null +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/HistogramRateAggregator.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.analytics.rate; + +import java.io.IOException; +import java.util.Map; + +import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.common.Rounding; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.index.fielddata.HistogramValue; +import org.elasticsearch.index.fielddata.HistogramValues; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.metrics.CompensatedSum; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.xpack.analytics.aggregations.support.HistogramValuesSource; + +public class HistogramRateAggregator extends AbstractRateAggregator { + public HistogramRateAggregator( + String name, + ValuesSourceConfig valuesSourceConfig, + Rounding.DateTimeUnit rateUnit, + SearchContext context, + Aggregator parent, + Map metadata + ) throws IOException { + super(name, valuesSourceConfig, rateUnit, context, parent, metadata); + } + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { + final BigArrays bigArrays = context.bigArrays(); + final CompensatedSum kahanSummation = new CompensatedSum(0, 0); + final HistogramValues values = ((HistogramValuesSource.Histogram) valuesSource).getHistogramValues(ctx); + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long bucket) throws IOException { + sums = bigArrays.grow(sums, bucket + 1); + compensations = bigArrays.grow(compensations, bucket + 1); + + if (values.advanceExact(doc)) { + final HistogramValue sketch = values.histogram(); + while (sketch.next()) { + double sum = sums.get(bucket); + double compensation = 
compensations.get(bucket); + kahanSummation.reset(sum, compensation); + kahanSummation.add(sketch.value()); + compensations.set(bucket, kahanSummation.delta()); + sums.set(bucket, kahanSummation.value()); + } + } + } + }; + } +} diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/NumericRateAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/NumericRateAggregator.java new file mode 100644 index 0000000000000..e37c9386b1553 --- /dev/null +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/NumericRateAggregator.java @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.analytics.rate; + +import java.io.IOException; +import java.util.Map; + +import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.common.Rounding; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.metrics.CompensatedSum; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.internal.SearchContext; + +public class NumericRateAggregator extends AbstractRateAggregator { + public NumericRateAggregator( + String name, + ValuesSourceConfig valuesSourceConfig, + Rounding.DateTimeUnit rateUnit, + SearchContext context, + Aggregator parent, + Map metadata + ) throws IOException { + super(name, valuesSourceConfig, 
rateUnit, context, parent, metadata); + } + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { + final BigArrays bigArrays = context.bigArrays(); + final CompensatedSum kahanSummation = new CompensatedSum(0, 0); + final SortedNumericDoubleValues values = ((ValuesSource.Numeric) valuesSource).doubleValues(ctx); + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long bucket) throws IOException { + sums = bigArrays.grow(sums, bucket + 1); + compensations = bigArrays.grow(compensations, bucket + 1); + + if (values.advanceExact(doc)) { + final int valuesCount = values.docValueCount(); + // Compute the sum of double values with Kahan summation algorithm which is more + // accurate than naive summation. + double sum = sums.get(bucket); + double compensation = compensations.get(bucket); + kahanSummation.reset(sum, compensation); + + for (int i = 0; i < valuesCount; i++) { + double value = values.nextValue(); + kahanSummation.add(value); + } + + compensations.set(bucket, kahanSummation.delta()); + sums.set(bucket, kahanSummation.value()); + } + } + }; + } +} diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregationBuilder.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregationBuilder.java index ccdd8afb4071f..1c3cc7e7804c8 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregationBuilder.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregationBuilder.java @@ -28,7 +28,7 @@ import java.util.Map; import java.util.Objects; -public class RateAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly { +public class RateAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly { public static final String NAME = "rate"; public static final ParseField 
UNIT_FIELD = new ParseField("unit"); public static final ValuesSourceRegistry.RegistryKey REGISTRY_KEY = new ValuesSourceRegistry.RegistryKey<>( diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorFactory.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorFactory.java index 8decfd09525e0..e9fed04272375 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorFactory.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorFactory.java @@ -6,6 +6,10 @@ package org.elasticsearch.xpack.analytics.rate; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + import org.apache.lucene.index.LeafReaderContext; import org.elasticsearch.common.Rounding; import org.elasticsearch.index.query.QueryShardContext; @@ -20,9 +24,7 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; import org.elasticsearch.search.internal.SearchContext; -import java.io.IOException; -import java.util.Arrays; -import java.util.Map; +import org.elasticsearch.xpack.analytics.aggregations.support.AnalyticsValuesSourceType; class RateAggregatorFactory extends ValuesSourceAggregatorFactory { @@ -44,15 +46,21 @@ class RateAggregatorFactory extends ValuesSourceAggregatorFactory { static void registerAggregators(ValuesSourceRegistry.Builder builder) { builder.register( RateAggregationBuilder.REGISTRY_KEY, - Arrays.asList(CoreValuesSourceType.NUMERIC, CoreValuesSourceType.BOOLEAN), - RateAggregator::new, + Collections.singletonList(CoreValuesSourceType.NUMERIC), + NumericRateAggregator::new, + true + ); + builder.register( + RateAggregationBuilder.REGISTRY_KEY, + Collections.singletonList(AnalyticsValuesSourceType.HISTOGRAM), + HistogramRateAggregator::new, true ); } @Override protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map 
metadata) throws IOException { - return new RateAggregator(name, config, rateUnit, searchContext, parent, metadata) { + return new AbstractRateAggregator(name, config, rateUnit, searchContext, parent, metadata) { @Override public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) { return LeafBucketCollector.NO_OP_COLLECTOR; diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorTests.java index f8eef2fb595cc..0f719cd1b4ff1 100644 --- a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorTests.java +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorTests.java @@ -6,6 +6,22 @@ package org.elasticsearch.xpack.analytics.rate; +import static org.elasticsearch.xpack.analytics.AnalyticsTestsUtils.histogramFieldDocValues; +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Function; + import org.apache.lucene.document.Field; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.SortedNumericDocValuesField; @@ -40,21 +56,7 @@ import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.lookup.LeafDocLookup; import org.elasticsearch.xpack.analytics.AnalyticsPlugin; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; 
-import java.util.Map; -import java.util.function.Consumer; -import java.util.function.Function; - -import static org.hamcrest.Matchers.closeTo; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.instanceOf; +import org.elasticsearch.xpack.analytics.mapper.HistogramFieldMapper; public class RateAggregatorTests extends AggregatorTestCase { @@ -386,6 +388,67 @@ public void testFormatter() throws IOException { }, dateType, numType); } + public void testHistogramFieldMonthToMonth() throws IOException { + MappedFieldType histType = new HistogramFieldMapper.HistogramFieldType("val", Collections.emptyMap()); + MappedFieldType dateType = dateFieldType(DATE_FIELD); + RateAggregationBuilder rateAggregationBuilder = new RateAggregationBuilder("my_rate").rateUnit("month").field("val"); + + DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date").field(DATE_FIELD) + .calendarInterval(new DateHistogramInterval("month")) + .subAggregation(rateAggregationBuilder); + + testCase(dateHistogramAggregationBuilder, new MatchAllDocsQuery(), iw -> { + iw.addDocument(doc("2010-03-01T00:00:00", histogramFieldDocValues("val", new double[] { 1, 2 }))); + iw.addDocument(doc("2010-04-01T00:00:00", histogramFieldDocValues("val", new double[] { 3, 4 }))); + }, (Consumer) dh -> { + assertThat(dh.getBuckets(), hasSize(2)); + assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).getValue(), closeTo(3.0, 0.000001)); + assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).getValue(), closeTo(7.0, 0.000001)); + }, dateType, histType); + } + + public void testHistogramFieldMonthToYear() throws IOException { + MappedFieldType histType = new HistogramFieldMapper.HistogramFieldType("val", Collections.emptyMap()); + MappedFieldType dateType = dateFieldType(DATE_FIELD); + RateAggregationBuilder 
rateAggregationBuilder = new RateAggregationBuilder("my_rate").rateUnit("month").field("val"); + + DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date").field(DATE_FIELD) + .calendarInterval(new DateHistogramInterval("year")) + .subAggregation(rateAggregationBuilder); + + testCase(dateHistogramAggregationBuilder, new MatchAllDocsQuery(), iw -> { + iw.addDocument(doc("2010-03-01T00:00:00", histogramFieldDocValues("val", new double[] { 1, 2 }))); + iw.addDocument(doc("2010-04-01T00:00:00", histogramFieldDocValues("val", new double[] { 3, 4 }))); + }, (Consumer) dh -> { + assertThat(dh.getBuckets(), hasSize(1)); + assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).getValue(), closeTo(10.0 / 12, 0.000001)); + }, dateType, histType); + } + + public void testFilterWithHistogramField() throws IOException { + MappedFieldType histType = new HistogramFieldMapper.HistogramFieldType("val", Collections.emptyMap()); + MappedFieldType dateType = dateFieldType(DATE_FIELD); + MappedFieldType keywordType = new KeywordFieldMapper.KeywordFieldType("term"); + RateAggregationBuilder rateAggregationBuilder = new RateAggregationBuilder("my_rate").rateUnit("month").field("val"); + + DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date").field(DATE_FIELD) + .calendarInterval(new DateHistogramInterval("month")) + .subAggregation(rateAggregationBuilder); + + testCase(dateHistogramAggregationBuilder, new TermQuery(new Term("term", "a")), iw -> { + iw.addDocument(doc("2010-03-01T00:00:00", histogramFieldDocValues("val", new double[] { 1, 2 }), + new StringField("term", "a", Field.Store.NO))); + iw.addDocument(doc("2010-04-01T00:00:00", histogramFieldDocValues("val", new double[] { 3 }), + new StringField("term", "a", Field.Store.NO))); + iw.addDocument(doc("2010-04-01T00:00:00", histogramFieldDocValues("val", new double[] { 4 }), + new 
StringField("term", "b", Field.Store.NO))); + }, (Consumer) dh -> { + assertThat(dh.getBuckets(), hasSize(2)); + assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(3.0, 0.000001)); + assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(3.0, 0.000001)); + }, dateType, histType, keywordType); + } + private void testCase( Query query, String interval,