Skip to content

Commit

Permalink
Misc cleanups for query phase
Browse files Browse the repository at this point in the history
Avoiding some redundant computation in obvious spots, fixing compile
warnings and using a more specific listener in one spot to save memory
and indirection.
  • Loading branch information
original-brownbear committed Sep 4, 2024
1 parent 069a4d4 commit 03d4a47
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,7 @@ public void registerAsyncAction(BiConsumer<Client, ActionListener<?>> asyncActio
}

@Override
@SuppressWarnings("rawtypes")
public void executeAsyncActions(ActionListener listener) {
public void executeAsyncActions(ActionListener<Void> listener) {
in.executeAsyncActions(listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ public InnerHitsRewriteContext convertToInnerHitsRewriteContext() {
}

@Override
@SuppressWarnings({ "rawtypes" })
public void executeAsyncActions(ActionListener listener) {
public void executeAsyncActions(ActionListener<Void> listener) {
// InnerHitsRewriteContext does not support async actions at all, and doesn't supply a valid `client` object
throw new UnsupportedOperationException("InnerHitsRewriteContext does not support async actions");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,12 @@ public boolean hasAsyncActions() {
* Executes all registered async actions and notifies the listener once it's done. The value that is passed to the listener is always
* <code>null</code>. The list of registered actions is cleared once this method returns.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public void executeAsyncActions(ActionListener listener) {
public void executeAsyncActions(ActionListener<Void> listener) {
if (asyncActions.isEmpty()) {
listener.onResponse(null);
} else {
CountDown countDown = new CountDown(asyncActions.size());
ActionListener<?> internalListener = new ActionListener() {
ActionListener<?> internalListener = new ActionListener<>() {
@Override
public void onResponse(Object o) {
if (countDown.countDown()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,8 +615,7 @@ public void registerAsyncAction(BiConsumer<Client, ActionListener<?>> asyncActio
}

@Override
@SuppressWarnings("rawtypes")
public void executeAsyncActions(ActionListener listener) {
public void executeAsyncActions(ActionListener<Void> listener) {
failIfFrozen();
super.executeAsyncActions(listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,6 @@ final class DefaultSearchContext extends SearchContext {
this.indexShard = readerContext.indexShard();

Engine.Searcher engineSearcher = readerContext.acquireSearcher("search");
int maximumNumberOfSlices = determineMaximumNumberOfSlices(
executor,
request,
resultsType,
enableQueryPhaseParallelCollection,
field -> getFieldCardinality(field, readerContext.indexService(), engineSearcher.getDirectoryReader())
);
if (executor == null) {
this.searcher = new ContextIndexSearcher(
engineSearcher.getIndexReader(),
Expand All @@ -202,7 +195,13 @@ final class DefaultSearchContext extends SearchContext {
engineSearcher.getQueryCachingPolicy(),
lowLevelCancellation,
executor,
maximumNumberOfSlices,
determineMaximumNumberOfSlices(
executor,
request,
resultsType,
enableQueryPhaseParallelCollection,
field -> getFieldCardinality(field, readerContext.indexService(), engineSearcher.getDirectoryReader())
),
minimumDocsPerSlice
);
}
Expand Down
31 changes: 14 additions & 17 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1779,14 +1779,12 @@ private static boolean canMatchAfterRewrite(final ShardSearchRequest request, fi
@SuppressWarnings("unchecked")
public static boolean queryStillMatchesAfterRewrite(ShardSearchRequest request, QueryRewriteContext context) throws IOException {
Rewriteable.rewrite(request.getRewriteable(), context, false);
boolean canMatch = request.getAliasFilter().getQueryBuilder() instanceof MatchNoneQueryBuilder == false;
if (canRewriteToMatchNone(request.source())) {
canMatch &= request.source()
.subSearches()
.stream()
.anyMatch(sqwb -> sqwb.getQueryBuilder() instanceof MatchNoneQueryBuilder == false);
if (request.getAliasFilter().getQueryBuilder() instanceof MatchNoneQueryBuilder) {
return false;
}
return canMatch;
final var source = request.source();
return canRewriteToMatchNone(source) == false
|| source.subSearches().stream().anyMatch(sqwb -> sqwb.getQueryBuilder() instanceof MatchNoneQueryBuilder == false);
}

/**
Expand All @@ -1806,19 +1804,18 @@ public static boolean canRewriteToMatchNone(SearchSourceBuilder source) {
return aggregations == null || aggregations.mustVisitAllDocs() == false;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@SuppressWarnings("unchecked")
private void rewriteAndFetchShardRequest(IndexShard shard, ShardSearchRequest request, ActionListener<ShardSearchRequest> listener) {
ActionListener<Rewriteable> actionListener = listener.delegateFailureAndWrap((l, r) -> {
if (request.readerId() != null) {
l.onResponse(request);
} else {
shard.ensureShardSearchActive(b -> l.onResponse(request));
}
});
// we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here.
// AliasFilters and other things may need to be rewritten on the data node, but not per individual shard.
// These are uncommon-cases but we are very efficient doing the rewrite here.
Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getDataRewriteContext(request::nowInMillis), actionListener);
// These are uncommon-cases, but we are very efficient doing the rewrite here.
Rewriteable.rewriteAndFetch(
request.getRewriteable(),
indicesService.getDataRewriteContext(request::nowInMillis),
request.readerId() == null
? listener.delegateFailureAndWrap((l, r) -> shard.ensureShardSearchActive(b -> l.onResponse(request)))
: listener.safeMap(r -> request)
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,22 +587,23 @@ public Rewriteable rewrite(QueryRewriteContext ctx) throws IOException {
SearchSourceBuilder newSource = request.source() == null ? null : Rewriteable.rewrite(request.source(), ctx);
AliasFilter newAliasFilter = Rewriteable.rewrite(request.getAliasFilter(), ctx);
SearchExecutionContext searchExecutionContext = ctx.convertToSearchExecutionContext();
FieldSortBuilder primarySort = FieldSortBuilder.getPrimaryFieldSortOrNull(newSource);
if (searchExecutionContext != null
&& primarySort != null
&& primarySort.isBottomSortShardDisjoint(searchExecutionContext, request.getBottomSortValues())) {
assert newSource != null : "source should contain a primary sort field";
newSource = newSource.shallowCopy();
int trackTotalHitsUpTo = SearchRequest.resolveTrackTotalHitsUpTo(request.scroll, request.source);
if (trackTotalHitsUpTo == TRACK_TOTAL_HITS_DISABLED && newSource.suggest() == null && newSource.aggregations() == null) {
newSource.query(new MatchNoneQueryBuilder());
} else {
newSource.size(0);
if (searchExecutionContext != null) {
final FieldSortBuilder primarySort = FieldSortBuilder.getPrimaryFieldSortOrNull(newSource);
if (primarySort != null && primarySort.isBottomSortShardDisjoint(searchExecutionContext, request.getBottomSortValues())) {
assert newSource != null : "source should contain a primary sort field";
newSource = newSource.shallowCopy();
int trackTotalHitsUpTo = SearchRequest.resolveTrackTotalHitsUpTo(request.scroll, request.source);
if (trackTotalHitsUpTo == TRACK_TOTAL_HITS_DISABLED
&& newSource.suggest() == null
&& newSource.aggregations() == null) {
newSource.query(new MatchNoneQueryBuilder());
} else {
newSource.size(0);
}
request.source(newSource);
request.setBottomSortValues(null);
}
request.source(newSource);
request.setBottomSortValues(null);
}

if (newSource == request.source() && newAliasFilter == request.getAliasFilter()) {
return this;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,9 @@ public void testRewritePipelineAggregationUnderAggregation() throws Exception {
QueryRewriteContext context = new QueryRewriteContext(parserConfig(), null, () -> 0L);
AggregatorFactories.Builder rewritten = builder.rewrite(context);
CountDownLatch latch = new CountDownLatch(1);
context.executeAsyncActions(new ActionListener<Object>() {
context.executeAsyncActions(new ActionListener<>() {
@Override
public void onResponse(Object response) {
public void onResponse(Void aVoid) {
assertNotSame(builder, rewritten);
Collection<AggregationBuilder> aggregatorFactories = rewritten.getAggregatorFactories();
assertEquals(1, aggregatorFactories.size());
Expand All @@ -289,9 +289,9 @@ public void testRewriteAggregationAtTopLevel() throws Exception {
QueryRewriteContext context = new QueryRewriteContext(parserConfig(), null, () -> 0L);
AggregatorFactories.Builder rewritten = builder.rewrite(context);
CountDownLatch latch = new CountDownLatch(1);
context.executeAsyncActions(new ActionListener<Object>() {
context.executeAsyncActions(new ActionListener<>() {
@Override
public void onResponse(Object response) {
public void onResponse(Void aVoid) {
assertNotSame(builder, rewritten);
PipelineAggregationBuilder rewrittenPipeline = rewritten.getPipelineAggregatorFactories().iterator().next();
assertThat(((RewrittenPipelineAggregationBuilder) rewrittenPipeline).setOnRewrite.get(), equalTo("rewritten"));
Expand Down

0 comments on commit 03d4a47

Please sign in to comment.