Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lifecycle control #965

Merged
merged 5 commits into from
May 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 12 additions & 24 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def initialize
@system_config = SystemConfig.new
end

MAINLOOP_SLEEP_INTERVAL = 0.3

MATCH_CACHE_SIZE = 1024
LOG_EMIT_INTERVAL = 0.1

Expand Down Expand Up @@ -177,38 +179,24 @@ def run
@log_emit_thread = Thread.new(&method(:log_event_loop))
end

unless @engine_stopped
# for empty loop
@default_loop = Coolio::Loop.default
@default_loop.attach Coolio::TimerWatcher.new(1, true)
# TODO attach async watch for thread pool
@default_loop.run
end

if @engine_stopped and @default_loop
@default_loop.stop
@default_loop = nil
end
sleep MAINLOOP_SLEEP_INTERVAL until @engine_stopped

rescue => e
rescue Exception => e
$log.error "unexpected error", error: e
$log.error_backtrace
ensure
$log.info "shutting down fluentd"
shutdown
if @log_emit_thread
@log_event_loop_stop = true
@log_emit_thread.join
end
raise
end

$log.info "shutting down fluentd"
shutdown
if @log_emit_thread
@log_event_loop_stop = true
@log_emit_thread.join
end
end

def stop
@engine_stopped = true
if @default_loop
@default_loop.stop
@default_loop = nil
end
nil
end

Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_debug_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ def configure(conf)
end

def start
super

if @unix_path
require 'drb/unix'
uri = "drbunix:#{@unix_path}"
Expand All @@ -55,6 +57,8 @@ def start

def shutdown
@server.stop_service if @server

super
end
end
end
1 change: 1 addition & 0 deletions lib/fluent/plugin/in_dummy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def start
def shutdown
@running = false
@thread.join
super
end

def run
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_exec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ def setup_parser(conf)
end

def start
super

if @run_interval
@finished = false
@thread = Thread.new(&method(:run_periodic))
Expand Down Expand Up @@ -134,6 +136,8 @@ def shutdown
end
@thread.join
end

super
end

def run
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def configure(conf)
end

def start
super

@loop = Coolio::Loop.new

socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
Expand Down Expand Up @@ -89,6 +91,8 @@ def shutdown
@usock.close
@thread.join
@lsock.close

super
end

def listen(client)
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_gc_stat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ def configure(conf)
end

def start
super

@loop = Coolio::Loop.new
@timer = TimerWatcher.new(@emit_interval, true, log, &method(:on_timer))
@loop.attach(@timer)
Expand All @@ -60,6 +62,8 @@ def shutdown
@loop.watchers.each {|w| w.detach }
@loop.stop
@thread.join

super
end

def run
Expand Down
3 changes: 2 additions & 1 deletion lib/fluent/plugin/in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ def start
detach_multi_process do
super
@km = KeepaliveManager.new(@keepalive_timeout)
#@lsock = Coolio::TCPServer.new(@bind, @port, Handler, @km, method(:on_request), @body_size_limit)
@lsock = Coolio::TCPServer.new(lsock, nil, Handler, @km, method(:on_request),
@body_size_limit, @format, log,
@cors_allow_origins)
Expand All @@ -131,6 +130,8 @@ def shutdown
@loop.stop
@lsock.close
@thread.join

super
end

def run
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ def on_timer
end

def start
super

log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins"
@srv = WEBrick::HTTPServer.new({
BindAddress: @bind,
Expand Down Expand Up @@ -290,6 +292,8 @@ def shutdown
@thread_for_emit.join
@thread_for_emit = nil
end

super
end

MONITOR_INFO = {
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_object_space.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def configure(conf)
end

def start
super

@loop = Coolio::Loop.new
@timer = TimerWatcher.new(@emit_interval, true, log, &method(:on_timer))
@loop.attach(@timer)
Expand All @@ -62,6 +64,8 @@ def shutdown
@loop.watchers.each {|w| w.detach }
@loop.stop
@thread.join

super
end

def run
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ def initialize
end

def start
super

@loop = Coolio::Loop.new
@lsock = listen
@loop.attach(@lsock)
Expand All @@ -46,6 +48,8 @@ def shutdown
@loop.stop
@lsock.close
@thread.join

super
end

#def listen
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_syslog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ def configure(conf)
end

def start
super

callback = if @use_default
method(:receive_data)
else
Expand All @@ -128,6 +130,8 @@ def shutdown
@loop.stop
@handler.close
@thread.join

super
end

def run
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ def configure_tag
end

def start
super

if @pos_file
@pf_file = File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, @file_perm)
@pf_file.sync = true
Expand All @@ -139,6 +141,8 @@ def shutdown
@loop.stop rescue nil # when all watchers are detached, `stop` raises RuntimeError. We can ignore this exception.
@thread.join
@pf_file.close if @pf_file

super
end

def expand_paths
Expand Down
16 changes: 10 additions & 6 deletions lib/fluent/plugin/out_copy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,19 @@ def configure(conf)
end

def start
@outputs.each {|o|
o.start
}
super

@outputs.each do |o|
o.start unless o.started?
end
end

def shutdown
@outputs.each {|o|
o.shutdown
}
@outputs.each do |o|
o.shutdown unless o.shutdown?
end

super
end

def emit(tag, es, chain)
Expand Down
7 changes: 4 additions & 3 deletions lib/fluent/plugin/out_exec_filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -220,21 +220,22 @@ def start
end

def before_shutdown
super
log.debug "out_exec_filter#before_shutdown called"
@children.each {|c|
c.finished = true
}
sleep 0.5 # TODO wait time before killing child process
end

def shutdown
super
end

def shutdown
@children.reject! {|c|
c.shutdown
true
}

super
end

def format_stream(tag, es)
Expand Down
2 changes: 2 additions & 0 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ def shutdown
end
@thread.join if @thread
@usock.close if @usock

super
end

def run
Expand Down
14 changes: 0 additions & 14 deletions lib/fluent/plugin/out_null.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,6 @@ module Fluent
class NullOutput < Output
Plugin.register_output('null', self)

def initialize
super
end

def configure(conf)
super
end

def start
end

def shutdown
end

def emit(tag, es, chain)
chain.next
end
Expand Down
16 changes: 10 additions & 6 deletions lib/fluent/plugin/out_roundrobin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,21 @@ def configure(conf)
end

def start
super

rebuild_weight_array

@outputs.each {|o|
o.start
}
@outputs.each do |o|
o.start unless o.started?
end
end

def shutdown
@outputs.each {|o|
o.shutdown
}
@outputs.each do |o|
o.shutdown unless o.shutdown?
end

super
end

def emit(tag, es, chain)
Expand Down
Loading