From 76e324525c0e5e0e0b69131ab9b253d6e2fcc5f4 Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Wed, 17 May 2023 01:53:15 +0000 Subject: [PATCH] Split search pipeline processor factories by type In the initial search pipelines commit, I threw request and response processor factories into one combined map. I think that was a mistake. We should embrace type-safety by making sure that the kind of processor is clear from end to end. As we add more processor types (e.g. search phase processor), throwing them all in one big map would get messier. As a bonus, we'll be able to reuse processor names across different types of processor. Closes https://github.com/opensearch-project/OpenSearch/issues/7576 Signed-off-by: Michael Froh --- CHANGELOG.md | 1 + .../common/FilterQueryRequestProcessor.java | 4 +- .../SearchPipelineCommonModulePlugin.java | 3 +- .../test/search_pipeline/10_basic.yml | 2 +- .../plugins/SearchPipelinePlugin.java | 17 +++- .../opensearch/search/pipeline/Pipeline.java | 33 ++----- .../opensearch/search/pipeline/Processor.java | 5 +- .../search/pipeline/SearchPipelineInfo.java | 75 +++++++++++---- .../pipeline/SearchPipelineService.java | 93 ++++++++++++------- .../nodesinfo/NodeInfoStreamingTests.java | 2 +- .../pipeline/SearchPipelineServiceTests.java | 73 ++++++++------- 11 files changed, 188 insertions(+), 120 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 798be4678e550..1a3a465659642 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -85,6 +85,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add connectToNodeAsExtension in TransportService ([#6866](https://github.com/opensearch-project/OpenSearch/pull/6866)) - [Search Pipelines] Accept pipelines defined in search source ([#7253](https://github.com/opensearch-project/OpenSearch/pull/7253)) - [Search Pipelines] Add `default_search_pipeline` index setting ([#7470](https://github.com/opensearch-project/OpenSearch/pull/7470)) +- [Search Pipelines] Split search pipeline processor factories by type ([#7597](https://github.com/opensearch-project/OpenSearch/pull/7597)) - Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244)) - Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237)) - Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375)) diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java index 81c00012daec6..0ca090780bb60 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java @@ -75,7 +75,7 @@ public SearchRequest processRequest(SearchRequest request) throws Exception { return request; } - static class Factory implements Processor.Factory { + static class Factory implements Processor.Factory { private final NamedXContentRegistry namedXContentRegistry; public static final ParseField QUERY_FIELD = new ParseField("query"); @@ -85,7 +85,7 @@ static class Factory implements Processor.Factory { @Override public FilterQueryRequestProcessor create( - Map processorFactories, + Map> processorFactories, String tag, String description, Map config diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java index caca753caf819..ea61165353a8b 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java @@ -11,6 +11,7 @@ import org.opensearch.plugins.Plugin; import org.opensearch.plugins.SearchPipelinePlugin; import org.opensearch.search.pipeline.Processor; +import org.opensearch.search.pipeline.SearchRequestProcessor; import java.util.Map; @@ -25,7 +26,7 @@ public class SearchPipelineCommonModulePlugin extends Plugin implements SearchPi public SearchPipelineCommonModulePlugin() {} @Override - public Map getProcessors(Processor.Parameters parameters) { + public Map> getRequestProcessors(Processor.Parameters parameters) { return Map.of(FilterQueryRequestProcessor.TYPE, new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry)); } } diff --git a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml index 473d92aa18052..621d532048d3e 100644 --- a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml +++ b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml @@ -12,4 +12,4 @@ nodes.info: {} - contains: { nodes.$cluster_manager.modules: { name: search-pipeline-common } } - - contains: { nodes.$cluster_manager.search_pipelines.processors: { type: filter_query } } + - contains: { nodes.$cluster_manager.search_pipelines.request_processors: { type: filter_query } } diff --git a/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java b/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java index 8e6fbef6c8b1d..b8ceddecd3d20 100644 --- a/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java +++ b/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java @@ -9,6 +9,8 @@ package org.opensearch.plugins; import org.opensearch.search.pipeline.Processor; +import org.opensearch.search.pipeline.SearchRequestProcessor; +import org.opensearch.search.pipeline.SearchResponseProcessor; import java.util.Collections; import java.util.Map; @@ -20,13 +22,24 @@ */ public interface SearchPipelinePlugin { /** - * Returns additional search pipeline processor types added by this plugin. + * Returns additional search pipeline request processor types added by this plugin. * * The key of the returned {@link Map} is the unique name for the processor which is specified * in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory} * to create the processor from a given pipeline configuration. */ - default Map getProcessors(Processor.Parameters parameters) { + default Map> getRequestProcessors(Processor.Parameters parameters) { + return Collections.emptyMap(); + } + + /** + * Returns additional search pipeline response processor types added by this plugin. + * + * The key of the returned {@link Map} is the unique name for the processor which is specified + * in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory} + * to create the processor from a given pipeline configuration. + */ + default Map> getResponseProcessors(Processor.Parameters parameters) { return Collections.emptyMap(); } } diff --git a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java index f5dce8ec728b2..2e349ed7ef7d4 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java @@ -65,28 +65,21 @@ class Pipeline { public static Pipeline create( String id, Map config, - Map processorFactories, + Map> requestProcessorFactories, + Map> responseProcessorFactories, NamedWriteableRegistry namedWriteableRegistry ) throws Exception { String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); List> requestProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, REQUEST_PROCESSORS_KEY); - List requestProcessors = readProcessors( - SearchRequestProcessor.class, - processorFactories, - requestProcessorConfigs - ); + List requestProcessors = readProcessors(requestProcessorFactories, requestProcessorConfigs); List> responseProcessorConfigs = ConfigurationUtils.readOptionalList( null, null, config, RESPONSE_PROCESSORS_KEY ); - List responseProcessors = readProcessors( - SearchResponseProcessor.class, - processorFactories, - responseProcessorConfigs - ); + List responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs); if (config.isEmpty() == false) { throw new OpenSearchParseException( "pipeline [" @@ -98,10 +91,8 @@ public static Pipeline create( return new Pipeline(id, description, version, requestProcessors, responseProcessors, namedWriteableRegistry); } - @SuppressWarnings("unchecked") // Cast is checked using isInstance private static List readProcessors( - Class processorType, - Map processorFactories, + Map> processorFactories, List> requestProcessorConfigs ) throws Exception { List processors = new ArrayList<>(); @@ -117,24 +108,12 @@ private static List readProcessors( Map config = (Map) entry.getValue(); String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); String description = ConfigurationUtils.readOptionalStringProperty(null, tag, config, DESCRIPTION_KEY); - Processor processor = processorFactories.get(type).create(processorFactories, tag, description, config); - if (processorType.isInstance(processor)) { - processors.add((T) processor); - } else { - throw new IllegalArgumentException("Processor type " + type + " is not a " + processorType.getSimpleName()); - } + processors.add(processorFactories.get(type).create(processorFactories, tag, description, config)); } } return processors; } - List flattenAllProcessors() { - List allProcessors = new ArrayList<>(searchRequestProcessors.size() + searchResponseProcessors.size()); - allProcessors.addAll(searchRequestProcessors); - allProcessors.addAll(searchResponseProcessors); - return allProcessors; - } - String getId() { return id; } diff --git a/server/src/main/java/org/opensearch/search/pipeline/Processor.java b/server/src/main/java/org/opensearch/search/pipeline/Processor.java index 44f268242b83c..ee28db1cc334d 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Processor.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Processor.java @@ -52,7 +52,7 @@ public interface Processor { /** * A factory that knows how to construct a processor based on a map of maps. */ - interface Factory { + interface Factory { /** * Creates a processor based on the specified map of maps config. @@ -65,8 +65,7 @@ interface Factory { * Note: Implementations are responsible for removing the used configuration * keys, so that after creation the config map should be empty. */ - Processor create(Map processorFactories, String tag, String description, Map config) - throws Exception; + T create(Map> processorFactories, String tag, String description, Map config) throws Exception; } /** diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineInfo.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineInfo.java index 95d1e3720cbb3..b91f18c44cc7b 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineInfo.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineInfo.java @@ -8,15 +8,19 @@ package org.opensearch.search.pipeline; +import org.opensearch.Version; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.node.ReportingService; import java.io.IOException; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.TreeMap; import java.util.TreeSet; /** @@ -26,45 +30,84 @@ */ public class SearchPipelineInfo implements ReportingService.Info { - private final Set processors; + private final Map> processors = new TreeMap<>(); - public SearchPipelineInfo(List processors) { - this.processors = new TreeSet<>(processors); // we use a treeset here to have a test-able / predictable order + public SearchPipelineInfo(Map> processors) { + for (Map.Entry> processorsEntry : processors.entrySet()) { + // we use a treeset here to have a test-able / predictable order + this.processors.put(processorsEntry.getKey(), new TreeSet<>(processorsEntry.getValue())); + } } /** * Read from a stream. */ public SearchPipelineInfo(StreamInput in) throws IOException { - processors = new TreeSet<>(); - final int size = in.readVInt(); - for (int i = 0; i < size; i++) { - processors.add(new ProcessorInfo(in)); + // TODO: When we backport this to 2.8, we must change this condition to out.getVersion().before(V_2_8_0) + if (in.getVersion().onOrBefore(Version.V_2_8_0)) { + // Prior to version 2.8, we had a flat list of processors. For best compatibility, assume they're valid + // request and response processor, since we couldn't tell the difference back then. + final int size = in.readVInt(); + Set processorInfos = new TreeSet<>(); + for (int i = 0; i < size; i++) { + processorInfos.add(new ProcessorInfo(in)); + } + processors.put(Pipeline.REQUEST_PROCESSORS_KEY, processorInfos); + processors.put(Pipeline.RESPONSE_PROCESSORS_KEY, processorInfos); + } else { + final int numTypes = in.readVInt(); + for (int i = 0; i < numTypes; i++) { + String type = in.readString(); + int numProcessors = in.readVInt(); + Set processorInfos = new TreeSet<>(); + for (int j = 0; j < numProcessors; j++) { + processorInfos.add(new ProcessorInfo(in)); + } + processors.put(type, processorInfos); + } } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("search_pipelines"); - builder.startArray("processors"); - for (ProcessorInfo info : processors) { - info.toXContent(builder, params); + for (Map.Entry> processorEntry : processors.entrySet()) { + builder.startArray(processorEntry.getKey()); + for (ProcessorInfo info : processorEntry.getValue()) { + info.toXContent(builder, params); + } + builder.endArray(); } - builder.endArray(); builder.endObject(); return builder; } @Override public void writeTo(StreamOutput out) throws IOException { - out.write(processors.size()); - for (ProcessorInfo info : processors) { - info.writeTo(out); + // TODO: When we backport this to 2.8, we must change this condition to out.getVersion().before(V_2_8_0) + if (out.getVersion().onOrBefore(Version.V_2_8_0)) { + // Prior to version 2.8, we grouped all processors into a single list. + Set processorInfos = new TreeSet<>(); + processorInfos.addAll(processors.getOrDefault(Pipeline.REQUEST_PROCESSORS_KEY, Collections.emptySet())); + processorInfos.addAll(processors.getOrDefault(Pipeline.RESPONSE_PROCESSORS_KEY, Collections.emptySet())); + out.writeVInt(processorInfos.size()); + for (ProcessorInfo processorInfo : processorInfos) { + processorInfo.writeTo(out); + } + } else { + out.write(processors.size()); + for (Map.Entry> processorsEntry : processors.entrySet()) { + out.writeString(processorsEntry.getKey()); + out.writeVInt(processorsEntry.getValue().size()); + for (ProcessorInfo processorInfo : processorsEntry.getValue()) { + processorInfo.writeTo(out); + } + } } } - public boolean containsProcessor(String type) { - return processors.contains(new ProcessorInfo(type)); + public boolean containsProcessor(String processorType, String type) { + return processors.containsKey(processorType) && processors.get(processorType).contains(new ProcessorInfo(type)); } @Override diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java index f96a6eb4a6b76..a486e636cbb7d 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -55,6 +55,8 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; /** * The main entry point for search pipelines. Handles CRUD operations and exposes the API to execute search pipelines @@ -68,7 +70,8 @@ public class SearchPipelineService implements ClusterStateApplier, ReportingServ private static final Logger logger = LogManager.getLogger(SearchPipelineService.class); private final ClusterService clusterService; private final ScriptService scriptService; - private final Map processorFactories; + private final Map> requestProcessorFactories; + private final Map> responseProcessorFactories; private volatile Map pipelines = Collections.emptyMap(); private final ThreadPool threadPool; private final List> searchPipelineClusterStateListeners = new CopyOnWriteArrayList<>(); @@ -95,34 +98,33 @@ public SearchPipelineService( this.scriptService = scriptService; this.threadPool = threadPool; this.namedWriteableRegistry = namedWriteableRegistry; - this.processorFactories = processorFactories( - searchPipelinePlugins, - new Processor.Parameters( - env, - scriptService, - analysisRegistry, - threadPool.getThreadContext(), - threadPool::relativeTimeInMillis, - (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC), - this, - client, - threadPool.generic()::execute, - namedXContentRegistry - ) + Processor.Parameters parameters = new Processor.Parameters( + env, + scriptService, + analysisRegistry, + threadPool.getThreadContext(), + threadPool::relativeTimeInMillis, + (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC), + this, + client, + threadPool.generic()::execute, + namedXContentRegistry ); + this.requestProcessorFactories = processorFactories(searchPipelinePlugins, p -> p.getRequestProcessors(parameters)); + this.responseProcessorFactories = processorFactories(searchPipelinePlugins, p -> p.getResponseProcessors(parameters)); putPipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_SEARCH_PIPELINE_KEY, true); deletePipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_SEARCH_PIPELINE_KEY, true); this.isEnabled = isEnabled; } - private static Map processorFactories( + private static Map> processorFactories( List searchPipelinePlugins, - Processor.Parameters parameters + Function>> processorLoader ) { - Map processorFactories = new HashMap<>(); + Map> processorFactories = new HashMap<>(); for (SearchPipelinePlugin searchPipelinePlugin : searchPipelinePlugins) { - Map newProcessors = searchPipelinePlugin.getProcessors(parameters); - for (Map.Entry entry : newProcessors.entrySet()) { + Map> newProcessors = processorLoader.apply(searchPipelinePlugin); + for (Map.Entry> entry : newProcessors.entrySet()) { if (processorFactories.put(entry.getKey(), entry.getValue()) != null) { throw new IllegalArgumentException("Search processor [" + entry.getKey() + "] is already registered"); } @@ -173,7 +175,8 @@ void innerUpdatePipelines(SearchPipelineMetadata newSearchPipelineMetadata) { Pipeline newPipeline = Pipeline.create( newConfiguration.getId(), newConfiguration.getConfigAsMap(), - processorFactories, + requestProcessorFactories, + responseProcessorFactories, namedWriteableRegistry ); newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline)); @@ -268,12 +271,27 @@ void validatePipeline(Map searchPipelineInfos throw new IllegalStateException("Search pipeline info is empty"); } Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); - Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, namedWriteableRegistry); + Pipeline pipeline = Pipeline.create( + request.getId(), + pipelineConfig, + requestProcessorFactories, + responseProcessorFactories, + namedWriteableRegistry + ); List exceptions = new ArrayList<>(); - for (Processor processor : pipeline.flattenAllProcessors()) { + for (SearchRequestProcessor processor : pipeline.getSearchRequestProcessors()) { + for (Map.Entry entry : searchPipelineInfos.entrySet()) { + String type = processor.getType(); + if (entry.getValue().containsProcessor(Pipeline.REQUEST_PROCESSORS_KEY, type) == false) { + String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]"; + exceptions.add(ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message)); + } + } + } + for (SearchResponseProcessor processor : pipeline.getSearchResponseProcessors()) { for (Map.Entry entry : searchPipelineInfos.entrySet()) { String type = processor.getType(); - if (entry.getValue().containsProcessor(type) == false) { + if (entry.getValue().containsProcessor(Pipeline.RESPONSE_PROCESSORS_KEY, type) == false) { String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]"; exceptions.add(ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message)); } @@ -352,7 +370,8 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce pipeline = Pipeline.create( AD_HOC_PIPELINE_ID, searchRequest.source().searchPipelineSource(), - processorFactories, + requestProcessorFactories, + responseProcessorFactories, namedWriteableRegistry ); } catch (Exception e) { @@ -385,17 +404,27 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce return new PipelinedRequest(pipeline, transformedRequest); } - Map getProcessorFactories() { - return processorFactories; + Map> getRequestProcessorFactories() { + return requestProcessorFactories; + } + + Map> getResponseProcessorFactories() { + return responseProcessorFactories; } @Override public SearchPipelineInfo info() { - List processorInfoList = new ArrayList<>(); - for (Map.Entry entry : processorFactories.entrySet()) { - processorInfoList.add(new ProcessorInfo(entry.getKey())); - } - return new SearchPipelineInfo(processorInfoList); + List requestProcessorInfoList = requestProcessorFactories.keySet() + .stream() + .map(ProcessorInfo::new) + .collect(Collectors.toList()); + List responseProcessorInfoList = responseProcessorFactories.keySet() + .stream() + .map(ProcessorInfo::new) + .collect(Collectors.toList()); + return new SearchPipelineInfo( + Map.of(Pipeline.REQUEST_PROCESSORS_KEY, requestProcessorInfoList, Pipeline.RESPONSE_PROCESSORS_KEY, responseProcessorInfoList) + ); } public static List getPipelines(ClusterState clusterState, String... ids) { diff --git a/server/src/test/java/org/opensearch/nodesinfo/NodeInfoStreamingTests.java b/server/src/test/java/org/opensearch/nodesinfo/NodeInfoStreamingTests.java index bec921bc5bf5d..cdd1c682b40dc 100644 --- a/server/src/test/java/org/opensearch/nodesinfo/NodeInfoStreamingTests.java +++ b/server/src/test/java/org/opensearch/nodesinfo/NodeInfoStreamingTests.java @@ -251,7 +251,7 @@ private static NodeInfo createNodeInfo() { for (int i = 0; i < numProcessors; i++) { processors.add(new org.opensearch.search.pipeline.ProcessorInfo(randomAlphaOfLengthBetween(3, 10))); } - searchPipelineInfo = new SearchPipelineInfo(processors); + searchPipelineInfo = new SearchPipelineInfo(Map.of(randomAlphaOfLengthBetween(3, 10), processors)); } return new NodeInfo( diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index 36978d5310810..516227e9a13d8 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -60,9 +60,13 @@ public class SearchPipelineServiceTests extends OpenSearchTestCase { private static final SearchPipelinePlugin DUMMY_PLUGIN = new SearchPipelinePlugin() { @Override - public Map getProcessors(Processor.Parameters parameters) { + public Map> getRequestProcessors(Processor.Parameters parameters) { return Map.of("foo", (factories, tag, description, config) -> null); } + + public Map> getResponseProcessors(Processor.Parameters parameters) { + return Map.of("bar", (factories, tag, description, config) -> null); + } }; private ThreadPool threadPool; @@ -89,9 +93,14 @@ public void testSearchPipelinePlugin() { client, false ); - Map factories = searchPipelineService.getProcessorFactories(); - assertEquals(1, factories.size()); - assertTrue(factories.containsKey("foo")); + Map> requestProcessorFactories = searchPipelineService + .getRequestProcessorFactories(); + assertEquals(1, requestProcessorFactories.size()); + assertTrue(requestProcessorFactories.containsKey("foo")); + Map> responseProcessorFactories = searchPipelineService + .getResponseProcessorFactories(); + assertEquals(1, responseProcessorFactories.size()); + assertTrue(responseProcessorFactories.containsKey("bar")); } public void testSearchPipelinePluginDuplicate() { @@ -235,8 +244,8 @@ public SearchResponse processResponse(SearchRequest request, SearchResponse resp } private SearchPipelineService createWithProcessors() { - Map processors = new HashMap<>(); - processors.put("scale_request_size", (processorFactories, tag, description, config) -> { + Map> requestProcessors = new HashMap<>(); + requestProcessors.put("scale_request_size", (processorFactories, tag, description, config) -> { float scale = ((Number) config.remove("scale")).floatValue(); return new FakeRequestProcessor( "scale_request_size", @@ -245,11 +254,12 @@ private SearchPipelineService createWithProcessors() { req -> req.source().size((int) (req.source().size() * scale)) ); }); - processors.put("fixed_score", (processorFactories, tag, description, config) -> { + Map> responseProcessors = new HashMap<>(); + responseProcessors.put("fixed_score", (processorFactories, tag, description, config) -> { float score = ((Number) config.remove("score")).floatValue(); return new FakeResponseProcessor("fixed_score", tag, description, rsp -> rsp.getHits().forEach(h -> h.score(score))); }); - return createWithProcessors(processors); + return createWithProcessors(requestProcessors, responseProcessors); } @Override @@ -258,7 +268,10 @@ protected NamedWriteableRegistry writableRegistry() { return new NamedWriteableRegistry(searchModule.getNamedWriteables()); } - private SearchPipelineService createWithProcessors(Map processors) { + private SearchPipelineService createWithProcessors( + Map> requestProcessors, + Map> responseProcessors + ) { Client client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); @@ -274,8 +287,13 @@ private SearchPipelineService createWithProcessors(Map getProcessors(Processor.Parameters parameters) { - return processors; + public Map> getRequestProcessors(Processor.Parameters parameters) { + return requestProcessors; + } + + @Override + public Map> getResponseProcessors(Processor.Parameters parameters) { + return responseProcessors; } }), client, @@ -619,13 +637,14 @@ public void testValidatePipeline() throws Exception { XContentType.JSON ); + SearchPipelineInfo completePipelineInfo = new SearchPipelineInfo( + Map.of(Pipeline.REQUEST_PROCESSORS_KEY, List.of(reqProcessor), Pipeline.RESPONSE_PROCESSORS_KEY, List.of(rspProcessor)) + ); + SearchPipelineInfo incompletePipelineInfo = new SearchPipelineInfo(Map.of(Pipeline.REQUEST_PROCESSORS_KEY, List.of(reqProcessor))); // One node is missing a processor expectThrows( OpenSearchParseException.class, - () -> searchPipelineService.validatePipeline( - Map.of(n1, new SearchPipelineInfo(List.of(reqProcessor, rspProcessor)), n2, new SearchPipelineInfo(List.of(reqProcessor))), - putRequest - ) + () -> searchPipelineService.validatePipeline(Map.of(n1, completePipelineInfo, n2, incompletePipelineInfo), putRequest) ); // Discovery failed, no infos passed. @@ -644,27 +663,11 @@ public void testValidatePipeline() throws Exception { ); expectThrows( ClassCastException.class, - () -> searchPipelineService.validatePipeline( - Map.of( - n1, - new SearchPipelineInfo(List.of(reqProcessor, rspProcessor)), - n2, - new SearchPipelineInfo(List.of(reqProcessor, rspProcessor)) - ), - badPutRequest - ) + () -> searchPipelineService.validatePipeline(Map.of(n1, completePipelineInfo, n2, completePipelineInfo), badPutRequest) ); // Success - searchPipelineService.validatePipeline( - Map.of( - n1, - new SearchPipelineInfo(List.of(reqProcessor, rspProcessor)), - n2, - new SearchPipelineInfo(List.of(reqProcessor, rspProcessor)) - ), - putRequest - ); + searchPipelineService.validatePipeline(Map.of(n1, completePipelineInfo, n2, completePipelineInfo), putRequest); } /** @@ -717,7 +720,7 @@ public void testInlinePipeline() throws Exception { public void testInfo() { SearchPipelineService searchPipelineService = createWithProcessors(); SearchPipelineInfo info = searchPipelineService.info(); - assertTrue(info.containsProcessor("scale_request_size")); - assertTrue(info.containsProcessor("fixed_score")); + assertTrue(info.containsProcessor(Pipeline.REQUEST_PROCESSORS_KEY, "scale_request_size")); + assertTrue(info.containsProcessor(Pipeline.RESPONSE_PROCESSORS_KEY, "fixed_score")); } }