Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix keepalive and shared key bug #2456

Merged
merged 2 commits into from
Jun 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 24 additions & 17 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -770,8 +770,7 @@ def standby?
end

def verify_connection
connect do |sock|
ri = RequestInfo.new(@sender.security ? :helo : :established)
connect do |sock, ri|
if ri.state != :established
establish_connection(sock, ri)
raise if ri.state != :established
Expand Down Expand Up @@ -810,11 +809,6 @@ def establish_connection(sock, ri)
end

def send_data_actual(sock, tag, chunk)
ri = RequestInfo.new(@sender.security ? :helo : :established)
if ri.state != :established
establish_connection(sock, ri)
end

unless available?
raise ConnectionClosedError, "failed to establish connection with node #{@name}"
end
Expand All @@ -841,7 +835,10 @@ def send_data_actual(sock, tag, chunk)
end

def send_data(tag, chunk)
sock = connect
sock, ri = connect
if ri.state != :established
establish_connection(sock, ri)
end

begin
send_data_actual(sock, tag, chunk)
Expand Down Expand Up @@ -1073,26 +1070,36 @@ def on_read(sock, ri, data)
private

def connect(host = nil)
sock = if @keepalive
@socket_cache.fetch_or { @sender.create_transfer_socket(host || resolved_host, port, @hostname) }
else
@log.debug('connect new socket')
@sender.create_transfer_socket(host || resolved_host, port, @hostname)
end
socket, request_info =
if @keepalive
ri = RequestInfo.new(:established)
sock = @socket_cache.fetch_or do
s = @sender.create_transfer_socket(host || resolved_host, port, @hostname)
ri = RequestInfo.new(@sender.security ? :helo : :established) # overwrite if new connection
s
end
[sock, ri]
else
@log.debug('connect new socket')
[@sender.create_transfer_socket(host || resolved_host, port, @hostname), RequestInfo.new(@sender.security ? :helo : :established)]
end

if block_given?
ret = nil
begin
yield(sock)
ret = yield(socket, request_info)
rescue
@socket_cache.revoke if @keepalive
raise
else
@socket_cache.dec_ref if @keepalive
ensure
sock.close unless @keepalive
socket.close unless @keepalive
end

ret
else
sock
[socket, request_info]
end
end
end
Expand Down
50 changes: 49 additions & 1 deletion test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,55 @@ def read_ack_from_sock(sock, unpacker)
assert_equal(['test', time, records[1]], events[1])
end

test 'authentication_with_user_auth' do
test 'keepalive + shared_key' do
input_conf = TARGET_CONFIG + %[
<security>
self_hostname in.localhost
shared_key fluentd-sharedkey
</security>
]
target_input_driver = create_target_input_driver(conf: input_conf)

output_conf = %[
send_timeout 51
keepalive true
<security>
self_hostname localhost
shared_key fluentd-sharedkey
</security>
<server>
name test
host #{TARGET_HOST}
port #{TARGET_PORT}
</server>
]
@d = d = create_driver(output_conf)

time = event_time('2011-01-02 13:14:15 UTC')
records = [{ 'a' => 1 }, { 'a' => 2 }]
records2 = [{ 'b' => 1}, { 'b' => 2}]
target_input_driver.run(expect_records: 4, timeout: 15) do
d.run(default_tag: 'test') do
records.each do |record|
d.feed(time, record)
end

d.flush # emit buffer to reuse same socket later
records2.each do |record|
d.feed(time, record)
end
end
end

events = target_input_driver.events
assert{ events != [] }
assert_equal(['test', time, records[0]], events[0])
assert_equal(['test', time, records[1]], events[1])
assert_equal(['test', time, records2[0]], events[2])
assert_equal(['test', time, records2[1]], events[3])
end

test 'authentication_with_user_auth' do
input_conf = TARGET_CONFIG + %[
<security>
self_hostname in.localhost
Expand Down