Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure either success or failure path for SearchOperationListener is called #37467

Merged
merged 9 commits into from
Jan 23, 2019
171 changes: 95 additions & 76 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchTask;
Expand Down Expand Up @@ -329,7 +328,7 @@ private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchTask t
} catch (Exception e) {
logger.trace("Dfs phase failed", e);
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
throw e;
} finally {
cleanContext(context);
}
Expand Down Expand Up @@ -380,29 +379,24 @@ protected void doRun() {
});
}

private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException {
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws Exception {
final SearchContext context = createAndPutContext(request);
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
context.incRef();
boolean queryPhaseSuccess = false;
try {
context.setTask(task);
operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
contextProcessing(context);

loadOrExecuteQueryPhase(request, context);

if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
freeContext(context.id());
} else {
contextProcessedSuccessfully(context);
final long afterQueryTime;
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
contextProcessing(context);
loadOrExecuteQueryPhase(request, context);
if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
freeContext(context.id());
} else {
contextProcessedSuccessfully(context);
}
afterQueryTime = executor.success();
}
final long afterQueryTime = System.nanoTime();
queryPhaseSuccess = true;
operationListener.onQueryPhase(context, afterQueryTime - time);
if (request.numberOfShards() == 1) {
return executeFetchPhase(context, operationListener, afterQueryTime);
return executeFetchPhase(context, afterQueryTime);
}
return context.queryResult();
} catch (Exception e) {
Expand All @@ -411,56 +405,44 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTa
e = (e.getCause() == null || e.getCause() instanceof Exception) ?
(Exception) e.getCause() : new ElasticsearchException(e.getCause());
}
if (!queryPhaseSuccess) {
operationListener.onFailedQueryPhase(context);
}
logger.trace("Query phase failed", e);
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
throw e;
} finally {
cleanContext(context);
}
}

private QueryFetchSearchResult executeFetchPhase(SearchContext context, SearchOperationListener operationListener,
long afterQueryTime) {
operationListener.onPreFetchPhase(context);
try {
private QueryFetchSearchResult executeFetchPhase(SearchContext context, long afterQueryTime) {
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)){
shortcutDocIdsToLoad(context);
fetchPhase.execute(context);
if (fetchPhaseShouldFreeContext(context)) {
freeContext(context.id());
} else {
contextProcessedSuccessfully(context);
}
} catch (Exception e) {
operationListener.onFailedFetchPhase(context);
throw ExceptionsHelper.convertToRuntime(e);
executor.success();
}
operationListener.onFetchPhase(context, System.nanoTime() - afterQueryTime);
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
}

