From f6a868ef60809bf488e28249556e5bb0fa4c3779 Mon Sep 17 00:00:00 2001
From: Luca Cavanna
Date: Thu, 27 Dec 2018 11:40:30 +0100
Subject: [PATCH] Skip final reduction if SearchRequest holds a cluster alias

With #36997 we added the ability to provide a cluster alias with a
SearchRequest. The next step is to disable the final reduction whenever a
cluster alias is provided with the SearchRequest. A cluster alias will be
provided when executing a cross-cluster search request with the alternate
execution mode, where each cluster does its own reduction locally. In order
for the CCS node to be able to later perform an additional reduction of the
results, we need to make sure that all the needed info stays available.
This means that terms aggregations can be reduced but not pruned, and that
pipeline aggs should not be executed. The final reduction will happen later,
on the CCS coordinating node.

Relates to #36997 & #32125
---
 .../action/search/SearchPhaseController.java | 29 ++++++----
 .../search/SearchPhaseControllerTests.java   | 56 +++++++++++++------
 2 files changed, 57 insertions(+), 28 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
index 82f7760c1abdc..66b0317146faa 100644
--- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
+++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
@@ -407,17 +407,18 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr
      * Reduces the given query results and consumes all aggregations and profile results.
      * @param queryResults a list of non-null query shard results
      */
-    public ReducedQueryPhase reducedScrollQueryPhase(Collection<? extends SearchPhaseResult> queryResults) {
-        return reducedQueryPhase(queryResults, true, true);
+    ReducedQueryPhase reducedScrollQueryPhase(Collection<? extends SearchPhaseResult> queryResults) {
+        return reducedQueryPhase(queryResults, true, true, true);
     }
 
     /**
      * Reduces the given query results and consumes all aggregations and profile results.
      * @param queryResults a list of non-null query shard results
      */
-    public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults,
-                                               boolean isScrollRequest, boolean trackTotalHits) {
-        return reducedQueryPhase(queryResults, null, new ArrayList<>(), new TopDocsStats(trackTotalHits), 0, isScrollRequest);
+    ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults,
+                                        boolean isScrollRequest, boolean trackTotalHits, boolean performFinalReduce) {
+        return reducedQueryPhase(queryResults, null, new ArrayList<>(), new TopDocsStats(trackTotalHits), 0, isScrollRequest,
+            performFinalReduce);
     }
 
     /**
@@ -433,7 +434,8 @@ public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResul
      */
     private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults,
                                                 List<InternalAggregations> bufferedAggs, List<TopDocs> bufferedTopDocs,
-                                                TopDocsStats topDocsStats, int numReducePhases, boolean isScrollRequest) {
+                                                TopDocsStats topDocsStats, int numReducePhases, boolean isScrollRequest,
+                                                boolean performFinalReduce) {
         assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
         numReducePhases++; // increment for this phase
         boolean timedOut = false;
@@ -499,7 +501,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseRes
[...]
                 throw new IllegalArgumentException("batched reduce size must be >= 2 if there is more than one expected result");
@@ -644,6 +647,7 @@ private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedR
             this.hasTopDocs = hasTopDocs;
             this.hasAggs = hasAggs;
             this.bufferSize = bufferSize;
+            this.performFinalReduce = performFinalReduce;
         }
 
         @Override
