Skip to content

Commit

Permalink
Merge pull request #1190 from fluent/add-after_start-to-detect-end-of…
Browse files Browse the repository at this point in the history
…-start

Add Fluent::Plugin::Base#after_start to detect end of #start
  • Loading branch information
tagomoris authored Aug 29, 2016
2 parents babfa10 + cda68c7 commit e9ac872
Show file tree
Hide file tree
Showing 14 changed files with 104 additions and 3 deletions.
15 changes: 12 additions & 3 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ class Base
include Configurable
include SystemConfig::Mixin

State = Struct.new(:configure, :start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate)
State = Struct.new(:configure, :start, :after_start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate)

def initialize
super
@_state = State.new(false, false, false, false, false, false, false, false)
@_state = State.new(false, false, false, false, false, false, false, false, false)
end

def has_router?
Expand All @@ -37,7 +37,7 @@ def has_router?

def configure(conf)
super
@_state ||= State.new(false, false, false, false, false, false, false, false)
@_state ||= State.new(false, false, false, false, false, false, false, false, false)
@_state.configure = true
self
end
Expand All @@ -47,6 +47,11 @@ def start
self
end

def after_start
@_state.after_start = true
self
end

def stop
@_state.stop = true
self
Expand Down Expand Up @@ -85,6 +90,10 @@ def started?
@_state.start
end

def after_started?
@_state.after_start
end

def stopped?
@_state.stop
end
Expand Down
15 changes: 15 additions & 0 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,11 @@ def start
@secondary.start if @secondary
end

def after_start
super
@secondary.after_start if @secondary
end

def stop
@secondary.stop if @secondary
@buffer.stop if @buffering && @buffer
Expand Down Expand Up @@ -922,6 +927,11 @@ def enqueue_thread_run
interval = @buffer_config.flush_thread_interval
end

while !self.after_started? && !self.stopped?
sleep 0.5
end
log.debug "enqueue_thread actually running"

begin
while @output_flush_threads_running
now_int = Time.now.to_i
Expand Down Expand Up @@ -969,6 +979,11 @@ def flush_thread_run(state)
clock_id = Process::CLOCK_MONOTONIC rescue Process::CLOCK_MONOTONIC_RAW
state.next_time = Process.clock_gettime(clock_id) + flush_thread_interval

while !self.after_started? && !self.stopped?
sleep 0.5
end
log.debug "flush_thread actually running"

begin
# This thread don't use `thread_current_running?` because this thread should run in `before_shutdown` phase
while @output_flush_threads_running
Expand Down
3 changes: 3 additions & 0 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ def start
lifecycle(desc: true) do |i| # instance
i.start unless i.started?
end
lifecycle(desc: true) do |i|
i.after_start unless i.after_started?
end
end

def flush!
Expand Down
1 change: 1 addition & 0 deletions lib/fluent/test/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def configure(str, use_v1 = false)
# num_waits is for checking thread status. This will be removed after improved plugin API
def run(num_waits = 10, &block)
@instance.start
@instance.after_start
begin
# wait until thread starts
num_waits.times { sleep 0.05 }
Expand Down
3 changes: 3 additions & 0 deletions lib/fluent/test/driver/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ def instance_start
@instance.start
instance_hook_after_started
end
unless @instance.after_started?
@instance.after_start
end
end

def instance_hook_after_started
Expand Down
12 changes: 12 additions & 0 deletions test/plugin/test_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ def waiting(seconds)
assert !@i.started?
@i.start
assert @i.started?
assert !@i.after_started?
@i.after_start
assert @i.after_started?
assert !@i.stopped?
@i.stop
assert @i.stopped?
Expand Down Expand Up @@ -329,6 +332,7 @@ def waiting(seconds)
i.register(:process){|tag, es| process_called = true }
i.configure(config_element())
i.start
i.after_start

t = event_time()
i.emit_events('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]))
Expand All @@ -344,6 +348,7 @@ def waiting(seconds)
i.register(:format){|tag, time, record| format_called_times += 1; '' }
i.configure(config_element())
i.start
i.after_start

t = event_time()
i.emit_events('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]))
Expand All @@ -364,6 +369,7 @@ def waiting(seconds)
i.configure(config_element())
i.register(:prefer_buffered_processing){ false } # delayed decision is possible to change after (output's) configure
i.start
i.after_start

assert !i.prefer_buffered_processing

Expand All @@ -389,6 +395,7 @@ def waiting(seconds)
i.configure(config_element())
i.register(:prefer_buffered_processing){ true } # delayed decision is possible to change after (output's) configure
i.start
i.after_start

assert i.prefer_buffered_processing

Expand All @@ -408,6 +415,7 @@ def waiting(seconds)

i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', {"flush_mode" => "immediate"})]))
i.start
i.after_start

t = event_time()
i.emit_events('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]))
Expand All @@ -427,6 +435,7 @@ def waiting(seconds)

i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', {"flush_mode" => "immediate"})]))
i.start
i.after_start

t = event_time()
i.emit_events('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]))
Expand All @@ -449,6 +458,7 @@ def waiting(seconds)
i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', {"flush_mode" => "immediate"})]))
i.register(:prefer_delayed_commit){ false } # delayed decision is possible to change after (output's) configure
i.start
i.after_start

assert !i.prefer_delayed_commit

Expand All @@ -474,6 +484,7 @@ def waiting(seconds)
i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', {"flush_mode" => "immediate"})]))
i.register(:prefer_delayed_commit){ true } # delayed decision is possible to change after (output's) configure
i.start
i.after_start

assert i.prefer_delayed_commit

Expand Down Expand Up @@ -542,6 +553,7 @@ def waiting(seconds)
@i.register(:process){|tag, es| ary << [tag, es] }
@i.configure(config_element())
@i.start
@i.after_start

t = event_time()
es = Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])
Expand Down
13 changes: 13 additions & 0 deletions test/plugin/test_output_as_buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ def waiting(seconds)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_keys,@hash)]))
logs = @i.log.out.logs.dup
@i.start
@i.after_start
assert{ logs.select{|log| log.include?('[warn]') }.size == 0 }
end

