Skip to content

Commit

Permalink
parser_mgspack: make sure return Hash
Browse files Browse the repository at this point in the history
It is wrong for Parser to return a record that is not Hash.
Subsequent processing may result in errors.

Signed-off-by: Daijiro Fukuda <fukuda@clear-code.com>
  • Loading branch information
daipom committed Apr 30, 2024
1 parent b010a09 commit 8433ca7
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 3 deletions.
27 changes: 24 additions & 3 deletions lib/fluent/plugin/parser_msgpack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,39 @@ def parser_type
:binary
end

def parse(data)
def parse(data, &block)
@unpacker.feed_each(data) do |obj|
yield convert_values(parse_time(obj), obj)
parse_unpacked_data(obj, &block)
end
end
alias parse_partial_data parse

def parse_io(io, &block)
u = Fluent::MessagePackFactory.engine_factory.unpacker(io)
u.each do |obj|
time, record = convert_values(parse_time(obj), obj)
parse_unpacked_data(obj, &block)
end
end

def parse_unpacked_data(data)
if data.is_a?(Hash)
time, record = convert_values(parse_time(data), data)
yield time, record
return
end

unless data.is_a?(Array)
yield nil, nil
return
end

data.each do |record|
unless record.is_a?(Hash)
yield nil, nil
next
end
time, converted_record = convert_values(parse_time(record), record)
yield time, converted_record
end
end
end
Expand Down
127 changes: 127 additions & 0 deletions test/plugin/test_parser_msgpack.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
require_relative '../helper'
require 'fluent/test/driver/parser'
require 'fluent/plugin/parser_msgpack'

class MessagePackParserTest < ::Test::Unit::TestCase
def setup
Fluent::Test.setup
end

def create_driver(conf)
Fluent::Test::Driver::Parser.new(Fluent::Plugin::MessagePackParser).configure(conf)
end

sub_test_case "simple setting" do
data(
"Normal Hash",
{
input: "\x82\xA7message\xADHello msgpack\xA3numd",
expected: [{"message" => "Hello msgpack", "num" => 100}]
},
keep: true
)
data(
"Array of multiple Hash",
{
input: "\x92\x81\xA7message\xA3foo\x81\xA7message\xA3bar",
expected: [{"message"=>"foo"}, {"message"=>"bar"}]
},
keep: true
)
data(
"String",
{
# "Hello msgpack".to_msgpack
input: "\xADHello msgpack",
expected: [nil]
},
keep: true
)
data(
"Array of String",
{
# ["foo", "bar"].to_msgpack
input: "\x92\xA3foo\xA3bar",
expected: [nil, nil]
},
keep: true
)
data(
"Array of String and Hash",
{
# ["foo", {message: "bar"}].to_msgpack
input: "\x92\xA3foo\x81\xA7message\xA3bar",
expected: [nil, {"message"=>"bar"}]
},
keep: true
)

def test_parse(data)
parsed_records = []
create_driver("").instance.parse(data[:input]) do |time, record|
parsed_records.append(record)
end
assert_equal(data[:expected], parsed_records)
end

def test_parse_io(data)
parsed_records = []
StringIO.open(data[:input]) do |io|
create_driver("").instance.parse_io(io) do |time, record|
parsed_records.append(record)
end
end
assert_equal(data[:expected], parsed_records)
end
end

# This becomes NoMethodError if a non-Hash object is passed to convert_values.
# https://github.com/fluent/fluentd/issues/4100
sub_test_case "execute_convert_values with null_empty_string" do
data(
"Normal hash",
{
# {message: "foo", empty: ""}.to_msgpack
input: "\x82\xA7message\xA3foo\xA5empty\xA0",
expected: [{"message" => "foo", "empty" => nil}]
},
keep: true
)
data(
"Array of multiple Hash",
{
# [{message: "foo", empty: ""}, {message: "bar", empty: ""}].to_msgpack
input: "\x92\x82\xA7message\xA3foo\xA5empty\xA0\x82\xA7message\xA3bar\xA5empty\xA0",
expected: [{"message"=>"foo", "empty" => nil}, {"message"=>"bar", "empty" => nil}]
},
keep: true
)
data(
"String",
{
# "Hello msgpack".to_msgpack
input: "\xADHello msgpack",
expected: [nil]
},
keep: true
)

def test_parse(data)
parsed_records = []
create_driver("null_empty_string").instance.parse(data[:input]) do |time, record|
parsed_records.append(record)
end
assert_equal(data[:expected], parsed_records)
end

def test_parse_io(data)
parsed_records = []
StringIO.open(data[:input]) do |io|
create_driver("null_empty_string").instance.parse_io(io) do |time, record|
parsed_records.append(record)
end
end
assert_equal(data[:expected], parsed_records)
end
end
end

0 comments on commit 8433ca7

Please sign in to comment.