Skip to content

Commit

Permalink
Moved SearchPhaseName enum in separate class and fixed comments.
Browse files Browse the repository at this point in the history
Signed-off-by: Navneet Verma <navneev@amazon.com>
  • Loading branch information
navneet1v committed Jun 28, 2023
1 parent 958763a commit db8b3e0
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) {
final void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()
final SearchPhase nextPhase = getNextPhase(results, this);
if (request instanceof PipelinedRequest && nextPhase != null) {
((PipelinedRequest) request).transformSearchPhase(results, this, this.getName(), nextPhase.getName());
((PipelinedRequest) request).transformSearchPhaseResults(results, this, this.getName(), nextPhase.getName());
}
executeNextPhase(this, nextPhase);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
*
* @opensearch.internal
*/
public abstract class SearchPhase implements CheckedRunnable<IOException> {
abstract class SearchPhase implements CheckedRunnable<IOException> {
private final String name;

protected SearchPhase(String name) {
Expand All @@ -64,26 +64,4 @@ public String getName() {
public SearchPhaseName getSearchPhaseName() {
return SearchPhaseName.valueOf(name.toUpperCase(Locale.ROOT));
}

/**
* Enum for different Search Phases in OpenSearch
* @opensearch.internal
*/
public enum SearchPhaseName {
QUERY("query"),
FETCH("fetch"),
DFS_QUERY("dfs_query"),
EXPAND("expand"),
CAN_MATCH("can_match");

private final String name;

SearchPhaseName(final String name) {
this.name = name;
}

public String getName() {
return name;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

/**
* Enum for different Search Phases in OpenSearch
* @opensearch.internal
*/
public enum SearchPhaseName {
QUERY("query"),
FETCH("fetch"),
DFS_QUERY("dfs_query"),
EXPAND("expand"),
CAN_MATCH("can_match");

private final String name;

SearchPhaseName(final String name) {
this.name = name;
}

public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ protected SearchPhase sendResponsePhase(
SearchPhaseController.ReducedQueryPhase queryPhase,
final AtomicArray<? extends SearchPhaseResult> fetchResults
) {
return new SearchPhase(SearchPhase.SearchPhaseName.FETCH.getName()) {
return new SearchPhase(SearchPhaseName.FETCH.getName()) {
@Override
public void run() throws IOException {
sendResponse(queryPhase, fetchResults);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ protected void executeInitialPhase(

@Override
protected SearchPhase moveToNextPhase(BiFunction<String, String, DiscoveryNode> clusterNodeLookup) {
return new SearchPhase(SearchPhase.SearchPhaseName.FETCH.getName()) {
return new SearchPhase(SearchPhaseName.FETCH.getName()) {
@Override
public void run() {
final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedScrollQueryPhase(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ default Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProce
* in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory}
* to create the processor from a given pipeline configuration.
*/
default Map<String, Processor.Factory<SearchPhaseResultsProcessor>> getPhaseResultsProcessors(Processor.Parameters parameters) {
default Map<String, Processor.Factory<SearchPhaseResultsProcessor>> getSearchPhaseResultsProcessors(Processor.Parameters parameters) {
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ SearchResponse transformResponse(SearchRequest request, SearchResponse response)
null
);

<Result extends SearchPhaseResult> void runSearchPhaseTransformer(
<Result extends SearchPhaseResult> void runSearchPhaseResultsTransformer(
SearchPhaseResults<Result> searchPhaseResult,
SearchPhaseContext context,
String currentPhase,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ public SearchResponse transformResponse(SearchResponse response) {
return pipeline.transformResponse(this, response);
}

public <Result extends SearchPhaseResult> void transformSearchPhase(
public <Result extends SearchPhaseResult> void transformSearchPhaseResults(
final SearchPhaseResults<Result> searchPhaseResult,
final SearchPhaseContext searchPhaseContext,
final String currentPhase,
final String nextPhase
) {
pipeline.runSearchPhaseTransformer(searchPhaseResult, searchPhaseContext, currentPhase, nextPhase);
pipeline.runSearchPhaseResultsTransformer(searchPhaseResult, searchPhaseContext, currentPhase, nextPhase);
}

// Visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

package org.opensearch.search.pipeline;

import org.opensearch.action.search.SearchPhase;
import org.opensearch.action.search.SearchPhaseContext;
import org.opensearch.action.search.SearchPhaseName;
import org.opensearch.action.search.SearchPhaseResults;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.internal.SearchContext;
Expand All @@ -21,8 +21,8 @@
public interface SearchPhaseResultsProcessor extends Processor {

/**
* Processes the {@link SearchPhaseResults} obtained from a {@link SearchPhase} which will be returned to next
* {@link SearchPhase}.
* Processes the {@link SearchPhaseResults} obtained from a SearchPhase which will be returned to next
* SearchPhase.
* @param searchPhaseResult {@link SearchPhaseResults}
* @param searchPhaseContext {@link SearchContext}
* @param <Result> {@link SearchPhaseResult}
Expand All @@ -34,14 +34,14 @@ <Result extends SearchPhaseResult> void process(

/**
* The phase which should have run before, this processor can start executing.
* @return {@link SearchPhase.SearchPhaseName}
* @return {@link SearchPhaseName}
*/
SearchPhase.SearchPhaseName getBeforePhase();
SearchPhaseName getBeforePhase();

/**
* The phase which should run after, this processor execution.
* @return {@link SearchPhase.SearchPhaseName}
* @return {@link SearchPhaseName}
*/
SearchPhase.SearchPhaseName getAfterPhase();
SearchPhaseName getAfterPhase();

}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ public SearchPipelineService(
);
this.requestProcessorFactories = processorFactories(searchPipelinePlugins, p -> p.getRequestProcessors(parameters));
this.responseProcessorFactories = processorFactories(searchPipelinePlugins, p -> p.getResponseProcessors(parameters));
this.phaseInjectorProcessorFactories = processorFactories(searchPipelinePlugins, p -> p.getPhaseResultsProcessors(parameters));
this.phaseInjectorProcessorFactories = processorFactories(
searchPipelinePlugins,
p -> p.getSearchPhaseResultsProcessors(parameters)
);
putPipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_SEARCH_PIPELINE_KEY, true);
deletePipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_SEARCH_PIPELINE_KEY, true);
this.isEnabled = isEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.opensearch.action.search.MockSearchPhaseContext;
import org.opensearch.action.search.PutSearchPipelineRequest;
import org.opensearch.action.search.QueryPhaseResultConsumer;
import org.opensearch.action.search.SearchPhase;
import org.opensearch.action.search.SearchPhaseContext;
import org.opensearch.action.search.SearchPhaseController;
import org.opensearch.action.search.SearchPhaseName;
import org.opensearch.action.search.SearchPhaseResults;
import org.opensearch.action.search.SearchProgressListener;
import org.opensearch.action.search.SearchRequest;
Expand Down Expand Up @@ -85,7 +85,9 @@ public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProces
}

@Override
public Map<String, Processor.Factory<SearchPhaseResultsProcessor>> getPhaseResultsProcessors(Processor.Parameters parameters) {
public Map<String, Processor.Factory<SearchPhaseResultsProcessor>> getSearchPhaseResultsProcessors(
Processor.Parameters parameters
) {
return Map.of("zoe", (factories, tag, description, config) -> null);
}
};
Expand Down Expand Up @@ -288,13 +290,13 @@ public <Result extends SearchPhaseResult> void process(
}

@Override
public SearchPhase.SearchPhaseName getBeforePhase() {
return SearchPhase.SearchPhaseName.QUERY;
public SearchPhaseName getBeforePhase() {
return SearchPhaseName.QUERY;
}

@Override
public SearchPhase.SearchPhaseName getAfterPhase() {
return SearchPhase.SearchPhaseName.FETCH;
public SearchPhaseName getAfterPhase() {
return SearchPhaseName.FETCH;
}
}

Expand Down Expand Up @@ -361,7 +363,7 @@ public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProces
}

@Override
public Map<String, Processor.Factory<SearchPhaseResultsProcessor>> getPhaseResultsProcessors(
public Map<String, Processor.Factory<SearchPhaseResultsProcessor>> getSearchPhaseResultsProcessors(
Processor.Parameters parameters
) {
return phaseProcessors;
Expand Down Expand Up @@ -683,23 +685,23 @@ public void testTransformSearchPhase() {
SearchRequest searchRequest = new SearchRequest();
PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest);
AtomicArray<SearchPhaseResult> notTransformedSearchPhaseResults = searchPhaseResults.getAtomicArray();
pipelinedRequest.transformSearchPhase(
pipelinedRequest.transformSearchPhaseResults(
searchPhaseResults,
searchPhaseContext,
SearchPhase.SearchPhaseName.QUERY.getName(),
SearchPhase.SearchPhaseName.FETCH.getName()
SearchPhaseName.QUERY.getName(),
SearchPhaseName.FETCH.getName()
);
assertSame(searchPhaseResults.getAtomicArray(), notTransformedSearchPhaseResults);

// Now set the pipeline as p1
searchRequest = new SearchRequest().pipeline("p1");
pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest);

pipelinedRequest.transformSearchPhase(
pipelinedRequest.transformSearchPhaseResults(
searchPhaseResults,
searchPhaseContext,
SearchPhase.SearchPhaseName.QUERY.getName(),
SearchPhase.SearchPhaseName.FETCH.getName()
SearchPhaseName.QUERY.getName(),
SearchPhaseName.FETCH.getName()
);

List<SearchPhaseResult> resultAtomicArray = searchPhaseResults.getAtomicArray().asList();
Expand All @@ -713,11 +715,11 @@ public void testTransformSearchPhase() {
searchRequest = new SearchRequest().pipeline("p1");
pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest);
AtomicArray<SearchPhaseResult> notTransformedSearchPhaseResult = searchPhaseResults.getAtomicArray();
pipelinedRequest.transformSearchPhase(
pipelinedRequest.transformSearchPhaseResults(
searchPhaseResults,
searchPhaseContext,
SearchPhase.SearchPhaseName.DFS_QUERY.getName(),
SearchPhase.SearchPhaseName.QUERY.getName()
SearchPhaseName.DFS_QUERY.getName(),
SearchPhaseName.QUERY.getName()
);

assertSame(searchPhaseResults.getAtomicArray(), notTransformedSearchPhaseResult);
Expand Down

0 comments on commit db8b3e0

Please sign in to comment.