@@ -143,6 +143,14 @@ public final InternalAggregation reduce(List<InternalAggregation> aggregations,

public abstract InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);

/**
* Returns true if this aggregation is mapped and can lead a reduction. If this agg returns
* false, it should return itself if asked to lead a reduction.
*/
public boolean isMapped() {
return true;
}

/**
* Get the value of specified path in the aggregation.
*
@@ -25,6 +25,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -37,6 +38,15 @@
public final class InternalAggregations extends Aggregations implements Streamable {

public static final InternalAggregations EMPTY = new InternalAggregations();
private static final Comparator<InternalAggregation> INTERNAL_AGG_COMPARATOR = (agg1, agg2) -> {
if (agg1.isMapped() == agg2.isMapped()) {
return 0;
} else if (agg1.isMapped() && agg2.isMapped() == false) {
return -1;
} else {
return 1;
}
};

private InternalAggregations() {
}
@@ -73,6 +83,9 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
List<InternalAggregation> reducedAggregations = new ArrayList<>();
for (Map.Entry<String, List<InternalAggregation>> entry : aggByName.entrySet()) {
List<InternalAggregation> aggregations = entry.getValue();
// Sort aggregations so that unmapped aggs come last in the list
// If all aggs are unmapped, the agg that leads the reduction will just return itself
aggregations.sort(INTERNAL_AGG_COMPARATOR);
InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
reducedAggregations.add(first.reduce(aggregations, context));
}
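To make the effect of the new comparator concrete, here is a minimal standalone sketch (not part of this change; the Agg interface and the names in main are hypothetical stand-ins for InternalAggregation). Mapped aggs sort ahead of unmapped ones, so a mapped agg leads the reduction whenever at least one exists.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class IsMappedSortDemo {
    // Stand-in for InternalAggregation; only the isMapped() contract matters here.
    interface Agg {
        boolean isMapped();
        String name();
    }

    // Same ordering as INTERNAL_AGG_COMPARATOR above: mapped before unmapped, otherwise equal.
    static final Comparator<Agg> BY_MAPPED = (a, b) -> {
        if (a.isMapped() == b.isMapped()) {
            return 0;
        }
        return a.isMapped() ? -1 : 1;
    };

    static Agg agg(String name, boolean mapped) {
        return new Agg() {
            @Override public boolean isMapped() { return mapped; }
            @Override public String name() { return name; }
        };
    }

    public static void main(String[] args) {
        List<Agg> aggs = new ArrayList<>();
        aggs.add(agg("unmapped-shard-result", false));
        aggs.add(agg("mapped-shard-result", true));
        aggs.sort(BY_MAPPED); // stable sort: the mapped agg moves to the front
        // The agg at index 0 leads the reduction, mirroring InternalAggregations.reduce().
        System.out.println(aggs.get(0).name()); // prints "mapped-shard-result"
    }
}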
@@ -50,12 +50,12 @@ public String getWriteableName() {

@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
for (InternalAggregation agg : aggregations) {
if (!(agg instanceof UnmappedSampler)) {
return agg.reduce(aggregations, reduceContext);
}
}
return this;
return new UnmappedSampler(name, pipelineAggregators(), metaData);
}

@Override
public boolean isMapped() {
return false;
}

@Override
@@ -51,13 +51,13 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms<UnmappedS
*/
protected abstract static class Bucket extends InternalSignificantTerms.Bucket<Bucket> {
private Bucket(BytesRef term, long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations,
DocValueFormat format) {
DocValueFormat format) {
super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format);
}
}

public UnmappedSignificantTerms(String name, int requiredSize, long minDocCount, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) {
Map<String, Object> metaData) {
super(name, requiredSize, minDocCount, pipelineAggregators, metaData);
}

@@ -100,12 +100,12 @@ protected UnmappedSignificantTerms create(long subsetSize, long supersetSize, Li

@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
for (InternalAggregation aggregation : aggregations) {
if (!(aggregation instanceof UnmappedSignificantTerms)) {
return aggregation.reduce(aggregations, reduceContext);
}
}
return this;
return new UnmappedSignificantTerms(name, requiredSize, minDocCount, pipelineAggregators(), metaData);
}

@Override
public boolean isMapped() {
return false;
}

@Override
@@ -22,10 +22,10 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.BucketOrder;

import java.io.IOException;
import java.util.Collections;
@@ -95,12 +95,12 @@ protected UnmappedTerms create(String name, List<Bucket> buckets, long docCountE

@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
for (InternalAggregation agg : aggregations) {
if (!(agg instanceof UnmappedTerms)) {
return agg.reduce(aggregations, reduceContext);
}
}
return this;
return new UnmappedTerms(name, order, requiredSize, minDocCount, pipelineAggregators(), metaData);
}

@Override
public boolean isMapped() {
return false;
}

@Override
@@ -21,15 +21,26 @@

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.Sum;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.InternalBucketMetricValue;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregationBuilder;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.ArrayList;
@@ -475,4 +486,56 @@ public void testNested() throws Exception {
assertThat(maxBucketValue.value(), equalTo(maxTermsValue));
assertThat(maxBucketValue.keys(), equalTo(maxTermsKeys.toArray(new String[maxTermsKeys.size()])));
}

/**
* https://github.com/elastic/elasticsearch/issues/33514
*
* This bug manifests as the max_bucket agg ("peak") being added to the response twice, because
* the pipeline agg is run twice. This makes invalid JSON and breaks conversion to maps.
* The bug was caused by an UnmappedTerms being chosen as the first reduction target. UnmappedTerms
* delegated reduction to the first non-unmapped agg, which would reduce and run pipeline aggs. But then
* execution returns to the UnmappedTerms and _it_ runs pipelines as well, doubling up on the values.
*
* Applies to any pipeline agg, not just max.
*/
public void testFieldIsntWrittenOutTwice() throws Exception {
// you need to add an additional index (or potentially a shard) with no fields in order to trigger this,
// so that there is an UnmappedTerms in the list to reduce.
createIndex("foo_1");

XContentBuilder builder = jsonBuilder().startObject().startObject("properties")
.startObject("@timestamp").field("type", "date").endObject()
.startObject("license").startObject("properties")
.startObject("count").field("type", "long").endObject()
.startObject("partnumber").field("type", "text").startObject("fields").startObject("keyword")
.field("type", "keyword").field("ignore_above", 256)
.endObject().endObject().endObject()
.endObject().endObject().endObject().endObject();
assertAcked(client().admin().indices().prepareCreate("foo_2")
.addMapping("doc", builder).get());

XContentBuilder docBuilder = jsonBuilder().startObject()
.startObject("license").field("partnumber", "foobar").field("count", 2).endObject()
.field("@timestamp", "2018-07-08T08:07:00.599Z")
.endObject();

client().prepareIndex("foo_2", "doc").setSource(docBuilder).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

client().admin().indices().prepareRefresh().get();

TermsAggregationBuilder groupByLicenseAgg = AggregationBuilders.terms("group_by_license_partnumber")
.field("license.partnumber.keyword");
MaxBucketPipelineAggregationBuilder peakPipelineAggBuilder =
PipelineAggregatorBuilders.maxBucket("peak", "licenses_per_day>total_licenses");
SumAggregationBuilder sumAggBuilder = AggregationBuilders.sum("total_licenses").field("license.count");
DateHistogramAggregationBuilder licensePerDayBuilder =
AggregationBuilders.dateHistogram("licenses_per_day").field("@timestamp").dateHistogramInterval(DateHistogramInterval.DAY);
licensePerDayBuilder.subAggregation(sumAggBuilder);
groupByLicenseAgg.subAggregation(licensePerDayBuilder);
groupByLicenseAgg.subAggregation(peakPipelineAggBuilder);

SearchResponse response = client().prepareSearch("foo_*").setSize(0).addAggregation(groupByLicenseAgg).get();
BytesReference bytes = XContentHelper.toXContent(response, XContentType.JSON, false);
XContentHelper.convertToMap(bytes, false, XContentType.JSON);
}
}
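As a companion to the test above, here is a minimal standalone model of the failure mode described in its javadoc (hypothetical classes, not Elasticsearch APIs). The outer, final reduce() runs pipeline aggs on whatever doReduce() returns, so an unmapped agg that delegates to a mapped agg's reduce() makes the pipeline step run twice.

import java.util.List;

public class DoublePipelineDemo {
    // Minimal stand-in for InternalAggregation: the final reduce() appends the
    // pipeline result ("peak") after the type-specific doReduce().
    abstract static class Agg {
        final StringBuilder response = new StringBuilder();

        final Agg reduce(List<Agg> aggs) {
            Agg reduced = doReduce(aggs);
            reduced.response.append("peak,"); // pipeline aggs run here
            return reduced;
        }

        abstract Agg doReduce(List<Agg> aggs);
    }

    static class Mapped extends Agg {
        @Override
        Agg doReduce(List<Agg> aggs) {
            response.append("buckets,");
            return this;
        }
    }

    // Pre-fix behavior: the unmapped agg delegates to the mapped agg's reduce(),
    // so pipelines run inside the delegate and then again in the outer reduce().
    static class OldUnmapped extends Agg {
        @Override
        Agg doReduce(List<Agg> aggs) {
            for (Agg a : aggs) {
                if (!(a instanceof OldUnmapped)) {
                    return a.reduce(aggs);
                }
            }
            return this;
        }
    }

    public static void main(String[] args) {
        Agg unmapped = new OldUnmapped();
        Agg mapped = new Mapped();
        // The unmapped agg happens to lead the reduction, as could occur before this change:
        Agg result = unmapped.reduce(List.of(unmapped, mapped));
        System.out.println(result.response); // prints "buckets,peak,peak," -- "peak" appears twice
    }
}

With this change, the comparator guarantees an unmapped agg only leads the reduction when every agg is unmapped, and its doReduce() returns a fresh unmapped instance instead of delegating, so pipeline aggs run exactly once.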
@@ -86,36 +86,36 @@
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedAvg;
import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedCardinality;
import org.elasticsearch.search.aggregations.metrics.ExtendedStatsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.GeoBoundsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedGeoBounds;
import org.elasticsearch.search.aggregations.metrics.GeoCentroidAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedGeoCentroid;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedMax;
import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedMin;
import org.elasticsearch.search.aggregations.metrics.InternalHDRPercentileRanks;
import org.elasticsearch.search.aggregations.metrics.InternalHDRPercentiles;
import org.elasticsearch.search.aggregations.metrics.ParsedHDRPercentileRanks;
import org.elasticsearch.search.aggregations.metrics.ParsedHDRPercentiles;
import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentileRanks;
import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentiles;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedAvg;
import org.elasticsearch.search.aggregations.metrics.ParsedCardinality;
import org.elasticsearch.search.aggregations.metrics.ParsedExtendedStats;
import org.elasticsearch.search.aggregations.metrics.ParsedGeoBounds;
import org.elasticsearch.search.aggregations.metrics.ParsedGeoCentroid;
import org.elasticsearch.search.aggregations.metrics.ParsedHDRPercentileRanks;
import org.elasticsearch.search.aggregations.metrics.ParsedHDRPercentiles;
import org.elasticsearch.search.aggregations.metrics.ParsedMax;
import org.elasticsearch.search.aggregations.metrics.ParsedMin;
import org.elasticsearch.search.aggregations.metrics.ParsedScriptedMetric;
import org.elasticsearch.search.aggregations.metrics.ParsedStats;
import org.elasticsearch.search.aggregations.metrics.ParsedSum;
import org.elasticsearch.search.aggregations.metrics.ParsedTDigestPercentileRanks;
import org.elasticsearch.search.aggregations.metrics.ParsedTDigestPercentiles;
import org.elasticsearch.search.aggregations.metrics.ParsedScriptedMetric;
import org.elasticsearch.search.aggregations.metrics.ParsedTopHits;
import org.elasticsearch.search.aggregations.metrics.ParsedValueCount;
import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedStats;
import org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ExtendedStatsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedExtendedStats;
import org.elasticsearch.search.aggregations.metrics.ParsedSum;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedTopHits;
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedValueCount;
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue;
@@ -134,6 +134,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -153,6 +154,16 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
public static final int DEFAULT_MAX_BUCKETS = 100000;
protected static final double TOLERANCE = 1e-10;

private static final Comparator<InternalAggregation> INTERNAL_AGG_COMPARATOR = (agg1, agg2) -> {
if (agg1.isMapped() == agg2.isMapped()) {
return 0;
} else if (agg1.isMapped() && agg2.isMapped() == false) {
return -1;
} else {
return 1;
}
};

private final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(
new SearchModule(Settings.EMPTY, false, emptyList()).getNamedWriteables());

@@ -239,6 +250,8 @@ public void testReduceRandom() {
inputs.add(t);
toReduce.add(t);
}
// Sort aggs so that unmapped come last. This mimics the behavior of InternalAggregations.reduce()
inputs.sort(INTERNAL_AGG_COMPARATOR);
ScriptService mockScriptService = mockScriptService();
MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
if (randomBoolean() && toReduce.size() > 1) {