From b010a09df4618fac66fa23d68ddacdc4a4219768 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Tue, 30 Apr 2024 11:04:45 +0900 Subject: [PATCH 1/4] parser_json: Make sure return Hash It is wrong for Parser to return a record that is not Hash. Subsequent processing may result in errors. Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/parser_json.rb | 27 ++++++-- test/plugin/test_parser_json.rb | 106 +++++++++++++++++++++++++++++++ 2 files changed, 128 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin/parser_json.rb b/lib/fluent/plugin/parser_json.rb index 829aa4b72a..52dd6f5e8f 100644 --- a/lib/fluent/plugin/parser_json.rb +++ b/lib/fluent/plugin/parser_json.rb @@ -70,16 +70,33 @@ def configure_json_parser(name) end def parse(text) - record = @load_proc.call(text) - time = parse_time(record) - if @execute_convert_values - time, record = convert_values(time, record) + parsed_json = @load_proc.call(text) + + if parsed_json.is_a?(Hash) + time, record = parse_one_record(parsed_json) + yield time, record + elsif parsed_json.is_a?(Array) + parsed_json.each do |record| + unless record.is_a?(Hash) + yield nil, nil + next + end + time, parsed_record = parse_one_record(record) + yield time, parsed_record + end + else + yield nil, nil end - yield time, record + rescue @error_class, EncodingError # EncodingError is for oj 3.x or later yield nil, nil end + def parse_one_record(record) + time = parse_time(record) + convert_values(time, record) + end + def parser_type :text end diff --git a/test/plugin/test_parser_json.rb b/test/plugin/test_parser_json.rb index 19c45402d1..875611eb32 100644 --- a/test/plugin/test_parser_json.rb +++ b/test/plugin/test_parser_json.rb @@ -135,4 +135,110 @@ def test_yajl_parse_io_with_buffer_smaller_than_input end end end + + sub_test_case "various record pattern" do + data("Only string", { record: '"message"', expected: [nil] }, keep: true) + data("Only string without quotation", { record: "message", expected: [nil] }, keep: true) + 
data("Only number", { record: "0", expected: [nil] }, keep: true) + data( + "Array of Hash", + { + record: '[{"k1": 1}, {"k2": 2}]', + expected: [{"k1" => 1}, {"k2" => 2}] + }, + keep: true, + ) + data( + "Array of both Hash and invalid", + { + record: '[{"k1": 1}, "string", {"k2": 2}, 0]', + expected: [{"k1" => 1}, nil, {"k2" => 2}, nil] + }, + keep: true, + ) + data( + "Array of all invalid", + { + record: '["string", 0, [{"k": 0}]]', + expected: [nil, nil, nil] + }, + keep: true, + ) + + def test_oj(data) + parsed_records = [] + @parser.configure("json_parser" => "oj") + @parser.instance.parse(data[:record]) { |time, record| + parsed_records.append(record) + } + assert_equal(data[:expected], parsed_records) + end + + def test_yajl(data) + parsed_records = [] + @parser.configure("json_parser" => "yajl") + @parser.instance.parse(data[:record]) { |time, record| + parsed_records.append(record) + } + assert_equal(data[:expected], parsed_records) + end + + def test_json(json) + parsed_records = [] + @parser.configure("json_parser" => "json") + @parser.instance.parse(data[:record]) { |time, record| + parsed_records.append(record) + } + assert_equal(data[:expected], parsed_records) + end + end + + # This becomes NoMethodError if a non-Hash object is passed to convert_values. 
+ # https://github.com/fluent/fluentd/issues/4100 + sub_test_case "execute_convert_values with null_empty_string" do + data("Only string", { record: '"message"', expected: [nil] }, keep: true) + data( + "Hash", + { + record: '{"k1": 1, "k2": ""}', + expected: [{"k1" => 1, "k2" => nil}] + }, + keep: true, + ) + data( + "Array of Hash", + { + record: '[{"k1": 1}, {"k2": ""}]', + expected: [{"k1" => 1}, {"k2" => nil}] + }, + keep: true, + ) + + def test_oj(data) + parsed_records = [] + @parser.configure("json_parser" => "oj", "null_empty_string" => true) + @parser.instance.parse(data[:record]) { |time, record| + parsed_records.append(record) + } + assert_equal(data[:expected], parsed_records) + end + + def test_yajl(data) + parsed_records = [] + @parser.configure("json_parser" => "yajl", "null_empty_string" => true) + @parser.instance.parse(data[:record]) { |time, record| + parsed_records.append(record) + } + assert_equal(data[:expected], parsed_records) + end + + def test_json(json) + parsed_records = [] + @parser.configure("json_parser" => "json", "null_empty_string" => true) + @parser.instance.parse(data[:record]) { |time, record| + parsed_records.append(record) + } + assert_equal(data[:expected], parsed_records) + end + end end From 8433ca7ff78525bf89e2fea0f58a048ff1e2b6a9 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Tue, 30 Apr 2024 10:52:01 +0900 Subject: [PATCH 2/4] parser_msgpack: make sure return Hash It is wrong for Parser to return a record that is not Hash. Subsequent processing may result in errors. 
Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/parser_msgpack.rb | 27 +++++- test/plugin/test_parser_msgpack.rb | 127 ++++++++++++++++++++++++++++ 2 files changed, 151 insertions(+), 3 deletions(-) create mode 100644 test/plugin/test_parser_msgpack.rb diff --git a/lib/fluent/plugin/parser_msgpack.rb b/lib/fluent/plugin/parser_msgpack.rb index b35f04b9f6..4d29bc6b7e 100644 --- a/lib/fluent/plugin/parser_msgpack.rb +++ b/lib/fluent/plugin/parser_msgpack.rb @@ -31,9 +31,9 @@ def parser_type :binary end - def parse(data) + def parse(data, &block) @unpacker.feed_each(data) do |obj| - yield convert_values(parse_time(obj), obj) + parse_unpacked_data(obj, &block) end end alias parse_partial_data parse @@ -41,8 +41,29 @@ def parse(data) def parse_io(io, &block) u = Fluent::MessagePackFactory.engine_factory.unpacker(io) u.each do |obj| - time, record = convert_values(parse_time(obj), obj) + parse_unpacked_data(obj, &block) + end + end + + def parse_unpacked_data(data) + if data.is_a?(Hash) + time, record = convert_values(parse_time(data), data) yield time, record + return + end + + unless data.is_a?(Array) + yield nil, nil + return + end + + data.each do |record| + unless record.is_a?(Hash) + yield nil, nil + next + end + time, converted_record = convert_values(parse_time(record), record) + yield time, converted_record end end end diff --git a/test/plugin/test_parser_msgpack.rb b/test/plugin/test_parser_msgpack.rb new file mode 100644 index 0000000000..eafea1538c --- /dev/null +++ b/test/plugin/test_parser_msgpack.rb @@ -0,0 +1,127 @@ +require_relative '../helper' +require 'fluent/test/driver/parser' +require 'fluent/plugin/parser_msgpack' + +class MessagePackParserTest < ::Test::Unit::TestCase + def setup + Fluent::Test.setup + end + + def create_driver(conf) + Fluent::Test::Driver::Parser.new(Fluent::Plugin::MessagePackParser).configure(conf) + end + + sub_test_case "simple setting" do + data( + "Normal Hash", + { + input: "\x82\xA7message\xADHello msgpack\xA3numd", + 
expected: [{"message" => "Hello msgpack", "num" => 100}] + }, + keep: true + ) + data( + "Array of multiple Hash", + { + input: "\x92\x81\xA7message\xA3foo\x81\xA7message\xA3bar", + expected: [{"message"=>"foo"}, {"message"=>"bar"}] + }, + keep: true + ) + data( + "String", + { + # "Hello msgpack".to_msgpack + input: "\xADHello msgpack", + expected: [nil] + }, + keep: true + ) + data( + "Array of String", + { + # ["foo", "bar"].to_msgpack + input: "\x92\xA3foo\xA3bar", + expected: [nil, nil] + }, + keep: true + ) + data( + "Array of String and Hash", + { + # ["foo", {message: "bar"}].to_msgpack + input: "\x92\xA3foo\x81\xA7message\xA3bar", + expected: [nil, {"message"=>"bar"}] + }, + keep: true + ) + + def test_parse(data) + parsed_records = [] + create_driver("").instance.parse(data[:input]) do |time, record| + parsed_records.append(record) + end + assert_equal(data[:expected], parsed_records) + end + + def test_parse_io(data) + parsed_records = [] + StringIO.open(data[:input]) do |io| + create_driver("").instance.parse_io(io) do |time, record| + parsed_records.append(record) + end + end + assert_equal(data[:expected], parsed_records) + end + end + + # This becomes NoMethodError if a non-Hash object is passed to convert_values. 
+ # https://github.com/fluent/fluentd/issues/4100 + sub_test_case "execute_convert_values with null_empty_string" do + data( + "Normal hash", + { + # {message: "foo", empty: ""}.to_msgpack + input: "\x82\xA7message\xA3foo\xA5empty\xA0", + expected: [{"message" => "foo", "empty" => nil}] + }, + keep: true + ) + data( + "Array of multiple Hash", + { + # [{message: "foo", empty: ""}, {message: "bar", empty: ""}].to_msgpack + input: "\x92\x82\xA7message\xA3foo\xA5empty\xA0\x82\xA7message\xA3bar\xA5empty\xA0", + expected: [{"message"=>"foo", "empty" => nil}, {"message"=>"bar", "empty" => nil}] + }, + keep: true + ) + data( + "String", + { + # "Hello msgpack".to_msgpack + input: "\xADHello msgpack", + expected: [nil] + }, + keep: true + ) + + def test_parse(data) + parsed_records = [] + create_driver("null_empty_string").instance.parse(data[:input]) do |time, record| + parsed_records.append(record) + end + assert_equal(data[:expected], parsed_records) + end + + def test_parse_io(data) + parsed_records = [] + StringIO.open(data[:input]) do |io| + create_driver("null_empty_string").instance.parse_io(io) do |time, record| + parsed_records.append(record) + end + end + assert_equal(data[:expected], parsed_records) + end + end +end \ No newline at end of file From 4bf78e6407d68ea64555cad070dbe831ea736701 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Fri, 26 Apr 2024 17:44:57 +0900 Subject: [PATCH 3/4] in_http: support Parser yield in_http didn't support yield of Parser. The specification assumed that Parser could return Array. However, this is wrong. Parser shouldn't return Array. 
Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/in_http.rb | 67 ++++++++---------------------------- 1 file changed, 15 insertions(+), 52 deletions(-) diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index 9f46caa8e8..6385a80a79 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -203,54 +203,24 @@ def on_request(path_info, params) begin path = path_info[1..-1] # remove / tag = path.split('/').join('.') - record_time, record = parse_params(params) - # Skip nil record - if record.nil? - log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" } - if @respond_with_empty_img - return RESPONSE_IMG - else - if @use_204_response - return RESPONSE_204 - else - return RESPONSE_200 - end + mes = Fluent::MultiEventStream.new + parse_params(params) do |record_time, record| + if record.nil? + log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" } + next end - end - - mes = nil - # Support batched requests - if record.is_a?(Array) - mes = Fluent::MultiEventStream.new - record.each do |single_record| - add_params_to_record(single_record, params) - - if param_time = params['time'] - param_time = param_time.to_f - single_time = param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time) - elsif @custom_parser - single_time = @custom_parser.parse_time(single_record) - single_time, single_record = @custom_parser.convert_values(single_time, single_record) - else - single_time = convert_time_field(single_record) - end - mes.add(single_time, single_record) - end - else add_params_to_record(record, params) time = if param_time = params['time'] param_time = param_time.to_f param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time) else - if record_time.nil? - convert_time_field(record) - else - record_time - end + record_time.nil? ? 
convert_time_field(record) : record_time end + + mes.add(time, record) end rescue => e if @dump_error_log @@ -261,11 +231,7 @@ def on_request(path_info, params) # TODO server error begin - if mes - router.emit_stream(tag, mes) - else - router.emit(tag, time, record) - end + router.emit_stream(tag, mes) unless mes.empty? rescue => e if @dump_error_log log.error "failed to emit data", error: e @@ -308,20 +274,18 @@ def on_server_connect(conn) def parse_params_default(params) if msgpack = params['msgpack'] @parser_msgpack.parse(msgpack) do |_time, record| - return nil, record + yield nil, record end elsif js = params['json'] @parser_json.parse(js) do |_time, record| - return nil, record + yield nil, record end elsif ndjson = params['ndjson'] - events = [] ndjson.split(/\r?\n/).each do |js| @parser_json.parse(js) do |_time, record| - events.push(record) + yield nil, record end end - return nil, events else raise "'json', 'ndjson' or 'msgpack' parameter is required" end @@ -329,10 +293,9 @@ def parse_params_default(params) def parse_params_with_parser(params) if content = params[EVENT_RECORD_PARAMETER] - @custom_parser.parse(content) { |time, record| - raise "Received event is not #{@format_name}: #{content}" if record.nil? - return time, record - } + @custom_parser.parse(content) do |time, record| + yield time, record + end else raise "'#{EVENT_RECORD_PARAMETER}' parameter is required" end From 6cace97fcbe55da3106ba55d6561d8f9e40d476b Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Fri, 26 Apr 2024 18:24:14 +0900 Subject: [PATCH 4/4] Add comment about limitation of filter_parser specification Config to reproduce: @type sample tag test.array sample {"message": "[{\"k\":\"v\"}, {\"k2\":\"v2\"}]"} @type parser key_name message @type json @type stdout Result: 2023-03-21 23:24:52.004470792 +0900 test.array: {"k":"v"} 2023-03-21 23:24:52.004470792 +0900 test.array: {"k":"v"} ... 
Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/filter_parser.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/fluent/plugin/filter_parser.rb b/lib/fluent/plugin/filter_parser.rb index e9c03fd944..52909d0171 100644 --- a/lib/fluent/plugin/filter_parser.rb +++ b/lib/fluent/plugin/filter_parser.rb @@ -79,6 +79,10 @@ def filter_with_time(tag, time, record) end @accessor.delete(record) if @remove_key_name_field r = handle_parsed(tag, record, t, values) + # Note: https://github.com/fluent/fluentd/issues/4100 + # If the parser returns multiple records from one raw_value, + # this returns only the first record. + # This should be fixed in a future version. return t, r else if @emit_invalid_record_to_error