-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reduce tailwatcher argument #2832
Changes from all commits
efa5d56
948617a
b060307
c8abc09
fdf9d71
c80ca87
26ae11a
0ede8e7
cc37027
9940869
269ea68
857d9cb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -317,19 +317,33 @@ def refresh_watchers | |
end | ||
|
||
def setup_watcher(path, pe) | ||
line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil | ||
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @enable_stat_watcher, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines)) | ||
tw.attach do |watcher| | ||
event_loop_attach(watcher.timer_trigger) if watcher.timer_trigger | ||
event_loop_attach(watcher.stat_trigger) if watcher.stat_trigger | ||
line_buffer_timer_flusher = @multiline_mode ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil | ||
tw = TailWatcher.new(path, pe, log, @read_from_head, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines)) | ||
|
||
if @enable_watch_timer | ||
tt = TimerTrigger.new(1, log) { tw.on_notify } | ||
tw.register_watcher(tt) | ||
end | ||
|
||
if @enable_stat_watcher | ||
tt = StatWatcher.new(path, log) { tw.on_notify } | ||
tw.register_watcher(tt) | ||
end | ||
|
||
tw.on_notify | ||
|
||
tw.watchers.each do |watcher| | ||
event_loop_attach(watcher) | ||
end | ||
|
||
tw | ||
rescue => e | ||
if tw | ||
tw.detach { |watcher| | ||
event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger | ||
event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger | ||
} | ||
tw.watchers.each do |watcher| | ||
event_loop_detach(watcher) | ||
end | ||
|
||
tw.detach | ||
tw.close | ||
end | ||
raise e | ||
|
@@ -384,6 +398,8 @@ def close_watcher_handles | |
|
||
# refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety. | ||
def update_watcher(path, pe) | ||
log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds") | ||
|
||
if @pf | ||
unless pe.read_inode == @pf[path].read_inode | ||
log.debug "Skip update_watcher because watcher has been already updated by other inotify event" | ||
|
@@ -400,10 +416,11 @@ def update_watcher(path, pe) | |
# so adding close_io argument to avoid this problem. | ||
# At shutdown, IOHandler's io will be released automatically after detached the event loop | ||
def detach_watcher(tw, close_io = true) | ||
tw.detach { |watcher| | ||
event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger | ||
event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger | ||
} | ||
tw.watchers.each do |watcher| | ||
event_loop_detach(watcher) | ||
end | ||
tw.detach | ||
|
||
tw.close if close_io | ||
flush_buffer(tw) | ||
if tw.unwatched && @pf | ||
|
@@ -420,7 +437,7 @@ def detach_watcher_after_rotate_wait(tw) | |
end | ||
|
||
def flush_buffer(tw) | ||
if lb = tw.line_buffer | ||
if lb = tw.line_buffer_timer_flusher&.line_buffer | ||
lb.chomp! | ||
@parser.parse(lb) { |time, record| | ||
if time && record | ||
|
@@ -490,11 +507,12 @@ def parse_singleline(lines, tail_watcher) | |
es | ||
end | ||
|
||
# No need to check if line_buffer_timer_flusher is nil, since line_buffer_timer_flusher should exist | ||
def parse_multilines(lines, tail_watcher) | ||
lb = tail_watcher.line_buffer | ||
lb = tail_watcher.line_buffer_timer_flusher.line_buffer | ||
es = Fluent::MultiEventStream.new | ||
if @parser.has_firstline? | ||
tail_watcher.line_buffer_timer_flusher.reset_timer if tail_watcher.line_buffer_timer_flusher | ||
tail_watcher.line_buffer_timer_flusher.reset_timer | ||
lines.each { |line| | ||
if @parser.firstline?(line) | ||
if lb | ||
|
@@ -524,59 +542,75 @@ def parse_multilines(lines, tail_watcher) | |
} | ||
end | ||
end | ||
tail_watcher.line_buffer = lb | ||
tail_watcher.line_buffer_timer_flusher.line_buffer = lb | ||
es | ||
end | ||
|
||
class StatWatcher < Coolio::StatWatcher | ||
def initialize(path, log, &callback) | ||
@callback = callback | ||
@log = log | ||
super(path) | ||
end | ||
|
||
def on_change(prev, cur) | ||
@callback.call | ||
rescue | ||
@log.error $!.to_s | ||
@log.error_backtrace | ||
end | ||
end | ||
|
||
class TimerTrigger < Coolio::TimerWatcher | ||
def initialize(interval, log, &callback) | ||
@log = log | ||
@callback = callback | ||
super(interval, true) | ||
end | ||
|
||
def on_timer | ||
@callback.call | ||
rescue => e | ||
@log.error e.to_s | ||
@log.error_backtrace | ||
end | ||
end | ||
|
||
class TailWatcher | ||
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, enable_stat_watcher, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines) | ||
def initialize(path, pe, log, read_from_head, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines) | ||
@path = path | ||
@rotate_wait = rotate_wait | ||
@pe = pe || MemoryPositionEntry.new | ||
@read_from_head = read_from_head | ||
@enable_watch_timer = enable_watch_timer | ||
@enable_stat_watcher = enable_stat_watcher | ||
@read_lines_limit = read_lines_limit | ||
@receive_lines = receive_lines | ||
@update_watcher = update_watcher | ||
|
||
@stat_trigger = @enable_stat_watcher ? StatWatcher.new(path, log, &method(:on_notify)) : nil | ||
@timer_trigger = @enable_watch_timer ? TimerTrigger.new(1, log, &method(:on_notify)) : nil | ||
|
||
@rotate_handler = RotateHandler.new(log, &method(:on_rotate)) | ||
@io_handler = nil | ||
@log = log | ||
|
||
@line_buffer = nil | ||
@line_buffer_timer_flusher = line_buffer_timer_flusher | ||
@from_encoding = from_encoding | ||
@encoding = encoding | ||
@open_on_every_update = open_on_every_update | ||
@watchers = [] | ||
end | ||
|
||
attr_reader :path | ||
attr_reader :log, :pe, :read_lines_limit, :open_on_every_update | ||
attr_reader :from_encoding, :encoding | ||
attr_reader :stat_trigger, :enable_watch_timer, :enable_stat_watcher | ||
attr_accessor :timer_trigger | ||
attr_accessor :line_buffer, :line_buffer_timer_flusher | ||
attr_reader :pe | ||
attr_reader :line_buffer_timer_flusher | ||
attr_accessor :unwatched # This is used for removing position entry from PositionFile | ||
attr_reader :watchers | ||
|
||
def tag | ||
@parsed_tag ||= @path.tr('/', '.').gsub(/\.+/, '.').gsub(/^\./, '') | ||
end | ||
|
||
def wrap_receive_lines(lines) | ||
@receive_lines.call(lines, self) | ||
end | ||
|
||
def attach | ||
on_notify | ||
yield self | ||
def register_watcher(watcher) | ||
@watchers << watcher | ||
end | ||
|
||
def detach | ||
yield self | ||
@io_handler.on_notify if @io_handler | ||
end | ||
|
||
|
@@ -630,7 +664,7 @@ def on_rotate(stat) | |
pos = @read_from_head ? 0 : fsize | ||
@pe.update(inode, pos) | ||
end | ||
@io_handler = IOHandler.new(self, &method(:wrap_receive_lines)) | ||
@io_handler = io_handler | ||
else | ||
@io_handler = NullIOHandler.new | ||
end | ||
|
@@ -655,18 +689,21 @@ def on_rotate(stat) | |
watcher_needs_update = true | ||
end | ||
|
||
log_msg = "detected rotation of #{@path}" | ||
log_msg << "; waiting #{@rotate_wait} seconds" if watcher_needs_update # wait rotate_time if previous file exists | ||
@log.info log_msg | ||
|
||
if watcher_needs_update | ||
@update_watcher.call(@path, swap_state(@pe)) | ||
else | ||
@io_handler = IOHandler.new(self, &method(:wrap_receive_lines)) | ||
@log.info "detected rotation of #{@path}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this line needed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, I see. |
||
@io_handler = io_handler | ||
end | ||
end | ||
end | ||
|
||
def io_handler | ||
IOHandler.new(self, path: @path, log: @log, read_lines_limit: @read_lines_limit, open_on_every_update: @open_on_every_update, from_encoding: @from_encoding, encoding: @encoding) do |lines| | ||
@receive_lines.call(lines, self) | ||
end | ||
end | ||
|
||
def swap_state(pe) | ||
# Use MemoryPositionEntry for rotated file temporary | ||
mpe = MemoryPositionEntry.new | ||
|
@@ -675,37 +712,6 @@ def swap_state(pe) | |
pe # This pe will be updated in on_rotate after TailWatcher is initialized | ||
end | ||
|
||
class TimerTrigger < Coolio::TimerWatcher | ||
def initialize(interval, log, &callback) | ||
@callback = callback | ||
@log = log | ||
super(interval, true) | ||
end | ||
|
||
def on_timer | ||
@callback.call | ||
rescue => e | ||
@log.error e.to_s | ||
@log.error_backtrace | ||
end | ||
end | ||
|
||
class StatWatcher < Coolio::StatWatcher | ||
def initialize(path, log, &callback) | ||
@callback = callback | ||
@log = log | ||
super(path) | ||
end | ||
|
||
def on_change(prev, cur) | ||
@callback.call | ||
rescue | ||
# TODO log? | ||
@log.error $!.to_s | ||
@log.error_backtrace | ||
end | ||
end | ||
|
||
class FIFO | ||
def initialize(from_encoding, encoding) | ||
@from_encoding = from_encoding | ||
|
@@ -765,15 +771,20 @@ def bytesize | |
end | ||
|
||
class IOHandler | ||
def initialize(watcher, &receive_lines) | ||
def initialize(watcher, path:, read_lines_limit:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines) | ||
@watcher = watcher | ||
@path = path | ||
@read_lines_limit = read_lines_limit | ||
@receive_lines = receive_lines | ||
@fifo = FIFO.new(@watcher.from_encoding || Encoding::ASCII_8BIT, @watcher.encoding || Encoding::ASCII_8BIT) | ||
@open_on_every_update = open_on_every_update | ||
@fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT) | ||
@iobuf = ''.force_encoding('ASCII-8BIT') | ||
@lines = [] | ||
@io = nil | ||
@notify_mutex = Mutex.new | ||
@watcher.log.info "following tail of #{@watcher.path}" | ||
@log = log | ||
|
||
@log.info "following tail of #{@path}" | ||
end | ||
|
||
def on_notify | ||
|
@@ -790,7 +801,7 @@ def handle_notify | |
while true | ||
@fifo << io.readpartial(8192, @iobuf) | ||
@fifo.read_lines(@lines) | ||
if @lines.size >= @watcher.read_lines_limit | ||
if @lines.size >= @read_lines_limit | ||
# not to use too much memory in case the file is very large | ||
read_more = true | ||
break | ||
|
@@ -824,18 +835,18 @@ def opened? | |
end | ||
|
||
def open | ||
io = Fluent::FileWrapper.open(@watcher.path) | ||
io = Fluent::FileWrapper.open(@path) | ||
io.seek(@watcher.pe.read_pos + @fifo.bytesize) | ||
io | ||
rescue RangeError | ||
io.close if io | ||
raise WatcherSetupError, "seek error with #{@watcher.path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.bytesize.to_s(16)}" | ||
raise WatcherSetupError, "seek error with #{@path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.bytesize.to_s(16)}" | ||
rescue Errno::ENOENT | ||
nil | ||
end | ||
|
||
def with_io | ||
if @watcher.open_on_every_update | ||
if @open_on_every_update | ||
io = open | ||
begin | ||
yield io | ||
|
@@ -850,8 +861,8 @@ def with_io | |
close | ||
raise e | ||
rescue | ||
@watcher.log.error $!.to_s | ||
@watcher.log.error_backtrace | ||
@log.error $!.to_s | ||
@log.error_backtrace | ||
close | ||
end | ||
end | ||
|
@@ -903,24 +914,31 @@ def on_notify(stat) | |
end | ||
|
||
class LineBufferTimerFlusher | ||
attr_accessor :line_buffer | ||
|
||
def initialize(log, flush_interval, &flush_method) | ||
@log = log | ||
@flush_interval = flush_interval | ||
@flush_method = flush_method | ||
@start = nil | ||
@line_buffer = nil | ||
end | ||
|
||
def on_notify(tw) | ||
if @start && @flush_interval | ||
if Time.now - @start >= @flush_interval | ||
@flush_method.call(tw) | ||
tw.line_buffer = nil | ||
@start = nil | ||
end | ||
unless @start && @flush_method | ||
return | ||
end | ||
|
||
if Time.now - @start >= @flush_interval | ||
@flush_method.call(tw) | ||
@line_buffer = nil | ||
@start = nil | ||
end | ||
end | ||
|
||
def reset_timer | ||
return unless @flush_interval | ||
|
||
@start = Time.now | ||
end | ||
end | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing
log.info
here?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch…thank you 🙏 9940869