Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_tail: Fix line skipping issue in receive_lines method #4530

Merged
merged 12 commits into from
Jul 14, 2024
75 changes: 56 additions & 19 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ def update_watcher(tail_watcher, pe, new_inode)
if @follow_inodes && new_inode.nil?
# nil inode means the file disappeared, so we only need to stop it.
@tails.delete(tail_watcher.path)
# https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632
# https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632
# Because of this problem, log duplication can occur during `rotate_wait`.
# Need to set `rotate_wait 0` for a workaround.
# Duplication will occur if `refresh_watcher` is called during the `rotate_wait`.
Expand Down Expand Up @@ -696,14 +696,6 @@ def flush_buffer(tw, buf)

# @return true if no error or unrecoverable error happens in emit action. false if got BufferOverflowError
def receive_lines(lines, tail_watcher)
lines = lines.reject do |line|
skip_line = @max_line_size ? line.bytesize > @max_line_size : false
if skip_line
log.warn "received line length is longer than #{@max_line_size}"
log.debug "skipped line: #{line.chomp}"
end
skip_line
end
es = @receive_handler.call(lines, tail_watcher)
unless es.empty?
tag = if @tag_prefix || @tag_suffix
Expand Down Expand Up @@ -819,6 +811,7 @@ def io_handler(watcher, path)
from_encoding: @from_encoding,
encoding: @encoding,
metrics: @metrics,
max_line_size: @max_line_size,
&method(:receive_lines)
)
end
Expand Down Expand Up @@ -1011,15 +1004,19 @@ def swap_state(pe)
end

class FIFO
def initialize(from_encoding, encoding)
def initialize(from_encoding, encoding, log, max_line_size=nil)
@from_encoding = from_encoding
@encoding = encoding
@need_enc = from_encoding != encoding
@buffer = ''.force_encoding(from_encoding)
@eol = "\n".encode(from_encoding).freeze
@max_line_size = max_line_size
@skip_current_line = false
@skipping_current_line_bytesize = 0
@log = log
end

attr_reader :from_encoding, :encoding, :buffer
attr_reader :from_encoding, :encoding, :buffer, :max_line_size

def <<(chunk)
# Although "chunk" is most likely transient besides String#force_encoding itself
Expand Down Expand Up @@ -1051,6 +1048,7 @@ def convert(s)

def read_lines(lines)
idx = @buffer.index(@eol)
has_skipped_line = false

until idx.nil?
# Using freeze and slice is faster than slice!
Expand All @@ -1059,11 +1057,47 @@ def read_lines(lines)
rbuf = @buffer.slice(0, idx + 1)
@buffer = @buffer.slice(idx + 1, @buffer.size)
idx = @buffer.index(@eol)

is_long_line = @max_line_size && (
@skip_current_line || rbuf.bytesize > @max_line_size
)

if is_long_line
@log.warn "received line length is longer than #{@max_line_size}"
if @skip_current_line
@log.debug("The continuing line is finished. Finally discarded data: ") { convert(rbuf).chomp }
else
@log.debug("skipped line: ") { convert(rbuf).chomp }
end
has_skipped_line = true
@skip_current_line = false
@skipping_current_line_bytesize = 0
next
end

lines << convert(rbuf)
end

is_long_current_line = @max_line_size && (
@skip_current_line || @buffer.bytesize > @max_line_size
)

if is_long_current_line
@log.debug(
"The continuing current line length is longer than #{@max_line_size}." +
" The received data will be discarded until this line is finished." +
" Discarded data: "
) { convert(@buffer).chomp }
@skip_current_line = true
daipom marked this conversation as resolved.
Show resolved Hide resolved
@skipping_current_line_bytesize += @buffer.bytesize
@buffer.clear
end

return has_skipped_line
end

def bytesize
def reading_bytesize
return @skipping_current_line_bytesize if @skip_current_line
@buffer.bytesize
end
end
Expand All @@ -1074,14 +1108,14 @@ class IOHandler

attr_accessor :shutdown_timeout

def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines)
def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, max_line_size: nil, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines)
@watcher = watcher
@path = path
@read_lines_limit = read_lines_limit
@read_bytes_limit_per_second = read_bytes_limit_per_second
@receive_lines = receive_lines
@open_on_every_update = open_on_every_update
@fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT)
@fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT, log, max_line_size)
@iobuf = ''.force_encoding('ASCII-8BIT')
@lines = []
@io = nil
Expand Down Expand Up @@ -1164,6 +1198,7 @@ def handle_notify
with_io do |io|
begin
read_more = false
has_skipped_line = false

if !io.nil? && @lines.empty?
begin
Expand All @@ -1177,7 +1212,7 @@ def handle_notify
@fifo << data

n_lines_before_read = @lines.size
@fifo.read_lines(@lines)
has_skipped_line = @fifo.read_lines(@lines) || has_skipped_line
group_watcher&.update_lines_read(@path, @lines.size - n_lines_before_read)

group_watcher_limit = group_watcher&.limit_lines_reached?(@path)
Expand All @@ -1200,9 +1235,11 @@ def handle_notify
end
end

unless @lines.empty?
if @lines.empty?
@watcher.pe.update_pos(io.pos - @fifo.reading_bytesize) if has_skipped_line
else
if @receive_lines.call(@lines, @watcher)
@watcher.pe.update_pos(io.pos - @fifo.bytesize)
@watcher.pe.update_pos(io.pos - @fifo.reading_bytesize)
@lines.clear
else
read_more = false
Expand All @@ -1214,12 +1251,12 @@ def handle_notify

