diff --git a/.ci/docker-compose.override.yml b/.ci/docker-compose.override.yml new file mode 100644 index 0000000..7c3fd31 --- /dev/null +++ b/.ci/docker-compose.override.yml @@ -0,0 +1,5 @@ +version: '3' + +services: + logstash: + network_mode: host diff --git a/.ci/run.sh b/.ci/run.sh new file mode 100755 index 0000000..daf3303 --- /dev/null +++ b/.ci/run.sh @@ -0,0 +1,10 @@ +#!/bin/bash +# This is intended to be run inside the docker container as the command of the docker-compose. + +env + +set -ex + +jruby -rbundler/setup -S rspec -fd + +jruby -rbundler/setup -S rspec -fd --tag redis diff --git a/.travis.yml b/.travis.yml index a50fc73..028f060 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,2 +1,13 @@ import: -- logstash-plugins/.ci:travis/travis.yml@1.x \ No newline at end of file +- logstash-plugins/.ci:travis/travis.yml@1.x + +addons: + apt: + sources: + - sourceline: 'ppa:chris-lea/redis-server' + packages: + - redis-server + +before_install: + - sudo service redis-server stop + - sudo service redis-server start --bind 0.0.0.0 diff --git a/CHANGELOG.md b/CHANGELOG.md index 81972ab..3f6a485 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 3.7.0 + - Fix: make sure plugin can be stop-ed in case of a channel data_type [#87](https://github.com/logstash-plugins/logstash-input-redis/pull/87) + - Test: start running integration specs on CI + ## 3.6.1 - Fix: resolve crash when commands_map is set [#86](https://github.com/logstash-plugins/logstash-input-redis/pull/86) diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index c92c170..41c9058 100755 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -107,28 +107,26 @@ def is_list_type? # private def redis_params + params = { + :timeout => @timeout, + :db => @db, + :password => @password.nil? ? nil : @password.value, + :ssl => @ssl + } + if @path.nil? - connectionParams = { - :host => @host, - :port => @port - } + params[:host] = @host + params[:port] = @port else @logger.warn("Parameter 'path' is set, ignoring parameters: 'host' and 'port'") - connectionParams = { - :path => @path - } + params[:path] = @path end - baseParams = { - :timeout => @timeout, - :db => @db, - :password => @password.nil? ? nil : @password.value, - :ssl => @ssl - } - - return connectionParams.merge(baseParams) + params end + TIMEOUT = 5 # Redis only supports Integer values + def new_redis_instance ::Redis.new(redis_params) end @@ -174,9 +172,12 @@ def queue_event(msg, output_queue, channel=nil) # private def list_stop - return if @redis.nil? || !@redis.connected? + redis = @redis # might change during method invocation + return if redis.nil? || !redis.connected? - @redis.quit rescue nil + redis.quit rescue nil + # check if input retried while executing + list_stop unless redis.equal? @redis @redis = nil end @@ -186,15 +187,8 @@ def list_runner(output_queue) begin @redis ||= connect @list_method.call(@redis, output_queue) - rescue ::Redis::BaseError => e - info = { message: e.message, exception: e.class } - info[:backtrace] = e.backtrace if @logger.debug? - @logger.warn("Redis connection problem", info) - # Reset the redis variable to trigger reconnect - @redis = nil - # this sleep does not need to be stoppable as its - # in a while !stop? loop - sleep 1 + rescue => e + retry if handle_error(e) end end end @@ -238,7 +232,7 @@ def list_batch_listener(redis, output_queue) end def list_single_listener(redis, output_queue) - item = redis.blpop(@key, 0, :timeout => 1) + item = redis.blpop(@key, 0, :timeout => TIMEOUT) return unless item # from timeout or other conditions # blpop returns the 'key' read from as well as the item result @@ -248,18 +242,20 @@ def list_single_listener(redis, output_queue) # private def subscribe_stop - return if @redis.nil? || !@redis.connected? - # if its a SubscribedClient then: - # it does not have a disconnect method (yet) - if @redis.subscribed? + redis = @redis # might change during method invocation + return if redis.nil? || !redis.connected? + + if redis.subscribed? if @data_type == 'pattern_channel' - @redis.punsubscribe + redis.punsubscribe else - @redis.unsubscribe + redis.unsubscribe end else - @redis.disconnect! + redis.disconnect! end + # check if input retried while executing + subscribe_stop unless redis.equal? @redis @redis = nil end @@ -268,27 +264,48 @@ def redis_runner begin @redis ||= connect yield - rescue ::Redis::BaseError => e - @logger.warn("Redis connection problem", :exception => e) - # Reset the redis variable to trigger reconnect - @redis = nil - Stud.stoppable_sleep(1) { stop? } - retry if !stop? + rescue => e + retry if handle_error(e) + end + end + + def handle_error(e) + info = { message: e.message, exception: e.class } + info[:backtrace] = e.backtrace if @logger.debug? + + case e + when ::Redis::TimeoutError + # expected for channels in case no data is available + @logger.debug("Redis timeout, retrying", info) + when ::Redis::BaseConnectionError, ::Redis::ProtocolError + @logger.warn("Redis connection error", info) + when ::Redis::BaseError + @logger.error("Redis error", info) + when ::LogStash::ShutdownSignal + @logger.debug("Received shutdown signal") + return false # stop retry-ing + else + info[:backtrace] ||= e.backtrace + @logger.error("Unexpected error", info) end + + # Reset the redis variable to trigger reconnect + @redis = nil + + Stud.stoppable_sleep(1) { stop? } + !stop? # return true unless stop? end # private def channel_runner(output_queue) - redis_runner do - channel_listener(output_queue) - end + redis_runner { channel_listener(output_queue) } end # private def channel_listener(output_queue) - @redis.subscribe(@key) do |on| + @redis.subscribe_with_timeout(TIMEOUT, @key) do |on| on.subscribe do |channel, count| - @logger.info("Subscribed", :channel => channel, :count => count) + @logger.debug("Subscribed", :channel => channel, :count => count) end on.message do |channel, message| @@ -296,22 +313,20 @@ def channel_listener(output_queue) end on.unsubscribe do |channel, count| - @logger.info("Unsubscribed", :channel => channel, :count => count) + @logger.debug("Unsubscribed", :channel => channel, :count => count) end end end def pattern_channel_runner(output_queue) - redis_runner do - pattern_channel_listener(output_queue) - end + redis_runner { pattern_channel_listener(output_queue) } end # private def pattern_channel_listener(output_queue) - @redis.psubscribe @key do |on| + @redis.psubscribe_with_timeout(TIMEOUT, @key) do |on| on.psubscribe do |channel, count| - @logger.info("Subscribed", :channel => channel, :count => count) + @logger.debug("Subscribed", :channel => channel, :count => count) end on.pmessage do |pattern, channel, message| @@ -319,11 +334,9 @@ def pattern_channel_listener(output_queue) end on.punsubscribe do |channel, count| - @logger.info("Unsubscribed", :channel => channel, :count => count) + @logger.debug("Unsubscribed", :channel => channel, :count => count) end end end -# end - end end end # Redis Inputs LogStash diff --git a/logstash-input-redis.gemspec b/logstash-input-redis.gemspec index 030338f..520cacd 100755 --- a/logstash-input-redis.gemspec +++ b/logstash-input-redis.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-redis' - s.version = '3.6.1' + s.version = '3.7.0' s.licenses = ['Apache License (2.0)'] s.summary = "Reads events from a Redis instance" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index 45a6318..17c2b44 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -17,11 +17,15 @@ def populate(key, event_count) end def process(conf, event_count) - events = input(conf) do |pipeline, queue| - event_count.times.map{queue.pop} + events = input(conf) do |_, queue| + sleep 0.1 until queue.size >= event_count + queue.size.times.map { queue.pop } end - - expect(events.map{|evt| evt.get("sequence")}).to eq((0..event_count.pred).to_a) + # due multiple workers we get events out-of-order in the output + events.sort! { |a, b| a.get('sequence') <=> b.get('sequence') } + expect(events[0].get('sequence')).to eq(0) + expect(events[100].get('sequence')).to eq(100) + expect(events[1000].get('sequence')).to eq(1000) end # integration tests --------------------- @@ -31,7 +35,6 @@ def process(conf, event_count) it "should read events from a list" do key = SecureRandom.hex event_count = 1000 + rand(50) - # event_count = 100 conf = <<-CONFIG input { redis { @@ -163,7 +166,6 @@ def process(conf, event_count) allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block| expect(command[0]).to eql :blpop expect(command[1]).to eql ['foo', 0] - expect(command[2]).to eql 1 end.and_return ['foo', "{\"foo1\":\"bar\""], nil tt = Thread.new do @@ -178,6 +180,33 @@ def process(conf, event_count) expect( queue.size ).to be > 0 end + it 'keep running when a connection error occurs' do + raised = false + allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block| + expect(command[0]).to eql :blpop + unless raised + raised = true + raise Redis::CannotConnectError.new('test') + end + ['foo', "{\"after\":\"raise\"}"] + end + + expect(subject.logger).to receive(:warn).with('Redis connection error', + hash_including(:message=>"test", :exception=>Redis::CannotConnectError) + ).and_call_original + + tt = Thread.new do + sleep 1.5 # allow for retry (sleep) after handle_error + subject.do_stop + end + + subject.run(queue) + + tt.join + + expect( queue.size ).to be > 0 + end + context "when the batch size is greater than 1" do let(:batch_count) { 10 } @@ -233,9 +262,6 @@ def process(conf, event_count) end it 'multiple close calls, calls to redis once' do - # subject.use_redis(redis) - # allow(redis).to receive(:blpop).and_return(['foo', 'l1']) - # expect(redis).to receive(:connected?).and_return(connected.last) allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true, false # allow_any_instance_of( Redis::Client ).to receive(:disconnect) quit_calls.each do |call| @@ -249,9 +275,12 @@ def process(conf, event_count) end context 'for the subscribe data_types' do - def run_it_thread(inst) - Thread.new(inst) do |subj| - subj.run(queue) + + before { subject.register } + + def run_it_thread(plugin) + Thread.new(plugin) do |subject| + subject.run(queue) end end @@ -264,8 +293,8 @@ def publish_thread(new_redis, prefix) end end - def close_thread(inst, rt) - Thread.new(inst, rt) do |subj, runner| + def close_thread(plugin, runner_thread) + Thread.new(plugin, runner_thread) do |subject, runner| # block for the messages e1 = queue.pop e2 = queue.pop @@ -273,7 +302,20 @@ def close_thread(inst, rt) queue.push(e1) queue.push(e2) runner.raise(LogStash::ShutdownSignal) - subj.close + subject.close + end + end + + def stub_plugin_timeout(timeout) + value = LogStash::Inputs::Redis::TIMEOUT + begin + LogStash::Inputs::Redis.send :remove_const, :TIMEOUT + LogStash::Inputs::Redis.const_set :TIMEOUT, timeout + + yield + ensure + LogStash::Inputs::Redis.send :remove_const, :TIMEOUT rescue nil + LogStash::Inputs::Redis.const_set :TIMEOUT, value end end @@ -289,6 +331,8 @@ def close_thread(inst, rt) let(:data_type) { 'channel' } let(:quit_calls) { [:unsubscribe, :connection] } + before { subject.register } + context 'mocked redis' do it 'multiple stop calls, calls to redis once', type: :mocked do subject.do_stop @@ -308,6 +352,23 @@ def close_thread(inst, rt) expect(queue.size).to eq(2) end + + it 'calling the run method, adds events to the queue (after timeout)' do + stub_plugin_timeout(0.5) do + #simulate the input thread + rt = run_it_thread(subject) + [ :warn, :error ].each { |level| expect(subject.logger).not_to receive(level) } + #make sure the Redis call times out and gets retried + sleep(LogStash::Inputs::Redis::TIMEOUT * 4) + #simulate the other system thread + publish_thread(subject.send(:new_redis_instance), 'c').join + #simulate the pipeline thread + close_thread(subject, rt).join + + expect(queue.size).to eq(2) + end + end + it 'events had redis_channel' do #simulate the input thread rt = run_it_thread(subject) @@ -347,6 +408,22 @@ def close_thread(inst, rt) expect(queue.size).to eq(2) end + it 'calling the run method, adds events to the queue (after timeout)' do + stub_plugin_timeout(0.5) do + #simulate the input thread + rt = run_it_thread(subject) + [ :warn, :error ].each { |level| expect(subject.logger).not_to receive(level) } + #make sure the Redis call times out and gets retried + sleep(LogStash::Inputs::Redis::TIMEOUT * 4) + #simulate the other system thread + publish_thread(subject.send(:new_redis_instance), 'c').join + #simulate the pipeline thread + close_thread(subject, rt).join + + expect(queue.size).to eq(2) + end + end + it 'events had redis_channel' do #simulate the input thread rt = run_it_thread(subject)