From bdecf2208a1ffc8dd6301f2bcf7705ec0653979c Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Fri, 7 Sep 2018 14:05:35 -0400 Subject: [PATCH 1/6] UnmappedTerms should not run pipelines if they delegate reduction UnmappedTerms aggs try to delegate reduction to a sibling object that is not unmapped. That delegated agg will run the reductions, and also reduce any pipeline aggs. But because delegation comes before running pipelines, the UnmappedTerms _also_ tries to run pipeline aggs. This causes the pipeline to run twice, and potentially double its output in buckets which can create invalid JSON and break when converting to maps. This fixes the issue by toggling a flag in UnmappedTerms if it delegated away reduction so that it knows not to run pipeline aggs either. Closes #33514 --- .../aggregations/InternalAggregation.java | 12 +++- .../bucket/terms/UnmappedTerms.java | 16 ++++- .../aggregations/pipeline/MaxBucketIT.java | 62 +++++++++++++++++++ 3 files changed, 86 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java index da328edd7aa4a..31a40f0da8a5b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java @@ -134,15 +134,21 @@ public String getName() { public final InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { InternalAggregation aggResult = doReduce(aggregations, reduceContext); if (reduceContext.isFinalReduce()) { - for (PipelineAggregator pipelineAggregator : pipelineAggregators) { - aggResult = pipelineAggregator.reduce(aggResult, reduceContext); - } + aggResult = doPipelineReduce(aggResult, aggregations, reduceContext); } return aggResult; } public abstract InternalAggregation doReduce(List aggregations, ReduceContext reduceContext); + public 
InternalAggregation doPipelineReduce(InternalAggregation reducedAggregations, List aggregations, + ReduceContext reduceContext) { + for (PipelineAggregator pipelineAggregator : pipelineAggregators) { + reducedAggregations = pipelineAggregator.reduce(reducedAggregations, reduceContext); + } + return reducedAggregations; + } + /** * Get the value of specified path in the aggregation. * diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java index 595991dac06dc..7c00e2534df07 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java @@ -22,10 +22,10 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.BucketOrder; import java.io.IOException; import java.util.Collections; @@ -40,6 +40,8 @@ public class UnmappedTerms extends InternalTerms { public static final String NAME = "umterms"; + private boolean delegatedReduction = false; + /** * Concrete type that can't be built because Java needs a concrete type so {@link InternalTerms.Bucket} can have a self type but * {@linkplain UnmappedTerms} doesn't ever need to build it because it never returns any buckets. 
@@ -97,12 +99,24 @@ protected UnmappedTerms create(String name, List buckets, long docCountE public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { for (InternalAggregation agg : aggregations) { if (!(agg instanceof UnmappedTerms)) { + delegatedReduction = true; return agg.reduce(aggregations, reduceContext); } } return this; } + @Override + public InternalAggregation doPipelineReduce(InternalAggregation reducedAggregations, List aggregations, + ReduceContext reduceContext) { + // If we delegated away the reduction, another aggregation has already run + // pipeline aggs and we should just return the current tree + if (delegatedReduction) { + return reducedAggregations; + } + return super.doPipelineReduce(reducedAggregations, aggregations, reduceContext); + } + @Override public final XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { return doXContentCommon(builder, params, 0, 0, Collections.emptyList()); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java index c3075da827118..41221f9eec9f2 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java @@ -21,15 +21,26 @@ import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.BucketOrder; import 
org.elasticsearch.search.aggregations.bucket.filter.Filter; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket; import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude; import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.Sum; +import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.InternalBucketMetricValue; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregationBuilder; import org.elasticsearch.test.ESIntegTestCase; import java.util.ArrayList; @@ -475,4 +486,55 @@ public void testNested() throws Exception { assertThat(maxBucketValue.value(), equalTo(maxTermsValue)); assertThat(maxBucketValue.keys(), equalTo(maxTermsKeys.toArray(new String[maxTermsKeys.size()]))); } + + /** + * https://github.com/elastic/elasticsearch/issues/33514 + * + * This bug manifests as the max_bucket agg ("peak") being added to the response twice, because + * the pipeline agg is run twice. This makes invalid JSON and breaks conversion to maps. + * The bug was caused by an UnmappedTerms being chosen as the first reduction target. UnmappedTerms + * delegate reduction to the first non-unmapped agg, which would reduce and run pipeline aggs. But then + * execution returns to the UnmappedTerms and _it_ runs pipelines as well, doubling up on the values. + * + * Applies to any pipeline agg, not just max. 
+ */ + public void testFieldGetsWrittenOutTwice() throws Exception { + // you need to add an additional index with no fields in order to trigger this (or potentially a shard) + // so that there is an UnmappedTerms in the list to reduce. + createIndex("foo_1"); + + XContentBuilder builder = jsonBuilder().startObject().startObject("properties") + .startObject("@timestamp").field("type", "date").endObject() + .startObject("license").startObject("properties") + .startObject("count").field("type", "long").endObject() + .startObject("partnumber").field("type", "text").startObject("fields").startObject("keyword") + .field("type", "keyword").field("ignore_above", 256) + .endObject().endObject().endObject() + .endObject().endObject().endObject().endObject(); + assertAcked(client().admin().indices().prepareCreate("foo_2") + .addMapping("doc", builder).get()); + + XContentBuilder docBuilder = jsonBuilder().startObject() + .startObject("license").field("partnumber", "foobar").field("count", 2).endObject() + .field("@timestamp", "2018-07-08T08:07:00.599Z") + .endObject(); + + client().prepareIndex("foo_2", "doc").setSource(docBuilder).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + + client().admin().indices().prepareRefresh(); + + TermsAggregationBuilder groupByLicenseAgg = AggregationBuilders.terms("group_by_license_partnumber").field("license.partnumber.keyword"); + MaxBucketPipelineAggregationBuilder peakPipelineAggBuilder = + PipelineAggregatorBuilders.maxBucket("peak", "licenses_per_day>total_licenses"); + SumAggregationBuilder sumAggBuilder = AggregationBuilders.sum("total_licenses").field("license.count"); + DateHistogramAggregationBuilder licensePerDayBuilder = + AggregationBuilders.dateHistogram("licenses_per_day").field("@timestamp").dateHistogramInterval(DateHistogramInterval.DAY); + licensePerDayBuilder.subAggregation(sumAggBuilder); + groupByLicenseAgg.subAggregation(licensePerDayBuilder); + groupByLicenseAgg.subAggregation(peakPipelineAggBuilder); 
+ + SearchResponse response = client().prepareSearch("foo_*").setSize(0).addAggregation(groupByLicenseAgg).get(); + BytesReference bytes = XContentHelper.toXContent(response, XContentType.JSON, false); + XContentHelper.convertToMap(bytes, false, XContentType.JSON); + } } From 35ed1d7cb6d4aff30ae857374d23e1cfbbb392ee Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Fri, 7 Sep 2018 14:14:04 -0400 Subject: [PATCH 2/6] Other unmapped aggs should get same treatment --- .../bucket/sampler/UnmappedSampler.java | 13 +++++++++++++ .../significant/UnmappedSignificantTerms.java | 14 +++++++++++++- .../aggregations/bucket/terms/UnmappedTerms.java | 9 ++++----- 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java index 3459e110d7eff..8a896ef8cc474 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java @@ -31,6 +31,7 @@ public class UnmappedSampler extends InternalSampler { public static final String NAME = "unmapped_sampler"; + private boolean delegatedReduction = false; UnmappedSampler(String name, List pipelineAggregators, Map metaData) { super(name, 0, InternalAggregations.EMPTY, pipelineAggregators, metaData); @@ -52,12 +53,24 @@ public String getWriteableName() { public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { for (InternalAggregation agg : aggregations) { if (!(agg instanceof UnmappedSampler)) { + delegatedReduction = true; return agg.reduce(aggregations, reduceContext); } } return this; } + @Override + public InternalAggregation doPipelineReduce(InternalAggregation reducedAggregations, List aggregations, + ReduceContext reduceContext) { + // If we delegated away the reduction, 
another aggregation has already run + // pipeline aggs and we should just return the current tree + if (delegatedReduction) { + return reducedAggregations; + } + return super.doPipelineReduce(reducedAggregations, aggregations, reduceContext); + } + @Override public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { builder.field(Aggregation.CommonFields.DOC_COUNT.getPreferredName(), 0); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java index 66fc171bbe3bd..96bb7e5b6321a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java @@ -44,7 +44,7 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms { public static final String NAME = "umsigterms"; - + private boolean delegatedReduction = false; /** * Concrete type that can't be built because Java needs a concrete type so {@link InternalTerms.Bucket} can have a self type but * {@linkplain UnmappedTerms} doesn't ever need to build it because it never returns any buckets. 
@@ -102,12 +102,24 @@ protected UnmappedSignificantTerms create(long subsetSize, long supersetSize, Li public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { for (InternalAggregation aggregation : aggregations) { if (!(aggregation instanceof UnmappedSignificantTerms)) { + delegatedReduction = true; return aggregation.reduce(aggregations, reduceContext); } } return this; } + @Override + public InternalAggregation doPipelineReduce(InternalAggregation reducedAggregations, List aggregations, + ReduceContext reduceContext) { + // If we delegated away the reduction, another aggregation has already run + // pipeline aggs and we should just return the current tree + if (delegatedReduction) { + return reducedAggregations; + } + return super.doPipelineReduce(reducedAggregations, aggregations, reduceContext); + } + @Override public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { builder.startArray(CommonFields.BUCKETS.getPreferredName()).endArray(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java index 7c00e2534df07..e4c4bd7b4e55e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java @@ -39,7 +39,6 @@ */ public class UnmappedTerms extends InternalTerms { public static final String NAME = "umterms"; - private boolean delegatedReduction = false; /** @@ -111,10 +110,10 @@ public InternalAggregation doPipelineReduce(InternalAggregation reducedAggregati ReduceContext reduceContext) { // If we delegated away the reduction, another aggregation has already run // pipeline aggs and we should just return the current tree - if (delegatedReduction) { - return reducedAggregations; - } - return 
super.doPipelineReduce(reducedAggregations, aggregations, reduceContext); + if (delegatedReduction) { + return reducedAggregations; + } + return super.doPipelineReduce(reducedAggregations, aggregations, reduceContext); } @Override From 6279e8ae6f82fb3ad99be289c2dfa3f8bdbe96f8 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Fri, 7 Sep 2018 14:37:30 -0400 Subject: [PATCH 3/6] checkstyle --- .../search/aggregations/pipeline/MaxBucketIT.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java index 41221f9eec9f2..b8ef1b69beab7 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java @@ -523,7 +523,8 @@ public void testFieldGetsWrittenOutTwice() throws Exception { client().admin().indices().prepareRefresh(); - TermsAggregationBuilder groupByLicenseAgg = AggregationBuilders.terms("group_by_license_partnumber").field("license.partnumber.keyword"); + TermsAggregationBuilder groupByLicenseAgg = AggregationBuilders.terms("group_by_license_partnumber") + .field("license.partnumber.keyword"); MaxBucketPipelineAggregationBuilder peakPipelineAggBuilder = PipelineAggregatorBuilders.maxBucket("peak", "licenses_per_day>total_licenses"); SumAggregationBuilder sumAggBuilder = AggregationBuilders.sum("total_licenses").field("license.count"); From 7bde61a86d623ff27c3c4d3e61e1f6cf5b5dcd15 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Thu, 13 Sep 2018 15:40:11 -0400 Subject: [PATCH 4/6] Rework to use sorting --- .../aggregations/InternalAggregation.java | 12 ++--- .../aggregations/InternalAggregations.java | 12 +++++ .../bucket/sampler/UnmappedSampler.java | 12 +---- .../significant/UnmappedSignificantTerms.java | 17 ++----- .../bucket/terms/UnmappedTerms.java | 12 
+---- .../test/InternalAggregationTestCase.java | 47 ++++++++++++------- 6 files changed, 56 insertions(+), 56 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java index 31a40f0da8a5b..305dea0d82d6c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java @@ -134,19 +134,17 @@ public String getName() { public final InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { InternalAggregation aggResult = doReduce(aggregations, reduceContext); if (reduceContext.isFinalReduce()) { - aggResult = doPipelineReduce(aggResult, aggregations, reduceContext); + for (PipelineAggregator pipelineAggregator : pipelineAggregators) { + aggResult = pipelineAggregator.reduce(aggResult, reduceContext); + } } return aggResult; } public abstract InternalAggregation doReduce(List aggregations, ReduceContext reduceContext); - public InternalAggregation doPipelineReduce(InternalAggregation reducedAggregations, List aggregations, - ReduceContext reduceContext) { - for (PipelineAggregator pipelineAggregator : pipelineAggregators) { - reducedAggregations = pipelineAggregator.reduce(reducedAggregations, reduceContext); - } - return reducedAggregations; + public boolean isMapped() { + return true; } /** diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index 57170e2f8ab13..b8ee5800ea7b0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; 
import java.util.HashMap; import java.util.List; import java.util.Map; @@ -37,6 +38,15 @@ public final class InternalAggregations extends Aggregations implements Streamable { public static final InternalAggregations EMPTY = new InternalAggregations(); + private static final Comparator INTERNAL_AGG_COMPARATOR = (agg1, agg2) -> { + if (agg1.isMapped() == agg2.isMapped()) { + return 0; + } else if (agg1.isMapped() && agg2.isMapped() == false) { + return -1; + } else { + return 1; + } + }; private InternalAggregations() { } @@ -73,6 +83,8 @@ public static InternalAggregations reduce(List aggregation List reducedAggregations = new ArrayList<>(); for (Map.Entry> entry : aggByName.entrySet()) { List aggregations = entry.getValue(); + // Sort aggregations so that unmapped aggs come last in the list + aggregations.sort(INTERNAL_AGG_COMPARATOR); InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand reducedAggregations.add(first.reduce(aggregations, context)); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java index 8a896ef8cc474..481b57ff60590 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java @@ -31,7 +31,6 @@ public class UnmappedSampler extends InternalSampler { public static final String NAME = "unmapped_sampler"; - private boolean delegatedReduction = false; UnmappedSampler(String name, List pipelineAggregators, Map metaData) { super(name, 0, InternalAggregations.EMPTY, pipelineAggregators, metaData); @@ -53,7 +52,6 @@ public String getWriteableName() { public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { for (InternalAggregation agg : aggregations) { if (!(agg instanceof 
UnmappedSampler)) { - delegatedReduction = true; return agg.reduce(aggregations, reduceContext); } } @@ -61,14 +59,8 @@ public InternalAggregation doReduce(List aggregations, Redu } @Override - public InternalAggregation doPipelineReduce(InternalAggregation reducedAggregations, List aggregations, - ReduceContext reduceContext) { - // If we delegated away the reduction, another aggregation has already run - // pipeline aggs and we should just return the current tree - if (delegatedReduction) { - return reducedAggregations; - } - return super.doPipelineReduce(reducedAggregations, aggregations, reduceContext); + public boolean isMapped() { + return false; } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java index 96bb7e5b6321a..8f3cde8f566f2 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java @@ -44,20 +44,20 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms { public static final String NAME = "umsigterms"; - private boolean delegatedReduction = false; + /** * Concrete type that can't be built because Java needs a concrete type so {@link InternalTerms.Bucket} can have a self type but * {@linkplain UnmappedTerms} doesn't ever need to build it because it never returns any buckets. 
*/ protected abstract static class Bucket extends InternalSignificantTerms.Bucket { private Bucket(BytesRef term, long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations, - DocValueFormat format) { + DocValueFormat format) { super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format); } } public UnmappedSignificantTerms(String name, int requiredSize, long minDocCount, List pipelineAggregators, - Map metaData) { + Map metaData) { super(name, requiredSize, minDocCount, pipelineAggregators, metaData); } @@ -102,7 +102,6 @@ protected UnmappedSignificantTerms create(long subsetSize, long supersetSize, Li public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { for (InternalAggregation aggregation : aggregations) { if (!(aggregation instanceof UnmappedSignificantTerms)) { - delegatedReduction = true; return aggregation.reduce(aggregations, reduceContext); } } @@ -110,14 +109,8 @@ public InternalAggregation doReduce(List aggregations, Redu } @Override - public InternalAggregation doPipelineReduce(InternalAggregation reducedAggregations, List aggregations, - ReduceContext reduceContext) { - // If we delegated away the reduction, another aggregation has already run - // pipeline aggs and we should just return the current tree - if (delegatedReduction) { - return reducedAggregations; - } - return super.doPipelineReduce(reducedAggregations, aggregations, reduceContext); + public boolean isMapped() { + return false; } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java index e4c4bd7b4e55e..135390200392c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java @@ -39,7 +39,6 @@ */ public class 
UnmappedTerms extends InternalTerms { public static final String NAME = "umterms"; - private boolean delegatedReduction = false; /** * Concrete type that can't be built because Java needs a concrete type so {@link InternalTerms.Bucket} can have a self type but @@ -98,7 +97,6 @@ protected UnmappedTerms create(String name, List buckets, long docCountE public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { for (InternalAggregation agg : aggregations) { if (!(agg instanceof UnmappedTerms)) { - delegatedReduction = true; return agg.reduce(aggregations, reduceContext); } } @@ -106,14 +104,8 @@ public InternalAggregation doReduce(List aggregations, Redu } @Override - public InternalAggregation doPipelineReduce(InternalAggregation reducedAggregations, List aggregations, - ReduceContext reduceContext) { - // If we delegated away the reduction, another aggregation has already run - // pipeline aggs and we should just return the current tree - if (delegatedReduction) { - return reducedAggregations; - } - return super.doPipelineReduce(reducedAggregations, aggregations, reduceContext); + public boolean isMapped() { + return false; } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index 1149c7b0941ce..facbc6ec84b76 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -86,36 +86,36 @@ import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ParsedAvg; import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder; -import 
org.elasticsearch.search.aggregations.metrics.ParsedCardinality; +import org.elasticsearch.search.aggregations.metrics.ExtendedStatsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.GeoBoundsAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ParsedGeoBounds; import org.elasticsearch.search.aggregations.metrics.GeoCentroidAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ParsedGeoCentroid; -import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ParsedMax; -import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ParsedMin; import org.elasticsearch.search.aggregations.metrics.InternalHDRPercentileRanks; import org.elasticsearch.search.aggregations.metrics.InternalHDRPercentiles; -import org.elasticsearch.search.aggregations.metrics.ParsedHDRPercentileRanks; -import org.elasticsearch.search.aggregations.metrics.ParsedHDRPercentiles; import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentileRanks; import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentiles; +import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.ParsedAvg; +import org.elasticsearch.search.aggregations.metrics.ParsedCardinality; +import org.elasticsearch.search.aggregations.metrics.ParsedExtendedStats; +import org.elasticsearch.search.aggregations.metrics.ParsedGeoBounds; +import org.elasticsearch.search.aggregations.metrics.ParsedGeoCentroid; +import org.elasticsearch.search.aggregations.metrics.ParsedHDRPercentileRanks; +import org.elasticsearch.search.aggregations.metrics.ParsedHDRPercentiles; +import org.elasticsearch.search.aggregations.metrics.ParsedMax; +import 
org.elasticsearch.search.aggregations.metrics.ParsedMin; +import org.elasticsearch.search.aggregations.metrics.ParsedScriptedMetric; +import org.elasticsearch.search.aggregations.metrics.ParsedStats; +import org.elasticsearch.search.aggregations.metrics.ParsedSum; import org.elasticsearch.search.aggregations.metrics.ParsedTDigestPercentileRanks; import org.elasticsearch.search.aggregations.metrics.ParsedTDigestPercentiles; -import org.elasticsearch.search.aggregations.metrics.ParsedScriptedMetric; +import org.elasticsearch.search.aggregations.metrics.ParsedTopHits; +import org.elasticsearch.search.aggregations.metrics.ParsedValueCount; import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ParsedStats; import org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ExtendedStatsAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ParsedExtendedStats; -import org.elasticsearch.search.aggregations.metrics.ParsedSum; import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ParsedTopHits; import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ParsedValueCount; import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue; @@ -134,6 +134,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -153,6 +154,16 @@ public abstract class InternalAggregationTestCase public static final int DEFAULT_MAX_BUCKETS = 100000; protected static final double 
TOLERANCE = 1e-10; + private static final Comparator INTERNAL_AGG_COMPARATOR = (agg1, agg2) -> { + if (agg1.isMapped() == agg2.isMapped()) { + return 0; + } else if (agg1.isMapped() && agg2.isMapped() == false) { + return -1; + } else { + return 1; + } + }; + private final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry( new SearchModule(Settings.EMPTY, false, emptyList()).getNamedWriteables()); @@ -239,6 +250,8 @@ public void testReduceRandom() { inputs.add(t); toReduce.add(t); } + // Sort aggs so that unmapped come last. This mimics the behavior of InternalAggregations.reduce() + inputs.sort(INTERNAL_AGG_COMPARATOR); ScriptService mockScriptService = mockScriptService(); MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); if (randomBoolean() && toReduce.size() > 1) { From 5aa70f13fc776555016583db38b3e004b2bf1423 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Thu, 13 Sep 2018 16:00:53 -0400 Subject: [PATCH 5/6] Remove delegation ability of unmapped aggs --- .../search/aggregations/InternalAggregation.java | 4 ++++ .../search/aggregations/InternalAggregations.java | 1 + .../search/aggregations/bucket/sampler/UnmappedSampler.java | 5 ----- .../bucket/significant/UnmappedSignificantTerms.java | 5 ----- .../search/aggregations/bucket/terms/UnmappedTerms.java | 5 ----- 5 files changed, 5 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java index 305dea0d82d6c..eafdbe109776b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java @@ -143,6 +143,10 @@ public final InternalAggregation reduce(List aggregations, public abstract InternalAggregation doReduce(List aggregations, ReduceContext 
reduceContext); + /** + * Return true if this aggregation is mapped, and can lead a reduction. If this agg returns + * false, it should return itself if asked to lead a reduction + */ public boolean isMapped() { return true; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index b8ee5800ea7b0..95140b50d2bdf 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -84,6 +84,7 @@ public static InternalAggregations reduce(List aggregation for (Map.Entry> entry : aggByName.entrySet()) { List aggregations = entry.getValue(); // Sort aggregations so that unmapped aggs come last in the list + // If all aggs are unmapped, the agg that leads the reduction will just return itself aggregations.sort(INTERNAL_AGG_COMPARATOR); InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand reducedAggregations.add(first.reduce(aggregations, context)); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java index 481b57ff60590..293d44259c6a9 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java @@ -50,11 +50,6 @@ public String getWriteableName() { @Override public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { - for (InternalAggregation agg : aggregations) { - if (!(agg instanceof UnmappedSampler)) { - return agg.reduce(aggregations, reduceContext); - } - } return this; } diff --git 
a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java index 8f3cde8f566f2..4acbd2c088a4b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java @@ -100,11 +100,6 @@ protected UnmappedSignificantTerms create(long subsetSize, long supersetSize, Li @Override public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { - for (InternalAggregation aggregation : aggregations) { - if (!(aggregation instanceof UnmappedSignificantTerms)) { - return aggregation.reduce(aggregations, reduceContext); - } - } return this; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java index 135390200392c..22306324a8bdd 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java @@ -95,11 +95,6 @@ protected UnmappedTerms create(String name, List buckets, long docCountE @Override public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { - for (InternalAggregation agg : aggregations) { - if (!(agg instanceof UnmappedTerms)) { - return agg.reduce(aggregations, reduceContext); - } - } return this; } From 82f2938e767bf3426342ba634be6e921409e6837 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Tue, 25 Sep 2018 14:07:02 -0400 Subject: [PATCH 6/6] Rename test, make unmapped aggs return new object --- .../search/aggregations/bucket/sampler/UnmappedSampler.java | 2 +- .../bucket/significant/UnmappedSignificantTerms.java | 2 +- 
.../search/aggregations/bucket/terms/UnmappedTerms.java | 2 +- .../search/aggregations/pipeline/MaxBucketIT.java | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java index 293d44259c6a9..5f5f557ffd561 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java @@ -50,7 +50,7 @@ public String getWriteableName() { @Override public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { - return this; + return new UnmappedSampler(name, pipelineAggregators(), metaData); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java index 4acbd2c088a4b..f2c9f8b29adc2 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java @@ -100,7 +100,7 @@ protected UnmappedSignificantTerms create(long subsetSize, long supersetSize, Li @Override public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { - return this; + return new UnmappedSignificantTerms(name, requiredSize, minDocCount, pipelineAggregators(), metaData); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java index 22306324a8bdd..17a3e603b6fcf 100644 --- 
a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java @@ -95,7 +95,7 @@ protected UnmappedTerms create(String name, List buckets, long docCountE @Override public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { - return this; + return new UnmappedTerms(name, order, requiredSize, minDocCount, pipelineAggregators(), metaData); } @Override diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java index b8ef1b69beab7..4841c5e596a16 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java @@ -493,12 +493,12 @@ public void testNested() throws Exception { * This bug manifests as the max_bucket agg ("peak") being added to the response twice, because * the pipeline agg is run twice. This makes invalid JSON and breaks conversion to maps. * The bug was caused by an UnmappedTerms being the chosen as the first reduction target. UnmappedTerms - * delegate reduction to the first non-unmapped agg, which would reduce and run pipeline aggs. But then + * delegated reduction to the first non-unmapped agg, which would reduce and run pipeline aggs. But then * execution returns to the UnmappedTerms and _it_ runs pipelines as well, doubling up on the values. * * Applies to any pipeline agg, not just max. */ - public void testFieldGetsWrittenOutTwice() throws Exception { + public void testFieldIsntWrittenOutTwice() throws Exception { // you need to add an additional index with no fields in order to trigger this (or potentially a shard) // so that there is an UnmappedTerms in the list to reduce. createIndex("foo_1");