Skip to content

Commit

Permalink
Merge pull request #346 from fluent/support-read-from-head-without-po…
Browse files Browse the repository at this point in the history
…s_file

Support read_from_head without pos_file option
  • Loading branch information
repeatedly committed Jun 13, 2014
2 parents 2a2f485 + 06d176b commit aded271
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 21 deletions.
7 changes: 4 additions & 3 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def refresh_watchers
end

def setup_watcher(path, pe)
tw = TailWatcher.new(path, @rotate_wait, pe, log, method(:update_watcher), &method(:receive_lines))
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, method(:update_watcher), &method(:receive_lines))
tw.attach(@loop)
tw
end
Expand Down Expand Up @@ -275,10 +275,11 @@ def parse_multilines(lines, tail_watcher)
end

class TailWatcher
def initialize(path, rotate_wait, pe, log, update_watcher, &receive_lines)
def initialize(path, rotate_wait, pe, log, read_from_head, update_watcher, &receive_lines)
@path = path
@rotate_wait = rotate_wait
@pe = pe || MemoryPositionEntry.new
@read_from_head = read_from_head
@receive_lines = receive_lines
@update_watcher = update_watcher

Expand Down Expand Up @@ -354,7 +355,7 @@ def on_rotate(io)
# seek to the end of the any files.
# logs may duplicate without this seek because it's not sure the file is
# existent file or rotated new file.
pos = fsize
pos = @read_from_head ? 0 : fsize
@pe.update(inode, pos)
end
io.seek(pos)
Expand Down
7 changes: 3 additions & 4 deletions lib/fluent/test/input_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,10 @@ def records

