Skip to content

Commit

Permalink
Add the ability to require an ingest pipeline (#46847)
Browse files Browse the repository at this point in the history
This commit adds the ability to require an ingest pipeline on an
index. Today we can have a default pipeline, but that could be
overridden by a request pipeline parameter. This commit introduces a new
index setting index.required_pipeline that acts similarly to
index.default_pipeline, except that it can not be overridden by a
request pipeline parameter. Additionally, a default pipeline and a
request pipeline can not both be set. The required pipeline can be set
to _none to ensure that no pipeline ever runs for index requests on that
index.
jasontedor committed Sep 19, 2019

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 251dbd8 commit bd77626
Showing 9 changed files with 513 additions and 32 deletions.
9 changes: 8 additions & 1 deletion docs/reference/index-modules.asciidoc
Original file line number Diff line number Diff line change
@@ -234,13 +234,20 @@ specific index module:
The length of time that a <<delete-versioning,deleted document's version number>> remains available for <<index-versioning,further versioned operations>>.
Defaults to `60s`.

`index.default_pipeline`::
`index.default_pipeline`::

The default <<ingest,ingest node>> pipeline for this index. Index requests will fail
if the default pipeline is set and the pipeline does not exist. The default may be
overridden using the `pipeline` parameter. The special pipeline name `_none` indicates
no ingest pipeline should be run.

`index.required_pipeline`::
The required <<ingest,ingest node>> pipeline for this index. Index requests
will fail if the required pipeline is set and the pipeline does not exist.
The required pipeline can not be overridden with the `pipeline` parameter. A
default pipeline and a required pipeline can not both be set. The special
pipeline name `_none` indicates no ingest pipeline will run.

[float]
=== Settings in other index modules

Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
---
teardown:
- do:
ingest.delete_pipeline:
id: "my_pipeline"
ignore: 404

---
"Test index with required pipeline":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"bytes" : {
"field" : "bytes_source_field",
"target_field" : "bytes_target_field"
}
}
]
}
- match: { acknowledged: true }
# required pipeline via index
- do:
indices.create:
index: test
body:
settings:
index:
required_pipeline: "my_pipeline"
aliases:
test_alias: {}

- do:
index:
index: test
id: 1
body: {bytes_source_field: "1kb"}

- do:
get:
index: test
id: 1
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via alias
- do:
index:
index: test_alias
id: 2
body: {bytes_source_field: "1kb"}

