Skip to content

Commit

Permalink
Merge pull request #1078 from cosmo0920/migrate-in_exec-to-v0.14
Browse files Browse the repository at this point in the history
Migrate in exec to v0.14
  • Loading branch information
tagomoris authored Jul 11, 2016
2 parents 4082b65 + 3a0aa30 commit a933b56
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 60 deletions.
69 changes: 16 additions & 53 deletions lib/fluent/plugin/in_exec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@
require 'strptime'
require 'yajl'

require 'fluent/input'
require 'fluent/plugin/input'
require 'fluent/time'
require 'fluent/timezone'
require 'fluent/config/error'

module Fluent
class ExecInput < Input
Plugin.register_input('exec', self)
module Fluent::Plugin
class ExecInput < Fluent::Plugin::Input
Fluent::Plugin.register_input('exec', self)

helpers :child_process

def initialize
super
Expand Down Expand Up @@ -65,7 +67,7 @@ def configure(conf)
end

if !@tag && !@tag_key
raise ConfigError, "'tag' or 'tag_key' option is required on exec input"
raise Fleunt::ConfigError, "'tag' or 'tag_key' option is required on exec input"
end

if @time_key
Expand All @@ -90,7 +92,7 @@ def setup_parser(conf)
case @format
when 'tsv'
if @keys.empty?
raise ConfigError, "keys option is required on exec input for tsv format"
raise Fluent::ConfigError, "keys option is required on exec input for tsv format"
end
ExecUtil::TSVParser.new(@keys, method(:on_message))
when 'json'
Expand All @@ -106,57 +108,18 @@ def start
super

if @run_interval
@finished = false
@thread = Thread.new(&method(:run_periodic))
else
@io = IO.popen(@command, "r")
@pid = @io.pid
@thread = Thread.new(&method(:run))
end
end

def shutdown
if @run_interval
@finished = true
# call Thread#run which interupts sleep in order to stop run_periodic thread immediately.
@thread.run
@thread.join
else
begin
Process.kill(:TERM, @pid)
rescue #Errno::ECHILD, Errno::ESRCH, Errno::EPERM
end
if @thread.join(60) # TODO wait time
return
child_process_execute(:exec_input, @command, interval: @run_interval, mode: [:read]) do |io|
run(io)
end

begin
Process.kill(:KILL, @pid)
rescue #Errno::ECHILD, Errno::ESRCH, Errno::EPERM
else
child_process_execute(:exec_input, @command, immediate: true, mode: [:read]) do |io|
run(io)
end
@thread.join
end

super
end

def run
@parser.call(@io)
end

def run_periodic
sleep @run_interval
until @finished
begin
io = IO.popen(@command, "r")
@parser.call(io)
Process.waitpid(io.pid)
sleep @run_interval
rescue
log.error "exec failed to run or shutdown child process", error: $!
log.warn_backtrace $!.backtrace
end
end
def run(io)
@parser.call(io)
end

private
Expand All @@ -174,7 +137,7 @@ def on_message(record, parsed_time = nil)
if val = record.delete(@time_key)
time = @time_parse_proc.call(val)
else
time = Engine.now
time = Fluent::EventTime.now
end
end

Expand Down
14 changes: 7 additions & 7 deletions test/plugin/test_in_exec.rb
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
require_relative '../helper'
require 'fluent/test'
require 'fluent/test/driver/input'
require 'fluent/plugin/in_exec'
require 'net/http'

class ExecInputTest < Test::Unit::TestCase
def setup
Fluent::Test.setup
@test_time = Fluent::EventTime.parse("2011-01-02 13:14:15")
@test_time = event_time("2011-01-02 13:14:15")
@script = File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts', 'exec_script.rb'))
end

def create_driver(conf = tsv_config)
Fluent::Test::InputTestDriver.new(Fluent::ExecInput).configure(conf)
Fluent::Test::Driver::Input.new(Fluent::Plugin::ExecInput).configure(conf)
end

def tsv_config
Expand Down Expand Up @@ -90,7 +90,7 @@ def test_emit
sleep 2
end

emits = d.emits
emits = d.events
assert_equal true, emits.length > 0
assert_equal ["tag1", @test_time, {"k1"=>"ok"}], emits[0]
assert_equal_event_time(@test_time, emits[0][1])
Expand All @@ -103,7 +103,7 @@ def test_emit_json
sleep 2
end

emits = d.emits
emits = d.events
assert_equal true, emits.length > 0
assert_equal ["tag1", @test_time, {"k1"=>"ok"}], emits[0]
assert_equal_event_time(@test_time, emits[0][1])
Expand All @@ -116,7 +116,7 @@ def test_emit_msgpack
sleep 2
end

emits = d.emits
emits = d.events
assert_equal true, emits.length > 0
assert_equal ["tag1", @test_time, {"k1"=>"ok"}], emits[0]
assert_equal_event_time(@test_time, emits[0][1])
Expand All @@ -129,7 +129,7 @@ def test_emit_regexp
sleep 2
end

emits = d.emits
emits = d.events
assert_equal true, emits.length > 0
assert_equal ["regex_tag", @test_time, {"message"=>"hello"}], emits[0]
assert_equal_event_time(@test_time, emits[0][1])
Expand Down

0 comments on commit a933b56

Please sign in to comment.