Skip to content

Commit

Permalink
Merge pull request #270 from jveski/fix-failed-conn-leak
Browse files Browse the repository at this point in the history
Fix obscure HTTP CONNECT goroutine leak
  • Loading branch information
k8s-ci-robot authored Oct 18, 2021
2 parents 3716172 + 8b141c7 commit 4e8b12d
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 9 deletions.
17 changes: 17 additions & 0 deletions pkg/server/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type ServerMetrics struct {
latencies *prometheus.HistogramVec
frontendLatencies *prometheus.HistogramVec
connections *prometheus.GaugeVec
httpConnections prometheus.Gauge
backend *prometheus.GaugeVec
pendingDials *prometheus.GaugeVec
}
Expand Down Expand Up @@ -83,6 +84,14 @@ func newServerMetrics() *ServerMetrics {
"service_method",
},
)
httpConnections := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "http_connections",
Help: "Number of current HTTP CONNECT connections",
},
)
backend := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Expand All @@ -105,12 +114,14 @@ func newServerMetrics() *ServerMetrics {
prometheus.MustRegister(latencies)
prometheus.MustRegister(frontendLatencies)
prometheus.MustRegister(connections)
prometheus.MustRegister(httpConnections)
prometheus.MustRegister(backend)
prometheus.MustRegister(pendingDials)
return &ServerMetrics{
latencies: latencies,
frontendLatencies: frontendLatencies,
connections: connections,
httpConnections: httpConnections,
backend: backend,
pendingDials: pendingDials,
}
Expand Down Expand Up @@ -142,6 +153,12 @@ func (a *ServerMetrics) ConnectionDec(serviceMethod string) {
a.connections.With(prometheus.Labels{"service_method": serviceMethod}).Dec()
}

// HTTPConnectionDec increments a new HTTP CONNECTION connection.
func (a *ServerMetrics) HTTPConnectionInc() { a.httpConnections.Inc() }

// HTTPConnectionDec decrements a finished HTTP CONNECTION connection.
func (a *ServerMetrics) HTTPConnectionDec() { a.httpConnections.Dec() }

// SetBackendCount sets the number of backend connection.
func (a *ServerMetrics) SetBackendCount(count int) {
a.backend.WithLabelValues().Set(float64(count))
Expand Down
8 changes: 4 additions & 4 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"errors"
"fmt"
"io"
"net"
"strconv"
"strings"
"sync"
Expand All @@ -47,7 +46,8 @@ type key int
type ProxyClientConnection struct {
Mode string
Grpc client.ProxyService_ProxyServer
HTTP net.Conn
HTTP io.ReadWriter
CloseHTTP func() error
connected chan struct{}
connectID int64
agentID string
Expand All @@ -67,13 +67,13 @@ func (c *ProxyClientConnection) send(pkt *client.Packet) error {
return stream.Send(pkt)
} else if c.Mode == "http-connect" {
if pkt.Type == client.PacketType_CLOSE_RSP {
return c.HTTP.Close()
return c.CloseHTTP()
} else if pkt.Type == client.PacketType_DATA {
_, err := c.HTTP.Write(pkt.GetData().Data)
return err
} else if pkt.Type == client.PacketType_DIAL_RSP {
if pkt.GetDialResponse().Error != "" {
return c.HTTP.Close()
return c.CloseHTTP()
}
return nil
} else {
Expand Down
18 changes: 16 additions & 2 deletions pkg/server/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"io"
"math/rand"
"net/http"
"sync"
"time"

"k8s.io/klog/v2"
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
"sigs.k8s.io/apiserver-network-proxy/pkg/server/metrics"
)

// Tunnel implements Proxy based on HTTP Connect, which tunnels the traffic to
Expand All @@ -34,6 +36,9 @@ type Tunnel struct {
}

func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
metrics.Metrics.HTTPConnectionInc()
defer metrics.Metrics.HTTPConnectionDec()

klog.V(2).InfoS("Received request for host", "method", r.Method, "host", r.Host, "userAgent", r.UserAgent())
if r.TLS != nil {
klog.V(2).InfoS("TLS", "commonName", r.TLS.PeerCertificates[0].Subject.CommonName)
Expand All @@ -55,6 +60,8 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
var closeOnce sync.Once
defer closeOnce.Do(func() { conn.Close() })

random := rand.Int63() /* #nosec G404 */
dialRequest := &client.Packet{
Expand All @@ -74,10 +81,16 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, fmt.Sprintf("currently no tunnels available: %v", err), http.StatusInternalServerError)
return
}
closed := make(chan struct{})
connected := make(chan struct{})
connection := &ProxyClientConnection{
Mode: "http-connect",
HTTP: conn,
Mode: "http-connect",
HTTP: io.ReadWriter(conn), // pass as ReadWriter so the caller must close with CloseHTTP
CloseHTTP: func() error {
closeOnce.Do(func() { conn.Close() })
close(closed)
return nil
},
connected: connected,
start: time.Now(),
backend: backend,
Expand All @@ -100,6 +113,7 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {

select {
case <-connection.connected: // Waiting for response before we begin full communication.
case <-closed: // Connection was closed before being established
}

defer func() {
Expand Down
81 changes: 78 additions & 3 deletions tests/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http/httptest"
"net/url"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -336,6 +337,71 @@ func TestBasicProxy_HTTPCONN(t *testing.T) {

}

func TestFailedDial_HTTPCONN(t *testing.T) {
server := httptest.NewServer(newEchoServer("hello"))
server.Close() // cleanup immediately so connections will fail

stopCh := make(chan struct{})
defer close(stopCh)

proxy, cleanup, err := runHTTPConnProxyServer()
if err != nil {
t.Fatal(err)
}
defer cleanup()

runAgent(proxy.agent, stopCh)

// Wait for agent to register on proxy server
time.Sleep(time.Second)

conn, err := net.Dial("tcp", proxy.front)
if err != nil {
t.Error(err)
}

serverURL, _ := url.Parse(server.URL)

// Send HTTP-Connect request
_, err = fmt.Fprintf(conn, "CONNECT %s HTTP/1.1\r\nHost: %s\r\n\r\n", serverURL.Host, "127.0.0.1")
if err != nil {
t.Error(err)
}

// Parse the HTTP response for Connect
br := bufio.NewReader(conn)
res, err := http.ReadResponse(br, nil)
if err != nil {
t.Errorf("reading HTTP response from CONNECT: %v", err)
}
if res.StatusCode != 200 {
t.Errorf("expect 200; got %d", res.StatusCode)
}

dialer := func(network, addr string) (net.Conn, error) {
return conn, nil
}

c := &http.Client{
Transport: &http.Transport{
Dial: dialer,
},
}

_, err = c.Get(server.URL)
if err == nil || !strings.Contains(err.Error(), "connection reset by peer") {
t.Error(err)
}

for i := 0; i < 20; i++ {
if proxy.getActiveHTTPConnectConns() == 0 {
return
}
time.Sleep(time.Millisecond * 10)
}
t.Errorf("expected connection to eventually be closed")
}

func localAddr(addr net.Addr) string {
return addr.String()
}
Expand All @@ -344,6 +410,8 @@ type proxy struct {
server *server.ProxyServer
front string
agent string

getActiveHTTPConnectConns func() int
}

func runGRPCProxyServer() (proxy, func(), error) {
Expand Down Expand Up @@ -409,10 +477,17 @@ func runHTTPConnProxyServer() (proxy, func(), error) {
proxy.agent = localAddr(lis.Addr())

// http-connect
active := int32(0)
proxy.getActiveHTTPConnectConns = func() int { return int(atomic.LoadInt32(&active)) }
handler := &server.Tunnel{
Server: s,
}
httpServer := &http.Server{
Handler: &server.Tunnel{
Server: s,
},
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
atomic.AddInt32(&active, 1)
defer atomic.AddInt32(&active, -1)
handler.ServeHTTP(w, r)
}),
}
lis2, err := net.Listen("tcp", "")
if err != nil {
Expand Down

0 comments on commit 4e8b12d

Please sign in to comment.