Skip to content

Commit

Permalink
Add test for ack_handler
Browse files Browse the repository at this point in the history
Signed-off-by: Yuta Iwama <ganmacs@gmail.com>
  • Loading branch information
ganmacs committed Jul 26, 2019
1 parent 11a8232 commit 2adec10
Showing 1 changed file with 95 additions and 0 deletions.
95 changes: 95 additions & 0 deletions test/plugin/out_forward/test_ack_handler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
require_relative '../../helper'
require 'fluent/test/driver/output'
require 'flexmock/test_unit'

require 'fluent/plugin/out_forward'
require 'fluent/plugin/out_forward/ack_handler'

class AckHandlerTest < Test::Unit::TestCase
data(
'chunk_id is matched' => [MessagePack.pack({ 'ack' => Base64.encode64('chunk_id 111') }), Fluent::Plugin::ForwardOutput::AckHandler::Result::SUCCESS],
'chunk_id is not matched' => [MessagePack.pack({ 'ack' => 'unmatched' }), Fluent::Plugin::ForwardOutput::AckHandler::Result::CHUNKID_UNMATCHED],
'chunk_id is empty' => ['', Fluent::Plugin::ForwardOutput::AckHandler::Result::FAILED],
)
test 'returns chunk_id, node, sock and result status' do |args|
receved, state = args
ack_handler = Fluent::Plugin::ForwardOutput::AckHandler.new(10, log: $log, read_length: 100)
r, w = IO.pipe

node = flexmock('node', host: '127.0.0.1', port: '1000') # for log
chunk_id = 'chunk_id 111'
ack = ack_handler.create_ack(chunk_id, node)

w.write(chunk_id)
mock(r).recv(anything) { |_| receved } # IO does not have recv
ack.enqueue(r)

a1 = a2 = a3 = a4 = nil
ack_handler.ack_reader(1) do |cid, n, s, ret|
# This block is rescued by ack_handler so it needs to invoke assetion outside of this block
a1 = cid; a2 = n; a3 = s; a4 = ret
end

assert_equal chunk_id, a1
assert_equal node, a2
assert_equal r, a3
assert_equal state, a4
ensure
r.close rescue nil
w.close rescue nil
end

test 'returns nil if raise an error' do
ack_handler = Fluent::Plugin::ForwardOutput::AckHandler.new(10, log: $log, read_length: 100)
r, w = IO.pipe

node = flexmock('node', host: '127.0.0.1', port: '1000') # for log
chunk_id = 'chunk_id 111'
ack = ack_handler.create_ack(chunk_id, node)

w.write(chunk_id)
mock(r).recv(anything) { |_| raise 'unexpected error' } # IO does not have recv
ack.enqueue(r)

a1 = a2 = a3 = a4 = nil
ack_handler.ack_reader(1) do |cid, n, s, ret|
# This block is rescued by ack_handler so it needs to invoke assetion outside of this block
a1 = cid; a2 = n; a3 = s; a4 = ret
end

assert_nil a1
assert_nil a2
assert_nil a3
assert_equal Fluent::Plugin::ForwardOutput::AckHandler::Result::FAILED, a4
ensure
r.close rescue nil
w.close rescue nil
end

test 'when ack is expired' do
ack_handler = Fluent::Plugin::ForwardOutput::AckHandler.new(0, log: $log, read_length: 100)
r, w = IO.pipe

node = flexmock('node', host: '127.0.0.1', port: '1000') # for log
chunk_id = 'chunk_id 111'
ack = ack_handler.create_ack(chunk_id, node)

w.write(chunk_id)
mock(r).recv(anything).never
ack.enqueue(r)

a1 = a2 = a3 = a4 = nil
ack_handler.ack_reader(1) do |cid, n, s, ret|
# This block is rescued by ack_handler so it needs to invoke assetion outside of this block
a1 = cid; a2 = n; a3 = s; a4 = ret
end

assert_equal chunk_id, a1
assert_equal node, a2
assert_equal r, a3
assert_equal Fluent::Plugin::ForwardOutput::AckHandler::Result::FAILED, a4
ensure
r.close rescue nil
w.close rescue nil
end
end

0 comments on commit 2adec10

Please sign in to comment.