Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix shutdown sequence to wait force_flush #1009

Merged
merged 3 commits into from
May 31, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_dummy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def run
break unless (thread_current_running? && Time.now.to_i <= current_time)
wait(0.1) { emit(batch_num) }
end
emit(residual_num)
emit(residual_num) if thread_current_running?
# wait for next second
while thread_current_running? && Time.now.to_i <= current_time
sleep 0.01
Expand Down
16 changes: 12 additions & 4 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def setup_error_label(e)
@error_collector = error_label.event_router
end

def lifecycle(desc: false)
def lifecycle(desc: false, kind_callback: nil)
kind_or_label_list = if desc
[:output, :filter, @labels.values.reverse, :output_with_router, :input].flatten
else
Expand All @@ -127,6 +127,9 @@ def lifecycle(desc: false)
yield instance, display_kind
end
end
if kind_callback
kind_callback.call
end
end
end

Expand Down Expand Up @@ -175,13 +178,18 @@ def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, a

lifecycle_unsafe_sequence = ->(method, checker) {
operation = case method
when :before_shutdown then "preparing shutdown"
when :shutdown then "shutting down"
when :close then "closing"
else
raise "BUG: unknown method name '#{method}'"
end
operation_threads = []
lifecycle do |instance, kind|
callback = ->(){
operation_threads.each{|t| t.join }
operation_threads.clear
}
lifecycle(kind_callback: callback) do |instance, kind|
t = Thread.new do
Thread.current.abort_on_exception = true
begin
Expand All @@ -194,12 +202,12 @@ def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, a
end
operation_threads << t
end
operation_threads.each{|t| t.join }
}

lifecycle_safe_sequence.call(:stop, :stopped?)

lifecycle_safe_sequence.call(:before_shutdown, :before_shutdown?)
# before_shutdown does force_flush for output plugins: it should block, so it's unsafe operation
lifecycle_unsafe_sequence.call(:before_shutdown, :before_shutdown?)

lifecycle_unsafe_sequence.call(:shutdown, :shutdown?)

Expand Down