Skip to content

Commit

Permalink
UPSTREAM: <carry>: *: add support for socket options
Browse files Browse the repository at this point in the history
Signed-off-by: Sam Batschelet <sbatsche@redhat.com>
  • Loading branch information
hexfusion committed Mar 10, 2021
1 parent b5a0bda commit 9f64079
Show file tree
Hide file tree
Showing 18 changed files with 464 additions and 53 deletions.
5 changes: 5 additions & 0 deletions embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ type Config struct {
// before closing a non-responsive connection. 0 to disable.
GRPCKeepAliveTimeout time.Duration `json:"grpc-keepalive-timeout"`

// SocketOpts are socket options passed to listener config.
SocketOpts transport.SocketOpts

// PreVote is true to enable Raft Pre-Vote.
// If enabled, Raft runs an additional election phase
// to check whether it would get enough votes to win
Expand Down Expand Up @@ -395,6 +398,8 @@ func NewConfig() *Config {
GRPCKeepAliveInterval: DefaultGRPCKeepAliveInterval,
GRPCKeepAliveTimeout: DefaultGRPCKeepAliveTimeout,

SocketOpts: transport.SocketOpts{},

TickMs: 100,
ElectionMs: 1000,
InitialElectionTickAdvance: true,
Expand Down
26 changes: 22 additions & 4 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
e = nil
}()

if !cfg.SocketOpts.Empty() {
cfg.logger.Info(
"configuring socket options",
zap.Bool("reuse-address", cfg.SocketOpts.ReuseAddress),
zap.Bool("reuse-port", cfg.SocketOpts.ReusePort),
)
}

if e.cfg.logger != nil {
e.cfg.logger.Info(
"configuring peer listeners",
Expand Down Expand Up @@ -188,6 +196,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
SocketOpts: cfg.SocketOpts,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
AuthToken: cfg.AuthToken,
Expand Down Expand Up @@ -516,7 +525,11 @@ func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
}
}
peers[i] = &peerListener{close: func(context.Context) error { return nil }}
peers[i].Listener, err = rafthttp.NewListener(u, &cfg.PeerTLSInfo)
peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme,
transport.WithTLSInfo(&cfg.PeerTLSInfo),
transport.WithSocketOpts(&cfg.SocketOpts),
transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -646,8 +659,10 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
oldctx.insecure = oldctx.insecure || sctx.insecure
continue
}

