Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flatten_json and flatten_key_delimiter option #2470

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion lib/fluent/plugin/parser_json.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ class JSONParser < Parser
desc 'Set the buffer size that Yajl will use when parsing streaming input'
config_param :stream_buffer_size, :integer, default: 8192

desc 'Whether to flatten nested JSON.'
config_param :flatten_json, :bool, default: false

desc 'The delimiter character (or string) of key when flatten nested JSON.'
config_param :flatten_key_delimiter, :string, default: '.'

config_set_default :time_type, :float

def configure(conf)
Expand Down Expand Up @@ -72,7 +78,7 @@ def configure_json_parser(name)

def parse(text)
r = @load_proc.call(text)
time, record = convert_values(parse_time(r), r)
time, record = convert_values(parse_time(r), @flatten_json ? flatten_hash_from(r) : r)
yield time, record
rescue @error_class, EncodingError # EncodingError is for oj 3.x or later
yield nil, nil
Expand All @@ -89,6 +95,21 @@ def parse_io(io, &block)
}
y.parse(io, @stream_buffer_size)
end

private
def flatten_hash_from(hash_or_array)
if hash_or_array.is_a?(Array)
tmp_hash = {}
hash_or_array.each_with_index { |value, index| tmp_hash["#{index}"] = value }
end
hash = hash_or_array.is_a?(Array) ? tmp_hash : hash_or_array
hash.each_with_object({}) do |(key, value), flat_hash|
next flatten_hash_from(value).each do |k, v|
flat_hash["#{key}#{@flatten_key_delimiter}#{k}"] = v
end if value.is_a?(Hash) || value.is_a?(Array)
flat_hash[key] = value
end
end
end
end
end
51 changes: 51 additions & 0 deletions test/plugin/test_parser_json.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,57 @@ def test_parse(data)
}
end

data('oj' => 'oj', 'yajl' => 'yajl')
def test_parse_with_nested_json(data)
@parser.configure(
'flatten_json' => 'true',
'json_parser' => data
)
nested_json = '{"time": 1552109400, "nested_param": {
"hash_hash": {"a": {"a": 0}, "b": {"b": 1, "c": 2}},
"array_array": [[3], [4, 5], [6, 7, 8]],
"hash_array": {"a": [[9], [10, 11], [12, 13, 14]]},
"array_hash": [{"a": 15}, {"b": 16, "c": 17}]
}}'
@parser.instance.parse(nested_json) { |time, record|
assert_equal(event_time('2019-03-09 14:30:00 +0900').to_i, time)
assert_equal({
'nested_param.hash_hash.a.a' => 0,
'nested_param.hash_hash.b.b' => 1,
'nested_param.hash_hash.b.c' => 2,
'nested_param.array_array.0.0' => 3,
'nested_param.array_array.1.0' => 4,
'nested_param.array_array.1.1' => 5,
'nested_param.array_array.2.0' => 6,
'nested_param.array_array.2.1' => 7,
'nested_param.array_array.2.2' => 8,
'nested_param.hash_array.a.0.0' => 9,
'nested_param.hash_array.a.1.0' => 10,
'nested_param.hash_array.a.1.1' => 11,
'nested_param.hash_array.a.2.0' => 12,
'nested_param.hash_array.a.2.1' => 13,
'nested_param.hash_array.a.2.2' => 14,
'nested_param.array_hash.0.a' => 15,
'nested_param.array_hash.1.b' => 16,
'nested_param.array_hash.1.c' => 17
}, record)
}

parser = Fluent::Test::Driver::Parser.new(Fluent::Plugin::JSONParser)
parser.configure('json_parser' => data)
parser.instance.parse(nested_json) { |time, record|
assert_equal(event_time('2019-03-09 14:30:00 +0900').to_i, time)
assert_equal({
'nested_param' => {
'hash_hash' => {'a' => {'a' => 0}, 'b' => {'b' => 1, 'c' => 2}},
'array_array' => [[3], [4, 5], [6, 7, 8]],
'hash_array' => {'a' => [[9], [10, 11], [12, 13, 14]]},
'array_hash' => [{'a' => 15}, {'b' => 16, 'c' => 17}]
}
}, record)
}
end

data('oj' => 'oj', 'yajl' => 'yajl')
def test_parse_with_large_float(data)
@parser.configure('json_parser' => data)
Expand Down