diff --git a/lib/logstash/inputs/base.rb b/lib/logstash/inputs/base.rb index f28d04e0e98..bde8d3bff4d 100644 --- a/lib/logstash/inputs/base.rb +++ b/lib/logstash/inputs/base.rb @@ -67,6 +67,7 @@ class LogStash::Inputs::Base < LogStash::Plugin def initialize(params={}) super @threadable = false + @stop_called = Concurrent::AtomicBoolean.new(false) config_init(params) @tags ||= [] @@ -100,6 +101,20 @@ def tag(newtag) @tags << newtag end # def tag + # if you override stop, don't forget to call super + # as the first action + public + def stop + @logger.debug("stopping", :plugin => self) + @stop_called.make_true + end + + # stop? should never be overriden + public + def stop? + @stop_called.value + end + protected def to_event(raw, source) raise LogStash::ThisMethodWasRemoved("LogStash::Inputs::Base#to_event - you should use codecs now instead of to_event. Not sure what this means? Get help on https://discuss.elastic.co/c/logstash") diff --git a/lib/logstash/pipeline.rb b/lib/logstash/pipeline.rb index 0089e3f1688..fa1a806cd61 100644 --- a/lib/logstash/pipeline.rb +++ b/lib/logstash/pipeline.rb @@ -175,9 +175,16 @@ def inputworker(plugin) LogStash::Util::set_thread_name("<#{plugin.class.config_name}") begin plugin.run(@input_to_filter) - rescue LogStash::ShutdownSignal - # ignore and quit rescue => e + # if plugin is stopping, ignore uncatched exceptions and exit worker + if plugin.stop? + @logger.debug("Input plugin raised exception during shutdown, ignoring it.", + :plugin => plugin.class.config_name, :exception => e, + :backtrace => e.backtrace) + return + end + + # otherwise, report error and restart if @logger.debug? @logger.error(I18n.t("logstash.pipeline.worker-error-debug", :plugin => plugin.inspect, :error => e.to_s, @@ -187,23 +194,13 @@ def inputworker(plugin) @logger.error(I18n.t("logstash.pipeline.worker-error", :plugin => plugin.inspect, :error => e)) end - puts e.backtrace if @logger.debug? - # input teardown must be synchronized since is can be called concurrently by - # the input worker thread and from the pipeline thread shutdown method. - # this means that input teardown methods must support multiple calls. - @run_mutex.synchronize{plugin.teardown} - sleep 1 - retry - end - ensure - begin - # input teardown must be synchronized since is can be called concurrently by - # the input worker thread and from the pipeline thread shutdown method. - # this means that input teardown methods must support multiple calls. - @run_mutex.synchronize{plugin.teardown} - rescue LogStash::ShutdownSignal - # teardown could receive the ShutdownSignal, retry it + + # Assuming the failure that caused this exception is transient, + # let's sleep for a bit and execute #run again + sleep(1) retry + ensure + plugin.teardown end end # def inputworker @@ -242,8 +239,8 @@ def outputworker event = @filter_to_output.pop break if event == LogStash::SHUTDOWN output_func(event) - end # while true - + end + ensure @outputs.each do |output| output.worker_plugins.each(&:teardown) end @@ -255,21 +252,6 @@ def outputworker def shutdown InflightEventsReporter.logger = @logger InflightEventsReporter.start(@input_to_filter, @filter_to_output, @outputs) - @input_threads.each do |thread| - # Interrupt all inputs - @logger.info("Sending shutdown signal to input thread", :thread => thread) - - # synchronize both ShutdownSignal and teardown below. by synchronizing both - # we will avoid potentially sending a shutdown signal when the inputworker is - # executing the teardown method. - @run_mutex.synchronize do - thread.raise(LogStash::ShutdownSignal) - begin - thread.wakeup # in case it's in blocked IO or sleeping - rescue ThreadError - end - end - end # sometimes an input is stuck in a blocking I/O so we need to tell it to teardown directly @inputs.each do |input| @@ -277,15 +259,12 @@ def shutdown # input teardown must be synchronized since is can be called concurrently by # the input worker thread and from the pipeline thread shutdown method. # this means that input teardown methods must support multiple calls. - @run_mutex.synchronize{input.teardown} + @run_mutex.synchronize{input.stop} rescue LogStash::ShutdownSignal # teardown could receive the ShutdownSignal, retry it retry end end - - # No need to send the ShutdownEvent to the filters/outputs nor to wait for - # the inputs to finish, because in the #run method we wait for that anyway. end # def shutdown def plugin(plugin_type, name, *args) diff --git a/lib/logstash/plugin.rb b/lib/logstash/plugin.rb index 76d3eeb43fb..1949fa6f277 100644 --- a/lib/logstash/plugin.rb +++ b/lib/logstash/plugin.rb @@ -3,6 +3,7 @@ require "logstash/logging" require "logstash/config/mixin" require "cabin" +require "concurrent" class LogStash::Plugin attr_accessor :params @@ -27,72 +28,14 @@ def initialize(params=nil) @logger = Cabin::Channel.get(LogStash) end - # This method is called when someone or something wants this plugin to shut - # down. When you successfully shutdown, you must call 'finished' - # You must also call 'super' in any subclasses. - public - def shutdown(queue) - # By default, shutdown is assumed a no-op for all plugins. - # If you need to take special efforts to shutdown (like waiting for - # an operation to complete, etc) - teardown - @logger.info("Received shutdown signal", :plugin => self) - - @shutdown_queue = queue - if @plugin_state == :finished - finished - else - @plugin_state = :terminating - end - end # def shutdown - - # You should call this method when you (the plugin) are done with work - # forever. - public - def finished - # TODO(sissel): I'm not sure what I had planned for this shutdown_queue - # thing - if @shutdown_queue - @logger.info("Sending shutdown event to agent queue", :plugin => self) - @shutdown_queue << self - end - - if @plugin_state != :finished - @logger.info("Plugin is finished", :plugin => self) - @plugin_state = :finished - end - end # def finished - # Subclasses should implement this teardown method if you need to perform any # special tasks during shutdown (like flushing, etc.) + # if you override teardown, don't forget to call super public def teardown - # nothing by default - finished + @logger.debug("closing", :plugin => self) end - # This method is called when a SIGHUP triggers a reload operation - public - def reload - # Do nothing by default - end - - public - def finished? - return @plugin_state == :finished - end # def finished? - - public - def running? - return @plugin_state != :finished - end # def finished? - - public - def terminating? - return @plugin_state == :terminating - end # def terminating? - - public def to_s return "#{self.class.name}: #{@params}" end diff --git a/logstash-core.gemspec b/logstash-core.gemspec index 2df64952a60..709218163d6 100644 --- a/logstash-core.gemspec +++ b/logstash-core.gemspec @@ -23,6 +23,7 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency "clamp", "~> 0.6.5" #(MIT license) for command line args/flags gem.add_runtime_dependency "filesize", "0.0.4" #(MIT license) for :bytes config validator gem.add_runtime_dependency "gems", "~> 0.8.3" #(MIT license) + gem.add_runtime_dependency "concurrent-ruby", "~> 0.9.1" # TODO(sissel): Treetop 1.5.x doesn't seem to work well, but I haven't # investigated what the cause might be. -Jordan