Skip to content

Commit

Permalink
refactor plugin shutdown
Browse files Browse the repository at this point in the history
comments, fixes and close

new specs styles

spec helper for input

revert data_timeout

comment rescue nil
  • Loading branch information
colinsurprenant committed Sep 26, 2015
1 parent a126989 commit 957848d
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 140 deletions.
245 changes: 132 additions & 113 deletions lib/logstash/inputs/tcp.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "logstash/util/socket_peer"

# Read events over a TCP socket.
Expand All @@ -22,11 +21,7 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
# When mode is `client`, the port to connect to.
config :port, :validate => :number, :required => true

# The 'read' timeout in seconds. If a particular tcp connection is idle for
# more than this timeout period, we will assume it is dead and close it.
#
# If you never want to timeout, use -1.
config :data_timeout, :validate => :number, :default => -1
config :data_timeout, :validate => :number, :default => -1, :deprecated => "This setting is not used by this plugin. It will be removed soon."

# Mode to operate in. `server` listens for client connections,
# `client` connects to a server.
Expand All @@ -53,12 +48,14 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base

def initialize(*args)
super(*args)
@interrupted = false

# threadsafe socket bookkeeping
@server_socket = nil
@client_socket = nil
end # def initialize
@connection_sockets = {}
@socket_mutex = Mutex.new
end

public
def register
require "socket"
require "timeout"
Expand Down Expand Up @@ -86,49 +83,127 @@ def register
@ssl_context.cert_store = @cert_store
@ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT
end
end # @ssl_enable
end

# note that since we are opening a socket in register, we must also make sure we close it
# in the close method even if we also close it in the stop method since we could have
# a situation where register is called but not run & stop.

if server?
@logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}")
begin
@server_socket = TCPServer.new(@host, @port)
set_server_socket(TCPServer.new(@host, @port))
rescue Errno::EADDRINUSE
@logger.error("Could not start TCP server: Address in use", :host => @host, :port => @port)
raise
end
if @ssl_enable
@server_socket = OpenSSL::SSL::SSLServer.new(@server_socket, @ssl_context)
end # @ssl_enable

set_server_socket(OpenSSL::SSL::SSLServer.new(server_socket, @ssl_context)) if @ssl_enable
end
end

def run(output_queue)
if server?
run_server(output_queue)
else
run_client(output_queue)
end
end # def register
end

def stop
# force close all sockets which will escape any blocking read with a IO exception
# and any thread using them will exit.
# catch all rescue nil on close to discard any close errors or invalid socket
server_socket.close rescue nil
client_socket.close rescue nil
connection_sockets.each{|socket| socket.close rescue nil}
end

def close
# see related comment in register: we must make sure to close the server socket here
# because it is created in the register method and we could be in the context of having
# register called but never run & stop, only close.
# catch all rescue nil on close to discard any close errors or invalid socket
server_socket.close rescue nil
end

private
def handle_socket(socket, client_address, output_queue, codec)
while !@interrupted
buf = nil
# NOTE(petef): the timeout only hits after the line is read or socket dies
# TODO(sissel): Why do we have a Timeout here? What's the point?
if @data_timeout == -1
buf = read(socket)
else
Timeout::timeout(@data_timeout) do
buf = read(socket)

def run_server(output_queue)
while !stop?
begin
socket = add_connection_socket(server_socket.accept)
# start a new thread for each connection.
server_connection_thread(output_queue, socket)
rescue OpenSSL::SSL::SSLError => e
@logger.error("SSL Error", :exception => e, :backtrace => e.backtrace)
rescue
# if this exception occured while the plugin is stopping
# just ignore and exit
raise unless stop?
end
end
ensure
# catch all rescue nil on close to discard any close errors or invalid socket
server_socket.close rescue nil
end

def run_client(output_queue)
while !stop?
set_client_socket(TCPSocket.new(@host, @port))

if @ssl_enable
set_client_socket(OpenSSL::SSL::SSLSocket.new(client_socket, @ssl_context))
begin
client_socket.connect
rescue OpenSSL::SSL::SSLError => e
@logger.error("SSL Error", :exception => e, :backtrace => e.backtrace)
sleep(1) # prevent hammering peer
next
rescue
# if this exception occured while the plugin is stopping
# just ignore and exit
raise unless stop?
end
end
codec.decode(buf) do |event|

@logger.debug? && @logger.debug("Opened connection", :client => "#{client_socket.peer}")
handle_socket(client_socket, client_socket.peeraddr[3], output_queue, @codec.clone)
end
ensure
# catch all rescue nil on close to discard any close errors or invalid socket
client_socket.close rescue nil
end

