Skip to content

Commit

Permalink
fix: remove deadlock possibility by adding resend_queue queue (#73)
Browse files Browse the repository at this point in the history
* fix: remove deadlock possibility by adding resend_queue queue

Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>

* fix: fix queues management due to tests

Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>

* refactor: restrict rescue scope

Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>

* chore: release 1.4.1

Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>

* refactor: restrict rescue scope

Signed-off-by: Dominik Rosiek <drosiek@sumologic.com>
  • Loading branch information
sumo-drosiek authored Mar 9, 2022
1 parent b8cc6fb commit 76cbbc0
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 34 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Change Log

## 1.4.1 (2022-03-09)

- [#73](https://github.com/SumoLogic/logstash-output-sumologic/pull/73) fix: remove deadlock possibility by adding resend_queue queue

## 1.4.0 (2021-09-27)

- [#68](https://github.com/SumoLogic/logstash-output-sumologic/pull/68) feat: retry on 502 error code
Expand Down
70 changes: 45 additions & 25 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
logstash-output-sumologic (1.4.0)
logstash-output-sumologic (1.4.1)
logstash-core-plugin-api (>= 1.60, <= 2.99)
logstash-mixin-http_client (>= 6, < 8)
manticore (>= 0.5.4, < 1.0.0)
Expand All @@ -13,8 +13,8 @@ GEM
numerizer (~> 0.1.1)
clamp (0.6.5)
coderay (1.1.3)
concurrent-ruby (1.1.7)
diff-lcs (1.4.4)
concurrent-ruby (1.1.9)
diff-lcs (1.5.0)
elasticsearch (5.0.5)
elasticsearch-api (= 5.0.5)
elasticsearch-transport (= 5.0.5)
Expand All @@ -23,18 +23,38 @@ GEM
elasticsearch-transport (5.0.5)
faraday
multi_json
faraday (1.1.0)
faraday (1.10.0)
faraday-em_http (~> 1.0)
faraday-em_synchrony (~> 1.0)
faraday-excon (~> 1.1)
faraday-httpclient (~> 1.0)
faraday-multipart (~> 1.0)
faraday-net_http (~> 1.0)
faraday-net_http_persistent (~> 1.0)
faraday-patron (~> 1.0)
faraday-rack (~> 1.0)
faraday-retry (~> 1.0)
ruby2_keywords (>= 0.0.4)
faraday-em_http (1.0.0)
faraday-em_synchrony (1.0.0)
faraday-excon (1.1.0)
faraday-httpclient (1.0.1)
faraday-multipart (1.0.3)
multipart-post (>= 1.2, < 3)
ruby2_keywords
ffi (1.13.1-java)
faraday-net_http (1.0.1)
faraday-net_http_persistent (1.2.0)
faraday-patron (1.0.0)
faraday-rack (1.0.0)
faraday-retry (1.0.3)
ffi (1.15.5-java)
filesize (0.0.4)
fivemat (1.3.7)
gem_publisher (1.5.0)
gems (0.8.3)
i18n (0.6.9)
insist (1.0.0)
jar-dependencies (0.4.1)
jrjackson (0.4.13-java)
jrjackson (0.4.14-java)
jruby-openssl (0.9.19-java)
kramdown (1.14.0)
logstash-codec-plain (3.0.6)
Expand Down Expand Up @@ -74,11 +94,11 @@ GEM
rspec (~> 3.0)
rspec-wait
stud (>= 0.0.20)
logstash-mixin-http_client (7.0.0)
logstash-mixin-http_client (7.1.0)
logstash-codec-plain
logstash-core-plugin-api (>= 1.60, <= 2.99)
manticore (>= 0.5.2, < 1.0.0)
manticore (0.7.0-java)
manticore (>= 0.8.0, < 1.0.0)
manticore (0.8.0-java)
openssl_pkcs8_pure
method_source (0.8.2)
minitar (0.5.4)
Expand All @@ -96,27 +116,27 @@ GEM
rack (1.6.6)
rack-protection (1.5.5)
rack
rake (13.0.1)
rspec (3.10.0)
rspec-core (~> 3.10.0)
rspec-expectations (~> 3.10.0)
rspec-mocks (~> 3.10.0)
rspec-core (3.10.0)
rspec-support (~> 3.10.0)
rake (13.0.6)
rspec (3.11.0)
rspec-core (~> 3.11.0)
rspec-expectations (~> 3.11.0)
rspec-mocks (~> 3.11.0)
rspec-core (3.11.0)
rspec-support (~> 3.11.0)
rspec-eventually (0.2.2)
rspec-expectations (3.10.0)
rspec-expectations (3.11.0)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.10.0)
rspec-mocks (3.10.0)
rspec-support (~> 3.11.0)
rspec-mocks (3.11.0)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.10.0)
rspec-support (3.10.0)
rspec-support (~> 3.11.0)
rspec-support (3.11.0)
rspec-wait (0.0.9)
rspec (>= 3, < 4)
ruby-maven (3.3.12)
ruby-maven (3.3.13)
ruby-maven-libs (~> 3.3.9)
ruby-maven-libs (3.3.9)
ruby2_keywords (0.0.2)
ruby2_keywords (0.0.5)
rubyzip (1.1.7)
sinatra (1.4.8)
rack (~> 1.5)
Expand Down Expand Up @@ -145,4 +165,4 @@ DEPENDENCIES
rspec-eventually

BUNDLED WITH
2.2.27
2.3.7
4 changes: 2 additions & 2 deletions lib/logstash/outputs/sumologic/message_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ def enq(batch)
end
end # def enq

def deq()
batch = @queue.deq()
def deq(non_block: false)
batch = @queue.deq(non_block: non_block)
batch_size = batch.payload.bytesize
@stats.record_deque(batch_size)
@queue_bytesize.update { |v| v - batch_size }
Expand Down
36 changes: 30 additions & 6 deletions lib/logstash/outputs/sumologic/sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ def initialize(client, queue, stats, config)
@sender_max = (config["sender_max"] ||= 1) < 1 ? 1 : config["sender_max"]
@sleep_before_requeue = config["sleep_before_requeue"] ||= 30
@stats_enabled = config["stats_enabled"] ||= false
@iteration_sleep = 0.3

@tokens = SizedQueue.new(@sender_max)
@sender_max.times { |t| @tokens << t }

# Make resend_queue twice as big as sender_max,
# because if one batch is processed, the next one is already waiting in the thread
@resend_queue = SizedQueue.new(2*@sender_max)
@compressor = LogStash::Outputs::SumoLogic::Compressor.new(config)

end # def initialize
Expand All @@ -39,9 +43,24 @@ def start()
@stopping.make_false()
@sender_t = Thread.new {
while @stopping.false?
batch = @queue.deq()
begin
# Resend batch if any in the queue
batch = @resend_queue.deq(non_block: true)
rescue ThreadError
# send new batch otherwise
begin
batch = @queue.deq(non_block: true)
rescue ThreadError
Stud.stoppable_sleep(@iteration_sleep) { @stopping.true? }
next
end
end
send_request(batch)
end # while
@resend_queue.size.times.map { |queue|
batch = queue.deq()
send_request(batch)
}
@queue.drain().map { |batch|
send_request(batch)
}
Expand Down Expand Up @@ -98,6 +117,7 @@ def send_request(batch)
return
end

# wait for token so we do not exceed number of request in background
token = @tokens.pop()

if @stats_enabled && content.start_with?(STATS_TAG)
Expand All @@ -111,11 +131,9 @@ def send_request(batch)
:content_size => content.size,
:content => content[0..20],
:payload_size => body.size)

# send request in background
request = @client.send(:background).send(:post, @url, :body => body, :headers => headers)

request.on_complete do
@tokens << token
end

request.on_success do |response|
@stats.record_response_success(response.code)
Expand All @@ -126,12 +144,16 @@ def send_request(batch)
:headers => headers,
:contet => content[0..20])
if response.code == 429 || response.code == 502 || response.code == 503 || response.code == 504
# requeue and release token
requeue_message(batch)
@tokens << token
end
else
log_dbg("request accepted",
:token => token,
:code => response.code)
# release token
@tokens << token
end
end

Expand All @@ -143,6 +165,8 @@ def send_request(batch)
:class => exception.class.name,
:backtrace => exception.backtrace)
requeue_message(batch)
# requeue and release token
@tokens << token
end

@stats.record_request(content.bytesize, body.bytesize)
Expand All @@ -162,7 +186,7 @@ def requeue_message(batch)
:content => content[0..20],
:headers => batch.headers)
Stud.stoppable_sleep(@sleep_before_requeue) { @stopping.true? }
@queue.enq(batch)
@resend_queue.enq(batch)
end
end # def reque_message

Expand Down
2 changes: 1 addition & 1 deletion logstash-output-sumologic.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-sumologic'
s.version = '1.4.0'
s.version = '1.4.1'
s.licenses = ['Apache-2.0']
s.summary = 'Deliever the log to Sumo Logic cloud service.'
s.description = 'This gem is a Logstash output plugin to deliver the log or metrics to Sumo Logic cloud service. Go to https://github.com/SumoLogic/logstash-output-sumologic for getting help, reporting issues, etc.'
Expand Down

0 comments on commit 76cbbc0

Please sign in to comment.