Skip to content

Commit

Permalink
adding test drivers
Browse files Browse the repository at this point in the history
  • Loading branch information
tagomoris committed May 20, 2016
1 parent b0712e0 commit ea45c41
Show file tree
Hide file tree
Showing 6 changed files with 464 additions and 0 deletions.
217 changes: 217 additions & 0 deletions lib/fluent/test/driver/base.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/test/driver/test_event_router'

require 'timeout'

module Fluent
module Test
module Driver
class Base
def initialize(klass, opts: {}, &block)
if klass.is_a?(Class)
if block
# Create new class for test w/ overwritten methods
# klass.dup is worse because its ancestors does NOT include original class name
klass = Class.new(klass)
klass.module_eval(&block)
end
@instance = klass.new
else
@instance = klass
end
if opts
@instance.system_config_override(opts)
end
@instance.log = TestLogger.new
@logs = @instance.log.out.logs

@run_post_conditions = []
@run_breaking_conditions = []

@broken = false

@event_streams = nil
@error_events = nil
end

attr_reader :instance, :logs

def configure(conf, syntax: :v1)
if str.is_a?(Fluent::Config::Element)
@config = str
else
@config = Config.parse(str, "(test)", "(test_dir)", syntax: syntax)
end

if @instance.respond_to?(:router=)
@event_streams = []
@error_events = []

driver = self
mojule = Module.new do
define_method(:event_emitter_router) do |label_name|
TestEventRouter.new(driver)
end
end
@instance.singleton_class.module_eval do
prepend mojule
end
end

@instance.configure(@config)
self
end

def end_if(&block)
raise ArgumentError, "block is not given" unless block_given?
@run_post_conditions << block
end

def break_if(&block)
raise ArgumentError, "block is not given" unless block_given?
@run_breaking_conditions << block
end

def broken?
@broken
end

Emit = Struct.new(:tag, :es)
ErrorEvent = Struct.new(:tag, :time, :record, :error)

# via TestEventRouter
def emit_event_stream(tag, es)
@event_streams << Emit.new(tag, es)
end

def emit_error_event(tag, time, record, error)
@error_events << ErrorEvent.new(tag, time, record, error)
end

def events(tag: nil)
selected = @event_streams.select{|e| tag.nil? ? true : e.tag == tag }
if block_given?
selected.each do |e|
e.es.each do |time, record|
yield e.tag, time, record
end
end
else
list = []
selected.each do |e|
e.es.each do |time, record|
list << [e.tag, time, record]
end
end
list
end
end

def error_events(tag: nil)
selected = @error_events.select{|e| tag.nil? ? true : e.tag == tag }
if block_given?
selected.each do |e|
yield e.tag, e.time, e.record, e.error
end
else
selected.map{|e| [e.tag, e.time, e.record, e.error] }
end
end

def run(expect_emits: nil, expect_records: nil, timeout: nil, start: true, shutdown: true, &block)
if start
@instance.start unless @instance.started?
end

begin
run_actual(expect_emits: expect_emits, expect_records: expect_records, timeout: timeout, &block)
ensure
if shutdown
@instance.stop unless @instance.stopped?
@instance.before_shutdown? unless @instance.before_shutdown?
@instance.shutdown unless @instance.shutdown?
@instance.after_shutdown? unless @instance.after_shutdown?
@instance.close unless @instance.closed?
@instance.terminate unless @instance.terminated?
end
end
end

def run_actual(expect_emits: nil, expect_records: nil, timeout: nil, &block)
if @instance.respond_to?(:_threads)
until @instance._threads.values.all?(&:alive?)
sleep 0.01
end
end

if @instance.respond_to?(:event_loop_running?)
until @instance.event_loop_running?
sleep 0.01
end
end

if expect_emits
@run_post_conditions << ->(){ @emit_streams.size >= expect_emits }
end
if expect_records
@run_post_conditions << ->(){ @emit_streams.reduce(0){|a, e| a + e.es.size } >= expected_records }
end
if timeout
stop_at = Time.now + timeout
@run_breaking_conditions << ->(){ Time.now >= stop_at }
end

if !block_given && @run_post_conditions.empty?
raise ArgumentError, "no stop conditions nor block specified"
end

if !block_given
block = ->(){ sleep 0.1 until stop? }
end