@@ -693,7 +697,7 @@ private synchronized List<TopDocs> getRemainingTopDocs() {
         @Override
         public ReducedQueryPhase reduce() {
             return controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(), topDocsStats,
-                numReducePhases, false);
+                numReducePhases, false, performFinalReduce);
         }
 
         /**
@@ -715,18 +719,19 @@ InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResu
         final boolean hasAggs = source != null && source.aggregations() != null;
         final boolean hasTopDocs = source == null || source.size() != 0;
         final boolean trackTotalHits = source == null || source.trackTotalHits();
+        final boolean finalReduce = request.getLocalClusterAlias() == null;
         if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
             // no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
             if (request.getBatchedReduceSize() < numShards) {
                 // only use this if there are aggs and if there are more shards than we should reduce at once
-                return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs);
+                return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs, finalReduce);
             }
         }
         return new InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
             @Override
             ReducedQueryPhase reduce() {
-                return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits);
+                return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits, finalReduce);
             }
         };
     }
 
diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
index 585108fef8a32..3859e3b7f384a 100644
--- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
+++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
@@ -56,28 +56,34 @@
 import org.junit.Before;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
 
 public class SearchPhaseControllerTests extends ESTestCase {
     private SearchPhaseController searchPhaseController;
+    private List<Boolean> reductions;
 
     @Before
     public void setup() {
+        reductions = new CopyOnWriteArrayList<>();
         searchPhaseController = new SearchPhaseController(
-            (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
+            (finalReduce) -> {
+                reductions.add(finalReduce);
+                return new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, finalReduce);
+            });
     }
 
     public void testSort() {
@@ -158,7 +164,7 @@ public void testMerge() {
         AtomicArray<SearchPhaseResult> queryResults = generateQueryResults(nShards, suggestions, queryResultSize, false);
         for (boolean trackTotalHits : new boolean[] {true, false}) {
             SearchPhaseController.ReducedQueryPhase reducedQueryPhase =
-                searchPhaseController.reducedQueryPhase(queryResults.asList(), false, trackTotalHits);
+                searchPhaseController.reducedQueryPhase(queryResults.asList(), false, trackTotalHits, true);
             AtomicArray<SearchPhaseResult> fetchResults = generateFetchResults(nShards, reducedQueryPhase.sortedTopDocs.scoreDocs,
                 reducedQueryPhase.suggest);
             InternalSearchResponse mergedResponse = searchPhaseController.merge(false,
@@ -308,14 +314,15 @@ private static AtomicArray<SearchPhaseResult> generateFetchResults(int nShards,
 
     public void testConsumer() {
         int bufferSize = randomIntBetween(2, 3);
-        SearchRequest request = new SearchRequest();
+        SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
         request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
         request.setBatchedReduceSize(bufferSize);
         InitialSearchPhase.ArraySearchPhaseResults<QuerySearchResult> consumer = searchPhaseController.newSearchPhaseResults(request, 3);
+        assertEquals(0, reductions.size());
         QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new Index("a", "b"), 0, null));
         result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN),
             new DocValueFormat[0]);
-        InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 1.0D, DocValueFormat.RAW,
+        InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 1.0D, DocValueFormat.RAW,
             Collections.emptyList(), Collections.emptyMap())));
         result.aggregations(aggs);
         result.setShardIndex(0);
@@ -324,7 +331,7 @@ public void testConsumer() {
         result = new QuerySearchResult(1, new SearchShardTarget("node", new Index("a", "b"), 0, null));
         result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN),
             new DocValueFormat[0]);
-        aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 3.0D, DocValueFormat.RAW,
+        aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 3.0D, DocValueFormat.RAW,
             Collections.emptyList(), Collections.emptyMap())));
         result.aggregations(aggs);
         result.setShardIndex(2);
@@ -333,23 +340,29 @@ public void testConsumer() {
         result = new QuerySearchResult(1, new SearchShardTarget("node", new Index("a", "b"), 0, null));
         result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN),
             new DocValueFormat[0]);
-        aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 2.0D, DocValueFormat.RAW,
+        aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 2.0D, DocValueFormat.RAW,
             Collections.emptyList(), Collections.emptyMap())));
         result.aggregations(aggs);
         result.setShardIndex(1);
         consumer.consumeResult(result);
-        int numTotalReducePhases = 1;
+        final int numTotalReducePhases;
         if (bufferSize == 2) {
             assertThat(consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class));
             assertEquals(1, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumReducePhases());
             assertEquals(2, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumBuffered());
-            numTotalReducePhases++;
+            assertEquals(1, reductions.size());
+            assertEquals(false, reductions.get(0));
+            numTotalReducePhases = 2;
         } else {
             assertThat(consumer, not(instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class)));
+            assertEquals(0, reductions.size());
+            numTotalReducePhases = 1;
         }
         SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
         assertEquals(numTotalReducePhases, reduce.numReducePhases);
+        assertEquals(numTotalReducePhases, reductions.size());
+        assertFinalReduction(request);
         InternalMax max = (InternalMax) reduce.aggregations.asList().get(0);
         assertEquals(3.0D, max.getValue(), 0.0D);
         assertFalse(reduce.sortedTopDocs.isSortedByField);
@@ -362,7 +375,7 @@ public void testConsumerConcurrently() throws InterruptedException {
         int expectedNumResults = randomIntBetween(1, 100);
         int bufferSize = randomIntBetween(2, 200);
 
-        SearchRequest request = new SearchRequest();
+        SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
         request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
         request.setBatchedReduceSize(bufferSize);
         InitialSearchPhase.ArraySearchPhaseResults<QuerySearchResult> consumer =
@@ -378,7 +391,7 @@ public void testConsumerConcurrently() throws InterruptedException {
                 result.topDocs(new TopDocsAndMaxScore(
                     new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(0, number)}), number),
                     new DocValueFormat[0]);
-                InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number,
+                InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", (double) number,
                     DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap())));
                 result.aggregations(aggs);
                 result.setShardIndex(id);
@@ -392,6 +405,7 @@ public void testConsumerConcurrently() throws InterruptedException {
             threads[i].join();
         }
         SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+        assertFinalReduction(request);
         InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0);
         assertEquals(max.get(), internalMax.getValue(), 0.0D);
         assertEquals(1, reduce.sortedTopDocs.scoreDocs.length);
@@ -407,7 +421,7 @@ public void testConsumerConcurrently() throws InterruptedException {
     public void testConsumerOnlyAggs() {
         int expectedNumResults = randomIntBetween(1, 100);
         int bufferSize = randomIntBetween(2, 200);
-        SearchRequest request = new SearchRequest();
+        SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
         request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0));
         request.setBatchedReduceSize(bufferSize);
         InitialSearchPhase.ArraySearchPhaseResults<QuerySearchResult> consumer =
@@ -419,7 +433,7 @@ public void testConsumerOnlyAggs() {
             QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new Index("a", "b"), i, null));
             result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), number),
                 new DocValueFormat[0]);
-            InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number,
+            InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", (double) number,
                 DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap())));
             result.aggregations(aggs);
             result.setShardIndex(i);
@@ -427,6 +441,7 @@ public void testConsumerOnlyAggs() {
             consumer.consumeResult(result);
         }
         SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+        assertFinalReduction(request);
         InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0);
         assertEquals(max.get(), internalMax.getValue(), 0.0D);
         assertEquals(0, reduce.sortedTopDocs.scoreDocs.length);
@@ -441,7 +456,7 @@ public void testConsumerOnlyAggs() {
     public void testConsumerOnlyHits() {
         int expectedNumResults = randomIntBetween(1, 100);
         int bufferSize = randomIntBetween(2, 200);
-        SearchRequest request = new SearchRequest();
+        SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
         if (randomBoolean()) {
             request.source(new SearchSourceBuilder().size(randomIntBetween(1, 10)));
         }
@@ -460,6 +475,7 @@ public void testConsumerOnlyHits() {
             consumer.consumeResult(result);
         }
         SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+        assertFinalReduction(request);
         assertEquals(1, reduce.sortedTopDocs.scoreDocs.length);
         assertEquals(max.get(), reduce.maxScore, 0.0f);
         assertEquals(expectedNumResults, reduce.totalHits.value);
@@ -470,6 +486,12 @@
         assertNull(reduce.sortedTopDocs.collapseValues);
     }
 
+    private void assertFinalReduction(SearchRequest searchRequest) {
+        assertThat(reductions.size(), greaterThanOrEqualTo(1));
+        //the last reduction step was the final one only if no cluster alias was provided with the search request
+        assertEquals(searchRequest.getLocalClusterAlias() == null, reductions.get(reductions.size() - 1));
+    }
+
     public void testNewSearchPhaseResults() {
         for (int i = 0; i < 10; i++) {
             int expectedNumResults = randomIntBetween(1, 10);
@@ -540,7 +562,7 @@ public void testReduceTopNWithFromOffset() {
     public void testConsumerSortByField() {
         int expectedNumResults = randomIntBetween(1, 100);
         int bufferSize = randomIntBetween(2, 200);
-        SearchRequest request = new SearchRequest();
+        SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
         int size = randomIntBetween(1, 10);
         request.setBatchedReduceSize(bufferSize);
         InitialSearchPhase.ArraySearchPhaseResults<QuerySearchResult> consumer =
@@ -560,6 +582,7 @@ public void testConsumerSortByField() {
             consumer.consumeResult(result);
         }
         SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+        assertFinalReduction(request);
         assertEquals(Math.min(expectedNumResults, size), reduce.sortedTopDocs.scoreDocs.length);
         assertEquals(expectedNumResults, reduce.totalHits.value);
         assertEquals(max.get(), ((FieldDoc)reduce.sortedTopDocs.scoreDocs[0]).fields[0]);
@@ -574,7 +597,7 @@ public void testConsumerSortByField() {
     public void testConsumerFieldCollapsing() {
         int expectedNumResults = randomIntBetween(30, 100);
         int bufferSize = randomIntBetween(2, 200);
-        SearchRequest request = new SearchRequest();
+        SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
         int size = randomIntBetween(5, 10);
         request.setBatchedReduceSize(bufferSize);
         InitialSearchPhase.ArraySearchPhaseResults<QuerySearchResult> consumer =
@@ -596,6 +619,7 @@ public void testConsumerFieldCollapsing() {
             consumer.consumeResult(result);
         }
         SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+        assertFinalReduction(request);
         assertEquals(3, reduce.sortedTopDocs.scoreDocs.length);
         assertEquals(expectedNumResults, reduce.totalHits.value);
         assertEquals(a, ((FieldDoc)reduce.sortedTopDocs.scoreDocs[0]).fields[0]);