Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use collector manager for search when necessary #45829

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.ConjunctionDISI;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Explanation;
Expand All @@ -54,6 +55,7 @@
import org.elasticsearch.search.profile.query.QueryTimingType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -138,6 +140,17 @@ private void checkCancelled() {
}
}

/**
 * Searches the given leaves with a per-leaf {@link Collector} obtained from the
 * {@link CollectorManager}, then reduces all per-leaf collectors into the final result.
 * Leaves are visited sequentially in the order of the provided list, so callers may
 * pre-sort {@code leaves} (e.g. for the numeric sort optimization).
 *
 * NOTE(review): the {@code CollectorManager} parameter is a raw type; parameterizing it
 * would require changing overriders elsewhere, so it is left as-is here.
 *
 * @param leaves  the leaf reader contexts to search, in the order they should be visited
 * @param weight  the weight to execute against each leaf
 * @param manager supplies one new collector per leaf and reduces them at the end
 * @throws IOException if a lower-level I/O error occurs
 */
public void search(List<LeafReaderContext> leaves, Weight weight, CollectorManager manager) throws IOException {
    final List<Collector> collectors = new ArrayList<>(leaves.size());
    for (LeafReaderContext ctx : leaves) {
        final Collector collector = manager.newCollector();
        // TODO: propagate setMinCompetitiveScore between the per-leaf Collectors
        searchLeaf(ctx, weight, collector);
        collectors.add(collector);
    }
    // Reduce once every leaf has been collected; the reduce result is discarded here
    // because the manager is expected to publish its result as a side effect.
    manager.reduce(collectors);
}

@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
for (LeafReaderContext ctx : leaves) { // search each subreader
Expand All @@ -151,7 +164,7 @@ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector c
* {@link LeafCollector#collect(int)} is called for every matching document in
* the provided <code>ctx</code>.
*/
public void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
checkCancelled();
weight = wrapWeight(weight);
final LeafCollector leafCollector;
Expand Down Expand Up @@ -228,6 +241,7 @@ public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {
}
}


private static BitSet getSparseBitSetOrNull(Bits liveDocs) {
if (liveDocs instanceof SparseFixedBitSet) {
return (BitSet) liveDocs;
Expand Down
174 changes: 123 additions & 51 deletions server/src/main/java/org/elasticsearch/search/query/QueryPhase.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,18 @@
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.Weight;
Expand Down Expand Up @@ -71,6 +74,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
Expand Down Expand Up @@ -236,7 +240,7 @@ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExe
newFormats[0] = DocValueFormat.RAW;
// Add a tiebreak on _doc in order to be able to search
// the leaves in any order. This is needed since we reorder
// the leaves based on the minimum value in each segment.
// the leaves based on the minimum/maxim value in each segment.
newSortFields[newSortFields.length-1] = SortField.FIELD_DOC;
newFormats[newSortFields.length-1] = DocValueFormat.RAW;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need to tie-break by global doc ID, given that we reorder the leaves in searchWithCollectorManager?

System.arraycopy(oldSortFields, 0, newSortFields, 1, oldSortFields.length);
Expand Down Expand Up @@ -286,61 +290,20 @@ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExe
} else {
checkCancelled = null;
}

searcher.setCheckCancelled(checkCancelled);

final boolean doProfile = searchContext.getProfilers() != null;
// create the top docs collector last when the other collectors are known
final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, hasFilterCollector);
// add the top docs collector, the first collector context in the chain
collectors.addFirst(topDocsFactory);

final Collector queryCollector;
if (doProfile) {
InternalProfileCollector profileCollector = QueryCollectorContext.createQueryCollectorWithProfiler(collectors);
searchContext.getProfilers().getCurrentQueryProfiler().setCollector(profileCollector);
queryCollector = profileCollector;
boolean shouldRescore;
// if we are optimizing sort and there are no other collectors
if (sortAndFormatsForRewrittenNumericSort != null && collectors.size() == 0 && searchContext.getProfilers() == null) {
shouldRescore = searchWithCollectorManager(searchContext, searcher, query, leafSorter, timeoutSet);
} else {
queryCollector = QueryCollectorContext.createQueryCollector(collectors);
}

try {
Weight weight = searcher.createWeight(searcher.rewrite(query), queryCollector.scoreMode(), 1f);
// We search the leaves in a different order when the numeric sort optimization is
// activated. Collectors expect leaves in order when searching but this is fine in this
// case since we only have a TopFieldCollector and we force the tiebreak on _doc.
List<LeafReaderContext> leaves = new ArrayList<>(searcher.getIndexReader().leaves());
leafSorter.accept(leaves);
for (LeafReaderContext ctx : leaves) {
searcher.searchLeaf(ctx, weight, queryCollector);
}
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
queryResult.terminatedEarly(true);
} catch (TimeExceededException e) {
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";

if (searchContext.request().allowPartialSearchResults() == false) {
// Can't rethrow TimeExceededException because not serializable
throw new QueryPhaseExecutionException(searchContext, "Time exceeded");
}
queryResult.searchTimedOut(true);
} finally {
searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
}
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER
&& queryResult.terminatedEarly() == null) {
queryResult.terminatedEarly(false);
}

final QuerySearchResult result = searchContext.queryResult();
for (QueryCollectorContext ctx : collectors) {
ctx.postProcess(result);
shouldRescore = searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, timeoutSet);
}

