diff --git a/CHANGELOG.md b/CHANGELOG.md index f481fb514cdb5..af28fc97fe499 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -92,6 +92,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Move ZSTD compression codecs out of the sandbox ([#7908](https://github.com/opensearch-project/OpenSearch/pull/7908)) - Update ZSTD default compression level ([#8471](https://github.com/opensearch-project/OpenSearch/pull/8471)) - Improved performance of parsing floating point numbers ([#8467](https://github.com/opensearch-project/OpenSearch/pull/8467)) +- [Search Pipelines] Pass pipeline creation context to processor factories ([#8164](https://github.com/opensearch-project/OpenSearch/pull/8164)) ### Deprecated diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java index 7deb8faa03af6..d8862aa59cede 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java @@ -13,12 +13,12 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentType; import org.opensearch.common.xcontent.json.JsonXContent; -import org.opensearch.core.ParseField; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.QueryBuilder; +import org.opensearch.ingest.ConfigurationUtils; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.pipeline.Processor; import org.opensearch.search.pipeline.SearchRequestProcessor; @@ -89,8 +89,8 @@ public SearchRequest processRequest(SearchRequest request) throws Exception { } static class Factory implements Processor.Factory { + private static final String QUERY_KEY = "query"; private final NamedXContentRegistry namedXContentRegistry; - public static final ParseField QUERY_FIELD = new ParseField("query"); Factory(NamedXContentRegistry namedXContentRegistry) { this.namedXContentRegistry = namedXContentRegistry; @@ -101,30 +101,21 @@ public FilterQueryRequestProcessor create( Map> processorFactories, String tag, String description, - Map config + Map config, + PipelineContext pipelineContext ) throws Exception { + Map query = ConfigurationUtils.readOptionalMap(TYPE, tag, config, QUERY_KEY); + if (query == null) { + throw new IllegalArgumentException("Did not specify the " + QUERY_KEY + " property in processor of type " + TYPE); + } try ( - XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(config); + XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(query); InputStream stream = BytesReference.bytes(builder).streamInput(); XContentParser parser = XContentType.JSON.xContent() .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, stream) ) { - XContentParser.Token token = parser.nextToken(); - assert token == XContentParser.Token.START_OBJECT; - String currentFieldName = null; - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token == XContentParser.Token.START_OBJECT) { - if (QUERY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { - return new FilterQueryRequestProcessor(tag, description, parseInnerQueryBuilder(parser)); - } - } - } + return new FilterQueryRequestProcessor(tag, description, parseInnerQueryBuilder(parser)); } - throw new IllegalArgumentException( - "Did not specify the " + QUERY_FIELD.getPreferredName() + " property in processor of type " + TYPE - ); } } } diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java index 4c40dda5928f0..c8b3c06a71562 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java @@ -140,7 +140,8 @@ public RenameFieldResponseProcessor create( Map> processorFactories, String tag, String description, - Map config + Map config, + PipelineContext pipelineContext ) throws Exception { String oldField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field"); String newField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field"); diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/ScriptRequestProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/ScriptRequestProcessor.java index 015411e0701a4..43ab3d4622d6b 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/ScriptRequestProcessor.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/ScriptRequestProcessor.java @@ -29,7 +29,8 @@ import org.opensearch.search.pipeline.common.helpers.SearchRequestMap; import java.io.InputStream; -import java.util.Arrays; +import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException; @@ -127,6 +128,8 @@ SearchScript getPrecompiledSearchScript() { * Factory class for creating {@link ScriptRequestProcessor}. */ public static final class Factory implements Processor.Factory { + private static final List SCRIPT_CONFIG_KEYS = List.of("id", "source", "inline", "lang", "params", "options"); + private final ScriptService scriptService; /** @@ -138,33 +141,29 @@ public Factory(ScriptService scriptService) { this.scriptService = scriptService; } - /** - * Creates a new instance of {@link ScriptRequestProcessor}. - * - * @param registry The registry of processor factories. - * @param processorTag The processor's tag. - * @param description The processor's description. - * @param config The configuration options for the processor. - * @return The created {@link ScriptRequestProcessor} instance. - * @throws Exception if an error occurs during the creation process. - */ @Override public ScriptRequestProcessor create( Map> registry, String processorTag, String description, - Map config + Map config, + PipelineContext pipelineContext ) throws Exception { + Map scriptConfig = new HashMap<>(); + for (String key : SCRIPT_CONFIG_KEYS) { + Object val = config.remove(key); + if (val != null) { + scriptConfig.put(key, val); + } + } try ( - XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(config); + XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(scriptConfig); InputStream stream = BytesReference.bytes(builder).streamInput(); XContentParser parser = XContentType.JSON.xContent() .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream) ) { Script script = Script.parse(parser); - Arrays.asList("id", "source", "inline", "lang", "params", "options").forEach(config::remove); - // verify script is able to be compiled before successfully creating processor. SearchScript searchScript = null; try { diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java index dc25de460fdba..49681b80fdead 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java @@ -33,7 +33,7 @@ public SearchPipelineCommonModulePlugin() {} * @return A map of processor factories, where the keys are the processor types and the values are the corresponding factory instances. */ @Override - public Map> getRequestProcessors(Processor.Parameters parameters) { + public Map> getRequestProcessors(Parameters parameters) { return Map.of( FilterQueryRequestProcessor.TYPE, new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry), @@ -43,7 +43,7 @@ public Map> getRequestProcesso } @Override - public Map> getResponseProcessors(Processor.Parameters parameters) { + public Map> getResponseProcessors(Parameters parameters) { return Map.of(RenameFieldResponseProcessor.TYPE, new RenameFieldResponseProcessor.Factory()); } } diff --git a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java index 1f355ac97c801..ecf746af556a2 100644 --- a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java +++ b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java @@ -16,6 +16,7 @@ import org.opensearch.test.AbstractBuilderTestCase; import java.util.Collections; +import java.util.HashMap; import java.util.Map; public class FilterQueryRequestProcessorTests extends AbstractBuilderTestCase { @@ -37,15 +38,14 @@ public void testFilterQuery() throws Exception { public void testFactory() throws Exception { FilterQueryRequestProcessor.Factory factory = new FilterQueryRequestProcessor.Factory(this.xContentRegistry()); - FilterQueryRequestProcessor processor = factory.create( - Collections.emptyMap(), - null, - null, - Map.of("query", Map.of("term", Map.of("field", "value"))) - ); + Map configMap = new HashMap<>(Map.of("query", Map.of("term", Map.of("field", "value")))); + FilterQueryRequestProcessor processor = factory.create(Collections.emptyMap(), null, null, configMap, null); assertEquals(new TermQueryBuilder("field", "value"), processor.filterQuery); // Missing "query" parameter: - expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap())); + expectThrows( + IllegalArgumentException.class, + () -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap(), null) + ); } } diff --git a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessorTests.java b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessorTests.java index a2fc7f6acfa7c..7f3a2acfbdc08 100644 --- a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessorTests.java +++ b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessorTests.java @@ -115,12 +115,15 @@ public void testFactory() throws Exception { config.put("target_field", newField); RenameFieldResponseProcessor.Factory factory = new RenameFieldResponseProcessor.Factory(); - RenameFieldResponseProcessor processor = factory.create(Collections.emptyMap(), null, null, config); + RenameFieldResponseProcessor processor = factory.create(Collections.emptyMap(), null, null, config, null); assertEquals(processor.getType(), "rename_field"); assertEquals(processor.getOldField(), oldField); assertEquals(processor.getNewField(), newField); assertFalse(processor.isIgnoreMissing()); - expectThrows(OpenSearchParseException.class, () -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap())); + expectThrows( + OpenSearchParseException.class, + () -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap(), null) + ); } } diff --git a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/50_script_processor.yml b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/50_script_processor.yml index 9d855e8a1861a..7a01e68acf75c 100644 --- a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/50_script_processor.yml +++ b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/50_script_processor.yml @@ -17,6 +17,7 @@ teardown: "request_processors": [ { "script" : { + "tag": "empty_script", "lang": "painless", "source" : "" } @@ -38,6 +39,7 @@ teardown: "request_processors": [ { "script" : { + "tag": "working", "lang" : "painless", "source" : "ctx._source['size'] += 10; ctx._source['from'] = ctx._source['from'] <= 0 ? ctx._source['from'] : ctx._source['from'] - 1 ; ctx._source['explain'] = !ctx._source['explain']; ctx._source['version'] = !ctx._source['version']; ctx._source['seq_no_primary_term'] = !ctx._source['seq_no_primary_term']; ctx._source['track_scores'] = !ctx._source['track_scores']; ctx._source['track_total_hits'] = 1; ctx._source['min_score'] -= 0.9; ctx._source['terminate_after'] += 2; ctx._source['profile'] = !ctx._source['profile'];" } diff --git a/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java b/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java index 3d76bab93a60c..d2ef2b65c5944 100644 --- a/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java +++ b/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java @@ -8,13 +8,24 @@ package org.opensearch.plugins; +import org.opensearch.client.Client; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.index.analysis.AnalysisRegistry; +import org.opensearch.script.ScriptService; import org.opensearch.search.pipeline.Processor; import org.opensearch.search.pipeline.SearchPhaseResultsProcessor; +import org.opensearch.search.pipeline.SearchPipelineService; import org.opensearch.search.pipeline.SearchRequestProcessor; import org.opensearch.search.pipeline.SearchResponseProcessor; +import org.opensearch.threadpool.Scheduler; import java.util.Collections; import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.LongSupplier; /** * An extension point for {@link Plugin} implementation to add custom search pipeline processors. @@ -29,7 +40,7 @@ public interface SearchPipelinePlugin { * in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory} * to create the processor from a given pipeline configuration. */ - default Map> getRequestProcessors(Processor.Parameters parameters) { + default Map> getRequestProcessors(Parameters parameters) { return Collections.emptyMap(); } @@ -40,7 +51,7 @@ default Map> getRequestProcess * in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory} * to create the processor from a given pipeline configuration. */ - default Map> getResponseProcessors(Processor.Parameters parameters) { + default Map> getResponseProcessors(Parameters parameters) { return Collections.emptyMap(); } @@ -51,7 +62,78 @@ default Map> getResponseProce * in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory} * to create the processor from a given pipeline configuration. */ - default Map> getSearchPhaseResultsProcessors(Processor.Parameters parameters) { + default Map> getSearchPhaseResultsProcessors(Parameters parameters) { return Collections.emptyMap(); } + + /** + * Infrastructure class that holds services that can be used by processor factories to create processor instances + * and that gets passed around to all {@link SearchPipelinePlugin}s. + */ + class Parameters { + + /** + * Useful to provide access to the node's environment like config directory to processor factories. + */ + public final Environment env; + + /** + * Provides processors script support. + */ + public final ScriptService scriptService; + + /** + * Provide analyzer support + */ + public final AnalysisRegistry analysisRegistry; + + /** + * Allows processors to read headers set by {@link org.opensearch.action.support.ActionFilter} + * instances that have run while handling the current search. + */ + public final ThreadContext threadContext; + + public final LongSupplier relativeTimeSupplier; + + public final SearchPipelineService searchPipelineService; + + public final Consumer genericExecutor; + + public final NamedXContentRegistry namedXContentRegistry; + + /** + * Provides scheduler support + */ + public final BiFunction scheduler; + + /** + * Provides access to the node's cluster client + */ + public final Client client; + + public Parameters( + Environment env, + ScriptService scriptService, + AnalysisRegistry analysisRegistry, + ThreadContext threadContext, + LongSupplier relativeTimeSupplier, + BiFunction scheduler, + SearchPipelineService searchPipelineService, + Client client, + Consumer genericExecutor, + NamedXContentRegistry namedXContentRegistry + ) { + this.env = env; + this.scriptService = scriptService; + this.threadContext = threadContext; + this.analysisRegistry = analysisRegistry; + this.relativeTimeSupplier = relativeTimeSupplier; + this.scheduler = scheduler; + this.searchPipelineService = searchPipelineService; + this.client = client; + this.genericExecutor = genericExecutor; + this.namedXContentRegistry = namedXContentRegistry; + } + + } } diff --git a/server/src/main/java/org/opensearch/search/pipeline/PipelineWithMetrics.java b/server/src/main/java/org/opensearch/search/pipeline/PipelineWithMetrics.java index 612e979e56070..060894a37e5ed 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/PipelineWithMetrics.java +++ b/server/src/main/java/org/opensearch/search/pipeline/PipelineWithMetrics.java @@ -77,19 +77,28 @@ static PipelineWithMetrics create( Map> phaseResultsProcessorFactories, NamedWriteableRegistry namedWriteableRegistry, OperationMetrics totalRequestProcessingMetrics, - OperationMetrics totalResponseProcessingMetrics + OperationMetrics totalResponseProcessingMetrics, + Processor.PipelineContext pipelineContext ) throws Exception { String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); List> requestProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, REQUEST_PROCESSORS_KEY); - List requestProcessors = readProcessors(requestProcessorFactories, requestProcessorConfigs); + List requestProcessors = readProcessors( + requestProcessorFactories, + requestProcessorConfigs, + pipelineContext + ); List> responseProcessorConfigs = ConfigurationUtils.readOptionalList( null, null, config, RESPONSE_PROCESSORS_KEY ); - List responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs); + List responseProcessors = readProcessors( + responseProcessorFactories, + responseProcessorConfigs, + pipelineContext + ); List> phaseResultsProcessorConfigs = ConfigurationUtils.readOptionalList( null, null, @@ -98,7 +107,8 @@ static PipelineWithMetrics create( ); List phaseResultsProcessors = readProcessors( phaseResultsProcessorFactories, - phaseResultsProcessorConfigs + phaseResultsProcessorConfigs, + pipelineContext ); if (config.isEmpty() == false) { throw new OpenSearchParseException( @@ -125,7 +135,8 @@ static PipelineWithMetrics create( private static List readProcessors( Map> processorFactories, - List> requestProcessorConfigs + List> requestProcessorConfigs, + Processor.PipelineContext pipelineContext ) throws Exception { List processors = new ArrayList<>(); if (requestProcessorConfigs == null) { @@ -140,7 +151,19 @@ private static List readProcessors( Map config = (Map) entry.getValue(); String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); String description = ConfigurationUtils.readOptionalStringProperty(null, tag, config, DESCRIPTION_KEY); - processors.add(processorFactories.get(type).create(processorFactories, tag, description, config)); + processors.add(processorFactories.get(type).create(processorFactories, tag, description, config, pipelineContext)); + if (config.isEmpty() == false) { + String processorName = type; + if (tag != null) { + processorName = processorName + ":" + tag; + } + throw new OpenSearchParseException( + "processor [" + + processorName + + "] doesn't support one or more provided configuration parameters: " + + Arrays.toString(config.keySet().toArray()) + ); + } } } return Collections.unmodifiableList(processors); diff --git a/server/src/main/java/org/opensearch/search/pipeline/Processor.java b/server/src/main/java/org/opensearch/search/pipeline/Processor.java index ee28db1cc334d..cc96132479c74 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Processor.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Processor.java @@ -8,19 +8,7 @@ package org.opensearch.search.pipeline; -import org.opensearch.client.Client; -import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.core.xcontent.NamedXContentRegistry; -import org.opensearch.env.Environment; -import org.opensearch.index.analysis.AnalysisRegistry; -import org.opensearch.plugins.SearchPipelinePlugin; -import org.opensearch.script.ScriptService; -import org.opensearch.threadpool.Scheduler; - import java.util.Map; -import java.util.function.BiFunction; -import java.util.function.Consumer; -import java.util.function.LongSupplier; /** * A processor implementation may modify the request or response from a search call. @@ -33,6 +21,12 @@ * @opensearch.internal */ public interface Processor { + /** + * Processor configuration key to let the factory know the context for pipeline creation. + *

+ * See {@link PipelineSource}. + */ + String PIPELINE_SOURCE = "pipeline_source"; /** * Gets the type of processor @@ -61,81 +55,45 @@ interface Factory { * @param tag The tag for the processor * @param description A short description of what this processor does * @param config The configuration for the processor - * * Note: Implementations are responsible for removing the used configuration * keys, so that after creation the config map should be empty. + * @param pipelineContext Contextual information about the enclosing pipeline. */ - T create(Map> processorFactories, String tag, String description, Map config) throws Exception; + T create( + Map> processorFactories, + String tag, + String description, + Map config, + PipelineContext pipelineContext + ) throws Exception; } /** - * Infrastructure class that holds services that can be used by processor factories to create processor instances - * and that gets passed around to all {@link SearchPipelinePlugin}s. + * Contextual information about the enclosing pipeline. A processor factory may change processor initialization behavior or + * pass this information to the created processor instance. */ - class Parameters { + class PipelineContext { + private final PipelineSource pipelineSource; - /** - * Useful to provide access to the node's environment like config directory to processor factories. - */ - public final Environment env; - - /** - * Provides processors script support. - */ - public final ScriptService scriptService; - - /** - * Provide analyzer support - */ - public final AnalysisRegistry analysisRegistry; - - /** - * Allows processors to read headers set by {@link org.opensearch.action.support.ActionFilter} - * instances that have run while handling the current search. - */ - public final ThreadContext threadContext; - - public final LongSupplier relativeTimeSupplier; - - public final SearchPipelineService searchPipelineService; - - public final Consumer genericExecutor; - - public final NamedXContentRegistry namedXContentRegistry; - - /** - * Provides scheduler support - */ - public final BiFunction scheduler; - - /** - * Provides access to the node's cluster client - */ - public final Client client; + public PipelineContext(PipelineSource pipelineSource) { + this.pipelineSource = pipelineSource; + } - public Parameters( - Environment env, - ScriptService scriptService, - AnalysisRegistry analysisRegistry, - ThreadContext threadContext, - LongSupplier relativeTimeSupplier, - BiFunction scheduler, - SearchPipelineService searchPipelineService, - Client client, - Consumer genericExecutor, - NamedXContentRegistry namedXContentRegistry - ) { - this.env = env; - this.scriptService = scriptService; - this.threadContext = threadContext; - this.analysisRegistry = analysisRegistry; - this.relativeTimeSupplier = relativeTimeSupplier; - this.scheduler = scheduler; - this.searchPipelineService = searchPipelineService; - this.client = client; - this.genericExecutor = genericExecutor; - this.namedXContentRegistry = namedXContentRegistry; + public PipelineSource getPipelineSource() { + return pipelineSource; } + } + /** + * A processor factory may change the processor initialization behavior based on the creation context (e.g. avoiding + * creating expensive resources during validation or in a request-scoped pipeline.) + */ + enum PipelineSource { + // A named pipeline is being created or updated + UPDATE_PIPELINE, + // Pipeline is defined within a search request + SEARCH_REQUEST, + // A named pipeline is being validated before being written to cluster state + VALIDATE_PIPELINE } } diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java index 70dc8546a077f..83a7a0564467e 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -103,7 +103,7 @@ public SearchPipelineService( this.scriptService = scriptService; this.threadPool = threadPool; this.namedWriteableRegistry = namedWriteableRegistry; - Processor.Parameters parameters = new Processor.Parameters( + SearchPipelinePlugin.Parameters parameters = new SearchPipelinePlugin.Parameters( env, scriptService, analysisRegistry, @@ -189,7 +189,8 @@ void innerUpdatePipelines(SearchPipelineMetadata newSearchPipelineMetadata) { phaseInjectorProcessorFactories, namedWriteableRegistry, totalRequestProcessingMetrics, - totalResponseProcessingMetrics + totalResponseProcessingMetrics, + new Processor.PipelineContext(Processor.PipelineSource.UPDATE_PIPELINE) ); newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline)); @@ -289,7 +290,8 @@ void validatePipeline(Map searchPipelineInfos phaseInjectorProcessorFactories, namedWriteableRegistry, new OperationMetrics(), // Use ephemeral metrics for validation - new OperationMetrics() + new OperationMetrics(), + new Processor.PipelineContext(Processor.PipelineSource.VALIDATE_PIPELINE) ); List exceptions = new ArrayList<>(); for (SearchRequestProcessor processor : pipeline.getSearchRequestProcessors()) { @@ -388,7 +390,8 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) { phaseInjectorProcessorFactories, namedWriteableRegistry, totalRequestProcessingMetrics, - totalResponseProcessingMetrics + totalResponseProcessingMetrics, + new Processor.PipelineContext(Processor.PipelineSource.SEARCH_REQUEST) ); } catch (Exception e) { throw new SearchPipelineProcessingException(e); diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index 2ac0b2136ddd9..84f39e4bdab42 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -77,19 +77,17 @@ public class SearchPipelineServiceTests extends OpenSearchTestCase { private static final SearchPipelinePlugin DUMMY_PLUGIN = new SearchPipelinePlugin() { @Override - public Map> getRequestProcessors(Processor.Parameters parameters) { - return Map.of("foo", (factories, tag, description, config) -> null); + public Map> getRequestProcessors(Parameters parameters) { + return Map.of("foo", (factories, tag, description, config, ctx) -> null); } - public Map> getResponseProcessors(Processor.Parameters parameters) { - return Map.of("bar", (factories, tag, description, config) -> null); + public Map> getResponseProcessors(Parameters parameters) { + return Map.of("bar", (factories, tag, description, config, ctx) -> null); } @Override - public Map> getSearchPhaseResultsProcessors( - Processor.Parameters parameters - ) { - return Map.of("zoe", (factories, tag, description, config) -> null); + public Map> getSearchPhaseResultsProcessors(Parameters parameters) { + return Map.of("zoe", (factories, tag, description, config, ctx) -> null); } }; @@ -303,7 +301,7 @@ public SearchPhaseName getAfterPhase() { private SearchPipelineService createWithProcessors() { Map> requestProcessors = new HashMap<>(); - requestProcessors.put("scale_request_size", (processorFactories, tag, description, config) -> { + requestProcessors.put("scale_request_size", (processorFactories, tag, description, config, ctx) -> { float scale = ((Number) config.remove("scale")).floatValue(); return new FakeRequestProcessor( "scale_request_size", @@ -313,13 +311,13 @@ private SearchPipelineService createWithProcessors() { ); }); Map> responseProcessors = new HashMap<>(); - responseProcessors.put("fixed_score", (processorFactories, tag, description, config) -> { + responseProcessors.put("fixed_score", (processorFactories, tag, description, config, ctx) -> { float score = ((Number) config.remove("score")).floatValue(); return new FakeResponseProcessor("fixed_score", tag, description, rsp -> rsp.getHits().forEach(h -> h.score(score))); }); Map> searchPhaseProcessors = new HashMap<>(); - searchPhaseProcessors.put("max_score", (processorFactories, tag, description, config) -> { + searchPhaseProcessors.put("max_score", (processorFactories, tag, description, config, context) -> { final float finalScore = config.containsKey("score") ? ((Number) config.remove("score")).floatValue() : 100f; final Consumer querySearchResultConsumer = (result) -> result.queryResult().topDocs().maxScore = finalScore; return new FakeSearchPhaseResultsProcessor("max_score", tag, description, querySearchResultConsumer); @@ -354,19 +352,17 @@ private SearchPipelineService createWithProcessors( this.writableRegistry(), Collections.singletonList(new SearchPipelinePlugin() { @Override - public Map> getRequestProcessors(Processor.Parameters parameters) { + public Map> getRequestProcessors(Parameters parameters) { return requestProcessors; } @Override - public Map> getResponseProcessors(Processor.Parameters parameters) { + public Map> getResponseProcessors(Parameters parameters) { return responseProcessors; } @Override - public Map> getSearchPhaseResultsProcessors( - Processor.Parameters parameters - ) { + public Map> getSearchPhaseResultsProcessors(Parameters parameters) { return phaseProcessors; } @@ -897,10 +893,9 @@ public void testInfo() { } public void testExceptionOnPipelineCreation() { - Map> badFactory = Map.of( - "bad_factory", - (pf, t, f, c) -> { throw new RuntimeException(); } - ); + Map> badFactory = Map.of("bad_factory", (pf, t, f, c, ctx) -> { + throw new RuntimeException(); + }); SearchPipelineService searchPipelineService = createWithProcessors(badFactory, Collections.emptyMap(), Collections.emptyMap()); Map pipelineSourceMap = new HashMap<>(); @@ -920,7 +915,7 @@ public void testExceptionOnRequestProcessing() { }); Map> throwingRequestProcessorFactory = Map.of( "throwing_request", - (pf, t, f, c) -> throwingRequestProcessor + (pf, t, f, c, ctx) -> throwingRequestProcessor ); SearchPipelineService searchPipelineService = createWithProcessors( @@ -945,7 +940,7 @@ public void testExceptionOnResponseProcessing() throws Exception { }); Map> throwingResponseProcessorFactory = Map.of( "throwing_response", - (pf, t, f, c) -> throwingResponseProcessor + (pf, t, f, c, ctx) -> throwingResponseProcessor ); SearchPipelineService searchPipelineService = createWithProcessors( @@ -955,7 +950,7 @@ public void testExceptionOnResponseProcessing() throws Exception { ); Map pipelineSourceMap = new HashMap<>(); - pipelineSourceMap.put(Pipeline.RESPONSE_PROCESSORS_KEY, List.of(Map.of("throwing_response", Collections.emptyMap()))); + pipelineSourceMap.put(Pipeline.RESPONSE_PROCESSORS_KEY, List.of(Map.of("throwing_response", new HashMap<>()))); SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().size(100).searchPipelineSource(pipelineSourceMap); SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); @@ -973,18 +968,18 @@ public void testStats() throws Exception { }); Map> requestProcessors = Map.of( "successful_request", - (pf, t, f, c) -> new FakeRequestProcessor("successful_request", "2", null, r -> {}), + (pf, t, f, c, ctx) -> new FakeRequestProcessor("successful_request", "2", null, r -> {}), "throwing_request", - (pf, t, f, c) -> throwingRequestProcessor + (pf, t, f, c, ctx) -> throwingRequestProcessor ); SearchResponseProcessor throwingResponseProcessor = new FakeResponseProcessor("throwing_response", "3", null, r -> { throw new RuntimeException(); }); Map> responseProcessors = Map.of( "successful_response", - (pf, t, f, c) -> new FakeResponseProcessor("successful_response", "4", null, r -> {}), + (pf, t, f, c, ctx) -> new FakeResponseProcessor("successful_response", "4", null, r -> {}), "throwing_response", - (pf, t, f, c) -> throwingResponseProcessor + (pf, t, f, c, ctx) -> throwingResponseProcessor ); SearchPipelineService searchPipelineService = createWithProcessors(requestProcessors, responseProcessors, Collections.emptyMap()); @@ -1088,4 +1083,64 @@ private static void assertPipelineStats(OperationStats stats, long count, long f assertEquals(stats.getCount(), count); assertEquals(stats.getFailedCount(), failed); } + + public void testAdHocRejectingProcessor() { + String processorType = "ad_hoc_rejecting"; + Map> requestProcessorFactories = Map.of(processorType, (pf, t, d, c, ctx) -> { + if (ctx.getPipelineSource() == Processor.PipelineSource.SEARCH_REQUEST) { + throw new IllegalArgumentException(processorType + " cannot be created as part of a pipeline defined in a search request"); + } + return new FakeRequestProcessor(processorType, t, d, r -> {}); + }); + + SearchPipelineService searchPipelineService = createWithProcessors( + requestProcessorFactories, + Collections.emptyMap(), + Collections.emptyMap() + ); + + String id = "_id"; + SearchPipelineService.PipelineHolder pipeline = searchPipelineService.getPipelines().get(id); + assertNull(pipeline); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + PutSearchPipelineRequest putRequest = new PutSearchPipelineRequest( + id, + new BytesArray("{\"request_processors\":[" + " { \"" + processorType + "\": {}}" + "]}"), + XContentType.JSON + ); + ClusterState previousClusterState = clusterState; + clusterState = SearchPipelineService.innerPut(putRequest, clusterState); + // The following line successfully creates the pipeline: + searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + + Map pipelineSourceMap = new HashMap<>(); + pipelineSourceMap.put(Pipeline.REQUEST_PROCESSORS_KEY, List.of(Map.of(processorType, Collections.emptyMap()))); + + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().searchPipelineSource(pipelineSourceMap); + SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); + expectThrows(SearchPipelineProcessingException.class, () -> searchPipelineService.resolvePipeline(searchRequest)); + } + + public void testExtraParameterInProcessorConfig() { + SearchPipelineService searchPipelineService = createWithProcessors(); + + Map pipelineSourceMap = new HashMap<>(); + Map processorConfig = new HashMap<>( + Map.of("score", 1.0f, "tag", "my_tag", "comment", "I just like to add extra parameters so that I feel like I'm being heard.") + ); + pipelineSourceMap.put(Pipeline.RESPONSE_PROCESSORS_KEY, List.of(Map.of("fixed_score", processorConfig))); + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().searchPipelineSource(pipelineSourceMap); + SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); + try { + searchPipelineService.resolvePipeline(searchRequest); + fail("Exception should have been thrown"); + } catch (SearchPipelineProcessingException e) { + assertTrue( + e.getMessage() + .contains("processor [fixed_score:my_tag] doesn't support one or more provided configuration parameters: [comment]") + ); + } catch (Exception e) { + fail("Wrong exception type: " + e.getClass()); + } + } }