From 34567ee165ef40671fc7c8dbe24214c954b71cb3 Mon Sep 17 00:00:00 2001 From: Ben Wheatley Date: Sat, 13 Apr 2019 23:01:30 +0100 Subject: [PATCH] parser_json: Add stream_buffer_size config param Allow configuration of the size of the buffer that Yajl uses when parsing streaming input. The advantage of this is that when using `out_exec_filter`, and parsing as JSON, it's now possible to configure this plugin to avoid having to wait for 8092 bytes of data to be parsed before events are emitted. Configuration in the `out_exec_filter` tests has been modified to use this parameter, as it shaves 60 seconds off the test run time. Signed-off-by: Ben Wheatley --- lib/fluent/plugin/out_exec_filter.rb | 1 + lib/fluent/plugin/parser_json.rb | 8 +++++++- test/plugin/test_out_exec_filter.rb | 6 ++++++ test/plugin/test_parser_json.rb | 24 ++++++++++++++++++++++++ 4 files changed, 38 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/out_exec_filter.rb b/lib/fluent/plugin/out_exec_filter.rb index 2bcda92615..2dec7997dd 100644 --- a/lib/fluent/plugin/out_exec_filter.rb +++ b/lib/fluent/plugin/out_exec_filter.rb @@ -95,6 +95,7 @@ class ExecFilterOutput < Output COMPAT_PARSE_PARAMS = { 'out_format' => '@type', 'out_keys' => 'keys', + 'out_stream_buffer_size' => 'stream_buffer_size', } COMPAT_EXTRACT_PARAMS = { 'out_tag_key' => 'tag_key', diff --git a/lib/fluent/plugin/parser_json.rb b/lib/fluent/plugin/parser_json.rb index 50a80a31c9..7be12efa49 100644 --- a/lib/fluent/plugin/parser_json.rb +++ b/lib/fluent/plugin/parser_json.rb @@ -30,6 +30,12 @@ class JSONParser < Parser desc 'Set JSON parser' config_param :json_parser, :enum, list: [:oj, :yajl, :json], default: :oj + # The Yajl library defines a default buffer size of 8KiB when parsing + # from IO streams, so maintain this for backwards-compatibility. + # https://www.rubydoc.info/github/brianmario/yajl-ruby/Yajl%2FParser:parse + desc 'Set the buffer size that Yajl will use when parsing streaming input' + config_param :stream_buffer_size, :integer, default: 8192 + config_set_default :time_type, :float def configure(conf) @@ -81,7 +87,7 @@ def parse_io(io, &block) y.on_parse_complete = ->(record){ block.call(parse_time(record), record) } - y.parse(io) + y.parse(io, @stream_buffer_size) end end end diff --git a/test/plugin/test_out_exec_filter.rb b/test/plugin/test_out_exec_filter.rb index 8d9be9f1fa..d3ced55c2c 100644 --- a/test/plugin/test_out_exec_filter.rb +++ b/test/plugin/test_out_exec_filter.rb @@ -328,6 +328,7 @@ def create_driver(conf) @type json + stream_buffer_size 1 tag_key tag @@ -338,6 +339,7 @@ def create_driver(conf) command cat in_keys message out_format json + out_stream_buffer_size 1 time_key time tag_key tag ] @@ -372,6 +374,7 @@ def create_driver(conf) @type json + stream_buffer_size 1 tag_key tag @@ -382,6 +385,7 @@ def create_driver(conf) command cat in_keys message out_format json + out_stream_buffer_size 1 time_key time tag_key tag ] @@ -414,6 +418,7 @@ def create_driver(conf) @type json + stream_buffer_size 1 tag_key tag @@ -426,6 +431,7 @@ def create_driver(conf) command cat in_keys message out_format json + out_stream_buffer_size 1 time_key time time_format %d/%b/%Y %H:%M:%S.%N %z tag_key tag diff --git a/test/plugin/test_parser_json.rb b/test/plugin/test_parser_json.rb index 46c5f47d9f..19c45402d1 100644 --- a/test/plugin/test_parser_json.rb +++ b/test/plugin/test_parser_json.rb @@ -111,4 +111,28 @@ def test_parse_with_keep_time_key_without_time_format(data) assert_equal text, record['time'] end end + + def test_yajl_parse_io_with_buffer_smaller_than_input + parser = Fluent::Test::Driver::Parser.new(Fluent::Plugin::JSONParser) + parser.configure( + 'keep_time_key' => 'true', + 'json_parser' => 'yajl', + 'stream_buffer_size' => 1, + ) + text = "100" + + waiting(5) do + rd, wr = IO.pipe + wr.write "{\"time\":\"#{text}\"}" + + parser.instance.parse_io(rd) do |time, record| + assert_equal text.to_i, time.sec + assert_equal text, record['time'] + + # Once a record has been received the 'write' end of the pipe must be + # closed, otherwise the test will block waiting for more input. + wr.close + end + end + end end