Histogram field type support for ValueCount and Avg aggregations #55933

Merged: 7 commits, May 4, 2020
52 changes: 52 additions & 0 deletions docs/reference/aggregations/metrics/avg-aggregation.asciidoc
@@ -126,3 +126,55 @@ POST /exams/_search?size=0
// TEST[setup:exams]

<1> Documents without a value in the `grade` field will fall into the same bucket as documents that have the value `10`.


[[search-aggregations-metrics-avg-aggregation-histogram-fields]]
==== Histogram fields
When averages are computed on <<histogram,histogram fields>>, the result of the aggregation is the weighted average
of all elements in the `values` array, each weighted by the number in the same position in the `counts` array.
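In other words, the aggregation computes `sum(values[i] * counts[i]) / sum(counts[i])`, with both sums running over all histograms in the bucket.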

For example, if we have the following index that stores pre-aggregated histograms with latency metrics for different networks:

[source,console]
--------------------------------------------------
PUT metrics_index/_doc/1
{
"network.name" : "net-1",
"latency_histo" : {
"values" : [0.1, 0.2, 0.3, 0.4, 0.5], <1>
"counts" : [3, 7, 23, 12, 6] <2>
}
}

PUT metrics_index/_doc/2
{
"network.name" : "net-2",
"latency_histo" : {
"values" : [0.1, 0.2, 0.3, 0.4, 0.5], <1>
"counts" : [8, 17, 8, 7, 6] <2>
}
}

POST /metrics_index/_search?size=0
{
"aggs" : {
"avg_latency" : { "avg" : { "field" : "latency_histo" } }
}
}
--------------------------------------------------

For each histogram field the `avg` aggregation will multiply each number in the `values` array <1> by its associated count
in the `counts` array <2>. Eventually, it will compute the average over those values for all histograms and return the following result:

[source,console-result]
--------------------------------------------------
{
...
"aggregations" : {
"avg_latency" : {
"value" : 0.29690721649
}
}
}
--------------------------------------------------
// TESTRESPONSE[skip:test not setup]
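
To verify the result by hand: the first document contributes a weighted sum of `0.1*3 + 0.2*7 + 0.3*23 + 0.4*12 + 0.5*6 = 16.4` over `51` counts, the second contributes `0.1*8 + 0.2*17 + 0.3*8 + 0.4*7 + 0.5*6 = 12.4` over `46` counts, so the weighted average is `(16.4 + 12.4) / (51 + 46) = 28.8 / 97 ≈ 0.29690721649`.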

docs/reference/aggregations/metrics/sum-aggregation.asciidoc
@@ -196,7 +196,7 @@ POST /metrics_index/_search?size=0
}
--------------------------------------------------

-For each histogram field the sum aggregation will multiply each number in the `values` array <1> multiplied with its associated count
+For each histogram field the `sum` aggregation will multiply each number in the `values` array <1> by its associated count
in the `counts` array <2>. Eventually, it will add all values for all histograms and return the total.
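
With the same example histograms used for the `avg` aggregation above, the per-document weighted sums are `16.4` and `12.4`, so the `sum` aggregation returns `16.4 + 12.4 = 28.8`.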

docs/reference/aggregations/metrics/valuecount-aggregation.asciidoc
@@ -84,3 +84,54 @@ POST /sales/_search?size=0
NOTE: Because `value_count` is designed to work with any field, it internally treats all values as simple bytes.
Due to this implementation, if the `_value` script variable is used to fetch a value instead of accessing the field
directly (e.g. a "value script"), the field value will be returned as a string instead of its native format.

[[search-aggregations-metrics-valuecount-aggregation-histogram-fields]]
==== Histogram fields
When the `value_count` aggregation is computed on <<histogram,histogram fields>>, the result of the aggregation is the sum of all numbers
in the `counts` array of the histogram.

For example, if we have the following index that stores pre-aggregated histograms with latency metrics for different networks:

[source,console]
--------------------------------------------------
PUT metrics_index/_doc/1
{
"network.name" : "net-1",
"latency_histo" : {
"values" : [0.1, 0.2, 0.3, 0.4, 0.5],
"counts" : [3, 7, 23, 12, 6] <1>
}
}

PUT metrics_index/_doc/2
{
"network.name" : "net-2",
"latency_histo" : {
"values" : [0.1, 0.2, 0.3, 0.4, 0.5],
"counts" : [8, 17, 8, 7, 6] <1>
}
}

POST /metrics_index/_search?size=0
{
"aggs" : {
"total_requests" : { "value_count" : { "field" : "latency_histo" } }
}
}
--------------------------------------------------

For each histogram field the `value_count` aggregation will sum all numbers in the `counts` array <1>.
Eventually, it will add these sums across all histograms and return the following result:

[source,console-result]
--------------------------------------------------
{
...
"aggregations" : {
"total_requests" : {
"value" : 97
}
}
}
--------------------------------------------------
// TESTRESPONSE[skip:test not setup]
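
This matches the counts in the two example documents: `(3 + 7 + 23 + 12 + 6) + (8 + 17 + 8 + 7 + 6) = 51 + 46 = 97`.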
2 changes: 2 additions & 0 deletions docs/reference/mapping/types/histogram.asciidoc
@@ -36,6 +36,8 @@ Because the data is not indexed, you can only use `histogram` fields for the
following aggregations and queries:

* <<search-aggregations-metrics-sum-aggregation-histogram-fields,sum>> aggregation
* <<search-aggregations-metrics-valuecount-aggregation-histogram-fields,value_count>> aggregation
* <<search-aggregations-metrics-avg-aggregation-histogram-fields,avg>> aggregation
* <<search-aggregations-metrics-percentile-aggregation,percentiles>> aggregation
* <<search-aggregations-metrics-percentile-rank-aggregation,percentile ranks>> aggregation
* <<search-aggregations-metrics-boxplot-aggregation,boxplot>> aggregation

InternalValueCount.java
@@ -34,7 +34,7 @@
public class InternalValueCount extends InternalNumericMetricsAggregation.SingleValue implements ValueCount {
    private final long value;

-   InternalValueCount(String name, long value, Map<String, Object> metadata) {
+   public InternalValueCount(String name, long value, Map<String, Object> metadata) {
        super(name, metadata);
        this.value = value;
    }

AnalyticsPlugin.java
@@ -131,7 +131,9 @@ public List<Consumer<ValuesSourceRegistry.Builder>> getAggregationExtentions() {
        return List.of(
            AnalyticsAggregatorFactory::registerPercentilesAggregator,
            AnalyticsAggregatorFactory::registerPercentileRanksAggregator,
-           AnalyticsAggregatorFactory::registerHistoBackedSumAggregator
+           AnalyticsAggregatorFactory::registerHistoBackedSumAggregator,
+           AnalyticsAggregatorFactory::registerHistoBackedValueCountAggregator,
+           AnalyticsAggregatorFactory::registerHistoBackedAverageAggregator
        );
    }


AnalyticsAggregatorFactory.java
@@ -3,18 +3,21 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.analytics.aggregations.metrics;

import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.MetricAggregatorSupplier;
import org.elasticsearch.search.aggregations.metrics.PercentileRanksAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregatorSupplier;
import org.elasticsearch.search.aggregations.metrics.PercentilesConfig;
import org.elasticsearch.search.aggregations.metrics.PercentilesMethod;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregatorSupplier;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.xpack.analytics.aggregations.support.AnalyticsValuesSourceType;
import org.elasticsearch.xpack.analytics.aggregations.support.HistogramValuesSource;

public class AnalyticsAggregatorFactory {

@@ -65,6 +68,24 @@ public static void registerPercentileRanksAggregator(ValuesSourceRegistry.Builder
    public static void registerHistoBackedSumAggregator(ValuesSourceRegistry.Builder builder) {
        builder.register(SumAggregationBuilder.NAME,
            AnalyticsValuesSourceType.HISTOGRAM,
-           (MetricAggregatorSupplier) HistoBackedSumAggregator::new);
+           (MetricAggregatorSupplier) (name, valuesSource, format, context, parent, metadata) ->
+               new HistoBackedSumAggregator(name, (HistogramValuesSource.Histogram) valuesSource, format, context, parent, metadata)
Member:

@not-napoleon and I've been favoring changing the ctor so you can just do `(MetricAggregatorSupplier) HistoBackedSumAggregator::new`. It is less "big".

Contributor (author):

`(MetricAggregatorSupplier) HistoBackedSumAggregator::new` is definitely better looking and I would prefer it as well.

The problem is that `MetricAggregatorSupplier` passes a `ValuesSource` argument:

Aggregator build(String name,
                     ValuesSource valuesSource,
                     DocValueFormat format,
                     SearchContext context,
                     Aggregator parent,
                     Map<String, Object> metadata) throws IOException;

while the `HistoBackedSumAggregator` constructor expects a `HistogramValuesSource.Histogram`:

HistoBackedSumAggregator(String name, HistogramValuesSource.Histogram valuesSource, DocValueFormat formatter, SearchContext context,
            Aggregator parent, Map<String, Object> metadata) throws IOException 

So I had 3 options:

  1. Keep the `ValuesSource` parameter in the `HistoBackedSumAggregator` constructor and do a type cast later when using the `HistogramValuesSource.Histogram` object.
  2. Create a separate `HistoBackedMetricAggregatorSupplier` that would match the arguments of `HistoBackedSumAggregator`.
  3. Do the cast of `ValuesSource` to `HistogramValuesSource.Histogram` in the registrar method, changing the admittedly prettier `(MetricAggregatorSupplier) HistoBackedSumAggregator::new` to the longer lambda expression.

I chose the third option because it seemed cleaner to do the type cast in the registrar method, where it is obvious that the values source is a histogram. On the other hand, I didn't want to create yet another `AggregatorSupplier` subclass.

Member:

I've been favoring having a generic `ValuesSource` in the supplier and doing the cast in the aggregator constructor instead. Really, the suppliers shouldn't care about the `ValuesSource` subclass, since the whole idea is to be able to add new ones in, and having to subclass a supplier to do that is a lot of boilerplate. I know I haven't been perfectly consistent about that so far, but I'm working on cleaning it up as I can.
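
A minimal sketch of that alternative, assuming the cast moves into the aggregator's constructor (illustrative only, not the code in this PR):

    // The ctor accepts the generic ValuesSource so it matches MetricAggregatorSupplier,
    // letting the registrar go back to a plain method reference:
    //   (MetricAggregatorSupplier) HistoBackedSumAggregator::new
    HistoBackedSumAggregator(String name, ValuesSource valuesSource, DocValueFormat formatter, SearchContext context,
            Aggregator parent, Map<String, Object> metadata) throws IOException {
        super(name, context, parent, metadata);
        // Cast here instead; safe because this aggregator is only ever
        // registered for AnalyticsValuesSourceType.HISTOGRAM
        this.valuesSource = (HistogramValuesSource.Histogram) valuesSource;
        this.format = formatter;
    }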

        );
    }

    public static void registerHistoBackedValueCountAggregator(ValuesSourceRegistry.Builder builder) {
        builder.register(ValueCountAggregationBuilder.NAME,
            AnalyticsValuesSourceType.HISTOGRAM,
            (ValueCountAggregatorSupplier) (name, valuesSource, context, parent, metadata) ->
                new HistoBackedValueCountAggregator(name, (HistogramValuesSource.Histogram) valuesSource, context, parent, metadata)
        );
    }

    public static void registerHistoBackedAverageAggregator(ValuesSourceRegistry.Builder builder) {
        builder.register(AvgAggregationBuilder.NAME,
            AnalyticsValuesSourceType.HISTOGRAM,
            (MetricAggregatorSupplier) (name, valuesSource, format, context, parent, metadata) ->
                new HistoBackedAvgAggregator(name, (HistogramValuesSource.Histogram) valuesSource, format, context, parent, metadata)
        );
    }
}

HistoBackedAvgAggregator.java (new file)
@@ -0,0 +1,124 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.aggregations.metrics;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.fielddata.HistogramValue;
import org.elasticsearch.index.fielddata.HistogramValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.search.aggregations.metrics.InternalAvg;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.xpack.analytics.aggregations.support.HistogramValuesSource;

import java.io.IOException;
import java.util.Map;

/**
 * Average aggregator operating over histogram datatypes {@link HistogramValuesSource}.
 * The aggregation computes a weighted average, taking each value's count into consideration.
 */
class HistoBackedAvgAggregator extends NumericMetricsAggregator.SingleValue {

    private final HistogramValuesSource.Histogram valuesSource;

    LongArray counts;
    DoubleArray sums;
    DoubleArray compensations;
    DocValueFormat format;

    HistoBackedAvgAggregator(String name, HistogramValuesSource.Histogram valuesSource, DocValueFormat formatter, SearchContext context,
            Aggregator parent, Map<String, Object> metadata) throws IOException {
        super(name, context, parent, metadata);
        this.valuesSource = valuesSource;
        this.format = formatter;
        if (valuesSource != null) {
            final BigArrays bigArrays = context.bigArrays();
            counts = bigArrays.newLongArray(1, true);
            sums = bigArrays.newDoubleArray(1, true);
            compensations = bigArrays.newDoubleArray(1, true);
        }
    }

    @Override
    public ScoreMode scoreMode() {
        return valuesSource != null && valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
    }

    @Override
    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
            final LeafBucketCollector sub) throws IOException {
        if (valuesSource == null) {
            return LeafBucketCollector.NO_OP_COLLECTOR;
        }
        final BigArrays bigArrays = context.bigArrays();
        final HistogramValues values = valuesSource.getHistogramValues(ctx);
        final CompensatedSum kahanSummation = new CompensatedSum(0, 0);

        return new LeafBucketCollectorBase(sub, values) {
            @Override
            public void collect(int doc, long bucket) throws IOException {
                counts = bigArrays.grow(counts, bucket + 1);
                sums = bigArrays.grow(sums, bucket + 1);
                compensations = bigArrays.grow(compensations, bucket + 1);

                if (values.advanceExact(doc)) {
                    final HistogramValue sketch = values.histogram();

                    // Compute the sum of double values with Kahan summation algorithm which is more accurate than naive summation
                    final double sum = sums.get(bucket);
                    final double compensation = compensations.get(bucket);
                    kahanSummation.reset(sum, compensation);
                    while (sketch.next()) {
                        double d = sketch.value() * sketch.count();
                        kahanSummation.add(d);
                        counts.increment(bucket, sketch.count());
                    }

                    sums.set(bucket, kahanSummation.value());
                    compensations.set(bucket, kahanSummation.delta());
                }
            }
        };
    }

    @Override
    public double metric(long owningBucketOrd) {
        if (valuesSource == null || owningBucketOrd >= sums.size()) {
            return Double.NaN;
        }
        return sums.get(owningBucketOrd) / counts.get(owningBucketOrd);
    }

    @Override
    public InternalAggregation buildAggregation(long bucket) {
        if (valuesSource == null || bucket >= sums.size()) {
            return buildEmptyAggregation();
        }
        return new InternalAvg(name, sums.get(bucket), counts.get(bucket), format, metadata());
    }

    @Override
    public InternalAggregation buildEmptyAggregation() {
        return new InternalAvg(name, 0.0, 0L, format, metadata());
    }

    @Override
    public void doClose() {
        Releasables.close(counts, sums, compensations);
    }

}

HistoBackedSumAggregator.java
@@ -20,7 +20,6 @@
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.search.aggregations.metrics.InternalSum;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
-import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.xpack.analytics.aggregations.support.HistogramValuesSource;

@@ -29,16 +28,19 @@

/**
 * Sum aggregator operating over histogram datatypes {@link HistogramValuesSource}
 *
 * The aggregator sums each histogram value multiplied by its count.
 * E.g. for a histogram of response times, this is an approximate "total time spent".
 */
class HistoBackedSumAggregator extends NumericMetricsAggregator.SingleValue {

-   private final ValuesSource valuesSource;
+   private final HistogramValuesSource.Histogram valuesSource;
    private final DocValueFormat format;

    private DoubleArray sums;
    private DoubleArray compensations;

-   HistoBackedSumAggregator(String name, ValuesSource valuesSource, DocValueFormat formatter, SearchContext context,
+   HistoBackedSumAggregator(String name, HistogramValuesSource.Histogram valuesSource, DocValueFormat formatter, SearchContext context,
            Aggregator parent, Map<String, Object> metadata) throws IOException {
        super(name, context, parent, metadata);
        this.valuesSource = valuesSource;
@@ -61,7 +63,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
            return LeafBucketCollector.NO_OP_COLLECTOR;
        }
        final BigArrays bigArrays = context.bigArrays();
-       final HistogramValues values = ((HistogramValuesSource.Histogram) valuesSource).getHistogramValues(ctx);
+       final HistogramValues values = valuesSource.getHistogramValues(ctx);

        final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
        return new LeafBucketCollectorBase(sub, values) {