- do:
get:
index: test
id: 2
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via upsert
- do:
update:
index: test
id: 3
body:
script:
source: "ctx._source.ran_script = true"
lang: "painless"
upsert: { "bytes_source_field":"1kb" }
- do:
get:
index: test
id: 3
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via scripted upsert
- do:
update:
index: test
id: 4
body:
script:
source: "ctx._source.bytes_source_field = '1kb'"
lang: "painless"
upsert : {}
scripted_upsert: true
- do:
get:
index: test
id: 4
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via doc_as_upsert
- do:
update:
index: test
id: 5
body:
doc: { "bytes_source_field":"1kb" }
doc_as_upsert: true
- do:
get:
index: test
id: 5
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via bulk upsert
# note - bulk scripted upsert's execute the pipeline before the script, so any data referenced by the pipeline
# needs to be in the upsert, not the script
- do:
bulk:
refresh: true
body: |
{"update":{"_id":"6","_index":"test"}}
{"script":"ctx._source.ran_script = true","upsert":{"bytes_source_field":"1kb"}}
{"update":{"_id":"7","_index":"test"}}
{"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true}
{"update":{"_id":"8","_index":"test"}}
{"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true}
{"update":{"_id":"6_alias","_index":"test_alias"}}
{"script":"ctx._source.ran_script = true","upsert":{"bytes_source_field":"1kb"}}
{"update":{"_id":"7_alias","_index":"test_alias"}}
{"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true}
{"update":{"_id":"8_alias","_index":"test_alias"}}
{"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true}
- do:
mget:
body:
docs:
- { _index: "test", _id: "6" }
- { _index: "test", _id: "7" }
- { _index: "test", _id: "8" }
- { _index: "test", _id: "6_alias" }
- { _index: "test", _id: "7_alias" }
- { _index: "test", _id: "8_alias" }
- match: { docs.0._index: "test" }
- match: { docs.0._id: "6" }
- match: { docs.0._source.bytes_source_field: "1kb" }
- match: { docs.0._source.bytes_target_field: 1024 }
- is_false: docs.0._source.ran_script
- match: { docs.1._index: "test" }
- match: { docs.1._id: "7" }
- match: { docs.1._source.bytes_source_field: "2kb" }
- match: { docs.1._source.bytes_target_field: 2048 }
- match: { docs.2._index: "test" }
- match: { docs.2._id: "8" }
- match: { docs.2._source.bytes_source_field: "3kb" }
- match: { docs.2._source.bytes_target_field: 3072 }
- match: { docs.2._source.ran_script: true }
- match: { docs.3._index: "test" }
- match: { docs.3._id: "6_alias" }
- match: { docs.3._source.bytes_source_field: "1kb" }
- match: { docs.3._source.bytes_target_field: 1024 }
- is_false: docs.3._source.ran_script
- match: { docs.4._index: "test" }
- match: { docs.4._id: "7_alias" }
- match: { docs.4._source.bytes_source_field: "2kb" }
- match: { docs.4._source.bytes_target_field: 2048 }
- match: { docs.5._index: "test" }
- match: { docs.5._id: "8_alias" }
- match: { docs.5._source.bytes_source_field: "3kb" }
- match: { docs.5._source.bytes_target_field: 3072 }
- match: { docs.5._source.ran_script: true }

# bad request, request pipeline can not be specified
- do:
catch: /illegal_argument_exception.*request pipeline \[pipeline\] can not override required pipeline \[my_pipeline\]/
index:
index: test
id: 9
pipeline: "pipeline"
body: {bytes_source_field: "1kb"}
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SparseFixedBitSet;
import org.elasticsearch.Assertions;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
@@ -76,6 +77,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -160,11 +162,14 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);

if (indexRequest != null) {
// get pipeline from request
String pipeline = indexRequest.getPipeline();
if (pipeline == null) {
// start to look for default pipeline via settings found in the index meta data
if (indexRequest.isPipelineResolved() == false) {
final String requestPipeline = indexRequest.getPipeline();
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
boolean requestCanOverridePipeline = true;
String requiredPipeline = null;
// start to look for default or required pipelines via settings found in the index meta data
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
// check the alias for the index request (this is how normal index requests are modeled)
if (indexMetaData == null && indexRequest.index() != null) {
@@ -183,34 +188,86 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
}
}
if (indexMetaData != null) {
// Find the default pipeline if one is defined from and existing index.
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
indexRequest.setPipeline(defaultPipeline);
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
hasIndexRequestsWithPipelines = true;
final Settings indexSettings = indexMetaData.getSettings();
if (IndexSettings.REQUIRED_PIPELINE.exists(indexSettings)) {
// find the required pipeline if one is defined from an existing index
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(indexSettings);
assert IndexSettings.DEFAULT_PIPELINE.get(indexSettings).equals(IngestService.NOOP_PIPELINE_NAME) :
IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(requiredPipeline);
requestCanOverridePipeline = false;
} else {
// find the default pipeline if one is defined from an existing index
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(defaultPipeline);
}
} else if (indexRequest.index() != null) {
// No index exists yet (and is valid request), so matching index templates to look for a default pipeline
// the index does not exist yet (and is valid request), so match index templates to look for a default pipeline
List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
assert (templates != null);
String defaultPipeline = IngestService.NOOP_PIPELINE_NAME;
// order of templates are highest order first, break if we find a default_pipeline
// order of templates are highest order first, we have to iterate through them all though
String defaultPipeline = null;
for (IndexTemplateMetaData template : templates) {
final Settings settings = template.settings();
if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) {
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings);
requestCanOverridePipeline = false;
// we can not break in case a lower-order template has a default pipeline that we need to reject
} else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
break;
// we can not break in case a lower-order template has a required pipeline that we need to reject
}
}
indexRequest.setPipeline(defaultPipeline);
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
hasIndexRequestsWithPipelines = true;
if (requiredPipeline != null && defaultPipeline != null) {
// we can not have picked up a required and a default pipeline from applying templates
final String message = String.format(
Locale.ROOT,
"required pipeline [%s] and default pipeline [%s] can not both be set",
requiredPipeline,
defaultPipeline);
throw new IllegalArgumentException(message);
}
final String pipeline;
if (requiredPipeline != null) {
pipeline = requiredPipeline;
} else {
pipeline = defaultPipeline != null ? defaultPipeline : IngestService.NOOP_PIPELINE_NAME;
}
indexRequest.setPipeline(pipeline);
}

if (requestPipeline != null) {
if (requestCanOverridePipeline == false) {
final String message = String.format(
Locale.ROOT,
"request pipeline [%s] can not override required pipeline [%s]",
requestPipeline,
requiredPipeline);
throw new IllegalArgumentException(message);
} else {
indexRequest.setPipeline(requestPipeline);
}
}
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {

if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) {
hasIndexRequestsWithPipelines = true;
}
/*
* We have to track whether or not the pipeline for this request has already been resolved. It can happen that the
* pipeline for this request has already been derived yet we execute this loop again. That occurs if the bulk request
* has been forwarded by a non-ingest coordinating node to an ingest node. In this case, the coordinating node will have
* already resolved the pipeline for this request. It is important that we are able to distinguish this situation as we
* can not double-resolve the pipeline because we will not be able to distinguish the case of the pipeline having been
* set from a request pipeline parameter versus having been set by the resolution. We need to be able to distinguish
* these cases as we need to reject the request if the pipeline was set by a required pipeline and there is a request
* pipeline parameter too.
*/
indexRequest.isPipelineResolved(true);
} else if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) {
hasIndexRequestsWithPipelines = true;
}
}

}

