Skip to content

Commit

Permalink
Add support for histogram fields to rate aggregation (elastic#63289)
Browse files Browse the repository at this point in the history
The rate aggregation now supports histogram fields. At the moment only sum
is supported.

Closes elastic#62939
  • Loading branch information
imotov committed Oct 8, 2020
1 parent 0aebb59 commit 4b6565f
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
=== Rate Aggregation

A `rate` metrics aggregation can be used only inside a `date_histogram` and calculates a rate of documents or a field in each
`date_histogram` bucket.
`date_histogram` bucket. The field values can be generated by a provided script or extracted from specific numeric or
<<histogram,histogram fields>> in the documents.

==== Syntax

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,33 @@
*/
package org.elasticsearch.xpack.analytics.rate;

import org.apache.lucene.index.LeafReaderContext;
import java.io.IOException;
import java.util.Map;

import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.histogram.SizedBucketAggregator;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Map;

public class RateAggregator extends NumericMetricsAggregator.SingleValue {
public abstract class AbstractRateAggregator extends NumericMetricsAggregator.SingleValue {

private final ValuesSource.Numeric valuesSource;
protected final ValuesSource valuesSource;
private final DocValueFormat format;
private final Rounding.DateTimeUnit rateUnit;
private final SizedBucketAggregator sizedBucketAggregator;

private DoubleArray sums;
private DoubleArray compensations;
protected DoubleArray sums;
protected DoubleArray compensations;

public RateAggregator(
public AbstractRateAggregator(
String name,
ValuesSourceConfig valuesSourceConfig,
Rounding.DateTimeUnit rateUnit,
Expand All @@ -46,7 +40,7 @@ public RateAggregator(
Map<String, Object> metadata
) throws IOException {
super(name, context, parent, metadata);
this.valuesSource = (ValuesSource.Numeric) valuesSourceConfig.getValuesSource();
this.valuesSource = valuesSourceConfig.getValuesSource();
this.format = valuesSourceConfig.format();
if (valuesSource != null) {
sums = context.bigArrays().newDoubleArray(1, true);
Expand Down Expand Up @@ -75,38 +69,6 @@ public ScoreMode scoreMode() {
return valuesSource != null && valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
final BigArrays bigArrays = context.bigArrays();
final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx);
final CompensatedSum kahanSummation = new CompensatedSum(0, 0);

return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
sums = bigArrays.grow(sums, bucket + 1);
compensations = bigArrays.grow(compensations, bucket + 1);

if (values.advanceExact(doc)) {
final int valuesCount = values.docValueCount();
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
double sum = sums.get(bucket);
double compensation = compensations.get(bucket);
kahanSummation.reset(sum, compensation);

for (int i = 0; i < valuesCount; i++) {
double value = values.nextValue();
kahanSummation.add(value);
}

compensations.set(bucket, kahanSummation.delta());
sums.set(bucket, kahanSummation.value());
}
}
};
}

@Override
public double metric(long owningBucketOrd) {
if (sizedBucketAggregator == null || valuesSource == null || owningBucketOrd >= sums.size()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.analytics.rate;

import java.io.IOException;
import java.util.Map;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.fielddata.HistogramValue;
import org.elasticsearch.index.fielddata.HistogramValues;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.xpack.analytics.aggregations.support.HistogramValuesSource;

/**
 * Rate aggregator for pre-aggregated {@code histogram} fields.
 * <p>
 * Accumulates a per-bucket Kahan (compensated) sum of the histogram values so
 * that {@link AbstractRateAggregator#metric(long)} can divide it by the bucket
 * size of the enclosing {@code date_histogram}.
 */
public class HistogramRateAggregator extends AbstractRateAggregator {
    public HistogramRateAggregator(
        String name,
        ValuesSourceConfig valuesSourceConfig,
        Rounding.DateTimeUnit rateUnit,
        SearchContext context,
        Aggregator parent,
        Map<String, Object> metadata
    ) throws IOException {
        super(name, valuesSourceConfig, rateUnit, context, parent, metadata);
    }

    @Override
    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
        final BigArrays bigArrays = context.bigArrays();
        final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
        final HistogramValues values = ((HistogramValuesSource.Histogram) valuesSource).getHistogramValues(ctx);
        return new LeafBucketCollectorBase(sub, values) {
            @Override
            public void collect(int doc, long bucket) throws IOException {
                sums = bigArrays.grow(sums, bucket + 1);
                compensations = bigArrays.grow(compensations, bucket + 1);

                if (values.advanceExact(doc)) {
                    // Restore this bucket's running compensated sum once, fold in every
                    // histogram value, then persist the state once — instead of a BigArray
                    // read-modify-write round trip per histogram value. This matches the
                    // accumulation pattern used by NumericRateAggregator.
                    kahanSummation.reset(sums.get(bucket), compensations.get(bucket));
                    final HistogramValue sketch = values.histogram();
                    while (sketch.next()) {
                        // NOTE(review): only sketch.value() is summed; sketch.count() is
                        // ignored, so a histogram entry with count > 1 contributes once.
                        // Confirm this is the intended "sum" semantics for histogram fields.
                        kahanSummation.add(sketch.value());
                    }
                    compensations.set(bucket, kahanSummation.delta());
                    sums.set(bucket, kahanSummation.value());
                }
            }
        };
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.analytics.rate;

import java.io.IOException;
import java.util.Map;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;

/**
 * Rate aggregator over plain numeric (or boolean) doc values.
 * <p>
 * Maintains one compensated (Kahan) sum per owning bucket; the parent
 * {@link AbstractRateAggregator} later converts that sum into a rate using the
 * enclosing {@code date_histogram} bucket size.
 */
public class NumericRateAggregator extends AbstractRateAggregator {
    public NumericRateAggregator(
        String name,
        ValuesSourceConfig valuesSourceConfig,
        Rounding.DateTimeUnit rateUnit,
        SearchContext context,
        Aggregator parent,
        Map<String, Object> metadata
    ) throws IOException {
        super(name, valuesSourceConfig, rateUnit, context, parent, metadata);
    }

    @Override
    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
        final BigArrays bigArrays = context.bigArrays();
        final CompensatedSum compensatedSum = new CompensatedSum(0, 0);
        final SortedNumericDoubleValues docValues = ((ValuesSource.Numeric) valuesSource).doubleValues(ctx);
        return new LeafBucketCollectorBase(sub, docValues) {
            @Override
            public void collect(int doc, long owningBucketOrd) throws IOException {
                sums = bigArrays.grow(sums, owningBucketOrd + 1);
                compensations = bigArrays.grow(compensations, owningBucketOrd + 1);

                if (docValues.advanceExact(doc) == false) {
                    // Document has no value for this field — nothing to accumulate.
                    return;
                }
                // Resume the Kahan summation from the state persisted for this bucket;
                // the compensation term keeps the sum accurate across many small adds.
                compensatedSum.reset(sums.get(owningBucketOrd), compensations.get(owningBucketOrd));
                final int valueCount = docValues.docValueCount();
                for (int i = 0; i < valueCount; i++) {
                    compensatedSum.add(docValues.nextValue());
                }
                compensations.set(owningBucketOrd, compensatedSum.delta());
                sums.set(owningBucketOrd, compensatedSum.value());
            }
        };
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.Map;
import java.util.Objects;

public class RateAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource.Numeric, RateAggregationBuilder> {
public class RateAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource, RateAggregationBuilder> {
public static final String NAME = "rate";
public static final ParseField UNIT_FIELD = new ParseField("unit");
public static final ValuesSourceRegistry.RegistryKey<RateAggregatorSupplier> REGISTRY_KEY = new ValuesSourceRegistry.RegistryKey<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

package org.elasticsearch.xpack.analytics.rate;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.index.query.QueryShardContext;
Expand All @@ -20,9 +24,7 @@
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import org.elasticsearch.xpack.analytics.aggregations.support.AnalyticsValuesSourceType;

class RateAggregatorFactory extends ValuesSourceAggregatorFactory {

Expand All @@ -44,15 +46,21 @@ class RateAggregatorFactory extends ValuesSourceAggregatorFactory {
static void registerAggregators(ValuesSourceRegistry.Builder builder) {
builder.register(
RateAggregationBuilder.REGISTRY_KEY,
Arrays.asList(CoreValuesSourceType.NUMERIC, CoreValuesSourceType.BOOLEAN),
RateAggregator::new,
Collections.singletonList(CoreValuesSourceType.NUMERIC),
NumericRateAggregator::new,
true
);
builder.register(
RateAggregationBuilder.REGISTRY_KEY,
Collections.singletonList(AnalyticsValuesSourceType.HISTOGRAM),
HistogramRateAggregator::new,
true
);
}

@Override
protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map<String, Object> metadata) throws IOException {
return new RateAggregator(name, config, rateUnit, searchContext, parent, metadata) {
return new AbstractRateAggregator(name, config, rateUnit, searchContext, parent, metadata) {
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) {
return LeafBucketCollector.NO_OP_COLLECTOR;
Expand Down
Loading

0 comments on commit 4b6565f

Please sign in to comment.