Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm: cleanup stream handler goroutine #2610

Merged
merged 1 commit into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func (h *BasicHost) newStreamHandler(s network.Stream) {

log.Debugf("negotiated: %s (took %s)", protoID, took)

go handle(protoID, s)
handle(protoID, s)
}

// SignalAddressChange signals to the host that it needs to determine whether our listen addresses have recently
Expand Down
2 changes: 1 addition & 1 deletion p2p/host/blank/blank.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (bh *BlankHost) newStreamHandler(s network.Stream) {

s.SetProtocol(protoID)

go handle(protoID, s)
handle(protoID, s)
}

// TODO: I'm not sure this really needs to be here
Expand Down
4 changes: 3 additions & 1 deletion p2p/net/swarm/swarm_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (c *Conn) start() {
if h := c.swarm.StreamHandler(); h != nil {
h(s)
}
s.completeAcceptStreamGoroutine()
}()
}
}()
Expand Down Expand Up @@ -238,7 +239,8 @@ func (c *Conn) addStream(ts network.MuxedStream, dir network.Direction, scope ne
Direction: dir,
Opened: time.Now(),
},
id: atomic.AddUint64(&c.swarm.nextStreamID, 1),
id: atomic.AddUint64(&c.swarm.nextStreamID, 1),
acceptStreamGoroutineCompleted: dir != network.DirInbound,
}
c.stat.NumStreams++
c.streams.m[s] = struct{}{}
Expand Down
37 changes: 31 additions & 6 deletions p2p/net/swarm/swarm_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ type Stream struct {
conn *Conn
scope network.StreamManagementScope

closeOnce sync.Once
closeMx sync.Mutex
isClosed bool
// acceptStreamGoroutineCompleted indicates whether the goroutine handling the incoming stream has exited
acceptStreamGoroutineCompleted bool

protocol atomic.Pointer[protocol.ID]

Expand Down Expand Up @@ -76,18 +79,33 @@ func (s *Stream) Write(p []byte) (int, error) {
// resources.
func (s *Stream) Close() error {
err := s.stream.Close()
s.closeOnce.Do(s.remove)
s.closeAndRemoveStream()
return err
}

// Reset resets the stream, signaling an error on both ends and freeing all
// associated resources.
func (s *Stream) Reset() error {
err := s.stream.Reset()
s.closeOnce.Do(s.remove)
s.closeAndRemoveStream()
return err
}

// closeAndRemoveStream marks the stream as closed and releases what it holds.
// Close and Reset both call it; only the first call has any effect.
//
// The swarm reference is dropped right away, so a stream handler that is
// still running does not keep the swarm from closing. The stream is removed
// from its connection here only if the goroutine handling the incoming stream
// has already exited; otherwise completeAcceptStreamGoroutine removes it once
// that goroutine finishes.
func (s *Stream) closeAndRemoveStream() {
	s.closeMx.Lock()
	defer s.closeMx.Unlock()

	alreadyClosed := s.isClosed
	s.isClosed = true
	if alreadyClosed {
		return
	}

	s.conn.swarm.refs.Done()
	if s.acceptStreamGoroutineCompleted {
		s.conn.removeStream(s)
	}
}

// CloseWrite closes the stream for writing, flushing all data and sending an EOF.
// This function does not free resources, call Close or Reset when done with the
// stream.
Expand All @@ -101,9 +119,16 @@ func (s *Stream) CloseRead() error {
return s.stream.CloseRead()
}

func (s *Stream) remove() {
s.conn.removeStream(s)
s.conn.swarm.refs.Done()
// completeAcceptStreamGoroutine records that the goroutine handling the
// incoming stream has exited; only the first call has any effect.
//
// If the stream was already closed by then, its removal from the connection
// was deferred by closeAndRemoveStream and is carried out here.
func (s *Stream) completeAcceptStreamGoroutine() {
	s.closeMx.Lock()
	defer s.closeMx.Unlock()

	firstCall := !s.acceptStreamGoroutineCompleted
	s.acceptStreamGoroutineCompleted = true
	if firstCall && s.isClosed {
		s.conn.removeStream(s)
	}
}

// Protocol returns the protocol negotiated on this stream (if set).
Expand Down
83 changes: 83 additions & 0 deletions p2p/test/swarm/swarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package swarm_test

import (
"context"
"io"
"sync"
"testing"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -160,3 +163,83 @@ func TestNewStreamTransientConnection(t *testing.T) {
<-done
<-done
}

// TestLimitStreamsWhenHangingHandlers starts a receiver whose system-wide
// stream limit is streamLimit and whose handler for pid writes one byte and
// then blocks. It opens streams from fresh senders until streamLimit of them
// have delivered that byte, asserts that a further NewStream fails, unblocks
// the handlers, and asserts that a NewStream succeeds again within 5 seconds.
func TestLimitStreamsWhenHangingHandlers(t *testing.T) {
	const streamLimit = 10
	const pid = "/test"

	var limits rcmgr.PartialLimitConfig
	limits.System.Streams = streamLimit
	limiter := rcmgr.NewFixedLimiter(limits.Build(rcmgr.InfiniteLimits))
	receiverMgr, err := rcmgr.NewResourceManager(limiter)
	require.NoError(t, err)

	listenAddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/0/quic-v1")
	require.NoError(t, err)

	receiver, err := libp2p.New(
		libp2p.ResourceManager(receiverMgr),
		libp2p.ListenAddrs(listenAddr),
	)
	require.NoError(t, err)
	t.Cleanup(func() { receiver.Close() })

	// release keeps every handler blocked until the test calls Done on it.
	var release sync.WaitGroup
	release.Add(1)

	receiver.SetStreamHandler(pid, func(s network.Stream) {
		defer s.Close()
		s.Write([]byte{42})
		release.Wait()
	})

	// Open streamLimit streams, each from its own sender with unlimited
	// resources. Many attempts are allowed because identify and identify push
	// take up a few streams.
	opened := 0
	for attempt := 0; attempt < 1000 && opened < streamLimit; attempt++ {
		senderMgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits))
		require.NoError(t, err)

		sender, err := libp2p.New(libp2p.ResourceManager(senderMgr))
		require.NoError(t, err)
		t.Cleanup(func() { sender.Close() })

		sender.Peerstore().AddAddrs(receiver.ID(), receiver.Addrs(), peerstore.PermanentAddrTTL)

		s, err := sender.NewStream(context.Background(), receiver.ID(), pid)
		if err != nil {
			continue
		}

		// An attempt counts only once the handler's byte has been read.
		var b [1]byte
		if _, err := io.ReadFull(s, b[:]); err == nil {
			opened++
		}
		sender.Close()
	}
	require.Equal(t, streamLimit, opened)

	// We have the maximum number of streams open. Next call should fail.
	lastMgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits))
	require.NoError(t, err)

	sender, err := libp2p.New(libp2p.ResourceManager(lastMgr))
	require.NoError(t, err)
	t.Cleanup(func() { sender.Close() })

	sender.Peerstore().AddAddrs(receiver.ID(), receiver.Addrs(), peerstore.PermanentAddrTTL)

	_, err = sender.NewStream(context.Background(), receiver.ID(), pid)
	require.Error(t, err)

	// Let the blocked handlers return so they close their streams.
	release.Done()

	// Next call should succeed
	require.Eventually(t, func() bool {
		s, err := sender.NewStream(context.Background(), receiver.ID(), pid)
		if err != nil {
			return false
		}
		s.Close()
		return true
	}, 5*time.Second, 100*time.Millisecond)
}