From b0deccb770f249effab4ef1b889df4c2c66f867b Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Tue, 20 Jun 2023 00:18:45 +0000 Subject: [PATCH] [Search pipelines] Pass "adhocness" flag to processor factories A named search pipeline may be created with a PUT request, while an "anonymous" or "ad hoc" search pipeline can be defined in the search request source. In the latter case, we don't want to create any "resource-heavy" processors, since they're potentially increasing the cost of every search request, whereas names pipeline processors get reused. This change passes a configuration flag to a processor factory if it's being called as part of an ad hoc pipeline. The factory can use that information to avoid allocating expensive resources (maybe by throwing an exception instead). Signed-off-by: Michael Froh --- .../common/FilterQueryRequestProcessor.java | 26 +++++--------- .../FilterQueryRequestProcessorTests.java | 9 ++--- .../opensearch/search/pipeline/Pipeline.java | 29 ++++++++++++--- .../opensearch/search/pipeline/Processor.java | 6 ++++ .../pipeline/SearchPipelineService.java | 9 +++-- .../pipeline/SearchPipelineServiceTests.java | 36 +++++++++++++++++++ 6 files changed, 84 insertions(+), 31 deletions(-) diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java index 7deb8faa03af6..a43335b73a72e 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java @@ -13,12 +13,12 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentType; import org.opensearch.common.xcontent.json.JsonXContent; -import org.opensearch.core.ParseField; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.QueryBuilder; +import org.opensearch.ingest.ConfigurationUtils; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.pipeline.Processor; import org.opensearch.search.pipeline.SearchRequestProcessor; @@ -89,8 +89,8 @@ public SearchRequest processRequest(SearchRequest request) throws Exception { } static class Factory implements Processor.Factory { + private static final String QUERY_KEY = "query"; private final NamedXContentRegistry namedXContentRegistry; - public static final ParseField QUERY_FIELD = new ParseField("query"); Factory(NamedXContentRegistry namedXContentRegistry) { this.namedXContentRegistry = namedXContentRegistry; @@ -103,28 +103,18 @@ public FilterQueryRequestProcessor create( String description, Map config ) throws Exception { + Map query = ConfigurationUtils.readOptionalMap(TYPE, tag, config, QUERY_KEY); + if (query == null) { + throw new IllegalArgumentException("Did not specify the " + QUERY_KEY + " property in processor of type " + TYPE); + } try ( - XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(config); + XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(query); InputStream stream = BytesReference.bytes(builder).streamInput(); XContentParser parser = XContentType.JSON.xContent() .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, stream) ) { - XContentParser.Token token = parser.nextToken(); - assert token == XContentParser.Token.START_OBJECT; - String currentFieldName = null; - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token == XContentParser.Token.START_OBJECT) { - if (QUERY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { - return new FilterQueryRequestProcessor(tag, description, parseInnerQueryBuilder(parser)); - } - } - } + return new FilterQueryRequestProcessor(tag, description, parseInnerQueryBuilder(parser)); } - throw new IllegalArgumentException( - "Did not specify the " + QUERY_FIELD.getPreferredName() + " property in processor of type " + TYPE - ); } } } diff --git a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java index 1f355ac97c801..ea3008e005508 100644 --- a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java +++ b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java @@ -16,6 +16,7 @@ import org.opensearch.test.AbstractBuilderTestCase; import java.util.Collections; +import java.util.HashMap; import java.util.Map; public class FilterQueryRequestProcessorTests extends AbstractBuilderTestCase { @@ -37,12 +38,8 @@ public void testFilterQuery() throws Exception { public void testFactory() throws Exception { FilterQueryRequestProcessor.Factory factory = new FilterQueryRequestProcessor.Factory(this.xContentRegistry()); - FilterQueryRequestProcessor processor = factory.create( - Collections.emptyMap(), - null, - null, - Map.of("query", Map.of("term", Map.of("field", "value"))) - ); + Map configMap = new HashMap<>(Map.of("query", Map.of("term", Map.of("field", "value")))); + FilterQueryRequestProcessor processor = factory.create(Collections.emptyMap(), null, null, configMap); assertEquals(new TermQueryBuilder("field", "value"), processor.filterQuery); // Missing "query" parameter: diff --git a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java index c9a5f865d507e..e3b4d26afb916 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -67,19 +68,20 @@ static Pipeline create( Map config, Map> requestProcessorFactories, Map> responseProcessorFactories, - NamedWriteableRegistry namedWriteableRegistry + NamedWriteableRegistry namedWriteableRegistry, + boolean isAdHoc ) throws Exception { String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); List> requestProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, REQUEST_PROCESSORS_KEY); - List requestProcessors = readProcessors(requestProcessorFactories, requestProcessorConfigs); + List requestProcessors = readProcessors(requestProcessorFactories, requestProcessorConfigs, isAdHoc); List> responseProcessorConfigs = ConfigurationUtils.readOptionalList( null, null, config, RESPONSE_PROCESSORS_KEY ); - List responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs); + List responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs, isAdHoc); if (config.isEmpty() == false) { throw new OpenSearchParseException( "pipeline [" @@ -93,7 +95,8 @@ static Pipeline create( private static List readProcessors( Map> processorFactories, - List> requestProcessorConfigs + List> requestProcessorConfigs, + boolean isAdHoc ) throws Exception { List processors = new ArrayList<>(); if (requestProcessorConfigs == null) { @@ -106,9 +109,27 @@ private static List readProcessors( throw new IllegalArgumentException("Invalid processor type " + type); } Map config = (Map) entry.getValue(); + if (isAdHoc) { + Map newConfig = new HashMap<>(config); + newConfig.put(Processor.AD_HOC_PIPELINE, true); + config = newConfig; + } String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); String description = ConfigurationUtils.readOptionalStringProperty(null, tag, config, DESCRIPTION_KEY); processors.add(processorFactories.get(type).create(processorFactories, tag, description, config)); + config.remove(Processor.AD_HOC_PIPELINE); + if (config.isEmpty() == false) { + String processorName = type; + if (tag != null) { + processorName = processorName + ":" + tag; + } + throw new OpenSearchParseException( + "processor [" + + processorName + + "] doesn't support one or more provided configuration parameters " + + Arrays.toString(config.keySet().toArray()) + ); + } } } return Collections.unmodifiableList(processors); diff --git a/server/src/main/java/org/opensearch/search/pipeline/Processor.java b/server/src/main/java/org/opensearch/search/pipeline/Processor.java index ee28db1cc334d..dec901953628b 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Processor.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Processor.java @@ -33,6 +33,12 @@ * @opensearch.internal */ public interface Processor { + /** + * Processor configuration key to let the factory know that the pipeline is defined in a search request. + * For processors whose creation is expensive (e.g. creates a connection pool), the factory can reject + * the request or create a more lightweight (but possibly less efficient) version of the processor. + */ + String AD_HOC_PIPELINE = "ad_hoc_pipeline"; /** * Gets the type of processor diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java index 87c09bd971284..c8304a9a16d91 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -177,7 +177,8 @@ void innerUpdatePipelines(SearchPipelineMetadata newSearchPipelineMetadata) { newConfiguration.getConfigAsMap(), requestProcessorFactories, responseProcessorFactories, - namedWriteableRegistry + namedWriteableRegistry, + false ); newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline)); @@ -276,7 +277,8 @@ void validatePipeline(Map searchPipelineInfos pipelineConfig, requestProcessorFactories, responseProcessorFactories, - namedWriteableRegistry + namedWriteableRegistry, + false ); List exceptions = new ArrayList<>(); for (SearchRequestProcessor processor : pipeline.getSearchRequestProcessors()) { @@ -372,7 +374,8 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce searchRequest.source().searchPipelineSource(), requestProcessorFactories, responseProcessorFactories, - namedWriteableRegistry + namedWriteableRegistry, + true ); } catch (Exception e) { throw new SearchPipelineProcessingException(e); diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index d49d9fd41031c..9d360399524bb 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -35,6 +35,7 @@ import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.IndexSettings; import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.ingest.ConfigurationUtils; import org.opensearch.plugins.SearchPipelinePlugin; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; @@ -786,4 +787,39 @@ public void testExceptionOnResponseProcessing() throws Exception { // Exception thrown when processing response expectThrows(SearchPipelineProcessingException.class, () -> pipelinedRequest.transformResponse(response)); } + + public void testAdHocRejectingProcessor() { + String processorType = "ad_hoc_rejecting"; + Map> requestProcessorFactories = Map.of( + processorType, + (pf, t, d, c) -> { + if (ConfigurationUtils.readBooleanProperty(processorType, t, c, Processor.AD_HOC_PIPELINE, false)) { + throw new IllegalArgumentException(processorType + " cannot be created as part of a pipeline defined in a search request"); + } + return new FakeRequestProcessor(processorType, t, d, r -> {}); + } + ); + + SearchPipelineService searchPipelineService = createWithProcessors(requestProcessorFactories, Collections.emptyMap()); + + String id = "_id"; + SearchPipelineService.PipelineHolder pipeline = searchPipelineService.getPipelines().get(id); + assertNull(pipeline); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + PutSearchPipelineRequest putRequest = new PutSearchPipelineRequest(id, new BytesArray("{\"request_processors\":[" + + " { \"" + processorType + "\": {}}" + + "]}"), XContentType.JSON); + ClusterState previousClusterState = clusterState; + clusterState = SearchPipelineService.innerPut(putRequest, clusterState); + // The following line successfully creates the pipeline: + searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + + Map pipelineSourceMap = new HashMap<>(); + pipelineSourceMap.put(Pipeline.REQUEST_PROCESSORS_KEY, List.of(Map.of(processorType, Collections.emptyMap()))); + + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().searchPipelineSource(pipelineSourceMap); + SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); + expectThrows(SearchPipelineProcessingException.class, () -> searchPipelineService.resolvePipeline(searchRequest)); + + } }