Skip to content

Commit

Permalink
Merge pull request #1297 from fluent/migrate-exec-plugins-to-v0.14-api
Browse files Browse the repository at this point in the history
Migrate exec plugins to v0.14 api
  • Loading branch information
tagomoris authored Nov 11, 2016
2 parents 7b0c40b + 8b19791 commit e3edf3e
Show file tree
Hide file tree
Showing 22 changed files with 1,932 additions and 859 deletions.
42 changes: 42 additions & 0 deletions example/out_exec_filter.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<source>
@type dummy
@label @exec
tag exec_input
rate 10
auto_increment_key num
dummy {"data":"mydata"}
</source>

<label @exec>
<match exec_input>
@type exec_filter
@label @stdout
tag result
command ruby -e 'STDOUT.sync = true; proc = ->(){line = STDIN.readline.chomp; puts line + "\t" + Process.pid.to_s}; 1000.times{ proc.call }'
num_children 3
child_respawn -1
<inject>
time_key time
time_type float
</inject>
<format>
@type tsv
keys data, num, time
</format>
<parse>
@type tsv
keys data, num, time, pid
</parse>
<extract>
time_key time
time_type float
</extract>
</match>
</label>

<label @stdout>
<match result>
@type stdout
</match>
</label>

5 changes: 5 additions & 0 deletions lib/fluent/plugin/formatter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ class Formatter < Base

configured_in :format

PARSER_TYPES = [:text_per_line, :text, :binary]
def formatter_type
:text_per_line
end

def format(tag, time, record)
raise NotImplementedError, "Implement this method in child class"
end
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/formatter_msgpack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ module Plugin
class MessagePackFormatter < Formatter
Plugin.register_formatter('msgpack', self)

def formatter_type
:binary
end

def format(tag, time, record)
record.to_msgpack
end
Expand Down
34 changes: 34 additions & 0 deletions lib/fluent/plugin/formatter_tsv.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin/formatter'

module Fluent
module Plugin
class TSVFormatter < Formatter
Plugin.register_formatter('tsv', self)

desc 'Field names included in each lines'
config_param :keys, :array, value_type: :string
desc 'The delimiter character (or string) of TSV values'
config_param :delimiter, :string, default: "\t"

def format(tag, time, record)
@keys.map{|k| record[k].to_s }.join(@delimiter)
end
end
end
end
141 changes: 48 additions & 93 deletions lib/fluent/plugin/in_exec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,136 +14,91 @@
# limitations under the License.
#

require 'strptime'
require 'yajl'

require 'fluent/plugin/input'
require 'fluent/time'
require 'fluent/timezone'
require 'fluent/config/error'
require 'yajl'

module Fluent::Plugin
class ExecInput < Fluent::Plugin::Input
Fluent::Plugin.register_input('exec', self)

helpers :child_process

def initialize
super
require 'fluent/plugin/exec_util'
end
helpers :compat_parameters, :extract, :parser, :child_process

desc 'The command (program) to execute.'
config_param :command, :string
desc 'The format used to map the program output to the incoming event.(tsv,json,msgpack)'
config_param :format, :string, default: 'tsv'
desc 'Specify the comma-separated keys when using the tsv format.'
config_param :keys, default: [] do |val|
val.split(',')

config_section :parse do
config_set_default :@type, 'tsv'
config_set_default :time_type, :float
config_set_default :time_key, nil
config_set_default :estimate_current_event, false
end

config_section :extract do
config_set_default :time_type, :float
end

desc 'Tag of the output events.'
config_param :tag, :string, default: nil
desc 'The key to use as the event tag instead of the value in the event record. '
config_param :tag_key, :string, default: nil
desc 'The key to use as the event time instead of the value in the event record.'
config_param :time_key, :string, default: nil
desc 'The format of the event time used for the time_key parameter.'
config_param :time_format, :string, default: nil
desc 'The interval time between periodic program runs.'
config_param :run_interval, :time, default: nil
desc 'The default block size to read if parser requires partial read.'
config_param :read_block_size, :size, default: 10240 # 10k

def configure(conf)
super

if conf['localtime']
@localtime = true
elsif conf['utc']
@localtime = false
end

if conf['timezone']
@timezone = conf['timezone']
Fluent::Timezone.validate!(@timezone)
end

if !@tag && !@tag_key
raise Fleunt::ConfigError, "'tag' or 'tag_key' option is required on exec input"
end
attr_reader :parser

if @time_key
if @time_format
f = @time_format
@time_parse_proc =
begin
strptime = Strptime.new(f)
Proc.new { |str| Fluent::EventTime.from_time(strptime.exec(str)) }
rescue
Proc.new {|str| Fluent::EventTime.from_time(Time.strptime(str, f)) }
end
else
@time_parse_proc = Proc.new {|str| Fluent::EventTime.from_time(Time.at(str.to_f)) }
def configure(conf)
compat_parameters_convert(conf, :extract, :parser)
['parse', 'extract'].each do |subsection_name|
if subsection = conf.elements(subsection_name).first
if subsection.has_key?('time_format')
subsection['time_type'] ||= 'string'
end
end
end

@parser = setup_parser(conf)
end
super

def setup_parser(conf)
case @format
when 'tsv'
if @keys.empty?
raise Fluent::ConfigError, "keys option is required on exec input for tsv format"
end
Fluent::ExecUtil::TSVParser.new(@keys, method(:on_message))
when 'json'
Fluent::ExecUtil::JSONParser.new(method(:on_message))
when 'msgpack'
Fluent::ExecUtil::MessagePackParser.new(method(:on_message))
else
Fluent::ExecUtil::TextParserWrapperParser.new(conf, method(:on_message))
if !@tag && (!@extract_config || !@extract_config.tag_key)
raise Fluent::ConfigError, "'tag' or 'tag_key' option is required on exec input"
end
@parser = parser_create
end

def start
super

if @run_interval
child_process_execute(:exec_input, @command, interval: @run_interval, mode: [:read]) do |io|
run(io)
end
child_process_execute(:exec_input, @command, interval: @run_interval, mode: [:read], &method(:run))
else
child_process_execute(:exec_input, @command, immediate: true, mode: [:read]) do |io|
run(io)
end
child_process_execute(:exec_input, @command, immediate: true, mode: [:read], &method(:run))
end
end

def run(io)
@parser.call(io)
end

private

def on_message(record, parsed_time = nil)
if val = record.delete(@tag_key)
tag = val
else
tag = @tag
end

if parsed_time
time = parsed_time
else
if val = record.delete(@time_key)
time = @time_parse_proc.call(val)
else
time = Fluent::EventTime.now
case
when @parser.implement?(:parse_io)
@parser.parse_io(io, &method(:on_record))
when @parser.implement?(:parse_partial_data)
until io.eof?
@parser.parse_partial_data(io.readpartial(@read_block_size), &method(:on_record))
end
when @parser.parser_type == :text_per_line
io.each_line do |line|
@parser.parse(line.chomp, &method(:on_record))
end
else
@parser.parse(io.read, &method(:on_record))
end
end

def on_record(time, record)
tag = extract_tag_from_record(record)
tag ||= @tag
time ||= extract_time_from_record(record) || Fluent::EventTime.now
router.emit(tag, time, record)
rescue => e
log.error "exec failed to emit", error: e, tag: tag, record: Yajl.dump(record)
log.error "exec failed to emit", tag: tag, record: Yajl.dump(record), error: e
router.emit_error_event(tag, time, record, e) if tag && time && record
end
end
end
Loading

0 comments on commit e3edf3e

Please sign in to comment.