Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

buf_file: Skip and delete broken file chunks during resume. fix #1760 #1874

Merged
merged 2 commits into from
Mar 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion lib/fluent/plugin/buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,13 @@ def resume
next
end

chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode) # file chunk resumes contents of metadata
begin
chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode) # file chunk resumes contents of metadata
rescue Fluent::Plugin::Buffer::FileChunk::FileChunkError => e
handle_broken_files(path, mode, e)
next
end

case chunk.state
when :staged
stage[chunk.metadata] = chunk
Expand All @@ -167,6 +173,12 @@ def generate_chunk(metadata)

return chunk
end

def handle_broken_files(path, mode, e)
log.error "found broken chunk file during resume. Deleted corresponding files:", :path => path, :mode => mode, :err_msg => e.message
# After support 'backup_dir' feature, these files are moved to backup_dir instead of unlink.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

backup_dir's issue is here: #1856

File.unlink(path, path + '.meta') rescue nil
end
end
end
end
21 changes: 19 additions & 2 deletions lib/fluent/plugin/buffer/file_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ module Fluent
module Plugin
class Buffer
class FileChunk < Chunk
class FileChunkError < StandardError; end

### buffer path user specified : /path/to/directory/user_specified_prefix.*.log
### buffer chunk path : /path/to/directory/user_specified_prefix.b513b61c9791029c2513b61c9791029c2.log
### buffer chunk metadata path : /path/to/directory/user_specified_prefix.b513b61c9791029c2513b61c9791029c2.log.meta
Expand Down Expand Up @@ -309,6 +311,8 @@ def load_existing_staged_chunk(path)
# staging buffer chunk without metadata is classic buffer chunk file
# and it should be enqueued immediately
if File.exist?(@meta_path)
raise FileChunkError, "staged file chunk is empty" if File.size(@path).zero?

@chunk = File.open(@path, 'rb+')
@chunk.set_encoding(Encoding::ASCII_8BIT)
@chunk.sync = true
Expand All @@ -319,7 +323,13 @@ def load_existing_staged_chunk(path)
@meta.set_encoding(Encoding::ASCII_8BIT)
@meta.sync = true
@meta.binmode
restore_metadata(@meta.read)
begin
restore_metadata(@meta.read)
rescue => e
@chunk.close
@meta.close
raise FileChunkError, "staged meta file is broken. #{e.message}"
end
@meta.seek(0, IO::SEEK_SET)

@state = :staged
Expand All @@ -345,6 +355,8 @@ def load_existing_staged_chunk(path)

def load_existing_enqueued_chunk(path)
@path = path
raise FileChunkError, "enqueued file chunk is empty" if File.size(@path).zero?

@chunk = File.open(@path, 'rb')
@chunk.set_encoding(Encoding::ASCII_8BIT)
@chunk.binmode
Expand All @@ -354,7 +366,12 @@ def load_existing_enqueued_chunk(path)

@meta_path = @path + '.meta'
if File.readable?(@meta_path)
restore_metadata(File.open(@meta_path){|f| f.set_encoding(Encoding::ASCII_8BIT); f.binmode; f.read })
begin
restore_metadata(File.open(@meta_path){|f| f.set_encoding(Encoding::ASCII_8BIT); f.binmode; f.read })
rescue => e
@chunk.close
raise FileChunkError, "enqueued meta file is broken. #{e.message}"
end
else
restore_metadata_partially(@chunk)
end
Expand Down
124 changes: 124 additions & 0 deletions test/plugin/test_buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -840,4 +840,128 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
assert File.exist?(@not_chunk)
end
end

sub_test_case 'there are existing broken file chunks' do
setup do
@bufdir = File.expand_path('../../tmp/broken_buffer_file', __FILE__)
FileUtils.mkdir_p @bufdir unless File.exist?(@bufdir)
@bufpath = File.join(@bufdir, 'broken_test.*.log')

Fluent::Test.setup
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
@p = Fluent::Plugin::FileBuffer.new
@p.owner = @d
@p.configure(config_element('buffer', '', {'path' => @bufpath}))
end

teardown do
if @p
@p.stop unless @p.stopped?
@p.before_shutdown unless @p.before_shutdown?
@p.shutdown unless @p.shutdown?
@p.after_shutdown unless @p.after_shutdown?
@p.close unless @p.closed?
@p.terminate unless @p.terminated?
end
if @bufdir
Dir.glob(File.join(@bufdir, '*')).each do |path|
next if ['.', '..'].include?(File.basename(path))
File.delete(path)
end
end
end

def create_first_chunk(mode)
cid = Fluent::UniqueId.generate
path = File.join(@bufdir, "broken_test.#{mode}#{Fluent::UniqueId.hex(cid)}.log")
File.open(path, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 14:00:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 14:00:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata(
path + '.meta', cid, metadata(timekey: event_time('2016-04-17 14:00:00 -0700').to_i),
4, event_time('2016-04-17 14:00:00 -0700').to_i, event_time('2016-04-17 14:00:28 -0700').to_i
)

return cid, path
end

def create_second_chunk(mode)
cid = Fluent::UniqueId.generate
path = File.join(@bufdir, "broken_test.#{mode}#{Fluent::UniqueId.hex(cid)}.log")
File.open(path, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:01:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 14:01:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 14:01:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata(
path + '.meta', cid, metadata(timekey: event_time('2016-04-17 14:01:00 -0700').to_i),
3, event_time('2016-04-17 14:01:00 -0700').to_i, event_time('2016-04-17 14:01:25 -0700').to_i
)

return cid, path
end

def compare_staged_chunk(staged, id, time, num, mode)
assert_equal 1, staged.size
m = metadata(timekey: event_time(time).to_i)
assert_equal id, staged[m].unique_id
assert_equal num, staged[m].size
assert_equal mode, staged[m].state
end

def compare_queued_chunk(queued, id, num, mode)
assert_equal 1, queued.size
assert_equal id, queued[0].unique_id
assert_equal num, queued[0].size
assert_equal mode, queued[0].state
end

def compare_log(plugin, msg)
logs = plugin.log.out.logs
assert { logs.any? { |log| log.include?(msg) } }
end

test '#resume ignores staged empty chunk' do
_, p1 = create_first_chunk('b')
File.open(p1, 'wb') { |f| } # create staged empty chunk file
c2id, _ = create_second_chunk('b')

@p.start
compare_staged_chunk(@p.stage, c2id, '2016-04-17 14:01:00 -0700', 3, :staged)
compare_log(@p, 'staged file chunk is empty')
end

test '#resume ignores staged broken metadata' do
c1id, _ = create_first_chunk('b')
_, p2 = create_second_chunk('b')
File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create staged broken meta file

@p.start
compare_staged_chunk(@p.stage, c1id, '2016-04-17 14:00:00 -0700', 4, :staged)
compare_log(@p, 'staged meta file is broken')
end

test '#resume ignores enqueued empty chunk' do
_, p1 = create_first_chunk('q')
File.open(p1, 'wb') { |f| } # create enqueued empty chunk file
c2id, _ = create_second_chunk('q')

@p.start
compare_queued_chunk(@p.queue, c2id, 3, :queued)
compare_log(@p, 'enqueued file chunk is empty')
end

test '#resume ignores enqueued broken metadata' do
c1id, _ = create_first_chunk('q')
_, p2 = create_second_chunk('q')
File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create enqueued broken meta file

@p.start
compare_queued_chunk(@p.queue, c1id, 4, :queued)
compare_log(@p, 'enqueued meta file is broken')
end
end
end