diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 4fee684276288..e49973dc07082 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -24,7 +24,6 @@ import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.TopDocs; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchTask; @@ -329,7 +328,7 @@ private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchTask t } catch (Exception e) { logger.trace("Dfs phase failed", e); processFailure(context, e); - throw ExceptionsHelper.convertToRuntime(e); + throw e; } finally { cleanContext(context); } @@ -380,29 +379,24 @@ protected void doRun() { }); } - private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException { + private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws Exception { final SearchContext context = createAndPutContext(request); - final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); context.incRef(); - boolean queryPhaseSuccess = false; try { context.setTask(task); - operationListener.onPreQueryPhase(context); - long time = System.nanoTime(); - contextProcessing(context); - - loadOrExecuteQueryPhase(request, context); - - if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) { - freeContext(context.id()); - } else { - contextProcessedSuccessfully(context); + final long afterQueryTime; + try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { + contextProcessing(context); + loadOrExecuteQueryPhase(request, context); + if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) { + freeContext(context.id()); + } else { + contextProcessedSuccessfully(context); + } + afterQueryTime = executor.success(); } - final long afterQueryTime = System.nanoTime(); - queryPhaseSuccess = true; - operationListener.onQueryPhase(context, afterQueryTime - time); if (request.numberOfShards() == 1) { - return executeFetchPhase(context, operationListener, afterQueryTime); + return executeFetchPhase(context, afterQueryTime); } return context.queryResult(); } catch (Exception e) { @@ -411,21 +405,16 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTa e = (e.getCause() == null || e.getCause() instanceof Exception) ? (Exception) e.getCause() : new ElasticsearchException(e.getCause()); } - if (!queryPhaseSuccess) { - operationListener.onFailedQueryPhase(context); - } logger.trace("Query phase failed", e); processFailure(context, e); - throw ExceptionsHelper.convertToRuntime(e); + throw e; } finally { cleanContext(context); } } - private QueryFetchSearchResult executeFetchPhase(SearchContext context, SearchOperationListener operationListener, - long afterQueryTime) { - operationListener.onPreFetchPhase(context); - try { + private QueryFetchSearchResult executeFetchPhase(SearchContext context, long afterQueryTime) { + try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)){ shortcutDocIdsToLoad(context); fetchPhase.execute(context); if (fetchPhaseShouldFreeContext(context)) { @@ -433,34 +422,27 @@ private QueryFetchSearchResult executeFetchPhase(SearchContext context, SearchOp } else { contextProcessedSuccessfully(context); } - } catch (Exception e) { - operationListener.onFailedFetchPhase(context); - throw ExceptionsHelper.convertToRuntime(e); + executor.success(); } - operationListener.onFetchPhase(context, System.nanoTime() - afterQueryTime); return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); } public void executeQueryPhase(InternalScrollSearchRequest request, SearchTask task, ActionListener listener) { runAsync(request.id(), () -> { final SearchContext context = findContext(request.id(), request); - SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); context.incRef(); - try { + try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { context.setTask(task); - operationListener.onPreQueryPhase(context); - long time = System.nanoTime(); contextProcessing(context); processScroll(request, context); queryPhase.execute(context); contextProcessedSuccessfully(context); - operationListener.onQueryPhase(context, System.nanoTime() - time); + executor.success(); return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget()); } catch (Exception e) { - operationListener.onFailedQueryPhase(context); logger.trace("Query phase failed", e); processFailure(context, e); - throw ExceptionsHelper.convertToRuntime(e); + throw e; } finally { cleanContext(context); } @@ -471,15 +453,10 @@ public void executeQueryPhase(QuerySearchRequest request, SearchTask task, Actio runAsync(request.id(), () -> { final SearchContext context = findContext(request.id(), request); context.setTask(task); - IndexShard indexShard = context.indexShard(); - SearchOperationListener operationListener = indexShard.getSearchOperationListener(); context.incRef(); - try { + try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { contextProcessing(context); context.searcher().setAggregatedDfs(request.dfs()); - - operationListener.onPreQueryPhase(context); - long time = System.nanoTime(); queryPhase.execute(context); if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) { // no hits, we can release the context since there will be no fetch phase @@ -487,13 +464,12 @@ public void executeQueryPhase(QuerySearchRequest request, SearchTask task, Actio } else { contextProcessedSuccessfully(context); } - operationListener.onQueryPhase(context, System.nanoTime() - time); + executor.success(); return context.queryResult(); } catch (Exception e) { - operationListener.onFailedQueryPhase(context); logger.trace("Query phase failed", e); processFailure(context, e); - throw ExceptionsHelper.convertToRuntime(e); + throw e; } finally { cleanContext(context); } @@ -527,28 +503,19 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask ta ActionListener listener) { runAsync(request.id(), () -> { final SearchContext context = findContext(request.id(), request); + context.setTask(task); context.incRef(); - try { - context.setTask(task); + try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)){ contextProcessing(context); - SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); processScroll(request, context); - operationListener.onPreQueryPhase(context); - final long time = System.nanoTime(); - try { - queryPhase.execute(context); - } catch (Exception e) { - operationListener.onFailedQueryPhase(context); - throw ExceptionsHelper.convertToRuntime(e); - } - long afterQueryTime = System.nanoTime(); - operationListener.onQueryPhase(context, afterQueryTime - time); - QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, operationListener, afterQueryTime); + queryPhase.execute(context); + final long afterQueryTime = executor.success(); + QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, afterQueryTime); return new ScrollQueryFetchSearchResult(fetchSearchResult, context.shardTarget()); } catch (Exception e) { logger.trace("Fetch phase failed", e); processFailure(context, e); - throw ExceptionsHelper.convertToRuntime(e); + throw e; } finally { cleanContext(context); } @@ -558,7 +525,6 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask ta public void executeFetchPhase(ShardFetchRequest request, SearchTask task, ActionListener listener) { runAsync(request.id(), () -> { final SearchContext context = findContext(request.id(), request); - final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); context.incRef(); try { context.setTask(task); @@ -567,21 +533,20 @@ public void executeFetchPhase(ShardFetchRequest request, SearchTask task, Action context.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); } context.docIdsToLoad(request.docIds(), 0, request.docIdsSize()); - operationListener.onPreFetchPhase(context); - long time = System.nanoTime(); - fetchPhase.execute(context); - if (fetchPhaseShouldFreeContext(context)) { - freeContext(request.id()); - } else { - contextProcessedSuccessfully(context); + try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, System.nanoTime())) { + fetchPhase.execute(context); + if (fetchPhaseShouldFreeContext(context)) { + freeContext(request.id()); + } else { + contextProcessedSuccessfully(context); + } + executor.success(); } - operationListener.onFetchPhase(context, System.nanoTime() - time); return context.fetchResult(); } catch (Exception e) { - operationListener.onFailedFetchPhase(context); logger.trace("Fetch phase failed", e); processFailure(context, e); - throw ExceptionsHelper.convertToRuntime(e); + throw e; } finally { cleanContext(context); } @@ -661,7 +626,7 @@ final SearchContext createContext(ShardSearchRequest request) throws IOException context.lowLevelCancellation(lowLevelCancellation); } catch (Exception e) { context.close(); - throw ExceptionsHelper.convertToRuntime(e); + throw e; } return context; @@ -733,7 +698,7 @@ public void freeAllScrollContexts() { } } - private void contextScrollKeepAlive(SearchContext context, long keepAlive) throws IOException { + private void contextScrollKeepAlive(SearchContext context, long keepAlive) { if (keepAlive > maxKeepAlive) { throw new IllegalArgumentException( "Keep alive for scroll (" + TimeValue.timeValueMillis(keepAlive) + ") is too large. " + @@ -986,7 +951,7 @@ private void shortcutDocIdsToLoad(SearchContext context) { context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length); } - private void processScroll(InternalScrollSearchRequest request, SearchContext context) throws IOException { + private void processScroll(InternalScrollSearchRequest request, SearchContext context) { // process scroll context.from(context.from() + context.size()); context.scrollContext().scroll = request.scroll(); @@ -1142,4 +1107,58 @@ public boolean canMatch() { return canMatch; } } + + /** + * This helper class ensures we only execute either the success or the failure path for {@link SearchOperationListener}. + * This is crucial for some implementations like {@link org.elasticsearch.index.search.stats.ShardSearchStats}. + */ + private static final class SearchOperationListenerExecutor implements AutoCloseable { + private final SearchOperationListener listener; + private final SearchContext context; + private final long time; + private final boolean fetch; + private long afterQueryTime = -1; + private boolean closed = false; + + SearchOperationListenerExecutor(SearchContext context) { + this(context, false, System.nanoTime()); + } + + SearchOperationListenerExecutor(SearchContext context, boolean fetch, long startTime) { + this.listener = context.indexShard().getSearchOperationListener(); + this.context = context; + time = startTime; + this.fetch = fetch; + if (fetch) { + listener.onPreFetchPhase(context); + } else { + listener.onPreQueryPhase(context); + } + } + + long success() { + return afterQueryTime = System.nanoTime(); + } + + @Override + public void close() { + assert closed == false : "already closed - while technically ok double closing is a likely a bug in this case"; + if (closed == false) { + closed = true; + if (afterQueryTime != -1) { + if (fetch) { + listener.onFetchPhase(context, afterQueryTime - time); + } else { + listener.onQueryPhase(context, afterQueryTime - time); + } + } else { + if (fetch) { + listener.onFailedFetchPhase(context); + } else { + listener.onFailedQueryPhase(context); + } + } + } + } + } }