[7.x] Histogram field type support for ValueCount and Avg aggregations #56099

Merged
merged 2 commits into from
May 4, 2020
54 changes: 54 additions & 0 deletions docs/reference/aggregations/metrics/avg-aggregation.asciidoc
@@ -126,3 +126,57 @@ POST /exams/_search?size=0
// TEST[setup:exams]

<1> Documents without a value in the `grade` field will fall into the same bucket as documents that have the value `10`.


[[search-aggregations-metrics-avg-aggregation-histogram-fields]]
==== Histogram fields
When the `avg` aggregation is computed on <<histogram,histogram fields>>, the result is the weighted average
of all elements in the `values` array, where each element is weighted by the number in the same position in the `counts` array.

For example, for the following index that stores pre-aggregated histograms with latency metrics for different networks:

[source,console]
--------------------------------------------------
PUT metrics_index/_doc/1
{
  "network.name" : "net-1",
  "latency_histo" : {
    "values" : [0.1, 0.2, 0.3, 0.4, 0.5], <1>
    "counts" : [3, 7, 23, 12, 6] <2>
  }
}

PUT metrics_index/_doc/2
{
  "network.name" : "net-2",
  "latency_histo" : {
    "values" : [0.1, 0.2, 0.3, 0.4, 0.5], <1>
    "counts" : [8, 17, 8, 7, 6] <2>
  }
}

POST /metrics_index/_search?size=0
{
  "aggs" : {
    "avg_latency" : {
      "avg" : { "field" : "latency_histo" }
    }
  }
}
--------------------------------------------------

For each histogram field, the `avg` aggregation multiplies each number in the `values` array <1> by its associated count
in the `counts` array <2> and adds up the products. It then divides the total over all histograms by the sum of all counts and returns the following result:

[source,console-result]
--------------------------------------------------
{
...
"aggregations" : {
"avg_latency" : {
"value" : 0.29690721649
}
}
}
--------------------------------------------------
// TESTRESPONSE[skip:test not setup]
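
The arithmetic behind this result can be checked by hand. What follows is a minimal Java sketch of that check, not the aggregator's implementation: the class `HistogramAvgSketch` and its method `weightedAverage` are hypothetical names introduced here, and the input is the two histograms indexed in the example above.

```java
// Hypothetical helper for illustration only; not part of Elasticsearch.
final class HistogramAvgSketch {

    /**
     * Weighted average over several pre-aggregated histograms.
     * values[i][j] is the j-th value of histogram i and counts[i][j] is its count.
     */
    static double weightedAverage(double[][] values, long[][] counts) {
        double sum = 0.0;
        long total = 0L;
        for (int i = 0; i < values.length; i++) {
            for (int j = 0; j < values[i].length; j++) {
                sum += values[i][j] * counts[i][j];
                total += counts[i][j];
            }
        }
        // With no counts at all this is 0.0 / 0, which yields NaN for doubles.
        return sum / total;
    }

    public static void main(String[] args) {
        double[][] values = {
            {0.1, 0.2, 0.3, 0.4, 0.5},
            {0.1, 0.2, 0.3, 0.4, 0.5}
        };
        long[][] counts = {
            {3, 7, 23, 12, 6},
            {8, 17, 8, 7, 6}
        };
        // The products add up to about 28.8 and the counts to 97,
        // so the printed value is close to 0.2969.
        System.out.println(weightedAverage(values, counts));
    }
}
```

The two histograms contribute roughly 16.4 and 12.4 to the sum of products and 51 and 46 to the count, which is where 28.8 / 97 comes from.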
6 changes: 3 additions & 3 deletions docs/reference/aggregations/metrics/sum-aggregation.asciidoc
@@ -163,10 +163,10 @@ POST /sales/_search?size=0
[[search-aggregations-metrics-sum-aggregation-histogram-fields]]
==== Histogram fields

When the sums are computed on <<histogram,histogram fields>>, the result of the aggregation is the sum of all elements in the `values`
When sum is computed on <<histogram,histogram fields>>, the result of the aggregation is the sum of all elements in the `values`
array multiplied by the number in the same position in the `counts` array.

For example, if we have the following index that stores pre-aggregated histograms with latency metrics for different networks:
For example, for the following index that stores pre-aggregated histograms with latency metrics for different networks:

[source,console]
--------------------------------------------------
@@ -196,7 +196,7 @@ POST /metrics_index/_search?size=0
}
--------------------------------------------------

For each histogram field the sum aggregation will multiply each number in the `values` array <1> multiplied with its associated count
For each histogram field the `sum` aggregation will multiply each number in the `values` array <1> by its associated count
in the `counts` array <2>. Eventually, it will add all values for all histograms and return the following result:

[source,console-result]
@@ -6,6 +6,9 @@ These values can be extracted either from specific fields in the documents, or b
this aggregator will be used in conjunction with other single-value aggregations. For example, when computing the `avg`
one might be interested in the number of values the average is computed over.

`value_count` does not de-duplicate values, so even if a field has duplicates (or a script generates multiple
identical values for a single document), each value will be counted individually.

[source,console]
--------------------------------------------------
POST /sales/_search?size=0
@@ -77,3 +80,60 @@ POST /sales/_search?size=0
}
--------------------------------------------------
// TEST[setup:sales,stored_example_script]

NOTE: Because `value_count` is designed to work with any field, it internally treats all values as simple bytes.
Due to this implementation, if the `_value` script variable is used to fetch a value instead of accessing the field
directly (e.g. a "value script"), the field value will be returned as a string instead of its native format.

[[search-aggregations-metrics-valuecount-aggregation-histogram-fields]]
==== Histogram fields
When the `value_count` aggregation is computed on <<histogram,histogram fields>>, the result of the aggregation is the sum of all numbers
in the `counts` array of the histogram.

For example, for the following index that stores pre-aggregated histograms with latency metrics for different networks:

[source,console]
--------------------------------------------------
PUT metrics_index/_doc/1
{
  "network.name" : "net-1",
  "latency_histo" : {
    "values" : [0.1, 0.2, 0.3, 0.4, 0.5],
    "counts" : [3, 7, 23, 12, 6] <1>
  }
}

PUT metrics_index/_doc/2
{
  "network.name" : "net-2",
  "latency_histo" : {
    "values" : [0.1, 0.2, 0.3, 0.4, 0.5],
    "counts" : [8, 17, 8, 7, 6] <1>
  }
}

POST /metrics_index/_search?size=0
{
  "aggs" : {
    "total_requests" : {
      "value_count" : { "field" : "latency_histo" }
    }
  }
}
--------------------------------------------------

For each histogram field, the `value_count` aggregation sums all numbers in the `counts` array <1>.
It then adds up these per-histogram sums across all histograms and returns the following result:

[source,console-result]
--------------------------------------------------
{
...
"aggregations" : {
"total_requests" : {
"value" : 97
}
}
}
--------------------------------------------------
// TESTRESPONSE[skip:test not setup]
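
The total of 97 can be reproduced with a few lines of Java. This is only a sketch of the arithmetic described above, with a hypothetical class `HistogramValueCountSketch` and method `valueCount`; it does not reflect how the aggregator itself is written.

```java
// Hypothetical helper for illustration only; not part of Elasticsearch.
final class HistogramValueCountSketch {

    /** Adds up every entry of every histogram's counts array. */
    static long valueCount(long[][] counts) {
        long total = 0L;
        for (long[] histogram : counts) {
            for (long count : histogram) {
                total += count;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        long[][] counts = {
            {3, 7, 23, 12, 6},  // document 1: 51 observations
            {8, 17, 8, 7, 6}    // document 2: 46 observations
        };
        System.out.println(valueCount(counts)); // prints 97
    }
}
```

Note that the `values` array plays no part here: only the counts are summed.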
2 changes: 2 additions & 0 deletions docs/reference/mapping/types/histogram.asciidoc
@@ -36,6 +36,8 @@ Because the data is not indexed, you only can use `histogram` fields for the
following aggregations and queries:

* <<search-aggregations-metrics-sum-aggregation-histogram-fields,sum>> aggregation
* <<search-aggregations-metrics-valuecount-aggregation-histogram-fields,value_count>> aggregation
* <<search-aggregations-metrics-avg-aggregation-histogram-fields,avg>> aggregation
* <<search-aggregations-metrics-percentile-aggregation,percentiles>> aggregation
* <<search-aggregations-metrics-percentile-rank-aggregation,percentile ranks>> aggregation
* <<search-aggregations-metrics-boxplot-aggregation,boxplot>> aggregation
@@ -34,7 +34,7 @@
public class InternalValueCount extends InternalNumericMetricsAggregation.SingleValue implements ValueCount {
private final long value;

InternalValueCount(String name, long value, Map<String, Object> metadata) {
public InternalValueCount(String name, long value, Map<String, Object> metadata) {
super(name, metadata);
this.value = value;
}
@@ -52,7 +52,6 @@
import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -86,7 +85,7 @@ public List<PipelineAggregationSpec> getPipelineAggregations() {

@Override
public List<AggregationSpec> getAggregations() {
return Arrays.asList(
return org.elasticsearch.common.collect.List.of(
new AggregationSpec(
StringStatsAggregationBuilder.NAME,
StringStatsAggregationBuilder::new,
@@ -143,10 +142,12 @@ public Map<String, Mapper.TypeParser> getMappers() {

@Override
public List<Consumer<ValuesSourceRegistry.Builder>> getAggregationExtentions() {
return Arrays.asList(
return org.elasticsearch.common.collect.List.of(
AnalyticsAggregatorFactory::registerPercentilesAggregator,
AnalyticsAggregatorFactory::registerPercentileRanksAggregator,
AnalyticsAggregatorFactory::registerHistoBackedSumAggregator
AnalyticsAggregatorFactory::registerHistoBackedSumAggregator,
AnalyticsAggregatorFactory::registerHistoBackedValueCountAggregator,
AnalyticsAggregatorFactory::registerHistoBackedAverageAggregator
);
}

@@ -160,7 +161,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster

@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Arrays.asList(
return org.elasticsearch.common.collect.List.of(
new NamedWriteableRegistry.Entry(TTestState.class, PairedTTestState.NAME, PairedTTestState::new),
new NamedWriteableRegistry.Entry(TTestState.class, UnpairedTTestState.NAME, UnpairedTTestState::new)
);
@@ -3,18 +3,21 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.analytics.aggregations.metrics;

import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.MetricAggregatorSupplier;
import org.elasticsearch.search.aggregations.metrics.PercentileRanksAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregatorSupplier;
import org.elasticsearch.search.aggregations.metrics.PercentilesConfig;
import org.elasticsearch.search.aggregations.metrics.PercentilesMethod;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregatorSupplier;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.xpack.analytics.aggregations.support.AnalyticsValuesSourceType;
import org.elasticsearch.xpack.analytics.aggregations.support.HistogramValuesSource;

public class AnalyticsAggregatorFactory {

@@ -65,6 +68,24 @@ public static void registerPercentileRanksAggregator(ValuesSourceRegistry.Builde
public static void registerHistoBackedSumAggregator(ValuesSourceRegistry.Builder builder) {
builder.register(SumAggregationBuilder.NAME,
AnalyticsValuesSourceType.HISTOGRAM,
(MetricAggregatorSupplier) HistoBackedSumAggregator::new);
(MetricAggregatorSupplier) (name, valuesSource, format, context, parent, metadata) ->
new HistoBackedSumAggregator(name, (HistogramValuesSource.Histogram) valuesSource, format, context, parent, metadata)
);
}

public static void registerHistoBackedValueCountAggregator(ValuesSourceRegistry.Builder builder) {
builder.register(ValueCountAggregationBuilder.NAME,
AnalyticsValuesSourceType.HISTOGRAM,
(ValueCountAggregatorSupplier) (name, valuesSource, context, parent, metadata) ->
new HistoBackedValueCountAggregator(name, (HistogramValuesSource.Histogram) valuesSource, context, parent, metadata)
);
}

public static void registerHistoBackedAverageAggregator(ValuesSourceRegistry.Builder builder) {
builder.register(AvgAggregationBuilder.NAME,
AnalyticsValuesSourceType.HISTOGRAM,
(MetricAggregatorSupplier) (name, valuesSource, format, context, parent, metadata) ->
new HistoBackedAvgAggregator(name, (HistogramValuesSource.Histogram) valuesSource, format, context, parent, metadata)
);
}
}
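
The registrations above replace a constructor reference with a lambda that casts `valuesSource` to the histogram type. One plausible reading is that the aggregator constructors take the narrower type while the supplier interface only exposes the general one. The sketch below illustrates that pattern under this assumption; every type in it (`RegistrySketch`, `ValuesSource`, `HistogramSource`, `AggregatorSupplier`, `HistoCountAggregator`, `Registry`) is a simplified, hypothetical stand-in and not the Elasticsearch class of a similar name.

```java
// Every type here is a simplified, hypothetical stand-in written for this
// sketch; none of them is the Elasticsearch class of a similar name.
final class RegistrySketch {

    interface ValuesSource {}

    // Narrow source type that only histogram-backed aggregators understand.
    static final class HistogramSource implements ValuesSource {
        final long[] counts;

        HistogramSource(long[] counts) {
            this.counts = counts;
        }
    }

    interface Aggregator {
        String describe();
    }

    // The registry only knows the wide ValuesSource type.
    interface AggregatorSupplier {
        Aggregator build(String name, ValuesSource source);
    }

    // The constructor asks for the narrow type, so HistoCountAggregator::new
    // does not fit AggregatorSupplier and a casting lambda is needed instead.
    static final class HistoCountAggregator implements Aggregator {
        private final String name;
        private final HistogramSource source;

        HistoCountAggregator(String name, HistogramSource source) {
            this.name = name;
            this.source = source;
        }

        @Override
        public String describe() {
            long total = 0L;
            for (long count : source.counts) {
                total += count;
            }
            return name + "=" + total;
        }
    }

    // Suppliers are looked up by aggregation name and source type.
    static final class Registry {
        private final java.util.Map<String, AggregatorSupplier> suppliers = new java.util.HashMap<>();

        void register(String aggregation, String sourceType, AggregatorSupplier supplier) {
            suppliers.put(aggregation + "/" + sourceType, supplier);
        }

        Aggregator build(String aggregation, String sourceType, String name, ValuesSource source) {
            return suppliers.get(aggregation + "/" + sourceType).build(name, source);
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        registry.register("value_count", "histogram",
            (name, source) -> new HistoCountAggregator(name, (HistogramSource) source));
        Aggregator aggregator = registry.build("value_count", "histogram", "total_requests",
            new HistogramSource(new long[] {3, 7, 23, 12, 6}));
        System.out.println(aggregator.describe()); // prints total_requests=51
    }
}
```

The cast is safe in this sketch only because the supplier is registered under the `histogram` key, so the registry never hands it any other source type.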
@@ -0,0 +1,124 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.aggregations.metrics;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.fielddata.HistogramValue;
import org.elasticsearch.index.fielddata.HistogramValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.search.aggregations.metrics.InternalAvg;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.xpack.analytics.aggregations.support.HistogramValuesSource;

import java.io.IOException;
import java.util.Map;

/**
 * Average aggregator operating over histogram datatypes {@link HistogramValuesSource}
 * The aggregation computes weighted average by taking counts into consideration for each value
 */
class HistoBackedAvgAggregator extends NumericMetricsAggregator.SingleValue {

    private final HistogramValuesSource.Histogram valuesSource;

    LongArray counts;
    DoubleArray sums;
    DoubleArray compensations;
    DocValueFormat format;

    HistoBackedAvgAggregator(String name, HistogramValuesSource.Histogram valuesSource, DocValueFormat formatter, SearchContext context,
                             Aggregator parent, Map<String, Object> metadata) throws IOException {
        super(name, context, parent, metadata);
        this.valuesSource = valuesSource;
        this.format = formatter;
        if (valuesSource != null) {
            final BigArrays bigArrays = context.bigArrays();
            counts = bigArrays.newLongArray(1, true);
            sums = bigArrays.newDoubleArray(1, true);
            compensations = bigArrays.newDoubleArray(1, true);
        }
    }

    @Override
    public ScoreMode scoreMode() {
        return valuesSource != null && valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
    }

    @Override
    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
                                                final LeafBucketCollector sub) throws IOException {
        if (valuesSource == null) {
            return LeafBucketCollector.NO_OP_COLLECTOR;
        }
        final BigArrays bigArrays = context.bigArrays();
        final HistogramValues values = valuesSource.getHistogramValues(ctx);
        final CompensatedSum kahanSummation = new CompensatedSum(0, 0);

        return new LeafBucketCollectorBase(sub, values) {
            @Override
            public void collect(int doc, long bucket) throws IOException {
                counts = bigArrays.grow(counts, bucket + 1);
                sums = bigArrays.grow(sums, bucket + 1);
                compensations = bigArrays.grow(compensations, bucket + 1);

                if (values.advanceExact(doc)) {
                    final HistogramValue sketch = values.histogram();

                    // Compute the sum of double values with Kahan summation algorithm which is more accurate than naive summation
                    final double sum = sums.get(bucket);
                    final double compensation = compensations.get(bucket);
                    kahanSummation.reset(sum, compensation);
                    while (sketch.next()) {
                        double d = sketch.value() * sketch.count();
                        kahanSummation.add(d);
                        counts.increment(bucket, sketch.count());
                    }

                    sums.set(bucket, kahanSummation.value());
                    compensations.set(bucket, kahanSummation.delta());
                }
            }
        };
    }

    @Override
    public double metric(long owningBucketOrd) {
        if (valuesSource == null || owningBucketOrd >= sums.size()) {
            return Double.NaN;
        }
        return sums.get(owningBucketOrd) / counts.get(owningBucketOrd);
    }

    @Override
    public InternalAggregation buildAggregation(long bucket) {
        if (valuesSource == null || bucket >= sums.size()) {
            return buildEmptyAggregation();
        }
        return new InternalAvg(name, sums.get(bucket), counts.get(bucket), format, metadata());
    }

    @Override
    public InternalAggregation buildEmptyAggregation() {
        return new InternalAvg(name, 0.0, 0L, format, metadata());
    }

    @Override
    public void doClose() {
        Releasables.close(counts, sums, compensations);
    }

}
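
The collector above keeps a running sum and a compensation term per bucket and relies on Kahan summation for accuracy. As a rough illustration of why the compensation matters, here is a textbook Kahan summation in plain Java. `KahanSumSketch` is a hypothetical stand-in written for this note; it mirrors the `value()` and `delta()` accessors used above but is not Elasticsearch's `CompensatedSum`, whose internals may differ.

```java
// Textbook Kahan (compensated) summation; a hypothetical stand-in written for
// illustration, not Elasticsearch's CompensatedSum class.
final class KahanSumSketch {
    private double value; // running sum
    private double delta; // low-order bits lost by previous additions

    KahanSumSketch(double value, double delta) {
        this.value = value;
        this.delta = delta;
    }

    void add(double x) {
        double corrected = x + delta;           // re-inject previously lost bits
        double updated = value + corrected;     // low-order bits of corrected may be lost here
        delta = corrected - (updated - value);  // recover what was just lost
        value = updated;
    }

    double value() {
        return value;
    }

    double delta() {
        return delta;
    }

    /** Plain left-to-right summation: start + x + x + ... (times additions). */
    static double naiveSum(double start, double x, int times) {
        double sum = start;
        for (int i = 0; i < times; i++) {
            sum += x;
        }
        return sum;
    }

    /** The same additions, carried out with the compensation term. */
    static double kahanSum(double start, double x, int times) {
        KahanSumSketch sum = new KahanSumSketch(start, 0.0);
        for (int i = 0; i < times; i++) {
            sum.add(x);
        }
        return sum.value();
    }

    public static void main(String[] args) {
        // 1e-16 is below half the spacing of doubles near 1.0, so each naive
        // addition rounds back to 1.0 and the small terms vanish entirely.
        System.out.println(naiveSum(1.0, 1e-16, 10));
        // The compensated sum accumulates the small terms and ends up close
        // to the true total of 1.000000000000001.
        System.out.println(kahanSum(1.0, 1e-16, 10));
    }
}
```

Storing the compensation per bucket, as the aggregator does with its `compensations` array, lets the correction carry over between documents instead of being discarded after each one.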