def open
io = Fluent::FileWrapper.open(@path)
io.seek(@watcher.pe.read_pos + @fifo.bytesize)
io.seek(@watcher.pe.read_pos + @fifo.reading_bytesize)
@metrics.opened.inc
io
rescue RangeError
io.close if io
raise WatcherSetupError, "seek error with #{@path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.bytesize.to_s(16)}"
raise WatcherSetupError, "seek error with #{@path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.reading_bytesize.to_s(16)}"
rescue Errno::EACCES => e
@log.warn "#{e}"
nil
Expand Down
98 changes: 84 additions & 14 deletions test/plugin/in_tail/test_fifo.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
class IntailFIFO < Test::Unit::TestCase
sub_test_case '#read_line' do
test 'returns lines spliting per `\n`' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
text = ("test\n" * 3).force_encoding(Encoding::ASCII_8BIT)
fifo << text
lines = []
Expand All @@ -15,7 +15,7 @@ class IntailFIFO < Test::Unit::TestCase
end

test 'concant line when line is separated' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
text = ("test\n" * 3 + 'test').force_encoding(Encoding::ASCII_8BIT)
fifo << text
lines = []
Expand All @@ -30,7 +30,7 @@ class IntailFIFO < Test::Unit::TestCase
end

test 'returns lines which convert encoding' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::UTF_8)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::UTF_8, $log)
text = ("test\n" * 3).force_encoding(Encoding::ASCII_8BIT)
fifo << text
lines = []
Expand All @@ -40,7 +40,7 @@ class IntailFIFO < Test::Unit::TestCase
end

test 'reads lines as from_encoding' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT, $log)
text = ("test\n" * 3).force_encoding(Encoding::UTF_8)
fifo << text
lines = []
Expand All @@ -51,7 +51,7 @@ class IntailFIFO < Test::Unit::TestCase

sub_test_case 'when it includes multi byte chars' do
test 'handles it as ascii_8bit' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
text = ("てすと\n" * 3).force_encoding(Encoding::ASCII_8BIT)
fifo << text
lines = []
Expand All @@ -61,7 +61,7 @@ class IntailFIFO < Test::Unit::TestCase
end

test 'replaces character with ? when convert error happens' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT, $log)
text = ("てすと\n" * 3).force_encoding(Encoding::UTF_8)
fifo << text
lines = []
Expand All @@ -72,7 +72,7 @@ class IntailFIFO < Test::Unit::TestCase
end

test 'reutrns nothing when buffer is empty' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
lines = []
fifo.read_lines(lines)
assert_equal [], lines
Expand All @@ -86,11 +86,54 @@ class IntailFIFO < Test::Unit::TestCase
fifo.read_lines(lines)
assert_equal [], lines
end

data('bigger than max_line_size', [
["test test test\n" * 3],
[],
])
data('less than or equal to max_line_size', [
["test\n" * 2],
["test\n", "test\n"],
])
data('mix', [
["test test test\ntest\ntest test test\ntest\ntest test test\n"],
["test\n", "test\n"],
])
data('mix and multiple', [
[
"test test test\ntest\n",
"test",
" test test\nt",
"est\nt"
],
["test\n", "test\n"],
])
data('remaining data bigger than max_line_size should be discarded', [
[
"test\nlong line still not having EOL",
"following texts to the previous long line\ntest\n",
],
["test\n", "test\n"],
])
test 'return lines only that size is less than or equal to max_line_size' do |(input_texts, expected)|
max_line_size = 5
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, max_line_size)
lines = []

input_texts.each do |text|
fifo << text.force_encoding(Encoding::ASCII_8BIT)
fifo.read_lines(lines)
# The size of remaining buffer (i.e. a line still not having EOL) must not exceed max_line_size.
assert { fifo.buffer.bytesize <= max_line_size }
end

assert_equal expected, lines
end
end

sub_test_case '#<<' do
test 'does not make any change about encoding to an argument' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT)
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
text = ("test\n" * 3).force_encoding(Encoding::UTF_8)

assert_equal Encoding::UTF_8, text.encoding
Expand All @@ -99,23 +142,50 @@ class IntailFIFO < Test::Unit::TestCase
end
end

sub_test_case '#bytesize' do
test 'reutrns buffer size' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT)
sub_test_case '#reading_bytesize' do
test 'returns buffer size' do
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
text = "test\n" * 3 + 'test'
fifo << text

assert_equal text.bytesize, fifo.bytesize
assert_equal text.bytesize, fifo.reading_bytesize
lines = []
fifo.read_lines(lines)
assert_equal ["test\n", "test\n", "test\n"], lines

assert_equal 'test'.bytesize, fifo.bytesize
assert_equal 'test'.bytesize, fifo.reading_bytesize
fifo << "2\n"
fifo.read_lines(lines)
assert_equal ["test\n", "test\n", "test\n", "test2\n"], lines

assert_equal 0, fifo.bytesize
assert_equal 0, fifo.reading_bytesize
end

test 'returns the entire line size even if the size is over max_line_size' do
max_line_size = 20
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, max_line_size)
lines = []

text = "long line still not having EOL"
fifo << text
fifo.read_lines(lines)
assert_equal [], lines
assert_equal 0, fifo.buffer.bytesize
assert_equal text.bytesize, fifo.reading_bytesize

text2 = " following texts"
fifo << text2
fifo.read_lines(lines)
assert_equal [], lines
assert_equal 0, fifo.buffer.bytesize
assert_equal text.bytesize + text2.bytesize, fifo.reading_bytesize

text3 = " end of the line\n"
fifo << text3
fifo.read_lines(lines)
assert_equal [], lines
assert_equal 0, fifo.buffer.bytesize
assert_equal 0, fifo.reading_bytesize
end
end
end
Loading