From 852c3fa03eb8af0932bbf82661e6bfa15839e7d5 Mon Sep 17 00:00:00 2001
From: Andrew Cholakian
Date: Wed, 26 Oct 2016 14:56:10 -0500
Subject: [PATCH] Attempt to send 20MB bulks by default

---
 lib/logstash/outputs/elasticsearch.rb          | 14 +++----
 lib/logstash/outputs/elasticsearch/common.rb   | 12 +++---
 .../outputs/elasticsearch/common_configs.rb    |  9 +---
 .../outputs/elasticsearch/http_client.rb       | 41 +++++++++++++++----
 spec/integration/outputs/index_spec.rb         | 13 +++---
 spec/integration/outputs/parent_spec.rb        |  6 +--
 spec/integration/outputs/routing_spec.rb       |  4 --
 .../outputs/elasticsearch/http_client_spec.rb  |  2 +-
 spec/unit/outputs/elasticsearch_spec.rb        | 10 +----
 travis-run.sh                                  |  9 ++--
 10 files changed, 62 insertions(+), 58 deletions(-)

diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb
index afdb8e57e..f2c63c81c 100644
--- a/lib/logstash/outputs/elasticsearch.rb
+++ b/lib/logstash/outputs/elasticsearch.rb
@@ -21,16 +21,16 @@
 # You can learn more about Elasticsearch at
 #
 # ==== Template management for Elasticsearch 5.x
-# Index template for this version (Logstash 5.0) has been changed to reflect Elasticsearch's mapping changes in version 5.0.
-# Most importantly, the subfield for string multi-fields has changed from `.raw` to `.keyword` to match ES default
+# Index template for this version (Logstash 5.0) has been changed to reflect Elasticsearch's mapping changes in version 5.0.
+# Most importantly, the subfield for string multi-fields has changed from `.raw` to `.keyword` to match ES default
 # behavior.
 #
 # ** Users installing ES 5.x and LS 5.x **
 # This change will not affect you and you will continue to use the ES defaults.
 #
 # ** Users upgrading from LS 2.x to LS 5.x with ES 5.x **
-# LS will not force upgrade the template, if `logstash` template already exists. This means you will still use
-# `.raw` for sub-fields coming from 2.x. If you choose to use the new template, you will have to reindex your data after
+# LS will not force upgrade the template, if `logstash` template already exists. This means you will still use
+# `.raw` for sub-fields coming from 2.x. If you choose to use the new template, you will have to reindex your data after
 # the new template is installed.
 #
 # ==== Retry Policy
@@ -146,7 +146,7 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
   # a timeout occurs, the request will be retried.
   config :timeout, :validate => :number, :default => 60
 
-  # Set the Elasticsearch errors in the whitelist that you don't want to log.
+  # Set the Elasticsearch errors in the whitelist that you don't want to log.
   # A useful example is when you want to skip all 409 errors
   # which are `document_already_exists_exception`.
   config :failure_type_logging_whitelist, :validate => :array, :default => []
@@ -173,7 +173,7 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
   # Resurrection is the process by which backend endpoints marked 'down' are checked
   # to see if they have come back to life
   config :resurrect_delay, :validate => :number, :default => 5
-
+
   # How long to wait before checking if the connection is stale before executing a request on a connection using keepalive.
   # You may want to set this lower, if you get connection errors regularly
   # Quoting the Apache commons docs (this client is based Apache Commmons):
   # 'Defines period of inactivity in milliseconds after which persistent connections must be re-validated prior to being leased to the consumer. Non-positive value passed to this method disables connection validation. This check helps detect connections that
   # have become stale (half-closed) while kept inactive in the pool.'
   # See https://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/http/impl/conn/PoolingHttpClientConnectionManager.html#setValidateAfterInactivity(int)[these docs for more info]
   config :validate_after_inactivity, :validate => :number, :default => 10000
-
+
   def build_client
     @client = ::LogStash::Outputs::ElasticSearch::HttpClientBuilder.build(@logger, @hosts, params)
   end
diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb
index 4d2d53663..170c8ca47 100644
--- a/lib/logstash/outputs/elasticsearch/common.rb
+++ b/lib/logstash/outputs/elasticsearch/common.rb
@@ -19,9 +19,7 @@ def register
 
     # Receive an array of events and immediately attempt to index them (no buffering)
     def multi_receive(events)
-      events.each_slice(@flush_size) do |slice|
-        retrying_submit(slice.map {|e| event_action_tuple(e) })
-      end
+      retrying_submit(events.map {|e| event_action_tuple(e) })
     end
 
     # Convert the event into a 3-tuple of action, params, and event
@@ -59,14 +57,14 @@ def valid_actions
       VALID_HTTP_ACTIONS
     end
 
-    def retrying_submit(actions)
+    def retrying_submit(actions)
      # Initially we submit the full list of actions
      submit_actions = actions
 
      sleep_interval = @retry_initial_interval
 
      while submit_actions && submit_actions.length > 0
-
+
        # We retry with whatever is didn't succeed
        begin
          submit_actions = submit(submit_actions)
@@ -103,7 +101,7 @@ def next_sleep_interval(current_interval)
 
    def submit(actions)
      bulk_response = safe_bulk(actions)
-
+
      # If the response is nil that means we were in a retry loop
      # and aborted since we're shutting down
      # If it did return and there are no errors we're good as well
@@ -231,7 +229,7 @@ def safe_bulk(actions)
        @logger.debug("Failed actions for last bad bulk request!", :actions => actions)
 
        # We retry until there are no errors! Errors should all go to the retry queue
-        sleep_interval = sleep_for_interval(sleep_interval)
+        sleep_interval = sleep_for_interval(sleep_interval)
        retry unless @stopping.true?
      end
    end
diff --git a/lib/logstash/outputs/elasticsearch/common_configs.rb b/lib/logstash/outputs/elasticsearch/common_configs.rb
index cd381cad1..f265cefbd 100644
--- a/lib/logstash/outputs/elasticsearch/common_configs.rb
+++ b/lib/logstash/outputs/elasticsearch/common_configs.rb
@@ -75,13 +75,8 @@ def self.included(mod)
       # to prevent LS from sending bulk requests to the master nodes. So this parameter should only reference either data or client nodes in Elasticsearch.
       mod.config :hosts, :validate => :array, :default => ["127.0.0.1"]
 
-      # This plugin uses the bulk index API for improved indexing performance.
-      # This setting defines the maximum sized bulk request Logstash will make
-      # You you may want to increase this to be in line with your pipeline's batch size.
-      # If you specify a number larger than the batch size of your pipeline it will have no effect,
-      # save for the case where a filter increases the size of an inflight batch by outputting
-      # events.
-      mod.config :flush_size, :validate => :number, :default => 500
+      # DEPRECATED. We now restrict bulk sizes to a size that balances heap size with reducing round-trips. Very large events will be sent in a single request by themselves.
+      mod.config :flush_size, :validate => :number, :default => 500, :deprecated => "No Longer Needed"
 
       # The amount of time since last flush before a flush is forced.
       #
diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb
index 503408587..2e00d7540 100644
--- a/lib/logstash/outputs/elasticsearch/http_client.rb
+++ b/lib/logstash/outputs/elasticsearch/http_client.rb
@@ -5,6 +5,8 @@
 require 'logstash/outputs/elasticsearch/http_client/manticore_adapter'
 
 module LogStash; module Outputs; class ElasticSearch;
+  TARGET_BULK_BYTES = 20 * 1024 * 1024
+
   class HttpClient
     attr_reader :client, :options, :logger, :pool, :action_count, :recv_count
     # This is here in case we use DEFAULT_OPTIONS in the future
@@ -38,9 +40,11 @@ def get_version
     def bulk(actions)
       @action_count ||= 0
       @action_count += actions.size
-
+
       return if actions.empty?
-      bulk_body = actions.collect do |action, args, source|
+
+
+      bulk_actions = actions.collect do |action, args, source|
         args, source = update_action_builder(args, source) if action == 'update'
 
         if source && action != 'delete'
@@ -48,13 +52,36 @@ def bulk(actions)
         else
           next { action => args }
         end
-      end.
-        flatten.
-        reduce("") do |acc,line|
-          acc << LogStash::Json.dump(line)
-          acc << "\n"
       end
 
+      bulk_body = ""
+      bulk_responses = []
+      bulk_actions.each do |action|
+        as_json = action.is_a?(Array) ?
+                    action.map {|line| LogStash::Json.dump(line)}.join("\n") :
+                    LogStash::Json.dump(action)
+
+        if (bulk_body.size + as_json.size) > TARGET_BULK_BYTES
+          bulk_responses << bulk_send(bulk_body << "\n")
+          bulk_body = as_json
+        else
+          bulk_body << as_json
+        end
+      end
+
+      bulk_responses << bulk_send(bulk_body << "\n") if bulk_body.size > 0
+
+      join_bulk_responses(bulk_responses)
+    end
+
+    def join_bulk_responses(bulk_responses)
+      {
+        "errors" => bulk_responses.any? {|r| r["errors"] == true},
+        "items" => bulk_responses.reduce([]) {|m,r| m << r["items"]}
+      }
+    end
+
+    def bulk_send(bulk_body)
       # Discard the URL
       url, response = @pool.post("_bulk", nil, bulk_body)
       LogStash::Json.load(response.body)
diff --git a/spec/integration/outputs/index_spec.rb b/spec/integration/outputs/index_spec.rb
index 6fbd36524..55ecd751a 100644
--- a/spec/integration/outputs/index_spec.rb
+++ b/spec/integration/outputs/index_spec.rb
@@ -1,18 +1,17 @@
 require_relative "../../../spec/es_spec_helper"
 
 shared_examples "an indexer" do
+  let(:event) { super || LogStash::Event.new("message" => "Hello World!", "type" => type) }
   let(:index) { 10.times.collect { rand(10).to_s }.join("") }
   let(:type) { 10.times.collect { rand(10).to_s }.join("") }
   let(:event_count) { 10000 + rand(500) }
-  let(:flush_size) { rand(200) + 1 }
   let(:config) { "not implemented" }
+  let(:events) { event_count.times.map { event }.to_a }
   subject { LogStash::Outputs::ElasticSearch.new(config) }
 
   before do
     subject.register
-    event_count.times do
-      subject.multi_receive([LogStash::Event.new("message" => "Hello World!", "type" => type)])
-    end
+    subject.multi_receive(events)
   end
 
   it "ships events" do
@@ -47,8 +46,7 @@
     let(:config) {
       {
         "hosts" => get_host_port,
-        "index" => index,
-        "flush_size" => flush_size
+        "index" => index
       }
     }
   end
@@ -60,8 +58,7 @@
     let(:config) {
       {
         "hosts" => get_host_port,
-        "index" => index,
-        "flush_size" => flush_size
+        "index" => index
       }
     }
   end
diff --git a/spec/integration/outputs/parent_spec.rb b/spec/integration/outputs/parent_spec.rb
index 42ee6858e..7e5a0cff1 100644
--- a/spec/integration/outputs/parent_spec.rb
+++ b/spec/integration/outputs/parent_spec.rb
@@ -4,7 +4,6 @@
   let(:index) { 10.times.collect { rand(10).to_s }.join("") }
   let(:type) { 10.times.collect { rand(10).to_s }.join("") }
   let(:event_count) { 10000 + rand(500) }
-  let(:flush_size) { rand(200) + 1 }
   let(:parent) { "not_implemented" }
   let(:config) { "not_implemented" }
   subject { LogStash::Outputs::ElasticSearch.new(config) }
@@ -17,7 +16,7 @@
     ftw.put!("#{index_url}", :body => mapping.to_json)
     pdoc = { "foo" => "bar" }
     ftw.put!("#{index_url}/#{type}_parent/test", :body => pdoc.to_json)
-
+
     subject.register
     subject.multi_receive(event_count.times.map { LogStash::Event.new("link_to" => "test", "message" => "Hello World!", "type" => type) })
   end
@@ -49,7 +48,6 @@
     {
       "hosts" => get_host_port,
       "index" => index,
-      "flush_size" => flush_size,
       "parent" => parent
     }
   }
@@ -62,10 +60,8 @@
     {
       "hosts" => get_host_port,
       "index" => index,
-      "flush_size" => flush_size,
       "parent" => "%{link_to}"
     }
   }
   end
 end
-
diff --git a/spec/integration/outputs/routing_spec.rb b/spec/integration/outputs/routing_spec.rb
index 9de39edf9..bb35953f6 100644
--- a/spec/integration/outputs/routing_spec.rb
+++ b/spec/integration/outputs/routing_spec.rb
@@ -4,7 +4,6 @@
   let(:index) { 10.times.collect { rand(10).to_s }.join("") }
   let(:type) { 10.times.collect { rand(10).to_s }.join("") }
   let(:event_count) { 10000 + rand(500) }
-  let(:flush_size) { rand(200) + 1 }
   let(:routing) { "not_implemented" }
   let(:config) { "not_implemented" }
   subject { LogStash::Outputs::ElasticSearch.new(config) }
@@ -43,7 +42,6 @@
     {
       "hosts" => get_host_port,
       "index" => index,
-      "flush_size" => flush_size,
       "routing" => routing
     }
   }
@@ -57,10 +55,8 @@
     {
       "hosts" => get_host_port,
       "index" => index,
-      "flush_size" => flush_size,
       "routing" => "%{message}"
     }
   }
   end
 end
-
diff --git a/spec/unit/outputs/elasticsearch/http_client_spec.rb b/spec/unit/outputs/elasticsearch/http_client_spec.rb
index dbe74855e..00cd7dce1 100644
--- a/spec/unit/outputs/elasticsearch/http_client_spec.rb
+++ b/spec/unit/outputs/elasticsearch/http_client_spec.rb
@@ -3,7 +3,7 @@
 require "java"
 
 describe LogStash::Outputs::ElasticSearch::HttpClient do
-  let(:base_options) { {:hosts => ["127.0.0.1"], :logger => Cabin::Channel.get }}
+  let(:base_options) { {:hosts => ["127.0.0.1"], :logger => Cabin::Channel.get} }
 
   describe "Host/URL Parsing" do
     subject { described_class.new(base_options) }
diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb
index 3525ea2c8..9ae28701b 100644
--- a/spec/unit/outputs/elasticsearch_spec.rb
+++ b/spec/unit/outputs/elasticsearch_spec.rb
@@ -142,7 +142,6 @@
   describe "#multi_receive" do
     let(:events) { [double("one"), double("two"), double("three")] }
     let(:events_tuples) { [double("one t"), double("two t"), double("three t")] }
-    let(:options) { super.merge("flush_size" => 2) }
 
     before do
       allow(eso).to receive(:retrying_submit).with(anything)
@@ -152,12 +151,6 @@
       end
       eso.multi_receive(events)
     end
-
-    it "should receive an array of events and invoke retrying_submit with them, split by flush_size" do
-      expect(eso).to have_received(:retrying_submit).with(events_tuples.slice(0,2))
-      expect(eso).to have_received(:retrying_submit).with(events_tuples.slice(2,3))
-    end
-
   end
 end
 
@@ -169,8 +162,7 @@
     {
       "manage_template" => false,
       "hosts" => "localhost:#{port}",
-      "flush_size" => 1,
-      "timeout" => 0.1, # fast timeout
+      "timeout" => 0.1 # fast timeout
     }
   end
   let(:eso) {LogStash::Outputs::ElasticSearch.new(options)}
diff --git a/travis-run.sh b/travis-run.sh
index d984b7479..0be325436 100755
--- a/travis-run.sh
+++ b/travis-run.sh
@@ -10,9 +10,12 @@ trap finish EXIT
 
 setup_es() {
   download_url=$1
-  curl -sL $download_url > elasticsearch.tar.gz
-  mkdir elasticsearch
-  tar -xzf elasticsearch.tar.gz --strip-components=1 -C ./elasticsearch/.
+  if [[ ! -d elasticsearch ]]; then
+    curl -sL $download_url > elasticsearch.tar.gz
+    mkdir elasticsearch
+    tar -xzf elasticsearch.tar.gz --strip-components=1 -C ./elasticsearch/.
+  fi
+  rm -f elasticsearch/config/scripts || true
   ln -sn ../../spec/fixtures/scripts elasticsearch/config/.
 }
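
The change above stops slicing batches by `flush_size` in `multi_receive` and instead caps each HTTP `_bulk` request by payload size: serialized actions are appended to a buffer until the next action would push it past `TARGET_BULK_BYTES` (20 * 1024 * 1024 bytes), the buffer is flushed as one request, and accumulation restarts, so an event larger than the cap still ships alone in its own request. The following self-contained Ruby sketch illustrates only that byte-capped chunking idea; the helper names (`serialize_action`, `chunk_bulk_payloads`) and the use of the stdlib JSON module are assumptions made for this example, not code from the plugin.

require 'json'

TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20MB ceiling per bulk request body (assumption mirrors the patch's constant)

# Serialize one bulk action into newline-delimited JSON. An action is either
# [header, source] (index/create/update) or a bare header hash (delete).
def serialize_action(action)
  lines = action.is_a?(Array) ? action : [action]
  lines.map { |line| JSON.dump(line) }.join("\n") + "\n"
end

# Accumulate serialized actions into payloads that stay under max_bytes.
# An action bigger than the cap is still emitted, alone, in its own payload.
def chunk_bulk_payloads(actions, max_bytes = TARGET_BULK_BYTES)
  payloads = []
  current = +"" # unary + gives an unfrozen, appendable buffer
  actions.each do |action|
    as_json = serialize_action(action)
    if !current.empty? && current.bytesize + as_json.bytesize > max_bytes
      payloads << current
      current = +""
    end
    current << as_json
  end
  payloads << current unless current.empty?
  payloads
end

# Example: three small actions fit comfortably into a single payload.
actions = [
  [{ "index" => { "_index" => "logs" } }, { "message" => "hello" }],
  [{ "index" => { "_index" => "logs" } }, { "message" => "world" }],
  { "delete" => { "_index" => "logs", "_id" => "1" } }
]
puts chunk_bulk_payloads(actions).length # => 1

Splitting on a byte budget rather than an event count keeps heap usage bounded even when event sizes vary widely, which is the trade-off the deprecation note for `flush_size` describes.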
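
Because a single logical `bulk` call can now fan out into several HTTP requests, the client merges the per-request responses back into one hash (`join_bulk_responses`) before the retry logic in `submit` inspects it. Below is a minimal sketch of that merge, assuming each response has the Elasticsearch bulk API shape of an "errors" boolean plus an "items" array; the flattening with `flat_map` and the sample response hashes are illustrative choices for this example, not code taken from the patch.

# Merge several bulk responses: "errors" is true if any sub-request reported
# errors, and "items" is the ordered concatenation of every item list.
def join_bulk_responses(bulk_responses)
  {
    "errors" => bulk_responses.any? { |r| r["errors"] == true },
    "items"  => bulk_responses.flat_map { |r| r.fetch("items", []) }
  }
end

# Fabricated sample responses for illustration only.
responses = [
  { "errors" => false, "items" => [{ "index" => { "status" => 201 } }] },
  { "errors" => true,  "items" => [{ "index" => { "status" => 409 } }] }
]
merged = join_bulk_responses(responses)
puts merged["errors"]       # => true
puts merged["items"].length # => 2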