// if we rewrote numeric long or date sort, restore fieldDocs based on the original sort
if (sortAndFormatsForRewrittenNumericSort != null) {
searchContext.sort(sortAndFormatsForRewrittenNumericSort); // restore SortAndFormats
restoreTopFieldDocs(result, sortAndFormatsForRewrittenNumericSort);
restoreTopFieldDocs(queryResult, sortAndFormatsForRewrittenNumericSort);
}

ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
Expand All @@ -351,14 +314,123 @@ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExe
}
if (searchContext.getProfilers() != null) {
ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(searchContext.getProfilers());
result.profileResults(shardResults);
queryResult.profileResults(shardResults);
}
return topDocsFactory.shouldRescore();
return shouldRescore;
} catch (Exception e) {
throw new QueryPhaseExecutionException(searchContext, "Failed to execute main query", e);
}
}

/**
 * Executes the query with a single {@link Collector} chain built from the given collector
 * contexts, recording early termination and timeouts on the shard-level query result.
 *
 * @return whether the top-docs collector requests a rescore phase
 * @throws IOException if query execution fails with an I/O error
 */
private static boolean searchWithCollector(SearchContext searchContext, ContextIndexSearcher searcher, Query query,
        LinkedList<QueryCollectorContext> collectors, boolean hasFilterCollector, boolean timeoutSet) throws IOException {
    // The top docs collector is created last, once all other collector contexts are known,
    // and always sits at the head of the collector chain.
    final TopDocsCollectorContext topDocsContext = createTopDocsCollectorContext(searchContext, hasFilterCollector);
    collectors.addFirst(topDocsContext);

    final Collector collector;
    if (searchContext.getProfilers() == null) {
        collector = QueryCollectorContext.createQueryCollector(collectors);
    } else {
        // Profiling enabled: wrap the chain and register it with the current query profiler.
        InternalProfileCollector profiled = QueryCollectorContext.createQueryCollectorWithProfiler(collectors);
        searchContext.getProfilers().getCurrentQueryProfiler().setCollector(profiled);
        collector = profiled;
    }

    final QuerySearchResult result = searchContext.queryResult();
    try {
        searcher.search(query, collector);
    } catch (EarlyTerminatingCollector.EarlyTerminationException e) {
        // A collector in the chain deliberately aborted collection; report early termination.
        result.terminatedEarly(true);
    } catch (TimeExceededException e) {
        assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
        if (searchContext.request().allowPartialSearchResults() == false) {
            // TimeExceededException is not serializable, so rethrow as QueryPhaseExecutionException.
            throw new QueryPhaseExecutionException(searchContext, "Time exceeded");
        }
        result.searchTimedOut(true);
    } finally {
        searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
    }

    // When terminate_after is set and nothing terminated early, record that explicitly.
    if (result.terminatedEarly() == null && searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
        result.terminatedEarly(false);
    }
    for (QueryCollectorContext ctx : collectors) {
        ctx.postProcess(result);
    }
    return topDocsContext.shouldRescore();
}

