diff --git a/lib/fluent/plugin/filter_parser.rb b/lib/fluent/plugin/filter_parser.rb index f2b4426bf0..3c73f724c5 100644 --- a/lib/fluent/plugin/filter_parser.rb +++ b/lib/fluent/plugin/filter_parser.rb @@ -58,60 +58,66 @@ def configure(conf) self end - def filter_stream(tag, es) - new_es = Fluent::MultiEventStream.new - es.each do |time, record| - raw_value = record[@key_name] - if raw_value.nil? - log.warn "#{@key_name} does not exist" unless @ignore_key_not_exist - new_es.add(time, handle_parsed(tag, record, time, {})) if @reserve_data - next + FAILED_RESULT = [nil, nil].freeze # reduce allocation cost + + def filter_with_time(tag, time, record) + raw_value = record[@key_name] + if raw_value.nil? + log.warn "#{@key_name} does not exist" unless @ignore_key_not_exist + if @reserve_data + return time, handle_parsed(tag, record, time, {}) + else + return FAILED_RESULT + end + end + begin + @parser.parse(raw_value) do |t, values| + if values + t ||= time + r = handle_parsed(tag, record, t, values) + return t, r + else + log.warn "pattern not match with data '#{raw_value}'" unless @suppress_parse_error_log + if @reserve_data + t = time + r = handle_parsed(tag, record, time, {}) + return t, r + else + return FAILED_RESULT + end + end end - begin - @parser.parse(raw_value) do |t, values| + rescue Fluent::TextParser::ParserError => e + log.warn e.message unless @suppress_parse_error_log + rescue ArgumentError => e + if @replace_invalid_sequence + unless e.message.index("invalid byte sequence in") == 0 + raise + end + replaced_string = replace_invalid_byte(raw_value) + @parser.parse(replaced_string) do |t, values| if values t ||= time r = handle_parsed(tag, record, t, values) - new_es.add(t, r) + return t, r else log.warn "pattern not match with data '#{raw_value}'" unless @suppress_parse_error_log if @reserve_data t = time r = handle_parsed(tag, record, time, {}) - new_es.add(t, r) - end - end - end - rescue Fluent::TextParser::ParserError => e - log.warn e.message unless @suppress_parse_error_log - rescue ArgumentError => e - if @replace_invalid_sequence - unless e.message.index("invalid byte sequence in") == 0 - raise - end - replaced_string = replace_invalid_byte(raw_value) - @parser.parse(replaced_string) do |t, values| - if values - t ||= time - r = handle_parsed(tag, record, t, values) - new_es.add(t, r) + return t, r else - log.warn "pattern not match with data '#{raw_value}'" unless @suppress_parse_error_log - if @reserve_data - t = time - r = handle_parsed(tag, record, time, {}) - new_es.add(t, r) - end + return FAILED_RESULT end end - else - raise end - rescue => e - log.warn "parse failed #{e.message}" unless @suppress_parse_error_log + else + raise end + rescue => e + log.warn "parse failed #{e.message}" unless @suppress_parse_error_log + return FAILED_RESULT end - new_es end private diff --git a/test/plugin/test_filter_parser.rb b/test/plugin/test_filter_parser.rb index 37cb336816..55f10011f6 100644 --- a/test/plugin/test_filter_parser.rb +++ b/test/plugin/test_filter_parser.rb @@ -479,11 +479,13 @@ def test_filter_invalid_byte invalid_utf8 = "\xff".force_encoding('UTF-8') d = create_driver(CONFIG_NOT_REPLACE) - assert_raise(ArgumentError) { - d.run do - d.feed(@tag, Fluent::EventTime.now.to_i, {'data' => invalid_utf8}) - end - } + d.run(shutdown: false) do + d.feed(@tag, Fluent::EventTime.now.to_i, {'data' => invalid_utf8}) + end + error_event = d.error_events.first + assert_equal "test", error_event[0] + assert_instance_of ArgumentError, error_event[3] + d.instance_shutdown d = create_driver(CONFIG_INVALID_BYTE) assert_nothing_raised { @@ -564,60 +566,24 @@ def test_filter_key_not_exist end # suppress_parse_error_log test - CONFIG_DISABELED_SUPPRESS_PARSE_ERROR_LOG = %[ - tag hogelog - format /^col1=(?.+) col2=(?.+)$/ - key_name message - suppress_parse_error_log false - ] - CONFIG_ENABELED_SUPPRESS_PARSE_ERROR_LOG = %[ - tag hogelog - format /^col1=(?.+) col2=(?.+)$/ - key_name message - suppress_parse_error_log true - ] - CONFIG_DEFAULT_SUPPRESS_PARSE_ERROR_LOG = %[ - tag hogelog - format /^col1=(?.+) col2=(?.+)$/ - key_name message - ] - INVALID_MESSAGE = 'foo bar' VALID_MESSAGE = 'col1=foo col2=bar' - # if call warn() raise exception - class DummyLoggerWarnedException < StandardError; end - class DummyLogger - def reset - end - def warn(message) - raise DummyLoggerWarnedException - end - end - - def swap_logger(instance) - raise "use with block" unless block_given? - dummy = DummyLogger.new - saved_logger = instance.log - instance.log = dummy - restore = lambda { instance.log = saved_logger } - - yield - - restore.call - end - def test_parser_error_warning d = create_driver(CONFIG_INVALID_TIME_VALUE) - swap_logger(d.instance) do - assert_raise(DummyLoggerWarnedException) { - d.run do - d.feed(@tag, Fluent::EventTime.now.to_i, {'data' => '{"time":[], "f1":"v1"}'}) - end - } + d.run(shutdown: false) do + d.feed(@tag, Fluent::EventTime.now.to_i, {'data' => '{"time":[], "f1":"v1"}'}) end + assert_match(/\[warn\]: invalid time value:/, d.instance.log.logs.first) + d.instance_shutdown end + CONFIG_DEFAULT_SUPPRESS_PARSE_ERROR_LOG = %[ + tag hogelog + format /^col1=(?.+) col2=(?.+)$/ + key_name message + ] + class DefaultSuppressParseErrorLogTest < self setup do # default(disabled) 'suppress_parse_error_log' is not specify @@ -625,26 +591,29 @@ class DefaultSuppressParseErrorLogTest < self end def test_raise_exception - swap_logger(@d.instance) do - assert_raise(DummyLoggerWarnedException) { - @d.run do - @d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => INVALID_MESSAGE}) - end - } + @d.run(shutdown: false) do + @d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => INVALID_MESSAGE}) end + assert_match(/\[warn\]: pattern not match with data/, @d.instance.log.logs.first) + @d.instance_shutdown end def test_nothing_raised - swap_logger(@d.instance) do - assert_nothing_raised { - @d.run do - @d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => VALID_MESSAGE}) - end - } + @d.run(shutdown: false) do + @d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => VALID_MESSAGE}) end + assert_empty @d.instance.log.logs + @d.instance_shutdown end end + CONFIG_DISABELED_SUPPRESS_PARSE_ERROR_LOG = %[ + tag hogelog + format /^col1=(?.+) col2=(?.+)$/ + key_name message + suppress_parse_error_log false + ] + class DisabledSuppressParseErrorLogTest < self setup do # disabled 'suppress_parse_error_log' @@ -652,26 +621,29 @@ class DisabledSuppressParseErrorLogTest < self end def test_raise_exception - swap_logger(@d.instance) do - assert_raise(DummyLoggerWarnedException) { - @d.run do - @d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => INVALID_MESSAGE}) - end - } + @d.run(shutdown: false) do + @d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => INVALID_MESSAGE}) end + assert_match(/\[warn\]: pattern not match with data/, @d.instance.log.logs.first) + @d.instance_shutdown end def test_nothing_raised - swap_logger(@d.instance) do - assert_nothing_raised { - @d.run do - @d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => VALID_MESSAGE}) - end - } + @d.run(shutdown: false) do + @d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => VALID_MESSAGE}) end + assert_empty @d.instance.log.logs + @d.instance_shutdown end end + CONFIG_ENABELED_SUPPRESS_PARSE_ERROR_LOG = %[ + tag hogelog + format /^col1=(?.+) col2=(?.+)$/ + key_name message + suppress_parse_error_log true + ] + class EnabledSuppressParseErrorLogTest < self setup do # enabled 'suppress_parse_error_log' @@ -679,14 +651,12 @@ class EnabledSuppressParseErrorLogTest < self end def test_nothing_raised - swap_logger(@d.instance) do - assert_nothing_raised { - @d.run do - @d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => INVALID_MESSAGE}) - @d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => VALID_MESSAGE}) - end - } + @d.run(shutdown: false) do + @d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => INVALID_MESSAGE}) + @d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => VALID_MESSAGE}) end + assert_empty @d.instance.log.logs + @d.instance_shutdown end end end