From 7a731143fcfefae97f715ea8141d893e88247402 Mon Sep 17 00:00:00 2001 From: Shibo Wang Date: Wed, 26 May 2021 13:51:29 -0700 Subject: [PATCH] Add support for port reuse by implementing multiplexing listener --- config/configgrpc/configgrpc.go | 43 +++++++++++++++++++++++++++- config/configgrpc/configgrpc_test.go | 24 ++++++++++++++++ config/confighttp/confighttp.go | 28 +++++++++++++++++- config/confighttp/confighttp_test.go | 21 ++++++++++++++ 4 files changed, 114 insertions(+), 2 deletions(-) diff --git a/config/configgrpc/configgrpc.go b/config/configgrpc/configgrpc.go index 6577cd7c8a2..fed76fd9868 100644 --- a/config/configgrpc/configgrpc.go +++ b/config/configgrpc/configgrpc.go @@ -165,6 +165,10 @@ type GRPCServerSettings struct { // Auth for this receiver Auth *configauth.Authentication `mapstructure:"auth,omitempty"` + + // MuxListener is a multiplexing listener for the Endpoint of NetAddr. Through multiplexing, + // a single listener can support multiple sockets on the same port. + MuxListener net.Listener } // ToDialOptions maps configgrpc.GRPCClientSettings to a slice of dial options for gRPC @@ -237,7 +241,44 @@ func validateBalancerName(balancerName string) bool { // ToListener returns the net.Listener constructed from the settings. func (gss *GRPCServerSettings) ToListener() (net.Listener, error) { - return gss.NetAddr.Listen() + var addr net.Addr + var err error + + switch gss.NetAddr.Transport { + case "tcp", "tcp4", "tcp6": + addr, err = net.ResolveTCPAddr(gss.NetAddr.Transport, gss.NetAddr.Endpoint) + case "udp", "udp4", "udp6": + addr, err = net.ResolveUDPAddr(gss.NetAddr.Transport, gss.NetAddr.Endpoint) + case "ip", "ip4", "ip6": + addr, err = net.ResolveIPAddr(gss.NetAddr.Transport, gss.NetAddr.Endpoint) + case "unix", "unixgram", "unixpacket": + addr, err = net.ResolveUnixAddr(gss.NetAddr.Transport, gss.NetAddr.Endpoint) + default: + return nil, fmt.Errorf("unknown network protocol") + } + if err != nil { + return nil, err + } + + if gss.MuxListener != nil && addr.String() == gss.MuxListener.Addr().String() { + return gss.MuxListener, nil + } + + // if gss.MuxListener != nil { + // err = gss.MuxListener.Close() + // if err != nil { + // return nil, err + // } + // } + + listener, err := gss.NetAddr.Listen() + if err != nil { + return nil, err + } + + gss.MuxListener = listener + + return listener, nil } // ToServerOption maps configgrpc.GRPCServerSettings to a slice of server options for gRPC diff --git a/config/configgrpc/configgrpc_test.go b/config/configgrpc/configgrpc_test.go index bcab6164f36..f1a219198d0 100644 --- a/config/configgrpc/configgrpc_test.go +++ b/config/configgrpc/configgrpc_test.go @@ -536,3 +536,27 @@ func tempSocketName(t *testing.T) string { require.NoError(t, os.Remove(socket)) return socket } + +func TestGRPCPortReuse(t *testing.T) { + settings := GRPCServerSettings{ + NetAddr: confignet.NetAddr{ + Endpoint: "localhost: 2873", + Transport: "tcp", + }, + } + muxListener, err := settings.ToListener() + if err != nil { + t.Errorf("cannot initialize multiplexing listener") + } + + for i := 0; i < 100; i++ { + t.Run("", func(t *testing.T) { + _, err := settings.ToListener() + if err != nil { + t.Error("something went wrong, port cannot be reused") + } + assert.NotNil(t, settings.MuxListener) + assert.Equal(t, muxListener, settings.MuxListener) + }) + } +} diff --git a/config/confighttp/confighttp.go b/config/confighttp/confighttp.go index 8bdd45315fe..b55da678bf4 100644 --- a/config/confighttp/confighttp.go +++ b/config/confighttp/confighttp.go @@ -123,15 +123,38 @@ type HTTPServerSettings struct { // CORS needs to be enabled first by providing a non-empty list in CorsOrigins // A wildcard (*) can be used to match any header. CorsHeaders []string `mapstructure:"cors_allowed_headers"` + + // MuxListener is a multiplexing listener for the Endpoint. Through multiplexing, + // a single listener can support multiple sockets on the same port. + MuxListener net.Listener } // ToListener creates a net.Listener. func (hss *HTTPServerSettings) ToListener() (net.Listener, error) { - listener, err := net.Listen("tcp", hss.Endpoint) + var listener net.Listener + var addr net.Addr + var err error + + addr, err = net.ResolveTCPAddr("tcp", hss.Endpoint) if err != nil { return nil, err } + if hss.MuxListener != nil && addr.String() == hss.MuxListener.Addr().String() { + listener = hss.MuxListener + } else { + // if hss.MuxListener != nil { + // err = hss.MuxListener.Close() + // if err != nil { + // return nil, err + // } + // } + listener, err = net.Listen("tcp", hss.Endpoint) + if err != nil { + return nil, err + } + } + if hss.TLSSetting != nil { var tlsCfg *tls.Config tlsCfg, err = hss.TLSSetting.LoadTLSConfig() @@ -140,6 +163,9 @@ func (hss *HTTPServerSettings) ToListener() (net.Listener, error) { } listener = tls.NewListener(listener, tlsCfg) } + + hss.MuxListener = listener + return listener, nil } diff --git a/config/confighttp/confighttp_test.go b/config/confighttp/confighttp_test.go index c2593bd3693..f436bbac13f 100644 --- a/config/confighttp/confighttp_test.go +++ b/config/confighttp/confighttp_test.go @@ -482,3 +482,24 @@ func TestHttpHeaders(t *testing.T) { }) } } + +func TestHTTPPortReuse(t *testing.T) { + settings := HTTPServerSettings{ + Endpoint: "localhost: 6489", + } + muxListener, err := settings.ToListener() + if err != nil { + t.Errorf("cannot initialize multiplexing listener") + } + + for i := 0; i < 100; i++ { + t.Run("", func(t *testing.T) { + _, err := settings.ToListener() + if err != nil { + t.Error("something went wrong, port cannot be reused") + } + assert.NotNil(t, settings.MuxListener) + assert.Equal(t, muxListener, settings.MuxListener) + }) + } +}