diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index f54f101041d1b..0125084c37099 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -486,7 +486,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection pipelineAggregators = context.aggregations().factories().createPipelineAggregators(); List siblingPipelineAggregators = new ArrayList<>(pipelineAggregators.size()); for (PipelineAggregator pipelineAggregator : pipelineAggregators) { @@ -144,7 +143,7 @@ public void execute(SearchContext context) { + "allowed at the top level"); } } - context.queryResult().pipelineAggregators(siblingPipelineAggregators); + context.queryResult().aggregations(new InternalAggregations(aggregations, siblingPipelineAggregators)); // disable aggregations so that they don't run on next pages in case of scrolling context.aggregations(null); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index 70135c2d51e73..08c389675ad72 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -78,7 +78,7 @@ public InternalAggregations(List aggregations, List getTopLevelPipelineAggregators() { + public List getTopLevelPipelineAggregators() { return topLevelPipelineAggregators; } @@ -92,20 +92,7 @@ public static InternalAggregations reduce(List aggregation if (aggregationsList.isEmpty()) { return null; } - InternalAggregations first = aggregationsList.get(0); - return reduce(aggregationsList, first.topLevelPipelineAggregators, context); - } - - /** - * Reduces the given list of aggregations as well as the provided top-level pipeline aggregators. - * Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched. - */ - public static InternalAggregations reduce(List aggregationsList, - List topLevelPipelineAggregators, - ReduceContext context) { - if (aggregationsList.isEmpty()) { - return null; - } + List topLevelPipelineAggregators = aggregationsList.get(0).getTopLevelPipelineAggregators(); // first we collect all aggregations of the same type and list them together Map> aggByName = new HashMap<>(); diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 55787dfc53a35..db8b67ca109db 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -29,6 +29,7 @@ import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; @@ -38,7 +39,6 @@ import java.io.IOException; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.stream.Collectors; import static org.elasticsearch.common.lucene.Lucene.readTopDocs; @@ -55,7 +55,6 @@ public final class QuerySearchResult extends SearchPhaseResult { private DocValueFormat[] sortValueFormats; private InternalAggregations aggregations; private boolean hasAggs; - private List pipelineAggregators = Collections.emptyList(); private Suggest suggest; private boolean searchTimedOut; private Boolean terminatedEarly = null; @@ -199,14 +198,6 @@ public void profileResults(ProfileShardResult shardResults) { hasProfileResults = shardResults != null; } - public List pipelineAggregators() { - return pipelineAggregators; - } - - public void pipelineAggregators(List pipelineAggregators) { - this.pipelineAggregators = Objects.requireNonNull(pipelineAggregators); - } - public Suggest suggest() { return suggest; } @@ -295,8 +286,18 @@ public void readFromWithId(long id, StreamInput in) throws IOException { if (hasAggs = in.readBoolean()) { aggregations = InternalAggregations.readAggregations(in); } - pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream().map(a -> (SiblingPipelineAggregator) a) - .collect(Collectors.toList()); + if (in.getVersion().before(Version.V_7_1_0)) { + List pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream() + .map(a -> (SiblingPipelineAggregator) a).collect(Collectors.toList()); + if (hasAggs && pipelineAggregators.isEmpty() == false) { + List internalAggs = aggregations.asList().stream() + .map(agg -> (InternalAggregation) agg).collect(Collectors.toList()); + //Earlier versions serialize sibling pipeline aggs separately as they used to be set to QuerySearchResult directly, while + //later versions include them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of + //InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1. + this.aggregations = new InternalAggregations(internalAggs, pipelineAggregators); + } + } if (in.readBoolean()) { suggest = new Suggest(in); } @@ -338,7 +339,16 @@ public void writeToNoId(StreamOutput out) throws IOException { out.writeBoolean(true); aggregations.writeTo(out); } - out.writeNamedWriteableList(pipelineAggregators); + if (out.getVersion().before(Version.V_7_1_0)) { + //Earlier versions expect sibling pipeline aggs separately as they used to be set to QuerySearchResult directly, + //while later versions expect them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of + //InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1 on. + if (aggregations == null) { + out.writeNamedWriteableList(Collections.emptyList()); + } else { + out.writeNamedWriteableList(aggregations.getTopLevelPipelineAggregators()); + } + } if (suggest == null) { out.writeBoolean(false); } else { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java index 45107fa4a193f..0165a1b3ae0f8 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java @@ -50,18 +50,19 @@ public class InternalAggregationsTests extends ESTestCase { public void testReduceEmptyAggs() { List aggs = Collections.emptyList(); InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, randomBoolean()); - assertNull(InternalAggregations.reduce(aggs, Collections.emptyList(), reduceContext)); + assertNull(InternalAggregations.reduce(aggs, reduceContext)); } public void testNonFinalReduceTopLevelPipelineAggs() { InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), 10, 1, Collections.emptyList(), Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0); - List aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms))); List topLevelPipelineAggs = new ArrayList<>(); MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test"); topLevelPipelineAggs.add((SiblingPipelineAggregator)maxBucketPipelineAggregationBuilder.create()); + List aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms), + topLevelPipelineAggs)); InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, false); - InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, topLevelPipelineAggs, reduceContext); + InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContext); assertEquals(1, reducedAggs.getTopLevelPipelineAggregators().size()); assertEquals(1, reducedAggs.aggregations.size()); } @@ -79,15 +80,15 @@ public void testFinalReduceTopLevelPipelineAggs() { Collections.singletonList(siblingPipelineAggregator)); reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext); } else { - InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms)); - List topLevelPipelineAggs = Collections.singletonList(siblingPipelineAggregator); - reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), topLevelPipelineAggs, reduceContext); + InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms), + Collections.singletonList(siblingPipelineAggregator)); + reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext); } assertEquals(0, reducedAggs.getTopLevelPipelineAggregators().size()); assertEquals(2, reducedAggs.aggregations.size()); } - public void testSerialization() throws Exception { + public static InternalAggregations createTestInstance() throws Exception { List aggsList = new ArrayList<>(); if (randomBoolean()) { StringTermsTests stringTermsTests = new StringTermsTests(); @@ -116,7 +117,11 @@ public void testSerialization() throws Exception { topLevelPipelineAggs.add((SiblingPipelineAggregator)new SumBucketPipelineAggregationBuilder("name3", "bucket3").create()); } } - InternalAggregations aggregations = new InternalAggregations(aggsList, topLevelPipelineAggs); + return new InternalAggregations(aggsList, topLevelPipelineAggs); + } + + public void testSerialization() throws Exception { + InternalAggregations aggregations = createTestInstance(); writeToAndReadFrom(aggregations, 0); } diff --git a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java new file mode 100644 index 0000000000000..29ee7c9238e9f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java @@ -0,0 +1,134 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.query; + +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.Version; +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.InternalAggregationsTests; +import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; +import org.elasticsearch.search.suggest.SuggestTests; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; + +import java.io.IOException; +import java.util.Base64; +import java.util.List; + +import static java.util.Collections.emptyList; + +public class QuerySearchResultTests extends ESTestCase { + + private final NamedWriteableRegistry namedWriteableRegistry; + + public QuerySearchResultTests() { + SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList()); + this.namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables()); + } + + private static QuerySearchResult createTestInstance() throws Exception { + ShardId shardId = new ShardId("index", "uuid", randomInt()); + QuerySearchResult result = new QuerySearchResult(randomLong(), new SearchShardTarget("node", shardId, null, OriginalIndices.NONE)); + if (randomBoolean()) { + result.terminatedEarly(randomBoolean()); + } + TopDocs topDocs = new TopDocs(new TotalHits(randomLongBetween(0, Long.MAX_VALUE), TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]); + result.topDocs(new TopDocsAndMaxScore(topDocs, randomBoolean() ? Float.NaN : randomFloat()), new DocValueFormat[0]); + result.size(randomInt()); + result.from(randomInt()); + if (randomBoolean()) { + result.suggest(SuggestTests.createTestItem()); + } + if (randomBoolean()) { + result.aggregations(InternalAggregationsTests.createTestInstance()); + } + return result; + } + + public void testSerialization() throws Exception { + QuerySearchResult querySearchResult = createTestInstance(); + Version version = VersionUtils.randomVersion(random()); + QuerySearchResult deserialized = copyStreamable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, version); + assertEquals(querySearchResult.getRequestId(), deserialized.getRequestId()); + assertNull(deserialized.getSearchShardTarget()); + assertEquals(querySearchResult.topDocs().maxScore, deserialized.topDocs().maxScore, 0f); + assertEquals(querySearchResult.topDocs().topDocs.totalHits, deserialized.topDocs().topDocs.totalHits); + assertEquals(querySearchResult.from(), deserialized.from()); + assertEquals(querySearchResult.size(), deserialized.size()); + assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs()); + if (deserialized.hasAggs()) { + Aggregations aggs = querySearchResult.consumeAggs(); + Aggregations deserializedAggs = deserialized.consumeAggs(); + assertEquals(aggs.asList(), deserializedAggs.asList()); + List pipelineAggs = ((InternalAggregations) aggs).getTopLevelPipelineAggregators(); + List deserializedPipelineAggs = + ((InternalAggregations) deserializedAggs).getTopLevelPipelineAggregators(); + assertEquals(pipelineAggs.size(), deserializedPipelineAggs.size()); + for (int i = 0; i < pipelineAggs.size(); i++) { + SiblingPipelineAggregator pipelineAgg = pipelineAggs.get(i); + SiblingPipelineAggregator deserializedPipelineAgg = deserializedPipelineAggs.get(i); + assertArrayEquals(pipelineAgg.bucketsPaths(), deserializedPipelineAgg.bucketsPaths()); + assertEquals(pipelineAgg.name(), deserializedPipelineAgg.name()); + } + } + assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly()); + } + + public void testReadFromPre_7_1_0() throws IOException { + String message = "AAAAAAAAAGQAAAEAAAB/wAAAAAEBBnN0ZXJtcwVJblhNRgoDBVNhdWpvAAVrS3l3cwVHSVVZaAAFZXRUbEUFZGN0WVoABXhzYnVrAAEDAfoN" + + "A3JhdwUBAAJRAAAAAAAAA30DBnN0ZXJtcwVNdVVFRwoAAAEDAfoNA3JhdwUBAAdDAAAAAAAAA30AAApQVkFhaUxSdHh5TAAAAAAAAAN9AAAKTVRUeUxnd1hyd" + + "y0AAAAAAAADfQAACnZRQXZ3cWp0SmwPAAAAAAAAA30AAApmYXNyUUhNVWZBCwAAAAAAAAN9AAAKT3FIQ2RMZ1JZUwUAAAAAAAADfQAACm9jT05aZmZ4ZmUmAA" + + "AAAAAAA30AAApvb0tJTkdvbHdzBnN0ZXJtcwVtRmlmZAoAAAEDAfoNA3JhdwUBAARXAAAAAAAAA30AAApZd3BwQlpBZEhpMQAAAAAAAAN9AAAKREZ3UVpTSXh" + + "DSE4AAAAAAAADfQAAClVMZW1YZGtkSHUUAAAAAAAAA30AAApBUVdKVk1kTlF1BnN0ZXJtcwVxbkJGVgoAAAEDAfoNA3JhdwUBAAYJAAAAAAAAA30AAApBS2NL" + + "U1ZVS25EIQAAAAAAAAN9AAAKWGpCbXZBZmduRhsAAAAAAAADfQAACk54TkJEV3pLRmI7AAAAAAAAA30AAApydkdaZnJycXhWSAAAAAAAAAN9AAAKSURVZ3JhQ" + + "lFHSy4AAAAAAAADfQAACmJmZ0x5YlFlVksAClRJZHJlSkpVc1Y4AAAAAAAAA30DBnN0ZXJtcwVNdVVFRwoAAAEDAfoNA3JhdwUBAAdDAAAAAAAAA30AAApQVk" + + "FhaUxSdHh5TAAAAAAAAAN9AAAKTVRUeUxnd1hydy0AAAAAAAADfQAACnZRQXZ3cWp0SmwPAAAAAAAAA30AAApmYXNyUUhNVWZBCwAAAAAAAAN9AAAKT3FIQ2R" + + "MZ1JZUwUAAAAAAAADfQAACm9jT05aZmZ4ZmUmAAAAAAAAA30AAApvb0tJTkdvbHdzBnN0ZXJtcwVtRmlmZAoAAAEDAfoNA3JhdwUBAARXAAAAAAAAA30AAApZ" + + "d3BwQlpBZEhpMQAAAAAAAAN9AAAKREZ3UVpTSXhDSE4AAAAAAAADfQAAClVMZW1YZGtkSHUUAAAAAAAAA30AAApBUVdKVk1kTlF1BnN0ZXJtcwVxbkJGVgoAA" + + "AEDAfoNA3JhdwUBAAYJAAAAAAAAA30AAApBS2NLU1ZVS25EIQAAAAAAAAN9AAAKWGpCbXZBZmduRhsAAAAAAAADfQAACk54TkJEV3pLRmI7AAAAAAAAA30AAA" + + "pydkdaZnJycXhWSAAAAAAAAAN9AAAKSURVZ3JhQlFHSy4AAAAAAAADfQAACmJmZ0x5YlFlVksACm5rdExLUHp3cGgBCm1heF9idWNrZXQFbmFtZTEBB2J1Y2t" + + "ldDH/A3JhdwEBCm1heF9idWNrZXQFbmFtZTEBB2J1Y2tldDH/A3JhdwEAAAIAAf////8AAAAAAAAAAAAAAAAAAAAAAAAAAAAA"; + byte[] bytes = Base64.getDecoder().decode(message); + try (NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(bytes), namedWriteableRegistry)) { + in.setVersion(Version.V_7_0_0); + QuerySearchResult querySearchResult = new QuerySearchResult(); + querySearchResult.readFrom(in); + assertEquals(100, querySearchResult.getRequestId()); + assertTrue(querySearchResult.hasAggs()); + InternalAggregations aggs = (InternalAggregations)querySearchResult.consumeAggs(); + assertEquals(1, aggs.asList().size()); + //top-level pipeline aggs are retrieved as part of InternalAggregations although they were serialized separately + assertEquals(1, aggs.getTopLevelPipelineAggregators().size()); + } + } +}