Skip to content

Commit

Permalink
nodes share socketmanager
Browse files Browse the repository at this point in the history
Signed-off-by: Yuta Iwama <ganmacs@gmail.com>
  • Loading branch information
ganmacs committed Jul 19, 2019
1 parent c520865 commit 6c604fc
Showing 1 changed file with 19 additions and 27 deletions.
46 changes: 19 additions & 27 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -203,28 +203,23 @@ def configure(conf)
end
end

socket_cache = @keepalive ? SocketCache.new(@keepalive_timeout, @log) : nil
@connection_manager = ConnectionManager.new(
log: @log,
secure: !!@security,
connection_factory: method(:create_transfer_socket),
socket_cache: socket_cache,
)

@servers.each do |server|
failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)
name = server.name || "#{server.host}:#{server.port}"

socket_cache =
if @keepalive
SocketCache.new(@keepalive_timeout, @log)
else
nil
end
connection_manager = ConnectionManager.new(
log: @log,
secure: !!@security,
connection_factory: method(:create_transfer_socket),
socket_cache: socket_cache,
)

log.info "adding forwarding server '#{name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id
if @heartbeat_type == :none
@nodes << NoneHeartbeatNode.new(self, server, failure: failure, connection_manager: connection_manager)
@nodes << NoneHeartbeatNode.new(self, server, failure: failure, connection_manager: @connection_manager)
else
node = Node.new(self, server, failure: failure, connection_manager: connection_manager)
node = Node.new(self, server, failure: failure, connection_manager: @connection_manager)
begin
node.validate_host_resolution!
rescue => e
Expand Down Expand Up @@ -315,12 +310,17 @@ def close
@usock.close rescue nil
end

if @keepalive && @keepalive_timeout
@nodes.each(&:clear)
end
super
end

def stop
super

if @keepalive
@connection_manager.stop
end
end

def write(chunk)
return if chunk.empty?
tag = chunk.metadata.tag
Expand Down Expand Up @@ -425,7 +425,7 @@ def on_heartbeat(sockaddr, msg)
end

def on_purge_obsolete_socks
@nodes.each(&:purge_obsolete_socks)
@connection_manager.purge_obsolete_socks
end

# return chunk id to be committed
Expand Down Expand Up @@ -670,14 +670,6 @@ def send_data(tag, chunk)
nil
end

def clear
@connection_manager.stop
end

def purge_obsolete_socks
@connection_manager.purge_obsolete_socks
end

def close(sock)
@connection_manager.close(sock)
end
Expand Down

0 comments on commit 6c604fc

Please sign in to comment.