Skip to content

Commit

Permalink
Provide factory for pluggable deciders
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Ramadurai <gramadur@icloud.com>
  • Loading branch information
Gankris96 authored and Ganesh Ramadurai committed Sep 5, 2024
1 parent 2f1e209 commit 5b2e58b
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 117 deletions.
8 changes: 4 additions & 4 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@
import org.opensearch.search.aggregations.support.AggregationUsageService;
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.deciders.ConcurrentSearchDecider;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.search.fetch.FetchPhase;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.search.query.QueryPhase;
Expand Down Expand Up @@ -1344,7 +1344,7 @@ protected Node(
circuitBreakerService,
searchModule.getIndexSearcherExecutor(threadPool),
taskResourceTrackingService,
searchModule.getConcurrentSearchDeciders()
searchModule.getConcurrentSearchRequestDeciderFactories()
);

final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class)
Expand Down Expand Up @@ -2004,7 +2004,7 @@ protected SearchService newSearchService(
CircuitBreakerService circuitBreakerService,
Executor indexSearcherExecutor,
TaskResourceTrackingService taskResourceTrackingService,
Collection<ConcurrentSearchDecider> concurrentSearchDecidersList
Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories
) {
return new SearchService(
clusterService,
Expand All @@ -2018,7 +2018,7 @@ protected SearchService newSearchService(
circuitBreakerService,
indexSearcherExecutor,
taskResourceTrackingService,
concurrentSearchDecidersList
concurrentSearchDeciderFactories
);
}

Expand Down
10 changes: 5 additions & 5 deletions server/src/main/java/org/opensearch/plugins/SearchPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
import org.opensearch.search.aggregations.pipeline.MovAvgPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.deciders.ConcurrentSearchDecider;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.search.fetch.FetchSubPhase;
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
import org.opensearch.search.query.QueryPhaseSearcher;
Expand Down Expand Up @@ -141,12 +141,12 @@ default Map<String, Highlighter> getHighlighters() {
}

/**
* Allows plugins to register custom decider for concurrent search
* @return A {@link ConcurrentSearchDecider}
* Allows plugins to register a factory to create custom decider for concurrent search
* @return A {@link ConcurrentSearchRequestDecider.Factory}
*/
@ExperimentalApi
default ConcurrentSearchDecider getConcurrentSearchDecider() {
return null;
default Optional<ConcurrentSearchRequestDecider.Factory> getConcurrentSearchRequestDeciderFactory() {
return Optional.empty();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@
import org.opensearch.search.aggregations.SearchContextAggregations;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.collapse.CollapseContext;
import org.opensearch.search.deciders.ConcurrentSearchDecider;
import org.opensearch.search.deciders.ConcurrentSearchDecision;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.search.deciders.ConcurrentSearchVisitor;
import org.opensearch.search.dfs.DfsSearchResult;
import org.opensearch.search.fetch.FetchPhase;
Expand Down Expand Up @@ -106,13 +106,14 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE;
Expand All @@ -137,7 +138,7 @@ final class DefaultSearchContext extends SearchContext {
private final ShardSearchRequest request;
private final SearchShardTarget shardTarget;
private final LongSupplier relativeTimeSupplier;
private final Collection<ConcurrentSearchDecider> concurrentSearchDeciders;
private final Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories;
private SearchType searchType;
private final BigArrays bigArrays;
private final IndexShard indexShard;
Expand Down Expand Up @@ -223,7 +224,7 @@ final class DefaultSearchContext extends SearchContext {
boolean validate,
Executor executor,
Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder,
Collection<ConcurrentSearchDecider> concurrentSearchDeciders
Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories
) throws IOException {
this.readerContext = readerContext;
this.request = request;
Expand Down Expand Up @@ -267,7 +268,7 @@ final class DefaultSearchContext extends SearchContext {

this.maxAggRewriteFilters = evaluateFilterRewriteSetting();
this.cardinalityAggregationPruningThreshold = evaluateCardinalityAggregationPruningThreshold();
this.concurrentSearchDeciders = concurrentSearchDeciders;
this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories;
this.keywordIndexOrDocValuesEnabled = evaluateKeywordIndexOrDocValuesEnabled();
}

Expand Down Expand Up @@ -932,14 +933,21 @@ public boolean shouldUseConcurrentSearch() {

private boolean evaluateAutoMode() {

// filter out deciders that want to opt-out of decision-making
final Set<ConcurrentSearchDecider> filteredDeciders = concurrentSearchDeciders.stream()
.filter(concurrentSearchDecider -> concurrentSearchDecider.canEvaluateForIndex(indexService.getIndexSettings()))
.collect(Collectors.toSet());
final Set<ConcurrentSearchRequestDecider> concurrentSearchRequestDeciders = new HashSet<>();

// create the ConcurrentSearchRequestDeciders using registered factories
for (ConcurrentSearchRequestDecider.Factory deciderFactory : concurrentSearchDeciderFactories) {
final Optional<ConcurrentSearchRequestDecider> concurrentSearchRequestDecider = deciderFactory.create(
indexService.getIndexSettings()
);
concurrentSearchRequestDecider.ifPresent(concurrentSearchRequestDeciders::add);

}

// evaluate based on concurrent search query visitor
if (filteredDeciders.size() > 0) {
if (concurrentSearchRequestDeciders.size() > 0) {
ConcurrentSearchVisitor concurrentSearchVisitor = new ConcurrentSearchVisitor(
filteredDeciders,
concurrentSearchRequestDeciders,
indexService.getIndexSettings()
);
if (request().source() != null && request().source().query() != null) {
Expand All @@ -949,7 +957,7 @@ private boolean evaluateAutoMode() {
}

final List<ConcurrentSearchDecision> decisions = new ArrayList<>();
for (ConcurrentSearchDecider decider : filteredDeciders) {
for (ConcurrentSearchRequestDecider decider : concurrentSearchRequestDeciders) {
ConcurrentSearchDecision decision = decider.getConcurrentSearchDecision();
if (decision != null) {
if (logger.isDebugEnabled()) {
Expand Down
24 changes: 11 additions & 13 deletions server/src/main/java/org/opensearch/search/SearchModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@
import org.opensearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.deciders.ConcurrentSearchDecider;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.search.fetch.FetchPhase;
import org.opensearch.search.fetch.FetchSubPhase;
import org.opensearch.search.fetch.subphase.ExplainPhase;
Expand Down Expand Up @@ -318,7 +318,7 @@ public class SearchModule {
private final QueryPhaseSearcher queryPhaseSearcher;
private final SearchPlugin.ExecutorServiceProvider indexSearcherExecutorProvider;

private final Collection<ConcurrentSearchDecider> concurrentSearchDeciders;
private final Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories;

/**
* Constructs a new SearchModule object
Expand Down Expand Up @@ -348,25 +348,23 @@ public SearchModule(Settings settings, List<SearchPlugin> plugins) {
queryPhaseSearcher = registerQueryPhaseSearcher(plugins);
indexSearcherExecutorProvider = registerIndexSearcherExecutorProvider(plugins);
namedWriteables.addAll(SortValue.namedWriteables());
concurrentSearchDeciders = registerConcurrentSearchDeciders(plugins);
concurrentSearchDeciderFactories = registerConcurrentSearchDeciderFactories(plugins);
}

private Collection<ConcurrentSearchDecider> registerConcurrentSearchDeciders(List<SearchPlugin> plugins) {
List<ConcurrentSearchDecider> concurrentSearchDeciders = new ArrayList<>();
private Collection<ConcurrentSearchRequestDecider.Factory> registerConcurrentSearchDeciderFactories(List<SearchPlugin> plugins) {
List<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories = new ArrayList<>();
for (SearchPlugin plugin : plugins) {
ConcurrentSearchDecider decider = plugin.getConcurrentSearchDecider();
if (decider != null) {
concurrentSearchDeciders.add(decider);
}
final Optional<ConcurrentSearchRequestDecider.Factory> deciderFactory = plugin.getConcurrentSearchRequestDeciderFactory();
deciderFactory.ifPresent(concurrentSearchDeciderFactories::add);
}
return concurrentSearchDeciders;
return concurrentSearchDeciderFactories;
}

/**
* Returns the concurrent search deciders that the plugins have registered
* Returns the concurrent search decider factories that the plugins have registered
*/
public Collection<ConcurrentSearchDecider> getConcurrentSearchDeciders() {
return concurrentSearchDeciders;
public Collection<ConcurrentSearchRequestDecider.Factory> getConcurrentSearchRequestDeciderFactories() {
return concurrentSearchDeciderFactories;
}

public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
Expand Down
10 changes: 5 additions & 5 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.collapse.CollapseContext;
import org.opensearch.search.deciders.ConcurrentSearchDecider;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.search.dfs.DfsPhase;
import org.opensearch.search.dfs.DfsSearchResult;
import org.opensearch.search.fetch.FetchPhase;
Expand Down Expand Up @@ -364,7 +364,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
private final QueryPhase queryPhase;

private final FetchPhase fetchPhase;
private final Collection<ConcurrentSearchDecider> concurrentSearchDeciders;
private final Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories;

private volatile long defaultKeepAlive;

Expand Down Expand Up @@ -410,7 +410,7 @@ public SearchService(
CircuitBreakerService circuitBreakerService,
Executor indexSearcherExecutor,
TaskResourceTrackingService taskResourceTrackingService,
Collection<ConcurrentSearchDecider> concurrentSearchDeciders
Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories
) {
Settings settings = clusterService.getSettings();
this.threadPool = threadPool;
Expand Down Expand Up @@ -466,7 +466,7 @@ public SearchService(
allowDerivedField = CLUSTER_ALLOW_DERIVED_FIELD_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_ALLOW_DERIVED_FIELD_SETTING, this::setAllowDerivedField);

this.concurrentSearchDeciders = concurrentSearchDeciders;
this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories;
}

private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
Expand Down Expand Up @@ -1167,7 +1167,7 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear
validate,
indexSearcherExecutor,
this::aggReduceContextBuilder,
concurrentSearchDeciders
concurrentSearchDeciderFactories
);
// we clone the query shard context here just for rewriting otherwise we
// might end up with incorrect state since we are using now() or script services
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import java.util.Collection;

/**
* This Class defines the decisions that a {@link ConcurrentSearchDecider#getConcurrentSearchDecision} can return.
* This Class defines the decisions that a {@link ConcurrentSearchRequestDecider#getConcurrentSearchDecision} can return.
*
*/
@ExperimentalApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,21 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.query.QueryBuilder;

import java.util.Optional;

/**
* {@link ConcurrentSearchDecider} allows pluggable way to evaluate if a query in the search request
* {@link ConcurrentSearchRequestDecider} allows pluggable way to evaluate if a query in the search request
* can use concurrent segment search using the passed in queryBuilders from query tree and index settings
* on a per shard request basis.
* Implementations can also opt out of the evaluation process for certain indices based on the index settings.
* For all the deciders which can evaluate query tree for an index, its evaluateForQuery method
* will be called for each node in the query tree. After traversing of the query tree is completed, the final
* decision from the deciders will be obtained using {@link ConcurrentSearchDecider#getConcurrentSearchDecision}
* Implementations will need to implement the Factory interface that can be used to create the ConcurrentSearchRequestDecider
* This factory will be called on each shard search request to create the ConcurrentSearchRequestDecider and get the
* concurrent search decision from the created decider on a per-request basis.
* For all the deciders the evaluateForQuery method will be called for each node in the query tree.
* After traversing of the query tree is completed, the final decision from the deciders will be
* obtained using {@link ConcurrentSearchRequestDecider#getConcurrentSearchDecision}
*/
@ExperimentalApi
public abstract class ConcurrentSearchDecider {
public abstract class ConcurrentSearchRequestDecider {

/**
* Evaluate for the passed in queryBuilder node in the query tree of the search request
Expand All @@ -31,14 +35,6 @@ public abstract class ConcurrentSearchDecider {
*/
public abstract void evaluateForQuery(QueryBuilder queryBuilder, IndexSettings indexSettings);

/**
* Provides a way for deciders to opt out of decision-making process for certain requests based on
* index settings.
* Return true if interested in decision making for index,
* false, otherwise
*/
public abstract boolean canEvaluateForIndex(IndexSettings indexSettings);

/**
* Provide the final decision for concurrent search based on all evaluations
* Plugins may need to maintain internal state of evaluations to provide a final decision
Expand All @@ -47,4 +43,16 @@ public abstract class ConcurrentSearchDecider {
*/
public abstract ConcurrentSearchDecision getConcurrentSearchDecision();

/**
* Factory interface that can be implemented to create the ConcurrentSearchRequestDecider object.
* Implementations can use the passed in indexSettings to decide whether to create the decider object or
* return {@link Optional#empty()}.
*/
@ExperimentalApi
public interface Factory {
default Optional<ConcurrentSearchRequestDecider> create(IndexSettings indexSettings) {
return Optional.empty();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@

/**
* Class to traverse the QueryBuilder tree and invoke the
* {@link ConcurrentSearchDecider#evaluateForQuery} at each node of the query tree
* {@link ConcurrentSearchRequestDecider#evaluateForQuery} at each node of the query tree
*/
@ExperimentalApi
public class ConcurrentSearchVisitor implements QueryBuilderVisitor {

private final Set<ConcurrentSearchDecider> deciders;
private final Set<ConcurrentSearchRequestDecider> deciders;
private final IndexSettings indexSettings;

public ConcurrentSearchVisitor(Set<ConcurrentSearchDecider> concurrentSearchVisitorDeciders, IndexSettings idxSettings) {
public ConcurrentSearchVisitor(Set<ConcurrentSearchRequestDecider> concurrentSearchVisitorDeciders, IndexSettings idxSettings) {
Objects.requireNonNull(concurrentSearchVisitorDeciders, "Concurrent search deciders cannot be null");
deciders = concurrentSearchVisitorDeciders;
indexSettings = idxSettings;
Expand Down
Loading

0 comments on commit 5b2e58b

Please sign in to comment.