Skip to content

Commit b3f98d9

Browse files
Zachary TongZachary Tong
authored andcommitted
Prefer mapped aggs to lead reductions (#33528)
Previously, unmapped aggs tried to delegate reduction to a sibling agg that is mapped. That delegated agg would run the reductions, and also reduce any pipeline aggs. But because delegation came before running pipelines, the unmapped agg _also_ tried to run pipeline aggs. This caused the pipeline to run twice, potentially doubling its output in buckets, which can create invalid JSON (e.g. the same key multiple times) and break conversion to maps. This commit fixes the problem by sorting the list of aggregations ahead of time so that mapped aggs appear first, meaning they preferentially lead the reduction. If all aggs are unmapped, the first unmapped agg simply creates a new unmapped object and returns that for the reduction. This means that unmapped aggs no longer defer, and there is no chance of a secondary execution of pipelines (or other side effects caused by deferring execution). Closes #33514
1 parent 18992d3 commit b3f98d9

File tree

7 files changed

+118
-21
lines changed

7 files changed

+118
-21
lines changed

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,14 @@ public final InternalAggregation reduce(List<InternalAggregation> aggregations,
143143

144144
public abstract InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);
145145

146+
/**
147+
* Return true if this aggregation is mapped, and can lead a reduction. If this agg returns
148+
* false, it should return an empty, unmapped aggregation of its own type if asked to lead a reduction
149+
*/
150+
public boolean isMapped() {
151+
return true;
152+
}
153+
146154
/**
147155
* Get the value of specified path in the aggregation.
148156
*

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import java.io.IOException;
2727
import java.util.ArrayList;
28+
import java.util.Comparator;
2829
import java.util.HashMap;
2930
import java.util.List;
3031
import java.util.Map;
@@ -37,6 +38,15 @@
3738
public final class InternalAggregations extends Aggregations implements Streamable {
3839

3940
public static final InternalAggregations EMPTY = new InternalAggregations();
41+
private static final Comparator<InternalAggregation> INTERNAL_AGG_COMPARATOR = (agg1, agg2) -> {
42+
if (agg1.isMapped() == agg2.isMapped()) {
43+
return 0;
44+
} else if (agg1.isMapped() && agg2.isMapped() == false) {
45+
return -1;
46+
} else {
47+
return 1;
48+
}
49+
};
4050

4151
private InternalAggregations() {
4252
}
@@ -73,6 +83,9 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
7383
List<InternalAggregation> reducedAggregations = new ArrayList<>();
7484
for (Map.Entry<String, List<InternalAggregation>> entry : aggByName.entrySet()) {
7585
List<InternalAggregation> aggregations = entry.getValue();
86+
// Sort aggregations so that unmapped aggs come last in the list
87+
// If all aggs are unmapped, the agg that leads the reduction will just return a new unmapped agg
88+
aggregations.sort(INTERNAL_AGG_COMPARATOR);
7689
InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
7790
reducedAggregations.add(first.reduce(aggregations, context));
7891
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,12 @@ public String getWriteableName() {
5050

5151
@Override
5252
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
53-
for (InternalAggregation agg : aggregations) {
54-
if (!(agg instanceof UnmappedSampler)) {
55-
return agg.reduce(aggregations, reduceContext);
56-
}
57-
}
58-
return this;
53+
return new UnmappedSampler(name, pipelineAggregators(), metaData);
54+
}
55+
56+
@Override
57+
public boolean isMapped() {
58+
return false;
5959
}
6060

6161
@Override

server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,13 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms<UnmappedS
5151
*/
5252
protected abstract static class Bucket extends InternalSignificantTerms.Bucket<Bucket> {
5353
private Bucket(BytesRef term, long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations,
54-
DocValueFormat format) {
54+
DocValueFormat format) {
5555
super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format);
5656
}
5757
}
5858

5959
public UnmappedSignificantTerms(String name, int requiredSize, long minDocCount, List<PipelineAggregator> pipelineAggregators,
60-
Map<String, Object> metaData) {
60+
Map<String, Object> metaData) {
6161
super(name, requiredSize, minDocCount, pipelineAggregators, metaData);
6262
}
6363

