Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix high CPU on retries in json_batch mode #89

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
Gemfile.lock
.bundle
vendor
*.iml
.idea
*~
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 5.2.2
- Fix high CPU usage on retries in json_batch mode.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, one last detail, can you add the this pr's reference to the changelog entry? we have guidelines for editing the changelog here (these guidelines are somewhat recent we need to make them easier to find).


## 5.2.1
- Docs: Set the default_codec doc attribute.

Expand Down
51 changes: 17 additions & 34 deletions lib/logstash/outputs/http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ class LogStash::Outputs::Http < LogStash::Outputs::Base

concurrency :shared

attr_accessor :is_batch

VALID_METHODS = ["put", "post", "patch", "delete", "get", "head"]

RETRYABLE_MANTICORE_EXCEPTIONS = [
Expand Down Expand Up @@ -107,6 +109,7 @@ def register
end
end

@is_batch = @format == "json_batch"

@headers["Content-Type"] = @content_type

Expand All @@ -118,11 +121,7 @@ def register

def multi_receive(events)
return if events.empty?
if @format == "json_batch"
send_json_batch(events)
else
send_events(events)
end
send_events(events)
end

class RetryTimerTask < java.util.TimerTask
Expand All @@ -138,29 +137,6 @@ def run
end
end

def send_json_batch(events)
attempt = 1
body = LogStash::Json.dump(events.map {|e| map_event(e) })
begin
while true
request = client.send(@http_method, @url, :body => body, :headers => @headers)
response = request.call
break if response_success?(response)
if retryable_response?(response)
log_retryable_response(response)
sleep_for_attempt attempt
attempt += 1
else
log_error_response(response, url, events)
end
end
rescue *RETRYABLE_MANTICORE_EXCEPTIONS => e
logger.warn("Encountered exception during http output send, will retry after delay", :message => e.message, :class => e.class.name)
sleep_for_attempt attempt
retry
end
end

def log_retryable_response(response)
if (response.code == 429)
@logger.debug? && @logger.debug("Encountered a 429 response, will retry. This is not serious, just flow control via HTTP")
Expand All @@ -182,15 +158,20 @@ def send_events(events)
successes = java.util.concurrent.atomic.AtomicInteger.new(0)
failures = java.util.concurrent.atomic.AtomicInteger.new(0)
retries = java.util.concurrent.atomic.AtomicInteger.new(0)

event_count = @is_batch ? 1 : events.size

pending = Queue.new
events.each {|e| pending << [e, 0]}
if @is_batch
pending << [events, 0]
else
events.each {|e| pending << [e, 0]}
end

while popped = pending.pop
break if popped == :done

event, attempt = popped

send_event(event, attempt) do |action,event,attempt|
begin
action = :failure if action == :retry && !@retry_failed
Expand All @@ -213,7 +194,7 @@ def send_events(events)
end

if action == :success || action == :failure
if successes.get+failures.get == events.size
if successes.get+failures.get == event_count
pending << :done
end
end
Expand Down Expand Up @@ -246,8 +227,8 @@ def send_event(event, attempt)
body = event_body(event)

# Send the request
url = event.sprintf(@url)
headers = event_headers(event)
url = @is_batch ? @url : event.sprintf(@url)
headers = @is_batch ? @headers : event_headers(event)

# Compress the body and add appropriate header
if @http_compression == true
Expand Down Expand Up @@ -347,6 +328,8 @@ def event_body(event)
LogStash::Json.dump(map_event(event))
elsif @format == "message"
event.sprintf(@message)
elsif @format == "json_batch"
LogStash::Json.dump(event.map {|e| map_event(e) })
else
encode(map_event(event))
end
Expand Down