Skip to content

Commit

Permalink
Use filter_with_time API
Browse files Browse the repository at this point in the history
Default filter_stream implementation catches exceptions and
exceptions are routed to error stream.
So warning tests are changed to log check way.
  • Loading branch information
repeatedly committed Sep 2, 2016
1 parent d5062ae commit 030dfd8
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 121 deletions.
84 changes: 45 additions & 39 deletions lib/fluent/plugin/filter_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,60 +58,66 @@ def configure(conf)
self
end

def filter_stream(tag, es)
new_es = Fluent::MultiEventStream.new
es.each do |time, record|
raw_value = record[@key_name]
if raw_value.nil?
log.warn "#{@key_name} does not exist" unless @ignore_key_not_exist
new_es.add(time, handle_parsed(tag, record, time, {})) if @reserve_data
next
FAILED_RESULT = [nil, nil].freeze # reduce allocation cost

def filter_with_time(tag, time, record)
raw_value = record[@key_name]
if raw_value.nil?
log.warn "#{@key_name} does not exist" unless @ignore_key_not_exist
if @reserve_data
return time, handle_parsed(tag, record, time, {})
else
return FAILED_RESULT
end
end
begin
@parser.parse(raw_value) do |t, values|
if values
t ||= time
r = handle_parsed(tag, record, t, values)
return t, r
else
log.warn "pattern not match with data '#{raw_value}'" unless @suppress_parse_error_log
if @reserve_data
t = time
r = handle_parsed(tag, record, time, {})
return t, r
else
return FAILED_RESULT
end
end
end
begin
@parser.parse(raw_value) do |t, values|
rescue Fluent::TextParser::ParserError => e
log.warn e.message unless @suppress_parse_error_log
rescue ArgumentError => e
if @replace_invalid_sequence
unless e.message.index("invalid byte sequence in") == 0
raise
end
replaced_string = replace_invalid_byte(raw_value)
@parser.parse(replaced_string) do |t, values|
if values
t ||= time
r = handle_parsed(tag, record, t, values)
new_es.add(t, r)
return t, r
else
log.warn "pattern not match with data '#{raw_value}'" unless @suppress_parse_error_log
if @reserve_data
t = time
r = handle_parsed(tag, record, time, {})
new_es.add(t, r)
end
end
end
rescue Fluent::TextParser::ParserError => e
log.warn e.message unless @suppress_parse_error_log
rescue ArgumentError => e
if @replace_invalid_sequence
unless e.message.index("invalid byte sequence in") == 0
raise
end
replaced_string = replace_invalid_byte(raw_value)
@parser.parse(replaced_string) do |t, values|
if values
t ||= time
r = handle_parsed(tag, record, t, values)
new_es.add(t, r)
return t, r
else
log.warn "pattern not match with data '#{raw_value}'" unless @suppress_parse_error_log
if @reserve_data
t = time
r = handle_parsed(tag, record, time, {})
new_es.add(t, r)
end
return FAILED_RESULT
end
end
else
raise
end
rescue => e
log.warn "parse failed #{e.message}" unless @suppress_parse_error_log
else
raise
end
rescue => e
log.warn "parse failed #{e.message}" unless @suppress_parse_error_log
return FAILED_RESULT
end
new_es
end

private
Expand Down
134 changes: 52 additions & 82 deletions test/plugin/test_filter_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -479,11 +479,13 @@ def test_filter_invalid_byte
invalid_utf8 = "\xff".force_encoding('UTF-8')

d = create_driver(CONFIG_NOT_REPLACE)
assert_raise(ArgumentError) {
d.run do
d.feed(@tag, Fluent::EventTime.now.to_i, {'data' => invalid_utf8})
end
}
d.run(shutdown: false) do
d.feed(@tag, Fluent::EventTime.now.to_i, {'data' => invalid_utf8})
end
error_event = d.error_events.first
assert_equal "test", error_event[0]
assert_instance_of ArgumentError, error_event[3]
d.instance_shutdown

