From 37384561d42c841f42007691eebb9b3b6d34d2f8 Mon Sep 17 00:00:00 2001 From: kares Date: Thu, 1 Apr 2021 19:01:16 +0200 Subject: [PATCH 1/6] Fix: do not rely on redis.client anymore --- lib/logstash/inputs/redis.rb | 15 ++++++--------- logstash-input-redis.gemspec | 2 +- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index 7f0a19b..a82935f 100755 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -157,11 +157,8 @@ def connect redis = new_redis_instance # register any renamed Redis commands - if @command_map.any? - client_command_map = redis.client.command_map - @command_map.each do |name, renamed| - client_command_map[name.to_sym] = renamed.to_sym - end + @command_map.each do |name, renamed| + redis._client.command_map[name.to_sym] = renamed.to_sym end load_batch_script(redis) if batched? && is_list_type? @@ -270,14 +267,14 @@ def subscribe_stop return if @redis.nil? || !@redis.connected? # if its a SubscribedClient then: # it does not have a disconnect method (yet) - if @redis.client.is_a?(::Redis::SubscribedClient) + if @redis.subscribed? if @data_type == 'pattern_channel' - @redis.client.punsubscribe + @redis.punsubscribe else - @redis.client.unsubscribe + @redis.unsubscribe end else - @redis.client.disconnect + @redis.disconnect! end @redis = nil end diff --git a/logstash-input-redis.gemspec b/logstash-input-redis.gemspec index 9ef3f4c..99701ad 100755 --- a/logstash-input-redis.gemspec +++ b/logstash-input-redis.gemspec @@ -23,7 +23,7 @@ Gem::Specification.new do |s| s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_runtime_dependency 'logstash-codec-json' - s.add_runtime_dependency 'redis', '~> 4' + s.add_runtime_dependency 'redis', '>= 4.0.1', '< 5' s.add_development_dependency 'logstash-devutils' end From 0045b64a96bcf9016124e5f520570f2845100823 Mon Sep 17 00:00:00 2001 From: kares Date: Tue, 6 Apr 2021 19:49:25 +0200 Subject: [PATCH 2/6] Test: redo specs with 'real' Redis mocking --- spec/inputs/redis_spec.rb | 289 +++++++++++++++++--------------------- 1 file changed, 129 insertions(+), 160 deletions(-) diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index 6675a8e..86e2652 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -1,12 +1,12 @@ require "logstash/devutils/rspec/spec_helper" require "logstash/devutils/rspec/shared_examples" -require "redis" -require "stud/try" require 'logstash/inputs/redis' require 'securerandom' def populate(key, event_count) require "logstash/event" + require "redis" + require "stud/try" redis = Redis.new(:host => "localhost") event_count.times do |value| event = LogStash::Event.new("sequence" => value) @@ -65,153 +65,130 @@ def process(conf, event_count) end end -# unit tests --------------------- - describe LogStash::Inputs::Redis do - let(:redis) { double('redis') } - let(:builder) { ->{ redis } } - let(:connection) { double('redis_connection') } - let(:connected) { [true] } + let(:queue) { Queue.new } + let(:data_type) { 'list' } let(:batch_count) { 1 } - let(:cfg) { {'key' => 'foo', 'data_type' => data_type, 'batch_count' => batch_count} } + let(:config) { {'key' => 'foo', 'data_type' => data_type, 'batch_count' => batch_count} } let(:quit_calls) { [:quit] } - let(:accumulator) { [] } - let(:command_map) { {} } subject do - LogStash::Plugin.lookup("input", "redis") - .new(cfg).add_external_redis_builder(builder) + LogStash::Inputs::Redis.new(config) end context 'construction' do it 'registers the input' do - expect {subject.register}.not_to raise_error + expect { subject.register }.not_to raise_error end end context 'renamed redis commands' do - let(:cfg) { - {'key' => 'foo', - 'data_type' => data_type, - 'command_map' => - { - 'blpop' => 'testblpop', - 'evalsha' => 'testevalsha', - 'lrange' => 'testlrange', - 'ltrim' => 'testltrim', - 'script' => 'testscript', - 'subscribe' => 'testsubscribe', - 'psubscribe' => 'testpsubscribe', - }, - 'batch_count' => 2 + let(:config) do + { + 'key' => 'foo', + 'data_type' => data_type, + 'command_map' => { + 'blpop' => 'testblpop', + 'evalsha' => 'testevalsha', + 'lrange' => 'testlrange', + 'ltrim' => 'testltrim', + 'script' => 'testscript', + 'subscribe' => 'testsubscribe', + 'psubscribe' => 'testpsubscribe', + }, + 'batch_count' => 2 } - } - - before do - subject.register - allow(redis).to receive(:connected?) - allow(redis).to receive(:client).and_return(connection) - allow(connection).to receive(:command_map).and_return(command_map) end it 'sets the renamed commands in the command map' do - allow(redis).to receive(:script) - allow(redis).to receive(:evalsha).and_return([]) - - tt = Thread.new do - sleep 0.01 - subject.do_stop + allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command| + expect(command[0]).to eql :script + expect(command[1]).to eql 'load' end - subject.run(accumulator) - tt.join + subject.register + redis = subject.send :connect + + command_map = redis._client.command_map - expect(command_map[:blpop]).to eq cfg['command_map']['blpop'].to_sym - expect(command_map[:evalsha]).to eq cfg['command_map']['evalsha'].to_sym - expect(command_map[:lrange]).to eq cfg['command_map']['lrange'].to_sym - expect(command_map[:ltrim]).to eq cfg['command_map']['ltrim'].to_sym - expect(command_map[:script]).to eq cfg['command_map']['script'].to_sym - expect(command_map[:subscribe]).to eq cfg['command_map']['subscribe'].to_sym - expect(command_map[:psubscribe]).to eq cfg['command_map']['psubscribe'].to_sym + expect(command_map[:blpop]).to eq config['command_map']['blpop'].to_sym + expect(command_map[:evalsha]).to eq config['command_map']['evalsha'].to_sym + expect(command_map[:lrange]).to eq config['command_map']['lrange'].to_sym + expect(command_map[:ltrim]).to eq config['command_map']['ltrim'].to_sym + expect(command_map[:script]).to eq config['command_map']['script'].to_sym + expect(command_map[:subscribe]).to eq config['command_map']['subscribe'].to_sym + expect(command_map[:psubscribe]).to eq config['command_map']['psubscribe'].to_sym end it 'loads the batch script with the renamed command' do - capture = nil - allow(redis).to receive(:script) { |load, lua_script| capture = lua_script } - allow(redis).to receive(:evalsha).and_return([]) + expect_any_instance_of( Redis::Client ).to receive(:call) do |_, command| + expect(command[0]).to eql :script + expect(command[1]).to eql 'load' - tt = Thread.new do - sleep 0.01 - subject.do_stop + script = command[2] + expect(script).to include "redis.call('#{config['command_map']['lrange']}', KEYS[1], 0, batchsize)" + expect(script).to include "redis.call('#{config['command_map']['ltrim']}', KEYS[1], batchsize + 1, -1)" end - subject.run(accumulator) - tt.join - - expect(capture).to include "redis.call('#{cfg['command_map']['lrange']}', KEYS[1], 0, batchsize)" - expect(capture).to include "redis.call('#{cfg['command_map']['ltrim']}', KEYS[1], batchsize + 1, -1)" + subject.register + subject.send :connect end end - context 'runtime for list data_type' do + before do subject.register + allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true + allow_any_instance_of( Redis::Client ).to receive(:disconnect) + allow_any_instance_of( Redis ).to receive(:quit) + end + + after do + subject.stop end context 'close when redis is unset' do - let(:quit_calls) { [:quit, :unsubscribe, :punsubscribe, :connection, :disconnect!] } it 'does not attempt to quit' do - allow(redis).to receive(:nil?).and_return(true) - quit_calls.each do |call| - expect(redis).not_to receive(call) - end - expect {subject.do_stop}.not_to raise_error + expect_any_instance_of( Redis::Client ).to_not receive(:call) + expect_any_instance_of( Redis::Client ).to_not receive(:disconnect) + + expect { subject.do_stop }.not_to raise_error end end it 'calling the run method, adds events to the queue' do - expect(redis).to receive(:blpop).at_least(:once).and_return(['foo', 'l1']) - - allow(redis).to receive(:connected?).and_return(connected.last) - allow(redis).to receive(:quit) - - tt = Thread.new do - sleep 0.01 - subject.do_stop + allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block| + expect(command[0]).to eql :blpop + expect(command[1]).to eql ['foo', 0] + expect(command[2]).to eql 1 + end.and_return ['foo', "{\"foo1\":\"bar\""], nil + + thread = Thread.new do + subject.run(queue) end + thread.join(1.0) - subject.run(accumulator) - - tt.join - - expect(accumulator.size).to be > 0 + expect( queue.size ).to be > 0 end context "when the batch size is greater than 1" do let(:batch_count) { 10 } - let(:rates) { [] } - - before do - allow(redis).to receive(:connected?).and_return(connected.last) - allow(redis).to receive(:script) - allow(redis).to receive(:quit) - end it 'calling the run method, adds events to the queue' do - expect(redis).to receive(:evalsha).at_least(:once).and_return(['a', 'b']) + allow_any_instance_of( Redis ).to receive(:script) + allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command| + expect(command[0]).to eql :evalsha + end.and_return ['{"a": 1}', '{"b":'], [] - tt = Thread.new do - sleep 0.01 - subject.do_stop + thread = Thread.new do + subject.run(queue) end + thread.join(1.0) - subject.run(accumulator) - - tt.join - expect(accumulator.size).to be > 0 + expect( queue.size ).to be > 0 end end @@ -220,20 +197,18 @@ def process(conf, event_count) let(:rates) { [] } it 'will throttle the loop' do - allow(redis).to receive(:evalsha) do + allow_any_instance_of( Redis ).to receive(:script) + allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command| + expect(command[0]).to eql :evalsha rates.unshift Time.now.to_f - [] - end - allow(redis).to receive(:connected?).and_return(connected.last) - allow(redis).to receive(:script) - allow(redis).to receive(:quit) + end.and_return [] tt = Thread.new do sleep 1 subject.do_stop end - subject.run(accumulator) + subject.run(queue) tt.join @@ -242,7 +217,7 @@ def process(conf, event_count) inters << x - y end - expect(accumulator.size).to eq(0) + expect( queue.size ).to eq(0) inters.each do |delta| expect(delta).to be_within(0.01).of(LogStash::Inputs::Redis::BATCH_EMPTY_SLEEP) end @@ -250,16 +225,17 @@ def process(conf, event_count) end it 'multiple close calls, calls to redis once' do - subject.use_redis(redis) - allow(redis).to receive(:blpop).and_return(['foo', 'l1']) - expect(redis).to receive(:connected?).and_return(connected.last) + # subject.use_redis(redis) + # allow(redis).to receive(:blpop).and_return(['foo', 'l1']) + # expect(redis).to receive(:connected?).and_return(connected.last) + allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true, false + # allow_any_instance_of( Redis::Client ).to receive(:disconnect) quit_calls.each do |call| - expect(redis).to receive(call).at_most(:once) + allow_any_instance_of( Redis ).to receive(call).at_most(:once) end subject.do_stop - connected.push(false) #can't use let block here so push to array - expect {subject.do_stop}.not_to raise_error + expect { subject.do_stop }.not_to raise_error subject.do_stop end end @@ -267,7 +243,7 @@ def process(conf, event_count) context 'for the subscribe data_types' do def run_it_thread(inst) Thread.new(inst) do |subj| - subj.run(accumulator) + subj.run(queue) end end @@ -283,35 +259,21 @@ def publish_thread(new_redis, prefix) def close_thread(inst, rt) Thread.new(inst, rt) do |subj, runner| # block for the messages - e1 = accumulator.pop - e2 = accumulator.pop + e1 = queue.pop + e2 = queue.pop # put em back for the tests - accumulator.push(e1) - accumulator.push(e2) + queue.push(e1) + queue.push(e2) runner.raise(LogStash::ShutdownSignal) subj.close end end - let(:accumulator) { Queue.new } - - let(:instance) do - inst = described_class.new(cfg) - inst.register - inst - end - before(:example, type: :mocked) do subject.register - subject.use_redis(redis) - allow(connection).to receive(:is_a?).and_return(true) - allow(redis).to receive(:client).and_return(connection) - expect(redis).to receive(:connected?).and_return(connected.last) - allow(connection).to receive(:unsubscribe) - allow(connection).to receive(:punsubscribe) - + allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true, false quit_calls.each do |call| - expect(redis).to receive(call).at_most(:once) + allow_any_instance_of( Redis ).to receive(call).at_most(:once) end end @@ -322,8 +284,7 @@ def close_thread(inst, rt) context 'mocked redis' do it 'multiple stop calls, calls to redis once', type: :mocked do subject.do_stop - connected.push(false) #can't use let block here so push to array - expect {subject.do_stop}.not_to raise_error + expect { subject.do_stop }.not_to raise_error subject.do_stop end end @@ -331,23 +292,23 @@ def close_thread(inst, rt) context 'real redis', :redis => true do it 'calling the run method, adds events to the queue' do #simulate the input thread - rt = run_it_thread(instance) + rt = run_it_thread(subject) #simulate the other system thread - publish_thread(instance.new_redis_instance, 'c').join + publish_thread(subject.send(:new_redis_instance), 'c').join #simulate the pipeline thread - close_thread(instance, rt).join + close_thread(subject, rt).join - expect(accumulator.size).to eq(2) + expect(queue.size).to eq(2) end it 'events had redis_channel' do #simulate the input thread - rt = run_it_thread(instance) + rt = run_it_thread(subject) #simulate the other system thread - publish_thread(instance.new_redis_instance, 'c').join + publish_thread(subject.send(:new_redis_instance), 'c').join #simulate the pipeline thread - close_thread(instance, rt).join - e1 = accumulator.pop - e2 = accumulator.pop + close_thread(subject, rt).join + e1 = queue.pop + e2 = queue.pop expect(e1.get('[@metadata][redis_channel]')).to eq('foo') expect(e2.get('[@metadata][redis_channel]')).to eq('foo') end @@ -361,8 +322,7 @@ def close_thread(inst, rt) context 'mocked redis' do it 'multiple stop calls, calls to redis once', type: :mocked do subject.do_stop - connected.push(false) #can't use let block here so push to array - expect {subject.do_stop}.not_to raise_error + expect { subject.do_stop }.not_to raise_error subject.do_stop end end @@ -370,23 +330,24 @@ def close_thread(inst, rt) context 'real redis', :redis => true do it 'calling the run method, adds events to the queue' do #simulate the input thread - rt = run_it_thread(instance) + rt = run_it_thread(subject) #simulate the other system thread - publish_thread(instance.new_redis_instance, 'pc').join + publish_thread(subject.send(:new_redis_instance), 'pc').join #simulate the pipeline thread - close_thread(instance, rt).join + close_thread(subject, rt).join - expect(accumulator.size).to eq(2) + expect(queue.size).to eq(2) end + it 'events had redis_channel' do #simulate the input thread - rt = run_it_thread(instance) + rt = run_it_thread(subject) #simulate the other system thread - publish_thread(instance.new_redis_instance, 'pc').join + publish_thread(subject.send(:new_redis_instance), 'pc').join #simulate the pipeline thread - close_thread(instance, rt).join - e1 = accumulator.pop - e2 = accumulator.pop + close_thread(subject, rt).join + e1 = queue.pop + e2 = queue.pop expect(e1.get('[@metadata][redis_channel]')).to eq('foo') expect(e2.get('[@metadata][redis_channel]')).to eq('foo') end @@ -394,15 +355,23 @@ def close_thread(inst, rt) end end - describe LogStash::Inputs::Redis do - context "when using data type" do - ["list", "channel", "pattern_channel"].each do |data_type| - context data_type do - it_behaves_like "an interruptible input plugin" do - let(:config) { {'key' => 'foo', 'data_type' => data_type } } - end + context "when using data type" do + + # before do + # allow_any_instance_of( Redis ).to receive(:evalsha).and_return [] + # allow_any_instance_of( Redis ).to receive(:script) + # allow_any_instance_of( Redis ).to receive(:psubscribe) + # allow_any_instance_of( Redis ).to receive(:subscribe) + # allow_any_instance_of( Redis ).to receive(:quit) + # end + + ["list", "channel", "pattern_channel"].each do |data_type| + context data_type do + it_behaves_like "an interruptible input plugin", :redis => true do + let(:config) { { 'key' => 'foo', 'data_type' => data_type } } end end end + end end From c2424cf4e9e8709c55dc53d322ecb73c69f9c0c1 Mon Sep 17 00:00:00 2001 From: kares Date: Tue, 6 Apr 2021 19:49:48 +0200 Subject: [PATCH 3/6] Refactor: cleanup plugin (internals) --- lib/logstash/inputs/redis.rb | 26 +++++--------------------- 1 file changed, 5 insertions(+), 21 deletions(-) diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index a82935f..c92c170 100755 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -61,28 +61,10 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable config :command_map, :validate => :hash, :default => {} public - # public API - # use to store a proc that can provide a Redis instance or mock - def add_external_redis_builder(builder) #callable - @redis_builder = builder - self - end - - # use to apply an instance directly and bypass the builder - def use_redis(instance) - @redis = instance - self - end - - def new_redis_instance - @redis_builder.call - end def register @redis_url = @path.nil? ? "redis://#{@password}@#{@host}:#{@port}/#{@db}" : "#{@password}@#{@path}/#{@db}" - @redis_builder ||= method(:internal_redis_builder) - # just switch on data_type once if @data_type == 'list' || @data_type == 'dummy' @run_method = method(:list_runner) @@ -147,8 +129,7 @@ def redis_params return connectionParams.merge(baseParams) end - # private - def internal_redis_builder + def new_redis_instance ::Redis.new(redis_params) end @@ -162,6 +143,7 @@ def connect end load_batch_script(redis) if batched? && is_list_type? + redis end # def connect @@ -205,7 +187,9 @@ def list_runner(output_queue) @redis ||= connect @list_method.call(@redis, output_queue) rescue ::Redis::BaseError => e - @logger.warn("Redis connection problem", :exception => e) + info = { message: e.message, exception: e.class } + info[:backtrace] = e.backtrace if @logger.debug? + @logger.warn("Redis connection problem", info) # Reset the redis variable to trigger reconnect @redis = nil # this sleep does not need to be stoppable as its From 94eb0e00b73a8996a3d4ebd391743633926c0140 Mon Sep 17 00:00:00 2001 From: kares Date: Tue, 6 Apr 2021 20:09:47 +0200 Subject: [PATCH 4/6] Test: change back rspec mocks to be on same thread --- spec/inputs/redis_spec.rb | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index 86e2652..7b52644 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -166,10 +166,14 @@ def process(conf, event_count) expect(command[2]).to eql 1 end.and_return ['foo', "{\"foo1\":\"bar\""], nil - thread = Thread.new do - subject.run(queue) + tt = Thread.new do + sleep 0.25 + subject.do_stop end - thread.join(1.0) + + subject.run(queue) + + tt.join expect( queue.size ).to be > 0 end @@ -183,10 +187,14 @@ def process(conf, event_count) expect(command[0]).to eql :evalsha end.and_return ['{"a": 1}', '{"b":'], [] - thread = Thread.new do - subject.run(queue) + tt = Thread.new do + sleep 0.25 + subject.do_stop end - thread.join(1.0) + + subject.run(queue) + + tt.join expect( queue.size ).to be > 0 end @@ -204,7 +212,7 @@ def process(conf, event_count) end.and_return [] tt = Thread.new do - sleep 1 + sleep 0.25 subject.do_stop end From 1f2ba3e653b94befb89314223ed010ccb6de425f Mon Sep 17 00:00:00 2001 From: kares Date: Tue, 6 Apr 2021 20:19:00 +0200 Subject: [PATCH 5/6] Bump + changelog --- CHANGELOG.md | 3 +++ logstash-input-redis.gemspec | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1677336..81972ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 3.6.1 + - Fix: resolve crash when commands_map is set [#86](https://github.com/logstash-plugins/logstash-input-redis/pull/86) + ## 3.6.0 - Remove ruby pipeline dependency. Starting from Logstash 8, Ruby execution engine is not available. All pipelines should use Java pipeline [#84](https://github.com/logstash-plugins/logstash-input-redis/pull/84) diff --git a/logstash-input-redis.gemspec b/logstash-input-redis.gemspec index 99701ad..030338f 100755 --- a/logstash-input-redis.gemspec +++ b/logstash-input-redis.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-redis' - s.version = '3.6.0' + s.version = '3.6.1' s.licenses = ['Apache License (2.0)'] s.summary = "Reads events from a Redis instance" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" From 63602de9c4f1ecaca34c79a25650abadf0cd1d9d Mon Sep 17 00:00:00 2001 From: Karol Bucek Date: Wed, 7 Apr 2021 13:35:20 +0200 Subject: [PATCH 6/6] Update spec/inputs/redis_spec.rb Co-authored-by: Ry Biesemeyer --- spec/inputs/redis_spec.rb | 8 -------- 1 file changed, 8 deletions(-) diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index 7b52644..45a6318 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -365,14 +365,6 @@ def close_thread(inst, rt) context "when using data type" do - # before do - # allow_any_instance_of( Redis ).to receive(:evalsha).and_return [] - # allow_any_instance_of( Redis ).to receive(:script) - # allow_any_instance_of( Redis ).to receive(:psubscribe) - # allow_any_instance_of( Redis ).to receive(:subscribe) - # allow_any_instance_of( Redis ).to receive(:quit) - # end - ["list", "channel", "pattern_channel"].each do |data_type| context data_type do it_behaves_like "an interruptible input plugin", :redis => true do