diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a5f816ac9a85..be5ab5f8e03a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,6 +84,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Search Pipelines] Accept pipelines defined in search source ([#7253](https://github.com/opensearch-project/OpenSearch/pull/7253)) - [Search Pipelines] Add `default_search_pipeline` index setting ([#7470](https://github.com/opensearch-project/OpenSearch/pull/7470)) - [Search Pipelines] Add RenameFieldResponseProcessor for Search Pipelines ([#7377](https://github.com/opensearch-project/OpenSearch/pull/7377)) +- [Search Pipelines] Split search pipeline processor factories by type ([#7597](https://github.com/opensearch-project/OpenSearch/pull/7597)) - Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244)) - Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237)) - Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375)) diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java index 81c00012daec6..0ca090780bb60 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java @@ -75,7 +75,7 @@ public SearchRequest processRequest(SearchRequest request) throws Exception { return request; } - static class Factory implements Processor.Factory { + static class Factory implements Processor.Factory { private final NamedXContentRegistry namedXContentRegistry; public static final ParseField QUERY_FIELD = new ParseField("query"); @@ -85,7 +85,7 @@ static class Factory implements Processor.Factory { @Override public FilterQueryRequestProcessor create( - Map processorFactories, + Map> processorFactories, String tag, String description, Map config diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java index 3a2f0e9fb2492..4c40dda5928f0 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java @@ -128,7 +128,7 @@ public SearchResponse processResponse(SearchRequest request, SearchResponse resp /** * This is a factor that creates the RenameResponseProcessor */ - public static final class Factory implements Processor.Factory { + public static final class Factory implements Processor.Factory { /** * Constructor for factory @@ -137,7 +137,7 @@ public static final class Factory implements Processor.Factory { @Override public RenameFieldResponseProcessor create( - Map processorFactories, + Map> processorFactories, String tag, String description, Map config diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java index a0e5182f71443..aa56714085b48 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java @@ -11,6 +11,8 @@ import org.opensearch.plugins.Plugin; import org.opensearch.plugins.SearchPipelinePlugin; import org.opensearch.search.pipeline.Processor; +import org.opensearch.search.pipeline.SearchRequestProcessor; +import org.opensearch.search.pipeline.SearchResponseProcessor; import java.util.Map; @@ -25,12 +27,12 @@ public class SearchPipelineCommonModulePlugin extends Plugin implements SearchPi public SearchPipelineCommonModulePlugin() {} @Override - public Map getProcessors(Processor.Parameters parameters) { - return Map.of( - FilterQueryRequestProcessor.TYPE, - new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry), - RenameFieldResponseProcessor.TYPE, - new RenameFieldResponseProcessor.Factory() - ); + public Map> getRequestProcessors(Processor.Parameters parameters) { + return Map.of(FilterQueryRequestProcessor.TYPE, new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry)); + } + + @Override + public Map> getResponseProcessors(Processor.Parameters parameters) { + return Map.of(RenameFieldResponseProcessor.TYPE, new RenameFieldResponseProcessor.Factory()); } } diff --git a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml index 0d931f8587664..644181d601ea4 100644 --- a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml +++ b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml @@ -12,5 +12,5 @@ nodes.info: {} - contains: { nodes.$cluster_manager.modules: { name: search-pipeline-common } } - - contains: { nodes.$cluster_manager.search_pipelines.processors: { type: filter_query } } - - contains: { nodes.$cluster_manager.search_pipelines.processors: { type: rename_field } } + - contains: { nodes.$cluster_manager.search_pipelines.request_processors: { type: filter_query } } + - contains: { nodes.$cluster_manager.search_pipelines.response_processors: { type: rename_field } } diff --git a/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java b/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java index 8e6fbef6c8b1d..b8ceddecd3d20 100644 --- a/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java +++ b/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java @@ -9,6 +9,8 @@ package org.opensearch.plugins; import org.opensearch.search.pipeline.Processor; +import org.opensearch.search.pipeline.SearchRequestProcessor; +import org.opensearch.search.pipeline.SearchResponseProcessor; import java.util.Collections; import java.util.Map; @@ -20,13 +22,24 @@ */ public interface SearchPipelinePlugin { /** - * Returns additional search pipeline processor types added by this plugin. + * Returns additional search pipeline request processor types added by this plugin. * * The key of the returned {@link Map} is the unique name for the processor which is specified * in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory} * to create the processor from a given pipeline configuration. */ - default Map getProcessors(Processor.Parameters parameters) { + default Map> getRequestProcessors(Processor.Parameters parameters) { + return Collections.emptyMap(); + } + + /** + * Returns additional search pipeline response processor types added by this plugin. + * + * The key of the returned {@link Map} is the unique name for the processor which is specified + * in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory} + * to create the processor from a given pipeline configuration. + */ + default Map> getResponseProcessors(Processor.Parameters parameters) { return Collections.emptyMap(); } } diff --git a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java index f5dce8ec728b2..c9a5f865d507e 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java @@ -46,7 +46,7 @@ class Pipeline { private final NamedWriteableRegistry namedWriteableRegistry; - Pipeline( + private Pipeline( String id, @Nullable String description, @Nullable Integer version, @@ -62,31 +62,24 @@ class Pipeline { this.namedWriteableRegistry = namedWriteableRegistry; } - public static Pipeline create( + static Pipeline create( String id, Map config, - Map processorFactories, + Map> requestProcessorFactories, + Map> responseProcessorFactories, NamedWriteableRegistry namedWriteableRegistry ) throws Exception { String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); List> requestProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, REQUEST_PROCESSORS_KEY); - List requestProcessors = readProcessors( - SearchRequestProcessor.class, - processorFactories, - requestProcessorConfigs - ); + List requestProcessors = readProcessors(requestProcessorFactories, requestProcessorConfigs); List> responseProcessorConfigs = ConfigurationUtils.readOptionalList( null, null, config, RESPONSE_PROCESSORS_KEY ); - List responseProcessors = readProcessors( - SearchResponseProcessor.class, - processorFactories, - responseProcessorConfigs - ); + List responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs); if (config.isEmpty() == false) { throw new OpenSearchParseException( "pipeline [" @@ -98,10 +91,8 @@ public static Pipeline create( return new Pipeline(id, description, version, requestProcessors, responseProcessors, namedWriteableRegistry); } - @SuppressWarnings("unchecked") // Cast is checked using isInstance private static List readProcessors( - Class processorType, - Map processorFactories, + Map> processorFactories, List> requestProcessorConfigs ) throws Exception { List processors = new ArrayList<>(); @@ -117,22 +108,10 @@ private static List readProcessors( Map config = (Map) entry.getValue(); String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); String description = ConfigurationUtils.readOptionalStringProperty(null, tag, config, DESCRIPTION_KEY); - Processor processor = processorFactories.get(type).create(processorFactories, tag, description, config); - if (processorType.isInstance(processor)) { - processors.add((T) processor); - } else { - throw new IllegalArgumentException("Processor type " + type + " is not a " + processorType.getSimpleName()); - } + processors.add(processorFactories.get(type).create(processorFactories, tag, description, config)); } } - return processors; - } - - List flattenAllProcessors() { - List allProcessors = new ArrayList<>(searchRequestProcessors.size() + searchResponseProcessors.size()); - allProcessors.addAll(searchRequestProcessors); - allProcessors.addAll(searchResponseProcessors); - return allProcessors; + return Collections.unmodifiableList(processors); } String getId() { diff --git a/server/src/main/java/org/opensearch/search/pipeline/Processor.java b/server/src/main/java/org/opensearch/search/pipeline/Processor.java index 44f268242b83c..ee28db1cc334d 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Processor.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Processor.java @@ -52,7 +52,7 @@ public interface Processor { /** * A factory that knows how to construct a processor based on a map of maps. */ - interface Factory { + interface Factory { /** * Creates a processor based on the specified map of maps config. @@ -65,8 +65,7 @@ interface Factory { * Note: Implementations are responsible for removing the used configuration * keys, so that after creation the config map should be empty. */ - Processor create(Map processorFactories, String tag, String description, Map config) - throws Exception; + T create(Map> processorFactories, String tag, String description, Map config) throws Exception; } /** diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineInfo.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineInfo.java index 95d1e3720cbb3..b91f18c44cc7b 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineInfo.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineInfo.java @@ -8,15 +8,19 @@ package org.opensearch.search.pipeline; +import org.opensearch.Version; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.node.ReportingService; import java.io.IOException; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.TreeMap; import java.util.TreeSet; /** @@ -26,45 +30,84 @@ */ public class SearchPipelineInfo implements ReportingService.Info { - private final Set processors; + private final Map> processors = new TreeMap<>(); - public SearchPipelineInfo(List processors) { - this.processors = new TreeSet<>(processors); // we use a treeset here to have a test-able / predictable order + public SearchPipelineInfo(Map> processors) { + for (Map.Entry> processorsEntry : processors.entrySet()) { + // we use a treeset here to have a test-able / predictable order + this.processors.put(processorsEntry.getKey(), new TreeSet<>(processorsEntry.getValue())); + } } /** * Read from a stream. */ public SearchPipelineInfo(StreamInput in) throws IOException { - processors = new TreeSet<>(); - final int size = in.readVInt(); - for (int i = 0; i < size; i++) { - processors.add(new ProcessorInfo(in)); + // TODO: When we backport this to 2.8, we must change this condition to out.getVersion().before(V_2_8_0) + if (in.getVersion().onOrBefore(Version.V_2_8_0)) { + // Prior to version 2.8, we had a flat list of processors. For best compatibility, assume they're valid + // request and response processor, since we couldn't tell the difference back then. + final int size = in.readVInt(); + Set processorInfos = new TreeSet<>(); + for (int i = 0; i < size; i++) { + processorInfos.add(new ProcessorInfo(in)); + } + processors.put(Pipeline.REQUEST_PROCESSORS_KEY, processorInfos); + processors.put(Pipeline.RESPONSE_PROCESSORS_KEY, processorInfos); + } else { + final int numTypes = in.readVInt(); + for (int i = 0; i < numTypes; i++) { + String type = in.readString(); + int numProcessors = in.readVInt(); + Set processorInfos = new TreeSet<>(); + for (int j = 0; j < numProcessors; j++) { + processorInfos.add(new ProcessorInfo(in)); + } + processors.put(type, processorInfos); + } } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("search_pipelines"); - builder.startArray("processors"); - for (ProcessorInfo info : processors) { - info.toXContent(builder, params); + for (Map.Entry> processorEntry : processors.entrySet()) { + builder.startArray(processorEntry.getKey()); + for (ProcessorInfo info : processorEntry.getValue()) { + info.toXContent(builder, params); + } + builder.endArray(); } - builder.endArray(); builder.endObject(); return builder; } @Override public void writeTo(StreamOutput out) throws IOException { - out.write(processors.size()); - for (ProcessorInfo info : processors) { - info.writeTo(out); + // TODO: When we backport this to 2.8, we must change this condition to out.getVersion().before(V_2_8_0) + if (out.getVersion().onOrBefore(Version.V_2_8_0)) { + // Prior to version 2.8, we grouped all processors into a single list. + Set processorInfos = new TreeSet<>(); + processorInfos.addAll(processors.getOrDefault(Pipeline.REQUEST_PROCESSORS_KEY, Collections.emptySet())); + processorInfos.addAll(processors.getOrDefault(Pipeline.RESPONSE_PROCESSORS_KEY, Collections.emptySet())); + out.writeVInt(processorInfos.size()); + for (ProcessorInfo processorInfo : processorInfos) { + processorInfo.writeTo(out); + } + } else { + out.write(processors.size()); + for (Map.Entry> processorsEntry : processors.entrySet()) { + out.writeString(processorsEntry.getKey()); + out.writeVInt(processorsEntry.getValue().size()); + for (ProcessorInfo processorInfo : processorsEntry.getValue()) { + processorInfo.writeTo(out); + } + } } } - public boolean containsProcessor(String type) { - return processors.contains(new ProcessorInfo(type)); + public boolean containsProcessor(String processorType, String type) { + return processors.containsKey(processorType) && processors.get(processorType).contains(new ProcessorInfo(type)); } @Override diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java index f96a6eb4a6b76..a486e636cbb7d 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -55,6 +55,8 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; /** * The main entry point for search pipelines. Handles CRUD operations and exposes the API to execute search pipelines @@ -68,7 +70,8 @@ public class SearchPipelineService implements ClusterStateApplier, ReportingServ private static final Logger logger = LogManager.getLogger(SearchPipelineService.class); private final ClusterService clusterService; private final ScriptService scriptService; - private final Map processorFactories; + private final Map> requestProcessorFactories; + private final Map> responseProcessorFactories; private volatile Map pipelines = Collections.emptyMap(); private final ThreadPool threadPool; private final List> searchPipelineClusterStateListeners = new CopyOnWriteArrayList<>(); @@ -95,34 +98,33 @@ public SearchPipelineService( this.scriptService = scriptService; this.threadPool = threadPool; this.namedWriteableRegistry = namedWriteableRegistry; - this.processorFactories = processorFactories( - searchPipelinePlugins, - new Processor.Parameters( - env, - scriptService, - analysisRegistry, - threadPool.getThreadContext(), - threadPool::relativeTimeInMillis, - (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC), - this, - client, - threadPool.generic()::execute, - namedXContentRegistry - ) + Processor.Parameters parameters = new Processor.Parameters( + env, + scriptService, + analysisRegistry, + threadPool.getThreadContext(), + threadPool::relativeTimeInMillis, + (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC), + this, + client, + threadPool.generic()::execute, + namedXContentRegistry ); + this.requestProcessorFactories = processorFactories(searchPipelinePlugins, p -> p.getRequestProcessors(parameters)); + this.responseProcessorFactories = processorFactories(searchPipelinePlugins, p -> p.getResponseProcessors(parameters)); putPipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_SEARCH_PIPELINE_KEY, true); deletePipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_SEARCH_PIPELINE_KEY, true); this.isEnabled = isEnabled; } - private static Map processorFactories( + private static Map> processorFactories( List searchPipelinePlugins, - Processor.Parameters parameters + Function>> processorLoader ) { - Map processorFactories = new HashMap<>(); + Map> processorFactories = new HashMap<>(); for (SearchPipelinePlugin searchPipelinePlugin : searchPipelinePlugins) { - Map newProcessors = searchPipelinePlugin.getProcessors(parameters); - for (Map.Entry entry : newProcessors.entrySet()) { + Map> newProcessors = processorLoader.apply(searchPipelinePlugin); + for (Map.Entry> entry : newProcessors.entrySet()) { if (processorFactories.put(entry.getKey(), entry.getValue()) != null) { throw new IllegalArgumentException("Search processor [" + entry.getKey() + "] is already registered"); } @@ -173,7 +175,8 @@ void innerUpdatePipelines(SearchPipelineMetadata newSearchPipelineMetadata) { Pipeline newPipeline = Pipeline.create( newConfiguration.getId(), newConfiguration.getConfigAsMap(), - processorFactories, + requestProcessorFactories, + responseProcessorFactories, namedWriteableRegistry ); newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline)); @@ -268,12 +271,27 @@ void validatePipeline(Map searchPipelineInfos throw new IllegalStateException("Search pipeline info is empty"); } Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); - Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, namedWriteableRegistry); + Pipeline pipeline = Pipeline.create( + request.getId(), + pipelineConfig, + requestProcessorFactories, + responseProcessorFactories, + namedWriteableRegistry + ); List exceptions = new ArrayList<>(); - for (Processor processor : pipeline.flattenAllProcessors()) { + for (SearchRequestProcessor processor : pipeline.getSearchRequestProcessors()) { + for (Map.Entry entry : searchPipelineInfos.entrySet()) { + String type = processor.getType(); + if (entry.getValue().containsProcessor(Pipeline.REQUEST_PROCESSORS_KEY, type) == false) { + String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]"; + exceptions.add(ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message)); + } + } + } + for (SearchResponseProcessor processor : pipeline.getSearchResponseProcessors()) { for (Map.Entry entry : searchPipelineInfos.entrySet()) { String type = processor.getType(); - if (entry.getValue().containsProcessor(type) == false) { + if (entry.getValue().containsProcessor(Pipeline.RESPONSE_PROCESSORS_KEY, type) == false) { String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]"; exceptions.add(ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message)); } @@ -352,7 +370,8 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce pipeline = Pipeline.create( AD_HOC_PIPELINE_ID, searchRequest.source().searchPipelineSource(), - processorFactories, + requestProcessorFactories, + responseProcessorFactories, namedWriteableRegistry ); } catch (Exception e) { @@ -385,17 +404,27 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce return new PipelinedRequest(pipeline, transformedRequest); } - Map getProcessorFactories() { - return processorFactories; + Map> getRequestProcessorFactories() { + return requestProcessorFactories; + } + + Map> getResponseProcessorFactories() { + return responseProcessorFactories; } @Override public SearchPipelineInfo info() { - List processorInfoList = new ArrayList<>(); - for (Map.Entry entry : processorFactories.entrySet()) { - processorInfoList.add(new ProcessorInfo(entry.getKey())); - } - return new SearchPipelineInfo(processorInfoList); + List requestProcessorInfoList = requestProcessorFactories.keySet() + .stream() + .map(ProcessorInfo::new) + .collect(Collectors.toList()); + List responseProcessorInfoList = responseProcessorFactories.keySet() + .stream() + .map(ProcessorInfo::new) + .collect(Collectors.toList()); + return new SearchPipelineInfo( + Map.of(Pipeline.REQUEST_PROCESSORS_KEY, requestProcessorInfoList, Pipeline.RESPONSE_PROCESSORS_KEY, responseProcessorInfoList) + ); } public static List getPipelines(ClusterState clusterState, String... ids) { diff --git a/server/src/test/java/org/opensearch/nodesinfo/NodeInfoStreamingTests.java b/server/src/test/java/org/opensearch/nodesinfo/NodeInfoStreamingTests.java index bec921bc5bf5d..cdd1c682b40dc 100644 --- a/server/src/test/java/org/opensearch/nodesinfo/NodeInfoStreamingTests.java +++ b/server/src/test/java/org/opensearch/nodesinfo/NodeInfoStreamingTests.java @@ -251,7 +251,7 @@ private static NodeInfo createNodeInfo() { for (int i = 0; i < numProcessors; i++) { processors.add(new org.opensearch.search.pipeline.ProcessorInfo(randomAlphaOfLengthBetween(3, 10))); } - searchPipelineInfo = new SearchPipelineInfo(processors); + searchPipelineInfo = new SearchPipelineInfo(Map.of(randomAlphaOfLengthBetween(3, 10), processors)); } return new NodeInfo( diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineInfoTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineInfoTests.java new file mode 100644 index 0000000000000..6eb137cb28e8f --- /dev/null +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineInfoTests.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline; + +import org.opensearch.Version; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class SearchPipelineInfoTests extends OpenSearchTestCase { + public void testSerializationRoundtrip() throws IOException { + SearchPipelineInfo searchPipelineInfo = new SearchPipelineInfo( + Map.of( + "a", + List.of(new ProcessorInfo("a1"), new ProcessorInfo("a2"), new ProcessorInfo("a3")), + "b", + List.of(new ProcessorInfo("b1"), new ProcessorInfo("b2")), + "c", + List.of(new ProcessorInfo("c1")) + ) + ); + SearchPipelineInfo deserialized; + try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { + searchPipelineInfo.writeTo(bytesStreamOutput); + try (StreamInput bytesStreamInput = bytesStreamOutput.bytes().streamInput()) { + deserialized = new SearchPipelineInfo(bytesStreamInput); + } + } + assertTrue(deserialized.containsProcessor("a", "a1")); + assertTrue(deserialized.containsProcessor("a", "a2")); + assertTrue(deserialized.containsProcessor("a", "a3")); + assertTrue(deserialized.containsProcessor("b", "b1")); + assertTrue(deserialized.containsProcessor("b", "b2")); + assertTrue(deserialized.containsProcessor("c", "c1")); + } + + /** + * When serializing / deserializing to / from old versions, processor type info is lost. + * + * Also, we only supported request/response processors. + */ + public void testSerializationRoundtripBackcompat() throws IOException { + SearchPipelineInfo searchPipelineInfo = new SearchPipelineInfo( + Map.of( + Pipeline.REQUEST_PROCESSORS_KEY, + List.of(new ProcessorInfo("a1"), new ProcessorInfo("a2"), new ProcessorInfo("a3")), + Pipeline.RESPONSE_PROCESSORS_KEY, + List.of(new ProcessorInfo("b1"), new ProcessorInfo("b2")) + ) + ); + SearchPipelineInfo deserialized; + try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { + bytesStreamOutput.setVersion(Version.V_2_7_0); + searchPipelineInfo.writeTo(bytesStreamOutput); + try (StreamInput bytesStreamInput = bytesStreamOutput.bytes().streamInput()) { + bytesStreamInput.setVersion(Version.V_2_7_0); + deserialized = new SearchPipelineInfo(bytesStreamInput); + } + } + for (String proc : List.of("a1", "a2", "a3", "b1", "b2")) { + assertTrue(deserialized.containsProcessor(Pipeline.REQUEST_PROCESSORS_KEY, proc)); + assertTrue(deserialized.containsProcessor(Pipeline.RESPONSE_PROCESSORS_KEY, proc)); + } + } +} diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index 36978d5310810..516227e9a13d8 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -60,9 +60,13 @@ public class SearchPipelineServiceTests extends OpenSearchTestCase { private static final SearchPipelinePlugin DUMMY_PLUGIN = new SearchPipelinePlugin() { @Override - public Map getProcessors(Processor.Parameters parameters) { + public Map> getRequestProcessors(Processor.Parameters parameters) { return Map.of("foo", (factories, tag, description, config) -> null); } + + public Map> getResponseProcessors(Processor.Parameters parameters) { + return Map.of("bar", (factories, tag, description, config) -> null); + } }; private ThreadPool threadPool; @@ -89,9 +93,14 @@ public void testSearchPipelinePlugin() { client, false ); - Map factories = searchPipelineService.getProcessorFactories(); - assertEquals(1, factories.size()); - assertTrue(factories.containsKey("foo")); + Map> requestProcessorFactories = searchPipelineService + .getRequestProcessorFactories(); + assertEquals(1, requestProcessorFactories.size()); + assertTrue(requestProcessorFactories.containsKey("foo")); + Map> responseProcessorFactories = searchPipelineService + .getResponseProcessorFactories(); + assertEquals(1, responseProcessorFactories.size()); + assertTrue(responseProcessorFactories.containsKey("bar")); } public void testSearchPipelinePluginDuplicate() { @@ -235,8 +244,8 @@ public SearchResponse processResponse(SearchRequest request, SearchResponse resp } private SearchPipelineService createWithProcessors() { - Map processors = new HashMap<>(); - processors.put("scale_request_size", (processorFactories, tag, description, config) -> { + Map> requestProcessors = new HashMap<>(); + requestProcessors.put("scale_request_size", (processorFactories, tag, description, config) -> { float scale = ((Number) config.remove("scale")).floatValue(); return new FakeRequestProcessor( "scale_request_size", @@ -245,11 +254,12 @@ private SearchPipelineService createWithProcessors() { req -> req.source().size((int) (req.source().size() * scale)) ); }); - processors.put("fixed_score", (processorFactories, tag, description, config) -> { + Map> responseProcessors = new HashMap<>(); + responseProcessors.put("fixed_score", (processorFactories, tag, description, config) -> { float score = ((Number) config.remove("score")).floatValue(); return new FakeResponseProcessor("fixed_score", tag, description, rsp -> rsp.getHits().forEach(h -> h.score(score))); }); - return createWithProcessors(processors); + return createWithProcessors(requestProcessors, responseProcessors); } @Override @@ -258,7 +268,10 @@ protected NamedWriteableRegistry writableRegistry() { return new NamedWriteableRegistry(searchModule.getNamedWriteables()); } - private SearchPipelineService createWithProcessors(Map processors) { + private SearchPipelineService createWithProcessors( + Map> requestProcessors, + Map> responseProcessors + ) { Client client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); @@ -274,8 +287,13 @@ private SearchPipelineService createWithProcessors(Map getProcessors(Processor.Parameters parameters) { - return processors; + public Map> getRequestProcessors(Processor.Parameters parameters) { + return requestProcessors; + } + + @Override + public Map> getResponseProcessors(Processor.Parameters parameters) { + return responseProcessors; } }), client, @@ -619,13 +637,14 @@ public void testValidatePipeline() throws Exception { XContentType.JSON ); + SearchPipelineInfo completePipelineInfo = new SearchPipelineInfo( + Map.of(Pipeline.REQUEST_PROCESSORS_KEY, List.of(reqProcessor), Pipeline.RESPONSE_PROCESSORS_KEY, List.of(rspProcessor)) + ); + SearchPipelineInfo incompletePipelineInfo = new SearchPipelineInfo(Map.of(Pipeline.REQUEST_PROCESSORS_KEY, List.of(reqProcessor))); // One node is missing a processor expectThrows( OpenSearchParseException.class, - () -> searchPipelineService.validatePipeline( - Map.of(n1, new SearchPipelineInfo(List.of(reqProcessor, rspProcessor)), n2, new SearchPipelineInfo(List.of(reqProcessor))), - putRequest - ) + () -> searchPipelineService.validatePipeline(Map.of(n1, completePipelineInfo, n2, incompletePipelineInfo), putRequest) ); // Discovery failed, no infos passed. @@ -644,27 +663,11 @@ public void testValidatePipeline() throws Exception { ); expectThrows( ClassCastException.class, - () -> searchPipelineService.validatePipeline( - Map.of( - n1, - new SearchPipelineInfo(List.of(reqProcessor, rspProcessor)), - n2, - new SearchPipelineInfo(List.of(reqProcessor, rspProcessor)) - ), - badPutRequest - ) + () -> searchPipelineService.validatePipeline(Map.of(n1, completePipelineInfo, n2, completePipelineInfo), badPutRequest) ); // Success - searchPipelineService.validatePipeline( - Map.of( - n1, - new SearchPipelineInfo(List.of(reqProcessor, rspProcessor)), - n2, - new SearchPipelineInfo(List.of(reqProcessor, rspProcessor)) - ), - putRequest - ); + searchPipelineService.validatePipeline(Map.of(n1, completePipelineInfo, n2, completePipelineInfo), putRequest); } /** @@ -717,7 +720,7 @@ public void testInlinePipeline() throws Exception { public void testInfo() { SearchPipelineService searchPipelineService = createWithProcessors(); SearchPipelineInfo info = searchPipelineService.info(); - assertTrue(info.containsProcessor("scale_request_size")); - assertTrue(info.containsProcessor("fixed_score")); + assertTrue(info.containsProcessor(Pipeline.REQUEST_PROCESSORS_KEY, "scale_request_size")); + assertTrue(info.containsProcessor(Pipeline.RESPONSE_PROCESSORS_KEY, "fixed_score")); } }