diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index eaac32f2f7..0dc30c691a 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -46,6 +46,8 @@ def initialize @system_config = SystemConfig.new end + MAINLOOP_SLEEP_INTERVAL = 0.3 + MATCH_CACHE_SIZE = 1024 LOG_EMIT_INTERVAL = 0.1 @@ -177,38 +179,24 @@ def run @log_emit_thread = Thread.new(&method(:log_event_loop)) end - unless @engine_stopped - # for empty loop - @default_loop = Coolio::Loop.default - @default_loop.attach Coolio::TimerWatcher.new(1, true) - # TODO attach async watch for thread pool - @default_loop.run - end - - if @engine_stopped and @default_loop - @default_loop.stop - @default_loop = nil - end + sleep MAINLOOP_SLEEP_INTERVAL until @engine_stopped - rescue => e + rescue Exception => e $log.error "unexpected error", error: e $log.error_backtrace - ensure - $log.info "shutting down fluentd" - shutdown - if @log_emit_thread - @log_event_loop_stop = true - @log_emit_thread.join - end + raise + end + + $log.info "shutting down fluentd" + shutdown + if @log_emit_thread + @log_event_loop_stop = true + @log_emit_thread.join end end def stop @engine_stopped = true - if @default_loop - @default_loop.stop - @default_loop = nil - end nil end diff --git a/lib/fluent/plugin/in_debug_agent.rb b/lib/fluent/plugin/in_debug_agent.rb index cfb196298b..1cce08421d 100644 --- a/lib/fluent/plugin/in_debug_agent.rb +++ b/lib/fluent/plugin/in_debug_agent.rb @@ -42,6 +42,8 @@ def configure(conf) end def start + super + if @unix_path require 'drb/unix' uri = "drbunix:#{@unix_path}" @@ -55,6 +57,8 @@ def start def shutdown @server.stop_service if @server + + super end end end diff --git a/lib/fluent/plugin/in_dummy.rb b/lib/fluent/plugin/in_dummy.rb index 576b6df466..914b571ae9 100644 --- a/lib/fluent/plugin/in_dummy.rb +++ b/lib/fluent/plugin/in_dummy.rb @@ -64,6 +64,7 @@ def start def shutdown @running = false @thread.join + super end def run diff --git a/lib/fluent/plugin/in_exec.rb 
b/lib/fluent/plugin/in_exec.rb index 00b8b4a12e..295b6ce864 100644 --- a/lib/fluent/plugin/in_exec.rb +++ b/lib/fluent/plugin/in_exec.rb @@ -103,6 +103,8 @@ def setup_parser(conf) end def start + super + if @run_interval @finished = false @thread = Thread.new(&method(:run_periodic)) @@ -134,6 +136,8 @@ def shutdown end @thread.join end + + super end def run diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index 928957d667..7a2b6b8e64 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -55,6 +55,8 @@ def configure(conf) end def start + super + @loop = Coolio::Loop.new socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] @@ -89,6 +91,8 @@ def shutdown @usock.close @thread.join @lsock.close + + super end def listen(client) diff --git a/lib/fluent/plugin/in_gc_stat.rb b/lib/fluent/plugin/in_gc_stat.rb index 22690f3e7e..3bae0035b3 100644 --- a/lib/fluent/plugin/in_gc_stat.rb +++ b/lib/fluent/plugin/in_gc_stat.rb @@ -50,6 +50,8 @@ def configure(conf) end def start + super + @loop = Coolio::Loop.new @timer = TimerWatcher.new(@emit_interval, true, log, &method(:on_timer)) @loop.attach(@timer) @@ -60,6 +62,8 @@ def shutdown @loop.watchers.each {|w| w.detach } @loop.stop @thread.join + + super end def run diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index dc33f11d4f..a9a11777b9 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -112,7 +112,6 @@ def start detach_multi_process do super @km = KeepaliveManager.new(@keepalive_timeout) - #@lsock = Coolio::TCPServer.new(@bind, @port, Handler, @km, method(:on_request), @body_size_limit) @lsock = Coolio::TCPServer.new(lsock, nil, Handler, @km, method(:on_request), @body_size_limit, @format, log, @cors_allow_origins) @@ -131,6 +130,8 @@ def shutdown @loop.stop @lsock.close @thread.join + + super end def run diff --git a/lib/fluent/plugin/in_monitor_agent.rb b/lib/fluent/plugin/in_monitor_agent.rb index 
68d10d9be5..922692c5ff 100644 --- a/lib/fluent/plugin/in_monitor_agent.rb +++ b/lib/fluent/plugin/in_monitor_agent.rb @@ -235,6 +235,8 @@ def on_timer end def start + super + log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins" @srv = WEBrick::HTTPServer.new({ BindAddress: @bind, @@ -290,6 +292,8 @@ def shutdown @thread_for_emit.join @thread_for_emit = nil end + + super end MONITOR_INFO = { diff --git a/lib/fluent/plugin/in_object_space.rb b/lib/fluent/plugin/in_object_space.rb index cdfd51df0d..a59e2a42a7 100644 --- a/lib/fluent/plugin/in_object_space.rb +++ b/lib/fluent/plugin/in_object_space.rb @@ -52,6 +52,8 @@ def configure(conf) end def start + super + @loop = Coolio::Loop.new @timer = TimerWatcher.new(@emit_interval, true, log, &method(:on_timer)) @loop.attach(@timer) @@ -62,6 +64,8 @@ def shutdown @loop.watchers.each {|w| w.detach } @loop.stop @thread.join + + super end def run diff --git a/lib/fluent/plugin/in_stream.rb b/lib/fluent/plugin/in_stream.rb index d4ec626db6..3138b6caa7 100644 --- a/lib/fluent/plugin/in_stream.rb +++ b/lib/fluent/plugin/in_stream.rb @@ -35,6 +35,8 @@ def initialize end def start + super + @loop = Coolio::Loop.new @lsock = listen @loop.attach(@lsock) @@ -46,6 +48,8 @@ def shutdown @loop.stop @lsock.close @thread.join + + super end #def listen diff --git a/lib/fluent/plugin/in_syslog.rb b/lib/fluent/plugin/in_syslog.rb index 24e1b96f59..9ad122d1b7 100644 --- a/lib/fluent/plugin/in_syslog.rb +++ b/lib/fluent/plugin/in_syslog.rb @@ -110,6 +110,8 @@ def configure(conf) end def start + super + callback = if @use_default method(:receive_data) else @@ -128,6 +130,8 @@ def shutdown @loop.stop @handler.close @thread.join + + super end def run diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 632a25af47..ef920c081d 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -118,6 +118,8 @@ def configure_tag end def start + super + if @pos_file @pf_file = 
File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, @file_perm) @pf_file.sync = true @@ -139,6 +141,8 @@ def shutdown @loop.stop rescue nil # when all watchers are detached, `stop` raises RuntimeError. We can ignore this exception. @thread.join @pf_file.close if @pf_file + + super end def expand_paths diff --git a/lib/fluent/plugin/out_copy.rb b/lib/fluent/plugin/out_copy.rb index c26804b218..c2f6bc146f 100644 --- a/lib/fluent/plugin/out_copy.rb +++ b/lib/fluent/plugin/out_copy.rb @@ -51,15 +51,19 @@ def configure(conf) end def start - @outputs.each {|o| - o.start - } + super + + @outputs.each do |o| + o.start unless o.started? + end end def shutdown - @outputs.each {|o| - o.shutdown - } + @outputs.each do |o| + o.shutdown unless o.shutdown? + end + + super end def emit(tag, es, chain) diff --git a/lib/fluent/plugin/out_exec_filter.rb b/lib/fluent/plugin/out_exec_filter.rb index b8b3fb41ac..81df9b61f5 100644 --- a/lib/fluent/plugin/out_exec_filter.rb +++ b/lib/fluent/plugin/out_exec_filter.rb @@ -220,21 +220,22 @@ def start end def before_shutdown - super log.debug "out_exec_filter#before_shutdown called" @children.each {|c| c.finished = true } sleep 0.5 # TODO wait time before killing child process - end - def shutdown super + end + def shutdown @children.reject! 
{|c| c.shutdown true } + + super end def format_stream(tag, es) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 0c77812e47..e703c7f468 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -193,6 +193,8 @@ def shutdown end @thread.join if @thread @usock.close if @usock + + super end def run diff --git a/lib/fluent/plugin/out_null.rb b/lib/fluent/plugin/out_null.rb index 032f683a99..6d5bed1d36 100644 --- a/lib/fluent/plugin/out_null.rb +++ b/lib/fluent/plugin/out_null.rb @@ -20,20 +20,6 @@ module Fluent class NullOutput < Output Plugin.register_output('null', self) - def initialize - super - end - - def configure(conf) - super - end - - def start - end - - def shutdown - end - def emit(tag, es, chain) chain.next end diff --git a/lib/fluent/plugin/out_roundrobin.rb b/lib/fluent/plugin/out_roundrobin.rb index a17195ea21..200ec1f49f 100644 --- a/lib/fluent/plugin/out_roundrobin.rb +++ b/lib/fluent/plugin/out_roundrobin.rb @@ -57,17 +57,21 @@ def configure(conf) end def start + super + rebuild_weight_array - @outputs.each {|o| - o.start - } + @outputs.each do |o| + o.start unless o.started? + end end def shutdown - @outputs.each {|o| - o.shutdown - } + @outputs.each do |o| + o.shutdown unless o.shutdown? 
+ end + + super end def emit(tag, es, chain) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 088b537b3f..19c46ce35f 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -360,13 +360,13 @@ def start end def stop - super @secondary.stop if @secondary @buffer.stop if @buffering && @buffer + + super end def before_shutdown - super @secondary.before_shutdown if @secondary if @buffering && @buffer @@ -375,16 +375,18 @@ def before_shutdown end @buffer.before_shutdown end + + super end def shutdown - super @secondary.shutdown if @secondary @buffer.shutdown if @buffering && @buffer + + super end def after_shutdown - super try_rollback_all if @buffering && !@as_secondary # rollback regardless with @delayed_commit, because secondary may do it @secondary.after_shutdown if @secondary @@ -399,18 +401,22 @@ def after_shutdown state.thread.join end end + + super end def close - super @buffer.close if @buffering && @buffer @secondary.close if @secondary + + super end def terminate - super @buffer.terminate if @buffering && @buffer @secondary.terminate if @secondary + + super end def support_in_v12_style?(feature) diff --git a/lib/fluent/plugin/socket_util.rb b/lib/fluent/plugin/socket_util.rb index f2c4e99279..75b3fbe9fb 100644 --- a/lib/fluent/plugin/socket_util.rb +++ b/lib/fluent/plugin/socket_util.rb @@ -118,6 +118,8 @@ def configure(conf) end def start + super + @loop = Coolio::Loop.new @handler = listen(method(:on_message)) @loop.attach(@handler) @@ -129,6 +131,8 @@ def shutdown @loop.stop if @loop.instance_variable_get("@running") @handler.close @thread.join + + super end def run diff --git a/lib/fluent/plugin_helper/thread.rb b/lib/fluent/plugin_helper/thread.rb index 4e40604505..315335a3c5 100644 --- a/lib/fluent/plugin_helper/thread.rb +++ b/lib/fluent/plugin_helper/thread.rb @@ -59,13 +59,13 @@ def thread_create(title) begin yield thread_exit = true - rescue => e + rescue Exception => e log.warn "thread exited by 
unexpected error", plugin: self.class, title: title, error: e thread_exit = true raise ensure unless thread_exit - log.warn "thread doesn't exit correctly (killed or other reason)", plugin: self.class, title: title + log.warn "thread doesn't exit correctly (killed or other reason)", plugin: self.class, title: title, error: $! end @_threads_mutex.synchronize do @_threads.delete(::Thread.current.object_id) diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index 00d3a85b7f..a390105f63 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -132,7 +132,7 @@ def lifecycle(desc: false) def start lifecycle(desc: true) do |i| # instance - i.start + i.start unless i.started? end end @@ -157,19 +157,23 @@ def flush! end def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins - lifecycle_safe_sequence = ->(method) { + # These method callers use `rescue Exception` so that the shutdown sequence proceeds as far as possible + # even if a plugin method does something like an infinitely recursive call, `exit`, unregistering signal handlers, or similar. + # Plugins should be isolated from each other (sandboxed) to protect the data in each plugin/buffer. 
+ + lifecycle_safe_sequence = ->(method, checker) { lifecycle do |instance, kind| begin log.debug "calling #{method} on #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id - instance.send(method) - rescue => e + instance.send(method) unless instance.send(checker) + rescue Exception => e log.warn "unexpected error while calling #{method} on #{kind} plugin", pluguin: instance.class, plugin_id: instance.plugin_id, error: e log.warn_backtrace end end } - lifecycle_unsafe_sequence = ->(method) { + lifecycle_unsafe_sequence = ->(method, checker) { operation = case method when :shutdown then "shutting down" when :close then "closing" @@ -182,9 +186,9 @@ def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, a Thread.current.abort_on_exception = true begin log.info "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id - instance.send(method) - rescue => e - log.warn "unexpected error while #{operation} #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e + instance.send(method) unless instance.send(checker) + rescue Exception => e + log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e log.warn_backtrace end end @@ -193,17 +197,17 @@ def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, a operation_threads.each{|t| t.join } } - lifecycle_safe_sequence.call(:stop) + lifecycle_safe_sequence.call(:stop, :stopped?) - lifecycle_safe_sequence.call(:before_shutdown) + lifecycle_safe_sequence.call(:before_shutdown, :before_shutdown?) - lifecycle_unsafe_sequence.call(:shutdown) + lifecycle_unsafe_sequence.call(:shutdown, :shutdown?) - lifecycle_safe_sequence.call(:after_shutdown) + lifecycle_safe_sequence.call(:after_shutdown, :after_shutdown?) 
- lifecycle_unsafe_sequence.call(:close) + lifecycle_unsafe_sequence.call(:close, :closed?) - lifecycle_safe_sequence.call(:terminate) + lifecycle_safe_sequence.call(:terminate, :terminated?) end def suppress_interval(interval_time) diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 4d733cd4d8..38de28fb6e 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -455,10 +455,19 @@ def supervise end def install_main_process_signal_handlers + # The Fluentd worker process (a ServerEngine worker) doesn't use the code in ServerEngine to set signal handlers, + # because it does almost nothing. + # This method is the only place where signal handlers are set in the Fluentd worker process. + # When user use Ctrl + C not SIGINT, SIGINT is sent to all process in same process group. - # Then serverengine can't handle signal, so need to handle it in this process. + # The ServerEngine server process will send SIGTERM to its child (spawned) processes in response to that SIGINT, so + # the worker process SHOULD NOT do anything with SIGINT; it SHOULD just ignore it. trap :INT do $log.debug "fluentd main process get SIGINT" + end + + trap :TERM do + $log.debug "fluentd main process get SIGTERM" unless @finished @finished = true $log.debug "getting start to shutdown main process"