Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure validation of the reader context is executed first #61831

Merged
merged 11 commits into from
Sep 7, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -546,24 +546,27 @@ public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause)
}

/**
* This method should be called if a search phase failed to ensure all relevant search contexts and resources are released.
* this method will also notify the listener and sends back a failure to the user.
* This method should be called if a search phase failed to ensure all relevant reader contexts are released.
* This method will also notify the listener and sends back a failure to the user.
*
* @param exception the exception explaining or causing the phase failure
*/
private void raisePhaseFailure(SearchPhaseExecutionException exception) {
results.getSuccessfulResults().forEach((entry) -> {
if (entry.getContextId() != null) {
try {
SearchShardTarget searchShardTarget = entry.getSearchShardTarget();
Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
sendReleaseSearchContext(entry.getContextId(), connection, searchShardTarget.getOriginalIndices());
} catch (Exception inner) {
inner.addSuppressed(exception);
logger.trace("failed to release context", inner);
// we don't release persistent readers (point in time).
if (request.pointInTimeBuilder() == null) {
results.getSuccessfulResults().forEach((entry) -> {
if (entry.getContextId() != null) {
try {
SearchShardTarget searchShardTarget = entry.getSearchShardTarget();
Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
sendReleaseSearchContext(entry.getContextId(), connection, searchShardTarget.getOriginalIndices());
} catch (Exception inner) {
inner.addSuppressed(exception);
logger.trace("failed to release context", inner);
}
}
}
});
});
}
listener.onFailure(exception);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,13 @@ public void onFailure(Exception exception) {
progressListener.notifyQueryFailure(shardIndex, searchShardTarget, exception);
counter.onFailure(shardIndex, searchShardTarget, exception);
} finally {
// the query might not have been executed at all (for example because thread pool rejected
// execution) and the search context that was created in dfs phase might not be released.
// release it again to be in the safe side
context.sendReleaseSearchContext(
querySearchRequest.contextId(), connection, searchShardTarget.getOriginalIndices());
if (context.getRequest().pointInTimeBuilder() == null) {
// the query might not have been executed at all (for example because thread pool rejected
// execution) and the search context that was created in dfs phase might not be released.
// release it again to be in the safe side
context.sendReleaseSearchContext(
querySearchRequest.contextId(), connection, searchShardTarget.getOriginalIndices());
}
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,11 @@ public void onFailure(Exception e) {
* Releases shard targets that are not used in the docsIdsToLoad.
*/
private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) {
// we only release search context that we did not fetch from if we are not scrolling
// and if it has at lease one hit that didn't make it to the global topDocs
if (context.getRequest().scroll() == null &&
context.getRequest().pointInTimeBuilder() == null &&
queryResult.hasSearchContext()) {
// we only release search context that we did not fetch from, if we are not scrolling
// or using a PIT and if it has at least one hit that didn't make it to the global topDocs
if (queryResult.hasSearchContext()
&& context.getRequest().scroll() == null
&& context.getRequest().pointInTimeBuilder() == null) {
try {
SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ default void onFreeScrollContext(ReaderContext readerContext) {}
* @param readerContext The reader context used by this request.
* @param transportRequest the request that is going to use the search context
*/
default void validateSearchContext(ReaderContext readerContext, TransportRequest transportRequest) {}
default void validateReaderContext(ReaderContext readerContext, TransportRequest transportRequest) {}

/**
* A Composite listener that multiplexes calls to each of the listeners methods.
Expand Down Expand Up @@ -238,11 +238,11 @@ public void onFreeScrollContext(ReaderContext readerContext) {
}

@Override
public void validateSearchContext(ReaderContext readerContext, TransportRequest request) {
public void validateReaderContext(ReaderContext readerContext, TransportRequest request) {
Exception exception = null;
for (SearchOperationListener listener : listeners) {
try {
listener.validateSearchContext(readerContext, request);
listener.validateReaderContext(readerContext, request);
} catch (Exception e) {
exception = ExceptionsHelper.useOrSuppress(exception, e);
}
Expand Down
Loading