Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Workload Management] Modify logging message in WorkloadGroupService ([#18712](https://github.com/opensearch-project/OpenSearch/pull/18712))
- Add BooleanQuery rewrite moving constant-scoring must clauses to filter clauses ([#18510](https://github.com/opensearch-project/OpenSearch/issues/18510))
- Add functionality for plugins to inject QueryCollectorContext during QueryPhase ([#18637](https://github.com/opensearch-project/OpenSearch/pull/18637))
- Add QueryPhaseListener interface for pre/post collection hooks ([#17593](https://github.com/opensearch-project/OpenSearch/issues/17593))
- Add support for non-timing info in profiler ([#18460](https://github.com/opensearch-project/OpenSearch/issues/18460))
- [Rule-based auto tagging] Bug fix and improvements ([#18726](https://github.com/opensearch-project/OpenSearch/pull/18726))
- Extend Approximation Framework to other numeric types ([#18530](https://github.com/opensearch-project/OpenSearch/issues/18530))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.query;

import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.Query;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;

import static org.opensearch.search.query.TopDocsCollectorContext.createTopDocsCollectorContext;

/**
 * Skeleton {@link QueryPhaseSearcher} that surrounds the concrete search with
 * {@link QueryPhaseListener} callbacks.
 *
 * <p>{@link #searchWith} is {@code final}: it notifies every listener returned by
 * {@link #queryPhaseListeners()} before the search, delegates the search itself to
 * {@link #doSearchWith}, and then notifies the same listeners again once the search
 * has returned. Subclasses supply the search by implementing {@link #doSearchWith}.
 *
 * @opensearch.internal
 */
@InternalApi
public abstract class AbstractQueryPhaseSearcher implements QueryPhaseSearcher {

    /**
     * Runs the before-collection callbacks, the subclass search, and the after-collection
     * callbacks, in that order.
     *
     * @return the value returned by {@link #doSearchWith}
     * @throws IOException if the underlying search throws it
     */
    @Override
    public final boolean searchWith(
        SearchContext searchContext,
        ContextIndexSearcher searcher,
        Query query,
        LinkedList<QueryCollectorContext> collectors,
        boolean hasFilterCollector,
        boolean hasTimeout
    ) throws IOException {
        // Resolved once so that both callback passes iterate the very same list.
        final List<QueryPhaseListener> hooks = queryPhaseListeners();

        notifyBeforeCollection(hooks, searchContext);
        final boolean rescore = doSearchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
        // Only reached when doSearchWith returned normally; if it threw, the
        // after-collection callbacks are not invoked.
        notifyAfterCollection(hooks, searchContext);

        return rescore;
    }

    /**
     * Invokes {@link QueryPhaseListener#beforeCollection} on each listener, in list order.
     */
    private static void notifyBeforeCollection(List<QueryPhaseListener> hooks, SearchContext searchContext) {
        for (QueryPhaseListener hook : hooks) {
            hook.beforeCollection(searchContext);
        }
    }

    /**
     * Invokes {@link QueryPhaseListener#afterCollection} on each listener, in list order.
     */
    private static void notifyAfterCollection(List<QueryPhaseListener> hooks, SearchContext searchContext) {
        for (QueryPhaseListener hook : hooks) {
            hook.afterCollection(searchContext);
        }
    }

    /**
     * The actual search, executed between the listener callbacks.
     * Every concrete searcher must provide its own implementation.
     *
     * @return the flag that {@link #searchWith} hands back to its caller
     * @throws IOException if the search fails with an I/O error
     */
    protected abstract boolean doSearchWith(
        SearchContext searchContext,
        ContextIndexSearcher searcher,
        Query query,
        LinkedList<QueryCollectorContext> collectors,
        boolean hasFilterCollector,
        boolean hasTimeout
    ) throws IOException;

    /**
     * Builds the {@link QueryCollectorContext} shared by all implementations.
     *
     * <p>When {@link QueryCollectorContextSpecRegistry} yields a spec for the given search
     * context, the result is an adapter that forwards {@code create}, {@code createManager}
     * and {@code postProcess} to that spec. Otherwise the top docs collector context is
     * created and returned.
     *
     * @param searchContext      the current search context
     * @param hasFilterCollector passed to the spec lookup and to the top docs fallback
     * @return the spec-backed context if one is available, else the top docs collector context
     * @throws IOException if creating the top docs collector context fails
     */
    protected QueryCollectorContext getQueryCollectorContext(SearchContext searchContext, boolean hasFilterCollector) throws IOException {
        // The top docs collector is created last, once the other collectors are known.
        final Optional<QueryCollectorContext> specBacked = QueryCollectorContextSpecRegistry.getQueryCollectorContextSpec(
            searchContext,
            new QueryCollectorArguments.Builder().hasFilterCollector(hasFilterCollector).build()
        ).map(spec -> new QueryCollectorContext(spec.getContextName()) {
            @Override
            Collector create(Collector in) throws IOException {
                return spec.create(in);
            }

            @Override
            CollectorManager<?, ReduceableSearchResult> createManager(CollectorManager<?, ReduceableSearchResult> in) throws IOException {
                return spec.createManager(in);
            }

            @Override
            void postProcess(QuerySearchResult result) throws IOException {
                spec.postProcess(result);
            }
        });

        if (!specBacked.isPresent()) {
            return createTopDocsCollectorContext(searchContext, hasFilterCollector);
        }
        return specBacked.get();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.ProfileCollectorManager;
import org.opensearch.search.query.QueryPhase.DefaultQueryPhaseSearcher;

import java.io.IOException;
import java.util.LinkedList;
Expand All @@ -30,7 +29,7 @@
* The implementation of the {@link QueryPhaseSearcher} which attempts to use concurrent
* search of Apache Lucene segments if it has been enabled.
*/
public class ConcurrentQueryPhaseSearcher extends DefaultQueryPhaseSearcher {
public class ConcurrentQueryPhaseSearcher extends AbstractQueryPhaseSearcher {
private static final Logger LOGGER = LogManager.getLogger(ConcurrentQueryPhaseSearcher.class);
private final AggregationProcessor aggregationProcessor = new ConcurrentAggregationProcessor();

Expand All @@ -40,15 +39,15 @@ public class ConcurrentQueryPhaseSearcher extends DefaultQueryPhaseSearcher {
public ConcurrentQueryPhaseSearcher() {}

@Override
protected boolean searchWithCollector(
protected boolean doSearchWith(
SearchContext searchContext,
ContextIndexSearcher searcher,
Query query,
LinkedList<QueryCollectorContext> collectors,
QueryCollectorContext queryCollectorContext,
boolean hasFilterCollector,
boolean hasTimeout
) throws IOException {
QueryCollectorContext queryCollectorContext = getQueryCollectorContext(searchContext, hasFilterCollector);
return searchWithCollectorManager(
searchContext,
searcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,13 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

import static org.opensearch.search.query.QueryCollectorContext.createEarlyTerminationCollectorContext;
import static org.opensearch.search.query.QueryCollectorContext.createFilteredCollectorContext;
import static org.opensearch.search.query.QueryCollectorContext.createMinScoreCollectorContext;
import static org.opensearch.search.query.QueryCollectorContext.createMultiCollectorContext;
import static org.opensearch.search.query.TopDocsCollectorContext.createTopDocsCollectorContext;

/**
* Query phase of a search request, used to run the query and get back from each shard information about the matching documents
Expand Down Expand Up @@ -411,7 +409,7 @@ public static class TimeExceededException extends RuntimeException {
*
* @opensearch.internal
*/
public static class DefaultQueryPhaseSearcher implements QueryPhaseSearcher {
public static class DefaultQueryPhaseSearcher extends AbstractQueryPhaseSearcher {
private final AggregationProcessor aggregationProcessor;

/**
Expand All @@ -422,7 +420,7 @@ protected DefaultQueryPhaseSearcher() {
}

@Override
public boolean searchWith(
protected boolean doSearchWith(
SearchContext searchContext,
ContextIndexSearcher searcher,
Query query,
Expand All @@ -447,47 +445,6 @@ protected boolean searchWithCollector(
boolean hasTimeout
) throws IOException {
QueryCollectorContext queryCollectorContext = getQueryCollectorContext(searchContext, hasFilterCollector);
return searchWithCollector(searchContext, searcher, query, collectors, queryCollectorContext, hasFilterCollector, hasTimeout);
}

private QueryCollectorContext getQueryCollectorContext(SearchContext searchContext, boolean hasFilterCollector) throws IOException {
// create the top docs collector last when the other collectors are known
final Optional<QueryCollectorContext> queryCollectorContextOpt = QueryCollectorContextSpecRegistry.getQueryCollectorContextSpec(
searchContext,
new QueryCollectorArguments.Builder().hasFilterCollector(hasFilterCollector).build()
).map(queryCollectorContextSpec -> new QueryCollectorContext(queryCollectorContextSpec.getContextName()) {
@Override
Collector create(Collector in) throws IOException {
return queryCollectorContextSpec.create(in);
}

@Override
CollectorManager<?, ReduceableSearchResult> createManager(CollectorManager<?, ReduceableSearchResult> in)
throws IOException {
return queryCollectorContextSpec.createManager(in);
}

@Override
void postProcess(QuerySearchResult result) throws IOException {
queryCollectorContextSpec.postProcess(result);
}
});
if (queryCollectorContextOpt.isPresent()) {
return queryCollectorContextOpt.get();
} else {
return createTopDocsCollectorContext(searchContext, hasFilterCollector);
}
}

protected boolean searchWithCollector(
SearchContext searchContext,
ContextIndexSearcher searcher,
Query query,
LinkedList<QueryCollectorContext> collectors,
QueryCollectorContext queryCollectorContext,
boolean hasFilterCollector,
boolean hasTimeout
) throws IOException {
return QueryPhase.searchWithCollector(
searchContext,
searcher,
Expand All @@ -498,5 +455,6 @@ protected boolean searchWithCollector(
hasTimeout
);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.query;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.search.internal.SearchContext;

/**
* Listener interface that allows plugins to hook into the query phase
* before and after collection. This enables custom CollectorManager
* implementations and data processing for advanced search features like
* hybrid queries and neural search.
*
* <p>Listeners are supplied through {@link QueryPhaseSearcher#queryPhaseListeners()}.
* {@link AbstractQueryPhaseSearcher#searchWith} calls {@link #beforeCollection} on every
* listener, in list order, before it runs the search, and calls {@link #afterCollection}
* on the same listeners, in the same order, after the search has returned.</p>
*
* <p>Neither callback declares a checked exception.</p>
*
* <p>This API is experimental and may change in future versions based on
* feedback from plugin implementations.</p>
*
* @opensearch.api
*/
@ExperimentalApi
public interface QueryPhaseListener {

/**
* Called before collection begins in the query phase.
* This allows extensions to set up custom state or modify the search context
* before the main query execution.
*
* <p>In {@link AbstractQueryPhaseSearcher#searchWith} an exception thrown here
* propagates to the caller, so the search and the remaining callbacks do not run.</p>
*
* @param searchContext the current search context
*/
void beforeCollection(SearchContext searchContext);

/**
* Called after collection completes in the query phase.
* This allows extensions to process collected data or perform
* post-collection operations.
*
* <p>In {@link AbstractQueryPhaseSearcher#searchWith} this callback is reached only
* when the search returned normally; it is not invoked when the search throws.</p>
*
* @param searchContext the current search context
*/
void afterCollection(SearchContext searchContext);

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

/**
* The extension point which allows to plug in custom search implementation to be
Expand Down Expand Up @@ -53,4 +55,12 @@ boolean searchWith(
default AggregationProcessor aggregationProcessor(SearchContext searchContext) {
    // The default does not consult searchContext; it hands back a new processor on every call.
    final AggregationProcessor defaultProcessor = new DefaultAggregationProcessor();
    return defaultProcessor;
}

/**
 * Supplies the {@link QueryPhaseListener}s to be notified before and after score collection.
 * The default implementation registers none.
 *
 * @return the listeners to notify; an empty list when there are none
 */
default List<QueryPhaseListener> queryPhaseListeners() {
    return Collections.<QueryPhaseListener>emptyList();
}
}
Loading
Loading