// Sort-optimization path: executes the query through a CollectorManager instead of a single
// Collector so the leaves can be visited in a custom order (see leafSorter). The caller has
// already verified that there are no other collectors, no filters, no search_after, no scroll,
// no collapse and no track_scores, so one plain TopFieldCollector per leaf is sufficient.
// Returns false because there is never a rescore phase when sorting by field.
private static boolean searchWithCollectorManager(SearchContext searchContext, ContextIndexSearcher searcher, Query query,
        CheckedConsumer<List<LeafReaderContext>, IOException> leafSorter, boolean timeoutSet) throws IOException {
    final IndexReader reader = searchContext.searcher().getIndexReader();
    // Math.max(1, numDocs) guards the empty-index case — presumably TopFieldCollector needs
    // a positive numHits; TODO confirm.
    final int numHits = Math.min(searchContext.from() + searchContext.size(), Math.max(1, reader.numDocs()));
    final SortAndFormats sortAndFormats = searchContext.sort();

    int totalHitsThreshold;
    TotalHits totalHits;
    if (searchContext.trackTotalHitsUpTo() == SearchContext.TRACK_TOTAL_HITS_DISABLED) {
        // Hit counting disabled: collect as few hits as possible and report a lower bound of 0.
        totalHitsThreshold = 1;
        totalHits = new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
    } else {
        int hitCount = shortcutTotalHitCount(reader, query);
        if (hitCount == -1) {
            // No shortcut available: the collectors must count hits up to the requested threshold.
            totalHitsThreshold = searchContext.trackTotalHitsUpTo();
            totalHits = null; // will be computed via the collector
        } else {
            totalHitsThreshold = 1;
            totalHits = new TotalHits(hitCount, TotalHits.Relation.EQUAL_TO); // don't compute hit counts via the collector
        }
    }

    // One TopFieldCollector per leaf; reduce() merges the per-leaf topDocs and publishes the
    // merged result on the query result as a side effect (the Void return is unused).
    CollectorManager<TopFieldCollector, Void> manager = new CollectorManager<>() {
        @Override
        public TopFieldCollector newCollector() throws IOException {
            return TopFieldCollector.create(sortAndFormats.sort, numHits, null, totalHitsThreshold);
        }
        @Override
        public Void reduce(Collection<TopFieldCollector> collectors) throws IOException {
            TopFieldDocs[] topDocsArr = new TopFieldDocs[collectors.size()];
            int i = 0;
            for (TopFieldCollector collector : collectors) {
                topDocsArr[i++] = collector.topDocs();
            }
            // setShardIndex must be true here, as Lucene can't merge ScoreDocs without a shardIndex set
            TopFieldDocs mergedTopDocs = TopDocs.merge(sortAndFormats.sort, 0, numHits, topDocsArr, true);
            // reset shard index for all topDocs; ES will set shard index later during reduce stage
            for (ScoreDoc scoreDoc : mergedTopDocs.scoreDocs) {
                scoreDoc.shardIndex = -1;
            }
            if (totalHits != null) { // we have already precalculated totalHits for the whole index
                mergedTopDocs = new TopFieldDocs(totalHits, mergedTopDocs.scoreDocs, mergedTopDocs.fields);
            }
            searchContext.queryResult().topDocs(new TopDocsAndMaxScore(mergedTopDocs, Float.NaN), sortAndFormats.formats);
            return null;
        }
    };

    // Reorder the leaves per the sort optimization before searching them.
    List<LeafReaderContext> leaves = new ArrayList<>(searcher.getIndexReader().leaves());
    leafSorter.accept(leaves);
    try {
        Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.TOP_SCORES, 1f);
        searcher.search(leaves, weight, manager);
    } catch (TimeExceededException e) {
        assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
        if (searchContext.request().allowPartialSearchResults() == false) {
            // Can't rethrow TimeExceededException because not serializable
            throw new QueryPhaseExecutionException(searchContext, "Time exceeded");
        }
        searchContext.queryResult().searchTimedOut(true);
    } finally {
        searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
    }
    return false; // no rescoring when sorting by field
}

