diff --git a/docs/reference/aggregations/metrics/sum-aggregation.asciidoc b/docs/reference/aggregations/metrics/sum-aggregation.asciidoc index 0f843f94c40d8..e355bdcfff18d 100644 --- a/docs/reference/aggregations/metrics/sum-aggregation.asciidoc +++ b/docs/reference/aggregations/metrics/sum-aggregation.asciidoc @@ -1,7 +1,9 @@ [[search-aggregations-metrics-sum-aggregation]] === Sum Aggregation -A `single-value` metrics aggregation that sums up numeric values that are extracted from the aggregated documents. These values can be extracted either from specific numeric fields in the documents, or be generated by a provided script. +A `single-value` metrics aggregation that sums up numeric values that are extracted from the aggregated documents. +These values can be extracted either from specific numeric or <> fields in the documents, +or be generated by a provided script. Assuming the data consists of documents representing sales records we can sum the sale price of all hats with: @@ -30,9 +32,9 @@ Resulting in: -------------------------------------------------- { ... - "aggregations": { - "hat_prices": { - "value": 450.0 + "aggregations" : { + "hat_prices" : { + "value" : 450.0 } } } @@ -157,3 +159,55 @@ POST /sales/_search?size=0 } -------------------------------------------------- // TEST[setup:sales] + +[[search-aggregations-metrics-sum-aggregation-histogram-fields]] +==== Histogram fields + +When the sums are computed on <>, the result of the aggregation is the sum of all elements in the `values` +array multiplied by the number in the same position in the `counts` array. 
+ +For example, if we have the following index that stores pre-aggregated histograms with latency metrics for different networks: + +[source,console] +-------------------------------------------------- +PUT metrics_index/_doc/1 +{ + "network.name" : "net-1", + "latency_histo" : { + "values" : [0.1, 0.2, 0.3, 0.4, 0.5], <1> + "counts" : [3, 7, 23, 12, 6] <2> + } +} + +PUT metrics_index/_doc/2 +{ + "network.name" : "net-2", + "latency_histo" : { + "values" : [0.1, 0.2, 0.3, 0.4, 0.5], <1> + "counts" : [8, 17, 8, 7, 6] <2> + } +} + +POST /metrics_index/_search?size=0 +{ + "aggs" : { + "total_latency" : { "sum" : { "field" : "latency_histo" } } + } +} +-------------------------------------------------- + +For each histogram field, the sum aggregation will multiply each number in the `values` array <1> by its associated count +in the `counts` array <2>. Finally, it will add up the products from all histograms and return the following result: + +[source,console-result] +-------------------------------------------------- +{ + ... + "aggregations" : { + "total_latency" : { + "value" : 28.8 + } + } +} +-------------------------------------------------- +// TESTRESPONSE[skip:test not setup] diff --git a/docs/reference/mapping/types/histogram.asciidoc b/docs/reference/mapping/types/histogram.asciidoc index 6d1dbe49b131d..3d3abe6e570a5 100644 --- a/docs/reference/mapping/types/histogram.asciidoc +++ b/docs/reference/mapping/types/histogram.asciidoc @@ -35,6 +35,7 @@ binary <> and not indexed. 
Its size in bytes is at most Because the data is not indexed, you only can use `histogram` fields for the following aggregations and queries: +* <> aggregation * <> aggregation * <> aggregation * <> aggregation @@ -73,9 +74,9 @@ The following <> API request creates a new i -------------------------------------------------- PUT my_index { - "mappings": { - "properties": { - "my_histogram": { + "mappings" : { + "properties" : { + "my_histogram" : { "type" : "histogram" }, "my_text" : { @@ -114,6 +115,3 @@ increasing order. For < Count for each bucket. Values in the arrays are treated as integers and must be positive or zero. Negative values will be rejected. The relation between a bucket and a count is given by the position in the array. - - - diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java index 65a201d1e75f7..1a65c577e0da7 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java @@ -32,7 +32,7 @@ public class InternalSum extends InternalNumericMetricsAggregation.SingleValue implements Sum { private final double sum; - InternalSum(String name, double sum, DocValueFormat formatter, Map metadata) { + public InternalSum(String name, double sum, DocValueFormat formatter, Map metadata) { super(name, metadata); this.sum = sum; this.format = formatter; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java index 284d8c5ab7682..26bb3d82b1d00 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java @@ -35,7 +35,7 @@ import java.io.IOException; import 
java.util.Map; -class SumAggregator extends NumericMetricsAggregator.SingleValue { +public class SumAggregator extends NumericMetricsAggregator.SingleValue { private final ValuesSource.Numeric valuesSource; private final DocValueFormat format; diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java index ccb0d2abfad0c..47139062aff7d 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java @@ -31,7 +31,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.analytics.action.TransportAnalyticsStatsAction; -import org.elasticsearch.xpack.analytics.aggregations.metrics.AnalyticsPercentilesAggregatorFactory; +import org.elasticsearch.xpack.analytics.aggregations.metrics.AnalyticsAggregatorFactory; import org.elasticsearch.xpack.analytics.boxplot.BoxplotAggregationBuilder; import org.elasticsearch.xpack.analytics.boxplot.InternalBoxplot; import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder; @@ -142,8 +142,11 @@ public Map getMappers() { @Override public List> getAggregationExtentions() { - return Arrays.asList(AnalyticsPercentilesAggregatorFactory::registerPercentilesAggregator, - AnalyticsPercentilesAggregatorFactory::registerPercentileRanksAggregator); + return Arrays.asList( + AnalyticsAggregatorFactory::registerPercentilesAggregator, + AnalyticsAggregatorFactory::registerPercentileRanksAggregator, + AnalyticsAggregatorFactory::registerHistoBackedSumAggregator + ); } @Override diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/aggregations/metrics/AnalyticsPercentilesAggregatorFactory.java 
b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/aggregations/metrics/AnalyticsAggregatorFactory.java similarity index 88% rename from x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/aggregations/metrics/AnalyticsPercentilesAggregatorFactory.java rename to x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/aggregations/metrics/AnalyticsAggregatorFactory.java index ed466185d3111..112ede8a10b0d 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/aggregations/metrics/AnalyticsPercentilesAggregatorFactory.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/aggregations/metrics/AnalyticsAggregatorFactory.java @@ -6,15 +6,18 @@ package org.elasticsearch.xpack.analytics.aggregations.metrics; +import org.elasticsearch.search.aggregations.metrics.MetricAggregatorSupplier; import org.elasticsearch.search.aggregations.metrics.PercentileRanksAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.PercentilesAggregatorSupplier; import org.elasticsearch.search.aggregations.metrics.PercentilesConfig; import org.elasticsearch.search.aggregations.metrics.PercentilesMethod; +import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; import org.elasticsearch.xpack.analytics.aggregations.support.AnalyticsValuesSourceType; -public class AnalyticsPercentilesAggregatorFactory { +public class AnalyticsAggregatorFactory { + public static void registerPercentilesAggregator(ValuesSourceRegistry.Builder builder) { builder.register(PercentilesAggregationBuilder.NAME, AnalyticsValuesSourceType.HISTOGRAM, @@ -58,4 +61,10 @@ public static void registerPercentileRanksAggregator(ValuesSourceRegistry.Builde "is not compatible with Histogram field"); }); } + + public static void 
registerHistoBackedSumAggregator(ValuesSourceRegistry.Builder builder) { + builder.register(SumAggregationBuilder.NAME, + AnalyticsValuesSourceType.HISTOGRAM, + (MetricAggregatorSupplier) HistoBackedSumAggregator::new); + } } diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/aggregations/metrics/HistoBackedSumAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/aggregations/metrics/HistoBackedSumAggregator.java new file mode 100644 index 0000000000000..08617dc1ae899 --- /dev/null +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/aggregations/metrics/HistoBackedSumAggregator.java @@ -0,0 +1,115 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.analytics.aggregations.metrics; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.ScoreMode; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.DoubleArray; +import org.elasticsearch.index.fielddata.HistogramValue; +import org.elasticsearch.index.fielddata.HistogramValues; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.metrics.CompensatedSum; +import org.elasticsearch.search.aggregations.metrics.InternalSum; +import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import 
org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.xpack.analytics.aggregations.support.HistogramValuesSource; + +import java.io.IOException; +import java.util.Map; + +/** + * Sum aggregator operating over histogram datatypes {@link HistogramValuesSource}. Every histogram entry contributes {@code value * count}; the per-bucket totals are accumulated with {@link CompensatedSum}. + */ +class HistoBackedSumAggregator extends NumericMetricsAggregator.SingleValue { + + private final ValuesSource valuesSource; + private final DocValueFormat format; + + private DoubleArray sums; /* running sum per bucket ordinal */ + private DoubleArray compensations; /* compensation term per bucket ordinal, paired with sums */ + + HistoBackedSumAggregator(String name, ValuesSource valuesSource, DocValueFormat formatter, SearchContext context, + Aggregator parent, Map metadata) throws IOException { + super(name, context, parent, metadata); + this.valuesSource = valuesSource; + this.format = formatter; + if (valuesSource != null) { /* the arrays are only allocated when there is a values source to read */ + sums = context.bigArrays().newDoubleArray(1, true); + compensations = context.bigArrays().newDoubleArray(1, true); + } + } + + @Override + public ScoreMode scoreMode() { + return valuesSource != null && valuesSource.needsScores() ? 
ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES; + } + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, + final LeafBucketCollector sub) throws IOException { + if (valuesSource == null) { + return LeafBucketCollector.NO_OP_COLLECTOR; + } + final BigArrays bigArrays = context.bigArrays(); + final HistogramValues values = ((HistogramValuesSource.Histogram) valuesSource).getHistogramValues(ctx); + + final CompensatedSum kahanSummation = new CompensatedSum(0, 0); /* one instance per leaf collector, reset for every document */ + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long bucket) throws IOException { + sums = bigArrays.grow(sums, bucket + 1); /* make sure both arrays can be indexed by this bucket ordinal */ + compensations = bigArrays.grow(compensations, bucket + 1); + + if (values.advanceExact(doc)) { + final HistogramValue sketch = values.histogram(); + final double sum = sums.get(bucket); + final double compensation = compensations.get(bucket); + kahanSummation.reset(sum, compensation); /* continue from the state already stored for this bucket */ + while (sketch.next()) { + double d = sketch.value() * sketch.count(); /* contribution of one histogram entry */ + kahanSummation.add(d); + } + + compensations.set(bucket, kahanSummation.delta()); /* write the updated state back for the next document */ + sums.set(bucket, kahanSummation.value()); + } + } + }; + } + + @Override + public double metric(long owningBucketOrd) { + if (valuesSource == null || owningBucketOrd >= sums.size()) { /* no values source, or ordinal beyond the allocated array: report 0 */ + return 0.0; + } + return sums.get(owningBucketOrd); + } + + @Override + public InternalAggregation buildAggregation(long bucket) { + if (valuesSource == null || bucket >= sums.size()) { + return buildEmptyAggregation(); + } + return new InternalSum(name, sums.get(bucket), format, metadata()); + } + + @Override + public InternalAggregation 
buildEmptyAggregation() { + return new InternalSum(name, 0.0, format, metadata()); + } + + @Override + public void doClose() { + Releasables.close(sums, compensations); + } +} diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/aggregations/metrics/HistoBackedSumAggregatorTests.java 
b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/aggregations/metrics/HistoBackedSumAggregatorTests.java new file mode 100644 index 0000000000000..7a262ffb13afe --- /dev/null +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/aggregations/metrics/HistoBackedSumAggregatorTests.java @@ -0,0 +1,180 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.analytics.aggregations.metrics; + +import com.tdunning.math.stats.Centroid; +import com.tdunning.math.stats.TDigest; +import org.apache.lucene.document.BinaryDocValuesField; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.store.Directory; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.plugins.SearchPlugin; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.metrics.InternalSum; +import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.TDigestState; +import 
org.elasticsearch.search.aggregations.support.AggregationInspectionHelper; +import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; +import org.elasticsearch.search.aggregations.support.ValuesSourceType; +import org.elasticsearch.xpack.analytics.AnalyticsPlugin; +import org.elasticsearch.xpack.analytics.aggregations.support.AnalyticsValuesSourceType; +import org.elasticsearch.xpack.analytics.mapper.HistogramFieldMapper; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.function.Consumer; + +import static java.util.Collections.singleton; +import static org.elasticsearch.search.aggregations.AggregationBuilders.sum; + +public class HistoBackedSumAggregatorTests extends AggregatorTestCase { + + private static final String FIELD_NAME = "field"; + + public void testNoDocs() throws IOException { + testCase(new MatchAllDocsQuery(), iw -> { + // Intentionally not writing any docs + }, sum -> { + assertEquals(0L, sum.getValue(), 0d); + assertFalse(AggregationInspectionHelper.hasValue(sum)); + }); + } + + public void testNoMatchingField() throws IOException { + testCase(new MatchAllDocsQuery(), iw -> { + iw.addDocument(singleton(getDocValue("wrong_field", new double[] {3, 1.2, 10}))); + iw.addDocument(singleton(getDocValue("wrong_field", new double[] {5.3, 6, 20}))); + }, sum -> { + assertEquals(0L, sum.getValue(), 0d); + assertFalse(AggregationInspectionHelper.hasValue(sum)); + }); + } + + public void testSimpleHistogram() throws IOException { + testCase(new MatchAllDocsQuery(), iw -> { + iw.addDocument(singleton(getDocValue(FIELD_NAME, new double[] {3, 1.2, 10}))); + iw.addDocument(singleton(getDocValue(FIELD_NAME, new double[] {5.3, 6, 6, 20}))); + iw.addDocument(singleton(getDocValue(FIELD_NAME, new double[] {-10, 0.01, 1, 90}))); + }, sum -> { + assertEquals(132.51d, sum.getValue(), 0.01d); /* 14.2 + 37.3 + 81.01 */ + 
assertTrue(AggregationInspectionHelper.hasValue(sum)); 
+ }); + } + + public void testQueryFiltering() throws IOException { + testCase(new TermQuery(new Term("match", "yes")), iw -> { + iw.addDocument(Arrays.asList( + new StringField("match", "yes", Field.Store.NO), + getDocValue(FIELD_NAME, new double[] {3, 1.2, 10})) + ); + iw.addDocument(Arrays.asList( + new StringField("match", "yes", Field.Store.NO), + getDocValue(FIELD_NAME, new double[] {5.3, 6, 20})) + ); + iw.addDocument(Arrays.asList( + new StringField("match", "no", Field.Store.NO), + getDocValue(FIELD_NAME, new double[] {3, 1.2, 10})) + ); + iw.addDocument(Arrays.asList( + new StringField("match", "no", Field.Store.NO), + getDocValue(FIELD_NAME, new double[] {3, 1.2, 10})) + ); + iw.addDocument(Arrays.asList( + new StringField("match", "yes", Field.Store.NO), + getDocValue(FIELD_NAME, new double[] {-10, 0.01, 1, 90})) + ); + }, sum -> { + assertEquals(126.51d, sum.getValue(), 0.01d); /* only the three "yes" documents: 14.2 + 31.3 + 81.01 */ + assertTrue(AggregationInspectionHelper.hasValue(sum)); + }); + } + + private void testCase(Query query, + CheckedConsumer indexer, + Consumer verify) throws IOException { + testCase(query, sum("_name").field(FIELD_NAME), indexer, verify, singleton(defaultFieldType(FIELD_NAME))); + } + + private void testCase(Query query, + SumAggregationBuilder aggregationBuilder, + CheckedConsumer indexer, + Consumer verify, + Collection fieldTypes) throws IOException { + try (Directory directory = newDirectory()) { + try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + indexer.accept(indexWriter); + } + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = newSearcher(indexReader, true, true); + + final MappedFieldType[] fieldTypesArray = fieldTypes.toArray(new MappedFieldType[0]); + final InternalSum internalSum = search(indexSearcher, query, aggregationBuilder, fieldTypesArray); + verify.accept(internalSum); + } + } + } + + private BinaryDocValuesField 
getDocValue(String fieldName, double[] values) throws 
IOException { + TDigest histogram = new TDigestState(100.0); /* default */ + for (double value : values) { + histogram.add(value); + } + BytesStreamOutput streamOutput = new BytesStreamOutput(); + histogram.compress(); + Collection centroids = histogram.centroids(); + Iterator iterator = centroids.iterator(); + while (iterator.hasNext()) { + Centroid centroid = iterator.next(); + streamOutput.writeVInt(centroid.count()); /* each centroid is written as its count followed by its mean */ + streamOutput.writeDouble(centroid.mean()); + } + return new BinaryDocValuesField(fieldName, streamOutput.bytes().toBytesRef()); + } + + @Override + protected List getSearchPlugins() { + return Arrays.asList(new AnalyticsPlugin(Settings.EMPTY)); + } + + @Override + protected List getSupportedValuesSourceTypes() { + /* Note: this is the same list as Core, plus Analytics */ + return Arrays.asList( + CoreValuesSourceType.NUMERIC, + CoreValuesSourceType.DATE, + CoreValuesSourceType.BOOLEAN, + AnalyticsValuesSourceType.HISTOGRAM + ); + } + + @Override + protected AggregationBuilder createAggBuilderForTypeTest(MappedFieldType fieldType, String fieldName) { + return new SumAggregationBuilder("_name").field(fieldName); + } + + private MappedFieldType defaultFieldType(String fieldName) { /* builds a histogram field type named after the argument */ + MappedFieldType fieldType = new HistogramFieldMapper.Builder(fieldName).fieldType(); + fieldType.setName(fieldName); + return fieldType; + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/histogram.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/histogram.yml new file mode 100644 index 0000000000000..1e277da16351c --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/histogram.yml @@ -0,0 +1,40 @@ +setup: + - skip: + features: headers + - do: + indices.create: + index: "test" + body: + mappings: + properties: + latency: + type: "histogram" + - do: + headers: + 
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. 
the test setup superuser + bulk: + index: test + refresh: true + body: + - '{"index": {}}' + - '{"latency": {"values" : [0.1, 0.2, 0.3, 0.4, 0.5], "counts" : [3, 7, 23, 12, 6]}}' + - '{"index": {}}' + - '{"latency": {"values" : [0.1, 0.2, 0.3, 0.4, 0.5], "counts" : [2, 5, 10, 1, 8]}}' + +--- +"Histogram Sum Aggregation": + + - do: + search: + index: "test" + body: + size: 0 + aggs: + histo_sum: + sum: + field: "latency" + + - match: { hits.total.value: 2 } + - match: { aggregations.histo_sum.value: 25 } + +