Skip to content

Commit

Permalink
Use timeout support of the IndexSearcher instead of custom implementa…
Browse files Browse the repository at this point in the history
…tion

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
  • Loading branch information
reta committed Oct 25, 2022
1 parent 1a40bd5 commit 18da1fc
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 82 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Copy `build.sh` over from opensearch-build ([#4887](https://github.com/opensearch-project/OpenSearch/pull/4887))
- Add project health badges to the README.md ([#4843](https://github.com/opensearch-project/OpenSearch/pull/4843))
- Added changes for graceful node decommission ([#4586](https://github.com/opensearch-project/OpenSearch/pull/4586))
- Use timeout support of the IndexSearcher instead of custom implementation([#4906](https://github.com/opensearch-project/OpenSearch/pull/4906))

>>>>>>> 582840ac891 (Use timeout support of the IndexSearcher instead of custom implementation)
### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
- Bumps `reactor-netty-http` from 1.0.18 to 1.0.23
Expand Down
27 changes: 4 additions & 23 deletions server/src/main/java/org/opensearch/search/query/QueryPhase.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.QueryTimeoutImpl;
import org.opensearch.lucene.queries.MinDocQuery;
import org.opensearch.lucene.queries.SearchAfterSortedDocQuery;
import org.apache.lucene.search.BooleanClause;
Expand Down Expand Up @@ -88,6 +89,7 @@
* @opensearch.internal
*/
public class QueryPhase {
private static final Runnable NOOP_QUERY_CANCELLATION = () -> {}; /* noop, relying on IndexSearch::setTimeout */
private static final Logger LOGGER = LogManager.getLogger(QueryPhase.class);
// TODO: remove this property
public static final boolean SYS_PROP_REWRITE_SORT = Booleans.parseBoolean(System.getProperty("opensearch.search.rewrite_sort", "true"));
Expand Down Expand Up @@ -257,7 +259,8 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q

final Runnable timeoutRunnable;
if (timeoutSet) {
timeoutRunnable = searcher.addQueryCancellation(createQueryTimeoutChecker(searchContext));
searcher.setTimeout(new QueryTimeoutImpl(searchContext.timeout().millis()));
timeoutRunnable = searcher.addQueryCancellation(NOOP_QUERY_CANCELLATION);
} else {
timeoutRunnable = null;
}
Expand Down Expand Up @@ -301,28 +304,6 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q
}
}

/**
* Create runnable which throws {@link TimeExceededException} when the runnable is called after timeout + runnable creation time
* exceeds currentTime
* @param searchContext to extract timeout from and to get relative time from
* @return the created runnable
*/
static Runnable createQueryTimeoutChecker(final SearchContext searchContext) {
/* for startTime, relative non-cached precise time must be used to prevent false positive timeouts.
* Using cached time for startTime will fail and produce false positive timeouts when maxTime = (startTime + timeout) falls in
* next time cache slot(s) AND time caching lifespan > passed timeout */
final long startTime = searchContext.getRelativeTimeInMillis(false);
final long maxTime = startTime + searchContext.timeout().millis();
return () -> {
/* As long as startTime is non cached time, using cached time here might only produce false negative timeouts within the time
* cache life span which is acceptable */
final long time = searchContext.getRelativeTimeInMillis();
if (time > maxTime) {
throw new TimeExceededException();
}
};
}

private static boolean searchWithCollector(
SearchContext searchContext,
ContextIndexSearcher searcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@
import org.apache.lucene.util.FixedBitSet;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.MapperService;
Expand All @@ -106,7 +105,6 @@
import org.opensearch.search.sort.SortAndFormats;
import org.opensearch.tasks.TaskCancelledException;
import org.opensearch.test.TestSearchContext;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -119,14 +117,9 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.opensearch.search.query.TopDocsCollectorContext.hasInfMaxScore;

public class QueryPhaseTests extends IndexShardTestCase {
Expand Down Expand Up @@ -1086,58 +1079,6 @@ public void testCancellationDuringPreprocess() throws IOException {
}
}

public void testQueryTimeoutChecker() throws Exception {
long timeCacheLifespan = ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.get(Settings.EMPTY).millis();
long timeTolerance = timeCacheLifespan / 20;

// should throw time exceed exception for sure after timeCacheLifespan*2+timeTolerance (next's next cached time is available)
assertThrows(
QueryPhase.TimeExceededException.class,
() -> createTimeoutCheckerThenWaitThenRun(timeCacheLifespan, timeCacheLifespan * 2 + timeTolerance, true, false)
);

// should not throw time exceed exception after timeCacheLifespan+timeTolerance because new cached time - init time < timeout
createTimeoutCheckerThenWaitThenRun(timeCacheLifespan, timeCacheLifespan + timeTolerance, true, false);

// should not throw time exceed exception after timeout < timeCacheLifespan when cached time didn't change
createTimeoutCheckerThenWaitThenRun(timeCacheLifespan / 2, timeCacheLifespan / 2 + timeTolerance, false, true);
createTimeoutCheckerThenWaitThenRun(timeCacheLifespan / 4, timeCacheLifespan / 2 + timeTolerance, false, true);
}

private void createTimeoutCheckerThenWaitThenRun(
long timeout,
long sleepAfterCreation,
boolean checkCachedTimeChanged,
boolean checkCachedTimeHasNotChanged
) throws Exception {
long timeCacheLifespan = ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.get(Settings.EMPTY).millis();
long timeTolerance = timeCacheLifespan / 20;
long currentTimeDiffWithCachedTime = TimeValue.nsecToMSec(System.nanoTime()) - threadPool.relativeTimeInMillis();
// need to run this test approximately at the start of cached time window
long timeToAlignTimeWithCachedTimeOffset = timeCacheLifespan - currentTimeDiffWithCachedTime + timeTolerance;
Thread.sleep(timeToAlignTimeWithCachedTimeOffset);

long initialRelativeCachedTime = threadPool.relativeTimeInMillis();
SearchContext mockedSearchContext = mock(SearchContext.class);
when(mockedSearchContext.timeout()).thenReturn(TimeValue.timeValueMillis(timeout));
when(mockedSearchContext.getRelativeTimeInMillis()).thenAnswer(invocation -> threadPool.relativeTimeInMillis());
when(mockedSearchContext.getRelativeTimeInMillis(eq(false))).thenCallRealMethod();
Runnable queryTimeoutChecker = QueryPhase.createQueryTimeoutChecker(mockedSearchContext);
// make sure next time slot become available
Thread.sleep(sleepAfterCreation);
if (checkCachedTimeChanged) {
assertNotEquals(initialRelativeCachedTime, threadPool.relativeTimeInMillis());
}
if (checkCachedTimeHasNotChanged) {
assertEquals(initialRelativeCachedTime, threadPool.relativeTimeInMillis());
}
queryTimeoutChecker.run();
verify(mockedSearchContext, times(1)).timeout();
verify(mockedSearchContext, times(1)).getRelativeTimeInMillis(eq(false));
verify(mockedSearchContext, atLeastOnce()).getRelativeTimeInMillis();
verifyNoMoreInteractions(mockedSearchContext);
}

private static class TestSearchContextWithRewriteAndCancellation extends TestSearchContext {

private TestSearchContextWithRewriteAndCancellation(
Expand Down

0 comments on commit 18da1fc

Please sign in to comment.