Concurrent Searching: Support Early Termination #2586

Closed
reta opened this issue Mar 24, 2022 · 3 comments · Fixed by #8306
Labels: enhancement, Indexing & Search, v2.9.0

Comments

@reta
Collaborator

reta commented Mar 24, 2022

Is your feature request related to a problem? Please describe.
Early termination and time-bounded search are exception-driven and may return partial results (whatever was collected up to the point of termination). When search goes over segments concurrently, it is difficult to replicate the sequential behaviour as-is (in this case the flow is interrupted and the reducers are not available, so there are no results).

Describe the solution you'd like
Find a way to propagate early termination and time-bounded search conditions in the case of concurrent segment traversal, without introducing additional synchronization.

Describe alternatives you've considered
The early termination and time-bounded search do not return partial results right now.

Additional context
Add any other context or screenshots about the feature request here.

@jed326
Collaborator

jed326 commented Jun 20, 2023

Hey @reta what's the status of this issue? I saw you have a draft PR #4906 where it looks like you took a crack at using the Lucene IndexSearcher timeout but ultimately put it off. If you're not actively working on this do you mind if I pick it up?

@reta
Collaborator Author

reta commented Jun 20, 2023

Hey @jed326, yeah sure, please go ahead. The issue with Lucene's IndexSearcher timeout was related to very closed APIs: it was difficult to extract that from IndexSearcher since all methods were either private or package private (and I didn't want to copy/paste).

anasalkouz moved this to Todo in Concurrent Search on Jun 22, 2023
jed326 moved this from Todo to In Progress in Concurrent Search on Jun 26, 2023

@jed326
Collaborator

jed326 commented Jun 27, 2023

Sharing some more detailed thoughts on this below


Problem Overview

Currently, if the query phase fails for whatever reason, no partial results will be returned for the failed shard in the concurrent search case. If an exception is thrown during the searcher.search() call in the query phase, then reduce will not get called and no TopDocs will be collected:

final ReduceableSearchResult result = searcher.search(query, collectorManager);

Going a level deeper, in Lucene the search thread blocks on the index searcher threads, and if any of those threads encounters an exception then we will not add the collectors for the rest of the slice segments. This means that even if we call reduce on all the collectors from the query phase, we will still not get the "full" partial results from the concurrent executions.
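
To make the failure mode above concrete, here is a minimal, Lucene-free sketch. Everything in it (SliceFailureSketch, SimulatedTimeoutException, gather, demo) is a hypothetical stand-in and not a Lucene or OpenSearch API; it only assumes that the caller blocks on per-slice futures in submission order and lets the first failure propagate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical, Lucene-free stand-in for concurrent slice search.
// None of these names are Lucene or OpenSearch APIs.
final class SliceFailureSketch {

    /** Simulates the exception a slice throws on timeout. */
    static final class SimulatedTimeoutException extends RuntimeException {}

    /**
     * Runs one task per slice and gathers per-slice hit counts in submission order.
     * The first failing slice aborts the gathering loop, so results of later slices
     * are dropped even if those slices completed successfully.
     */
    static void gather(List<Callable<Integer>> slices, List<Integer> gathered) {
        ExecutorService executor = Executors.newFixedThreadPool(slices.size());
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (Callable<Integer> slice : slices) {
                futures.add(executor.submit(slice));
            }
            for (Future<Integer> future : futures) {
                gathered.add(future.get()); // blocks; throws if the slice failed
            }
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            throw cause instanceof RuntimeException ? (RuntimeException) cause : new IllegalStateException(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    /** Three slices, the second one times out; returns what the caller managed to gather. */
    static List<Integer> demo() {
        List<Callable<Integer>> slices = new ArrayList<>();
        slices.add(() -> 10);
        slices.add(() -> { throw new SimulatedTimeoutException(); });
        slices.add(() -> 30);
        List<Integer> gathered = new ArrayList<>();
        try {
            gather(slices, gathered);
        } catch (SimulatedTimeoutException e) {
            // Per the discussion above, this is the point where reduce would be skipped.
        }
        return gathered;
    }
}
```

In this sketch the third slice finishes normally, yet its result never reaches the caller because the exception from the second slice ends the gathering loop first.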

Lucene Timeout Implementation

This was already explored as a part of #4906 and #4487 but I took another look for my own understanding.
Looking at the latest Lucene code, I see a few issues with going down this path

  • The biggest problem is that TimeLimitingBulkScorer.TimeExceededException is package private. Since we are overriding IndexSearcher.search with our own implementation (shown below), that makes it difficult for us to catch the correct exception there.
    @Override
    protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
        if (shouldReverseLeafReaderContexts()) {
            // reverse the segment search order if this flag is true.
            // Certain queries can benefit if we reverse the segment read order,
            // for example time series based queries if searched for desc sort order.
            for (int i = leaves.size() - 1; i >= 0; i--) {
                searchLeaf(leaves.get(i), weight, collector);
            }
        } else {
            for (int i = 0; i < leaves.size(); i++) {
                searchLeaf(leaves.get(i), weight, collector);
            }
        }
    }

    /**
     * Lower-level search API.
     *
     * {@link LeafCollector#collect(int)} is called for every matching document in
     * the provided <code>ctx</code>.
     */
    private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
        // Check if at all we need to call this leaf for collecting results.
        if (canMatch(ctx) == false) {
            return;
        }
        cancellable.checkCancelled();
        weight = wrapWeight(weight);
        // See please https://github.com/apache/lucene/pull/964
        collector.setWeight(weight);
        final LeafCollector leafCollector;
        try {
            leafCollector = collector.getLeafCollector(ctx);
        } catch (CollectionTerminatedException e) {
            // there is no doc of interest in this reader context
            // continue with the following leaf
            return;
        }
        Bits liveDocs = ctx.reader().getLiveDocs();
        BitSet liveDocsBitSet = getSparseBitSetOrNull(liveDocs);
        if (liveDocsBitSet == null) {
            BulkScorer bulkScorer = weight.bulkScorer(ctx);
            if (bulkScorer != null) {
                try {
                    bulkScorer.score(leafCollector, liveDocs);
                } catch (CollectionTerminatedException e) {
                    // collection was terminated prematurely
                    // continue with the following leaf
                }
            }
        } else {
            // if the role query result set is sparse then we should use the SparseFixedBitSet for advancing:
            Scorer scorer = weight.scorer(ctx);
            if (scorer != null) {
                try {
                    intersectScorerAndBitSet(
                        scorer,
                        liveDocsBitSet,
                        leafCollector,
                        this.cancellable.isEnabled() ? cancellable::checkCancelled : () -> {}
                    );
                } catch (CollectionTerminatedException e) {
                    // collection was terminated prematurely
                    // continue with the following leaf
                }
            }
        }
    }
  • The timeout is currently enforced only by the TimeLimitingBulkScorer, which means that it is only checked in the collection phase. On the other hand, the OpenSearch timeout implementation is done with a runnable that checks for timeouts:
    /**
     * Create runnable which throws {@link TimeExceededException} when the runnable is called after timeout + runnable creation time
     * exceeds currentTime
     * @param searchContext to extract timeout from and to get relative time from
     * @return the created runnable
     */
    static Runnable createQueryTimeoutChecker(final SearchContext searchContext) {
        /* for startTime, relative non-cached precise time must be used to prevent false positive timeouts.
         * Using cached time for startTime will fail and produce false positive timeouts when maxTime = (startTime + timeout) falls in
         * next time cache slot(s) AND time caching lifespan > passed timeout */
        final long startTime = searchContext.getRelativeTimeInMillis(false);
        final long maxTime = startTime + searchContext.timeout().millis();
        return () -> {
            /* As long as startTime is non cached time, using cached time here might only produce false negative timeouts within the time
             * cache life span which is acceptable */
            final long time = searchContext.getRelativeTimeInMillis();
            if (time > maxTime) {
                throw new TimeExceededException();
            }
        };
    }
  • This also does not solve for partial results in the early termination case, because a different exception is thrown for that and we need to handle both cases:
    try {
        searcher.search(query, queryCollector);
    } catch (EarlyTerminatingCollector.EarlyTerminationException e) {
        queryResult.terminatedEarly(true);
    } catch (TimeExceededException e) {
        assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
        if (searchContext.request().allowPartialSearchResults() == false) {
            // Can't rethrow TimeExceededException because not serializable
            throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
        }
        queryResult.searchTimedOut(true);
    }
    if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) {
        queryResult.terminatedEarly(false);
    }
    for (QueryCollectorContext ctx : collectors) {
        ctx.postProcess(queryResult);
    }

Exception Handling

For now I believe the best solution is to handle the exceptions in ContextIndexSearcher and keep track of whether the search has timed out. It looks like we are already doing something similar with CollectionTerminatedException:

} catch (CollectionTerminatedException e) {
    // collection was terminated prematurely
    // continue with the following leaf
}

The difficulty here is coming up with a way to propagate the timeout without using the exception. I’ll list out the places I’ve thought of below:

  • We could provide our own implementations of the concurrent search search() methods in IndexSearcher, however that is going to require copying a lot of code since a lot of the dependency methods are package private. It would however be very straightforward to simply catch and throw the timeout exception outside of the blocking for loop.
  • In ContextIndexSearcher.searchLeaf we have access to searchContext, so we should be able to catch the exception there and set the timeout parameter. This seems pretty straightforward to me so I'm planning on following up with a PR for this implementation.
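
The second option could look roughly like the sketch below. It is not the OpenSearch implementation: TimeoutFlagSketch, Context, Leaf, Result and SimulatedTimeoutException are hypothetical stand-ins, and using an AtomicBoolean for the shared timed-out state is an assumption made for the example. The idea it illustrates is that each slice catches the timeout inside its own searchLeaf analogue and records it on the shared context, so every per-slice collector survives and can still be reduced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; none of these names are Lucene or OpenSearch APIs.
final class TimeoutFlagSketch {

    /** Simulates the exception thrown when the timeout check fails. */
    static final class SimulatedTimeoutException extends RuntimeException {}

    /** Stand-in for the shared search context; it only tracks the timed-out state. */
    static final class Context {
        private final AtomicBoolean timedOut = new AtomicBoolean(false);
        void markTimedOut() { timedOut.set(true); }
        boolean isTimedOut() { return timedOut.get(); }
    }

    /** A slice that adds doc ids to its collector and may time out part-way through. */
    interface Leaf { void collectInto(List<Integer> collector); }

    /** Reduced hits plus the flag the query phase would report. */
    static final class Result {
        final List<Integer> hits;
        final boolean timedOut;
        Result(List<Integer> hits, boolean timedOut) { this.hits = hits; this.timedOut = timedOut; }
    }

    /** Analogue of searchLeaf: record the timeout on the context instead of rethrowing. */
    static void searchLeaf(Leaf leaf, List<Integer> collector, Context context) {
        try {
            leaf.collectInto(collector);
        } catch (SimulatedTimeoutException e) {
            context.markTimedOut(); // whatever was collected so far stays in the collector
        }
    }

    /** Runs every slice concurrently, then reduces all collectors, timed out or not. */
    static Result search(List<Leaf> slices) {
        Context context = new Context();
        ExecutorService executor = Executors.newFixedThreadPool(slices.size());
        try {
            List<Future<List<Integer>>> futures = new ArrayList<>();
            for (Leaf leaf : slices) {
                futures.add(executor.submit(() -> {
                    List<Integer> collector = new ArrayList<>(); // one collector per slice
                    searchLeaf(leaf, collector, context);
                    return collector;
                }));
            }
            List<Integer> reduced = new ArrayList<>();
            for (Future<List<Integer>> future : futures) {
                reduced.addAll(future.get());
            }
            return new Result(reduced, context.isTimedOut());
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    /** Three slices; the second collects one hit and then times out. */
    static Result demo() {
        Leaf first = c -> { c.add(1); c.add(2); };
        Leaf slow = c -> { c.add(3); throw new SimulatedTimeoutException(); };
        Leaf last = c -> { c.add(4); c.add(5); };
        return search(List.of(first, slow, last));
    }
}
```

Because no exception escapes a slice, the gathering loop completes and the caller sees both the partial hits and the timed-out flag. The early termination case from the earlier bullet might be handled the same way with a second flag, though that is not shown here.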

Projects
Status: Done