diff --git a/lib/fluent/plugin/in_dummy.rb b/lib/fluent/plugin/in_dummy.rb index 5f1bb71c4b..648d620e55 100644 --- a/lib/fluent/plugin/in_dummy.rb +++ b/lib/fluent/plugin/in_dummy.rb @@ -80,7 +80,7 @@ def run break unless (thread_current_running? && Time.now.to_i <= current_time) wait(0.1) { emit(batch_num) } end - emit(residual_num) + emit(residual_num) if thread_current_running? # wait for next second while thread_current_running? && Time.now.to_i <= current_time sleep 0.01 diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index 26dceb752b..45be7fac7c 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -104,7 +104,7 @@ def setup_error_label(e) @error_collector = error_label.event_router end - def lifecycle(desc: false) + def lifecycle(desc: false, kind_callback: nil) kind_or_label_list = if desc [:output, :filter, @labels.values.reverse, :output_with_router, :input].flatten else @@ -127,6 +127,9 @@ def lifecycle(desc: false) yield instance, display_kind end end + if kind_callback + kind_callback.call + end end end @@ -175,13 +178,18 @@ def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, a lifecycle_unsafe_sequence = ->(method, checker) { operation = case method + when :before_shutdown then "preparing shutdown" when :shutdown then "shutting down" when :close then "closing" else raise "BUG: unknown method name '#{method}'" end operation_threads = [] - lifecycle do |instance, kind| + callback = ->(){ + operation_threads.each{|t| t.join } + operation_threads.clear + } + lifecycle(kind_callback: callback) do |instance, kind| t = Thread.new do Thread.current.abort_on_exception = true begin @@ -194,12 +202,12 @@ def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, a end operation_threads << t end - operation_threads.each{|t| t.join } } lifecycle_safe_sequence.call(:stop, :stopped?) - lifecycle_safe_sequence.call(:before_shutdown, :before_shutdown?) + # before_shutdown does force_flush for output plugins: it should block, so it's unsafe operation + lifecycle_unsafe_sequence.call(:before_shutdown, :before_shutdown?) lifecycle_unsafe_sequence.call(:shutdown, :shutdown?)