Skip to content

Commit

Permalink
[Search pipelines] Pass "adhocness" flag to processor factories
Browse files Browse the repository at this point in the history
A named search pipeline may be created with a PUT request, while
an "anonymous" or "ad hoc" search pipeline can be defined in the
search request source. In the latter case, we don't want to create any
"resource-heavy" processors, since they're potentially increasing the
cost of every search request, whereas named pipeline processors get
reused.

This change passes a configuration flag to a processor factory if it's
being called as part of an ad hoc pipeline. The factory can use that
information to avoid allocating expensive resources (maybe by throwing
an exception instead).

Signed-off-by: Michael Froh <froh@amazon.com>
  • Loading branch information
msfroh committed Jun 20, 2023
1 parent a72edf8 commit 4c8a77e
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Compress and cache cluster state during validate join request ([#7321](https://github.com/opensearch-project/OpenSearch/pull/7321))
- [Snapshot Interop] Add Changes in Create Snapshot Flow for remote store interoperability. ([#7118](https://github.com/opensearch-project/OpenSearch/pull/7118))
- Allow insecure string settings to warn-log usage and advise to migration of a newer secure variant ([#5496](https://github.com/opensearch-project/OpenSearch/pull/5496))
- [Search Pipelines] Pass "adhocness" flag to processor factories ([#8164](https://github.com/opensearch-project/OpenSearch/pull/8164))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.ParseField;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
Expand Down Expand Up @@ -89,8 +89,8 @@ public SearchRequest processRequest(SearchRequest request) throws Exception {
}

static class Factory implements Processor.Factory<SearchRequestProcessor> {
private static final String QUERY_KEY = "query";
private final NamedXContentRegistry namedXContentRegistry;
public static final ParseField QUERY_FIELD = new ParseField("query");

Factory(NamedXContentRegistry namedXContentRegistry) {
this.namedXContentRegistry = namedXContentRegistry;
Expand All @@ -103,28 +103,18 @@ public FilterQueryRequestProcessor create(
String description,
Map<String, Object> config
) throws Exception {
Map<String, Object> query = ConfigurationUtils.readOptionalMap(TYPE, tag, config, QUERY_KEY);
if (query == null) {
throw new IllegalArgumentException("Did not specify the " + QUERY_KEY + " property in processor of type " + TYPE);
}
try (
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(config);
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(query);
InputStream stream = BytesReference.bytes(builder).streamInput();
XContentParser parser = XContentType.JSON.xContent()
.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)
) {
XContentParser.Token token = parser.nextToken();
assert token == XContentParser.Token.START_OBJECT;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if (QUERY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
return new FilterQueryRequestProcessor(tag, description, parseInnerQueryBuilder(parser));
}
}
}
return new FilterQueryRequestProcessor(tag, description, parseInnerQueryBuilder(parser));
}
throw new IllegalArgumentException(
"Did not specify the " + QUERY_FIELD.getPreferredName() + " property in processor of type " + TYPE
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.test.AbstractBuilderTestCase;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class FilterQueryRequestProcessorTests extends AbstractBuilderTestCase {
Expand All @@ -37,12 +38,8 @@ public void testFilterQuery() throws Exception {

public void testFactory() throws Exception {
FilterQueryRequestProcessor.Factory factory = new FilterQueryRequestProcessor.Factory(this.xContentRegistry());
FilterQueryRequestProcessor processor = factory.create(
Collections.emptyMap(),
null,
null,
Map.of("query", Map.of("term", Map.of("field", "value")))
);
Map<String, Object> configMap = new HashMap<>(Map.of("query", Map.of("term", Map.of("field", "value"))));
FilterQueryRequestProcessor processor = factory.create(Collections.emptyMap(), null, null, configMap);
assertEquals(new TermQueryBuilder("field", "value"), processor.filterQuery);

// Missing "query" parameter:
Expand Down
26 changes: 22 additions & 4 deletions server/src/main/java/org/opensearch/search/pipeline/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,20 @@ static Pipeline create(
Map<String, Object> config,
Map<String, Processor.Factory<SearchRequestProcessor>> requestProcessorFactories,
Map<String, Processor.Factory<SearchResponseProcessor>> responseProcessorFactories,
NamedWriteableRegistry namedWriteableRegistry
NamedWriteableRegistry namedWriteableRegistry,
boolean isAdHoc
) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null);
List<Map<String, Object>> requestProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, REQUEST_PROCESSORS_KEY);
List<SearchRequestProcessor> requestProcessors = readProcessors(requestProcessorFactories, requestProcessorConfigs);
List<SearchRequestProcessor> requestProcessors = readProcessors(requestProcessorFactories, requestProcessorConfigs, isAdHoc);
List<Map<String, Object>> responseProcessorConfigs = ConfigurationUtils.readOptionalList(
null,
null,
config,
RESPONSE_PROCESSORS_KEY
);
List<SearchResponseProcessor> responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs);
List<SearchResponseProcessor> responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs, isAdHoc);
if (config.isEmpty() == false) {
throw new OpenSearchParseException(
"pipeline ["
Expand All @@ -93,7 +94,8 @@ static Pipeline create(

private static <T extends Processor> List<T> readProcessors(
Map<String, Processor.Factory<T>> processorFactories,
List<Map<String, Object>> requestProcessorConfigs
List<Map<String, Object>> requestProcessorConfigs,
boolean isAdHoc
) throws Exception {
List<T> processors = new ArrayList<>();
if (requestProcessorConfigs == null) {
Expand All @@ -106,9 +108,25 @@ private static <T extends Processor> List<T> readProcessors(
throw new IllegalArgumentException("Invalid processor type " + type);
}
Map<String, Object> config = (Map<String, Object>) entry.getValue();
if (isAdHoc) {
config.put(Processor.AD_HOC_PIPELINE, true);
}
String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
String description = ConfigurationUtils.readOptionalStringProperty(null, tag, config, DESCRIPTION_KEY);
processors.add(processorFactories.get(type).create(processorFactories, tag, description, config));
config.remove(Processor.AD_HOC_PIPELINE);
if (config.isEmpty() == false) {
String processorName = type;
if (tag != null) {
processorName = processorName + ":" + tag;
}
throw new OpenSearchParseException(
"processor ["
+ processorName
+ "] doesn't support one or more provided configuration parameters "
+ Arrays.toString(config.keySet().toArray())
);
}
}
}
return Collections.unmodifiableList(processors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
* @opensearch.internal
*/
public interface Processor {
/**
* Processor configuration key to let the factory know that the pipeline is defined in a search request.
* For processors whose creation is expensive (e.g. creates a connection pool), the factory can reject
* the request or create a more lightweight (but possibly less efficient) version of the processor.
*/
String AD_HOC_PIPELINE = "ad_hoc_pipeline";

/**
* Gets the type of processor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ void innerUpdatePipelines(SearchPipelineMetadata newSearchPipelineMetadata) {
newConfiguration.getConfigAsMap(),
requestProcessorFactories,
responseProcessorFactories,
namedWriteableRegistry
namedWriteableRegistry,
false
);
newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline));

Expand Down Expand Up @@ -276,7 +277,8 @@ void validatePipeline(Map<DiscoveryNode, SearchPipelineInfo> searchPipelineInfos
pipelineConfig,
requestProcessorFactories,
responseProcessorFactories,
namedWriteableRegistry
namedWriteableRegistry,
false
);
List<Exception> exceptions = new ArrayList<>();
for (SearchRequestProcessor processor : pipeline.getSearchRequestProcessors()) {
Expand Down Expand Up @@ -372,7 +374,8 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce
searchRequest.source().searchPipelineSource(),
requestProcessorFactories,
responseProcessorFactories,
namedWriteableRegistry
namedWriteableRegistry,
true
);
} catch (Exception e) {
throw new SearchPipelineProcessingException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.plugins.SearchPipelinePlugin;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
Expand Down Expand Up @@ -775,7 +776,7 @@ public void testExceptionOnResponseProcessing() throws Exception {
SearchPipelineService searchPipelineService = createWithProcessors(Collections.emptyMap(), throwingResponseProcessorFactory);

Map<String, Object> pipelineSourceMap = new HashMap<>();
pipelineSourceMap.put(Pipeline.RESPONSE_PROCESSORS_KEY, List.of(Map.of("throwing_response", Collections.emptyMap())));
pipelineSourceMap.put(Pipeline.RESPONSE_PROCESSORS_KEY, List.of(Map.of("throwing_response", new HashMap<>())));

SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().size(100).searchPipelineSource(pipelineSourceMap);
SearchRequest searchRequest = new SearchRequest().source(sourceBuilder);
Expand All @@ -786,4 +787,38 @@ public void testExceptionOnResponseProcessing() throws Exception {
// Exception thrown when processing response
expectThrows(SearchPipelineProcessingException.class, () -> pipelinedRequest.transformResponse(response));
}

/**
 * Verifies that a processor factory can use the {@link Processor#AD_HOC_PIPELINE} config flag
 * to reject creation when the pipeline is defined inline in a search request, while the same
 * processor is still allowed in a named pipeline created via PUT.
 */
public void testAdHocRejectingProcessor() {
    String processorType = "ad_hoc_rejecting";
    // Factory that refuses to build the processor when the ad-hoc flag is set.
    Map<String, Processor.Factory<SearchRequestProcessor>> requestProcessorFactories = Map.of(processorType, (pf, t, d, c) -> {
        if (ConfigurationUtils.readBooleanProperty(processorType, t, c, Processor.AD_HOC_PIPELINE, false)) {
            throw new IllegalArgumentException(processorType + " cannot be created as part of a pipeline defined in a search request");
        }
        return new FakeRequestProcessor(processorType, t, d, r -> {});
    });

    SearchPipelineService searchPipelineService = createWithProcessors(requestProcessorFactories, Collections.emptyMap());

    String id = "_id";
    SearchPipelineService.PipelineHolder pipeline = searchPipelineService.getPipelines().get(id);
    assertNull(pipeline);
    ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
    PutSearchPipelineRequest putRequest = new PutSearchPipelineRequest(
        id,
        new BytesArray("{\"request_processors\":[" + " { \"" + processorType + "\": {}}" + "]}"),
        XContentType.JSON
    );
    ClusterState previousClusterState = clusterState;
    clusterState = SearchPipelineService.innerPut(putRequest, clusterState);
    // The following line successfully creates the pipeline (named pipelines are not ad hoc):
    searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));

    // Use a mutable config map: readProcessors() inserts the AD_HOC_PIPELINE flag into it,
    // so an immutable Collections.emptyMap() would throw UnsupportedOperationException and
    // mask the factory rejection this test is meant to exercise (see the parallel fix in
    // testExceptionOnResponseProcessing).
    Map<String, Object> pipelineSourceMap = new HashMap<>();
    pipelineSourceMap.put(Pipeline.REQUEST_PROCESSORS_KEY, List.of(Map.of(processorType, new HashMap<>())));

    SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().searchPipelineSource(pipelineSourceMap);
    SearchRequest searchRequest = new SearchRequest().source(sourceBuilder);
    // The factory's IllegalArgumentException is wrapped by the pipeline service.
    expectThrows(SearchPipelineProcessingException.class, () -> searchPipelineService.resolvePipeline(searchRequest));
}
}

0 comments on commit 4c8a77e

Please sign in to comment.