Skip to content

Commit

Permalink
If handshake was already done, Skip it on the same socket
Browse files Browse the repository at this point in the history
if the socket which already handshaked tries to handshake, it will be
blocked and raise IO::WaitReadable [here](https://github.com/fluent/fluentd/blob/12ffa0afe313737bf09900efce8c8656baaf77bc/lib/fluent/plugin/out_forward.rb#L787
).

Signed-off-by: Yuta Iwama <ganmacs@gmail.com>
  • Loading branch information
ganmacs committed Jun 13, 2019
1 parent 3cacaf5 commit 12f3380
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 18 deletions.
38 changes: 21 additions & 17 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -770,8 +770,7 @@ def standby?
end

def verify_connection
connect do |sock|
ri = RequestInfo.new(@sender.security ? :helo : :established)
connect do |sock, ri|
if ri.state != :established
establish_connection(sock, ri)
raise if ri.state != :established
Expand Down Expand Up @@ -810,11 +809,6 @@ def establish_connection(sock, ri)
end

def send_data_actual(sock, tag, chunk)
ri = RequestInfo.new(@sender.security ? :helo : :established)
if ri.state != :established
establish_connection(sock, ri)
end

unless available?
raise ConnectionClosedError, "failed to establish connection with node #{@name}"
end
Expand All @@ -841,7 +835,10 @@ def send_data_actual(sock, tag, chunk)
end

def send_data(tag, chunk)
sock = connect
sock, ri = connect
if ri.state != :established
establish_connection(sock, ri)
end

begin
send_data_actual(sock, tag, chunk)
Expand Down Expand Up @@ -1073,29 +1070,36 @@ def on_read(sock, ri, data)
private

def connect(host = nil)
sock = if @keepalive
@socket_cache.fetch_or { @sender.create_transfer_socket(host || resolved_host, port, @hostname) }
else
@log.debug('connect new socket')
@sender.create_transfer_socket(host || resolved_host, port, @hostname)
end
socket, request_info =
if @keepalive
ri = RequestInfo.new(:established)
sock = @socket_cache.fetch_or do
s = @sender.create_transfer_socket(host || resolved_host, port, @hostname)
ri = RequestInfo.new(@sender.security ? :helo : :established) # overwrite if new connection
s
end
[sock, ri]
else
@log.debug('connect new socket')
[@sender.create_transfer_socket(host || resolved_host, port, @hostname), RequestInfo.new(@sender.security ? :helo : :established)]
end

if block_given?
ret = nil
begin
ret = yield(sock)
ret = yield(socket, request_info)
rescue
@socket_cache.revoke if @keepalive
raise
else
@socket_cache.dec_ref if @keepalive
ensure
sock.close unless @keepalive
socket.close unless @keepalive
end

ret
else
sock
[socket, request_info]
end
end
end
Expand Down
50 changes: 49 additions & 1 deletion test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,55 @@ def read_ack_from_sock(sock, unpacker)
assert_equal(['test', time, records[1]], events[1])
end

test 'authentication_with_user_auth' do
test 'keepalive + shared_key' do
input_conf = TARGET_CONFIG + %[
<security>
self_hostname in.localhost
shared_key fluentd-sharedkey
</security>
]
target_input_driver = create_target_input_driver(conf: input_conf)

output_conf = %[
send_timeout 51
keepalive true
<security>
self_hostname localhost
shared_key fluentd-sharedkey
</security>
<server>
name test
host #{TARGET_HOST}
port #{TARGET_PORT}
</server>
]
@d = d = create_driver(output_conf)

time = event_time('2011-01-02 13:14:15 UTC')
records = [{ 'a' => 1 }, { 'a' => 2 }]
records2 = [{ 'b' => 1}, { 'b' => 2}]
target_input_driver.run(expect_records: 4, timeout: 15) do
d.run(default_tag: 'test') do
records.each do |record|
d.feed(time, record)
end

d.flush # emit buffer to reuse same socket later
records2.each do |record|
d.feed(time, record)
end
end
end

events = target_input_driver.events
assert{ events != [] }
assert_equal(['test', time, records[0]], events[0])
assert_equal(['test', time, records[1]], events[1])
assert_equal(['test', time, records2[0]], events[2])
assert_equal(['test', time, records2[1]], events[3])
end

test 'authentication_with_user_auth' do
input_conf = TARGET_CONFIG + %[
<security>
self_hostname in.localhost
Expand Down

0 comments on commit 12f3380

Please sign in to comment.