Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Search pipelines] Pass "adhocness" flag to processor factories #8164

Merged
merged 4 commits into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Check UTF16 string size before converting to String to avoid OOME ([#7963](https://github.com/opensearch-project/OpenSearch/pull/7963))
- Move ZSTD compression codecs out of the sandbox ([#7908](https://github.com/opensearch-project/OpenSearch/pull/7908))
- Update ZSTD default compression level ([#8471](https://github.com/opensearch-project/OpenSearch/pull/8471))
- [Search Pipelines] Pass pipeline creation context to processor factories ([#8164](https://github.com/opensearch-project/OpenSearch/pull/8164))


### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.ParseField;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
Expand Down Expand Up @@ -89,8 +89,8 @@ public SearchRequest processRequest(SearchRequest request) throws Exception {
}

static class Factory implements Processor.Factory<SearchRequestProcessor> {
private static final String QUERY_KEY = "query";
private final NamedXContentRegistry namedXContentRegistry;
public static final ParseField QUERY_FIELD = new ParseField("query");

Factory(NamedXContentRegistry namedXContentRegistry) {
this.namedXContentRegistry = namedXContentRegistry;
Expand All @@ -101,30 +101,21 @@ public FilterQueryRequestProcessor create(
Map<String, Processor.Factory<SearchRequestProcessor>> processorFactories,
String tag,
String description,
Map<String, Object> config
Map<String, Object> config,
PipelineContext pipelineContext
) throws Exception {
Map<String, Object> query = ConfigurationUtils.readOptionalMap(TYPE, tag, config, QUERY_KEY);
if (query == null) {
throw new IllegalArgumentException("Did not specify the " + QUERY_KEY + " property in processor of type " + TYPE);
}
try (
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(config);
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(query);
InputStream stream = BytesReference.bytes(builder).streamInput();
XContentParser parser = XContentType.JSON.xContent()
.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)
) {
XContentParser.Token token = parser.nextToken();
assert token == XContentParser.Token.START_OBJECT;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if (QUERY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
return new FilterQueryRequestProcessor(tag, description, parseInnerQueryBuilder(parser));
}
}
}
return new FilterQueryRequestProcessor(tag, description, parseInnerQueryBuilder(parser));
}
throw new IllegalArgumentException(
"Did not specify the " + QUERY_FIELD.getPreferredName() + " property in processor of type " + TYPE
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ public RenameFieldResponseProcessor create(
Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
String tag,
String description,
Map<String, Object> config
Map<String, Object> config,
PipelineContext pipelineContext
) throws Exception {
String oldField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
String newField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
import org.opensearch.search.pipeline.common.helpers.SearchRequestMap;

import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;
Expand Down Expand Up @@ -127,6 +128,8 @@ SearchScript getPrecompiledSearchScript() {
* Factory class for creating {@link ScriptRequestProcessor}.
*/
public static final class Factory implements Processor.Factory<SearchRequestProcessor> {
private static final List<String> SCRIPT_CONFIG_KEYS = List.of("id", "source", "inline", "lang", "params", "options");

private final ScriptService scriptService;

/**
Expand All @@ -138,33 +141,29 @@ public Factory(ScriptService scriptService) {
this.scriptService = scriptService;
}

/**
* Creates a new instance of {@link ScriptRequestProcessor}.
*
* @param registry The registry of processor factories.
* @param processorTag The processor's tag.
* @param description The processor's description.
* @param config The configuration options for the processor.
* @return The created {@link ScriptRequestProcessor} instance.
* @throws Exception if an error occurs during the creation process.
*/
@Override
public ScriptRequestProcessor create(
Map<String, Processor.Factory<SearchRequestProcessor>> registry,
String processorTag,
String description,
Map<String, Object> config
Map<String, Object> config,
PipelineContext pipelineContext
) throws Exception {
Map<String, Object> scriptConfig = new HashMap<>();
for (String key : SCRIPT_CONFIG_KEYS) {
Object val = config.remove(key);
if (val != null) {
scriptConfig.put(key, val);
}
}
try (
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(config);
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(scriptConfig);
InputStream stream = BytesReference.bytes(builder).streamInput();
XContentParser parser = XContentType.JSON.xContent()
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)
) {
Script script = Script.parse(parser);

Arrays.asList("id", "source", "inline", "lang", "params", "options").forEach(config::remove);

// verify script is able to be compiled before successfully creating processor.
SearchScript searchScript = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public SearchPipelineCommonModulePlugin() {}
* @return A map of processor factories, where the keys are the processor types and the values are the corresponding factory instances.
*/
@Override
public Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Processor.Parameters parameters) {
public Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Parameters parameters) {
return Map.of(
FilterQueryRequestProcessor.TYPE,
new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry),
Expand All @@ -43,7 +43,7 @@ public Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcesso
}

@Override
public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Processor.Parameters parameters) {
public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Parameters parameters) {
return Map.of(RenameFieldResponseProcessor.TYPE, new RenameFieldResponseProcessor.Factory());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.test.AbstractBuilderTestCase;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class FilterQueryRequestProcessorTests extends AbstractBuilderTestCase {
Expand All @@ -37,15 +38,14 @@ public void testFilterQuery() throws Exception {

public void testFactory() throws Exception {
FilterQueryRequestProcessor.Factory factory = new FilterQueryRequestProcessor.Factory(this.xContentRegistry());
FilterQueryRequestProcessor processor = factory.create(
Collections.emptyMap(),
null,
null,
Map.of("query", Map.of("term", Map.of("field", "value")))
);
Map<String, Object> configMap = new HashMap<>(Map.of("query", Map.of("term", Map.of("field", "value"))));
FilterQueryRequestProcessor processor = factory.create(Collections.emptyMap(), null, null, configMap, null);
assertEquals(new TermQueryBuilder("field", "value"), processor.filterQuery);

// Missing "query" parameter:
expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap()));
expectThrows(
IllegalArgumentException.class,
() -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap(), null)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,15 @@ public void testFactory() throws Exception {
config.put("target_field", newField);

RenameFieldResponseProcessor.Factory factory = new RenameFieldResponseProcessor.Factory();
RenameFieldResponseProcessor processor = factory.create(Collections.emptyMap(), null, null, config);
RenameFieldResponseProcessor processor = factory.create(Collections.emptyMap(), null, null, config, null);
assertEquals(processor.getType(), "rename_field");
assertEquals(processor.getOldField(), oldField);
assertEquals(processor.getNewField(), newField);
assertFalse(processor.isIgnoreMissing());

expectThrows(OpenSearchParseException.class, () -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap()));
expectThrows(
OpenSearchParseException.class,
() -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap(), null)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ teardown:
"request_processors": [
{
"script" : {
"tag": "empty_script",
"lang": "painless",
"source" : ""
}
Expand All @@ -38,6 +39,7 @@ teardown:
"request_processors": [
{
"script" : {
"tag": "working",
"lang" : "painless",
"source" : "ctx._source['size'] += 10; ctx._source['from'] = ctx._source['from'] <= 0 ? ctx._source['from'] : ctx._source['from'] - 1 ; ctx._source['explain'] = !ctx._source['explain']; ctx._source['version'] = !ctx._source['version']; ctx._source['seq_no_primary_term'] = !ctx._source['seq_no_primary_term']; ctx._source['track_scores'] = !ctx._source['track_scores']; ctx._source['track_total_hits'] = 1; ctx._source['min_score'] -= 0.9; ctx._source['terminate_after'] += 2; ctx._source['profile'] = !ctx._source['profile'];"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,24 @@

package org.opensearch.plugins;

import org.opensearch.client.Client;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.script.ScriptService;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.SearchResponseProcessor;
import org.opensearch.threadpool.Scheduler;

import java.util.Collections;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/**
* An extension point for {@link Plugin} implementation to add custom search pipeline processors.
Expand All @@ -29,7 +40,7 @@ public interface SearchPipelinePlugin {
* in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory}
* to create the processor from a given pipeline configuration.
*/
default Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Processor.Parameters parameters) {
default Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Parameters parameters) {
return Collections.emptyMap();
}

Expand All @@ -40,7 +51,7 @@ default Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcess
* in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory}
* to create the processor from a given pipeline configuration.
*/
default Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Processor.Parameters parameters) {
default Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Parameters parameters) {
return Collections.emptyMap();
}

Expand All @@ -51,7 +62,78 @@ default Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProce
* in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory}
* to create the processor from a given pipeline configuration.
*/
default Map<String, Processor.Factory<SearchPhaseResultsProcessor>> getSearchPhaseResultsProcessors(Processor.Parameters parameters) {
default Map<String, Processor.Factory<SearchPhaseResultsProcessor>> getSearchPhaseResultsProcessors(Parameters parameters) {
return Collections.emptyMap();
}

/**
* Infrastructure class that holds services that can be used by processor factories to create processor instances
* and that gets passed around to all {@link SearchPipelinePlugin}s.
*/
class Parameters {

/**
* Useful to provide access to the node's environment like config directory to processor factories.
*/
public final Environment env;

/**
* Provides processors script support.
*/
public final ScriptService scriptService;

/**
* Provide analyzer support
*/
public final AnalysisRegistry analysisRegistry;

/**
* Allows processors to read headers set by {@link org.opensearch.action.support.ActionFilter}
* instances that have run while handling the current search.
*/
public final ThreadContext threadContext;

public final LongSupplier relativeTimeSupplier;

public final SearchPipelineService searchPipelineService;

public final Consumer<Runnable> genericExecutor;

public final NamedXContentRegistry namedXContentRegistry;

/**
* Provides scheduler support
*/
public final BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler;

/**
* Provides access to the node's cluster client
*/
public final Client client;

public Parameters(
Environment env,
ScriptService scriptService,
AnalysisRegistry analysisRegistry,
ThreadContext threadContext,
LongSupplier relativeTimeSupplier,
BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
SearchPipelineService searchPipelineService,
Client client,
Consumer<Runnable> genericExecutor,
NamedXContentRegistry namedXContentRegistry
) {
this.env = env;
this.scriptService = scriptService;
this.threadContext = threadContext;
this.analysisRegistry = analysisRegistry;
this.relativeTimeSupplier = relativeTimeSupplier;
this.scheduler = scheduler;
this.searchPipelineService = searchPipelineService;
this.client = client;
this.genericExecutor = genericExecutor;
this.namedXContentRegistry = namedXContentRegistry;
}

}
}
Loading