diff --git a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java
index 6ca399a5f276a..5265372e8fe4a 100644
--- a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java
+++ b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java
@@ -28,6 +28,7 @@
 import org.apache.lucene.search.CollectionStatistics;
 import org.apache.lucene.search.CollectionTerminatedException;
 import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.CollectorManager;
 import org.apache.lucene.search.ConjunctionDISI;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.Explanation;
@@ -54,6 +55,7 @@
 import org.elasticsearch.search.profile.query.QueryTimingType;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
@@ -138,6 +140,17 @@ private void checkCancelled() {
         }
     }
 
+    public void search(List<LeafReaderContext> leaves, Weight weight, CollectorManager manager) throws IOException {
+        final List<Collector> collectors = new ArrayList<>(leaves.size());
+        for (LeafReaderContext ctx : leaves) {
+            final Collector collector = manager.newCollector();
+            // TODO: setMinCompetitiveScore between Collectors
+            searchLeaf(ctx, weight, collector);
+            collectors.add(collector);
+        }
+        manager.reduce(collectors);
+    }
+
     @Override
     protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
         for (LeafReaderContext ctx : leaves) { // search each subreader
@@ -151,7 +164,7 @@ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector c
      * {@link LeafCollector#collect(int)} is called for every matching document in
      * the provided ctx.
      */
-    public void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
+    private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
         checkCancelled();
         weight = wrapWeight(weight);
         final LeafCollector leafCollector;
@@ -228,6 +241,7 @@ public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {
         }
     }
 
+
     private static BitSet getSparseBitSetOrNull(Bits liveDocs) {
         if (liveDocs instanceof SparseFixedBitSet) {
             return (BitSet) liveDocs;
diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java
index 11f28c48f6f6c..fd31e1a137d1f 100644
--- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java
+++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java
@@ -31,15 +31,18 @@
 import org.apache.lucene.search.BooleanClause;
 import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.CollectorManager;
 import org.apache.lucene.search.ConstantScoreQuery;
 import org.apache.lucene.search.DocValuesFieldExistsQuery;
 import org.apache.lucene.search.FieldDoc;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.TopFieldCollector;
 import org.apache.lucene.search.TopFieldDocs;
 import org.apache.lucene.search.TotalHits;
 import org.apache.lucene.search.Weight;
@@ -71,6 +74,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
@@ -236,7 +240,7 @@ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExe
             newFormats[0] = DocValueFormat.RAW;
             // Add a tiebreak on _doc in order to be able to search
             // the leaves in any order. This is needed since we reorder
-            // the leaves based on the minimum value in each segment.
+            // the leaves based on the minimum/maximum value in each segment.
             newSortFields[newSortFields.length-1] = SortField.FIELD_DOC;
             newFormats[newSortFields.length-1] = DocValueFormat.RAW;
             System.arraycopy(oldSortFields, 0, newSortFields, 1, oldSortFields.length);
@@ -286,61 +290,20 @@ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExe
             } else {
                 checkCancelled = null;
             }
 
             searcher.setCheckCancelled(checkCancelled);
-            final boolean doProfile = searchContext.getProfilers() != null;
-            // create the top docs collector last when the other collectors are known
-            final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, hasFilterCollector);
-            // add the top docs collector, the first collector context in the chain
-            collectors.addFirst(topDocsFactory);
-
-            final Collector queryCollector;
-            if (doProfile) {
-                InternalProfileCollector profileCollector = QueryCollectorContext.createQueryCollectorWithProfiler(collectors);
-                searchContext.getProfilers().getCurrentQueryProfiler().setCollector(profileCollector);
-                queryCollector = profileCollector;
+            boolean shouldRescore;
+            // if we are optimizing sort and there are no other collectors
+            if (sortAndFormatsForRewrittenNumericSort != null && collectors.size() == 0 && searchContext.getProfilers() == null) {
+                shouldRescore = searchWithCollectorManager(searchContext, searcher, query, leafSorter, timeoutSet);
             } else {
-                queryCollector = QueryCollectorContext.createQueryCollector(collectors);
-            }
-
-            try {
-                Weight weight = searcher.createWeight(searcher.rewrite(query), queryCollector.scoreMode(), 1f);
-                // We search the leaves in a different order when the numeric sort optimization is
-                // activated. Collectors expect leaves in order when searching but this is fine in this
-                // case since we only have a TopFieldCollector and we force the tiebreak on _doc.
-                List<LeafReaderContext> leaves = new ArrayList<>(searcher.getIndexReader().leaves());
-                leafSorter.accept(leaves);
-                for (LeafReaderContext ctx : leaves) {
-                    searcher.searchLeaf(ctx, weight, queryCollector);
-                }
-            } catch (EarlyTerminatingCollector.EarlyTerminationException e) {
-                queryResult.terminatedEarly(true);
-            } catch (TimeExceededException e) {
-                assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
-
-                if (searchContext.request().allowPartialSearchResults() == false) {
-                    // Can't rethrow TimeExceededException because not serializable
-                    throw new QueryPhaseExecutionException(searchContext, "Time exceeded");
-                }
-                queryResult.searchTimedOut(true);
-            } finally {
-                searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
-            }
-            if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER
-                && queryResult.terminatedEarly() == null) {
-                queryResult.terminatedEarly(false);
-            }
-
-            final QuerySearchResult result = searchContext.queryResult();
-            for (QueryCollectorContext ctx : collectors) {
-                ctx.postProcess(result);
+                shouldRescore = searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, timeoutSet);
             }
 
             // if we rewrote numeric long or date sort, restore fieldDocs based on the original sort
             if (sortAndFormatsForRewrittenNumericSort != null) {
                 searchContext.sort(sortAndFormatsForRewrittenNumericSort); // restore SortAndFormats
-                restoreTopFieldDocs(result, sortAndFormatsForRewrittenNumericSort);
+                restoreTopFieldDocs(queryResult, sortAndFormatsForRewrittenNumericSort);
             }
 
             ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
@@ -351,14 +314,123 @@ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExe
             }
             if (searchContext.getProfilers() != null) {
                 ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(searchContext.getProfilers());
-                result.profileResults(shardResults);
+                queryResult.profileResults(shardResults);
             }
-            return topDocsFactory.shouldRescore();
+            return shouldRescore;
         } catch (Exception e) {
             throw new QueryPhaseExecutionException(searchContext, "Failed to execute main query", e);
         }
     }
 
+    private static boolean searchWithCollector(SearchContext searchContext, ContextIndexSearcher searcher, Query query,
+            LinkedList<QueryCollectorContext> collectors, boolean hasFilterCollector, boolean timeoutSet) throws IOException {
+        // create the top docs collector last when the other collectors are known
+        final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, hasFilterCollector);
+        // add the top docs collector, the first collector context in the chain
+        collectors.addFirst(topDocsFactory);
+
+        final Collector queryCollector;
+        if (searchContext.getProfilers() != null) {
+            InternalProfileCollector profileCollector = QueryCollectorContext.createQueryCollectorWithProfiler(collectors);
+            searchContext.getProfilers().getCurrentQueryProfiler().setCollector(profileCollector);
+            queryCollector = profileCollector;
+        } else {
+            queryCollector = QueryCollectorContext.createQueryCollector(collectors);
+        }
+        QuerySearchResult queryResult = searchContext.queryResult();
+        try {
+            searcher.search(query, queryCollector);
+        } catch (EarlyTerminatingCollector.EarlyTerminationException e) {
+            queryResult.terminatedEarly(true);
+        } catch (TimeExceededException e) {
+            assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
+            if (searchContext.request().allowPartialSearchResults() == false) {
+                // Can't rethrow TimeExceededException because not serializable
+                throw new QueryPhaseExecutionException(searchContext, "Time exceeded");
+            }
+            queryResult.searchTimedOut(true);
+        } finally {
+            searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
+        }
+        if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) {
+            queryResult.terminatedEarly(false);
+        }
+        for (QueryCollectorContext ctx : collectors) {
+            ctx.postProcess(queryResult);
+        }
+        return topDocsFactory.shouldRescore();
+    }
+
+    // we use a CollectorManager during the sort optimization
+    // for the sort optimization, we have already checked that there are no other collectors, no filters,
+    // no search after, no scroll, no collapse, and no track scores
+    // this means we can use a TopFieldCollector directly
+    private static boolean searchWithCollectorManager(SearchContext searchContext, ContextIndexSearcher searcher, Query query,
+            CheckedConsumer<List<LeafReaderContext>, IOException> leafSorter, boolean timeoutSet) throws IOException {
+        final IndexReader reader = searchContext.searcher().getIndexReader();
+        final int numHits = Math.min(searchContext.from() + searchContext.size(), Math.max(1, reader.numDocs()));
+        final SortAndFormats sortAndFormats = searchContext.sort();
+
+        int totalHitsThreshold;
+        TotalHits totalHits;
+        if (searchContext.trackTotalHitsUpTo() == SearchContext.TRACK_TOTAL_HITS_DISABLED) {
+            totalHitsThreshold = 1;
+            totalHits = new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
+        } else {
+            int hitCount = shortcutTotalHitCount(reader, query);
+            if (hitCount == -1) {
+                totalHitsThreshold = searchContext.trackTotalHitsUpTo();
+                totalHits = null; // will be computed via the collector
+            } else {
+                totalHitsThreshold = 1;
+                totalHits = new TotalHits(hitCount, TotalHits.Relation.EQUAL_TO); // don't compute hit counts via the collector
+            }
+        }
+
+        CollectorManager<TopFieldCollector, Void> manager = new CollectorManager<>() {
+            @Override
+            public TopFieldCollector newCollector() throws IOException {
+                return TopFieldCollector.create(sortAndFormats.sort, numHits, null, totalHitsThreshold);
+            }
+            @Override
+            public Void reduce(Collection<TopFieldCollector> collectors) throws IOException {
+                TopFieldDocs[] topDocsArr = new TopFieldDocs[collectors.size()];
+                int i = 0;
+                for (TopFieldCollector collector : collectors) {
+                    topDocsArr[i++] = collector.topDocs();
+                }
+                // we have to pass setShardIndex=true, as Lucene can't merge ScoreDocs without a shardIndex set
+                TopFieldDocs mergedTopDocs = TopDocs.merge(sortAndFormats.sort, 0, numHits, topDocsArr, true);
+                // reset the shard index for all topDocs; ES will set the shard index later during the reduce stage
+                for (ScoreDoc scoreDoc : mergedTopDocs.scoreDocs) {
+                    scoreDoc.shardIndex = -1;
+                }
+                if (totalHits != null) { // we have already precalculated totalHits for the whole index
+                    mergedTopDocs = new TopFieldDocs(totalHits, mergedTopDocs.scoreDocs, mergedTopDocs.fields);
+                }
+                searchContext.queryResult().topDocs(new TopDocsAndMaxScore(mergedTopDocs, Float.NaN), sortAndFormats.formats);
+                return null;
+            }
+        };
+
+        List<LeafReaderContext> leaves = new ArrayList<>(searcher.getIndexReader().leaves());
+        leafSorter.accept(leaves);
+        try {
+            Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.TOP_SCORES, 1f);
+            searcher.search(leaves, weight, manager);
+        } catch (TimeExceededException e) {
+            assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
+            if (searchContext.request().allowPartialSearchResults() == false) {
+                // Can't rethrow TimeExceededException because not serializable
+                throw new QueryPhaseExecutionException(searchContext, "Time exceeded");
+            }
+            searchContext.queryResult().searchTimedOut(true);
+        } finally {
+            searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
+        }
+        return false; // no rescoring when sorting by field
+    }
+
     private static Query tryRewriteLongSort(SearchContext searchContext, IndexReader reader,
                                             Query query, boolean hasFilterCollector) throws IOException {
         if (searchContext.searchAfter() != null) return null;
@@ -399,7 +471,7 @@ private static Query tryRewriteLongSort(SearchContext searchContext, IndexReader
         if (missingValuesAccordingToSort == false) return null;
 
         int docCount = PointValues.getDocCount(reader, fieldName);
-        // is not worth to run optimization on small index
+        // it is not worth running the optimization on a small index
        if (docCount <= 512) return null;
 
         // check for multiple values
diff --git a/server/src/test/java/org/elasticsearch/search/SearchCancellationTests.java b/server/src/test/java/org/elasticsearch/search/SearchCancellationTests.java
index 4dbacc8ec87de..cdbe140b0f83c 100644
--- a/server/src/test/java/org/elasticsearch/search/SearchCancellationTests.java
+++ b/server/src/test/java/org/elasticsearch/search/SearchCancellationTests.java
@@ -22,14 +22,11 @@
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.RandomIndexWriter;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
-import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.TotalHitCountCollector;
-import org.apache.lucene.search.Weight;
 import org.apache.lucene.store.Directory;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.apache.lucene.util.TestUtil;
@@ -88,13 +85,9 @@ public void testCancellableCollector() throws IOException {
                 throw new TaskCancelledException("cancelled");
             }
         });
-        LeafReaderContext leafContext = reader.leaves().get(0);
-        final Weight weight = searcher.createWeight(new MatchAllDocsQuery(), ScoreMode.COMPLETE, 1f);
-        searcher.searchLeaf(searcher.getIndexReader().leaves().get(0), weight, collector);
-        assertThat(collector.getTotalHits(), equalTo(leafContext.reader().numDocs()));
+        searcher.search(new MatchAllDocsQuery(), collector);
+        assertThat(collector.getTotalHits(), equalTo(reader.numDocs()));
         cancelled.set(true);
-        expectThrows(TaskCancelledException.class,
-            () -> searcher.searchLeaf(searcher.getIndexReader().leaves().get(0), weight, collector));
         expectThrows(TaskCancelledException.class,
             () -> searcher.search(new MatchAllDocsQuery(), collector));
     }
diff --git a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java
index cfb1083a9cbae..b1da27ecbd60d 100644
--- a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java
+++ b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java
@@ -42,6 +42,7 @@
 import org.apache.lucene.search.BooleanClause.Occur;
 import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.CollectorManager;
 import org.apache.lucene.search.ConstantScoreQuery;
 import org.apache.lucene.search.DocValuesFieldExistsQuery;
 import org.apache.lucene.search.FieldComparator;
@@ -886,14 +887,9 @@ private static ContextIndexSearcher newEarlyTerminationContextSearcher(IndexRead
             IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy()) {
             @Override
-            protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
-                throw new AssertionError();
-            }
-
-            @Override
-            public void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
-                collector = new AssertingEarlyTerminationFilterCollector(collector, size);
-                super.searchLeaf(ctx, weight, collector);
+            public void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
+                final Collector in = new AssertingEarlyTerminationFilterCollector(collector, size);
+                super.search(leaves, weight, in);
             }
         };
     }
@@ -904,12 +900,7 @@ private static ContextIndexSearcher newOptimizedContextSearcher(IndexReader read
             IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy()) {
             @Override
-            public void search(Query query, Collector results) throws IOException {
-                throw new AssertionError();
-            }
-
-            @Override
-            public void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
+            public void search(List<LeafReaderContext> leaves, Weight weight, CollectorManager manager) throws IOException {
                 final Query query = weight.getQuery();
                 assertTrue(query instanceof BooleanQuery);
                 List<BooleanClause> clauses = ((BooleanQuery) query).clauses();
@@ -922,7 +913,7 @@ public void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector
                     );
                 }
                 if (queryType == 1) assertTrue(clauses.get(1).getQuery() instanceof DocValuesFieldExistsQuery);
-                super.searchLeaf(ctx, weight, collector);
+                super.search(leaves, weight, manager);
             }
         };
     }
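
For reviewers unfamiliar with the pattern: the new `search(List, Weight, CollectorManager)` entry point gives every segment its own collector and merges the per-segment results once at the end, which is what makes it safe for the sort optimization to visit leaves out of order. Below is a minimal standalone sketch of that flow against plain Lucene 8.x APIs. It is not part of this change: the class and method names are hypothetical, the `totalHitsThreshold` of 1000 is an arbitrary illustration, and the cancellation/timeout wiring that `searchLeaf` adds is deliberately omitted.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.BulkScorer;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.search.Weight;

final class PerSegmentSortSketch {

    // Collect each leaf with its own TopFieldCollector, then merge once at the
    // end. Because every leaf gets a fresh collector, the leaves may be visited
    // in any order (e.g. sorted by the min/max value of the sort field), which
    // is why QueryPhase can hand a reordered leaf list to
    // ContextIndexSearcher#search(List, Weight, CollectorManager).
    static TopFieldDocs searchLeavesInAnyOrder(IndexSearcher searcher, Query query,
                                               Sort sort, int numHits) throws IOException {
        Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.TOP_SCORES, 1f);
        List<LeafReaderContext> leaves = new ArrayList<>(searcher.getIndexReader().leaves());
        // a leafSorter would reorder `leaves` here; the _doc tiebreak added in
        // executeInternal keeps the merged result deterministic
        TopFieldDocs[] perLeaf = new TopFieldDocs[leaves.size()];
        for (int i = 0; i < leaves.size(); i++) {
            LeafReaderContext ctx = leaves.get(i);
            TopFieldCollector collector = TopFieldCollector.create(sort, numHits, 1000);
            try {
                BulkScorer scorer = weight.bulkScorer(ctx);
                if (scorer != null) {
                    // score only this segment's documents into this collector
                    scorer.score(collector.getLeafCollector(ctx), ctx.reader().getLiveDocs());
                }
            } catch (CollectionTerminatedException e) {
                // the collector has enough competitive hits for this leaf; move on
            }
            perLeaf[i] = collector.topDocs();
        }
        // setShardIndex=true lets merge tie-break between per-leaf results; the
        // change above then resets shardIndex to -1 so ES can assign it during
        // its own reduce stage
        return TopDocs.merge(sort, 0, numHits, perLeaf, true);
    }
}

The design point the sketch illustrates: a single shared Collector requires leaves in index order, whereas one collector per leaf plus a reduce step (Lucene's CollectorManager contract) removes that ordering constraint entirely, at the cost of a final merge.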