From 12aad7e1c6dc17df54d9ad6ab254e245b39e0220 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Sat, 2 Mar 2019 16:50:18 -0600 Subject: [PATCH 1/5] main fix and tests, still need itests --- .../test/ingest/220_drop_processor.yml | 2 +- .../action/bulk/TransportBulkAction.java | 121 ++++++++++-------- .../bulk/TransportBulkActionIngestTests.java | 60 ++++++++- 3 files changed, 128 insertions(+), 55 deletions(-) diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml index d1bb3b063a7c4..77a1df81a296a 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml @@ -91,4 +91,4 @@ teardown: get: index: test id: 3 -- match: { found: false } + diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 567b7fb808090..a1d054f1d7939 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -47,8 +47,10 @@ import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; @@ -151,6 +153,68 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener responses = new AtomicArray<>(bulkRequest.requests.size()); + boolean hasIndexRequestsWithPipelines = false; + final MetaData metaData = clusterService.state().getMetaData(); + ImmutableOpenMap indicesMetaData = metaData.indices(); + for (DocWriteRequest actionRequest : bulkRequest.requests) { + IndexRequest indexRequest = getIndexWriteRequest(actionRequest); + if (indexRequest != null) { + // get pipeline from request + String pipeline = indexRequest.getPipeline(); + if (pipeline == null) { + // start to look for default pipeline via settings found in the index meta data + IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index()); + if (indexMetaData == null && indexRequest.index() != null) { + // if the write request if through an alias use the write index's meta data + AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index()); + if (indexOrAlias != null && indexOrAlias.isAlias()) { + AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias; + indexMetaData = alias.getWriteIndex(); + } + } + if (indexMetaData != null) { + // writing to an existing index. Find the the default pipeline if one is defined. + String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings()); + indexRequest.setPipeline(defaultPipeline); + if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) { + hasIndexRequestsWithPipelines = true; + } + } else if (indexRequest.index() != null) { + // no index exists yet (and is valid request), so matching index templates to look for a default pipeline + List templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index()); + assert (templates != null); + String defaultPipeline = IngestService.NOOP_PIPELINE_NAME; + // apply templates, here, in reverse order, since first ones are better matching + for (int i = templates.size() - 1; i >= 0; i--) { + defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(templates.get(i).settings()); + } + indexRequest.setPipeline(defaultPipeline); + if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) { + hasIndexRequestsWithPipelines = true; + } + } + } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) { + hasIndexRequestsWithPipelines = true; + } + } + } + + if (hasIndexRequestsWithPipelines) { + // this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but + // also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that this on the second time through this method, + // this path is never taken. + try { + if (clusterService.localNode().isIngestNode()) { + processBulkIndexIngestRequest(task, bulkRequest, listener); + } else { + ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener); + } + } catch (Exception e) { + listener.onFailure(e); + } + return; + } + if (needToCheck()) { // Attempt to create all the indices that we're going to need during the bulk before we start. // Step 1: collect all the indices in the request @@ -181,7 +245,7 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener { + executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> { inner.addSuppressed(e); listener.onFailure(inner); }), responses, indicesThatCannotBeCreated); @@ -215,56 +279,7 @@ public void onFailure(Exception e) { } } } else { - executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, emptyMap()); - } - } - - private void executeIngestAndBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, - final ActionListener listener, final AtomicArray responses, - Map indicesThatCannotBeCreated) { - boolean hasIndexRequestsWithPipelines = false; - final MetaData metaData = clusterService.state().getMetaData(); - ImmutableOpenMap indicesMetaData = metaData.indices(); - for (DocWriteRequest actionRequest : bulkRequest.requests) { - IndexRequest indexRequest = getIndexWriteRequest(actionRequest); - if(indexRequest != null){ - String pipeline = indexRequest.getPipeline(); - if (pipeline == null) { - IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index()); - if (indexMetaData == null && indexRequest.index() != null) { - //check the alias - AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index()); - if (indexOrAlias != null && indexOrAlias.isAlias()) { - AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias; - indexMetaData = alias.getWriteIndex(); - } - } - if (indexMetaData == null) { - indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); - } else { - String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings()); - indexRequest.setPipeline(defaultPipeline); - if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) { - hasIndexRequestsWithPipelines = true; - } - } - } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) { - hasIndexRequestsWithPipelines = true; - } - } - } - if (hasIndexRequestsWithPipelines) { - try { - if (clusterService.localNode().isIngestNode()) { - processBulkIndexIngestRequest(task, bulkRequest, listener); - } else { - ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener); - } - } catch (Exception e) { - listener.onFailure(e); - } - } else { - executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated); + executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap()); } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 219aee9ebe2ff..4c6889be56c6b 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -57,6 +58,7 @@ import org.mockito.Captor; import org.mockito.MockitoAnnotations; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.Map; @@ -460,7 +462,7 @@ public void testUseDefaultPipelineWithBulkUpsert() throws Exception { verifyZeroInteractions(transportService); } - public void testCreateIndexBeforeRunPipeline() throws Exception { + public void testDoExecuteCalledTwiceCorrectly() throws Exception { Exception exception = new Exception("fake exception"); IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id"); indexRequest.setPipeline("testpipeline"); @@ -478,20 +480,76 @@ public void testCreateIndexBeforeRunPipeline() throws Exception { // check failure works, and passes through to the listener assertFalse(action.isExecuted); // haven't executed yet + assertFalse(action.indexCreated); // no index yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); completionHandler.getValue().accept(exception); + assertFalse(action.indexCreated); // still no index yet, the ingest node failed. assertTrue(failureCalled.get()); // now check success indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing completionHandler.getValue().accept(null); assertTrue(action.isExecuted); + assertTrue(action.indexCreated); // now the index is created since we skipped the ingest node path. assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyZeroInteractions(transportService); } + public void testNotFindDefaultPipelineFromTemplateMatches(){ + Exception exception = new Exception("fake exception"); + IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id"); + indexRequest.source(Collections.emptyMap()); + AtomicBoolean responseCalled = new AtomicBoolean(false); + AtomicBoolean failureCalled = new AtomicBoolean(false); + singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap( + response -> responseCalled.set(true), + e -> { + assertThat(e, sameInstance(exception)); + failureCalled.set(true); + })); + assertEquals(IngestService.NOOP_PIPELINE_NAME, indexRequest.getPipeline()); + verifyZeroInteractions(ingestService); + + } + + public void testFindDefaultPipelineFromTemplateMatch(){ + Exception exception = new Exception("fake exception"); + ClusterState state = clusterService.state(); + + ImmutableOpenMap.Builder templateMetaDataBuilder = ImmutableOpenMap.builder(); + templateMetaDataBuilder.put("template1", IndexTemplateMetaData.builder("template1").patterns(Arrays.asList("missing_index")) + .order(1).settings(Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "pipeline1").build()).build()); + templateMetaDataBuilder.put("template2", IndexTemplateMetaData.builder("template2").patterns(Arrays.asList("missing_*")) + .order(2).settings(Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "pipeline2").build()).build()); + templateMetaDataBuilder.put("template3", IndexTemplateMetaData.builder("template3").patterns(Arrays.asList("missing*")) + .order(3).settings(Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "pipeline3").build()).build()); + templateMetaDataBuilder.put("template4", IndexTemplateMetaData.builder("template4").patterns(Arrays.asList("nope")) + .order(3).settings(Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "pipeline4").build()).build()); + + MetaData metaData = mock(MetaData.class); + when(state.metaData()).thenReturn(metaData); + when(state.getMetaData()).thenReturn(metaData); + when(metaData.templates()).thenReturn(templateMetaDataBuilder.build()); + when(metaData.getTemplates()).thenReturn(templateMetaDataBuilder.build()); + when(metaData.indices()).thenReturn(ImmutableOpenMap.of()); + + IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id"); + indexRequest.source(Collections.emptyMap()); + AtomicBoolean responseCalled = new AtomicBoolean(false); + AtomicBoolean failureCalled = new AtomicBoolean(false); + singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap( + response -> responseCalled.set(true), + e -> { + assertThat(e, sameInstance(exception)); + failureCalled.set(true); + })); + + assertEquals("pipeline3", indexRequest.getPipeline()); + verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); + } + private void validateDefaultPipeline(IndexRequest indexRequest) { Exception exception = new Exception("fake exception"); indexRequest.source(Collections.emptyMap()); From 44b4ef1a473f671b1f97f7f79b808b631fec1310 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Sat, 2 Mar 2019 17:22:59 -0600 Subject: [PATCH 2/5] itests --- .../test/ingest/230_change_target_index.yml | 119 ++++++++++++++++++ .../action/bulk/TransportBulkAction.java | 4 +- 2 files changed, 121 insertions(+), 2 deletions(-) create mode 100644 modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/230_change_target_index.yml diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/230_change_target_index.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/230_change_target_index.yml new file mode 100644 index 0000000000000..bb2677f9b193f --- /dev/null +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/230_change_target_index.yml @@ -0,0 +1,119 @@ +--- +teardown: +- do: + ingest.delete_pipeline: + id: "retarget" + ignore: 404 + +- do: + indices.delete: + index: foo + +--- +"Test Change Target Index with Explicit Pipeline": + +- do: + ingest.put_pipeline: + id: "retarget" + body: > + { + "processors": [ + { + "set" : { + "field" : "_index", + "value" : "foo" + } + } + ] + } +- match: { acknowledged: true } + +# no indices +- do: + cat.indices: {} + +- match: + $body: | + /^$/ + +- do: + index: + index: test + id: 1 + pipeline: "retarget" + body: { + a: true + } + +- do: + get: + index: foo + id: 1 +- match: { _source.a: true } + +# only the foo index +- do: + cat.indices: + h: i + +- match: + $body: | + /^foo\n$/ + +--- +"Test Change Target Index with Default Pipeline": + +- do: + indices.put_template: + name: index_template + body: + index_patterns: test + settings: + default_pipeline: "retarget" + +- do: + ingest.put_pipeline: + id: "retarget" + body: > + { + "processors": [ + { + "set" : { + "field" : "_index", + "value" : "foo" + } + } + ] + } +- match: { acknowledged: true } + +# no indices +- do: + cat.indices: {} + +- match: + $body: | + /^$/ + +- do: + index: + index: test + id: 1 + body: { + a: true + } + +- do: + get: + index: foo + id: 1 +- match: { _source.a: true } + +# only the foo index +- do: + cat.indices: + h: i + +- match: + $body: | + /^foo\n$/ diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index a1d054f1d7939..83bbb765ad8e3 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -173,14 +173,14 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index()); assert (templates != null); String defaultPipeline = IngestService.NOOP_PIPELINE_NAME; From f64fdc3e4b3cb2aa9fc96a557504dd27777ef079 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Sat, 2 Mar 2019 18:23:03 -0600 Subject: [PATCH 3/5] fix doc tests --- docs/reference/ingest/ingest-node.asciidoc | 66 ---------------------- 1 file changed, 66 deletions(-) diff --git a/docs/reference/ingest/ingest-node.asciidoc b/docs/reference/ingest/ingest-node.asciidoc index 578bf35cb2446..3c8d8e9abf2bf 100644 --- a/docs/reference/ingest/ingest-node.asciidoc +++ b/docs/reference/ingest/ingest-node.asciidoc @@ -271,28 +271,6 @@ POST test/_doc/1?pipeline=drop_guests_network // CONSOLE // TEST[continued] -//// -Hidden example assertion: -[source,js] --------------------------------------------------- -GET test/_doc/1 --------------------------------------------------- -// CONSOLE -// TEST[continued] -// TEST[catch:missing] - -[source,js] --------------------------------------------------- -{ - "_index": "test", - "_type": "_doc", - "_id": "1", - "found": false -} --------------------------------------------------- -// TESTRESPONSE -//// - Thanks to the `?.` operator the following document will not throw an error. If the pipeline used a `.` the following document would throw a NullPointerException since the `network` object is not part of the source document. @@ -392,28 +370,6 @@ POST test/_doc/3?pipeline=drop_guests_network // CONSOLE // TEST[continued] -//// -Hidden example assertion: -[source,js] --------------------------------------------------- -GET test/_doc/3 --------------------------------------------------- -// CONSOLE -// TEST[continued] -// TEST[catch:missing] - -[source,js] --------------------------------------------------- -{ - "_index": "test", - "_type": "_doc", - "_id": "3", - "found": false -} --------------------------------------------------- -// TESTRESPONSE -//// - The `?.` operators works well for use in the `if` conditional because the {painless}/painless-operators-reference.html#null-safe-operator[null safe operator] returns null if the object is null and `==` is null safe (as well as many other @@ -511,28 +467,6 @@ POST test/_doc/1?pipeline=not_prod_dropper The document is <> since `prod` (case insensitive) is not found in the tags. -//// -Hidden example assertion: -[source,js] --------------------------------------------------- -GET test/_doc/1 --------------------------------------------------- -// CONSOLE -// TEST[continued] -// TEST[catch:missing] - -[source,js] --------------------------------------------------- -{ - "_index": "test", - "_type": "_doc", - "_id": "1", - "found": false -} --------------------------------------------------- -// TESTRESPONSE -//// - The following document is indexed (i.e. not dropped) since `prod` (case insensitive) is found in the tags. From efdaf538a47d7aa0cb289524541fd5adeaf871a9 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Tue, 5 Mar 2019 15:25:28 -0600 Subject: [PATCH 4/5] properly handle the default setting when template matching --- .../org/elasticsearch/action/bulk/TransportBulkAction.java | 6 +++++- .../action/bulk/TransportBulkActionIngestTests.java | 6 +++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 83bbb765ad8e3..38f39326b5e8e 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -54,6 +54,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; @@ -186,7 +187,10 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener= 0; i--) { - defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(templates.get(i).settings()); + Settings settings = templates.get(i).settings(); + if(IndexSettings.DEFAULT_PIPELINE.exists(settings)) { + defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings); + } } indexRequest.setPipeline(defaultPipeline); if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 4c6889be56c6b..a13e8af919b2a 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -524,9 +524,9 @@ public void testFindDefaultPipelineFromTemplateMatch(){ templateMetaDataBuilder.put("template2", IndexTemplateMetaData.builder("template2").patterns(Arrays.asList("missing_*")) .order(2).settings(Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "pipeline2").build()).build()); templateMetaDataBuilder.put("template3", IndexTemplateMetaData.builder("template3").patterns(Arrays.asList("missing*")) - .order(3).settings(Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "pipeline3").build()).build()); + .order(3).build()); templateMetaDataBuilder.put("template4", IndexTemplateMetaData.builder("template4").patterns(Arrays.asList("nope")) - .order(3).settings(Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "pipeline4").build()).build()); + .order(4).settings(Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "pipeline4").build()).build()); MetaData metaData = mock(MetaData.class); when(state.metaData()).thenReturn(metaData); @@ -546,7 +546,7 @@ public void testFindDefaultPipelineFromTemplateMatch(){ failureCalled.set(true); })); - assertEquals("pipeline3", indexRequest.getPipeline()); + assertEquals("pipeline2", indexRequest.getPipeline()); verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); } From 80d71b43445535f47173481c4946491b8e50144e Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Wed, 6 Mar 2019 12:49:22 -0600 Subject: [PATCH 5/5] fix loop --- .../elasticsearch/action/bulk/TransportBulkAction.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 29ad57e799fca..9adc92e02bedb 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -185,11 +185,12 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index()); assert (templates != null); String defaultPipeline = IngestService.NOOP_PIPELINE_NAME; - // apply templates, here, in reverse order, since first ones are better matching - for (int i = templates.size() - 1; i >= 0; i--) { - Settings settings = templates.get(i).settings(); - if(IndexSettings.DEFAULT_PIPELINE.exists(settings)) { + // order of templates are highest order first, break if we find a default_pipeline + for (IndexTemplateMetaData template : templates) { + final Settings settings = template.settings(); + if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) { defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings); + break; } } indexRequest.setPipeline(defaultPipeline);