From 63c24c9fe48a74d00c65145cc55c32f4c6907448 Mon Sep 17 00:00:00 2001 From: Kelly Joseph Price Date: Sat, 23 Sep 2017 12:21:15 -0700 Subject: [PATCH] Add pipeline to bulk params --- .../lib/elasticsearch/model/importing.rb | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/elasticsearch-model/lib/elasticsearch/model/importing.rb b/elasticsearch-model/lib/elasticsearch/model/importing.rb index 0534c2519..c0519ef9f 100644 --- a/elasticsearch-model/lib/elasticsearch/model/importing.rb +++ b/elasticsearch-model/lib/elasticsearch/model/importing.rb @@ -143,6 +143,7 @@ def import(options={}, &block) target_index = options.delete(:index) || index_name target_type = options.delete(:type) || document_type transform = options.delete(:transform) || __transform + pipeline = options.delete(:pipeline) return_value = options.delete(:return) || 'count' unless transform.respond_to?(:call) @@ -158,10 +159,15 @@ def import(options={}, &block) end __find_in_batches(options) do |batch| - response = client.bulk \ - index: target_index, - type: target_type, - body: __batch_to_bulk(batch, transform) + params = { + index: target_index, + type: target_type, + body: __batch_to_bulk(batch, transform) + } + + params[:pipeline] = pipeline if pipeline + + response = client.bulk params yield response if block_given?