@@ -100,12 +100,12 @@ protected UnmappedSignificantTerms create(long subsetSize, long supersetSize, Li
100100

101101
@Override
102102
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
103-
for (InternalAggregation aggregation : aggregations) {
104-
if (!(aggregation instanceof UnmappedSignificantTerms)) {
105-
return aggregation.reduce(aggregations, reduceContext);
106-
}
107-
}
108-
return this;
103+
return new UnmappedSignificantTerms(name, requiredSize, minDocCount, pipelineAggregators(), metaData);
104+
}
105+
106+
@Override
107+
public boolean isMapped() {
108+
return false;
109109
}
110110

111111
@Override

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
import org.elasticsearch.common.io.stream.StreamOutput;
2323
import org.elasticsearch.common.xcontent.XContentBuilder;
2424
import org.elasticsearch.search.DocValueFormat;
25+
import org.elasticsearch.search.aggregations.BucketOrder;
2526
import org.elasticsearch.search.aggregations.InternalAggregation;
2627
import org.elasticsearch.search.aggregations.InternalAggregations;
2728
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
28-
import org.elasticsearch.search.aggregations.BucketOrder;
2929

3030
import java.io.IOException;
3131
import java.util.Collections;
@@ -95,12 +95,12 @@ protected UnmappedTerms create(String name, List<Bucket> buckets, long docCountE
9595

9696
@Override
9797
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
98-
for (InternalAggregation agg : aggregations) {
99-
if (!(agg instanceof UnmappedTerms)) {
100-
return agg.reduce(aggregations, reduceContext);
101-
}
102-
}
103-
return this;
98+
return new UnmappedTerms(name, order, requiredSize, minDocCount, pipelineAggregators(), metaData);
99+
}
100+
101+
@Override
102+
public boolean isMapped() {
103+
return false;
104104
}
105105

106106
@Override

server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,26 @@
2121

2222
import org.elasticsearch.action.index.IndexRequestBuilder;
2323
import org.elasticsearch.action.search.SearchResponse;
24+
import org.elasticsearch.action.support.WriteRequest;
25+
import org.elasticsearch.common.bytes.BytesReference;
26+
import org.elasticsearch.common.xcontent.XContentBuilder;
27+
import org.elasticsearch.common.xcontent.XContentHelper;
28+
import org.elasticsearch.common.xcontent.XContentType;
29+
import org.elasticsearch.search.aggregations.AggregationBuilders;
2430
import org.elasticsearch.search.aggregations.BucketOrder;
2531
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
32+
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
33+
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
2634
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
2735
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
2836
import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude;
2937
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
38+
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
3039
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
40+
import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
3141
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
3242
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.InternalBucketMetricValue;
43+
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregationBuilder;
3344
import org.elasticsearch.test.ESIntegTestCase;
3445

