Histogram field type support for ValueCount and Avg aggregations (#55933)
Implements value_count and avg aggregations over Histogram fields as discussed in #53285

- value_count returns the sum of all elements in the counts arrays of the histograms
- avg computes a weighted average of the values array of the histogram, weighting each value by its associated element in the counts array (see the sketch below)
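
As a rough, hypothetical sketch of the two computations over a pre-aggregated histogram's parallel values/counts arrays (illustrative only, not the aggregator code in this commit):

[source,java]
--------------------------------------------------
/** Hypothetical standalone sketch; not Elasticsearch code. */
public final class HistogramMetricsSketch {

    /** value_count: the sum of all entries in the counts array. */
    static long valueCount(long[] counts) {
        long total = 0;
        for (long count : counts) {
            total += count;
        }
        return total;
    }

    /** avg: each value weighted by its count, divided by the total count. */
    static double avg(double[] values, long[] counts) {
        double weightedSum = 0;
        long totalCount = 0;
        for (int i = 0; i < values.length; i++) {
            weightedSum += values[i] * counts[i];
            totalCount += counts[i];
        }
        return weightedSum / totalCount;
    }

    public static void main(String[] args) {
        // Example histogram from the docs changed in this commit
        double[] values = { 0.1, 0.2, 0.3, 0.4, 0.5 };
        long[] counts = { 3, 7, 23, 12, 6 };
        System.out.println(valueCount(counts));  // 51
        System.out.println(avg(values, counts)); // 16.4 / 51 ≈ 0.3216
    }
}
--------------------------------------------------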
csoulios authored May 4, 2020
1 parent 7fe56ca commit caf6c5a
Showing 19 changed files with 696 additions and 45 deletions.
54 changes: 54 additions & 0 deletions docs/reference/aggregations/metrics/avg-aggregation.asciidoc
@@ -126,3 +126,57 @@ POST /exams/_search?size=0
// TEST[setup:exams]

<1> Documents without a value in the `grade` field will fall into the same bucket as documents that have the value `10`.


[[search-aggregations-metrics-avg-aggregation-histogram-fields]]
==== Histogram fields
When the average is computed on <<histogram,histogram fields>>, the result of the aggregation is the weighted average
of all elements in the `values` array, where each element is weighted by the number in the same position in the `counts` array.

For example, for the following index that stores pre-aggregated histograms with latency metrics for different networks:

[source,console]
--------------------------------------------------
PUT metrics_index/_doc/1
{
  "network.name" : "net-1",
  "latency_histo" : {
    "values" : [0.1, 0.2, 0.3, 0.4, 0.5], <1>
    "counts" : [3, 7, 23, 12, 6] <2>
  }
}
PUT metrics_index/_doc/2
{
  "network.name" : "net-2",
  "latency_histo" : {
    "values" : [0.1, 0.2, 0.3, 0.4, 0.5], <1>
    "counts" : [8, 17, 8, 7, 6] <2>
  }
}
POST /metrics_index/_search?size=0
{
  "aggs" : {
    "avg_latency" : { "avg" : { "field" : "latency_histo" } }
  }
}
--------------------------------------------------

For each histogram field, the `avg` aggregation adds each number in the `values` array <1> multiplied by its associated count
in the `counts` array <2>. Finally, it divides that weighted sum by the total count across all histograms and returns the following result:

[source,console-result]
--------------------------------------------------
{
  ...
  "aggregations" : {
    "avg_latency" : {
      "value" : 0.29690721649
    }
  }
}
--------------------------------------------------
// TESTRESPONSE[skip:test not setup]
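
As a quick check of this value using the two documents above: the weighted sum of the first histogram is 0.1*3 + 0.2*7 + 0.3*23 + 0.4*12 + 0.5*6 = 16.4 and of the second is 0.1*8 + 0.2*17 + 0.3*8 + 0.4*7 + 0.5*6 = 12.4, while the total count is 51 + 46 = 97, so the average is 28.8 / 97 ≈ 0.2969.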
6 changes: 3 additions & 3 deletions docs/reference/aggregations/metrics/sum-aggregation.asciidoc
@@ -163,10 +163,10 @@ POST /sales/_search?size=0
[[search-aggregations-metrics-sum-aggregation-histogram-fields]]
==== Histogram fields

When the sums are computed on <<histogram,histogram fields>>, the result of the aggregation is the sum of all elements in the `values`
When sum is computed on <<histogram,histogram fields>>, the result of the aggregation is the sum of all elements in the `values`
array multiplied by the number in the same position in the `counts` array.

For example, if we have the following index that stores pre-aggregated histograms with latency metrics for different networks:
For example, for the following index that stores pre-aggregated histograms with latency metrics for different networks:

[source,console]
--------------------------------------------------
@@ -196,7 +196,7 @@ POST /metrics_index/_search?size=0
}
--------------------------------------------------

For each histogram field the sum aggregation will multiply each number in the `values` array <1> multiplied with its associated count
For each histogram field, the `sum` aggregation adds each number in the `values` array <1> multiplied by its associated count
in the `counts` array <2>. Finally, it adds the sums of all histograms and returns the following result:

[source,console-result]
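
As a rough check, assuming the truncated example above uses the same two documents shown in the avg and value_count sections of these docs, the expected sum would be 16.4 + 12.4 = 28.8.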
@@ -84,3 +84,56 @@ POST /sales/_search?size=0
NOTE: Because `value_count` is designed to work with any field, it internally treats all values as simple bytes.
Due to this implementation, if the `_value` script variable is used to fetch a value instead of accessing the field
directly (e.g. a "value script"), the field value will be returned as a string instead of its native format.

[[search-aggregations-metrics-valuecount-aggregation-histogram-fields]]
==== Histogram fields
When the `value_count` aggregation is computed on <<histogram,histogram fields>>, the result of the aggregation is the sum of all numbers
in the `counts` array of the histogram.

For example, for the following index that stores pre-aggregated histograms with latency metrics for different networks:

[source,console]
--------------------------------------------------
PUT metrics_index/_doc/1
{
  "network.name" : "net-1",
  "latency_histo" : {
    "values" : [0.1, 0.2, 0.3, 0.4, 0.5],
    "counts" : [3, 7, 23, 12, 6] <1>
  }
}
PUT metrics_index/_doc/2
{
  "network.name" : "net-2",
  "latency_histo" : {
    "values" : [0.1, 0.2, 0.3, 0.4, 0.5],
    "counts" : [8, 17, 8, 7, 6] <1>
  }
}
POST /metrics_index/_search?size=0
{
  "aggs" : {
    "total_requests" : {
      "value_count" : { "field" : "latency_histo" }
    }
  }
}
--------------------------------------------------

For each histogram field, the `value_count` aggregation sums all numbers in the `counts` array <1>.
Finally, it adds the counts of all histograms and returns the following result:

[source,console-result]
--------------------------------------------------
{
  ...
  "aggregations" : {
    "total_requests" : {
      "value" : 97
    }
  }
}
--------------------------------------------------
// TESTRESPONSE[skip:test not setup]
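
As a quick check of this value: the counts of the first histogram add up to 3 + 7 + 23 + 12 + 6 = 51 and those of the second to 8 + 17 + 8 + 7 + 6 = 46, for a total `value_count` of 97.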
2 changes: 2 additions & 0 deletions docs/reference/mapping/types/histogram.asciidoc
@@ -36,6 +36,8 @@ Because the data is not indexed, you can only use `histogram` fields for the
following aggregations and queries:

* <<search-aggregations-metrics-sum-aggregation-histogram-fields,sum>> aggregation
* <<search-aggregations-metrics-valuecount-aggregation-histogram-fields,value_count>> aggregation
* <<search-aggregations-metrics-avg-aggregation-histogram-fields,avg>> aggregation
* <<search-aggregations-metrics-percentile-aggregation,percentiles>> aggregation
* <<search-aggregations-metrics-percentile-rank-aggregation,percentile ranks>> aggregation
* <<search-aggregations-metrics-boxplot-aggregation,boxplot>> aggregation
@@ -34,7 +34,7 @@
public class InternalValueCount extends InternalNumericMetricsAggregation.SingleValue implements ValueCount {
private final long value;

InternalValueCount(String name, long value, Map<String, Object> metadata) {
public InternalValueCount(String name, long value, Map<String, Object> metadata) {
super(name, metadata);
this.value = value;
}
@@ -132,7 +132,9 @@ public List<Consumer<ValuesSourceRegistry.Builder>> getAggregationExtentions() {
return List.of(
AnalyticsAggregatorFactory::registerPercentilesAggregator,
AnalyticsAggregatorFactory::registerPercentileRanksAggregator,
AnalyticsAggregatorFactory::registerHistoBackedSumAggregator
AnalyticsAggregatorFactory::registerHistoBackedSumAggregator,
AnalyticsAggregatorFactory::registerHistoBackedValueCountAggregator,
AnalyticsAggregatorFactory::registerHistoBackedAverageAggregator
);
}

@@ -3,18 +3,21 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.analytics.aggregations.metrics;

import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.MetricAggregatorSupplier;
import org.elasticsearch.search.aggregations.metrics.PercentileRanksAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregatorSupplier;
import org.elasticsearch.search.aggregations.metrics.PercentilesConfig;
import org.elasticsearch.search.aggregations.metrics.PercentilesMethod;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregatorSupplier;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.xpack.analytics.aggregations.support.AnalyticsValuesSourceType;
import org.elasticsearch.xpack.analytics.aggregations.support.HistogramValuesSource;

public class AnalyticsAggregatorFactory {

@@ -65,6 +68,24 @@ public static void registerPercentileRanksAggregator(ValuesSourceRegistry.Builde
public static void registerHistoBackedSumAggregator(ValuesSourceRegistry.Builder builder) {
builder.register(SumAggregationBuilder.NAME,
AnalyticsValuesSourceType.HISTOGRAM,
(MetricAggregatorSupplier) HistoBackedSumAggregator::new);
(MetricAggregatorSupplier) (name, valuesSource, format, context, parent, metadata) ->
new HistoBackedSumAggregator(name, (HistogramValuesSource.Histogram) valuesSource, format, context, parent, metadata)
);
}

public static void registerHistoBackedValueCountAggregator(ValuesSourceRegistry.Builder builder) {
builder.register(ValueCountAggregationBuilder.NAME,
AnalyticsValuesSourceType.HISTOGRAM,
(ValueCountAggregatorSupplier) (name, valuesSource, context, parent, metadata) ->
new HistoBackedValueCountAggregator(name, (HistogramValuesSource.Histogram) valuesSource, context, parent, metadata)
);
}

public static void registerHistoBackedAverageAggregator(ValuesSourceRegistry.Builder builder) {
builder.register(AvgAggregationBuilder.NAME,
AnalyticsValuesSourceType.HISTOGRAM,
(MetricAggregatorSupplier) (name, valuesSource, format, context, parent, metadata) ->
new HistoBackedAvgAggregator(name, (HistogramValuesSource.Histogram) valuesSource, format, context, parent, metadata)
);
}
}
@@ -0,0 +1,124 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.aggregations.metrics;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.fielddata.HistogramValue;
import org.elasticsearch.index.fielddata.HistogramValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.search.aggregations.metrics.InternalAvg;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.xpack.analytics.aggregations.support.HistogramValuesSource;

import java.io.IOException;
import java.util.Map;

/**
* Average aggregator operating over histogram datatypes {@link HistogramValuesSource}
* The aggregation computes a weighted average, weighting each value by its associated count
*/
class HistoBackedAvgAggregator extends NumericMetricsAggregator.SingleValue {

private final HistogramValuesSource.Histogram valuesSource;

LongArray counts;
DoubleArray sums;
DoubleArray compensations;
DocValueFormat format;

HistoBackedAvgAggregator(String name, HistogramValuesSource.Histogram valuesSource, DocValueFormat formatter, SearchContext context,
Aggregator parent, Map<String, Object> metadata) throws IOException {
super(name, context, parent, metadata);
this.valuesSource = valuesSource;
this.format = formatter;
if (valuesSource != null) {
final BigArrays bigArrays = context.bigArrays();
counts = bigArrays.newLongArray(1, true);
sums = bigArrays.newDoubleArray(1, true);
compensations = bigArrays.newDoubleArray(1, true);
}
}

@Override
public ScoreMode scoreMode() {
return valuesSource != null && valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
final BigArrays bigArrays = context.bigArrays();
final HistogramValues values = valuesSource.getHistogramValues(ctx);
final CompensatedSum kahanSummation = new CompensatedSum(0, 0);

return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
counts = bigArrays.grow(counts, bucket + 1);
sums = bigArrays.grow(sums, bucket + 1);
compensations = bigArrays.grow(compensations, bucket + 1);

if (values.advanceExact(doc)) {
final HistogramValue sketch = values.histogram();

// Compute the sum of double values with Kahan summation algorithm which is more accurate than naive summation
final double sum = sums.get(bucket);
final double compensation = compensations.get(bucket);
kahanSummation.reset(sum, compensation);
while (sketch.next()) {
double d = sketch.value() * sketch.count();
kahanSummation.add(d);
counts.increment(bucket, sketch.count());
}

sums.set(bucket, kahanSummation.value());
compensations.set(bucket, kahanSummation.delta());
}
}
};
}

@Override
public double metric(long owningBucketOrd) {
if (valuesSource == null || owningBucketOrd >= sums.size()) {
return Double.NaN;
}
return sums.get(owningBucketOrd) / counts.get(owningBucketOrd);
}

@Override
public InternalAggregation buildAggregation(long bucket) {
if (valuesSource == null || bucket >= sums.size()) {
return buildEmptyAggregation();
}
return new InternalAvg(name, sums.get(bucket), counts.get(bucket), format, metadata());
}

@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalAvg(name, 0.0, 0L, format, metadata());
}

@Override
public void doClose() {
Releasables.close(counts, sums, compensations);
}

}
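
The collector above relies on compensated (Kahan) summation to keep the per-bucket sums accurate. The following is a minimal standalone sketch of that algorithm, not Elasticsearch's `CompensatedSum` class; the class name and demo values are illustrative only.

[source,java]
--------------------------------------------------
/** Minimal sketch of Kahan (compensated) summation; illustrative only. */
public final class KahanSumSketch {

    private double value;        // running sum
    private double compensation; // running low-order error

    public void add(double x) {
        double corrected = x - compensation;          // re-apply previously lost low-order bits
        double newSum = value + corrected;            // this addition may round again
        compensation = (newSum - value) - corrected;  // capture what was just lost
        value = newSum;
    }

    public double value() {
        return value;
    }

    public static void main(String[] args) {
        // Adding many small values to a large one shows the difference from naive summation.
        KahanSumSketch kahan = new KahanSumSketch();
        double naive = 0.0;
        kahan.add(1e16);
        naive += 1e16;
        for (int i = 0; i < 1_000_000; i++) {
            kahan.add(1.0);
            naive += 1.0;
        }
        // naive stays at 1.0E16, while kahan recovers the million ones that were dropped
        System.out.println("naive: " + naive + ", kahan: " + kahan.value());
    }
}
--------------------------------------------------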
@@ -20,7 +20,6 @@
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.search.aggregations.metrics.InternalSum;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.xpack.analytics.aggregations.support.HistogramValuesSource;

@@ -29,16 +28,19 @@

/**
* Sum aggregator operating over histogram datatypes {@link HistogramValuesSource}
*
* The aggregator sums each histogram value multiplied by its count.
* E.g. for a histogram of response times, this is an approximate "total time spent".
*/
class HistoBackedSumAggregator extends NumericMetricsAggregator.SingleValue {

private final ValuesSource valuesSource;
private final HistogramValuesSource.Histogram valuesSource;
private final DocValueFormat format;

private DoubleArray sums;
private DoubleArray compensations;

HistoBackedSumAggregator(String name, ValuesSource valuesSource, DocValueFormat formatter, SearchContext context,
HistoBackedSumAggregator(String name, HistogramValuesSource.Histogram valuesSource, DocValueFormat formatter, SearchContext context,
Aggregator parent, Map<String, Object> metadata) throws IOException {
super(name, context, parent, metadata);
this.valuesSource = valuesSource;
@@ -61,7 +63,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
return LeafBucketCollector.NO_OP_COLLECTOR;
}
final BigArrays bigArrays = context.bigArrays();
final HistogramValues values = ((HistogramValuesSource.Histogram) valuesSource).getHistogramValues(ctx);
final HistogramValues values = valuesSource.getHistogramValues(ctx);

final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
return new LeafBucketCollectorBase(sub, values) {