Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix high CPU on retries in json_batch mode #89

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
Gemfile.lock
.bundle
vendor
*.iml
.idea
*~
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 5.2.2
- Fix high CPU usage on retries in json_batch mode. [#89](https://github.com/logstash-plugins/logstash-output-http/pull/89)
- Enable compression in json_batch mode. [#89](https://github.com/logstash-plugins/logstash-output-http/pull/89)

## 5.2.1
- Docs: Set the default_codec doc attribute.

Expand Down
51 changes: 17 additions & 34 deletions lib/logstash/outputs/http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ class LogStash::Outputs::Http < LogStash::Outputs::Base

concurrency :shared

attr_accessor :is_batch

VALID_METHODS = ["put", "post", "patch", "delete", "get", "head"]

RETRYABLE_MANTICORE_EXCEPTIONS = [
Expand Down Expand Up @@ -107,6 +109,7 @@ def register
end
end

@is_batch = @format == "json_batch"

@headers["Content-Type"] = @content_type

Expand All @@ -118,11 +121,7 @@ def register

def multi_receive(events)
return if events.empty?
if @format == "json_batch"
send_json_batch(events)
else
send_events(events)
end
send_events(events)
end

class RetryTimerTask < java.util.TimerTask
Expand All @@ -138,29 +137,6 @@ def run
end
end

def send_json_batch(events)
attempt = 1
body = LogStash::Json.dump(events.map {|e| map_event(e) })
begin
while true
request = client.send(@http_method, @url, :body => body, :headers => @headers)
response = request.call
break if response_success?(response)
if retryable_response?(response)
log_retryable_response(response)
sleep_for_attempt attempt
attempt += 1
else
log_error_response(response, url, events)
end
end
rescue *RETRYABLE_MANTICORE_EXCEPTIONS => e
logger.warn("Encountered exception during http output send, will retry after delay", :message => e.message, :class => e.class.name)
sleep_for_attempt attempt
retry
end
end

def log_retryable_response(response)
if (response.code == 429)
@logger.debug? && @logger.debug("Encountered a 429 response, will retry. This is not serious, just flow control via HTTP")
Expand All @@ -182,15 +158,20 @@ def send_events(events)
successes = java.util.concurrent.atomic.AtomicInteger.new(0)
failures = java.util.concurrent.atomic.AtomicInteger.new(0)
retries = java.util.concurrent.atomic.AtomicInteger.new(0)

event_count = @is_batch ? 1 : events.size

pending = Queue.new
events.each {|e| pending << [e, 0]}
if @is_batch
pending << [events, 0]
else
events.each {|e| pending << [e, 0]}
end

while popped = pending.pop
break if popped == :done

event, attempt = popped

send_event(event, attempt) do |action,event,attempt|
begin
action = :failure if action == :retry && !@retry_failed
Expand All @@ -213,7 +194,7 @@ def send_events(events)
end

if action == :success || action == :failure
if successes.get+failures.get == events.size
if successes.get+failures.get == event_count
pending << :done
end
end
Expand Down Expand Up @@ -246,8 +227,8 @@ def send_event(event, attempt)
body = event_body(event)

# Send the request
url = event.sprintf(@url)
headers = event_headers(event)
url = @is_batch ? @url : event.sprintf(@url)
headers = @is_batch ? @headers : event_headers(event)

# Compress the body and add appropriate header
if @http_compression == true
Expand Down Expand Up @@ -347,6 +328,8 @@ def event_body(event)
LogStash::Json.dump(map_event(event))
elsif @format == "message"
event.sprintf(@message)
elsif @format == "json_batch"
LogStash::Json.dump(event.map {|e| map_event(e) })
else
encode(map_event(event))
end
Expand Down
2 changes: 1 addition & 1 deletion logstash-output-http.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-http'
s.version = '5.2.1'
s.version = '5.2.2'
s.licenses = ['Apache License (2.0)']
s.summary = "Sends events to a generic HTTP or HTTPS endpoint"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down