3546
import java.util.ArrayList;
@@ -475,4 +486,56 @@ public void testNested() throws Exception {
475486
assertThat(maxBucketValue.value(), equalTo(maxTermsValue));
476487
assertThat(maxBucketValue.keys(), equalTo(maxTermsKeys.toArray(new String[maxTermsKeys.size()])));
477488
}
489+
490+
/**
491+
* https://github.com/elastic/elasticsearch/issues/33514
492+
*
493+
* This bug manifests as the max_bucket agg ("peak") being added to the response twice, because
494+
* the pipeline agg is run twice. This makes invalid JSON and breaks conversion to maps.
495+
* The bug was caused by an UnmappedTerms being chosen as the first reduction target. UnmappedTerms
496+
* delegated reduction to the first non-unmapped agg, which would reduce and run pipeline aggs. But then
497+
* execution returns to the UnmappedTerms and _it_ runs pipelines as well, doubling up on the values.
498+
*
499+
* Applies to any pipeline agg, not just max.
500+
*/
501+
public void testFieldIsntWrittenOutTwice() throws Exception {
502+
// you need to add an additional index with no fields in order to trigger this (or potentially a shard)
503+
// so that there is an UnmappedTerms in the list to reduce.
504+
createIndex("foo_1");
505+
506+
XContentBuilder builder = jsonBuilder().startObject().startObject("properties")
507+
.startObject("@timestamp").field("type", "date").endObject()
508+
.startObject("license").startObject("properties")
509+
.startObject("count").field("type", "long").endObject()
510+
.startObject("partnumber").field("type", "text").startObject("fields").startObject("keyword")
511+
.field("type", "keyword").field("ignore_above", 256)
512+
.endObject().endObject().endObject()
513+
.endObject().endObject().endObject().endObject();
514+
assertAcked(client().admin().indices().prepareCreate("foo_2")
515+
.addMapping("doc", builder).get());
516+
517+
XContentBuilder docBuilder = jsonBuilder().startObject()
518+
.startObject("license").field("partnumber", "foobar").field("count", 2).endObject()
519+
.field("@timestamp", "2018-07-08T08:07:00.599Z")
520+
.endObject();
521+
522+
client().prepareIndex("foo_2", "doc").setSource(docBuilder).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
523+
524+
client().admin().indices().prepareRefresh();
525+
526+
TermsAggregationBuilder groupByLicenseAgg = AggregationBuilders.terms("group_by_license_partnumber")
527+
.field("license.partnumber.keyword");
528+
MaxBucketPipelineAggregationBuilder peakPipelineAggBuilder =
529+
PipelineAggregatorBuilders.maxBucket("peak", "licenses_per_day>total_licenses");
530+
SumAggregationBuilder sumAggBuilder = AggregationBuilders.sum("total_licenses").field("license.count");
531+
DateHistogramAggregationBuilder licensePerDayBuilder =
532+
AggregationBuilders.dateHistogram("licenses_per_day").field("@timestamp").dateHistogramInterval(DateHistogramInterval.DAY);
533+
licensePerDayBuilder.subAggregation(sumAggBuilder);
534+
groupByLicenseAgg.subAggregation(licensePerDayBuilder);
535+
groupByLicenseAgg.subAggregation(peakPipelineAggBuilder);
536+
537+
SearchResponse response = client().prepareSearch("foo_*").setSize(0).addAggregation(groupByLicenseAgg).get();
538+
BytesReference bytes = XContentHelper.toXContent(response, XContentType.JSON, false);
539+
XContentHelper.convertToMap(bytes, false, XContentType.JSON);
540+
}
478541
}

test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@
134134
import java.io.IOException;
135135
import java.util.ArrayList;
136136
import java.util.Collections;
137+
import java.util.Comparator;
137138
import java.util.HashMap;
138139
import java.util.List;
139140
import java.util.Map;
@@ -153,6 +154,16 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
153154
public static final int DEFAULT_MAX_BUCKETS = 100000;
154155
protected static final double TOLERANCE = 1e-10;
155156

157+
private static final Comparator<InternalAggregation> INTERNAL_AGG_COMPARATOR = (agg1, agg2) -> {
158+
if (agg1.isMapped() == agg2.isMapped()) {
159+
return 0;
160+
} else if (agg1.isMapped() && agg2.isMapped() == false) {
161+
return -1;
162+
} else {
163+
return 1;
164+
}
165+
};
166+
156167
private final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(
157168
new SearchModule(Settings.EMPTY, false, emptyList()).getNamedWriteables());
158169

@@ -239,6 +250,8 @@ public void testReduceRandom() {
239250
inputs.add(t);
240251
toReduce.add(t);
241252
}
253+
// Sort aggs so that unmapped come last. This mimics the behavior of InternalAggregations.reduce()
254+
inputs.sort(INTERNAL_AGG_COMPARATOR);
242255
ScriptService mockScriptService = mockScriptService();
243256
MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
244257
if (randomBoolean() && toReduce.size() > 1) {

0 commit comments

Comments
 (0)