Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

trigger input plugin shutdown using out of band call #3812

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions lib/logstash/inputs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class LogStash::Inputs::Base < LogStash::Plugin
def initialize(params={})
super
@threadable = false
@stop_called = Concurrent::AtomicBoolean.new(false)
config_init(params)
@tags ||= []

Expand Down Expand Up @@ -100,6 +101,20 @@ def tag(newtag)
@tags << newtag
end # def tag

# if you override stop, don't forget to call super
# as the first action
public
def stop
@logger.debug("stopping", :plugin => self)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: maybe log "stop called" instead of "stopping" it seems to better represent the reality?

@stop_called.make_true
end

# stop? should never be overriden
public
def stop?
@stop_called.value
end

protected
def to_event(raw, source)
raise LogStash::ThisMethodWasRemoved("LogStash::Inputs::Base#to_event - you should use codecs now instead of to_event. Not sure what this means? Get help on https://discuss.elastic.co/c/logstash")
Expand Down
57 changes: 18 additions & 39 deletions lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,16 @@ def inputworker(plugin)
LogStash::Util::set_thread_name("<#{plugin.class.config_name}")
begin
plugin.run(@input_to_filter)
rescue LogStash::ShutdownSignal
# ignore and quit
rescue => e
# if plugin is stopping, ignore uncatched exceptions and exit worker
if plugin.stop?
@logger.debug("Input plugin raised exception during shutdown, ignoring it.",
:plugin => plugin.class.config_name, :exception => e,
:backtrace => e.backtrace)
return
end