d = create_driver(CONFIG_INVALID_BYTE)
assert_nothing_raised {
Expand Down Expand Up @@ -564,129 +566,97 @@ def test_filter_key_not_exist
end

# suppress_parse_error_log test
CONFIG_DISABELED_SUPPRESS_PARSE_ERROR_LOG = %[
tag hogelog
format /^col1=(?<col1>.+) col2=(?<col2>.+)$/
key_name message
suppress_parse_error_log false
]
CONFIG_ENABELED_SUPPRESS_PARSE_ERROR_LOG = %[
tag hogelog
format /^col1=(?<col1>.+) col2=(?<col2>.+)$/
key_name message
suppress_parse_error_log true
]
CONFIG_DEFAULT_SUPPRESS_PARSE_ERROR_LOG = %[
tag hogelog
format /^col1=(?<col1>.+) col2=(?<col2>.+)$/
key_name message
]

INVALID_MESSAGE = 'foo bar'
VALID_MESSAGE = 'col1=foo col2=bar'

# if call warn() raise exception
class DummyLoggerWarnedException < StandardError; end
class DummyLogger
def reset
end
def warn(message)
raise DummyLoggerWarnedException
end
end

def swap_logger(instance)
raise "use with block" unless block_given?
dummy = DummyLogger.new
saved_logger = instance.log
instance.log = dummy
restore = lambda { instance.log = saved_logger }

yield

restore.call
end

def test_parser_error_warning
d = create_driver(CONFIG_INVALID_TIME_VALUE)
swap_logger(d.instance) do
assert_raise(DummyLoggerWarnedException) {
d.run do
d.feed(@tag, Fluent::EventTime.now.to_i, {'data' => '{"time":[], "f1":"v1"}'})
end
}
d.run(shutdown: false) do
d.feed(@tag, Fluent::EventTime.now.to_i, {'data' => '{"time":[], "f1":"v1"}'})
end
assert_match(/\[warn\]: invalid time value:/, d.instance.log.logs.first)
d.instance_shutdown
end

CONFIG_DEFAULT_SUPPRESS_PARSE_ERROR_LOG = %[
tag hogelog
format /^col1=(?<col1>.+) col2=(?<col2>.+)$/
key_name message
]

class DefaultSuppressParseErrorLogTest < self
setup do
# default(disabled) 'suppress_parse_error_log' is not specify
@d = create_driver(CONFIG_DEFAULT_SUPPRESS_PARSE_ERROR_LOG)
end

def test_raise_exception
swap_logger(@d.instance) do
assert_raise(DummyLoggerWarnedException) {
@d.run do
@d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => INVALID_MESSAGE})
end
}
@d.run(shutdown: false) do
@d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => INVALID_MESSAGE})
end
assert_match(/\[warn\]: pattern not match with data/, @d.instance.log.logs.first)
@d.instance_shutdown
end

def test_nothing_raised
swap_logger(@d.instance) do
assert_nothing_raised {
@d.run do
@d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => VALID_MESSAGE})
end
}
@d.run(shutdown: false) do
@d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => VALID_MESSAGE})
end
assert_empty @d.instance.log.logs
@d.instance_shutdown
end
end

CONFIG_DISABELED_SUPPRESS_PARSE_ERROR_LOG = %[
tag hogelog
format /^col1=(?<col1>.+) col2=(?<col2>.+)$/
key_name message
suppress_parse_error_log false
]

class DisabledSuppressParseErrorLogTest < self
setup do
# disabled 'suppress_parse_error_log'
@d = create_driver(CONFIG_DISABELED_SUPPRESS_PARSE_ERROR_LOG)
end

def test_raise_exception
swap_logger(@d.instance) do
assert_raise(DummyLoggerWarnedException) {
@d.run do
@d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => INVALID_MESSAGE})
end
}
@d.run(shutdown: false) do
@d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => INVALID_MESSAGE})
end
assert_match(/\[warn\]: pattern not match with data/, @d.instance.log.logs.first)
@d.instance_shutdown
end

def test_nothing_raised
swap_logger(@d.instance) do
assert_nothing_raised {
@d.run do
@d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => VALID_MESSAGE})
end
}
@d.run(shutdown: false) do
@d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => VALID_MESSAGE})
end
assert_empty @d.instance.log.logs
@d.instance_shutdown
end
end

CONFIG_ENABELED_SUPPRESS_PARSE_ERROR_LOG = %[
tag hogelog
format /^col1=(?<col1>.+) col2=(?<col2>.+)$/
key_name message
suppress_parse_error_log true
]

class EnabledSuppressParseErrorLogTest < self
setup do
# enabled 'suppress_parse_error_log'
@d = create_driver(CONFIG_ENABELED_SUPPRESS_PARSE_ERROR_LOG)
end

def test_nothing_raised
swap_logger(@d.instance) do
assert_nothing_raised {
@d.run do
@d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => INVALID_MESSAGE})
@d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => VALID_MESSAGE})
end
}
@d.run(shutdown: false) do
@d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => INVALID_MESSAGE})
@d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => VALID_MESSAGE})
end
assert_empty @d.instance.log.logs
@d.instance_shutdown
end
end
end

0 comments on commit 030dfd8

Please sign in to comment.