Skip to content

Commit

Permalink
[Search pipelines] Pass "adhocness" flag to processor factories (open…
Browse files Browse the repository at this point in the history
…search-project#8164)

* [Search pipelines] Pass "adhocness" flag to processor factories

A named search pipeline may be created with a PUT request, while
an "anonymous" or "ad hoc" search pipeline can be defined in the
search request source. In the latter case, we don't want to create any
"resource-heavy" processors, since they're potentially increasing the
cost of every search request, whereas names pipeline processors get
reused.

This change passes a configuration flag to a processor factory if it's
being called as part of an ad hoc pipeline. The factory can use that
information to avoid allocating expensive resources (maybe by throwing
an exception instead).

Signed-off-by: Michael Froh <froh@amazon.com>

* Pass pipeline creation source as enum

Thanks to @dblock for the suggestion to pass the pipeline creation
source in a way that accounts for possible future pipeline sources (and
lets us distinguish between actual named pipeline creation and the
validation create() that executes before we write a pipeline definition
to cluster state).

Signed-off-by: Michael Froh <froh@amazon.com>

* Move PipelineSource into PipelineContext and explicitly pass to create

Signed-off-by: Michael Froh <froh@amazon.com>

* Fix formatting on merge conflict resolution

Signed-off-by: Michael Froh <froh@amazon.com>

---------

Signed-off-by: Michael Froh <froh@amazon.com>
(cherry picked from commit 431b246)
  • Loading branch information
msfroh committed Jul 7, 2023
1 parent 56a24bb commit dab49cb
Show file tree
Hide file tree
Showing 13 changed files with 281 additions and 163 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Move ZSTD compression codecs out of the sandbox ([#7908](https://github.com/opensearch-project/OpenSearch/pull/7908))
- Update ZSTD default compression level ([#8471](https://github.com/opensearch-project/OpenSearch/pull/8471))
- Improved performance of parsing floating point numbers ([#8467](https://github.com/opensearch-project/OpenSearch/pull/8467))
- [Search Pipelines] Pass pipeline creation context to processor factories ([#8164](https://github.com/opensearch-project/OpenSearch/pull/8164))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.ParseField;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
Expand Down Expand Up @@ -89,8 +89,8 @@ public SearchRequest processRequest(SearchRequest request) throws Exception {
}

static class Factory implements Processor.Factory<SearchRequestProcessor> {
private static final String QUERY_KEY = "query";
private final NamedXContentRegistry namedXContentRegistry;
public static final ParseField QUERY_FIELD = new ParseField("query");

Factory(NamedXContentRegistry namedXContentRegistry) {
this.namedXContentRegistry = namedXContentRegistry;
Expand All @@ -101,30 +101,21 @@ public FilterQueryRequestProcessor create(
Map<String, Processor.Factory<SearchRequestProcessor>> processorFactories,
String tag,
String description,
Map<String, Object> config
Map<String, Object> config,
PipelineContext pipelineContext
) throws Exception {
Map<String, Object> query = ConfigurationUtils.readOptionalMap(TYPE, tag, config, QUERY_KEY);
if (query == null) {
throw new IllegalArgumentException("Did not specify the " + QUERY_KEY + " property in processor of type " + TYPE);
}
try (
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(config);
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(query);
InputStream stream = BytesReference.bytes(builder).streamInput();
XContentParser parser = XContentType.JSON.xContent()
.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)
) {
XContentParser.Token token = parser.nextToken();
assert token == XContentParser.Token.START_OBJECT;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if (QUERY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
return new FilterQueryRequestProcessor(tag, description, parseInnerQueryBuilder(parser));
}
}
}
return new FilterQueryRequestProcessor(tag, description, parseInnerQueryBuilder(parser));
}
throw new IllegalArgumentException(
"Did not specify the " + QUERY_FIELD.getPreferredName() + " property in processor of type " + TYPE
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ public RenameFieldResponseProcessor create(
Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
String tag,
String description,
Map<String, Object> config
Map<String, Object> config,
PipelineContext pipelineContext
) throws Exception {
String oldField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
String newField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
import org.opensearch.search.pipeline.common.helpers.SearchRequestMap;

import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;
Expand Down Expand Up @@ -127,6 +128,8 @@ SearchScript getPrecompiledSearchScript() {
* Factory class for creating {@link ScriptRequestProcessor}.
*/
public static final class Factory implements Processor.Factory<SearchRequestProcessor> {
private static final List<String> SCRIPT_CONFIG_KEYS = List.of("id", "source", "inline", "lang", "params", "options");

private final ScriptService scriptService;

/**
Expand All @@ -138,33 +141,29 @@ public Factory(ScriptService scriptService) {
this.scriptService = scriptService;
}

/**
* Creates a new instance of {@link ScriptRequestProcessor}.
*
* @param registry The registry of processor factories.
* @param processorTag The processor's tag.
* @param description The processor's description.
* @param config The configuration options for the processor.
* @return The created {@link ScriptRequestProcessor} instance.
* @throws Exception if an error occurs during the creation process.
*/
@Override
public ScriptRequestProcessor create(
Map<String, Processor.Factory<SearchRequestProcessor>> registry,
String processorTag,
String description,
Map<String, Object> config
Map<String, Object> config,
PipelineContext pipelineContext
) throws Exception {
Map<String, Object> scriptConfig = new HashMap<>();
for (String key : SCRIPT_CONFIG_KEYS) {
Object val = config.remove(key);
if (val != null) {
scriptConfig.put(key, val);
}
}
try (
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(config);
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(scriptConfig);
InputStream stream = BytesReference.bytes(builder).streamInput();
XContentParser parser = XContentType.JSON.xContent()
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)
) {
Script script = Script.parse(parser);

Arrays.asList("id", "source", "inline", "lang", "params", "options").forEach(config::remove);

// verify script is able to be compiled before successfully creating processor.
SearchScript searchScript = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public SearchPipelineCommonModulePlugin() {}
* @return A map of processor factories, where the keys are the processor types and the values are the corresponding factory instances.
*/
@Override
public Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Processor.Parameters parameters) {
public Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Parameters parameters) {
return Map.of(
FilterQueryRequestProcessor.TYPE,
new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry),
Expand All @@ -43,7 +43,7 @@ public Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcesso
}

@Override
public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Processor.Parameters parameters) {
public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Parameters parameters) {
return Map.of(RenameFieldResponseProcessor.TYPE, new RenameFieldResponseProcessor.Factory());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.test.AbstractBuilderTestCase;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class FilterQueryRequestProcessorTests extends AbstractBuilderTestCase {
Expand All @@ -37,15 +38,14 @@ public void testFilterQuery() throws Exception {

public void testFactory() throws Exception {
FilterQueryRequestProcessor.Factory factory = new FilterQueryRequestProcessor.Factory(this.xContentRegistry());
FilterQueryRequestProcessor processor = factory.create(
Collections.emptyMap(),
null,
null,
Map.of("query", Map.of("term", Map.of("field", "value")))
);
Map<String, Object> configMap = new HashMap<>(Map.of("query", Map.of("term", Map.of("field", "value"))));
FilterQueryRequestProcessor processor = factory.create(Collections.emptyMap(), null, null, configMap, null);
assertEquals(new TermQueryBuilder("field", "value"), processor.filterQuery);

// Missing "query" parameter:
expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap()));
expectThrows(
IllegalArgumentException.class,
() -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap(), null)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,15 @@ public void testFactory() throws Exception {
config.put("target_field", newField);

RenameFieldResponseProcessor.Factory factory = new RenameFieldResponseProcessor.Factory();
RenameFieldResponseProcessor processor = factory.create(Collections.emptyMap(), null, null, config);
RenameFieldResponseProcessor processor = factory.create(Collections.emptyMap(), null, null, config, null);
assertEquals(processor.getType(), "rename_field");
assertEquals(processor.getOldField(), oldField);
assertEquals(processor.getNewField(), newField);
assertFalse(processor.isIgnoreMissing());

expectThrows(OpenSearchParseException.class, () -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap()));
expectThrows(
OpenSearchParseException.class,
() -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap(), null)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ teardown:
"request_processors": [
{
"script" : {
"tag": "empty_script",
"lang": "painless",
"source" : ""
}
Expand All @@ -38,6 +39,7 @@ teardown:
"request_processors": [
{
"script" : {
"tag": "working",
"lang" : "painless",
"source" : "ctx._source['size'] += 10; ctx._source['from'] = ctx._source['from'] <= 0 ? ctx._source['from'] : ctx._source['from'] - 1 ; ctx._source['explain'] = !ctx._source['explain']; ctx._source['version'] = !ctx._source['version']; ctx._source['seq_no_primary_term'] = !ctx._source['seq_no_primary_term']; ctx._source['track_scores'] = !ctx._source['track_scores']; ctx._source['track_total_hits'] = 1; ctx._source['min_score'] -= 0.9; ctx._source['terminate_after'] += 2; ctx._source['profile'] = !ctx._source['profile'];"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,24 @@

package org.opensearch.plugins;

import org.opensearch.client.Client;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.script.ScriptService;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.SearchResponseProcessor;
import org.opensearch.threadpool.Scheduler;

import java.util.Collections;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/**
* An extension point for {@link Plugin} implementation to add custom search pipeline processors.
Expand All @@ -29,7 +40,7 @@ public interface SearchPipelinePlugin {
* in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory}
* to create the processor from a given pipeline configuration.
*/
default Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Processor.Parameters parameters) {
default Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Parameters parameters) {
return Collections.emptyMap();
}

Expand All @@ -40,7 +51,7 @@ default Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcess
* in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory}
* to create the processor from a given pipeline configuration.
*/
default Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Processor.Parameters parameters) {
default Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Parameters parameters) {
return Collections.emptyMap();
}

Expand All @@ -51,7 +62,78 @@ default Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProce
* in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory}
* to create the processor from a given pipeline configuration.
*/
default Map<String, Processor.Factory<SearchPhaseResultsProcessor>> getSearchPhaseResultsProcessors(Processor.Parameters parameters) {
default Map<String, Processor.Factory<SearchPhaseResultsProcessor>> getSearchPhaseResultsProcessors(Parameters parameters) {
return Collections.emptyMap();
}

/**
* Infrastructure class that holds services that can be used by processor factories to create processor instances
* and that gets passed around to all {@link SearchPipelinePlugin}s.
*/
class Parameters {

/**
* Useful to provide access to the node's environment like config directory to processor factories.
*/
public final Environment env;

/**
* Provides processors script support.
*/
public final ScriptService scriptService;

/**
* Provide analyzer support
*/
public final AnalysisRegistry analysisRegistry;

/**
* Allows processors to read headers set by {@link org.opensearch.action.support.ActionFilter}
* instances that have run while handling the current search.
*/
public final ThreadContext threadContext;

public final LongSupplier relativeTimeSupplier;

public final SearchPipelineService searchPipelineService;

public final Consumer<Runnable> genericExecutor;

public final NamedXContentRegistry namedXContentRegistry;

/**
* Provides scheduler support
*/
public final BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler;

/**
* Provides access to the node's cluster client
*/
public final Client client;

public Parameters(
Environment env,
ScriptService scriptService,
AnalysisRegistry analysisRegistry,
ThreadContext threadContext,
LongSupplier relativeTimeSupplier,
BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
SearchPipelineService searchPipelineService,
Client client,
Consumer<Runnable> genericExecutor,
NamedXContentRegistry namedXContentRegistry
) {
this.env = env;
this.scriptService = scriptService;
this.threadContext = threadContext;
this.analysisRegistry = analysisRegistry;
this.relativeTimeSupplier = relativeTimeSupplier;
this.scheduler = scheduler;
this.searchPipelineService = searchPipelineService;
this.client = client;
this.genericExecutor = genericExecutor;
this.namedXContentRegistry = namedXContentRegistry;
}

}
}
Loading

0 comments on commit dab49cb

Please sign in to comment.