Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

buf_file: Skip and delete broken file chunks during resume. fix #1760 #1874

Merged
merged 2 commits into from
Mar 2, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion lib/fluent/plugin/buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,13 @@ def resume
next
end

chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode) # file chunk resumes contents of metadata
begin
chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode) # file chunk resumes contents of metadata
rescue Fluent::Plugin::Buffer::FileChunk::FileChunkError => e
handle_broken_files(path, mode, e)
next
end

case chunk.state
when :staged
stage[chunk.metadata] = chunk
Expand All @@ -167,6 +173,12 @@ def generate_chunk(metadata)

return chunk
end

def handle_broken_files(path, mode, e)
log.error "found broken chunk file during resume. Deleted corresponding files:", :path => path, :mode => mode, :err_msg => e.message
# After support 'backup_dir' feature, these files are moved to backup_dir instead of unlink.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

backup_dir's issue is here: #1856

File.unlink(path, path + '.meta') rescue nil
end
end
end
end
21 changes: 19 additions & 2 deletions lib/fluent/plugin/buffer/file_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ module Fluent
module Plugin
class Buffer
class FileChunk < Chunk
class FileChunkError < StandardError; end

### buffer path user specified : /path/to/directory/user_specified_prefix.*.log
### buffer chunk path : /path/to/directory/user_specified_prefix.b513b61c9791029c2513b61c9791029c2.log
### buffer chunk metadata path : /path/to/directory/user_specified_prefix.b513b61c9791029c2513b61c9791029c2.log.meta
Expand Down Expand Up @@ -309,6 +311,8 @@ def load_existing_staged_chunk(path)
# staging buffer chunk without metadata is classic buffer chunk file
# and it should be enqueued immediately
if File.exist?(@meta_path)
raise FileChunkError, "staged file chunk is empty" if File.size(@path).zero?

@chunk = File.open(@path, 'rb+')
@chunk.set_encoding(Encoding::ASCII_8BIT)
@chunk.sync = true
Expand All @@ -319,7 +323,13 @@ def load_existing_staged_chunk(path)
@meta.set_encoding(Encoding::ASCII_8BIT)
@meta.sync = true
@meta.binmode
restore_metadata(@meta.read)
begin
restore_metadata(@meta.read)
rescue => e
@chunk.close
@meta.close
raise FileChunkError, "staged meta file is broken. #{e.message}"
end
@meta.seek(0, IO::SEEK_SET)

@state = :staged
Expand All @@ -345,6 +355,8 @@ def load_existing_staged_chunk(path)

def load_existing_enqueued_chunk(path)
@path = path
raise FileChunkError, "enqueued file chunk is empty" if File.size(@path).zero?

@chunk = File.open(@path, 'rb')
@chunk.set_encoding(Encoding::ASCII_8BIT)
@chunk.binmode
Expand All @@ -354,7 +366,12 @@ def load_existing_enqueued_chunk(path)

@meta_path = @path + '.meta'
if File.readable?(@meta_path)
restore_metadata(File.open(@meta_path){|f| f.set_encoding(Encoding::ASCII_8BIT); f.binmode; f.read })
begin
restore_metadata(File.open(@meta_path){|f| f.set_encoding(Encoding::ASCII_8BIT); f.binmode; f.read })
rescue => e
@chunk.close
raise FileChunkError, "enqueued meta file is broken. #{e.message}"
end
else
restore_metadata_partially(@chunk)
end
Expand Down