Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use timeout support of the IndexSearcher instead of custom implementation #4906

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Copy `build.sh` over from opensearch-build ([#4887](https://github.com/opensearch-project/OpenSearch/pull/4887))
- Add project health badges to the README.md ([#4843](https://github.com/opensearch-project/OpenSearch/pull/4843))
- Added changes for graceful node decommission ([#4586](https://github.com/opensearch-project/OpenSearch/pull/4586))
- Use timeout support of the IndexSearcher instead of custom implementation([#4906](https://github.com/opensearch-project/OpenSearch/pull/4906))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably a "skip-changelog" case now since this is a refactoring and not a functional change, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @andrross , have a loooong discussion with Apache Lucene team, will postpone this future a bit (for 2 reasons 1) there are a number of improvements in work 2) the way we use IndexSearcher does not allow us to use this feature as-is, without copying code


### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
- Bumps `reactor-netty-http` from 1.0.18 to 1.0.23
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.QueryTimeoutImpl;
import org.opensearch.lucene.queries.MinDocQuery;
import org.opensearch.lucene.queries.SearchAfterSortedDocQuery;
import org.apache.lucene.search.BooleanClause;
Expand Down Expand Up @@ -88,6 +89,7 @@
* @opensearch.internal
*/
public class QueryPhase {
private static final Runnable NOOP_QUERY_CANCELLATION = () -> {}; /* noop, relying on IndexSearch::setTimeout */
private static final Logger LOGGER = LogManager.getLogger(QueryPhase.class);
// TODO: remove this property
public static final boolean SYS_PROP_REWRITE_SORT = Booleans.parseBoolean(System.getProperty("opensearch.search.rewrite_sort", "true"));
Expand Down Expand Up @@ -257,7 +259,8 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q

final Runnable timeoutRunnable;
if (timeoutSet) {
timeoutRunnable = searcher.addQueryCancellation(createQueryTimeoutChecker(searchContext));
searcher.setTimeout(new QueryTimeoutImpl(searchContext.timeout().millis()));
timeoutRunnable = searcher.addQueryCancellation(NOOP_QUERY_CANCELLATION);
} else {
timeoutRunnable = null;
}
Expand Down Expand Up @@ -301,28 +304,6 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q
}
}

/**
* Create runnable which throws {@link TimeExceededException} when the runnable is called after timeout + runnable creation time
* exceeds currentTime
* @param searchContext to extract timeout from and to get relative time from
* @return the created runnable
*/
static Runnable createQueryTimeoutChecker(final SearchContext searchContext) {
/* for startTime, relative non-cached precise time must be used to prevent false positive timeouts.
* Using cached time for startTime will fail and produce false positive timeouts when maxTime = (startTime + timeout) falls in
* next time cache slot(s) AND time caching lifespan > passed timeout */
final long startTime = searchContext.getRelativeTimeInMillis(false);
final long maxTime = startTime + searchContext.timeout().millis();
return () -> {
/* As long as startTime is non cached time, using cached time here might only produce false negative timeouts within the time
* cache life span which is acceptable */
final long time = searchContext.getRelativeTimeInMillis();
if (time > maxTime) {
throw new TimeExceededException();
}
};
}

private static boolean searchWithCollector(
SearchContext searchContext,
ContextIndexSearcher searcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@
import org.apache.lucene.util.FixedBitSet;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.MapperService;
Expand All @@ -106,7 +105,6 @@
import org.opensearch.search.sort.SortAndFormats;
import org.opensearch.tasks.TaskCancelledException;
import org.opensearch.test.TestSearchContext;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -119,14 +117,9 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.opensearch.search.query.TopDocsCollectorContext.hasInfMaxScore;

public class QueryPhaseTests extends IndexShardTestCase {
Expand Down Expand Up @@ -1086,58 +1079,6 @@ public void testCancellationDuringPreprocess() throws IOException {
}
}

public void testQueryTimeoutChecker() throws Exception {
long timeCacheLifespan = ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.get(Settings.EMPTY).millis();
long timeTolerance = timeCacheLifespan / 20;

// should throw time exceed exception for sure after timeCacheLifespan*2+timeTolerance (next's next cached time is available)
assertThrows(
QueryPhase.TimeExceededException.class,
() -> createTimeoutCheckerThenWaitThenRun(timeCacheLifespan, timeCacheLifespan * 2 + timeTolerance, true, false)
);

// should not throw time exceed exception after timeCacheLifespan+timeTolerance because new cached time - init time < timeout
createTimeoutCheckerThenWaitThenRun(timeCacheLifespan, timeCacheLifespan + timeTolerance, true, false);

// should not throw time exceed exception after timeout < timeCacheLifespan when cached time didn't change
createTimeoutCheckerThenWaitThenRun(timeCacheLifespan / 2, timeCacheLifespan / 2 + timeTolerance, false, true);
createTimeoutCheckerThenWaitThenRun(timeCacheLifespan / 4, timeCacheLifespan / 2 + timeTolerance, false, true);
}

private void createTimeoutCheckerThenWaitThenRun(
long timeout,
long sleepAfterCreation,
boolean checkCachedTimeChanged,
boolean checkCachedTimeHasNotChanged
) throws Exception {
long timeCacheLifespan = ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.get(Settings.EMPTY).millis();
long timeTolerance = timeCacheLifespan / 20;
long currentTimeDiffWithCachedTime = TimeValue.nsecToMSec(System.nanoTime()) - threadPool.relativeTimeInMillis();
// need to run this test approximately at the start of cached time window
long timeToAlignTimeWithCachedTimeOffset = timeCacheLifespan - currentTimeDiffWithCachedTime + timeTolerance;
Thread.sleep(timeToAlignTimeWithCachedTimeOffset);

long initialRelativeCachedTime = threadPool.relativeTimeInMillis();
SearchContext mockedSearchContext = mock(SearchContext.class);
when(mockedSearchContext.timeout()).thenReturn(TimeValue.timeValueMillis(timeout));
when(mockedSearchContext.getRelativeTimeInMillis()).thenAnswer(invocation -> threadPool.relativeTimeInMillis());
when(mockedSearchContext.getRelativeTimeInMillis(eq(false))).thenCallRealMethod();
Runnable queryTimeoutChecker = QueryPhase.createQueryTimeoutChecker(mockedSearchContext);
// make sure next time slot become available
Thread.sleep(sleepAfterCreation);
if (checkCachedTimeChanged) {
assertNotEquals(initialRelativeCachedTime, threadPool.relativeTimeInMillis());
}
if (checkCachedTimeHasNotChanged) {
assertEquals(initialRelativeCachedTime, threadPool.relativeTimeInMillis());
}
queryTimeoutChecker.run();
verify(mockedSearchContext, times(1)).timeout();
verify(mockedSearchContext, times(1)).getRelativeTimeInMillis(eq(false));
verify(mockedSearchContext, atLeastOnce()).getRelativeTimeInMillis();
verifyNoMoreInteractions(mockedSearchContext);
}

private static class TestSearchContextWithRewriteAndCancellation extends TestSearchContext {

private TestSearchContextWithRewriteAndCancellation(
Expand Down