From 70ad737aa3823ffa70229e56d9decefe9b8c1454 Mon Sep 17 00:00:00 2001 From: kares Date: Tue, 29 Jun 2021 18:28:48 +0200 Subject: [PATCH 1/5] Fix: make sure we use a new codec on restart --- lib/logstash/inputs/file.rb | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/logstash/inputs/file.rb b/lib/logstash/inputs/file.rb index f1e7405..5d3eb87 100644 --- a/lib/logstash/inputs/file.rb +++ b/lib/logstash/inputs/file.rb @@ -325,7 +325,7 @@ def register else @watcher_class = FileWatch::ObservingRead end - @codec = LogStash::Codecs::IdentityMapCodec.new(@codec) + set_new_identity_map_codec!(@codec) @completely_stopped = Concurrent::AtomicBoolean.new @queue = Concurrent::AtomicReference.new @@ -350,6 +350,7 @@ def start_processing stop @watcher = @watcher_class.new(@filewatch_config) + set_new_identity_map_codec!(@codec.base_codec) @completed_file_handlers = [] if read_mode? @@ -408,6 +409,10 @@ def queue private + def set_new_identity_map_codec!(base_codec) + @codec = LogStash::Codecs::IdentityMapCodec.new(base_codec) + end + def build_sincedb_base_from_settings(settings) logstash_data_path = settings.get_value("path.data") Pathname.new(logstash_data_path).join("plugins", "inputs", "file").tap do |path| From 63eba5dd6878dab8fb66407c3ec43506f00500ed Mon Sep 17 00:00:00 2001 From: kares Date: Tue, 29 Jun 2021 18:49:37 +0200 Subject: [PATCH 2/5] Refactor: stop calling plugin.close and make sure codec is always fresh (on restart) --- lib/logstash/inputs/file.rb | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/logstash/inputs/file.rb b/lib/logstash/inputs/file.rb index 5d3eb87..6a87a83 100644 --- a/lib/logstash/inputs/file.rb +++ b/lib/logstash/inputs/file.rb @@ -325,7 +325,7 @@ def register else @watcher_class = FileWatch::ObservingRead end - set_new_identity_map_codec!(@codec) + @base_codec = @codec # TODO is there a way register would be called after run (on restart?) @completely_stopped = Concurrent::AtomicBoolean.new @queue = Concurrent::AtomicReference.new @@ -345,12 +345,12 @@ def listener_for(path) end def start_processing - # if the pipeline restarts this input, - # make sure previous files are closed - stop + # if the pipeline restarts this input, make sure previous files are closed + quit_watcher + @codec.close if @codec.is_a?(LogStash::Codecs::IdentityMapCodec) @watcher = @watcher_class.new(@filewatch_config) - set_new_identity_map_codec!(@codec.base_codec) + @codec = LogStash::Codecs::IdentityMapCodec.new(@base_codec) @completed_file_handlers = [] if read_mode? @@ -395,11 +395,11 @@ def log_line_received(path, line) @logger.debug? && @logger.debug("Received line", :path => path, :text => line) end + # @override LogStash::Inputs::Base#stop def stop - unless @watcher.nil? - @codec.close - @watcher.quit - end + @logger.trace? && @logger.trace(__method__.to_s, @path) + quit_watcher + @codec.close if @codec end # @private used in specs @@ -409,8 +409,8 @@ def queue private - def set_new_identity_map_codec!(base_codec) - @codec = LogStash::Codecs::IdentityMapCodec.new(base_codec) + def quit_watcher + @watcher.quit if @watcher end def build_sincedb_base_from_settings(settings) From 859f86fe30d3d006be2a0b676542ae5a54279ccd Mon Sep 17 00:00:00 2001 From: kares Date: Tue, 29 Jun 2021 18:50:06 +0200 Subject: [PATCH 3/5] Refactor: minor logging improvements --- lib/logstash/inputs/file.rb | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/logstash/inputs/file.rb b/lib/logstash/inputs/file.rb index 6a87a83..c87cc72 100644 --- a/lib/logstash/inputs/file.rb +++ b/lib/logstash/inputs/file.rb @@ -387,7 +387,7 @@ def post_process_this(event, path) def handle_deletable_path(path) return if tail_mode? return if @completed_file_handlers.empty? - @logger.debug? && @logger.debug(__method__.to_s, :path => path) + @logger.trace? && @logger.trace(__method__.to_s, :path => path) @completed_file_handlers.each { |handler| handler.handle(path) } end @@ -428,7 +428,7 @@ def attempt_set(event, field_reference, value) event.set(field_reference, value) rescue => e - logger.trace("failed to set #{field_reference} to `#{value}`", :exception => e.message) + logger.trace("failed to set #{field_reference} to `#{value}`", :exception => e.class, :message => e.message) false end @@ -461,6 +461,7 @@ def read_mode? end def exit_flush + @logger.trace? && @logger.trace(__method__.to_s, @path) listener = FlushableListener.new("none", self) if @codec.identity_count.zero? # using the base codec without identity/path info @@ -468,7 +469,7 @@ def exit_flush begin listener.process_event(event) rescue => e - @logger.error("File Input: flush on exit downstream error", :exception => e) + @logger.error("flush on exit downstream error", :exception => e.class, :message => e.message) end end else From 4e10942f0cb5a1adf8e26b68126fe26cd72d69e3 Mon Sep 17 00:00:00 2001 From: kares Date: Wed, 30 Jun 2021 08:18:43 +0200 Subject: [PATCH 4/5] Test: wait for run to setup identity codec --- spec/inputs/file_tail_spec.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/spec/inputs/file_tail_spec.rb b/spec/inputs/file_tail_spec.rb index fecb904..ca3905d 100644 --- a/spec/inputs/file_tail_spec.rb +++ b/spec/inputs/file_tail_spec.rb @@ -391,7 +391,7 @@ end end .then("assert both files are mapped as identities and stop") do - wait(2).for {subject.codec.identity_count}.to eq(2), "both files are not mapped as identities" + wait(2).for { subject.codec.identity_count }.to eq(2), "both files are not mapped as identities" end .then("stop") do subject.stop @@ -473,13 +473,13 @@ it "collects line events from only one file" do actions = RSpec::Sequencing .run("assert one identity is mapped") do - wait(0.4).for{subject.codec.identity_count}.to be > 0, "no identity is mapped" + wait(0.4).for{ subject.codec.respond_to?(:identity_count) ? subject.codec.identity_count : 0 }.to be > 0, "no identity is mapped" end .then("stop") do subject.stop end .then("stop flushes last event") do - wait(0.4).for{events.size}.to eq(2), "events size does not equal 2" + wait(0.4).for{ events.size }.to eq(2), "events size does not equal 2" end subject.run(events) # wait for actions future value @@ -508,7 +508,7 @@ it "collects line events from both files" do actions = RSpec::Sequencing .run("assert both identities are mapped and the first two events are built") do - wait(0.4).for{subject.codec.identity_count == 1 && events.size == 2}.to eq(true), "both identities are not mapped and the first two events are not built" + wait(0.4).for{ events.size == 2 && subject.codec.identity_count == 1 }.to eq(true), "both identities are not mapped and the first two events are not built" end .then("wait for close to flush last event of each identity") do wait(0.8).for{events.size}.to eq(4), "close does not flush last event of each identity" From b3f27549e629a0ac476d7933630e40bf3c7f3c81 Mon Sep 17 00:00:00 2001 From: kares Date: Thu, 1 Jul 2021 11:24:29 +0200 Subject: [PATCH 5/5] Build: remove condition around runtime dependency --- logstash-input-file.gemspec | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/logstash-input-file.gemspec b/logstash-input-file.gemspec index b5cf6c3..a478bf1 100644 --- a/logstash-input-file.gemspec +++ b/logstash-input-file.gemspec @@ -23,13 +23,7 @@ Gem::Specification.new do |s| s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_runtime_dependency 'logstash-codec-plain' - - if RUBY_VERSION.start_with?("1") - s.add_runtime_dependency 'rake', '~> 12.2.0' - s.add_runtime_dependency 'addressable', '~> 2.4.0' - else - s.add_runtime_dependency 'addressable' - end + s.add_runtime_dependency 'addressable' s.add_runtime_dependency 'concurrent-ruby', '~> 1.0' s.add_runtime_dependency 'logstash-codec-multiline', ['~> 3.0']