Skip to content

Commit

Permalink
buf_file: Skip and delete broken files during resume. fix fluent#1760
Browse files Browse the repository at this point in the history
  • Loading branch information
repeatedly authored and okkez committed Mar 27, 2018
1 parent d2e41e6 commit 5164aa3
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
14 changes: 13 additions & 1 deletion lib/fluent/plugin/buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,13 @@ def resume
next
end

chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode) # file chunk resumes contents of metadata
begin
chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode) # file chunk resumes contents of metadata
rescue Fluent::Plugin::Buffer::FileChunk::FileChunkError => e
handle_broken_files(path, mode, e)
next
end

case chunk.state
when :staged
stage[chunk.metadata] = chunk
Expand All @@ -167,6 +173,12 @@ def generate_chunk(metadata)

return chunk
end

def handle_broken_files(path, mode, e)
log.error "found broken chunk file during resume. Deleted corresponding files:", :path => path, :mode => mode, :err_msg => e.message
# After support 'backup_dir' feature, these files are moved to backup_dir instead of unlink.
File.unlink(path, path + '.meta') rescue nil
end
end
end
end
21 changes: 19 additions & 2 deletions lib/fluent/plugin/buffer/file_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ module Fluent
module Plugin
class Buffer
class FileChunk < Chunk
class FileChunkError < StandardError; end

### buffer path user specified : /path/to/directory/user_specified_prefix.*.log
### buffer chunk path : /path/to/directory/user_specified_prefix.b513b61c9791029c2513b61c9791029c2.log
### buffer chunk metadata path : /path/to/directory/user_specified_prefix.b513b61c9791029c2513b61c9791029c2.log.meta
Expand Down Expand Up @@ -309,6 +311,8 @@ def load_existing_staged_chunk(path)
# staging buffer chunk without metadata is classic buffer chunk file
# and it should be enqueued immediately
if File.exist?(@meta_path)
raise FileChunkError, "staged file chunk is empty" if File.size(@path).zero?

@chunk = File.open(@path, 'rb+')
@chunk.set_encoding(Encoding::ASCII_8BIT)
@chunk.sync = true
Expand All @@ -319,7 +323,13 @@ def load_existing_staged_chunk(path)
@meta.set_encoding(Encoding::ASCII_8BIT)
@meta.sync = true
@meta.binmode
restore_metadata(@meta.read)
begin
restore_metadata(@meta.read)
rescue => e
@chunk.close
@meta.close
raise FileChunkError, "staged meta file is broken. #{e.message}"
end
@meta.seek(0, IO::SEEK_SET)

@state = :staged
Expand All @@ -345,6 +355,8 @@ def load_existing_staged_chunk(path)

def load_existing_enqueued_chunk(path)
@path = path
raise FileChunkError, "enqueued file chunk is empty" if File.size(@path).zero?

@chunk = File.open(@path, 'rb')
@chunk.set_encoding(Encoding::ASCII_8BIT)
@chunk.binmode
Expand All @@ -354,7 +366,12 @@ def load_existing_enqueued_chunk(path)

@meta_path = @path + '.meta'
if File.readable?(@meta_path)
restore_metadata(File.open(@meta_path){|f| f.set_encoding(Encoding::ASCII_8BIT); f.binmode; f.read })
begin
restore_metadata(File.open(@meta_path){|f| f.set_encoding(Encoding::ASCII_8BIT); f.binmode; f.read })
rescue => e
@chunk.close
raise FileChunkError, "enqueued meta file is broken. #{e.message}"
end
else
restore_metadata_partially(@chunk)
end
Expand Down

0 comments on commit 5164aa3

Please sign in to comment.