if sctx.l, err = net.Listen(network, addr); err != nil {
if sctx.l, err = transport.NewListenerWithOpts(addr, u.Scheme,
transport.WithSocketOpts(&cfg.SocketOpts),
transport.WithSkipTLSInfoCheck(true),
); err != nil {
return nil, err
}
// net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking
Expand Down Expand Up @@ -771,7 +786,10 @@ func (e *Etcd) serveMetrics() (err error) {
if murl.Scheme == "http" {
tlsInfo = nil
}
ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsInfo)
ml, err := transport.NewListenerWithOpts(murl.Host, murl.Scheme,
transport.WithTLSInfo(tlsInfo),
transport.WithSocketOpts(&e.cfg.SocketOpts),
)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ func newConfig() *config {
fs.DurationVar(&cfg.ec.GRPCKeepAliveMinTime, "grpc-keepalive-min-time", cfg.ec.GRPCKeepAliveMinTime, "Minimum interval duration that a client should wait before pinging server.")
fs.DurationVar(&cfg.ec.GRPCKeepAliveInterval, "grpc-keepalive-interval", cfg.ec.GRPCKeepAliveInterval, "Frequency duration of server-to-client ping to check if a connection is alive (0 to disable).")
fs.DurationVar(&cfg.ec.GRPCKeepAliveTimeout, "grpc-keepalive-timeout", cfg.ec.GRPCKeepAliveTimeout, "Additional duration of wait before closing a non-responsive connection (0 to disable).")
fs.BoolVar(&cfg.ec.SocketOpts.ReusePort, "socket-reuse-port", cfg.ec.SocketOpts.ReusePort, "Enable to set socket option SO_REUSEPORT on listeners allowing rebinding of a port already in use.")
fs.BoolVar(&cfg.ec.SocketOpts.ReuseAddress, "socket-reuse-address", cfg.ec.SocketOpts.ReuseAddress, "Enable to set socket option SO_REUSEADDR on listeners allowing binding to an address in `TIME_WAIT` state.")

// clustering
fs.Var(
Expand Down
4 changes: 4 additions & 0 deletions etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ Member:
Frequency duration of server-to-client ping to check if a connection is alive (0 to disable).
--grpc-keepalive-timeout '20s'
Additional duration of wait before closing a non-responsive connection (0 to disable).
--socket-reuse-port 'false'
Enable to set socket option SO_REUSEPORT on listeners allowing rebinding of a port already in use.
--socket-reuse-address 'false'
Enable to set socket option SO_REUSEADDR on listeners allowing binding to an address in TIME_WAIT state.
Clustering:
--initial-advertise-peer-urls 'http://localhost:2380'
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/api/rafthttp/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var (
// NewListener returns a listener for raft message transfer between peers.
// It uses timeout listener to identify broken streams promptly.
func NewListener(u url.URL, tlsinfo *transport.TLSInfo) (net.Listener, error) {
return transport.NewTimeoutListener(u.Host, u.Scheme, tlsinfo, ConnReadTimeout, ConnWriteTimeout)
return transport.NewListenerWithOpts(u.Host, u.Scheme, transport.WithTLSInfo(tlsinfo), transport.WithTimeout(ConnReadTimeout, ConnWriteTimeout))
}

// NewRoundTripper returns a roundTripper used to send requests
Expand Down
3 changes: 3 additions & 0 deletions etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ type ServerConfig struct {
// PreVote is true to enable Raft Pre-Vote.
PreVote bool

// SocketOpts are socket options passed to listener config.
SocketOpts transport.SocketOpts

// Logger logs server-side operations.
// If not nil, it disables "capnslog" and uses the given logger.
Logger *zap.Logger
Expand Down
72 changes: 66 additions & 6 deletions pkg/transport/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package transport

import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
Expand All @@ -39,18 +40,77 @@ import (

// NewListener creates a new listener.
func NewListener(addr, scheme string, tlsinfo *TLSInfo) (l net.Listener, err error) {
if l, err = newListener(addr, scheme); err != nil {
return nil, err
}
return wrapTLS(scheme, tlsinfo, l)
return newListener(addr, scheme, WithTLSInfo(tlsinfo))
}

// NewListenerWithOpts creates a new listener which accepts listener options.
// It forwards addr, scheme and opts unchanged to newListener.
func NewListenerWithOpts(addr, scheme string, opts ...ListenerOption) (net.Listener, error) {
	return newListener(addr, scheme, opts...)
}

func newListener(addr string, scheme string) (net.Listener, error) {
func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, error) {
if scheme == "unix" || scheme == "unixs" {
// unix sockets via unix://laddr
return NewUnixListener(addr)
}
return net.Listen("tcp", addr)

lnOpts := newListenOpts(opts...)

switch {
case lnOpts.IsSocketOpts():
// new ListenConfig with socket options.
config, err := newListenConfig(lnOpts.socketOpts)
if err != nil {
return nil, err
}
lnOpts.ListenConfig = config
// check for timeout
fallthrough
case lnOpts.IsTimeout(), lnOpts.IsSocketOpts():
// timeout listener with socket options.
ln, err := lnOpts.ListenConfig.Listen(context.TODO(), "tcp", addr)
if err != nil {
return nil, err
}
lnOpts.Listener = &rwTimeoutListener{
Listener: ln,
readTimeout: lnOpts.readTimeout,
writeTimeout: lnOpts.writeTimeout,
}
case lnOpts.IsTimeout():
ln, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
lnOpts.Listener = &rwTimeoutListener{
Listener: ln,
readTimeout: lnOpts.readTimeout,
writeTimeout: lnOpts.writeTimeout,
}
default:
ln, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
lnOpts.Listener = ln
}

// only skip if not passing TLSInfo
if lnOpts.skipTLSInfoCheck && !lnOpts.IsTLS() {
return lnOpts.Listener, nil
}
return wrapTLS(scheme, lnOpts.tlsInfo, lnOpts.Listener)
}

// newListenConfig returns a net.ListenConfig for the given socket options.
// When sopts is nil, or getControls yields no controls for it, the zero
// ListenConfig is returned; otherwise its Control hook is set to the
// controls' Control method. The returned error is always nil.
func newListenConfig(sopts *SocketOpts) (net.ListenConfig, error) {
	var cfg net.ListenConfig
	if sopts == nil {
		return cfg, nil
	}
	if controls := getControls(sopts); len(controls) > 0 {
		cfg.Control = controls.Control
	}
	return cfg, nil
}

func wrapTLS(scheme string, tlsinfo *TLSInfo, l net.Listener) (net.Listener, error) {
Expand Down
76 changes: 76 additions & 0 deletions pkg/transport/listener_opts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package transport

import (
"net"
"time"
)

// ListenerOptions collects the settings that ListenerOption functions apply
// and that are consulted when a listener is built.
type ListenerOptions struct {
	// Listener holds the listener once it has been created.
	Listener net.Listener
	// ListenConfig is the net.ListenConfig used to open the TCP listener when
	// socket options or timeouts are in effect.
	ListenConfig net.ListenConfig

	// socketOpts are the socket options set through WithSocketOpts; may be nil.
	socketOpts *SocketOpts
	// tlsInfo is the TLS configuration set through WithTLSInfo; may be nil.
	tlsInfo *TLSInfo
	// skipTLSInfoCheck is the flag set through WithSkipTLSInfoCheck.
	skipTLSInfoCheck bool
	// writeTimeout and readTimeout are set through WithTimeout; zero means unset.
	writeTimeout time.Duration
	readTimeout time.Duration
}

// newListenOpts returns a fresh ListenerOptions with every supplied option
// applied to it in the order given.
func newListenOpts(opts ...ListenerOption) *ListenerOptions {
	var applied ListenerOptions
	applied.applyOpts(opts)
	return &applied
}

// applyOpts runs each option against lo, in slice order.
func (lo *ListenerOptions) applyOpts(opts []ListenerOption) {
	for i := range opts {
		opts[i](lo)
	}
}

// IsTimeout reports whether a non-zero read or write timeout has been
// configured on the listener options.
func (lo *ListenerOptions) IsTimeout() bool {
	if lo.readTimeout != 0 {
		return true
	}
	return lo.writeTimeout != 0
}

// IsSocketOpts returns true if the listener options include socket options
// with at least one of ReusePort or ReuseAddress enabled. It returns false
// when no socket options were supplied (nil).
func (lo *ListenerOptions) IsSocketOpts() bool {
	if lo.socketOpts == nil {
		return false
	}
	return lo.socketOpts.ReusePort || lo.socketOpts.ReuseAddress
}

// IsTLS returns true if the listener options include a TLSInfo that is
// non-nil and for which Empty() reports false.
func (lo *ListenerOptions) IsTLS() bool {
	if lo.tlsInfo == nil {
		return false
	}
	return !lo.tlsInfo.Empty()
}

// ListenerOption is a function that mutates a ListenerOptions; options of this
// type can be applied to the listener.
type ListenerOption func(*ListenerOptions)

// WithTimeout returns an option that records the given read and write
// timeouts on the listener options.
func WithTimeout(read, write time.Duration) ListenerOption {
	return func(target *ListenerOptions) {
		target.readTimeout, target.writeTimeout = read, write
	}
}

// WithSocketOpts returns an option that stores s as the socket options to be
// applied to the listener.
func WithSocketOpts(s *SocketOpts) ListenerOption {
	return func(target *ListenerOptions) {
		target.socketOpts = s
	}
}

// WithTLSInfo returns an option that stores t as the TLS credentials for the
// listener.
func WithTLSInfo(t *TLSInfo) ListenerOption {
	return func(target *ListenerOptions) {
		target.tlsInfo = t
	}
}

// WithSkipTLSInfoCheck returns an option that sets the skip-TLSInfo-check
// flag. When skip is true, a transport can be created with an https scheme
// without passing TLSInfo, circumventing the "not presented" error. The skip
// only takes effect when no TLSInfo is passed.
func WithSkipTLSInfoCheck(skip bool) ListenerOption {
	return func(target *ListenerOptions) {
		target.skipTLSInfoCheck = skip
	}
}
Loading

0 comments on commit 9f64079

Please sign in to comment.