def server_connection_thread(output_queue, socket)
Thread.new(output_queue, socket) do |q, s|
begin
@logger.debug? && @logger.debug("Accepted connection", :client => s.peer, :server => "#{@host}:#{@port}")
handle_socket(s, s.peeraddr[3], q, @codec.clone)
ensure
delete_connection_socket(s)
end
end
end

def handle_socket(socket, client_address, output_queue, codec)
while !stop?
codec.decode(read(socket)) do |event|
event["host"] ||= client_address
event["sslsubject"] ||= socket.peer_cert.subject if @ssl_enable && @ssl_verify
decorate(event)
output_queue << event
end
end # loop
end
rescue EOFError
@logger.debug? && @logger.debug("Connection closed", :client => socket.peer)
rescue Errno::ECONNRESET
@logger.debug? && @logger.debug("Connection reset by peer", :client => socket.peer)
rescue => e
@logger.error("An error occurred. Closing connection", :client => socket.peer, :exception => e, :backtrace => e.backtrace)
# if plugin is stopping, don't bother logging it as an error
@logger.error("An error occurred. Closing connection", :client => socket.peer, :exception => e, :backtrace => e.backtrace) unless stop?
ensure
# catch all rescue nil on close to discard any close errors or invalid socket
socket.close rescue nil

codec.respond_to?(:flush) && codec.flush do |event|
Expand All @@ -139,98 +214,42 @@ def handle_socket(socket, client_address, output_queue, codec)
end
end

private
def client_thread(output_queue, socket)
Thread.new(output_queue, socket) do |q, s|
begin
@logger.debug? && @logger.debug("Accepted connection", :client => s.peer, :server => "#{@host}:#{@port}")
handle_socket(s, s.peeraddr[3], q, @codec.clone)
rescue LogStash::ShutdownSignal
@interrupted = true
s.close rescue nil
ensure
@client_threads_lock.synchronize{@client_threads.delete(Thread.current)}
end
end
end

private
def server?
@mode == "server"
end # def server?
end

private
def read(socket)
socket.sysread(16384)
end # def readline
end

public
def run(output_queue)
if server?
run_server(output_queue)
else
run_client(output_queue)
end
end # def run
# threadsafe sockets bookkeeping

def run_server(output_queue)
@client_threads = []
@client_threads_lock = Mutex.new
def set_client_socket(socket)
@socket_mutex.synchronize{@client_socket = socket}
end

while !@interrupted
begin
socket = @server_socket.accept
# start a new thread for each connection.
@client_threads_lock.synchronize{@client_threads << client_thread(output_queue, socket)}
rescue OpenSSL::SSL::SSLError => ssle
# NOTE(mrichar1): This doesn't return a useful error message for some reason
@logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace)
rescue IOError
raise unless @interrupted
end
end
rescue LogStash::ShutdownSignal
@interrupted = true
ensure
@server_socket.close rescue nil
def client_socket
@socket_mutex.synchronize{@client_socket}
end

threads = @client_threads_lock.synchronize{@client_threads.dup}
threads.each do |thread|
thread.raise(LogStash::ShutdownSignal) if thread.alive?
end
end # def run_server
def set_server_socket(socket)
@socket_mutex.synchronize{@server_socket = socket}
end

def run_client(output_queue)
while !@interrupted
@client_socket = TCPSocket.new(@host, @port)
if @ssl_enable
@client_socket = OpenSSL::SSL::SSLSocket.new(@client_socket, @ssl_context)
begin
@client_socket.connect
rescue OpenSSL::SSL::SSLError => ssle
@logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace)
# NOTE(mrichar1): Hack to prevent hammering peer
sleep(5)
next
end
end
@logger.debug? && @logger.debug("Opened connection", :client => "#{@client_socket.peer}")
handle_socket(@client_socket, @client_socket.peeraddr[3], output_queue, @codec.clone)
end # loop
ensure
@client_socket.close rescue nil
end # def run

public
def teardown
@interrupted = true
if @server_socket
@server_socket.close rescue nil
@server_socket = nil
end
if @client_socket
@client_socket.close rescue nil
@client_socket = nil
end
end # def teardown
end # class LogStash::Inputs::Tcp
def server_socket
@socket_mutex.synchronize{@server_socket}
end

def add_connection_socket(socket)
@socket_mutex.synchronize{@connection_sockets[socket] = true}
socket
end

def delete_connection_socket(socket)
@socket_mutex.synchronize{@connection_sockets.delete(socket)}
end

def connection_sockets
@socket_mutex.synchronize{@connection_sockets.keys.dup}
end
end
Loading

0 comments on commit 957848d

Please sign in to comment.