4747import org .elasticsearch .cluster .metadata .AliasOrIndex ;
4848import org .elasticsearch .cluster .metadata .IndexMetaData ;
4949import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
50+ import org .elasticsearch .cluster .metadata .IndexTemplateMetaData ;
5051import org .elasticsearch .cluster .metadata .MappingMetaData ;
5152import org .elasticsearch .cluster .metadata .MetaData ;
53+ import org .elasticsearch .cluster .metadata .MetaDataIndexTemplateService ;
5254import org .elasticsearch .cluster .service .ClusterService ;
5355import org .elasticsearch .common .collect .ImmutableOpenMap ;
5456import org .elasticsearch .common .inject .Inject ;
57+ import org .elasticsearch .common .settings .Settings ;
5558import org .elasticsearch .common .unit .TimeValue ;
5659import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
5760import org .elasticsearch .common .util .concurrent .AtomicArray ;
@@ -151,6 +154,72 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
151154 final long startTime = relativeTime ();
152155 final AtomicArray <BulkItemResponse > responses = new AtomicArray <>(bulkRequest .requests .size ());
153156
157+ boolean hasIndexRequestsWithPipelines = false ;
158+ final MetaData metaData = clusterService .state ().getMetaData ();
159+ ImmutableOpenMap <String , IndexMetaData > indicesMetaData = metaData .indices ();
160+ for (DocWriteRequest <?> actionRequest : bulkRequest .requests ) {
161+ IndexRequest indexRequest = getIndexWriteRequest (actionRequest );
162+ if (indexRequest != null ) {
163+ // get pipeline from request
164+ String pipeline = indexRequest .getPipeline ();
165+ if (pipeline == null ) {
166+ // start to look for default pipeline via settings found in the index meta data
167+ IndexMetaData indexMetaData = indicesMetaData .get (actionRequest .index ());
168+ if (indexMetaData == null && indexRequest .index () != null ) {
169+ // if the write request if through an alias use the write index's meta data
170+ AliasOrIndex indexOrAlias = metaData .getAliasAndIndexLookup ().get (indexRequest .index ());
171+ if (indexOrAlias != null && indexOrAlias .isAlias ()) {
172+ AliasOrIndex .Alias alias = (AliasOrIndex .Alias ) indexOrAlias ;
173+ indexMetaData = alias .getWriteIndex ();
174+ }
175+ }
176+ if (indexMetaData != null ) {
177+ // Find the the default pipeline if one is defined from and existing index.
178+ String defaultPipeline = IndexSettings .DEFAULT_PIPELINE .get (indexMetaData .getSettings ());
179+ indexRequest .setPipeline (defaultPipeline );
180+ if (IngestService .NOOP_PIPELINE_NAME .equals (defaultPipeline ) == false ) {
181+ hasIndexRequestsWithPipelines = true ;
182+ }
183+ } else if (indexRequest .index () != null ) {
184+ // No index exists yet (and is valid request), so matching index templates to look for a default pipeline
185+ List <IndexTemplateMetaData > templates = MetaDataIndexTemplateService .findTemplates (metaData , indexRequest .index ());
186+ assert (templates != null );
187+ String defaultPipeline = IngestService .NOOP_PIPELINE_NAME ;
188+ // order of templates are highest order first, break if we find a default_pipeline
189+ for (IndexTemplateMetaData template : templates ) {
190+ final Settings settings = template .settings ();
191+ if (IndexSettings .DEFAULT_PIPELINE .exists (settings )) {
192+ defaultPipeline = IndexSettings .DEFAULT_PIPELINE .get (settings );
193+ break ;
194+ }
195+ }
196+ indexRequest .setPipeline (defaultPipeline );
197+ if (IngestService .NOOP_PIPELINE_NAME .equals (defaultPipeline ) == false ) {
198+ hasIndexRequestsWithPipelines = true ;
199+ }
200+ }
201+ } else if (IngestService .NOOP_PIPELINE_NAME .equals (pipeline ) == false ) {
202+ hasIndexRequestsWithPipelines = true ;
203+ }
204+ }
205+ }
206+
207+ if (hasIndexRequestsWithPipelines ) {
208+ // this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but
209+ // also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that this on the second time through this method,
210+ // this path is never taken.
211+ try {
212+ if (clusterService .localNode ().isIngestNode ()) {
213+ processBulkIndexIngestRequest (task , bulkRequest , listener );
214+ } else {
215+ ingestForwarder .forwardIngestRequest (BulkAction .INSTANCE , bulkRequest , listener );
216+ }
217+ } catch (Exception e ) {
218+ listener .onFailure (e );
219+ }
220+ return ;
221+ }
222+
154223 if (needToCheck ()) {
155224 // Attempt to create all the indices that we're going to need during the bulk before we start.
156225 // Step 1: collect all the indices in the request
@@ -181,15 +250,15 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
181250 }
182251 // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
183252 if (autoCreateIndices .isEmpty ()) {
184- executeIngestAndBulk (task , bulkRequest , startTime , listener , responses , indicesThatCannotBeCreated );
253+ executeBulk (task , bulkRequest , startTime , listener , responses , indicesThatCannotBeCreated );
185254 } else {
186255 final AtomicInteger counter = new AtomicInteger (autoCreateIndices .size ());
187256 for (String index : autoCreateIndices ) {
188257 createIndex (index , bulkRequest .timeout (), new ActionListener <CreateIndexResponse >() {
189258 @ Override
190259 public void onResponse (CreateIndexResponse result ) {
191260 if (counter .decrementAndGet () == 0 ) {
192- executeIngestAndBulk (task , bulkRequest , startTime , listener , responses , indicesThatCannotBeCreated );
261+ executeBulk (task , bulkRequest , startTime , listener , responses , indicesThatCannotBeCreated );
193262 }
194263 }
195264
@@ -205,7 +274,7 @@ public void onFailure(Exception e) {
205274 }
206275 }
207276 if (counter .decrementAndGet () == 0 ) {
208- executeIngestAndBulk (task , bulkRequest , startTime , ActionListener .wrap (listener ::onResponse , inner -> {
277+ executeBulk (task , bulkRequest , startTime , ActionListener .wrap (listener ::onResponse , inner -> {
209278 inner .addSuppressed (e );
210279 listener .onFailure (inner );
211280 }), responses , indicesThatCannotBeCreated );
@@ -215,56 +284,7 @@ public void onFailure(Exception e) {
215284 }
216285 }
217286 } else {
218- executeIngestAndBulk (task , bulkRequest , startTime , listener , responses , emptyMap ());
219- }
220- }
221-
222- private void executeIngestAndBulk (Task task , final BulkRequest bulkRequest , final long startTimeNanos ,
223- final ActionListener <BulkResponse > listener , final AtomicArray <BulkItemResponse > responses ,
224- Map <String , IndexNotFoundException > indicesThatCannotBeCreated ) {
225- boolean hasIndexRequestsWithPipelines = false ;
226- final MetaData metaData = clusterService .state ().getMetaData ();
227- ImmutableOpenMap <String , IndexMetaData > indicesMetaData = metaData .indices ();
228- for (DocWriteRequest <?> actionRequest : bulkRequest .requests ) {
229- IndexRequest indexRequest = getIndexWriteRequest (actionRequest );
230- if (indexRequest != null ){
231- String pipeline = indexRequest .getPipeline ();
232- if (pipeline == null ) {
233- IndexMetaData indexMetaData = indicesMetaData .get (actionRequest .index ());
234- if (indexMetaData == null && indexRequest .index () != null ) {
235- //check the alias
236- AliasOrIndex indexOrAlias = metaData .getAliasAndIndexLookup ().get (indexRequest .index ());
237- if (indexOrAlias != null && indexOrAlias .isAlias ()) {
238- AliasOrIndex .Alias alias = (AliasOrIndex .Alias ) indexOrAlias ;
239- indexMetaData = alias .getWriteIndex ();
240- }
241- }
242- if (indexMetaData == null ) {
243- indexRequest .setPipeline (IngestService .NOOP_PIPELINE_NAME );
244- } else {
245- String defaultPipeline = IndexSettings .DEFAULT_PIPELINE .get (indexMetaData .getSettings ());
246- indexRequest .setPipeline (defaultPipeline );
247- if (IngestService .NOOP_PIPELINE_NAME .equals (defaultPipeline ) == false ) {
248- hasIndexRequestsWithPipelines = true ;
249- }
250- }
251- } else if (IngestService .NOOP_PIPELINE_NAME .equals (pipeline ) == false ) {
252- hasIndexRequestsWithPipelines = true ;
253- }
254- }
255- }
256- if (hasIndexRequestsWithPipelines ) {
257- try {
258- if (clusterService .localNode ().isIngestNode ()) {
259- processBulkIndexIngestRequest (task , bulkRequest , listener );
260- } else {
261- ingestForwarder .forwardIngestRequest (BulkAction .INSTANCE , bulkRequest , listener );
262- }
263- } catch (Exception e ) {
264- listener .onFailure (e );
265- }
266- } else {
267- executeBulk (task , bulkRequest , startTimeNanos , listener , responses , indicesThatCannotBeCreated );
287+ executeBulk (task , bulkRequest , startTime , listener , responses , emptyMap ());
268288 }
269289 }
270290
0 commit comments