if timeout
begin
Timeout.timeout(timeout * 1.1, &block)
rescue Timeout::Error
@broken = true
end
else
block.call
end
end

def stop?
# Should stop running if post conditions are not registered.
return true unless @run_post_conditions

# Should stop running if all of the post conditions are true.
return true if @run_post_conditions.all? {|proc| proc.call }

# Should stop running if some of the breaking conditions is true.
# In this case, some post conditions may be not true.
if @run_breaking_conditions.any? {|proc| proc.call }
@broken = true
return true
end

false
end
end
end
end
end
98 changes: 98 additions & 0 deletions lib/fluent/test/driver/event_feeder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/event'
require 'fluent/time'

module Fluent
module Test
module Driver
module EventFeeder
def initialize(klass, opts: {}, &block)
super
@default_tag = nil
@feed_method = nil
end

def run(default_tag: nil, expect_emits: nil, expect_records: nil, timeout: nil, start: true, shutdown: true, &block)
@feed_method = if @instance.respond_to?(:filter_stream)
:filter_stream
else
:emit_events
end
if default_tag
@default_tag = default_tag
end
super(expect_emits: expect_emits, expect_records: expect_records, timeout: timeout, start: start, shutdown: shutdown, &block)
end

def feed_to_plugin(tag, es)
@instance.__send__(@feed_method, tag, es)
end

# d.run do
# d.feed('tag', time, {record})
# d.feed('tag', [ [time, {record}], [time, {record}], ... ])
# d.feed('tag', es)
# end
# d.run(default_tag: 'tag') do
# d.feed({record})
# d.feed(time, {record})
# d.feed([ [time, {record}], [time, {record}], ... ])
# d.feed(es)
# end
def feed(*args)
case args.size
when 1
raise ArgumentError, "tag not specified without default_tag" unless @default_tag
case args.first
when Fluent::EventStream
@instance.emit_events(@default_tag, args.first)
when Array
@instance.emit_events(@default_tag, ArrayEventStream.new(args.first))
when Hash
record = args.first
time = Fluent::EventTime.now
@instance.emit_events(@default_tag, OneEventStream.new(time, record))
else
raise ArgumentError, "unexpected events object (neither event(Hash), EventStream nor Array): #{args.first.class}"
end
when 2
if args[0].is_a?(String) && (args[1].is_a?(Array) || args[1].is_a?(Fluent::EventStream))
tag, es = args
es = ArrayEventStream.new(es) if es.is_a?(Array)
@instance.emit_events(tag, es)
elsif @default_tag && (args[0].is_a?(Fluent::EventTime) || args[0].is_a?(Integer)) && args[1].is_a?(Hash)
time, record = args
@instance.emit_events(@default_tag, OneEventStream.new(time, record))
else
raise ArgumentError, "unexpected values of argument: #{args[0].class}, #{args[1].class}"
end
when 3
tag, time, record = args
if tag.is_a?(String) && (time.is_a?(Fluent::EventTime) || time.is_a?(Integer)) && record.is_a?(Hash)
@instance.emit_events(tag, OneEventStream.new(time, record))
else
raise ArgumentError, "unexpected values of argument: #{tag.class}, #{time.class}, #{record.class}"
end
else
raise ArgumentError, "unexpected number of arguments: #{args}"
end
end
end
end
end
end
35 changes: 35 additions & 0 deletions lib/fluent/test/driver/filter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/test/driver/base'
require 'fluent/test/driver/event_feeder'

require 'fluent/plugin/filter'

module Fluent
module Test
module Driver
class Filter < Base
include EventFeeder

def initialize(klass, opts: {}, &block)
super
raise ArgumentError, "plugin is not an instance of Fluent::Plugin::Filter" unless @instance.is_a? Fluent::Plugin::Filter
end
end
end
end
end
31 changes: 31 additions & 0 deletions lib/fluent/test/driver/input.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/test/driver/base'
require 'fluent/plugin/input'

module Fluent
module Test
module Driver
class Input < Base
def initialize(klass, opts: {}, &block)
super
raise ArgumentError, "plugin is not an instance of Fluent::Plugin::Input" unless @instance.is_a? Fluent::Plugin::Input
end
end
end
end
end
Loading

0 comments on commit ea45c41

Please sign in to comment.