def run(&block)
m = method(:emit_stream)
Engine.define_singleton_method(:emit_stream) {|tag,es|
m.call(tag, es)
}
super {
Engine.define_singleton_method(:emit_stream) {|tag,es|
m.call(tag, es)
}

block.call if block

if @expects
Expand Down
116 changes: 102 additions & 14 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,17 @@ def setup

TMP_DIR = File.dirname(__FILE__) + "/../tmp/tail#{ENV['TEST_ENV_NUMBER']}"

COMMON_CONFIG = %[
CONFIG = %[
path #{TMP_DIR}/tail.txt
tag t1
rotate_wait 2s
]
COMMON_CONFIG = CONFIG + %[
pos_file #{TMP_DIR}/tail.pos
]
CONFIG_READ_FROM_HEAD = %[
read_from_head true
]
SINGLE_LINE_CONFIG = %[
format /(?<message>.*)/
]
Expand All @@ -36,6 +41,8 @@ def test_configure
assert_equal "#{TMP_DIR}/tail.pos", d.instance.pos_file
end

# TODO: Should using more better approach instead of sleep wait

def test_emit
File.open("#{TMP_DIR}/tail.txt", "w") {|f|
f.puts "test1"
Expand All @@ -56,8 +63,89 @@ def test_emit

emits = d.emits
assert_equal(true, emits.length > 0)
assert_equal({"message"=>"test3"}, emits[0][2])
assert_equal({"message"=>"test4"}, emits[1][2])
assert_equal({"message" => "test3"}, emits[0][2])
assert_equal({"message" => "test4"}, emits[1][2])
end

def test_emit_with_read_from_head
File.open("#{TMP_DIR}/tail.txt", "w") {|f|
f.puts "test1"
f.puts "test2"
}

d = create_driver(CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG)

d.run do
sleep 1

File.open("#{TMP_DIR}/tail.txt", "a") {|f|
f.puts "test3"
f.puts "test4"
}
sleep 1
end

emits = d.emits
assert(emits.length > 0)
assert_equal({"message" => "test1"}, emits[0][2])
assert_equal({"message" => "test2"}, emits[1][2])
assert_equal({"message" => "test3"}, emits[2][2])
assert_equal({"message" => "test4"}, emits[3][2])
end

def test_rotate_file
emits = sub_test_rotate_file(SINGLE_LINE_CONFIG)
assert(emits.length > 0)
assert_equal({"message" => "test3"}, emits[0][2])
assert_equal({"message" => "test4"}, emits[1][2])
assert_equal({"message" => "test5"}, emits[2][2])
assert_equal({"message" => "test6"}, emits[3][2])
end

def test_rotate_file_with_read_from_head
emits = sub_test_rotate_file(CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG)
assert(emits.length > 0)
assert_equal({"message" => "test1"}, emits[0][2])
assert_equal({"message" => "test2"}, emits[1][2])
assert_equal({"message" => "test3"}, emits[2][2])
assert_equal({"message" => "test4"}, emits[3][2])
assert_equal({"message" => "test5"}, emits[4][2])
assert_equal({"message" => "test6"}, emits[5][2])
end

def sub_test_rotate_file(config = nil)
File.open("#{TMP_DIR}/tail.txt", "w") {|f|
f.puts "test1"
f.puts "test2"
}
d = create_driver(config)
d.run do
sleep 1

File.open("#{TMP_DIR}/tail.txt", "a") {|f|
f.puts "test3"
f.puts "test4"
}
sleep 1

FileUtils.mv("#{TMP_DIR}/tail.txt", "#{TMP_DIR}/tail2.txt")
sleep 1

File.open("#{TMP_DIR}/tail.txt", "w") {|f| }
sleep 1

File.open("#{TMP_DIR}/tail.txt", "a") {|f|
f.puts "test5"
f.puts "test6"
}
sleep 1
end

d.run do
sleep 1
end

d.emits
end

def test_lf
Expand All @@ -79,7 +167,7 @@ def test_lf

emits = d.emits
assert_equal(true, emits.length > 0)
assert_equal({"message"=>"test3test4"}, emits[0][2])
assert_equal({"message" => "test3test4"}, emits[0][2])
end

def test_whitespace
Expand All @@ -103,12 +191,12 @@ def test_whitespace

emits = d.emits
assert_equal(true, emits.length > 0)
assert_equal({"message"=>" "}, emits[0][2])
assert_equal({"message"=>" 4 spaces"}, emits[1][2])
assert_equal({"message"=>"4 spaces "}, emits[2][2])
assert_equal({"message"=>" "}, emits[3][2])
assert_equal({"message"=>" tab"}, emits[4][2])
assert_equal({"message"=>"tab "}, emits[5][2])
assert_equal({"message" => " "}, emits[0][2])
assert_equal({"message" => " 4 spaces"}, emits[1][2])
assert_equal({"message" => "4 spaces "}, emits[2][2])
assert_equal({"message" => " "}, emits[3][2])
assert_equal({"message" => " tab"}, emits[4][2])
assert_equal({"message" => "tab "}, emits[5][2])
end

# multiline mode test
Expand Down Expand Up @@ -247,7 +335,7 @@ def test_refresh_watchers

flexstub(Fluent::NewTailInput::TailWatcher) do |watcherclass|
EX_PATHS.each do |path|
watcherclass.should_receive(:new).with(path, EX_RORATE_WAIT, Fluent::NewTailInput::FilePositionEntry, any, any, any).once.and_return do
watcherclass.should_receive(:new).with(path, EX_RORATE_WAIT, Fluent::NewTailInput::FilePositionEntry, any, true, any, any).once.and_return do
flexmock('TailWatcher') { |watcher|
watcher.should_receive(:attach).once
watcher.should_receive(:unwatched=).zero_or_more_times
Expand All @@ -263,7 +351,7 @@ def test_refresh_watchers
end

flexstub(Fluent::NewTailInput::TailWatcher) do |watcherclass|
watcherclass.should_receive(:new).with('test/plugin/data/2010/01/20100102-030406.log', EX_RORATE_WAIT, Fluent::NewTailInput::FilePositionEntry, any, any, any).once.and_return do
watcherclass.should_receive(:new).with('test/plugin/data/2010/01/20100102-030406.log', EX_RORATE_WAIT, Fluent::NewTailInput::FilePositionEntry, any, true, any, any).once.and_return do
flexmock('TailWatcher') do |watcher|
watcher.should_receive(:attach).once
watcher.should_receive(:unwatched=).zero_or_more_times
Expand Down Expand Up @@ -368,8 +456,8 @@ def test_missing_file
end
emits = d.emits
assert_equal(2, emits.length)
assert_equal({"message"=>"test3"}, emits[0][2])
assert_equal({"message"=>"test4"}, emits[1][2])
assert_equal({"message" => "test3"}, emits[0][2])
assert_equal({"message" => "test4"}, emits[1][2])
end
end
end

0 comments on commit aded271

Please sign in to comment.