Expand All @@ -154,6 +155,7 @@ def waiting(seconds)
logs = @i.log.out.logs.dup

@i.start # this calls `log.reset`... capturing logs about configure must be done before this line
@i.after_start
assert_equal ['key1', 'key2', 'key3', 'key4'], @i.chunk_keys

assert{ logs.select{|log| log.include?('[warn]: many chunk keys specified, and it may cause too many chunks on your system.') }.size == 1 }
Expand All @@ -164,6 +166,7 @@ def waiting(seconds)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_keys,@hash)]))
logs = @i.log.out.logs.dup
@i.start # this calls `log.reset`... capturing logs about configure must be done before this line
@i.after_start
assert{ logs.select{|log| log.include?('[warn]: many chunk keys specified, and it may cause too many chunks on your system.') }.size == 1 }
end

Expand All @@ -172,6 +175,7 @@ def waiting(seconds)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_keys,@hash)]))
logs = @i.log.out.logs.dup
@i.start
@i.after_start
assert{ logs.select{|log| log.include?('[warn]') }.size == 0 }
end
end
Expand All @@ -187,6 +191,7 @@ def waiting(seconds)
@i = create_output(:buffered)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',hash)]))
@i.start
@i.after_start
end

test '#start does not create enqueue thread, but creates flush threads' do
Expand Down Expand Up @@ -289,6 +294,7 @@ def waiting(seconds)
@i = create_output(:buffered)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',hash)]))
@i.start
@i.after_start
end

test '#start creates enqueue thread and flush threads' do
Expand Down Expand Up @@ -398,6 +404,7 @@ def waiting(seconds)
@i = create_output(:buffered)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',hash)]))
@i.start
@i.after_start
end

test '#start does not create enqueue thread, but creates flush threads' do
Expand Down Expand Up @@ -491,6 +498,7 @@ def waiting(seconds)
@i = create_output(:buffered)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.start
@i.after_start
end

test '#configure raises config error if timekey is not specified' do
Expand Down Expand Up @@ -706,6 +714,7 @@ def waiting(seconds)
@i = create_output(:buffered)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.start
@i.after_start
end

test 'default flush_mode is set to :interval' do
Expand Down Expand Up @@ -922,6 +931,7 @@ def waiting(seconds)
@i = create_output(:buffered)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.start
@i.after_start
end

test 'default flush_mode is set to :interval' do
Expand Down Expand Up @@ -1134,6 +1144,7 @@ def waiting(seconds)
@i = create_output(:buffered)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.start
@i.after_start

assert_equal :interval, @i.instance_eval{ @flush_mode }
end
Expand All @@ -1150,6 +1161,7 @@ def waiting(seconds)
@i = create_output(:buffered)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.start
@i.after_start

assert_equal :lazy, @i.instance_eval{ @flush_mode }
end
Expand All @@ -1168,6 +1180,7 @@ def waiting(seconds)
@i = create_output(:delayed)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.start
@i.after_start
end

test '#format is called for each event streams' do
Expand Down
3 changes: 3 additions & 0 deletions test/plugin/test_output_as_buffered_overflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def waiting(seconds)
@i = create_output()
@i.configure(config_element('ROOT','',{},[config_element('buffer','tag',hash)]))
@i.start
@i.after_start
end

test '#emit_events raises error when buffer is full' do
Expand Down Expand Up @@ -108,6 +109,7 @@ def waiting(seconds)
@i = create_output()
@i.configure(config_element('ROOT','',{'log_level' => 'debug'},[config_element('buffer','tag',hash)]))
@i.start
@i.after_start
end

test '#emit_events blocks until any queues are flushed' do
Expand Down Expand Up @@ -169,6 +171,7 @@ def waiting(seconds)
@i = create_output()
@i.configure(config_element('ROOT','',{'log_level' => 'debug'},[config_element('buffer','tag',hash)]))
@i.start
@i.after_start
end

test '#emit_events will success by dropping oldest chunk' do
Expand Down
Loading

0 comments on commit e9ac872

Please sign in to comment.