diff --git a/lib/fluent/plugin/in_exec.rb b/lib/fluent/plugin/in_exec.rb index 295b6ce864..eeec824167 100644 --- a/lib/fluent/plugin/in_exec.rb +++ b/lib/fluent/plugin/in_exec.rb @@ -17,14 +17,16 @@ require 'strptime' require 'yajl' -require 'fluent/input' +require 'fluent/plugin/input' require 'fluent/time' require 'fluent/timezone' require 'fluent/config/error' -module Fluent - class ExecInput < Input - Plugin.register_input('exec', self) +module Fluent::Plugin + class ExecInput < Fluent::Plugin::Input + Fluent::Plugin.register_input('exec', self) + + helpers :child_process def initialize super @@ -65,7 +67,7 @@ def configure(conf) end if !@tag && !@tag_key - raise ConfigError, "'tag' or 'tag_key' option is required on exec input" + raise Fleunt::ConfigError, "'tag' or 'tag_key' option is required on exec input" end if @time_key @@ -90,7 +92,7 @@ def setup_parser(conf) case @format when 'tsv' if @keys.empty? - raise ConfigError, "keys option is required on exec input for tsv format" + raise Fluent::ConfigError, "keys option is required on exec input for tsv format" end ExecUtil::TSVParser.new(@keys, method(:on_message)) when 'json' @@ -106,57 +108,18 @@ def start super if @run_interval - @finished = false - @thread = Thread.new(&method(:run_periodic)) - else - @io = IO.popen(@command, "r") - @pid = @io.pid - @thread = Thread.new(&method(:run)) - end - end - - def shutdown - if @run_interval - @finished = true - # call Thread#run which interupts sleep in order to stop run_periodic thread immediately. - @thread.run - @thread.join - else - begin - Process.kill(:TERM, @pid) - rescue #Errno::ECHILD, Errno::ESRCH, Errno::EPERM - end - if @thread.join(60) # TODO wait time - return + child_process_execute(:exec_input, @command, interval: @run_interval, mode: [:read]) do |io| + run(io) end - - begin - Process.kill(:KILL, @pid) - rescue #Errno::ECHILD, Errno::ESRCH, Errno::EPERM + else + child_process_execute(:exec_input, @command, immediate: true, mode: [:read]) do |io| + run(io) end - @thread.join end - - super end - def run - @parser.call(@io) - end - - def run_periodic - sleep @run_interval - until @finished - begin - io = IO.popen(@command, "r") - @parser.call(io) - Process.waitpid(io.pid) - sleep @run_interval - rescue - log.error "exec failed to run or shutdown child process", error: $! - log.warn_backtrace $!.backtrace - end - end + def run(io) + @parser.call(io) end private @@ -174,7 +137,7 @@ def on_message(record, parsed_time = nil) if val = record.delete(@time_key) time = @time_parse_proc.call(val) else - time = Engine.now + time = Fluent::EventTime.now end end diff --git a/test/plugin/test_in_exec.rb b/test/plugin/test_in_exec.rb index e83a40fa53..c3f00d988d 100644 --- a/test/plugin/test_in_exec.rb +++ b/test/plugin/test_in_exec.rb @@ -1,17 +1,17 @@ require_relative '../helper' -require 'fluent/test' +require 'fluent/test/driver/input' require 'fluent/plugin/in_exec' require 'net/http' class ExecInputTest < Test::Unit::TestCase def setup Fluent::Test.setup - @test_time = Fluent::EventTime.parse("2011-01-02 13:14:15") + @test_time = event_time("2011-01-02 13:14:15") @script = File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts', 'exec_script.rb')) end def create_driver(conf = tsv_config) - Fluent::Test::InputTestDriver.new(Fluent::ExecInput).configure(conf) + Fluent::Test::Driver::Input.new(Fluent::Plugin::ExecInput).configure(conf) end def tsv_config @@ -90,7 +90,7 @@ def test_emit sleep 2 end - emits = d.emits + emits = d.events assert_equal true, emits.length > 0 assert_equal ["tag1", @test_time, {"k1"=>"ok"}], emits[0] assert_equal_event_time(@test_time, emits[0][1]) @@ -103,7 +103,7 @@ def test_emit_json sleep 2 end - emits = d.emits + emits = d.events assert_equal true, emits.length > 0 assert_equal ["tag1", @test_time, {"k1"=>"ok"}], emits[0] assert_equal_event_time(@test_time, emits[0][1]) @@ -116,7 +116,7 @@ def test_emit_msgpack sleep 2 end - emits = d.emits + emits = d.events assert_equal true, emits.length > 0 assert_equal ["tag1", @test_time, {"k1"=>"ok"}], emits[0] assert_equal_event_time(@test_time, emits[0][1]) @@ -129,7 +129,7 @@ def test_emit_regexp sleep 2 end - emits = d.emits + emits = d.events assert_equal true, emits.length > 0 assert_equal ["regex_tag", @test_time, {"message"=>"hello"}], emits[0] assert_equal_event_time(@test_time, emits[0][1])