if (hasIndexRequestsWithPipelines) {
@@ -221,6 +278,14 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
if (clusterService.localNode().isIngestNode()) {
processBulkIndexIngestRequest(task, bulkRequest, listener);
} else {
if (Assertions.ENABLED) {
final boolean allAreForwardedRequests = bulkRequest.requests()
.stream()
.map(TransportBulkAction::getIndexWriteRequest)
.filter(Objects::nonNull)
.allMatch(IndexRequest::isPipelineResolved);
assert allAreForwardedRequests : bulkRequest;
}
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
}
} catch (Exception e) {
Original file line number Diff line number Diff line change
@@ -102,6 +102,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement

private String pipeline;

private boolean isPipelineResolved;

/**
* Value for {@link #getAutoGeneratedTimestamp()} if the document has an external
* provided ID.
@@ -131,6 +133,9 @@ public IndexRequest(StreamInput in) throws IOException {
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
pipeline = in.readOptionalString();
if (in.getVersion().onOrAfter(Version.V_7_5_0)) {
isPipelineResolved = in.readBoolean();
}
isRetry = in.readBoolean();
autoGeneratedTimestamp = in.readLong();
if (in.readBoolean()) {
@@ -261,7 +266,7 @@ public XContentType getContentType() {
@Override
public String type() {
if (type == null) {
return MapperService.SINGLE_MAPPING_NAME;
return MapperService.SINGLE_MAPPING_NAME;
}
return type;
}
@@ -290,7 +295,7 @@ public IndexRequest defaultTypeIfNull(String defaultType) {
type = defaultType;
}
return this;
}
}
/**
* The id of the indexed document. If not set, will be automatically generated.
*/
@@ -345,6 +350,26 @@ public String getPipeline() {
return this.pipeline;
}

/**
* Sets if the pipeline for this request has been resolved by the coordinating node.
*
* @param isPipelineResolved true if the pipeline has been resolved
* @return the request
*/
public IndexRequest isPipelineResolved(final boolean isPipelineResolved) {
this.isPipelineResolved = isPipelineResolved;
return this;
}

/**
* Returns whether or not the pipeline for this request has been resolved by the coordinating node.
*
* @return true if the pipeline has been resolved
*/
public boolean isPipelineResolved() {
return this.isPipelineResolved;
}

/**
* The source of the document to index, recopied to a new array if it is unsafe.
*/
@@ -633,8 +658,8 @@ public void resolveRouting(MetaData metaData) {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
// A 7.x request allows null types but if deserialized in a 6.x node will cause nullpointer exceptions.
// So we use the type accessor method here to make the type non-null (will default it to "_doc").
// A 7.x request allows null types but if deserialized in a 6.x node will cause nullpointer exceptions.
// So we use the type accessor method here to make the type non-null (will default it to "_doc").
out.writeOptionalString(type());
out.writeOptionalString(id);
out.writeOptionalString(routing);
@@ -653,6 +678,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(version);
out.writeByte(versionType.getValue());
out.writeOptionalString(pipeline);
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeBoolean(isPipelineResolved);
}
out.writeBoolean(isRetry);
out.writeLong(autoGeneratedTimestamp);
if (contentType != null) {
Original file line number Diff line number Diff line change
@@ -166,6 +166,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS,
IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS,
IndexSettings.DEFAULT_PIPELINE,
IndexSettings.REQUIRED_PIPELINE,
MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING,

// validate that built-in similarities don't get redefined
85 changes: 76 additions & 9 deletions server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
@@ -35,8 +35,10 @@
import org.elasticsearch.node.Node;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -299,12 +301,67 @@ public final class IndexSettings {
1000, 1, Property.Dynamic, Property.IndexScope);

public static final Setting<String> DEFAULT_PIPELINE =
new Setting<>("index.default_pipeline", IngestService.NOOP_PIPELINE_NAME, s -> {
if (s == null || s.isEmpty()) {
throw new IllegalArgumentException("Value for [index.default_pipeline] must be a non-empty string.");
}
return s;
}, Property.Dynamic, Property.IndexScope);
new Setting<>("index.default_pipeline",
IngestService.NOOP_PIPELINE_NAME,
Function.identity(),
new DefaultPipelineValidator(),
Property.Dynamic,
Property.IndexScope);

public static final Setting<String> REQUIRED_PIPELINE =
new Setting<>("index.required_pipeline",
IngestService.NOOP_PIPELINE_NAME,
Function.identity(),
new RequiredPipelineValidator(),
Property.Dynamic,
Property.IndexScope);

static class DefaultPipelineValidator implements Setting.Validator<String> {

@Override
public void validate(final String value) {

}

@Override
public void validate(final String value, final Map<Setting<String>, String> settings) {
final String requiredPipeline = settings.get(IndexSettings.REQUIRED_PIPELINE);
if (value.equals(IngestService.NOOP_PIPELINE_NAME) == false
&& requiredPipeline.equals(IngestService.NOOP_PIPELINE_NAME) == false) {
throw new IllegalArgumentException(
"index has a default pipeline [" + value + "] and a required pipeline [" + requiredPipeline + "]");
}
}

@Override
public Iterator<Setting<String>> settings() {
return Collections.singletonList(REQUIRED_PIPELINE).iterator();
}

}

static class RequiredPipelineValidator implements Setting.Validator<String> {

@Override
public void validate(final String value) {

}

@Override
public void validate(final String value, final Map<Setting<String>, String> settings) {
final String defaultPipeline = settings.get(IndexSettings.DEFAULT_PIPELINE);
if (value.equals(IngestService.NOOP_PIPELINE_NAME) && defaultPipeline.equals(IngestService.NOOP_PIPELINE_NAME) == false) {
throw new IllegalArgumentException(
"index has a required pipeline [" + value + "] and a default pipeline [" + defaultPipeline + "]");
}
}

@Override
public Iterator<Setting<String>> settings() {
return Collections.singletonList(DEFAULT_PIPELINE).iterator();
}

}

/**
* Marks an index to be searched throttled. This means that never more than one shard of such an index will be searched concurrently
@@ -384,6 +441,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
private volatile int maxAnalyzedOffset;
private volatile int maxTermsCount;
private volatile String defaultPipeline;
private volatile String requiredPipeline;
private volatile boolean searchThrottled;

/**
@@ -555,6 +613,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter);
scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength);
scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline);
scopedSettings.addSettingsUpdateConsumer(REQUIRED_PIPELINE, this::setRequiredPipeline);
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations);
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled);
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING, this::setRetentionLeaseMillis);
@@ -746,7 +805,7 @@ public TimeValue getTranslogSyncInterval() {
public void setTranslogSyncInterval(TimeValue translogSyncInterval) {
this.syncInterval = translogSyncInterval;
}

/**
* Returns this interval in which the shards of this index are asynchronously refreshed. {@code -1} means async refresh is disabled.
*/
@@ -825,7 +884,7 @@ private void setMaxInnerResultWindow(int maxInnerResultWindow) {
* Returns the max number of filters in adjacency_matrix aggregation search requests
* @deprecated This setting will be removed in 8.0
*/
@Deprecated
@Deprecated
public int getMaxAdjacencyMatrixFilters() {
return this.maxAdjacencyMatrixFilters;
}
@@ -834,7 +893,7 @@ public int getMaxAdjacencyMatrixFilters() {
* @param maxAdjacencyFilters the max number of filters in adjacency_matrix aggregation search requests
* @deprecated This setting will be removed in 8.0
*/
@Deprecated
@Deprecated
private void setMaxAdjacencyMatrixFilters(int maxAdjacencyFilters) {
this.maxAdjacencyMatrixFilters = maxAdjacencyFilters;
}
@@ -992,6 +1051,14 @@ public void setDefaultPipeline(String defaultPipeline) {
this.defaultPipeline = defaultPipeline;
}

public String getRequiredPipeline() {
return requiredPipeline;
}

public void setRequiredPipeline(final String requiredPipeline) {
this.requiredPipeline = requiredPipeline;
}

/**
* Returns <code>true</code> if soft-delete is enabled.
*/
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
@@ -106,6 +107,9 @@ private void indicesThatCannotBeCreatedTestCase(Set<String> expected,
ClusterState state = mock(ClusterState.class);
when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
when(clusterService.state()).thenReturn(state);
DiscoveryNode localNode = mock(DiscoveryNode.class);
when(clusterService.localNode()).thenReturn(localNode);
when(localNode.isIngestNode()).thenReturn(randomBoolean());
final ThreadPool threadPool = mock(ThreadPool.class);
final ExecutorService direct = EsExecutors.newDirectExecutorService();
when(threadPool.executor(anyString())).thenReturn(direct);
133 changes: 133 additions & 0 deletions server/src/test/java/org/elasticsearch/index/RequiredPipelineIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasToString;

public class RequiredPipelineIT extends ESIntegTestCase {

public void testRequiredPipeline() {
final Settings settings = Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline").build();
createIndex("index", settings);

// this asserts that the required_pipeline was used, without us having to actually create the pipeline etc.
final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> client().prepareIndex("index", "_doc", "1").setSource(Collections.singletonMap("field", "value")).get());
assertThat(e, hasToString(containsString("pipeline with id [required_pipeline] does not exist")));
}

public void testDefaultAndRequiredPipeline() {
final Settings settings = Settings.builder()
.put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")
.put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline")
.build();
final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> createIndex("index", settings));
assertThat(
e,
hasToString(containsString("index has a default pipeline [default_pipeline] and a required pipeline [required_pipeline]")));
}

public void testDefaultAndRequiredPipelineFromTemplates() {
final int lowOrder = randomIntBetween(0, Integer.MAX_VALUE - 1);
final int highOrder = randomIntBetween(lowOrder + 1, Integer.MAX_VALUE);
final int requiredPipelineOrder;
final int defaultPipelineOrder;
if (randomBoolean()) {
defaultPipelineOrder = lowOrder;
requiredPipelineOrder = highOrder;
} else {
defaultPipelineOrder = highOrder;
requiredPipelineOrder = lowOrder;
}
final Settings defaultPipelineSettings =
Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build();
admin().indices()
.preparePutTemplate("default")
.setPatterns(Collections.singletonList("index*"))
.setOrder(defaultPipelineOrder)
.setSettings(defaultPipelineSettings)
.get();
final Settings requiredPipelineSettings =
Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline").build();
admin().indices()
.preparePutTemplate("required")
.setPatterns(Collections.singletonList("index*"))
.setOrder(requiredPipelineOrder)
.setSettings(requiredPipelineSettings)
.get();
final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> client().prepareIndex("index", "_doc", "1").setSource(Collections.singletonMap("field", "value")).get());
assertThat(
e,
hasToString(containsString(
"required pipeline [required_pipeline] and default pipeline [default_pipeline] can not both be set")));
}

public void testHighOrderRequiredPipelinePreferred() throws IOException {
final int lowOrder = randomIntBetween(0, Integer.MAX_VALUE - 1);
final int highOrder = randomIntBetween(lowOrder + 1, Integer.MAX_VALUE);
final Settings defaultPipelineSettings =
Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "low_order_required_pipeline").build();
admin().indices()
.preparePutTemplate("default")
.setPatterns(Collections.singletonList("index*"))
.setOrder(lowOrder)
.setSettings(defaultPipelineSettings)
.get();
final Settings requiredPipelineSettings =
Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "high_order_required_pipeline").build();
admin().indices()
.preparePutTemplate("required")
.setPatterns(Collections.singletonList("index*"))
.setOrder(highOrder)
.setSettings(requiredPipelineSettings)
.get();

// this asserts that the high_order_required_pipeline was selected, without us having to actually create the pipeline etc.
final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> client().prepareIndex("index", "_doc", "1").setSource(Collections.singletonMap("field", "value")).get());
assertThat(e, hasToString(containsString("pipeline with id [high_order_required_pipeline] does not exist")));
}

public void testRequiredPipelineAndRequestPipeline() {
final Settings settings = Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline").build();
createIndex("index", settings);
final IndexRequestBuilder builder = client().prepareIndex("index", "_doc", "1");
builder.setSource(Collections.singletonMap("field", "value"));
builder.setPipeline("request_pipeline");
final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, builder::get);
assertThat(
e,
hasToString(containsString("request pipeline [request_pipeline] can not override required pipeline [required_pipeline]")));
}

}
Original file line number Diff line number Diff line change
@@ -397,6 +397,7 @@ static String[] extractLeaderShardHistoryUUIDs(Map<String, String> ccrIndexMetaD
nonReplicatedSettings.add(IndexSettings.MAX_SLICES_PER_SCROLL);
nonReplicatedSettings.add(IndexSettings.MAX_ADJACENCY_MATRIX_FILTERS_SETTING);
nonReplicatedSettings.add(IndexSettings.DEFAULT_PIPELINE);
nonReplicatedSettings.add(IndexSettings.REQUIRED_PIPELINE);
nonReplicatedSettings.add(IndexSettings.INDEX_SEARCH_THROTTLED);
nonReplicatedSettings.add(IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING);
nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING);

0 comments on commit bd77626

Please sign in to comment.