Skip to content

Commit

Permalink
in_http: Fix "ignored parser options" problem on bulk insertion
Browse files Browse the repository at this point in the history
What used to happen is that options in the <parser> section get ignored
when we send events as a JSON array. This occurs because parser plugins
are designed to handle a single hash value and do not generallly handle
an array input well.

Until now, we have tried to solve this issue by emulating the semantics
of parser plugins in Fluent::Plugin::HttpInput (see 1afbfb1, 39f3a0d
and f560017). However, this approach turned out to be error prone and
rather tedious.

This patch takes a different approach:

 - Whenever @parser is available, reuse @parser.convert_values() by
   manually applying it to each record in an input array.
 - Otherwise, fall back to the current logic.

... and should solve this class of issues.

Signed-off-by: Fujimoto Seiji <fujimoto@clear-code.com>
  • Loading branch information
Fujimoto Seiji committed Jul 5, 2018
1 parent 9ee8db3 commit 2e5f963
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 12 deletions.
14 changes: 8 additions & 6 deletions lib/fluent/plugin/in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,15 @@ def on_request(path_info, params)
single_record['REMOTE_ADDR'] = params['REMOTE_ADDR']
end

if single_record.has_key?(@parser_time_key)
single_time = Fluent::EventTime.from_time(Time.at(single_record[@parser_time_key]))
unless @parser and @parser.keep_time_key
single_record.delete(@parser_time_key)
end
if defined? @parser
single_time = @parser.parse_time(single_record)
single_time, single_record = @parser.convert_values(single_time, single_record)
else
single_time = time
single_time = if t = single_record.delete(@parser_time_key)
Fluent::EventTime.from_time(Time.at(t))
else
time
end
end

mes.add(single_time, single_record)
Expand Down
16 changes: 10 additions & 6 deletions test/plugin/test_in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -370,16 +370,20 @@ def test_json_with_add_http_headers
assert include_http_header?(d.events[1][2])
end

def test_multi_json_with_keep_time_key
def test_multi_json_with_custom_parser
d = create_driver(CONFIG + %[
<parse>
@type json
keep_time_key true
time_key foo
time_format %iso8601
</parse>
])
time1 = event_time("2011-01-02 13:14:15 UTC")
time2 = event_time("2012-01-02 13:14:15 UTC")
records = [{"time"=>time1.to_i},{"time"=>time2.to_i}]

time = event_time("2011-01-02 13:14:15 UTC")
time_s = Time.at(time).iso8601

records = [{"foo"=>time_s,"bar"=>"test1"},{"foo"=>time_s,"bar"=>"test2"}]
tag = "tag1"
res_codes = []

Expand All @@ -390,11 +394,11 @@ def test_multi_json_with_keep_time_key
assert_equal ["200"], res_codes

assert_equal "tag1", d.events[0][0]
assert_equal_event_time time1, d.events[0][1]
assert_equal_event_time time, d.events[0][1]
assert_equal d.events[0][2], records[0]

assert_equal "tag1", d.events[1][0]
assert_equal_event_time time2, d.events[1][1]
assert_equal_event_time time, d.events[1][1]
assert_equal d.events[1][2], records[1]
end

Expand Down

0 comments on commit 2e5f963

Please sign in to comment.