Optimise few metric aggregations for single value fields (elastic#107832)

This commit adds two new abstractions for NumericMetricsAggregator that expect LeafBucketCollector implementations for single-value and multi-value fields.
iverase authored Apr 25, 2024
1 parent 0719c90 commit c0b023c
Showing 9 changed files with 306 additions and 178 deletions.
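The new base classes referenced by the diffs below (NumericMetricsAggregator.SingleDoubleValue and NumericMetricsAggregator.MultiDoubleValue) live in one of the files that is not expanded on this page. The following is a minimal sketch of what the multi-value variant plausibly looks like, inferred from how the subclasses use it: the constructor now receives the ValuesSourceConfig, the scoreMode() override moves up from the subclasses, and the context-level getLeafCollector dispatches to one of two protected overloads. The standalone class name and the dispatch through FieldData.unwrapSingleton are assumptions for illustration, not the committed code.

package org.elasticsearch.search.aggregations.metrics;

import java.io.IOException;
import java.util.Map;

import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.index.fielddata.FieldData;
import org.elasticsearch.index.fielddata.NumericDoubleValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;

// Sketch of the multi-value base class; in the committed code this is presumably a
// nested class of NumericMetricsAggregator named MultiDoubleValue.
abstract class MultiDoubleValueSketch extends NumericMetricsAggregator.MultiValue {

    private final ValuesSource.Numeric valuesSource;

    MultiDoubleValueSketch(
        String name,
        ValuesSourceConfig config,
        AggregationContext context,
        Aggregator parent,
        Map<String, Object> metadata
    ) throws IOException {
        super(name, context, parent, metadata);
        this.valuesSource = (ValuesSource.Numeric) config.getValuesSource();
    }

    @Override
    public ScoreMode scoreMode() {
        // The scoreMode() overrides removed from the subclasses presumably move up here.
        return valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
    }

    @Override
    public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, LeafBucketCollector sub) throws IOException {
        final SortedNumericDoubleValues values = valuesSource.doubleValues(aggCtx.getLeafReaderContext());
        // Assumption: a single-valued field is detected by unwrapping the singleton view,
        // so each subclass only implements the two protected overloads below.
        final NumericDoubleValues singleton = FieldData.unwrapSingleton(values);
        return singleton != null ? getLeafCollector(singleton, sub) : getLeafCollector(values, sub);
    }

    /** Collector for fields that may hold several values per document. */
    protected abstract LeafBucketCollector getLeafCollector(SortedNumericDoubleValues values, LeafBucketCollector sub);

    /** Collector for fields known to hold at most one value per document. */
    protected abstract LeafBucketCollector getLeafCollector(NumericDoubleValues values, LeafBucketCollector sub);
}

The SingleDoubleValue counterpart would extend NumericMetricsAggregator.SingleValue in the same way, which is how AvgAggregator below can drop its own scoreMode() override and valuesSource field.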
5 changes: 5 additions & 0 deletions docs/changelog/107832.yaml
@@ -0,0 +1,5 @@
pr: 107832
summary: Optimise few metric aggregations for single value fields
area: Aggregations
type: enhancement
issues: []
@@ -9,27 +9,24 @@
package org.elasticsearch.search.aggregations.metrics;

import org.HdrHistogram.DoubleHistogram;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.fielddata.NumericDoubleValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;

import java.io.IOException;
import java.util.Map;

abstract class AbstractHDRPercentilesAggregator extends NumericMetricsAggregator.MultiValue {
abstract class AbstractHDRPercentilesAggregator extends NumericMetricsAggregator.MultiDoubleValue {

protected final double[] keys;
protected final ValuesSource valuesSource;
protected final DocValueFormat format;
protected ObjectArray<DoubleHistogram> states;
protected final int numberOfSignificantValueDigits;
@@ -46,9 +43,8 @@ abstract class AbstractHDRPercentilesAggregator extends NumericMetricsAggregator
DocValueFormat formatter,
Map<String, Object> metadata
) throws IOException {
super(name, context, parent, metadata);
super(name, config, context, parent, metadata);
assert config.hasValues();
this.valuesSource = config.getValuesSource();
this.keyed = keyed;
this.format = formatter;
this.states = context.bigArrays().newObjectArray(1);
@@ -57,26 +53,31 @@ abstract class AbstractHDRPercentilesAggregator
}

@Override
public ScoreMode scoreMode() {
return valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
}

@Override
public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, final LeafBucketCollector sub) throws IOException {
final SortedNumericDoubleValues values = ((ValuesSource.Numeric) valuesSource).doubleValues(aggCtx.getLeafReaderContext());
protected LeafBucketCollector getLeafCollector(SortedNumericDoubleValues values, LeafBucketCollector sub) {
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
if (values.advanceExact(doc)) {
DoubleHistogram state = getExistingOrNewHistogram(bigArrays(), bucket);
final int valueCount = values.docValueCount();
for (int i = 0; i < valueCount; i++) {
final DoubleHistogram state = getExistingOrNewHistogram(bigArrays(), bucket);
for (int i = 0; i < values.docValueCount(); i++) {
state.recordValue(values.nextValue());
}
}
}
};
}

@Override
protected LeafBucketCollector getLeafCollector(NumericDoubleValues values, LeafBucketCollector sub) {
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
if (values.advanceExact(doc)) {
final DoubleHistogram state = getExistingOrNewHistogram(bigArrays(), bucket);
state.recordValue(values.doubleValue());
}
}
};
}

private DoubleHistogram getExistingOrNewHistogram(final BigArrays bigArrays, long bucket) {
(diff truncated)
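The body of getExistingOrNewHistogram is cut off by the truncated diff above. Below is a plausible sketch of the full helper, inferred only from its call sites: it grows the states array and lazily creates a histogram with the configured precision. This is an inference, not the committed code.

    private DoubleHistogram getExistingOrNewHistogram(final BigArrays bigArrays, long bucket) {
        // Make sure the per-bucket array can hold this bucket ordinal.
        states = bigArrays.grow(states, bucket + 1);
        DoubleHistogram state = states.get(bucket);
        if (state == null) {
            // Lazily create a histogram with the configured precision.
            state = new DoubleHistogram(numberOfSignificantValueDigits);
            states.set(bucket, state);
        }
        return state;
    }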
@@ -8,27 +8,24 @@

package org.elasticsearch.search.aggregations.metrics;

import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.fielddata.NumericDoubleValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;

import java.io.IOException;
import java.util.Map;

abstract class AbstractTDigestPercentilesAggregator extends NumericMetricsAggregator.MultiValue {
abstract class AbstractTDigestPercentilesAggregator extends NumericMetricsAggregator.MultiDoubleValue {

protected final double[] keys;
protected final ValuesSource valuesSource;
protected final DocValueFormat formatter;
protected ObjectArray<TDigestState> states;
protected final double compression;
@@ -47,9 +44,8 @@ abstract class AbstractTDigestPercentilesAggregator
DocValueFormat formatter,
Map<String, Object> metadata
) throws IOException {
super(name, context, parent, metadata);
super(name, config, context, parent, metadata);
assert config.hasValues();
this.valuesSource = config.getValuesSource();
this.keyed = keyed;
this.formatter = formatter;
this.states = context.bigArrays().newObjectArray(1);
@@ -59,22 +55,28 @@ abstract class AbstractTDigestPercentilesAggregator
}

@Override
public ScoreMode scoreMode() {
return valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
protected LeafBucketCollector getLeafCollector(SortedNumericDoubleValues values, final LeafBucketCollector sub) {
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
if (values.advanceExact(doc)) {
final TDigestState state = getExistingOrNewHistogram(bigArrays(), bucket);
for (int i = 0; i < values.docValueCount(); i++) {
state.add(values.nextValue());
}
}
}
};
}

@Override
public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, final LeafBucketCollector sub) throws IOException {
final SortedNumericDoubleValues values = ((ValuesSource.Numeric) valuesSource).doubleValues(aggCtx.getLeafReaderContext());
protected LeafBucketCollector getLeafCollector(NumericDoubleValues values, final LeafBucketCollector sub) {
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
if (values.advanceExact(doc)) {
TDigestState state = getExistingOrNewHistogram(bigArrays(), bucket);
final int valueCount = values.docValueCount();
for (int i = 0; i < valueCount; i++) {
state.add(values.nextValue());
}
final TDigestState state = getExistingOrNewHistogram(bigArrays(), bucket);
state.add(values.doubleValue());
}
}
};
(diff truncated)
@@ -7,28 +7,24 @@
*/
package org.elasticsearch.search.aggregations.metrics;

import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.fielddata.NumericDoubleValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;

import java.io.IOException;
import java.util.Map;

class AvgAggregator extends NumericMetricsAggregator.SingleValue {

final ValuesSource.Numeric valuesSource;
class AvgAggregator extends NumericMetricsAggregator.SingleDoubleValue {

LongArray counts;
DoubleArray sums;
@@ -42,9 +38,8 @@ class AvgAggregator extends NumericMetricsAggregator.SingleValue {
Aggregator parent,
Map<String, Object> metadata
) throws IOException {
super(name, context, parent, metadata);
super(name, valuesSourceConfig, context, parent, metadata);
assert valuesSourceConfig.hasValues();
this.valuesSource = (ValuesSource.Numeric) valuesSourceConfig.getValuesSource();
this.format = valuesSourceConfig.format();
final BigArrays bigArrays = context.bigArrays();
counts = bigArrays.newLongArray(1, true);
@@ -53,45 +48,56 @@ class AvgAggregator extends NumericMetricsAggregator.SingleValue {
}

@Override
public ScoreMode scoreMode() {
return valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
}

@Override
public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, final LeafBucketCollector sub) throws IOException {
final SortedNumericDoubleValues values = valuesSource.doubleValues(aggCtx.getLeafReaderContext());
protected LeafBucketCollector getLeafCollector(SortedNumericDoubleValues values, final LeafBucketCollector sub) {
final CompensatedSum kahanSummation = new CompensatedSum(0, 0);

return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
if (values.advanceExact(doc)) {
if (bucket >= counts.size()) {
counts = bigArrays().grow(counts, bucket + 1);
sums = bigArrays().grow(sums, bucket + 1);
compensations = bigArrays().grow(compensations, bucket + 1);
}
maybeGrow(bucket);
final int valueCount = values.docValueCount();
counts.increment(bucket, valueCount);
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
double sum = sums.get(bucket);
double compensation = compensations.get(bucket);

kahanSummation.reset(sum, compensation);

kahanSummation.reset(sums.get(bucket), compensations.get(bucket));
for (int i = 0; i < valueCount; i++) {
double value = values.nextValue();
kahanSummation.add(value);
kahanSummation.add(values.nextValue());
}
sums.set(bucket, kahanSummation.value());
compensations.set(bucket, kahanSummation.delta());
}
}
};
}

@Override
protected LeafBucketCollector getLeafCollector(NumericDoubleValues values, final LeafBucketCollector sub) {
final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
if (values.advanceExact(doc)) {
maybeGrow(bucket);
counts.increment(bucket, 1L);
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
kahanSummation.reset(sums.get(bucket), compensations.get(bucket));
kahanSummation.add(values.doubleValue());
sums.set(bucket, kahanSummation.value());
compensations.set(bucket, kahanSummation.delta());
}
}
};
}

private void maybeGrow(long bucket) {
if (bucket >= counts.size()) {
counts = bigArrays().grow(counts, bucket + 1);
sums = bigArrays().grow(sums, bucket + 1);
compensations = bigArrays().grow(compensations, bucket + 1);
}
}

@Override
public double metric(long owningBucketOrd) {
if (owningBucketOrd >= sums.size()) {
(diff truncated)
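The collect loops above funnel every value through CompensatedSum's reset/add/value/delta methods. As an illustration of the underlying technique, here is a minimal, self-contained sketch of Kahan (compensated) summation that mirrors that call pattern; it is illustrative only, not the Elasticsearch class.

// Minimal Kahan (compensated) summation sketch; mirrors the reset/add/value/delta
// pattern used by AvgAggregator above.
final class KahanSumSketch {
    private double value;        // running sum
    private double compensation; // accumulated low-order error

    void reset(double value, double compensation) {
        this.value = value;
        this.compensation = compensation;
    }

    void add(double v) {
        final double corrected = v - compensation;   // re-apply the previously lost bits
        final double newSum = value + corrected;     // this addition may round
        compensation = (newSum - value) - corrected; // capture what was lost this time
        value = newSum;
    }

    double value() {
        return value;
    }

    double delta() {
        return compensation;
    }
}

Keeping both the sum and the compensation per bucket lets each segment's collector resume the running total without losing the accumulated error term, which is why the aggregator stores the sums and compensations arrays side by side.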
(remaining changed files not shown)
