From d6a15648717ccef95b860ede7add1310820142ca Mon Sep 17 00:00:00 2001 From: "been.zino" Date: Fri, 10 May 2024 19:25:25 +0900 Subject: [PATCH 01/12] Fix line skipping issue in receive_lines method Before this patch, long lines could cause breakdowns in fluentd, potentially posing a vulnerability. With this patch, max_line_size will be integrated into the FIFO, enabling the system to skip lines exceeding the maximum size before executing receive_lines. Co-authored-by: yugeeklab Co-authored-by: moggaa Signed-off-by: been.zino --- lib/fluent/plugin/in_tail.rb | 42 +++++++++++----- test/plugin/in_tail/test_fifo.rb | 38 ++++++++++---- test/plugin/in_tail/test_io_handler.rb | 31 ++++++++++++ test/plugin/test_in_tail.rb | 70 +++++++++++++------------- 4 files changed, 124 insertions(+), 57 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index fc5761f917..6846d10e83 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -696,14 +696,6 @@ def flush_buffer(tw, buf) # @return true if no error or unrecoverable error happens in emit action. false if got BufferOverflowError def receive_lines(lines, tail_watcher) - lines = lines.reject do |line| - skip_line = @max_line_size ? line.bytesize > @max_line_size : false - if skip_line - log.warn "received line length is longer than #{@max_line_size}" - log.debug "skipped line: #{line.chomp}" - end - skip_line - end es = @receive_handler.call(lines, tail_watcher) unless es.empty? tag = if @tag_prefix || @tag_suffix @@ -819,6 +811,7 @@ def io_handler(watcher, path) from_encoding: @from_encoding, encoding: @encoding, metrics: @metrics, + max_line_size: @max_line_size, &method(:receive_lines) ) end @@ -1011,15 +1004,18 @@ def swap_state(pe) end class FIFO - def initialize(from_encoding, encoding) + def initialize(from_encoding, encoding, log, max_line_size=nil) @from_encoding = from_encoding @encoding = encoding @need_enc = from_encoding != encoding @buffer = ''.force_encoding(from_encoding) @eol = "\n".encode(from_encoding).freeze + @max_line_size = max_line_size + @was_long_line = false + @log = log end - attr_reader :from_encoding, :encoding, :buffer + attr_reader :from_encoding, :encoding, :buffer, :max_line_size def <<(chunk) # Although "chunk" is most likely transient besides String#force_encoding itself @@ -1059,7 +1055,27 @@ def read_lines(lines) rbuf = @buffer.slice(0, idx + 1) @buffer = @buffer.slice(idx + 1, @buffer.size) idx = @buffer.index(@eol) - lines << convert(rbuf) + + is_long_line = !@max_line_size.nil? && ( + rbuf.bytesize > @max_line_size || @was_long_line + ) + + if !is_long_line + lines << convert(rbuf) + else + @log.warn "received line length is longer than #{@max_line_size}" + @log.debug "skipped line" + @was_long_line = false + end + end + + is_long_line = !@max_line_size.nil? && ( + @buffer.bytesize > @max_line_size || @was_long_line + ) + + if is_long_line + @buffer = ''.force_encoding(@from_encoding) + @was_long_line = true end end @@ -1074,14 +1090,14 @@ class IOHandler attr_accessor :shutdown_timeout - def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines) + def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, max_line_size: nil, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines) @watcher = watcher @path = path @read_lines_limit = read_lines_limit @read_bytes_limit_per_second = read_bytes_limit_per_second @receive_lines = receive_lines @open_on_every_update = open_on_every_update - @fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT) + @fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT, log, max_line_size) @iobuf = ''.force_encoding('ASCII-8BIT') @lines = [] @io = nil diff --git a/test/plugin/in_tail/test_fifo.rb b/test/plugin/in_tail/test_fifo.rb index 86345d8bb9..acd6f9e51d 100644 --- a/test/plugin/in_tail/test_fifo.rb +++ b/test/plugin/in_tail/test_fifo.rb @@ -5,7 +5,7 @@ class IntailFIFO < Test::Unit::TestCase sub_test_case '#read_line' do test 'returns lines spliting per `\n`' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) text = ("test\n" * 3).force_encoding(Encoding::ASCII_8BIT) fifo << text lines = [] @@ -15,7 +15,7 @@ class IntailFIFO < Test::Unit::TestCase end test 'concant line when line is separated' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) text = ("test\n" * 3 + 'test').force_encoding(Encoding::ASCII_8BIT) fifo << text lines = [] @@ -30,7 +30,7 @@ class IntailFIFO < Test::Unit::TestCase end test 'returns lines which convert encoding' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::UTF_8) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::UTF_8, $log) text = ("test\n" * 3).force_encoding(Encoding::ASCII_8BIT) fifo << text lines = [] @@ -40,7 +40,7 @@ class IntailFIFO < Test::Unit::TestCase end test 'reads lines as from_encoding' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT, $log) text = ("test\n" * 3).force_encoding(Encoding::UTF_8) fifo << text lines = [] @@ -51,7 +51,7 @@ class IntailFIFO < Test::Unit::TestCase sub_test_case 'when it includes multi byte chars' do test 'handles it as ascii_8bit' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) text = ("てすと\n" * 3).force_encoding(Encoding::ASCII_8BIT) fifo << text lines = [] @@ -61,7 +61,7 @@ class IntailFIFO < Test::Unit::TestCase end test 'replaces character with ? when convert error happens' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT, $log) text = ("てすと\n" * 3).force_encoding(Encoding::UTF_8) fifo << text lines = [] @@ -72,7 +72,7 @@ class IntailFIFO < Test::Unit::TestCase end test 'reutrns nothing when buffer is empty' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) lines = [] fifo.read_lines(lines) assert_equal [], lines @@ -86,11 +86,31 @@ class IntailFIFO < Test::Unit::TestCase fifo.read_lines(lines) assert_equal [], lines end + + test 'return empty line when line size is bigger than max_line_size' do + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, 5) + t = "test" * 3 + text = ("#{t}\n" * 3).force_encoding(Encoding::ASCII_8BIT) + fifo << text + lines = [] + fifo.read_lines(lines) + assert_equal [], lines + end + + test 'return lines when line size is less than or equal to max_line_size' do + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, 5) + t = "test" + text = ("#{t}\n" * 2).force_encoding(Encoding::ASCII_8BIT) + fifo << text + lines = [] + fifo.read_lines(lines) + assert_equal ["test\n", "test\n"], lines + end end sub_test_case '#<<' do test 'does not make any change about encoding to an argument' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) text = ("test\n" * 3).force_encoding(Encoding::UTF_8) assert_equal Encoding::UTF_8, text.encoding @@ -101,7 +121,7 @@ class IntailFIFO < Test::Unit::TestCase sub_test_case '#bytesize' do test 'reutrns buffer size' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) text = "test\n" * 3 + 'test' fifo << text diff --git a/test/plugin/in_tail/test_io_handler.rb b/test/plugin/in_tail/test_io_handler.rb index 647c4a55a1..84d39ceaae 100644 --- a/test/plugin/in_tail/test_io_handler.rb +++ b/test/plugin/in_tail/test_io_handler.rb @@ -146,5 +146,36 @@ def create_watcher assert_equal 5, returned_lines[0].size assert_equal 3, returned_lines[1].size end + + test 'call receive_lines some times when long line(more than 8192) and max_line_size is 4096' do + t = 'line' * (8192 / 8) + text = "#{t}\n" * 8 + t = 'line' * 8 + text += "#{t}\n" * 8 + + @file.write(text) + @file.close + + update_pos = 0 + + watcher = create_watcher + stub(watcher).pe do + pe = 'position_file' + stub(pe).read_pos {0} + stub(pe).update_pos { |val| update_pos = val } + pe + end + + returned_lines = [] + r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, max_line_size: 4096, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher| + returned_lines << lines.dup + true + end + + r.on_notify + assert_equal text.bytesize, update_pos + assert_equal 1, returned_lines.size + assert_equal 8, returned_lines[0].size + end end end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index d30595ce9a..819bb37aaf 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -2011,41 +2011,6 @@ def test_tag_prefix_and_suffix_ignore mock(plugin.router).emit_stream('pre.foo.bar.log.post', anything).once plugin.receive_lines(['foo', 'bar'], DummyWatcher.new('foo.bar.log')) end - - data( - small: ["128", 128], - KiB: ["1k", 1024] - ) - test 'max_line_size' do |(label, size)| - config = config_element("", "", { - "tag" => "max_line_size", - "path" => "#{@tmp_dir}/with_long_lines.txt", - "format" => "none", - "read_from_head" => true, - "max_line_size" => label, - "log_level" => "debug" - }) - Fluent::FileWrapper.open("#{@tmp_dir}/with_long_lines.txt", "w+") do |f| - f.puts "foo" - f.puts "x" * size # 'x' * size + \n > @max_line_size - f.puts "bar" - end - d = create_driver(config, false) - timestamp = Time.parse("Mon Nov 29 11:22:33 UTC 2021") - Timecop.freeze(timestamp) - d.run(expect_records: 2) - assert_equal([ - [{"message" => "foo"},{"message" => "bar"}], - [ - "2021-11-29 11:22:33 +0000 [warn]: received line length is longer than #{size}\n", - "2021-11-29 11:22:33 +0000 [debug]: skipped line: #{'x' * size}\n" - ] - ], - [ - d.events.collect { |event| event.last }, - d.logs[-2..] - ]) - end end # Ensure that no fatal exception is raised when a file is missing and that @@ -3426,4 +3391,39 @@ def test_next_rotation_occurs_very_fast_while_old_TW_still_waiting_rotate_wait ) end end + + data( + small: ["128", 128], + KiB: ["1k", 1024] + ) + test 'max_line_size' do |(label, size)| + config = config_element("", "", { + "tag" => "max_line_size", + "path" => "#{@tmp_dir}/with_long_lines.txt", + "format" => "none", + "read_from_head" => true, + "max_line_size" => label, + "log_level" => "debug" + }) + Fluent::FileWrapper.open("#{@tmp_dir}/with_long_lines.txt", "w+") do |f| + f.puts "foo" + f.puts "x" * size # 'x' * size + \n > @max_line_size + f.puts "bar" + end + d = create_driver(config, false) + timestamp = Time.parse("Mon Nov 29 11:22:33 UTC 2021") + Timecop.freeze(timestamp) + d.run(expect_records: 2) + assert_equal([ + [{"message" => "foo"},{"message" => "bar"}], + [ + "2021-11-29 11:22:33 +0000 [warn]: received line length is longer than #{size}\n", + "2021-11-29 11:22:33 +0000 [debug]: skipped line\n" + ] + ], + [ + d.events.collect { |event| event.last }, + d.logs[-2..] + ]) + end end From 514760ca61be56f2792acc24631dfb3385908b03 Mon Sep 17 00:00:00 2001 From: Taeseong Yu Date: Fri, 31 May 2024 13:26:55 +0900 Subject: [PATCH 02/12] Update lib/fluent/plugin/in_tail.rb Co-authored-by: Daijiro Fukuda Signed-off-by: been.zino --- lib/fluent/plugin/in_tail.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 6846d10e83..8aec88c25d 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -1056,7 +1056,7 @@ def read_lines(lines) @buffer = @buffer.slice(idx + 1, @buffer.size) idx = @buffer.index(@eol) - is_long_line = !@max_line_size.nil? && ( + is_long_line = @max_line_size && ( rbuf.bytesize > @max_line_size || @was_long_line ) From dfc0c4b73303b40d9204759b43ac5605541cc798 Mon Sep 17 00:00:00 2001 From: Taeseong Yu Date: Fri, 31 May 2024 13:27:42 +0900 Subject: [PATCH 03/12] Update lib/fluent/plugin/in_tail.rb Co-authored-by: Daijiro Fukuda Signed-off-by: been.zino --- lib/fluent/plugin/in_tail.rb | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 8aec88c25d..a08c8476df 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -1060,13 +1060,14 @@ def read_lines(lines) rbuf.bytesize > @max_line_size || @was_long_line ) - if !is_long_line - lines << convert(rbuf) - else + if is_long_line @log.warn "received line length is longer than #{@max_line_size}" - @log.debug "skipped line" + @log.debug("skipped line: ") { convert(rbuf).chomp } @was_long_line = false + next end + + lines << convert(rbuf) end is_long_line = !@max_line_size.nil? && ( From 8abe35068596f23756174ccc7dfb6666b98d4212 Mon Sep 17 00:00:00 2001 From: Taeseong Yu Date: Fri, 31 May 2024 13:27:59 +0900 Subject: [PATCH 04/12] Update lib/fluent/plugin/in_tail.rb Co-authored-by: Daijiro Fukuda Signed-off-by: been.zino --- lib/fluent/plugin/in_tail.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index a08c8476df..5848e13a81 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -1070,7 +1070,7 @@ def read_lines(lines) lines << convert(rbuf) end - is_long_line = !@max_line_size.nil? && ( + is_long_line = @max_line_size && ( @buffer.bytesize > @max_line_size || @was_long_line ) From d35fccdfb65ff21ff5a2a38a0405a54cac4d274d Mon Sep 17 00:00:00 2001 From: Taeseong Yu Date: Fri, 31 May 2024 13:29:05 +0900 Subject: [PATCH 05/12] Update test/plugin/test_in_tail.rb Co-authored-by: Daijiro Fukuda Signed-off-by: been.zino --- test/plugin/test_in_tail.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 819bb37aaf..bb16c1ab8e 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -3418,7 +3418,7 @@ def test_next_rotation_occurs_very_fast_while_old_TW_still_waiting_rotate_wait [{"message" => "foo"},{"message" => "bar"}], [ "2021-11-29 11:22:33 +0000 [warn]: received line length is longer than #{size}\n", - "2021-11-29 11:22:33 +0000 [debug]: skipped line\n" + "2021-11-29 11:22:33 +0000 [debug]: skipped line: #{'x' * size}\n" ] ], [ From cffdfffa3554a21a48401ead3dfd59dad9d85385 Mon Sep 17 00:00:00 2001 From: Taeseong Yu Date: Fri, 31 May 2024 13:29:28 +0900 Subject: [PATCH 06/12] Update test/plugin/in_tail/test_fifo.rb Co-authored-by: Daijiro Fukuda Signed-off-by: been.zino --- test/plugin/in_tail/test_fifo.rb | 55 ++++++++++++++++++++++---------- 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/test/plugin/in_tail/test_fifo.rb b/test/plugin/in_tail/test_fifo.rb index acd6f9e51d..ebe0729d49 100644 --- a/test/plugin/in_tail/test_fifo.rb +++ b/test/plugin/in_tail/test_fifo.rb @@ -87,24 +87,47 @@ class IntailFIFO < Test::Unit::TestCase assert_equal [], lines end - test 'return empty line when line size is bigger than max_line_size' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, 5) - t = "test" * 3 - text = ("#{t}\n" * 3).force_encoding(Encoding::ASCII_8BIT) - fifo << text + data('bigger than max_line_size', [ + ["test test test\n" * 3], + [], + ]) + data('less than or equal to max_line_size', [ + ["test\n" * 2], + ["test\n", "test\n"], + ]) + data('mix', [ + ["test test test\ntest\ntest test test\ntest\ntest test test\n"], + ["test\n", "test\n"], + ]) + data('mix and multiple', [ + [ + "test test test\ntest\n", + "test", + " test test\nt", + "est\nt" + ], + ["test\n", "test\n"], + ]) + data('remaining data bigger than max_line_size should be discarded', [ + [ + "test\nlong line still not having EOL", + "following texts to the previous long line\ntest\n", + ], + ["test\n", "test\n"], + ]) + test 'return lines only that size is less than or equal to max_line_size' do |(input_texts, expected)| + max_line_size = 5 + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, max_line_size) lines = [] - fifo.read_lines(lines) - assert_equal [], lines - end - test 'return lines when line size is less than or equal to max_line_size' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, 5) - t = "test" - text = ("#{t}\n" * 2).force_encoding(Encoding::ASCII_8BIT) - fifo << text - lines = [] - fifo.read_lines(lines) - assert_equal ["test\n", "test\n"], lines + input_texts.each do |text| + fifo << text.force_encoding(Encoding::ASCII_8BIT) + fifo.read_lines(lines) + # The size of remaining buffer (i.e. a line still not having EOL) must not exceed max_line_size. + assert { fifo.bytesize <= max_line_size } + end + + assert_equal expected, lines end end From 9e9947a9a54385b99b4753bf8b483861ee15ad08 Mon Sep 17 00:00:00 2001 From: Taeseong Yu Date: Fri, 31 May 2024 13:29:39 +0900 Subject: [PATCH 07/12] Update lib/fluent/plugin/in_tail.rb Co-authored-by: Daijiro Fukuda Signed-off-by: been.zino --- lib/fluent/plugin/in_tail.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 5848e13a81..953c6cbab4 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -1075,7 +1075,7 @@ def read_lines(lines) ) if is_long_line - @buffer = ''.force_encoding(@from_encoding) + @buffer.clear @was_long_line = true end end From bf73efeea722376f30bccac0f10095fb6601939c Mon Sep 17 00:00:00 2001 From: "been.zino" Date: Sun, 9 Jun 2024 17:12:55 +0900 Subject: [PATCH 08/12] Update pos even if line is skipped Signed-off-by: been.zino --- lib/fluent/plugin/in_tail.rb | 16 ++++++++- test/plugin/in_tail/test_io_handler.rb | 47 +++++++++++++------------- 2 files changed, 38 insertions(+), 25 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 953c6cbab4..5b92d955bc 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -566,7 +566,7 @@ def update_watcher(tail_watcher, pe, new_inode) if @follow_inodes && new_inode.nil? # nil inode means the file disappeared, so we only need to stop it. @tails.delete(tail_watcher.path) - # https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632 + # https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632 # Because of this problem, log duplication can occur during `rotate_wait`. # Need to set `rotate_wait 0` for a workaround. # Duplication will occur if `refresh_watcher` is called during the `rotate_wait`. @@ -1012,6 +1012,7 @@ def initialize(from_encoding, encoding, log, max_line_size=nil) @eol = "\n".encode(from_encoding).freeze @max_line_size = max_line_size @was_long_line = false + @has_skipped_line = false @log = log end @@ -1047,6 +1048,7 @@ def convert(s) def read_lines(lines) idx = @buffer.index(@eol) + @has_skipped_line = false until idx.nil? # Using freeze and slice is faster than slice! @@ -1064,6 +1066,7 @@ def read_lines(lines) @log.warn "received line length is longer than #{@max_line_size}" @log.debug("skipped line: ") { convert(rbuf).chomp } @was_long_line = false + @has_skipped_line = true next end @@ -1077,12 +1080,17 @@ def read_lines(lines) if is_long_line @buffer.clear @was_long_line = true + @has_skipped_line = true end end def bytesize @buffer.bytesize end + + def has_skipped_line? + @has_skipped_line + end end class IOHandler @@ -1181,6 +1189,7 @@ def handle_notify with_io do |io| begin read_more = false + has_skipped_line = false if !io.nil? && @lines.empty? begin @@ -1195,6 +1204,7 @@ def handle_notify n_lines_before_read = @lines.size @fifo.read_lines(@lines) + has_skipped_line = @fifo.has_skipped_line? group_watcher&.update_lines_read(@path, @lines.size - n_lines_before_read) group_watcher_limit = group_watcher&.limit_lines_reached?(@path) @@ -1217,6 +1227,10 @@ def handle_notify end end + if @lines.empty? && has_skipped_line + @watcher.pe.update_pos(io.pos - @fifo.bytesize) + end + unless @lines.empty? if @receive_lines.call(@lines, @watcher) @watcher.pe.update_pos(io.pos - @fifo.bytesize) diff --git a/test/plugin/in_tail/test_io_handler.rb b/test/plugin/in_tail/test_io_handler.rb index 84d39ceaae..9397006763 100644 --- a/test/plugin/in_tail/test_io_handler.rb +++ b/test/plugin/in_tail/test_io_handler.rb @@ -146,36 +146,35 @@ def create_watcher assert_equal 5, returned_lines[0].size assert_equal 3, returned_lines[1].size end + end - test 'call receive_lines some times when long line(more than 8192) and max_line_size is 4096' do - t = 'line' * (8192 / 8) - text = "#{t}\n" * 8 - t = 'line' * 8 - text += "#{t}\n" * 8 + test 'does not call receive_lines when line_size exceeds max_line_size' do + t = 'x' * (8192) + text = "#{t}\n" - @file.write(text) - @file.close + max_line_size = 8192 - update_pos = 0 + @file.write(text) + @file.close - watcher = create_watcher - stub(watcher).pe do - pe = 'position_file' - stub(pe).read_pos {0} - stub(pe).update_pos { |val| update_pos = val } - pe - end + update_pos = 0 - returned_lines = [] - r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, max_line_size: 4096, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher| - returned_lines << lines.dup - true - end + watcher = create_watcher + stub(watcher).pe do + pe = 'position_file' + stub(pe).read_pos {0} + stub(pe).update_pos { |val| update_pos = val } + pe + end - r.on_notify - assert_equal text.bytesize, update_pos - assert_equal 1, returned_lines.size - assert_equal 8, returned_lines[0].size + returned_lines = [] + r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1, max_line_size: max_line_size, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher| + returned_lines << lines.dup + true end + + r.on_notify + assert_equal text.bytesize, update_pos + assert_equal 0, returned_lines.size end end From 05e7caf493ab07f91b6f58468883518de8a30cd8 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Wed, 12 Jun 2024 17:47:30 +0900 Subject: [PATCH 09/12] make has_skipped_line local variable Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/in_tail.rb | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 5b92d955bc..6c600847b5 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -1012,7 +1012,6 @@ def initialize(from_encoding, encoding, log, max_line_size=nil) @eol = "\n".encode(from_encoding).freeze @max_line_size = max_line_size @was_long_line = false - @has_skipped_line = false @log = log end @@ -1048,7 +1047,7 @@ def convert(s) def read_lines(lines) idx = @buffer.index(@eol) - @has_skipped_line = false + has_skipped_line = false until idx.nil? # Using freeze and slice is faster than slice! @@ -1066,7 +1065,7 @@ def read_lines(lines) @log.warn "received line length is longer than #{@max_line_size}" @log.debug("skipped line: ") { convert(rbuf).chomp } @was_long_line = false - @has_skipped_line = true + has_skipped_line = true next end @@ -1080,17 +1079,14 @@ def read_lines(lines) if is_long_line @buffer.clear @was_long_line = true - @has_skipped_line = true end + + return has_skipped_line end def bytesize @buffer.bytesize end - - def has_skipped_line? - @has_skipped_line - end end class IOHandler @@ -1203,8 +1199,7 @@ def handle_notify @fifo << data n_lines_before_read = @lines.size - @fifo.read_lines(@lines) - has_skipped_line = @fifo.has_skipped_line? + has_skipped_line = @fifo.read_lines(@lines) || has_skipped_line group_watcher&.update_lines_read(@path, @lines.size - n_lines_before_read) group_watcher_limit = group_watcher&.limit_lines_reached?(@path) @@ -1227,11 +1222,9 @@ def handle_notify end end - if @lines.empty? && has_skipped_line - @watcher.pe.update_pos(io.pos - @fifo.bytesize) - end - - unless @lines.empty? + if @lines.empty? + @watcher.pe.update_pos(io.pos - @fifo.bytesize) if has_skipped_line + else if @receive_lines.call(@lines, @watcher) @watcher.pe.update_pos(io.pos - @fifo.bytesize) @lines.clear From 6c0371729811635b7aab5d9381be7b4a367d0d13 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Wed, 12 Jun 2024 17:50:22 +0900 Subject: [PATCH 10/12] refactor some variable names Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/in_tail.rb | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 6c600847b5..dd94560d0b 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -1011,7 +1011,7 @@ def initialize(from_encoding, encoding, log, max_line_size=nil) @buffer = ''.force_encoding(from_encoding) @eol = "\n".encode(from_encoding).freeze @max_line_size = max_line_size - @was_long_line = false + @skip_current_line = false @log = log end @@ -1058,13 +1058,13 @@ def read_lines(lines) idx = @buffer.index(@eol) is_long_line = @max_line_size && ( - rbuf.bytesize > @max_line_size || @was_long_line + @skip_current_line || rbuf.bytesize > @max_line_size ) if is_long_line @log.warn "received line length is longer than #{@max_line_size}" @log.debug("skipped line: ") { convert(rbuf).chomp } - @was_long_line = false + @skip_current_line = false has_skipped_line = true next end @@ -1072,13 +1072,13 @@ def read_lines(lines) lines << convert(rbuf) end - is_long_line = @max_line_size && ( - @buffer.bytesize > @max_line_size || @was_long_line + is_long_current_line = @max_line_size && ( + @skip_current_line || @buffer.bytesize > @max_line_size ) - if is_long_line + if is_long_current_line @buffer.clear - @was_long_line = true + @skip_current_line = true end return has_skipped_line From fdf74e5d715b2a769026bb22b13f5e690ea67dcd Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Wed, 12 Jun 2024 17:57:29 +0900 Subject: [PATCH 11/12] fix to commit the correct pos to continue processing correctly Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/in_tail.rb | 18 +-- test/plugin/in_tail/test_fifo.rb | 39 ++++++- test/plugin/in_tail/test_io_handler.rb | 151 +++++++++++++++++++++---- 3 files changed, 175 insertions(+), 33 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index dd94560d0b..8861cc015e 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -1012,6 +1012,7 @@ def initialize(from_encoding, encoding, log, max_line_size=nil) @eol = "\n".encode(from_encoding).freeze @max_line_size = max_line_size @skip_current_line = false + @skipping_current_line_bytesize = 0 @log = log end @@ -1064,8 +1065,9 @@ def read_lines(lines) if is_long_line @log.warn "received line length is longer than #{@max_line_size}" @log.debug("skipped line: ") { convert(rbuf).chomp } - @skip_current_line = false has_skipped_line = true + @skip_current_line = false + @skipping_current_line_bytesize = 0 next end @@ -1077,14 +1079,16 @@ def read_lines(lines) ) if is_long_current_line - @buffer.clear @skip_current_line = true + @skipping_current_line_bytesize += @buffer.bytesize + @buffer.clear end return has_skipped_line end - def bytesize + def reading_bytesize + return @skipping_current_line_bytesize if @skip_current_line @buffer.bytesize end end @@ -1223,10 +1227,10 @@ def handle_notify end if @lines.empty? - @watcher.pe.update_pos(io.pos - @fifo.bytesize) if has_skipped_line + @watcher.pe.update_pos(io.pos - @fifo.reading_bytesize) if has_skipped_line else if @receive_lines.call(@lines, @watcher) - @watcher.pe.update_pos(io.pos - @fifo.bytesize) + @watcher.pe.update_pos(io.pos - @fifo.reading_bytesize) @lines.clear else read_more = false @@ -1238,12 +1242,12 @@ def handle_notify def open io = Fluent::FileWrapper.open(@path) - io.seek(@watcher.pe.read_pos + @fifo.bytesize) + io.seek(@watcher.pe.read_pos + @fifo.reading_bytesize) @metrics.opened.inc io rescue RangeError io.close if io - raise WatcherSetupError, "seek error with #{@path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.bytesize.to_s(16)}" + raise WatcherSetupError, "seek error with #{@path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.reading_bytesize.to_s(16)}" rescue Errno::EACCES => e @log.warn "#{e}" nil diff --git a/test/plugin/in_tail/test_fifo.rb b/test/plugin/in_tail/test_fifo.rb index ebe0729d49..ebead83b92 100644 --- a/test/plugin/in_tail/test_fifo.rb +++ b/test/plugin/in_tail/test_fifo.rb @@ -124,7 +124,7 @@ class IntailFIFO < Test::Unit::TestCase fifo << text.force_encoding(Encoding::ASCII_8BIT) fifo.read_lines(lines) # The size of remaining buffer (i.e. a line still not having EOL) must not exceed max_line_size. - assert { fifo.bytesize <= max_line_size } + assert { fifo.buffer.bytesize <= max_line_size } end assert_equal expected, lines @@ -142,23 +142,50 @@ class IntailFIFO < Test::Unit::TestCase end end - sub_test_case '#bytesize' do - test 'reutrns buffer size' do + sub_test_case '#reading_bytesize' do + test 'returns buffer size' do fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) text = "test\n" * 3 + 'test' fifo << text - assert_equal text.bytesize, fifo.bytesize + assert_equal text.bytesize, fifo.reading_bytesize lines = [] fifo.read_lines(lines) assert_equal ["test\n", "test\n", "test\n"], lines - assert_equal 'test'.bytesize, fifo.bytesize + assert_equal 'test'.bytesize, fifo.reading_bytesize fifo << "2\n" fifo.read_lines(lines) assert_equal ["test\n", "test\n", "test\n", "test2\n"], lines - assert_equal 0, fifo.bytesize + assert_equal 0, fifo.reading_bytesize + end + + test 'returns the entire line size even if the size is over max_line_size' do + max_line_size = 20 + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, max_line_size) + lines = [] + + text = "long line still not having EOL" + fifo << text + fifo.read_lines(lines) + assert_equal [], lines + assert_equal 0, fifo.buffer.bytesize + assert_equal text.bytesize, fifo.reading_bytesize + + text2 = " following texts" + fifo << text2 + fifo.read_lines(lines) + assert_equal [], lines + assert_equal 0, fifo.buffer.bytesize + assert_equal text.bytesize + text2.bytesize, fifo.reading_bytesize + + text3 = " end of the line\n" + fifo << text3 + fifo.read_lines(lines) + assert_equal [], lines + assert_equal 0, fifo.buffer.bytesize + assert_equal 0, fifo.reading_bytesize end end end diff --git a/test/plugin/in_tail/test_io_handler.rb b/test/plugin/in_tail/test_io_handler.rb index 9397006763..5174ff9cfb 100644 --- a/test/plugin/in_tail/test_io_handler.rb +++ b/test/plugin/in_tail/test_io_handler.rb @@ -148,33 +148,144 @@ def create_watcher end end - test 'does not call receive_lines when line_size exceeds max_line_size' do - t = 'x' * (8192) - text = "#{t}\n" + sub_test_case 'max_line_size' do + test 'does not call receive_lines when line_size exceeds max_line_size' do + t = 'x' * (8192) + text = "#{t}\n" - max_line_size = 8192 + max_line_size = 8192 - @file.write(text) - @file.close + @file.write(text) + @file.close - update_pos = 0 + update_pos = 0 - watcher = create_watcher - stub(watcher).pe do - pe = 'position_file' - stub(pe).read_pos {0} - stub(pe).update_pos { |val| update_pos = val } - pe + watcher = create_watcher + stub(watcher).pe do + pe = 'position_file' + stub(pe).read_pos {0} + stub(pe).update_pos { |val| update_pos = val } + pe + end + + returned_lines = [] + r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1, max_line_size: max_line_size, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher| + returned_lines << lines.dup + true + end + + r.on_notify + assert_equal text.bytesize, update_pos + assert_equal 0, returned_lines.size end - returned_lines = [] - r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1, max_line_size: max_line_size, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher| - returned_lines << lines.dup - true + data( + "open_on_every_update false" => false, + "open_on_every_update true" => true, + ) + test 'manage pos correctly if a long line not having EOL occurs' do |open_on_every_update| + max_line_size = 20 + returned_lines = [] + pos = 0 + + watcher = create_watcher + stub(watcher).pe do + pe = 'position_file' + stub(pe).read_pos { pos } + stub(pe).update_pos { |val| pos = val } + pe + end + + io_handler = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new( + watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1, + max_line_size: max_line_size, log: $log, open_on_every_update: open_on_every_update, + metrics: @metrics + ) do |lines, _watcher| + returned_lines << lines.dup + true + end + + short_line = "short line\n" + long_lines = [ + "long line still not having EOL", + " end of the line\n", + ] + + @file.write(short_line) + @file.write(long_lines[0]) + @file.flush + io_handler.on_notify + + assert_equal [[short_line]], returned_lines + assert_equal short_line.bytesize, pos + + @file.write(long_lines[1]) + @file.flush + io_handler.on_notify + + assert_equal [[short_line]], returned_lines + expected_size = short_line.bytesize + long_lines[0..1].map{|l| l.bytesize}.sum + assert_equal expected_size, pos + + io_handler.close end - r.on_notify - assert_equal text.bytesize, update_pos - assert_equal 0, returned_lines.size + data( + "open_on_every_update false" => false, + "open_on_every_update true" => true, + ) + test 'discards a subsequent data in a long line even if restarting occurs between' do |open_on_every_update| + max_line_size = 20 + returned_lines = [] + pos = 0 + + watcher = create_watcher + stub(watcher).pe do + pe = 'position_file' + stub(pe).read_pos { pos } + stub(pe).update_pos { |val| pos = val } + pe + end + + io_handler = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new( + watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1, + max_line_size: max_line_size, log: $log, open_on_every_update: open_on_every_update, + metrics: @metrics + ) do |lines, _watcher| + returned_lines << lines.dup + true + end + + short_line = "short line\n" + long_lines = [ + "long line still not having EOL", + " end of the line\n", + ] + + @file.write(short_line) + @file.write(long_lines[0]) + @file.flush + io_handler.on_notify + + assert_equal [[short_line]], returned_lines + + io_handler.close + io_handler = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new( + watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1, + max_line_size: max_line_size, log: $log, open_on_every_update: open_on_every_update, + metrics: @metrics + ) do |lines, _watcher| + returned_lines << lines.dup + true + end + + @file.write(long_lines[1]) + @file.flush + io_handler.on_notify + + assert_equal [[short_line]], returned_lines + + io_handler.close + end end end From 955cee5681f2c1cb375364a421ac1bdc2364e811 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Mon, 8 Jul 2024 12:14:02 +0900 Subject: [PATCH 12/12] improve debug message Signed-off-by: Daijiro Fukuda Co-authored-by: Takuro Ashie --- lib/fluent/plugin/in_tail.rb | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 8861cc015e..6a83075cc5 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -1064,7 +1064,11 @@ def read_lines(lines) if is_long_line @log.warn "received line length is longer than #{@max_line_size}" - @log.debug("skipped line: ") { convert(rbuf).chomp } + if @skip_current_line + @log.debug("The continuing line is finished. Finally discarded data: ") { convert(rbuf).chomp } + else + @log.debug("skipped line: ") { convert(rbuf).chomp } + end has_skipped_line = true @skip_current_line = false @skipping_current_line_bytesize = 0 @@ -1079,6 +1083,11 @@ def read_lines(lines) ) if is_long_current_line + @log.debug( + "The continuing current line length is longer than #{@max_line_size}." + + " The received data will be discarded until this line is finished." + + " Discarded data: " + ) { convert(@buffer).chomp } @skip_current_line = true @skipping_current_line_bytesize += @buffer.bytesize @buffer.clear