Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Search Pipelines] Split search pipeline processor factories by type #7597

Merged
merged 4 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Search Pipelines] Accept pipelines defined in search source ([#7253](https://github.com/opensearch-project/OpenSearch/pull/7253))
- [Search Pipelines] Add `default_search_pipeline` index setting ([#7470](https://github.com/opensearch-project/OpenSearch/pull/7470))
- [Search Pipelines] Add RenameFieldResponseProcessor for Search Pipelines ([#7377](https://github.com/opensearch-project/OpenSearch/pull/7377))
- [Search Pipelines] Split search pipeline processor factories by type ([#7597](https://github.com/opensearch-project/OpenSearch/pull/7597))
- Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244))
- Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237))
- Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public SearchRequest processRequest(SearchRequest request) throws Exception {
return request;
}

static class Factory implements Processor.Factory {
static class Factory implements Processor.Factory<SearchRequestProcessor> {
private final NamedXContentRegistry namedXContentRegistry;
public static final ParseField QUERY_FIELD = new ParseField("query");

Expand All @@ -85,7 +85,7 @@ static class Factory implements Processor.Factory {

@Override
public FilterQueryRequestProcessor create(
Map<String, Processor.Factory> processorFactories,
Map<String, Processor.Factory<SearchRequestProcessor>> processorFactories,
String tag,
String description,
Map<String, Object> config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public SearchResponse processResponse(SearchRequest request, SearchResponse resp
/**
* This is a factory that creates the RenameFieldResponseProcessor
*/
public static final class Factory implements Processor.Factory {
public static final class Factory implements Processor.Factory<SearchResponseProcessor> {

/**
* Constructor for factory
Expand All @@ -137,7 +137,7 @@ public static final class Factory implements Processor.Factory {

@Override
public RenameFieldResponseProcessor create(
Map<String, Processor.Factory> processorFactories,
Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
String tag,
String description,
Map<String, Object> config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPipelinePlugin;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.SearchResponseProcessor;

import java.util.Map;

Expand All @@ -25,12 +27,12 @@ public class SearchPipelineCommonModulePlugin extends Plugin implements SearchPi
public SearchPipelineCommonModulePlugin() {}

@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
return Map.of(
FilterQueryRequestProcessor.TYPE,
new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry),
RenameFieldResponseProcessor.TYPE,
new RenameFieldResponseProcessor.Factory()
);
public Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Processor.Parameters parameters) {
return Map.of(FilterQueryRequestProcessor.TYPE, new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry));
}

@Override
public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Processor.Parameters parameters) {
return Map.of(RenameFieldResponseProcessor.TYPE, new RenameFieldResponseProcessor.Factory());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
nodes.info: {}

- contains: { nodes.$cluster_manager.modules: { name: search-pipeline-common } }
- contains: { nodes.$cluster_manager.search_pipelines.processors: { type: filter_query } }
- contains: { nodes.$cluster_manager.search_pipelines.processors: { type: rename_field } }
- contains: { nodes.$cluster_manager.search_pipelines.request_processors: { type: filter_query } }
- contains: { nodes.$cluster_manager.search_pipelines.response_processors: { type: rename_field } }
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package org.opensearch.plugins;

import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.SearchResponseProcessor;

import java.util.Collections;
import java.util.Map;
Expand All @@ -20,13 +22,24 @@
*/
public interface SearchPipelinePlugin {
/**
* Returns additional search pipeline processor types added by this plugin.
* Returns additional search pipeline request processor types added by this plugin.
*
* The key of the returned {@link Map} is the unique name for the processor which is specified
* in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory}
* to create the processor from a given pipeline configuration.
*/
default Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
default Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Processor.Parameters parameters) {
return Collections.emptyMap();
}

/**
* Returns additional search pipeline response processor types added by this plugin.
*
* The key of the returned {@link Map} is the unique name for the processor which is specified
* in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory}
* to create the processor from a given pipeline configuration.
*/
default Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Processor.Parameters parameters) {
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class Pipeline {

private final NamedWriteableRegistry namedWriteableRegistry;

Pipeline(
private Pipeline(
String id,
@Nullable String description,
@Nullable Integer version,
Expand All @@ -62,31 +62,24 @@ class Pipeline {
this.namedWriteableRegistry = namedWriteableRegistry;
}

public static Pipeline create(
static Pipeline create(
String id,
Map<String, Object> config,
Map<String, Processor.Factory> processorFactories,
Map<String, Processor.Factory<SearchRequestProcessor>> requestProcessorFactories,
Map<String, Processor.Factory<SearchResponseProcessor>> responseProcessorFactories,
NamedWriteableRegistry namedWriteableRegistry
) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null);
List<Map<String, Object>> requestProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, REQUEST_PROCESSORS_KEY);
List<SearchRequestProcessor> requestProcessors = readProcessors(
SearchRequestProcessor.class,
processorFactories,
requestProcessorConfigs
);
List<SearchRequestProcessor> requestProcessors = readProcessors(requestProcessorFactories, requestProcessorConfigs);
List<Map<String, Object>> responseProcessorConfigs = ConfigurationUtils.readOptionalList(
null,
null,
config,
RESPONSE_PROCESSORS_KEY
);
List<SearchResponseProcessor> responseProcessors = readProcessors(
SearchResponseProcessor.class,
processorFactories,
responseProcessorConfigs
);
List<SearchResponseProcessor> responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs);
msfroh marked this conversation as resolved.
Show resolved Hide resolved
if (config.isEmpty() == false) {
throw new OpenSearchParseException(
"pipeline ["
Expand All @@ -98,10 +91,8 @@ public static Pipeline create(
return new Pipeline(id, description, version, requestProcessors, responseProcessors, namedWriteableRegistry);
}

@SuppressWarnings("unchecked") // Cast is checked using isInstance
private static <T extends Processor> List<T> readProcessors(
Class<T> processorType,
Map<String, Processor.Factory> processorFactories,
Map<String, Processor.Factory<T>> processorFactories,
List<Map<String, Object>> requestProcessorConfigs
) throws Exception {
List<T> processors = new ArrayList<>();
Expand All @@ -117,22 +108,10 @@ private static <T extends Processor> List<T> readProcessors(
Map<String, Object> config = (Map<String, Object>) entry.getValue();
String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
String description = ConfigurationUtils.readOptionalStringProperty(null, tag, config, DESCRIPTION_KEY);
Processor processor = processorFactories.get(type).create(processorFactories, tag, description, config);
if (processorType.isInstance(processor)) {
processors.add((T) processor);
} else {
throw new IllegalArgumentException("Processor type " + type + " is not a " + processorType.getSimpleName());
}
processors.add(processorFactories.get(type).create(processorFactories, tag, description, config));
}
}
return processors;
}

List<Processor> flattenAllProcessors() {
List<Processor> allProcessors = new ArrayList<>(searchRequestProcessors.size() + searchResponseProcessors.size());
allProcessors.addAll(searchRequestProcessors);
allProcessors.addAll(searchResponseProcessors);
return allProcessors;
return Collections.unmodifiableList(processors);
}

String getId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public interface Processor {
/**
* A factory that knows how to construct a processor based on a map of maps.
*/
interface Factory {
interface Factory<T extends Processor> {

/**
* Creates a processor based on the specified map of maps config.
Expand All @@ -65,8 +65,7 @@ interface Factory {
* <b>Note:</b> Implementations are responsible for removing the used configuration
* keys, so that after creation the config map should be empty.
*/
Processor create(Map<String, Factory> processorFactories, String tag, String description, Map<String, Object> config)
throws Exception;
T create(Map<String, Factory<T>> processorFactories, String tag, String description, Map<String, Object> config) throws Exception;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@

package org.opensearch.search.pipeline;

import org.opensearch.Version;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.node.ReportingService;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/**
Expand All @@ -26,45 +30,84 @@
*/
public class SearchPipelineInfo implements ReportingService.Info {

private final Set<ProcessorInfo> processors;
private final Map<String, Set<ProcessorInfo>> processors = new TreeMap<>();

public SearchPipelineInfo(List<ProcessorInfo> processors) {
this.processors = new TreeSet<>(processors); // we use a treeset here to have a test-able / predictable order
/**
 * Builds the info object from processor infos grouped under a string key.
 *
 * @param processors processor infos keyed by processor group; each list is copied into a sorted set
 */
public SearchPipelineInfo(Map<String, List<ProcessorInfo>> processors) {
    // A TreeSet per group keeps the ordering predictable, which makes the output test-able.
    processors.forEach((groupKey, infos) -> this.processors.put(groupKey, new TreeSet<>(infos)));
}

/**
* Read from a stream.
*/
public SearchPipelineInfo(StreamInput in) throws IOException {
processors = new TreeSet<>();
final int size = in.readVInt();
for (int i = 0; i < size; i++) {
processors.add(new ProcessorInfo(in));
// TODO: When we backport this to 2.8, we must change this condition to in.getVersion().before(V_2_8_0)
if (in.getVersion().onOrBefore(Version.V_2_8_0)) {
// Prior to version 2.8, we had a flat list of processors. For best compatibility, assume they're valid
// as both request and response processors, since we couldn't tell the difference back then.
msfroh marked this conversation as resolved.
Show resolved Hide resolved
final int size = in.readVInt();
Set<ProcessorInfo> processorInfos = new TreeSet<>();
for (int i = 0; i < size; i++) {
processorInfos.add(new ProcessorInfo(in));
}
processors.put(Pipeline.REQUEST_PROCESSORS_KEY, processorInfos);
processors.put(Pipeline.RESPONSE_PROCESSORS_KEY, processorInfos);
} else {
final int numTypes = in.readVInt();
for (int i = 0; i < numTypes; i++) {
String type = in.readString();
int numProcessors = in.readVInt();
Set<ProcessorInfo> processorInfos = new TreeSet<>();
for (int j = 0; j < numProcessors; j++) {
processorInfos.add(new ProcessorInfo(in));
}
processors.put(type, processorInfos);
}
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("search_pipelines");
builder.startArray("processors");
for (ProcessorInfo info : processors) {
info.toXContent(builder, params);
for (Map.Entry<String, Set<ProcessorInfo>> processorEntry : processors.entrySet()) {
builder.startArray(processorEntry.getKey());
for (ProcessorInfo info : processorEntry.getValue()) {
info.toXContent(builder, params);
}
builder.endArray();
}
builder.endArray();
builder.endObject();
return builder;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.write(processors.size());
for (ProcessorInfo info : processors) {
info.writeTo(out);
// TODO: When we backport this to 2.8, we must change this condition to out.getVersion().before(V_2_8_0)
if (out.getVersion().onOrBefore(Version.V_2_8_0)) {
// Prior to version 2.8, we grouped all processors into a single list.
Set<ProcessorInfo> processorInfos = new TreeSet<>();
processorInfos.addAll(processors.getOrDefault(Pipeline.REQUEST_PROCESSORS_KEY, Collections.emptySet()));
processorInfos.addAll(processors.getOrDefault(Pipeline.RESPONSE_PROCESSORS_KEY, Collections.emptySet()));
out.writeVInt(processorInfos.size());
for (ProcessorInfo processorInfo : processorInfos) {
processorInfo.writeTo(out);
}
} else {
out.write(processors.size());
for (Map.Entry<String, Set<ProcessorInfo>> processorsEntry : processors.entrySet()) {
out.writeString(processorsEntry.getKey());
out.writeVInt(processorsEntry.getValue().size());
for (ProcessorInfo processorInfo : processorsEntry.getValue()) {
processorInfo.writeTo(out);
}
}
}
}

public boolean containsProcessor(String type) {
return processors.contains(new ProcessorInfo(type));
/**
 * Checks whether a processor of the given type is registered under the given group key.
 *
 * @param processorType the key of the processor group to look in
 * @param type the processor type to look for within that group
 * @return true if the group exists and contains a processor info for {@code type}
 */
public boolean containsProcessor(String processorType, String type) {
    Set<ProcessorInfo> infosForGroup = processors.get(processorType);
    if (infosForGroup == null) {
        // No such group, so the processor cannot be present.
        return false;
    }
    return infosForGroup.contains(new ProcessorInfo(type));
}

@Override
Expand Down
Loading