-
Notifications
You must be signed in to change notification settings - Fork 25k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Histogram field type support for Sum aggregation (#55681)
Implements Sum aggregation over Histogram fields by summing the value of each bucket multiplied by their count as requested in #53285
- Loading branch information
Showing
13 changed files
with
440 additions
and
44 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
115 changes: 115 additions & 0 deletions
115
...java/org/elasticsearch/xpack/analytics/aggregations/metrics/HistoBackedSumAggregator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
package org.elasticsearch.xpack.analytics.aggregations.metrics; | ||
|
||
import org.apache.lucene.index.LeafReaderContext; | ||
import org.apache.lucene.search.ScoreMode; | ||
import org.elasticsearch.common.lease.Releasables; | ||
import org.elasticsearch.common.util.BigArrays; | ||
import org.elasticsearch.common.util.DoubleArray; | ||
import org.elasticsearch.index.fielddata.HistogramValue; | ||
import org.elasticsearch.index.fielddata.HistogramValues; | ||
import org.elasticsearch.search.DocValueFormat; | ||
import org.elasticsearch.search.aggregations.Aggregator; | ||
import org.elasticsearch.search.aggregations.InternalAggregation; | ||
import org.elasticsearch.search.aggregations.LeafBucketCollector; | ||
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; | ||
import org.elasticsearch.search.aggregations.metrics.CompensatedSum; | ||
import org.elasticsearch.search.aggregations.metrics.InternalSum; | ||
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator; | ||
import org.elasticsearch.search.aggregations.support.ValuesSource; | ||
import org.elasticsearch.search.internal.SearchContext; | ||
import org.elasticsearch.xpack.analytics.aggregations.support.HistogramValuesSource; | ||
|
||
import java.io.IOException; | ||
import java.util.Map; | ||
|
||
/** | ||
* Sum aggregator operating over histogram datatypes {@link HistogramValuesSource} | ||
*/ | ||
class HistoBackedSumAggregator extends NumericMetricsAggregator.SingleValue { | ||
|
||
private final ValuesSource valuesSource; | ||
private final DocValueFormat format; | ||
|
||
private DoubleArray sums; | ||
private DoubleArray compensations; | ||
|
||
HistoBackedSumAggregator(String name, ValuesSource valuesSource, DocValueFormat formatter, SearchContext context, | ||
Aggregator parent, Map<String, Object> metadata) throws IOException { | ||
super(name, context, parent, metadata); | ||
this.valuesSource = valuesSource; | ||
this.format = formatter; | ||
if (valuesSource != null) { | ||
sums = context.bigArrays().newDoubleArray(1, true); | ||
compensations = context.bigArrays().newDoubleArray(1, true); | ||
} | ||
} | ||
|
||
@Override | ||
public ScoreMode scoreMode() { | ||
return valuesSource != null && valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES; | ||
} | ||
|
||
@Override | ||
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, | ||
final LeafBucketCollector sub) throws IOException { | ||
if (valuesSource == null) { | ||
return LeafBucketCollector.NO_OP_COLLECTOR; | ||
} | ||
final BigArrays bigArrays = context.bigArrays(); | ||
final HistogramValues values = ((HistogramValuesSource.Histogram) valuesSource).getHistogramValues(ctx); | ||
|
||
final CompensatedSum kahanSummation = new CompensatedSum(0, 0); | ||
return new LeafBucketCollectorBase(sub, values) { | ||
@Override | ||
public void collect(int doc, long bucket) throws IOException { | ||
sums = bigArrays.grow(sums, bucket + 1); | ||
compensations = bigArrays.grow(compensations, bucket + 1); | ||
|
||
if (values.advanceExact(doc)) { | ||
final HistogramValue sketch = values.histogram(); | ||
final double sum = sums.get(bucket); | ||
final double compensation = compensations.get(bucket); | ||
kahanSummation.reset(sum, compensation); | ||
while (sketch.next()) { | ||
double d = sketch.value() * sketch.count(); | ||
kahanSummation.add(d); | ||
} | ||
|
||
compensations.set(bucket, kahanSummation.delta()); | ||
sums.set(bucket, kahanSummation.value()); | ||
} | ||
} | ||
}; | ||
} | ||
|
||
@Override | ||
public double metric(long owningBucketOrd) { | ||
if (valuesSource == null || owningBucketOrd >= sums.size()) { | ||
return 0.0; | ||
} | ||
return sums.get(owningBucketOrd); | ||
} | ||
|
||
@Override | ||
public InternalAggregation buildAggregation(long bucket) { | ||
if (valuesSource == null || bucket >= sums.size()) { | ||
return buildEmptyAggregation(); | ||
} | ||
return new InternalSum(name, sums.get(bucket), format, metadata()); | ||
} | ||
|
||
@Override | ||
public InternalAggregation buildEmptyAggregation() { | ||
return new InternalSum(name, 0.0, format, metadata()); | ||
} | ||
|
||
@Override | ||
public void doClose() { | ||
Releasables.close(sums, compensations); | ||
} | ||
} |
Oops, something went wrong.