Skip to content

Commit

Permalink
Cleanup portforward streams after their usage
Browse files Browse the repository at this point in the history
This implements a stream cleanup when using portforwardings. Before
applying this patch, the streams []httpstream.Stream within
`spdy/connection.go` would fill-up for each streaming request. This
could result in heavy memory usage. Now we use the stream identifier to
keep track of them and finally remove them again once they're no longer
needed.

Signed-off-by: Sascha Grunert <sgrunert@redhat.com>

Kubernetes-commit: b14bd44f33d93e1ee64c1d68fa7591d79eac5893
  • Loading branch information
saschagrunert authored and k8s-publishing-bot committed Mar 5, 2021
1 parent be0c813 commit c422483
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 4 deletions.
2 changes: 2 additions & 0 deletions pkg/util/httpstream/httpstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ type Connection interface {
// SetIdleTimeout sets the amount of time the connection may remain idle before
// it is automatically closed.
SetIdleTimeout(timeout time.Duration)
// RemoveStreams can be used to remove a set of streams from the Connection.
RemoveStreams(streams ...Stream)
}

// Stream represents a bidirectional communications channel that is part of an
Expand Down
22 changes: 18 additions & 4 deletions pkg/util/httpstream/spdy/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
// streams.
type connection struct {
conn *spdystream.Connection
streams []httpstream.Stream
streams map[uint32]httpstream.Stream
streamLock sync.Mutex
newStreamHandler httpstream.NewStreamHandler
ping func() (time.Duration, error)
Expand Down Expand Up @@ -85,7 +85,12 @@ func NewServerConnectionWithPings(conn net.Conn, newStreamHandler httpstream.New
// will be invoked when the server receives a newly created stream from the
// client.
func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler, pingPeriod time.Duration, pingFn func() (time.Duration, error)) httpstream.Connection {
c := &connection{conn: conn, newStreamHandler: newStreamHandler, ping: pingFn}
c := &connection{
conn: conn,
newStreamHandler: newStreamHandler,
ping: pingFn,
streams: make(map[uint32]httpstream.Stream),
}
go conn.Serve(c.newSpdyStream)
if pingPeriod > 0 && pingFn != nil {
go c.sendPings(pingPeriod)
Expand All @@ -105,7 +110,7 @@ func (c *connection) Close() error {
// calling Reset instead of Close ensures that all streams are fully torn down
s.Reset()
}
c.streams = make([]httpstream.Stream, 0)
c.streams = make(map[uint32]httpstream.Stream, 0)
c.streamLock.Unlock()

// now that all streams are fully torn down, it's safe to call close on the underlying connection,
Expand All @@ -114,6 +119,15 @@ func (c *connection) Close() error {
return c.conn.Close()
}

// RemoveStreams can be used to removes a set of streams from the Connection.
func (c *connection) RemoveStreams(streams ...httpstream.Stream) {
c.streamLock.Lock()
for _, stream := range streams {
delete(c.streams, stream.Identifier())
}
c.streamLock.Unlock()
}

// CreateStream creates a new stream with the specified headers and registers
// it with the connection.
func (c *connection) CreateStream(headers http.Header) (httpstream.Stream, error) {
Expand All @@ -133,7 +147,7 @@ func (c *connection) CreateStream(headers http.Header) (httpstream.Stream, error
// it owns.
func (c *connection) registerStream(s httpstream.Stream) {
c.streamLock.Lock()
c.streams = append(c.streams, s)
c.streams[s.Identifier()] = s
c.streamLock.Unlock()
}

Expand Down
38 changes: 38 additions & 0 deletions pkg/util/httpstream/spdy/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,41 @@ func TestConnectionPings(t *testing.T) {
t.Errorf("timed out waiting for server to exit")
}
}

type fakeStream struct{ id uint32 }

func (*fakeStream) Read(p []byte) (int, error) { return 0, nil }
func (*fakeStream) Write(p []byte) (int, error) { return 0, nil }
func (*fakeStream) Close() error { return nil }
func (*fakeStream) Reset() error { return nil }
func (*fakeStream) Headers() http.Header { return nil }
func (f *fakeStream) Identifier() uint32 { return f.id }

func TestConnectionRemoveStreams(t *testing.T) {
c := &connection{streams: make(map[uint32]httpstream.Stream)}
stream0 := &fakeStream{id: 0}
stream1 := &fakeStream{id: 1}
stream2 := &fakeStream{id: 2}

c.registerStream(stream0)
c.registerStream(stream1)

if len(c.streams) != 2 {
t.Fatalf("should have two streams, has %d", len(c.streams))
}

// not exists
c.RemoveStreams(stream2)

if len(c.streams) != 2 {
t.Fatalf("should have two streams, has %d", len(c.streams))
}

// remove all existing
c.RemoveStreams(stream0, stream1)

if len(c.streams) != 0 {
t.Fatalf("should not have any streams, has %d", len(c.streams))
}

}

0 comments on commit c422483

Please sign in to comment.