public void executeQueryPhase(InternalScrollSearchRequest request, SearchTask task, ActionListener<ScrollQuerySearchResult> listener) {
runAsync(request.id(), () -> {
final SearchContext context = findContext(request.id(), request);
SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
context.incRef();
try {
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
context.setTask(task);
operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
contextProcessing(context);
processScroll(request, context);
queryPhase.execute(context);
contextProcessedSuccessfully(context);
operationListener.onQueryPhase(context, System.nanoTime() - time);
executor.success();
return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget());
} catch (Exception e) {
operationListener.onFailedQueryPhase(context);
logger.trace("Query phase failed", e);
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
throw e;
} finally {
cleanContext(context);
}
Expand All @@ -471,29 +453,23 @@ public void executeQueryPhase(QuerySearchRequest request, SearchTask task, Actio
runAsync(request.id(), () -> {
final SearchContext context = findContext(request.id(), request);
context.setTask(task);
IndexShard indexShard = context.indexShard();
SearchOperationListener operationListener = indexShard.getSearchOperationListener();
context.incRef();
try {
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
contextProcessing(context);
context.searcher().setAggregatedDfs(request.dfs());

operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
queryPhase.execute(context);
if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
// no hits, we can release the context since there will be no fetch phase
freeContext(context.id());
} else {
contextProcessedSuccessfully(context);
}
operationListener.onQueryPhase(context, System.nanoTime() - time);
executor.success();
return context.queryResult();
} catch (Exception e) {
operationListener.onFailedQueryPhase(context);
logger.trace("Query phase failed", e);
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
throw e;
} finally {
cleanContext(context);
}
Expand Down Expand Up @@ -527,28 +503,19 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask ta
ActionListener<ScrollQueryFetchSearchResult> listener) {
runAsync(request.id(), () -> {
final SearchContext context = findContext(request.id(), request);
context.setTask(task);
context.incRef();
try {
context.setTask(task);
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)){
contextProcessing(context);
SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
processScroll(request, context);
operationListener.onPreQueryPhase(context);
final long time = System.nanoTime();
try {
queryPhase.execute(context);
} catch (Exception e) {
operationListener.onFailedQueryPhase(context);
throw ExceptionsHelper.convertToRuntime(e);
}
long afterQueryTime = System.nanoTime();
operationListener.onQueryPhase(context, afterQueryTime - time);
QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, operationListener, afterQueryTime);
queryPhase.execute(context);
final long afterQueryTime = executor.success();
QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, afterQueryTime);
return new ScrollQueryFetchSearchResult(fetchSearchResult, context.shardTarget());
} catch (Exception e) {
logger.trace("Fetch phase failed", e);
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
throw e;
} finally {
cleanContext(context);
}
Expand All @@ -558,7 +525,6 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask ta
public void executeFetchPhase(ShardFetchRequest request, SearchTask task, ActionListener<FetchSearchResult> listener) {
runAsync(request.id(), () -> {
final SearchContext context = findContext(request.id(), request);
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
context.incRef();
try {
context.setTask(task);
Expand All @@ -567,21 +533,20 @@ public void executeFetchPhase(ShardFetchRequest request, SearchTask task, Action
context.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
}
context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
operationListener.onPreFetchPhase(context);
long time = System.nanoTime();
fetchPhase.execute(context);
if (fetchPhaseShouldFreeContext(context)) {
freeContext(request.id());
} else {
contextProcessedSuccessfully(context);
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, System.nanoTime())) {
fetchPhase.execute(context);
if (fetchPhaseShouldFreeContext(context)) {
freeContext(request.id());
} else {
contextProcessedSuccessfully(context);
}
executor.success();
}
operationListener.onFetchPhase(context, System.nanoTime() - time);
return context.fetchResult();
} catch (Exception e) {
operationListener.onFailedFetchPhase(context);
logger.trace("Fetch phase failed", e);
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
throw e;
} finally {
cleanContext(context);
}
Expand Down Expand Up @@ -661,7 +626,7 @@ final SearchContext createContext(ShardSearchRequest request) throws IOException
context.lowLevelCancellation(lowLevelCancellation);
} catch (Exception e) {
context.close();
throw ExceptionsHelper.convertToRuntime(e);
throw e;
}

return context;
Expand Down Expand Up @@ -733,7 +698,7 @@ public void freeAllScrollContexts() {
}
}

private void contextScrollKeepAlive(SearchContext context, long keepAlive) throws IOException {
private void contextScrollKeepAlive(SearchContext context, long keepAlive) {
if (keepAlive > maxKeepAlive) {
throw new IllegalArgumentException(
"Keep alive for scroll (" + TimeValue.timeValueMillis(keepAlive) + ") is too large. " +
Expand Down Expand Up @@ -986,7 +951,7 @@ private void shortcutDocIdsToLoad(SearchContext context) {
context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
}

private void processScroll(InternalScrollSearchRequest request, SearchContext context) throws IOException {
private void processScroll(InternalScrollSearchRequest request, SearchContext context) {
// process scroll
context.from(context.from() + context.size());
context.scrollContext().scroll = request.scroll();
Expand Down Expand Up @@ -1142,4 +1107,58 @@ public boolean canMatch() {
return canMatch;
}
}

/**
* This helper class ensures we only execute either the success or the failure path for {@link SearchOperationListener}.
* This is crucial for some implementations like {@link org.elasticsearch.index.search.stats.ShardSearchStats}.
*/
private static final class SearchOperationListenerExecutor implements AutoCloseable {
private final SearchOperationListener listener;
private final SearchContext context;
private final long time;
private final boolean fetch;
private long afterQueryTime = -1;
private boolean closed = false;

SearchOperationListenerExecutor(SearchContext context) {
this(context, false, System.nanoTime());
}

SearchOperationListenerExecutor(SearchContext context, boolean fetch, long startTime) {
this.listener = context.indexShard().getSearchOperationListener();
this.context = context;
time = startTime;
this.fetch = fetch;
if (fetch) {
listener.onPreFetchPhase(context);
} else {
listener.onPreQueryPhase(context);
}
}

long success() {
return afterQueryTime = System.nanoTime();
}

@Override
public void close() {
assert closed == false : "already closed - while technically ok double closing is a likely a bug in this case";
if (closed == false) {
s1monw marked this conversation as resolved.
Show resolved Hide resolved
closed = true;
if (afterQueryTime != -1) {
if (fetch) {
listener.onFetchPhase(context, afterQueryTime - time);
} else {
listener.onQueryPhase(context, afterQueryTime - time);
}
} else {
if (fetch) {
listener.onFailedFetchPhase(context);
} else {
listener.onFailedQueryPhase(context);
}
}
}
}
}
}