From 0039ef195b053f20562c844ef5505417930409e2 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Wed, 11 Dec 2019 16:34:48 -0500 Subject: [PATCH] SingleBucket aggs need to reduce their bucket's pipelines first When decoupling the pipeline reduction from regular agg reduction, MultiBucket aggs were modified to reduce their bucket's pipeline aggs before reducing the sibling aggs. This modification was missed on SingleBucket aggs, meaning any SingleBucket would fail to reduce any pipeline sub-aggs. --- .../InternalSingleBucketAggregation.java | 16 ++++ .../bucket/nested/NestedAggregatorTests.java | 86 +++++++++++++++++++ 2 files changed, 102 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java index 0a34e7a92b8f1..bb0b2ad0a3050 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java @@ -109,6 +109,22 @@ public InternalAggregation reduce(List aggregations, Reduce return newAggregation(getName(), docCount, aggs); } + /** + * Unlike {@link InternalAggregation#reducePipelines(InternalAggregation, ReduceContext)}, a single-bucket + * agg needs to first reduce the aggs in its bucket (and their parent pipelines) before allowing sibling pipelines + * to reduce. + */ + @Override + public final InternalAggregation reducePipelines(InternalAggregation reducedAggs, ReduceContext reduceContext) { + assert reduceContext.isFinalReduce(); + List aggs = new ArrayList<>(); + for (Aggregation agg : getAggregations().asList()) { + aggs.add(((InternalAggregation)agg).reducePipelines((InternalAggregation)agg, reduceContext)); + } + InternalAggregations reducedSubAggs = new InternalAggregations(aggs); + return 
super.reducePipelines(create(reducedSubAggs), reduceContext); + } + @Override public Object getProperty(List path) { if (path.isEmpty()) { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java index f3c1ea7ca529a..4983575860f86 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java @@ -39,6 +39,7 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lucene.search.Queries; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.KeywordFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; @@ -47,11 +48,19 @@ import org.elasticsearch.index.mapper.TypeFieldMapper; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.script.MockScriptEngine; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptEngine; +import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.bucket.filter.Filter; import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms; +import org.elasticsearch.search.aggregations.bucket.terms.LongTerms; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; 
import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; @@ -62,12 +71,16 @@ import org.elasticsearch.search.aggregations.metrics.Min; import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper; import org.elasticsearch.search.aggregations.support.ValueType; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -86,6 +99,7 @@ public class NestedAggregatorTests extends AggregatorTestCase { private static final String NESTED_AGG = "nestedAgg"; private static final String MAX_AGG_NAME = "maxAgg"; private static final String SUM_AGG_NAME = "sumAgg"; + private static final String INVERSE_SCRIPT = "inverse"; private final SeqNoFieldMapper.SequenceIDFields sequenceIDFields = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); @@ -99,6 +113,18 @@ protected Map getFieldAliases(MappedFieldType... 
fieldT Function.identity())); } + @Override + protected ScriptService getMockScriptService() { + Map, Object>> scripts = new HashMap<>(); + scripts.put(INVERSE_SCRIPT, vars -> -((Number) vars.get("_value")).doubleValue()); + MockScriptEngine scriptEngine = new MockScriptEngine(MockScriptEngine.NAME, + scripts, + Collections.emptyMap()); + Map engines = Collections.singletonMap(scriptEngine.getType(), scriptEngine); + + return new ScriptService(Settings.EMPTY, engines, ScriptModule.CORE_CONTEXTS); + } + public void testNoDocs() throws IOException { try (Directory directory = newDirectory()) { try (RandomIndexWriter iw = new RandomIndexWriter(random(), directory)) { @@ -709,6 +735,66 @@ public void testFieldAlias() throws IOException { } } + /** + * This tests to make sure pipeline aggs embedded under a SingleBucket agg (like nested) + * are properly reduced + */ + public void testNestedWithPipeline() throws IOException { + int numRootDocs = randomIntBetween(1, 20); + int expectedNestedDocs = 0; + double expectedMaxValue = Double.NEGATIVE_INFINITY; + try (Directory directory = newDirectory()) { + try (RandomIndexWriter iw = new RandomIndexWriter(random(), directory)) { + for (int i = 0; i < numRootDocs; i++) { + List documents = new ArrayList<>(); + expectedMaxValue = Math.max(expectedMaxValue, + generateMaxDocs(documents, 1, i, NESTED_OBJECT, VALUE_FIELD_NAME)); + expectedNestedDocs += 1; + + Document document = new Document(); + document.add(new Field(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(i)), IdFieldMapper.Defaults.FIELD_TYPE)); + document.add(new Field(TypeFieldMapper.NAME, "test", + TypeFieldMapper.Defaults.FIELD_TYPE)); + document.add(sequenceIDFields.primaryTerm); + documents.add(document); + iw.addDocuments(documents); + } + iw.commit(); + } + try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) { + NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG, NESTED_OBJECT) + .subAggregation(new 
TermsAggregationBuilder("terms", ValueType.NUMERIC).field(VALUE_FIELD_NAME) + .subAggregation(new MaxAggregationBuilder(MAX_AGG_NAME).field(VALUE_FIELD_NAME)) + .subAggregation(new BucketScriptPipelineAggregationBuilder("bucketscript", + Collections.singletonMap("_value", MAX_AGG_NAME), + new Script(ScriptType.INLINE, MockScriptEngine.NAME, INVERSE_SCRIPT, Collections.emptyMap())))); + + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + fieldType.setName(VALUE_FIELD_NAME); + + InternalNested nested = searchAndReduce(newSearcher(indexReader, false, true), + new MatchAllDocsQuery(), nestedBuilder, fieldType); + + assertEquals(expectedNestedDocs, nested.getDocCount()); + assertEquals(NESTED_AGG, nested.getName()); + assertEquals(expectedNestedDocs, nested.getDocCount()); + + InternalTerms terms = (InternalTerms) nested.getProperty("terms"); + assertNotNull(terms); + + for (LongTerms.Bucket bucket : terms.getBuckets()) { + InternalMax max = (InternalMax) bucket.getAggregations().asMap().get(MAX_AGG_NAME); + InternalSimpleValue bucketScript = (InternalSimpleValue) bucket.getAggregations().asMap().get("bucketscript"); + assertNotNull(max); + assertNotNull(bucketScript); + assertEquals(max.getValue(), -bucketScript.getValue(), Double.MIN_VALUE); + } + + assertTrue(AggregationInspectionHelper.hasValue(nested)); + } + } + } + private double generateMaxDocs(List documents, int numNestedDocs, int id, String path, String fieldName) { return DoubleStream.of(generateDocuments(documents, numNestedDocs, id, path, fieldName)) .max().orElse(Double.NEGATIVE_INFINITY);