diff --git a/.travis.yml b/.travis.yml index a50fc73..acf7ac5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,2 +1,8 @@ import: -- logstash-plugins/.ci:travis/travis.yml@1.x \ No newline at end of file + - logstash-plugins/.ci:travis/defaults.yml@1.x + - logstash-plugins/.ci:travis/exec.yml@1.x + +env: + jobs: + - ELASTIC_STACK_VERSION=8.x + - SNAPSHOT=true ELASTIC_STACK_VERSION=8.x \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 25f6ebb..21aab5d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 6.1.0 + - Feat: ssl_supported_protocols (TLSv1.3) [#47](https://github.com/logstash-plugins/logstash-output-tcp/pull/47) + - Fix: close server and client sockets on plugin close + ## 6.0.2 - Fix: unable to start with password protected key [#45](https://github.com/logstash-plugins/logstash-output-tcp/pull/45) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 4df55a2..b5fd844 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -45,6 +45,7 @@ This plugin supports the following configuration options plus the <> |<>|No | <> |a valid filesystem path|No | <> |<>|No +| <> |<>|No | <> |<>|No |======================================================================= @@ -130,6 +131,20 @@ SSL key path SSL key passphrase +[id="plugins-{type}s-{plugin}-ssl_supported_protocols"] +===== `ssl_supported_protocols` + + * Value type is <> + * Allowed values are: `'TLSv1.1'`, `'TLSv1.2'`, `'TLSv1.3'` + * Default depends on the JDK being used. With up-to-date Logstash, the default is `['TLSv1.2', 'TLSv1.3']`. + `'TLSv1.1'` is not considered secure and is only provided for legacy applications. + +List of allowed SSL/TLS versions to use when establishing a secure connection. + +NOTE: If you configure the plugin to use `'TLSv1.1'` on any recent JVM, such as the one packaged with Logstash, +the protocol is disabled by default and needs to be enabled manually by changing `jdk.tls.disabledAlgorithms` in +the *$JDK_HOME/conf/security/java.security* configuration file. That is, `TLSv1.1` needs to be removed from the list. + [id="plugins-{type}s-{plugin}-ssl_verify"] ===== `ssl_verify` diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index 3b06465..77cb1ab 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -51,38 +51,43 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base # SSL key passphrase config :ssl_key_passphrase, :validate => :password, :default => nil + # NOTE: the default setting [] uses SSL engine defaults + config :ssl_supported_protocols, :validate => ['TLSv1.1', 'TLSv1.2', 'TLSv1.3'], :default => [], :list => true + class Client - public + def initialize(socket, logger) @socket = socket @logger = logger @queue = Queue.new end - public def run loop do begin @socket.write(@queue.pop) rescue => e - @logger.warn("tcp output exception", :socket => @socket, - :exception => e) + log_warn 'socket write failed:', e, socket: (@socket ? @socket.to_s : nil) break end end end # def run - public def write(msg) @queue.push(msg) end # def write + + def close + @socket.close + rescue => e + log_warn 'socket close failed:', e, socket: (@socket ? @socket.to_s : nil) + end end # class Client - private def setup_ssl require "openssl" - @ssl_context = OpenSSL::SSL::SSLContext.new + @ssl_context = new_ssl_context if @ssl_cert @ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert)) if @ssl_key @@ -104,50 +109,74 @@ def setup_ssl @ssl_context.cert_store = @cert_store @ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT end - end # def setup_ssl - public + @ssl_context.min_version = :TLS1_1 # not strictly required - JVM should have disabled TLSv1 + if ssl_supported_protocols.any? + disabled_protocols = ['TLSv1.1', 'TLSv1.2', 'TLSv1.3'] - ssl_supported_protocols + unless OpenSSL::SSL.const_defined? :OP_NO_TLSv1_3 # work-around JRuby-OpenSSL bug - missing constant + @ssl_context.max_version = :TLS1_2 if disabled_protocols.delete('TLSv1.3') + end + # mapping 'TLSv1.2' -> OpenSSL::SSL::OP_NO_TLSv1_2 + disabled_protocols.map! { |v| OpenSSL::SSL.const_get "OP_NO_#{v.sub('.', '_')}" } + @ssl_context.options = disabled_protocols.reduce(@ssl_context.options, :|) + end + @ssl_context + end + private :setup_ssl + + # @note to be able to hook up into #ssl_context from tests + def new_ssl_context + OpenSSL::SSL::SSLContext.new + end + private :new_ssl_context + + # @overload Base#register def register require "socket" require "stud/try" - if @ssl_enable - setup_ssl - end # @ssl_enable + @closed = Concurrent::AtomicBoolean.new(false) + setup_ssl if @ssl_enable if server? @logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}") begin @server_socket = TCPServer.new(@host, @port) rescue Errno::EADDRINUSE - @logger.error("Could not start TCP server: Address in use", - :host => @host, :port => @port) + @logger.error("Could not start tcp server: Address in use", host: @host, port: @port) raise end if @ssl_enable @server_socket = OpenSSL::SSL::SSLServer.new(@server_socket, @ssl_context) end # @ssl_enable - @client_threads = [] + @client_threads = Concurrent::Array.new @accept_thread = Thread.new(@server_socket) do |server_socket| + LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|server_accept") loop do - Thread.start(server_socket.accept) do |client_socket| + break if @closed.value + client_socket = server_socket.accept_nonblock exception: false + if client_socket == :wait_readable + IO.select [ server_socket ] + next + end + Thread.start(client_socket) do |client_socket| # monkeypatch a 'peer' method onto the socket. client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } - @logger.debug("Accepted connection", :client => client_socket.peer, - :server => "#{@host}:#{@port}") + @logger.debug("accepted connection", client: client_socket.peer, server: "#{@host}:#{@port}") client = Client.new(client_socket, @logger) Thread.current[:client] = client + LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|client_socket-#{@client_threads.size}") @client_threads << Thread.current - client.run + client.run unless @closed.value end end end @codec.on_event do |event, payload| + @client_threads.select!(&:alive?) @client_threads.each do |client_thread| client_thread[:client].write(payload) end - @client_threads.reject! {|t| !t.alive? } end else client_socket = nil @@ -163,8 +192,7 @@ def register # Now send the payload client_socket.syswrite(payload) if w.any? rescue => e - @logger.warn("tcp output exception", :host => @host, :port => @port, - :exception => e, :backtrace => e.backtrace) + log_warn "client socket failed:", e, host: @host, port: @port, socket: (client_socket ? client_socket.to_s : nil) client_socket.close rescue nil client_socket = nil sleep @reconnect_interval @@ -172,9 +200,27 @@ def register end end end - end # def register + end + + # @overload Base#receive + def receive(event) + @codec.encode(event) + end + + # @overload Base#close + def close + @closed.make_true + @server_socket.close rescue nil if @server_socket + + return unless @client_threads + @client_threads.each do |thread| + client = thread[:client] + client.close rescue nil if client + end + end private + def connect begin client_socket = TCPSocket.new(@host, @port) @@ -183,29 +229,40 @@ def connect begin client_socket.connect rescue OpenSSL::SSL::SSLError => ssle - @logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace) + log_error 'connect ssl failure:', ssle, backtrace: false # NOTE(mrichar1): Hack to prevent hammering peer sleep(5) raise end end client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } - @logger.debug("Opened connection", :client => "#{client_socket.peer}") + @logger.debug("opened connection", :client => client_socket.peer) return client_socket - rescue StandardError => e - @logger.error("Failed to connect: #{e.message}", :exception => e.class, :backtrace => e.backtrace) + rescue => e + log_error 'failed to connect:', e sleep @reconnect_interval retry end end # def connect - private def server? @mode == "server" end # def server? - public - def receive(event) - @codec.encode(event) - end # def receive + def pipeline_id + execution_context.pipeline_id || 'main' + end + + def log_warn(msg, e, backtrace: @logger.debug?, **details) + details = details.merge message: e.message, exception: e.class + details[:backtrace] = e.backtrace if backtrace + @logger.warn(msg, details) + end + + def log_error(msg, e, backtrace: @logger.info?, **details) + details = details.merge message: e.message, exception: e.class + details[:backtrace] = e.backtrace if backtrace + @logger.error(msg, details) + end + end # class LogStash::Outputs::Tcp diff --git a/logstash-output-tcp.gemspec b/logstash-output-tcp.gemspec index b65fe8d..14be8f9 100644 --- a/logstash-output-tcp.gemspec +++ b/logstash-output-tcp.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-tcp' - s.version = '6.0.2' + s.version = '6.1.0' s.licenses = ['Apache License (2.0)'] s.summary = "Writes events over a TCP socket" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" @@ -21,11 +21,14 @@ Gem::Specification.new do |s| # Gem dependencies s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" - + s.add_runtime_dependency 'logstash-core', '>= 8.1.0' s.add_runtime_dependency 'logstash-codec-json' s.add_runtime_dependency 'stud' + s.add_runtime_dependency 'jruby-openssl', '>= 0.12.2' # 0.12 supports TLSv1.3 + s.add_development_dependency 'logstash-devutils' + s.add_development_dependency 'logstash-codec-plain' s.add_development_dependency 'flores' end diff --git a/spec/outputs/tcp_spec.rb b/spec/outputs/tcp_spec.rb index d5494b8..0708cbf 100644 --- a/spec/outputs/tcp_spec.rb +++ b/spec/outputs/tcp_spec.rb @@ -4,13 +4,112 @@ describe LogStash::Outputs::Tcp do subject { described_class.new(config) } - let(:config) { { - "host" => "localhost", - "port" => 2000 + rand(3000), - } } + + let(:port) do + begin + # Start high to better avoid common services + port = rand(10000..65535) + s = TCPServer.new("127.0.0.1", port) + s.close + + port + rescue Errno::EADDRINUSE + retry + end + end + + let(:server) { TCPServer.new("127.0.0.1", port) } + + let(:config) { { "host" => "localhost", "port" => port } } + + let(:event) { LogStash::Event.new('message' => 'foo bar') } + + context 'failing to connect' do + + before { subject.register } + + let(:config) { super().merge 'port' => 1000 } + + it 'fails to connect' do + expect( subject ).to receive(:log_error).and_call_original + Thread.start { subject.receive(event) } + sleep 1.0 + end + + end + + context 'server mode' do + + before { subject.register } + + let(:config) { super().merge 'mode' => 'server' } + + let(:client) do + Stud::try(3.times) { TCPSocket.new("127.0.0.1", port) } + end + + after { subject.close } + + it 'receives serialized data' do; require 'json' + client # connect + Thread.start { sleep 0.5; subject.receive event } + + read = client.recv(1000) + expect( read.size ).to be > 0 + expect( JSON.parse(read)['message'] ).to eql 'foo bar' + end + + end + + context "with forced protocol" do + let(:config) do + super().merge 'ssl_supported_protocols' => [ 'TLSv1.1' ] + end + + it "limits protocol selection" do + if OpenSSL::SSL.const_defined? :OP_NO_TLSv1_3 + ssl_context = subject.send :setup_ssl + expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_3).to_not eql 0 + expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_2).to_not eql 0 + expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_1).to eql 0 + else + ssl_context = OpenSSL::SSL::SSLContext.new + allow(subject).to receive(:new_ssl_context).and_return(ssl_context) + expect(ssl_context).to receive(:max_version=).with(:'TLS1_2').and_call_original + ssl_context = subject.send :setup_ssl + expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_2).to_not eql 0 + expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_1).to eql 0 + end + end + end + + context "with protocol range" do + let(:config) do + super().merge 'ssl_supported_protocols' => [ 'TLSv1.3', 'TLSv1.1', 'TLSv1.2' ] + end + + it "does not limit protocol selection (except min_version)" do + ssl_context = OpenSSL::SSL::SSLContext.new + allow(subject).to receive(:new_ssl_context).and_return(ssl_context) + expect(ssl_context).to receive(:min_version=).with(:'TLS1_1').at_least(1).and_call_original + + if OpenSSL::SSL.const_defined? :OP_NO_TLSv1_3 + subject.send :setup_ssl + expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_3).to eql 0 + expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_2).to eql 0 + expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_1).to eql 0 + else + subject.send :setup_ssl + expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_2).to eql 0 + expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_1).to eql 0 + end + + subject.send :setup_ssl + end + end context "when enabling SSL" do - let(:config) { super().merge("ssl_enable" => true) } + let(:config) { super().merge("ssl_enable" => true, 'codec' => 'plain') } context "and not providing a certificate/key pair" do it "registers without error" do expect { subject.register }.to_not raise_error @@ -51,6 +150,62 @@ end end + + let(:secure_server) do + ssl_context = OpenSSL::SSL::SSLContext.new + ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE + ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(crt_file)) + ssl_context.key = OpenSSL::PKey::RSA.new(File.read(key_file), nil) + ssl_context.ssl_version = server_ssl_version if server_ssl_version + ssl_context.min_version = server_min_version if server_min_version + ssl_context.max_version = server_max_version if server_max_version + OpenSSL::SSL::SSLServer.new(server, ssl_context) + end + + let(:server_min_version) { nil } + let(:server_max_version) { nil } + let(:server_ssl_version) { nil } + + context 'with supported protocol' do + + let(:config) { super().merge("ssl_supported_protocols" => ['TLSv1.2']) } + + let(:server_min_version) { 'TLS1_2' } + + before { subject.register } + after { secure_server.close } + + it 'reads plain data' do + Thread.start { sleep 0.25; subject.receive event } + socket = secure_server.accept + read = socket.sysread(100) + expect( read.size ).to be > 0 + expect( read ).to end_with 'foo bar' + end + + end + + context 'with unsupported protocol (on server)' do + + let(:config) { super().merge("ssl_supported_protocols" => ['TLSv1.1']) } + + let(:server_min_version) { 'TLS1_2' } + + before { subject.register } + after { secure_server.close } + + it 'fails (and loops retrying)' do + expect(subject.logger).to receive(:error).with(/connect ssl failure/i, hash_including(message: /No appropriate protocol/i)).and_call_original + expect(subject.logger).to receive(:error).with(/failed to connect/i, hash_including(exception: OpenSSL::SSL::SSLError)).and_call_original + expect(subject).to receive(:sleep).once.and_call_original + expect(subject).to receive(:sleep).once.and_throw :TEST_DONE # to be able to abort the retry loop + + Thread.start { secure_server.accept rescue nil } + expect { subject.receive event }.to throw_symbol(:TEST_DONE) + end + + end if LOGSTASH_VERSION > '7.0' + end context "encrypted key using PKCS#1" do