# otherwise, report error and restart
if @logger.debug?
@logger.error(I18n.t("logstash.pipeline.worker-error-debug",
:plugin => plugin.inspect, :error => e.to_s,
Expand All @@ -187,23 +194,13 @@ def inputworker(plugin)
@logger.error(I18n.t("logstash.pipeline.worker-error",
:plugin => plugin.inspect, :error => e))
end
puts e.backtrace if @logger.debug?
# input teardown must be synchronized since is can be called concurrently by
# the input worker thread and from the pipeline thread shutdown method.
# this means that input teardown methods must support multiple calls.
@run_mutex.synchronize{plugin.teardown}
sleep 1
retry
end
ensure
begin
# input teardown must be synchronized since is can be called concurrently by
# the input worker thread and from the pipeline thread shutdown method.
# this means that input teardown methods must support multiple calls.
@run_mutex.synchronize{plugin.teardown}
rescue LogStash::ShutdownSignal
# teardown could receive the ShutdownSignal, retry it

# Assuming the failure that caused this exception is transient,
# let's sleep for a bit and execute #run again
sleep(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know the previous code's sleep() didn't have a comment, but let's make a comment here explaining why we are sleeping.

retry
ensure
plugin.teardown
end
end # def inputworker

Expand Down Expand Up @@ -242,8 +239,8 @@ def outputworker
event = @filter_to_output.pop
break if event == LogStash::SHUTDOWN
output_func(event)
end # while true

end
ensure
@outputs.each do |output|
output.worker_plugins.each(&:teardown)
end
Expand All @@ -255,37 +252,19 @@ def outputworker
def shutdown
InflightEventsReporter.logger = @logger
InflightEventsReporter.start(@input_to_filter, @filter_to_output, @outputs)
@input_threads.each do |thread|
# Interrupt all inputs
@logger.info("Sending shutdown signal to input thread", :thread => thread)

# synchronize both ShutdownSignal and teardown below. by synchronizing both
# we will avoid potentially sending a shutdown signal when the inputworker is
# executing the teardown method.
@run_mutex.synchronize do
thread.raise(LogStash::ShutdownSignal)
begin
thread.wakeup # in case it's in blocked IO or sleeping
rescue ThreadError
end
end
end

# sometimes an input is stuck in a blocking I/O so we need to tell it to teardown directly
@inputs.each do |input|
begin
# input teardown must be synchronized since is can be called concurrently by
# the input worker thread and from the pipeline thread shutdown method.
# this means that input teardown methods must support multiple calls.
@run_mutex.synchronize{input.teardown}
@run_mutex.synchronize{input.stop}
rescue LogStash::ShutdownSignal
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the @run_mutex should not be necessary here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it still necessary anywhere? I was trying to find out why it was there in the first place, but couldn't find a reason.
I know it's supposed to isolate the "start input plugins" phase from a shutdown, but I couldn't see the need for the isolation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after re-analyzing the @run_mutex related code it is useful for 2 things:

  • consistent view of the @ready and @started properties which need to be consistent across threads since the related ready? and started? methods will be typically called from another thread that the one executing the run method
  • to avoid calling the shutdown code before the startup is completed

I believe the ready? and started? methods could be implemented using atomic booleans and for the prevention startup and shutdown race condition it should be refactored in a better way. It really isn't clear now.

I will open another issue for this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to #3886

# teardown could receive the ShutdownSignal, retry it
retry
end
end

# No need to send the ShutdownEvent to the filters/outputs nor to wait for
# the inputs to finish, because in the #run method we wait for that anyway.
end # def shutdown

def plugin(plugin_type, name, *args)
Expand Down
63 changes: 3 additions & 60 deletions lib/logstash/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "logstash/logging"
require "logstash/config/mixin"
require "cabin"
require "concurrent"

class LogStash::Plugin
attr_accessor :params
Expand All @@ -27,72 +28,14 @@ def initialize(params=nil)
@logger = Cabin::Channel.get(LogStash)
end

# This method is called when someone or something wants this plugin to shut
# down. When you successfully shutdown, you must call 'finished'
# You must also call 'super' in any subclasses.
public
def shutdown(queue)
# By default, shutdown is assumed a no-op for all plugins.
# If you need to take special efforts to shutdown (like waiting for
# an operation to complete, etc)
teardown
@logger.info("Received shutdown signal", :plugin => self)

@shutdown_queue = queue
if @plugin_state == :finished
finished
else
@plugin_state = :terminating
end
end # def shutdown

# You should call this method when you (the plugin) are done with work
# forever.
public
def finished
# TODO(sissel): I'm not sure what I had planned for this shutdown_queue
# thing
if @shutdown_queue
@logger.info("Sending shutdown event to agent queue", :plugin => self)
@shutdown_queue << self
end

if @plugin_state != :finished
@logger.info("Plugin is finished", :plugin => self)
@plugin_state = :finished
end
end # def finished

# Subclasses should implement this teardown method if you need to perform any
# special tasks during shutdown (like flushing, etc.)
# if you override teardown, don't forget to call super
public
def teardown
# nothing by default
finished
@logger.debug("closing", :plugin => self)
end

# This method is called when a SIGHUP triggers a reload operation
public
def reload
# Do nothing by default
end

public
def finished?
return @plugin_state == :finished
end # def finished?

public
def running?
return @plugin_state != :finished
end # def finished?

public
def terminating?
return @plugin_state == :terminating
end # def terminating?

public
def to_s
return "#{self.class.name}: #{@params}"
end
Expand Down
1 change: 1 addition & 0 deletions logstash-core.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency "clamp", "~> 0.6.5" #(MIT license) for command line args/flags
gem.add_runtime_dependency "filesize", "0.0.4" #(MIT license) for :bytes config validator
gem.add_runtime_dependency "gems", "~> 0.8.3" #(MIT license)
gem.add_runtime_dependency "concurrent-ruby", "~> 0.9.1"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ty for fixing a specific version. +1

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well actually I'm saying anything 0.9.*

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I meant that :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I expect 1.0 to break a lot of stuff.


# TODO(sissel): Treetop 1.5.x doesn't seem to work well, but I haven't
# investigated what the cause might be. -Jordan
Expand Down