diff --git a/CHANGELOG.md b/CHANGELOG.md index 30fc9be4f57ec..85aba2adef096 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -120,6 +120,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Compress and cache cluster state during validate join request ([#7321](https://github.com/opensearch-project/OpenSearch/pull/7321)) - [Snapshot Interop] Add Changes in Create Snapshot Flow for remote store interoperability. ([#7118](https://github.com/opensearch-project/OpenSearch/pull/7118)) - Allow insecure string settings to warn-log usage and advise to migration of a newer secure variant ([#5496](https://github.com/opensearch-project/OpenSearch/pull/5496)) +- [Search Pipelines] Pass "adhocness" flag to processor factories ([#8164](https://github.com/opensearch-project/OpenSearch/pull/8164)) ### Deprecated diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java index 7deb8faa03af6..a43335b73a72e 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java @@ -13,12 +13,12 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentType; import org.opensearch.common.xcontent.json.JsonXContent; -import org.opensearch.core.ParseField; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.QueryBuilder; +import org.opensearch.ingest.ConfigurationUtils; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.pipeline.Processor; import org.opensearch.search.pipeline.SearchRequestProcessor; @@ -89,8 +89,8 @@ public SearchRequest processRequest(SearchRequest request) throws Exception { } static class Factory implements Processor.Factory { + private static final String QUERY_KEY = "query"; private final NamedXContentRegistry namedXContentRegistry; - public static final ParseField QUERY_FIELD = new ParseField("query"); Factory(NamedXContentRegistry namedXContentRegistry) { this.namedXContentRegistry = namedXContentRegistry; @@ -103,28 +103,18 @@ public FilterQueryRequestProcessor create( String description, Map config ) throws Exception { + Map query = ConfigurationUtils.readOptionalMap(TYPE, tag, config, QUERY_KEY); + if (query == null) { + throw new IllegalArgumentException("Did not specify the " + QUERY_KEY + " property in processor of type " + TYPE); + } try ( - XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(config); + XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(query); InputStream stream = BytesReference.bytes(builder).streamInput(); XContentParser parser = XContentType.JSON.xContent() .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, stream) ) { - XContentParser.Token token = parser.nextToken(); - assert token == XContentParser.Token.START_OBJECT; - String currentFieldName = null; - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token == XContentParser.Token.START_OBJECT) { - if (QUERY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { - return new FilterQueryRequestProcessor(tag, description, parseInnerQueryBuilder(parser)); - } - } - } + return new FilterQueryRequestProcessor(tag, description, parseInnerQueryBuilder(parser)); } - throw new IllegalArgumentException( - "Did not specify the " + QUERY_FIELD.getPreferredName() + " property in processor of type " + TYPE - ); } } } diff --git a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java index 1f355ac97c801..ea3008e005508 100644 --- a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java +++ b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java @@ -16,6 +16,7 @@ import org.opensearch.test.AbstractBuilderTestCase; import java.util.Collections; +import java.util.HashMap; import java.util.Map; public class FilterQueryRequestProcessorTests extends AbstractBuilderTestCase { @@ -37,12 +38,8 @@ public void testFilterQuery() throws Exception { public void testFactory() throws Exception { FilterQueryRequestProcessor.Factory factory = new FilterQueryRequestProcessor.Factory(this.xContentRegistry()); - FilterQueryRequestProcessor processor = factory.create( - Collections.emptyMap(), - null, - null, - Map.of("query", Map.of("term", Map.of("field", "value"))) - ); + Map configMap = new HashMap<>(Map.of("query", Map.of("term", Map.of("field", "value")))); + FilterQueryRequestProcessor processor = factory.create(Collections.emptyMap(), null, null, configMap); assertEquals(new TermQueryBuilder("field", "value"), processor.filterQuery); // Missing "query" parameter: diff --git a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java index c9a5f865d507e..ad712c63cb1fe 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java @@ -67,19 +67,20 @@ static Pipeline create( Map config, Map> requestProcessorFactories, Map> responseProcessorFactories, - NamedWriteableRegistry namedWriteableRegistry + NamedWriteableRegistry namedWriteableRegistry, + boolean isAdHoc ) throws Exception { String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); List> requestProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, REQUEST_PROCESSORS_KEY); - List requestProcessors = readProcessors(requestProcessorFactories, requestProcessorConfigs); + List requestProcessors = readProcessors(requestProcessorFactories, requestProcessorConfigs, isAdHoc); List> responseProcessorConfigs = ConfigurationUtils.readOptionalList( null, null, config, RESPONSE_PROCESSORS_KEY ); - List responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs); + List responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs, isAdHoc); if (config.isEmpty() == false) { throw new OpenSearchParseException( "pipeline [" @@ -93,7 +94,8 @@ static Pipeline create( private static List readProcessors( Map> processorFactories, - List> requestProcessorConfigs + List> requestProcessorConfigs, + boolean isAdHoc ) throws Exception { List processors = new ArrayList<>(); if (requestProcessorConfigs == null) { @@ -106,9 +108,25 @@ private static List readProcessors( throw new IllegalArgumentException("Invalid processor type " + type); } Map config = (Map) entry.getValue(); + if (isAdHoc) { + config.put(Processor.AD_HOC_PIPELINE, true); + } String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); String description = ConfigurationUtils.readOptionalStringProperty(null, tag, config, DESCRIPTION_KEY); processors.add(processorFactories.get(type).create(processorFactories, tag, description, config)); + config.remove(Processor.AD_HOC_PIPELINE); + if (config.isEmpty() == false) { + String processorName = type; + if (tag != null) { + processorName = processorName + ":" + tag; + } + throw new OpenSearchParseException( + "processor [" + + processorName + + "] doesn't support one or more provided configuration parameters " + + Arrays.toString(config.keySet().toArray()) + ); + } } } return Collections.unmodifiableList(processors); diff --git a/server/src/main/java/org/opensearch/search/pipeline/Processor.java b/server/src/main/java/org/opensearch/search/pipeline/Processor.java index ee28db1cc334d..dec901953628b 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Processor.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Processor.java @@ -33,6 +33,12 @@ * @opensearch.internal */ public interface Processor { + /** + * Processor configuration key to let the factory know that the pipeline is defined in a search request. + * For processors whose creation is expensive (e.g. creates a connection pool), the factory can reject + * the request or create a more lightweight (but possibly less efficient) version of the processor. + */ + String AD_HOC_PIPELINE = "ad_hoc_pipeline"; /** * Gets the type of processor diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java index 87c09bd971284..c8304a9a16d91 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -177,7 +177,8 @@ void innerUpdatePipelines(SearchPipelineMetadata newSearchPipelineMetadata) { newConfiguration.getConfigAsMap(), requestProcessorFactories, responseProcessorFactories, - namedWriteableRegistry + namedWriteableRegistry, + false ); newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline)); @@ -276,7 +277,8 @@ void validatePipeline(Map searchPipelineInfos pipelineConfig, requestProcessorFactories, responseProcessorFactories, - namedWriteableRegistry + namedWriteableRegistry, + false ); List exceptions = new ArrayList<>(); for (SearchRequestProcessor processor : pipeline.getSearchRequestProcessors()) { @@ -372,7 +374,8 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce searchRequest.source().searchPipelineSource(), requestProcessorFactories, responseProcessorFactories, - namedWriteableRegistry + namedWriteableRegistry, + true ); } catch (Exception e) { throw new SearchPipelineProcessingException(e); diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index d49d9fd41031c..7fc7c9172f16b 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -35,6 +35,7 @@ import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.IndexSettings; import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.ingest.ConfigurationUtils; import org.opensearch.plugins.SearchPipelinePlugin; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; @@ -786,4 +787,38 @@ public void testExceptionOnResponseProcessing() throws Exception { // Exception thrown when processing response expectThrows(SearchPipelineProcessingException.class, () -> pipelinedRequest.transformResponse(response)); } + + public void testAdHocRejectingProcessor() { + String processorType = "ad_hoc_rejecting"; + Map> requestProcessorFactories = Map.of(processorType, (pf, t, d, c) -> { + if (ConfigurationUtils.readBooleanProperty(processorType, t, c, Processor.AD_HOC_PIPELINE, false)) { + throw new IllegalArgumentException(processorType + " cannot be created as part of a pipeline defined in a search request"); + } + return new FakeRequestProcessor(processorType, t, d, r -> {}); + }); + + SearchPipelineService searchPipelineService = createWithProcessors(requestProcessorFactories, Collections.emptyMap()); + + String id = "_id"; + SearchPipelineService.PipelineHolder pipeline = searchPipelineService.getPipelines().get(id); + assertNull(pipeline); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + PutSearchPipelineRequest putRequest = new PutSearchPipelineRequest( + id, + new BytesArray("{\"request_processors\":[" + " { \"" + processorType + "\": {}}" + "]}"), + XContentType.JSON + ); + ClusterState previousClusterState = clusterState; + clusterState = SearchPipelineService.innerPut(putRequest, clusterState); + // The following line successfully creates the pipeline: + searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + + Map pipelineSourceMap = new HashMap<>(); + pipelineSourceMap.put(Pipeline.REQUEST_PROCESSORS_KEY, List.of(Map.of(processorType, Collections.emptyMap()))); + + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().searchPipelineSource(pipelineSourceMap); + SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); + expectThrows(SearchPipelineProcessingException.class, () -> searchPipelineService.resolvePipeline(searchRequest)); + + } }