diff --git a/filebeat/_meta/common.reference.inputs.yml b/filebeat/_meta/common.reference.inputs.yml index 7fa0671c114..97b377f5f32 100644 --- a/filebeat/_meta/common.reference.inputs.yml +++ b/filebeat/_meta/common.reference.inputs.yml @@ -263,6 +263,9 @@ filebeat.inputs: # Maximum size in bytes of the message received over TCP #max_message_size: 20MiB + # Max number of concurrent connections, or 0 for no limit. Default: 0 + #max_connections: 0 + # The number of seconds of inactivity before a remote connection is closed. #timeout: 300s diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index aad973b1572..b7a8e30a358 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -640,6 +640,9 @@ filebeat.inputs: # Maximum size in bytes of the message received over TCP #max_message_size: 20MiB + # Max number of concurrent connections, or 0 for no limit. Default: 0 + #max_connections: 0 + # The number of seconds of inactivity before a remote connection is closed. #timeout: 300s diff --git a/filebeat/inputsource/tcp/config.go b/filebeat/inputsource/tcp/config.go index 6bbe9aa4783..f6348179627 100644 --- a/filebeat/inputsource/tcp/config.go +++ b/filebeat/inputsource/tcp/config.go @@ -35,6 +35,7 @@ type Config struct { Host string `config:"host"` Timeout time.Duration `config:"timeout" validate:"nonzero,positive"` MaxMessageSize cfgtype.ByteSize `config:"max_message_size" validate:"nonzero,positive"` + MaxConnections int `config:"max_connections"` TLS *tlscommon.ServerConfig `config:"ssl"` } diff --git a/filebeat/inputsource/tcp/server.go b/filebeat/inputsource/tcp/server.go index f45a8bb13fd..69d8ec10f93 100644 --- a/filebeat/inputsource/tcp/server.go +++ b/filebeat/inputsource/tcp/server.go @@ -25,6 +25,8 @@ import ( "net" "sync" + "golang.org/x/net/netutil" + "github.com/elastic/beats/filebeat/inputsource" "github.com/elastic/beats/libbeat/common/transport/tlscommon" "github.com/elastic/beats/libbeat/logp" @@ -175,12 +177,26 @@ func (s *Server) allClients() []*client { } func (s *Server) createServer() (net.Listener, error) { + var l net.Listener + var err error if s.tlsConfig != nil { t := s.tlsConfig.BuildModuleConfig(s.config.Host) s.log.Info("Listening over TLS") - return tls.Listen("tcp", s.config.Host, t) + l, err = tls.Listen("tcp", s.config.Host, t) + if err != nil { + return nil, err + } + } else { + l, err = net.Listen("tcp", s.config.Host) + if err != nil { + return nil, err + } + } + + if s.config.MaxConnections > 0 { + return netutil.LimitListener(l, s.config.MaxConnections), nil } - return net.Listen("tcp", s.config.Host) + return l, nil } func (s *Server) clientsCount() int { diff --git a/vendor/golang.org/x/net/netutil/listen.go b/vendor/golang.org/x/net/netutil/listen.go new file mode 100644 index 00000000000..cee46e331ff --- /dev/null +++ b/vendor/golang.org/x/net/netutil/listen.go @@ -0,0 +1,74 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package netutil provides network utility functions, complementing the more +// common ones in the net package. +package netutil // import "golang.org/x/net/netutil" + +import ( + "net" + "sync" +) + +// LimitListener returns a Listener that accepts at most n simultaneous +// connections from the provided Listener. +func LimitListener(l net.Listener, n int) net.Listener { + return &limitListener{ + Listener: l, + sem: make(chan struct{}, n), + done: make(chan struct{}), + } +} + +type limitListener struct { + net.Listener + sem chan struct{} + closeOnce sync.Once // ensures the done chan is only closed once + done chan struct{} // no values sent; closed when Close is called +} + +// acquire acquires the limiting semaphore. Returns true if successfully +// accquired, false if the listener is closed and the semaphore is not +// acquired. +func (l *limitListener) acquire() bool { + select { + case <-l.done: + return false + case l.sem <- struct{}{}: + return true + } +} +func (l *limitListener) release() { <-l.sem } + +func (l *limitListener) Accept() (net.Conn, error) { + acquired := l.acquire() + // If the semaphore isn't acquired because the listener was closed, expect + // that this call to accept won't block, but immediately return an error. + c, err := l.Listener.Accept() + if err != nil { + if acquired { + l.release() + } + return nil, err + } + return &limitListenerConn{Conn: c, release: l.release}, nil +} + +func (l *limitListener) Close() error { + err := l.Listener.Close() + l.closeOnce.Do(func() { close(l.done) }) + return err +} + +type limitListenerConn struct { + net.Conn + releaseOnce sync.Once + release func() +} + +func (l *limitListenerConn) Close() error { + err := l.Conn.Close() + l.releaseOnce.Do(l.release) + return err +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 143025ed106..c8b9ff9ec9d 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -2345,6 +2345,12 @@ "version": "release-branch.go1.9", "versionExact": "release-branch.go1.9" }, + { + "checksumSHA1": "CkkyCjLPBm0OGzfMuTfaDsd45X4=", + "path": "golang.org/x/net/netutil", + "revision": "3b0461eec859c4b73bb64fdc8285971fd33e3938", + "revisionTime": "2019-06-20T19:42:36Z" + }, { "checksumSHA1": "QEm/dePZ0lOnyOs+m22KjXfJ/IU=", "path": "golang.org/x/net/proxy", diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index ff470413f24..7f7d0fb2a08 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -759,6 +759,9 @@ filebeat.inputs: # Maximum size in bytes of the message received over TCP #max_message_size: 20MiB + # Max number of concurrent connections, or 0 for no limit. Default: 0 + #max_connections: 0 + # The number of seconds of inactivity before a remote connection is closed. #timeout: 300s