private static Query tryRewriteLongSort(SearchContext searchContext, IndexReader reader,
Query query, boolean hasFilterCollector) throws IOException {
if (searchContext.searchAfter() != null) return null;
Expand Down Expand Up @@ -399,7 +471,7 @@ private static Query tryRewriteLongSort(SearchContext searchContext, IndexReader
if (missingValuesAccordingToSort == false) return null;

int docCount = PointValues.getDocCount(reader, fieldName);
// is not worth to run optimization on small index
// is not worth to run optimization on small index
if (docCount <= 512) return null;

// check for multiple values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,11 @@
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.Directory;
import org.elasticsearch.core.internal.io.IOUtils;
import org.apache.lucene.util.TestUtil;
Expand Down Expand Up @@ -88,13 +85,9 @@ public void testCancellableCollector() throws IOException {
throw new TaskCancelledException("cancelled");
}
});
LeafReaderContext leafContext = reader.leaves().get(0);
final Weight weight = searcher.createWeight(new MatchAllDocsQuery(), ScoreMode.COMPLETE, 1f);
searcher.searchLeaf(searcher.getIndexReader().leaves().get(0), weight, collector);
assertThat(collector.getTotalHits(), equalTo(leafContext.reader().numDocs()));
searcher.search(new MatchAllDocsQuery(), collector);
assertThat(collector.getTotalHits(), equalTo(reader.numDocs()));
cancelled.set(true);
expectThrows(TaskCancelledException.class,
() -> searcher.searchLeaf(searcher.getIndexReader().leaves().get(0), weight, collector));
expectThrows(TaskCancelledException.class,
() -> searcher.search(new MatchAllDocsQuery(), collector));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.FieldComparator;
Expand Down Expand Up @@ -886,14 +887,9 @@ private static ContextIndexSearcher newEarlyTerminationContextSearcher(IndexRead
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy()) {

@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
throw new AssertionError();
}

@Override
public void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
collector = new AssertingEarlyTerminationFilterCollector(collector, size);
super.searchLeaf(ctx, weight, collector);
public void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
    // Wrap the incoming collector so the test can verify early-termination behavior.
    super.search(leaves, weight, new AssertingEarlyTerminationFilterCollector(collector, size));
}
};
}
Expand All @@ -904,12 +900,7 @@ private static ContextIndexSearcher newOptimizedContextSearcher(IndexReader read
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy()) {

@Override
public void search(Query query, Collector results) throws IOException {
throw new AssertionError();
}

@Override
public void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
public void search(List<LeafReaderContext> leaves, Weight weight, CollectorManager manager) throws IOException {
final Query query = weight.getQuery();
assertTrue(query instanceof BooleanQuery);
List<BooleanClause> clauses = ((BooleanQuery) query).clauses();
Expand All @@ -922,7 +913,7 @@ public void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector
);
}
if (queryType == 1) assertTrue(clauses.get(1).getQuery() instanceof DocValuesFieldExistsQuery);
super.searchLeaf(ctx, weight, collector);
super.search(leaves, weight, manager);
}
};
}
Expand Down