Attempt to send 20MB bulks by default
andrewvc committed Oct 28, 2016
1 parent 4ec898b commit 852c3fa
Showing 10 changed files with 62 additions and 58 deletions.
14 changes: 7 additions & 7 deletions lib/logstash/outputs/elasticsearch.rb
@@ -21,16 +21,16 @@
# You can learn more about Elasticsearch at <https://www.elastic.co/products/elasticsearch>
#
# ==== Template management for Elasticsearch 5.x
# The index template for this version (Logstash 5.0) has been changed to reflect Elasticsearch's mapping changes in version 5.0.
# Most importantly, the subfield for string multi-fields has changed from `.raw` to `.keyword` to match ES default
# behavior.
#
# ** Users installing ES 5.x and LS 5.x **
# This change will not affect you and you will continue to use the ES defaults.
#
# ** Users upgrading from LS 2.x to LS 5.x with ES 5.x **
# LS will not force-upgrade the template if a `logstash` template already exists. This means you will still use
# `.raw` for sub-fields coming from 2.x. If you choose to use the new template, you will have to reindex your data after
# the new template is installed.
#
# ==== Retry Policy
@@ -146,7 +146,7 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# a timeout occurs, the request will be retried.
config :timeout, :validate => :number, :default => 60

# Set the Elasticsearch errors in the whitelist that you don't want to log.
# A useful example is when you want to skip all 409 errors
# which are `document_already_exists_exception`.
config :failure_type_logging_whitelist, :validate => :array, :default => []
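
For illustration, a hedged sketch of suppressing the 409 case mentioned above, in the options-hash style the specs in this commit use (the host and value are assumptions, not part of the diff):

# Hypothetical usage sketch, assuming a local Elasticsearch instance.
# "document_already_exists_exception" is the failure type behind HTTP 409.
require "logstash/outputs/elasticsearch"

output = LogStash::Outputs::ElasticSearch.new(
  "hosts"                          => ["localhost:9200"],
  "failure_type_logging_whitelist" => ["document_already_exists_exception"]
)
output.register # connects and installs the template before use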
@@ -173,7 +173,7 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# Resurrection is the process by which backend endpoints marked 'down' are checked
# to see if they have come back to life.
config :resurrect_delay, :validate => :number, :default => 5

# How long a keepalive connection may sit idle before it is checked for staleness prior to executing a request.
# You may want to set this lower if you get connection errors regularly
# Quoting the Apache Commons docs (this client is based on Apache Commons):
@@ -183,7 +183,7 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# have become stale (half-closed) while kept inactive in the pool.'
# See https://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/http/impl/conn/PoolingHttpClientConnectionManager.html#setValidateAfterInactivity(int)[these docs for more info]
config :validate_after_inactivity, :validate => :number, :default => 10000
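
As an illustration only (the values here are assumptions, not from this diff), both connection-health knobs can be lowered when a flaky network keeps invalidating pooled keepalive connections:

# Hypothetical tuning sketch; the option names are real plugin settings,
# but the values are illustrative.
require "logstash/outputs/elasticsearch"

output = LogStash::Outputs::ElasticSearch.new(
  "hosts"                     => ["localhost:9200"],
  "resurrect_delay"           => 2,   # recheck 'down' endpoints every 2s (default 5)
  "validate_after_inactivity" => 2000 # stale-check idle connections after 2s (default 10000ms)
)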

def build_client
@client = ::LogStash::Outputs::ElasticSearch::HttpClientBuilder.build(@logger, @hosts, params)
end
12 changes: 5 additions & 7 deletions lib/logstash/outputs/elasticsearch/common.rb
@@ -19,9 +19,7 @@ def register

# Receive an array of events and immediately attempt to index them (no buffering)
def multi_receive(events)
events.each_slice(@flush_size) do |slice|
retrying_submit(slice.map {|e| event_action_tuple(e) })
end
retrying_submit(events.map {|e| event_action_tuple(e) })
end

# Convert the event into a 3-tuple of action, params, and event
@@ -59,14 +57,14 @@ def valid_actions
VALID_HTTP_ACTIONS
end

def retrying_submit(actions)
# Initially we submit the full list of actions
submit_actions = actions

sleep_interval = @retry_initial_interval

while submit_actions && submit_actions.length > 0

# We retry with whatever didn't succeed
begin
submit_actions = submit(submit_actions)
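
The loop above resubmits only the actions that failed, sleeping between attempts. A standalone sketch of that shape (the doubling-with-cap rule is an assumption based on the plugin's `retry_initial_interval` and `retry_max_interval` options; only the method name `next_sleep_interval` appears in this diff):

# Hypothetical model of the retry loop, outside the plugin.
def next_sleep_interval(current_interval, max_interval)
  doubled = current_interval * 2
  doubled > max_interval ? max_interval : doubled
end

def retry_with_backoff(actions, initial_interval: 2, max_interval: 64)
  interval = initial_interval
  until actions.nil? || actions.empty?
    actions = yield(actions) # yields back the subset that still needs a retry
    break if actions.nil? || actions.empty?
    sleep(interval)
    interval = next_sleep_interval(interval, max_interval)
  end
end

# retry_with_backoff(tuples) { |batch| submit(batch) } # submit returns failures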
@@ -103,7 +101,7 @@ def next_sleep_interval(current_interval)

def submit(actions)
bulk_response = safe_bulk(actions)

# If the response is nil that means we were in a retry loop
# and aborted since we're shutting down
# If it did return and there are no errors we're good as well
@@ -231,7 +229,7 @@ def safe_bulk(actions)
@logger.debug("Failed actions for last bad bulk request!", :actions => actions)

# We retry until there are no errors! Errors should all go to the retry queue
sleep_interval = sleep_for_interval(sleep_interval)
retry unless @stopping.true?
end
end
9 changes: 2 additions & 7 deletions lib/logstash/outputs/elasticsearch/common_configs.rb
@@ -75,13 +75,8 @@ def self.included(mod)
# to prevent LS from sending bulk requests to the master nodes. So this parameter should only reference either data or client nodes in Elasticsearch.
mod.config :hosts, :validate => :array, :default => ["127.0.0.1"]

# This plugin uses the bulk index API for improved indexing performance.
# This setting defines the maximum sized bulk request Logstash will make
# You may want to increase this to be in line with your pipeline's batch size.
# If you specify a number larger than the batch size of your pipeline it will have no effect,
# save for the case where a filter increases the size of an inflight batch by outputting
# events.
mod.config :flush_size, :validate => :number, :default => 500
# DEPRECATED. We now restrict bulk sizes to a size that balances heap size with reducing round-trips. Very large events will be sent in a single request by themselves.
mod.config :flush_size, :validate => :number, :default => 500, :deprecated => "No Longer Needed"

# The amount of time since last flush before a flush is forced.
#
41 changes: 34 additions & 7 deletions lib/logstash/outputs/elasticsearch/http_client.rb
@@ -5,6 +5,8 @@
require 'logstash/outputs/elasticsearch/http_client/manticore_adapter'

module LogStash; module Outputs; class ElasticSearch;
TARGET_BULK_BYTES = 20 * 1024 * 1024

class HttpClient
attr_reader :client, :options, :logger, :pool, :action_count, :recv_count
# This is here in case we use DEFAULT_OPTIONS in the future
@@ -38,23 +40,48 @@ def get_version
def bulk(actions)
@action_count ||= 0
@action_count += actions.size

return if actions.empty?
bulk_body = actions.collect do |action, args, source|


bulk_actions = actions.collect do |action, args, source|
args, source = update_action_builder(args, source) if action == 'update'

if source && action != 'delete'
next [ { action => args }, source ]
else
next { action => args }
end
end.
flatten.
reduce("") do |acc,line|
acc << LogStash::Json.dump(line)
acc << "\n"
end

bulk_body = ""
bulk_responses = []
bulk_actions.each do |action|
as_json = action.is_a?(Array) ?
action.map {|line| LogStash::Json.dump(line)}.join("\n") :
LogStash::Json.dump(action)

if (bulk_body.size + as_json.size) > TARGET_BULK_BYTES
bulk_responses << bulk_send(bulk_body << "\n")
bulk_body = as_json
else
bulk_body << as_json
end
end

bulk_responses << bulk_send(bulk_body << "\n") if bulk_body.size > 0

join_bulk_responses(bulk_responses)
end

def join_bulk_responses(bulk_responses)
{
"errors" => bulk_responses.any? {|r| r["errors"] == true},
"items" => bulk_responses.reduce([]) {|m,r| m << r["items"]}
}
end

def bulk_send(bulk_body)
# Discard the URL
url, response = @pool.post("_bulk", nil, bulk_body)
LogStash::Json.load(response.body)
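The heart of this commit is the loop above: rather than slicing batches by event count (the old `flush_size`), the client accumulates serialized bulk lines until adding the next one would push the request body past TARGET_BULK_BYTES, then flushes and starts a new body. A single action larger than the target still goes out alone, which is what the `flush_size` deprecation note refers to. A minimal standalone sketch of that byte-capped batching, with a hypothetical `chunk_by_bytes` helper in place of the inline loop (serialization via `LogStash::Json.dump` is assumed to have happened already):

TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20MB, as defined above

# Hypothetical sketch: pack pre-serialized action lines into request
# bodies of roughly `limit` bytes; an oversized single line ships alone.
def chunk_by_bytes(lines, limit = TARGET_BULK_BYTES)
  chunks = []
  body = ""
  lines.each do |line|
    if !body.empty? && body.bytesize + line.bytesize > limit
      chunks << body # flush the current body before it overflows
      body = ""
    end
    body << line << "\n" # the bulk API wants newline-delimited JSON
  end
  chunks << body unless body.empty?
  chunks
end

# Three 8MB lines against the 20MB cap yield two requests: 2 lines, then 1.
lines = Array.new(3) { "x" * (8 * 1024 * 1024) }
chunk_by_bytes(lines).map { |body| body.lines.count } # => [2, 1]

Each chunk is then POSTed to `_bulk` by `bulk_send`, and `join_bulk_responses` folds the per-request responses back into a single errors/items structure, so callers are unaffected by the split.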
13 changes: 5 additions & 8 deletions spec/integration/outputs/index_spec.rb
@@ -1,18 +1,17 @@
require_relative "../../../spec/es_spec_helper"

shared_examples "an indexer" do
let(:event) { super || LogStash::Event.new("message" => "Hello World!", "type" => type) }
let(:index) { 10.times.collect { rand(10).to_s }.join("") }
let(:type) { 10.times.collect { rand(10).to_s }.join("") }
let(:event_count) { 10000 + rand(500) }
let(:flush_size) { rand(200) + 1 }
let(:config) { "not implemented" }
let(:events) { event_count.times.map { event }.to_a }
subject { LogStash::Outputs::ElasticSearch.new(config) }

before do
subject.register
event_count.times do
subject.multi_receive([LogStash::Event.new("message" => "Hello World!", "type" => type)])
end
subject.multi_receive(events)
end

it "ships events" do
@@ -47,8 +46,7 @@
let(:config) {
{
"hosts" => get_host_port,
"index" => index,
"flush_size" => flush_size
"index" => index
}
}
end
@@ -60,8 +58,7 @@
let(:config) {
{
"hosts" => get_host_port,
"index" => index,
"flush_size" => flush_size
"index" => index
}
}
end
6 changes: 1 addition & 5 deletions spec/integration/outputs/parent_spec.rb
@@ -4,7 +4,6 @@
let(:index) { 10.times.collect { rand(10).to_s }.join("") }
let(:type) { 10.times.collect { rand(10).to_s }.join("") }
let(:event_count) { 10000 + rand(500) }
let(:flush_size) { rand(200) + 1 }
let(:parent) { "not_implemented" }
let(:config) { "not_implemented" }
subject { LogStash::Outputs::ElasticSearch.new(config) }
@@ -17,7 +16,7 @@
ftw.put!("#{index_url}", :body => mapping.to_json)
pdoc = { "foo" => "bar" }
ftw.put!("#{index_url}/#{type}_parent/test", :body => pdoc.to_json)

subject.register
subject.multi_receive(event_count.times.map { LogStash::Event.new("link_to" => "test", "message" => "Hello World!", "type" => type) })
end
@@ -49,7 +48,6 @@
{
"hosts" => get_host_port,
"index" => index,
"flush_size" => flush_size,
"parent" => parent
}
}
@@ -62,10 +60,8 @@
{
"hosts" => get_host_port,
"index" => index,
"flush_size" => flush_size,
"parent" => "%{link_to}"
}
}
end
end

4 changes: 0 additions & 4 deletions spec/integration/outputs/routing_spec.rb
@@ -4,7 +4,6 @@
let(:index) { 10.times.collect { rand(10).to_s }.join("") }
let(:type) { 10.times.collect { rand(10).to_s }.join("") }
let(:event_count) { 10000 + rand(500) }
let(:flush_size) { rand(200) + 1 }
let(:routing) { "not_implemented" }
let(:config) { "not_implemented" }
subject { LogStash::Outputs::ElasticSearch.new(config) }
@@ -43,7 +42,6 @@
{
"hosts" => get_host_port,
"index" => index,
"flush_size" => flush_size,
"routing" => routing
}
}
@@ -57,10 +55,8 @@
{
"hosts" => get_host_port,
"index" => index,
"flush_size" => flush_size,
"routing" => "%{message}"
}
}
end
end

2 changes: 1 addition & 1 deletion spec/unit/outputs/elasticsearch/http_client_spec.rb
@@ -3,7 +3,7 @@
require "java"

describe LogStash::Outputs::ElasticSearch::HttpClient do
let(:base_options) { {:hosts => ["127.0.0.1"], :logger => Cabin::Channel.get }}
let(:base_options) { {:hosts => ["127.0.0.1"], :logger => Cabin::Channel.get} }

describe "Host/URL Parsing" do
subject { described_class.new(base_options) }
10 changes: 1 addition & 9 deletions spec/unit/outputs/elasticsearch_spec.rb
@@ -142,7 +142,6 @@
describe "#multi_receive" do
let(:events) { [double("one"), double("two"), double("three")] }
let(:events_tuples) { [double("one t"), double("two t"), double("three t")] }
let(:options) { super.merge("flush_size" => 2) }

before do
allow(eso).to receive(:retrying_submit).with(anything)
@@ -152,12 +151,6 @@
end
eso.multi_receive(events)
end

it "should receive an array of events and invoke retrying_submit with them, split by flush_size" do
expect(eso).to have_received(:retrying_submit).with(events_tuples.slice(0,2))
expect(eso).to have_received(:retrying_submit).with(events_tuples.slice(2,3))
end

end

end
@@ -169,8 +162,7 @@
{
"manage_template" => false,
"hosts" => "localhost:#{port}",
"flush_size" => 1,
"timeout" => 0.1, # fast timeout
"timeout" => 0.1 # fast timeout
}
end
let(:eso) {LogStash::Outputs::ElasticSearch.new(options)}
9 changes: 6 additions & 3 deletions travis-run.sh
@@ -10,9 +10,12 @@ trap finish EXIT

setup_es() {
download_url=$1
curl -sL $download_url > elasticsearch.tar.gz
mkdir elasticsearch
tar -xzf elasticsearch.tar.gz --strip-components=1 -C ./elasticsearch/.
if [[ ! -d elasticsearch ]]; then
curl -sL $download_url > elasticsearch.tar.gz
mkdir elasticsearch
tar -xzf elasticsearch.tar.gz --strip-components=1 -C ./elasticsearch/.
fi
rm -f elasticsearch/config/scripts || true
ln -sn ../../spec/fixtures/scripts elasticsearch/config/.
}

