diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index be10120e03..05e7d4e96d 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -203,28 +203,23 @@ def configure(conf) end end + socket_cache = @keepalive ? SocketCache.new(@keepalive_timeout, @log) : nil + @connection_manager = ConnectionManager.new( + log: @log, + secure: !!@security, + connection_factory: method(:create_transfer_socket), + socket_cache: socket_cache, + ) + @servers.each do |server| failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f) name = server.name || "#{server.host}:#{server.port}" - socket_cache = - if @keepalive - SocketCache.new(@keepalive_timeout, @log) - else - nil - end - connection_manager = ConnectionManager.new( - log: @log, - secure: !!@security, - connection_factory: method(:create_transfer_socket), - socket_cache: socket_cache, - ) - log.info "adding forwarding server '#{name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id if @heartbeat_type == :none - @nodes << NoneHeartbeatNode.new(self, server, failure: failure, connection_manager: connection_manager) + @nodes << NoneHeartbeatNode.new(self, server, failure: failure, connection_manager: @connection_manager) else - node = Node.new(self, server, failure: failure, connection_manager: connection_manager) + node = Node.new(self, server, failure: failure, connection_manager: @connection_manager) begin node.validate_host_resolution! rescue => e @@ -315,12 +310,17 @@ def close @usock.close rescue nil end - if @keepalive && @keepalive_timeout - @nodes.each(&:clear) - end super end + def stop + super + + if @keepalive + @connection_manager.stop + end + end + def write(chunk) return if chunk.empty? tag = chunk.metadata.tag @@ -425,7 +425,7 @@ def on_heartbeat(sockaddr, msg) end def on_purge_obsolete_socks - @nodes.each(&:purge_obsolete_socks) + @connection_manager.purge_obsolete_socks end # return chunk id to be committed @@ -670,14 +670,6 @@ def send_data(tag, chunk) nil end - def clear - @connection_manager.stop - end - - def purge_obsolete_socks - @connection_manager.purge_obsolete_socks - end - def close(sock) @connection_manager.close(sock) end