From 76cbbc007943f0fce37cecd801b2f472262b52cd Mon Sep 17 00:00:00 2001 From: Dominik Rosiek <58699848+sumo-drosiek@users.noreply.github.com> Date: Wed, 9 Mar 2022 13:04:56 +0100 Subject: [PATCH] fix: remove deadlock possibility by adding resend_queue queue (#73) * fix: remove deadlock possibility by adding resend_queue queue Signed-off-by: Dominik Rosiek * fix: fix queues management due to tests Signed-off-by: Dominik Rosiek * refactor: restrict rescue scope Signed-off-by: Dominik Rosiek * chore: release 1.4.1 Signed-off-by: Dominik Rosiek * refactor: restrict rescue scope Signed-off-by: Dominik Rosiek --- CHANGELOG.md | 4 ++ Gemfile.lock | 70 ++++++++++++------- .../outputs/sumologic/message_queue.rb | 4 +- lib/logstash/outputs/sumologic/sender.rb | 36 ++++++++-- logstash-output-sumologic.gemspec | 2 +- 5 files changed, 82 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f59c4c..46274b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Change Log +## 1.4.1 (2022-03-09) + +- [#73](https://github.com/SumoLogic/logstash-output-sumologic/pull/73) fix: remove deadlock possibility by adding resend_queue queue + ## 1.4.0 (2021-09-27) - [#68](https://github.com/SumoLogic/logstash-output-sumologic/pull/68) feat: retry on 502 error code diff --git a/Gemfile.lock b/Gemfile.lock index 9aa02a4..d04f29c 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - logstash-output-sumologic (1.4.0) + logstash-output-sumologic (1.4.1) logstash-core-plugin-api (>= 1.60, <= 2.99) logstash-mixin-http_client (>= 6, < 8) manticore (>= 0.5.4, < 1.0.0) @@ -13,8 +13,8 @@ GEM numerizer (~> 0.1.1) clamp (0.6.5) coderay (1.1.3) - concurrent-ruby (1.1.7) - diff-lcs (1.4.4) + concurrent-ruby (1.1.9) + diff-lcs (1.5.0) elasticsearch (5.0.5) elasticsearch-api (= 5.0.5) elasticsearch-transport (= 5.0.5) @@ -23,10 +23,30 @@ GEM elasticsearch-transport (5.0.5) faraday multi_json - faraday (1.1.0) + faraday (1.10.0) + faraday-em_http (~> 1.0) + faraday-em_synchrony (~> 1.0) + faraday-excon (~> 1.1) + faraday-httpclient (~> 1.0) + faraday-multipart (~> 1.0) + faraday-net_http (~> 1.0) + faraday-net_http_persistent (~> 1.0) + faraday-patron (~> 1.0) + faraday-rack (~> 1.0) + faraday-retry (~> 1.0) + ruby2_keywords (>= 0.0.4) + faraday-em_http (1.0.0) + faraday-em_synchrony (1.0.0) + faraday-excon (1.1.0) + faraday-httpclient (1.0.1) + faraday-multipart (1.0.3) multipart-post (>= 1.2, < 3) - ruby2_keywords - ffi (1.13.1-java) + faraday-net_http (1.0.1) + faraday-net_http_persistent (1.2.0) + faraday-patron (1.0.0) + faraday-rack (1.0.0) + faraday-retry (1.0.3) + ffi (1.15.5-java) filesize (0.0.4) fivemat (1.3.7) gem_publisher (1.5.0) @@ -34,7 +54,7 @@ GEM i18n (0.6.9) insist (1.0.0) jar-dependencies (0.4.1) - jrjackson (0.4.13-java) + jrjackson (0.4.14-java) jruby-openssl (0.9.19-java) kramdown (1.14.0) logstash-codec-plain (3.0.6) @@ -74,11 +94,11 @@ GEM rspec (~> 3.0) rspec-wait stud (>= 0.0.20) - logstash-mixin-http_client (7.0.0) + logstash-mixin-http_client (7.1.0) logstash-codec-plain logstash-core-plugin-api (>= 1.60, <= 2.99) - manticore (>= 0.5.2, < 1.0.0) - manticore (0.7.0-java) + manticore (>= 0.8.0, < 1.0.0) + manticore (0.8.0-java) openssl_pkcs8_pure method_source (0.8.2) minitar (0.5.4) @@ -96,27 +116,27 @@ GEM rack (1.6.6) rack-protection (1.5.5) rack - rake (13.0.1) - rspec (3.10.0) - rspec-core (~> 3.10.0) - rspec-expectations (~> 3.10.0) - rspec-mocks (~> 3.10.0) - rspec-core (3.10.0) - rspec-support (~> 3.10.0) + rake (13.0.6) + rspec (3.11.0) + rspec-core (~> 3.11.0) + rspec-expectations (~> 3.11.0) + rspec-mocks (~> 3.11.0) + rspec-core (3.11.0) + rspec-support (~> 3.11.0) rspec-eventually (0.2.2) - rspec-expectations (3.10.0) + rspec-expectations (3.11.0) diff-lcs (>= 1.2.0, < 2.0) - rspec-support (~> 3.10.0) - rspec-mocks (3.10.0) + rspec-support (~> 3.11.0) + rspec-mocks (3.11.0) diff-lcs (>= 1.2.0, < 2.0) - rspec-support (~> 3.10.0) - rspec-support (3.10.0) + rspec-support (~> 3.11.0) + rspec-support (3.11.0) rspec-wait (0.0.9) rspec (>= 3, < 4) - ruby-maven (3.3.12) + ruby-maven (3.3.13) ruby-maven-libs (~> 3.3.9) ruby-maven-libs (3.3.9) - ruby2_keywords (0.0.2) + ruby2_keywords (0.0.5) rubyzip (1.1.7) sinatra (1.4.8) rack (~> 1.5) @@ -145,4 +165,4 @@ DEPENDENCIES rspec-eventually BUNDLED WITH - 2.2.27 + 2.3.7 diff --git a/lib/logstash/outputs/sumologic/message_queue.rb b/lib/logstash/outputs/sumologic/message_queue.rb index 97cb91c..0bfbe16 100644 --- a/lib/logstash/outputs/sumologic/message_queue.rb +++ b/lib/logstash/outputs/sumologic/message_queue.rb @@ -27,8 +27,8 @@ def enq(batch) end end # def enq - def deq() - batch = @queue.deq() + def deq(non_block: false) + batch = @queue.deq(non_block: non_block) batch_size = batch.payload.bytesize @stats.record_deque(batch_size) @queue_bytesize.update { |v| v - batch_size } diff --git a/lib/logstash/outputs/sumologic/sender.rb b/lib/logstash/outputs/sumologic/sender.rb index 881de42..4a8a97c 100644 --- a/lib/logstash/outputs/sumologic/sender.rb +++ b/lib/logstash/outputs/sumologic/sender.rb @@ -24,10 +24,14 @@ def initialize(client, queue, stats, config) @sender_max = (config["sender_max"] ||= 1) < 1 ? 1 : config["sender_max"] @sleep_before_requeue = config["sleep_before_requeue"] ||= 30 @stats_enabled = config["stats_enabled"] ||= false + @iteration_sleep = 0.3 @tokens = SizedQueue.new(@sender_max) @sender_max.times { |t| @tokens << t } + # Make resend_queue twice as big as sender_max, + # because if one batch is processed, the next one is already waiting in the thread + @resend_queue = SizedQueue.new(2*@sender_max) @compressor = LogStash::Outputs::SumoLogic::Compressor.new(config) end # def initialize @@ -39,9 +43,24 @@ def start() @stopping.make_false() @sender_t = Thread.new { while @stopping.false? - batch = @queue.deq() + begin + # Resend batch if any in the queue + batch = @resend_queue.deq(non_block: true) + rescue ThreadError + # send new batch otherwise + begin + batch = @queue.deq(non_block: true) + rescue ThreadError + Stud.stoppable_sleep(@iteration_sleep) { @stopping.true? } + next + end + end send_request(batch) end # while + @resend_queue.size.times.map { |queue| + batch = queue.deq() + send_request(batch) + } @queue.drain().map { |batch| send_request(batch) } @@ -98,6 +117,7 @@ def send_request(batch) return end + # wait for token so we do not exceed number of request in background token = @tokens.pop() if @stats_enabled && content.start_with?(STATS_TAG) @@ -111,11 +131,9 @@ def send_request(batch) :content_size => content.size, :content => content[0..20], :payload_size => body.size) + + # send request in background request = @client.send(:background).send(:post, @url, :body => body, :headers => headers) - - request.on_complete do - @tokens << token - end request.on_success do |response| @stats.record_response_success(response.code) @@ -126,12 +144,16 @@ def send_request(batch) :headers => headers, :contet => content[0..20]) if response.code == 429 || response.code == 502 || response.code == 503 || response.code == 504 + # requeue and release token requeue_message(batch) + @tokens << token end else log_dbg("request accepted", :token => token, :code => response.code) + # release token + @tokens << token end end @@ -143,6 +165,8 @@ def send_request(batch) :class => exception.class.name, :backtrace => exception.backtrace) requeue_message(batch) + # requeue and release token + @tokens << token end @stats.record_request(content.bytesize, body.bytesize) @@ -162,7 +186,7 @@ def requeue_message(batch) :content => content[0..20], :headers => batch.headers) Stud.stoppable_sleep(@sleep_before_requeue) { @stopping.true? } - @queue.enq(batch) + @resend_queue.enq(batch) end end # def reque_message diff --git a/logstash-output-sumologic.gemspec b/logstash-output-sumologic.gemspec index 78621a9..bbdb249 100644 --- a/logstash-output-sumologic.gemspec +++ b/logstash-output-sumologic.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-sumologic' - s.version = '1.4.0' + s.version = '1.4.1' s.licenses = ['Apache-2.0'] s.summary = 'Deliever the log to Sumo Logic cloud service.' s.description = 'This gem is a Logstash output plugin to deliver the log or metrics to Sumo Logic cloud service. Go to https://github.com/SumoLogic/logstash-output-sumologic for getting help, reporting issues, etc.'