From 73a952f29b790857bdc3f184224325f451970ae6 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Wed, 28 Feb 2018 21:38:04 +0900 Subject: [PATCH 1/2] buf_file: Skip and delete broken files during resume. fix #1760 --- lib/fluent/plugin/buf_file.rb | 14 +++++++++++++- lib/fluent/plugin/buffer/file_chunk.rb | 21 +++++++++++++++++++-- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb index d9426c31ec..761c1de5d1 100644 --- a/lib/fluent/plugin/buf_file.rb +++ b/lib/fluent/plugin/buf_file.rb @@ -141,7 +141,13 @@ def resume next end - chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode) # file chunk resumes contents of metadata + begin + chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode) # file chunk resumes contents of metadata + rescue Fluent::Plugin::Buffer::FileChunk::FileChunkError => e + handle_broken_files(path, mode, e) + next + end + case chunk.state when :staged stage[chunk.metadata] = chunk @@ -167,6 +173,12 @@ def generate_chunk(metadata) return chunk end + + def handle_broken_files(path, mode, e) + log.error "found broken chunk file during resume. Deleted corresponding files:", :path => path, :mode => mode, :err_msg => e.message + # Once the 'backup_dir' feature is supported, these files will be moved to backup_dir instead of being unlinked. 
+ File.unlink(path, path + '.meta') rescue nil + end end end end diff --git a/lib/fluent/plugin/buffer/file_chunk.rb b/lib/fluent/plugin/buffer/file_chunk.rb index 38108d2219..9f2708ec3b 100644 --- a/lib/fluent/plugin/buffer/file_chunk.rb +++ b/lib/fluent/plugin/buffer/file_chunk.rb @@ -22,6 +22,8 @@ module Fluent module Plugin class Buffer class FileChunk < Chunk + class FileChunkError < StandardError; end + ### buffer path user specified : /path/to/directory/user_specified_prefix.*.log ### buffer chunk path : /path/to/directory/user_specified_prefix.b513b61c9791029c2513b61c9791029c2.log ### buffer chunk metadata path : /path/to/directory/user_specified_prefix.b513b61c9791029c2513b61c9791029c2.log.meta @@ -309,6 +311,8 @@ def load_existing_staged_chunk(path) # staging buffer chunk without metadata is classic buffer chunk file # and it should be enqueued immediately if File.exist?(@meta_path) + raise FileChunkError, "staged file chunk is empty" if File.size(@path).zero? + @chunk = File.open(@path, 'rb+') @chunk.set_encoding(Encoding::ASCII_8BIT) @chunk.sync = true @@ -319,7 +323,13 @@ def load_existing_staged_chunk(path) @meta.set_encoding(Encoding::ASCII_8BIT) @meta.sync = true @meta.binmode - restore_metadata(@meta.read) + begin + restore_metadata(@meta.read) + rescue => e + @chunk.close + @meta.close + raise FileChunkError, "staged meta file is broken. #{e.message}" + end @meta.seek(0, IO::SEEK_SET) @state = :staged @@ -345,6 +355,8 @@ def load_existing_staged_chunk(path) def load_existing_enqueued_chunk(path) @path = path + raise FileChunkError, "enqueued file chunk is empty" if File.size(@path).zero? 
+ @chunk = File.open(@path, 'rb') @chunk.set_encoding(Encoding::ASCII_8BIT) @chunk.binmode @@ -354,7 +366,12 @@ def load_existing_enqueued_chunk(path) @meta_path = @path + '.meta' if File.readable?(@meta_path) - restore_metadata(File.open(@meta_path){|f| f.set_encoding(Encoding::ASCII_8BIT); f.binmode; f.read }) + begin + restore_metadata(File.open(@meta_path){|f| f.set_encoding(Encoding::ASCII_8BIT); f.binmode; f.read }) + rescue => e + @chunk.close + raise FileChunkError, "enqueued meta file is broken. #{e.message}" + end else restore_metadata_partially(@chunk) end From cb70a44caaeccbdb7f2b9f8c60a9178d572419f7 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Fri, 2 Mar 2018 16:32:29 +0900 Subject: [PATCH 2/2] buf_file: Add test for broken files --- test/plugin/test_buf_file.rb | 124 +++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) diff --git a/test/plugin/test_buf_file.rb b/test/plugin/test_buf_file.rb index 9d29ed0d66..3628293091 100644 --- a/test/plugin/test_buf_file.rb +++ b/test/plugin/test_buf_file.rb @@ -840,4 +840,128 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime) assert File.exist?(@not_chunk) end end + + sub_test_case 'there are existing broken file chunks' do + setup do + @bufdir = File.expand_path('../../tmp/broken_buffer_file', __FILE__) + FileUtils.mkdir_p @bufdir unless File.exist?(@bufdir) + @bufpath = File.join(@bufdir, 'broken_test.*.log') + + Fluent::Test.setup + @d = FluentPluginFileBufferTest::DummyOutputPlugin.new + @p = Fluent::Plugin::FileBuffer.new + @p.owner = @d + @p.configure(config_element('buffer', '', {'path' => @bufpath})) + end + + teardown do + if @p + @p.stop unless @p.stopped? + @p.before_shutdown unless @p.before_shutdown? + @p.shutdown unless @p.shutdown? + @p.after_shutdown unless @p.after_shutdown? + @p.close unless @p.closed? + @p.terminate unless @p.terminated? 
+ end + if @bufdir + Dir.glob(File.join(@bufdir, '*')).each do |path| + next if ['.', '..'].include?(File.basename(path)) + File.delete(path) + end + end + end + + def create_first_chunk(mode) + cid = Fluent::UniqueId.generate + path = File.join(@bufdir, "broken_test.#{mode}#{Fluent::UniqueId.hex(cid)}.log") + File.open(path, 'wb') do |f| + f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay"}].to_json + "\n" + f.write ["t2.test", event_time('2016-04-17 14:00:17 -0700').to_i, {"message" => "yay"}].to_json + "\n" + f.write ["t3.test", event_time('2016-04-17 14:00:21 -0700').to_i, {"message" => "yay"}].to_json + "\n" + f.write ["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay"}].to_json + "\n" + end + write_metadata( + path + '.meta', cid, metadata(timekey: event_time('2016-04-17 14:00:00 -0700').to_i), + 4, event_time('2016-04-17 14:00:00 -0700').to_i, event_time('2016-04-17 14:00:28 -0700').to_i + ) + + return cid, path + end + + def create_second_chunk(mode) + cid = Fluent::UniqueId.generate + path = File.join(@bufdir, "broken_test.#{mode}#{Fluent::UniqueId.hex(cid)}.log") + File.open(path, 'wb') do |f| + f.write ["t1.test", event_time('2016-04-17 14:01:15 -0700').to_i, {"message" => "yay"}].to_json + "\n" + f.write ["t2.test", event_time('2016-04-17 14:01:17 -0700').to_i, {"message" => "yay"}].to_json + "\n" + f.write ["t3.test", event_time('2016-04-17 14:01:21 -0700').to_i, {"message" => "yay"}].to_json + "\n" + end + write_metadata( + path + '.meta', cid, metadata(timekey: event_time('2016-04-17 14:01:00 -0700').to_i), + 3, event_time('2016-04-17 14:01:00 -0700').to_i, event_time('2016-04-17 14:01:25 -0700').to_i + ) + + return cid, path + end + + def compare_staged_chunk(staged, id, time, num, mode) + assert_equal 1, staged.size + m = metadata(timekey: event_time(time).to_i) + assert_equal id, staged[m].unique_id + assert_equal num, staged[m].size + assert_equal mode, staged[m].state + end + + def 
compare_queued_chunk(queued, id, num, mode) + assert_equal 1, queued.size + assert_equal id, queued[0].unique_id + assert_equal num, queued[0].size + assert_equal mode, queued[0].state + end + + def compare_log(plugin, msg) + logs = plugin.log.out.logs + assert { logs.any? { |log| log.include?(msg) } } + end + + test '#resume ignores staged empty chunk' do + _, p1 = create_first_chunk('b') + File.open(p1, 'wb') { |f| } # create staged empty chunk file + c2id, _ = create_second_chunk('b') + + @p.start + compare_staged_chunk(@p.stage, c2id, '2016-04-17 14:01:00 -0700', 3, :staged) + compare_log(@p, 'staged file chunk is empty') + end + + test '#resume ignores staged broken metadata' do + c1id, _ = create_first_chunk('b') + _, p2 = create_second_chunk('b') + File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create staged broken meta file + + @p.start + compare_staged_chunk(@p.stage, c1id, '2016-04-17 14:00:00 -0700', 4, :staged) + compare_log(@p, 'staged meta file is broken') + end + + test '#resume ignores enqueued empty chunk' do + _, p1 = create_first_chunk('q') + File.open(p1, 'wb') { |f| } # create enqueued empty chunk file + c2id, _ = create_second_chunk('q') + + @p.start + compare_queued_chunk(@p.queue, c2id, 3, :queued) + compare_log(@p, 'enqueued file chunk is empty') + end + + test '#resume ignores enqueued broken metadata' do + c1id, _ = create_first_chunk('q') + _, p2 = create_second_chunk('q') + File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create enqueued broken meta file + + @p.start + compare_queued_chunk(@p.queue, c1id, 4, :queued) + compare_log(@p, 'enqueued meta file is broken') + end + end end