Skip to content

Commit

Permalink
swarm: cleanup stream handler goroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Oct 17, 2023
1 parent a8cbee8 commit 0362792
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 9 deletions.
2 changes: 1 addition & 1 deletion p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func (h *BasicHost) newStreamHandler(s network.Stream) {

log.Debugf("negotiated: %s (took %s)", protoID, took)

go handle(protoID, s)
handle(protoID, s)
}

// SignalAddressChange signals to the host that it needs to determine whether our listen addresses have recently
Expand Down
2 changes: 1 addition & 1 deletion p2p/host/blank/blank.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (bh *BlankHost) newStreamHandler(s network.Stream) {

s.SetProtocol(protoID)

go handle(protoID, s)
handle(protoID, s)
}

// TODO: i'm not sure this really needs to be here
Expand Down
4 changes: 3 additions & 1 deletion p2p/net/swarm/swarm_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (c *Conn) start() {
if h := c.swarm.StreamHandler(); h != nil {
h(s)
}
s.completeAcceptStreamGoroutine()
}()
}
}()
Expand Down Expand Up @@ -238,7 +239,8 @@ func (c *Conn) addStream(ts network.MuxedStream, dir network.Direction, scope ne
Direction: dir,
Opened: time.Now(),
},
id: atomic.AddUint64(&c.swarm.nextStreamID, 1),
id: atomic.AddUint64(&c.swarm.nextStreamID, 1),
acceptStreamGoroutineCompleted: dir != network.DirInbound,
}
c.stat.NumStreams++
c.streams.m[s] = struct{}{}
Expand Down
37 changes: 31 additions & 6 deletions p2p/net/swarm/swarm_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ type Stream struct {
conn *Conn
scope network.StreamManagementScope

closeOnce sync.Once
closeMx sync.Mutex
isClosed bool
// acceptStreamGoroutineCompleted indicates whether the goroutine handling the incoming stream has exited
acceptStreamGoroutineCompleted bool

protocol atomic.Pointer[protocol.ID]

Expand Down Expand Up @@ -76,18 +79,33 @@ func (s *Stream) Write(p []byte) (int, error) {
// resources.
func (s *Stream) Close() error {
err := s.stream.Close()
s.closeOnce.Do(s.remove)
s.closeAndRemoveStream()
return err
}

// Reset resets the stream, signaling an error on both ends and freeing all
// associated resources.
func (s *Stream) Reset() error {
err := s.stream.Reset()
s.closeOnce.Do(s.remove)
s.closeAndRemoveStream()
return err
}

func (s *Stream) closeAndRemoveStream() {
s.closeMx.Lock()
defer s.closeMx.Unlock()
if s.isClosed {
return
}
s.isClosed = true
// We don't want to keep swarm from closing till the stream handler has exited
s.conn.swarm.refs.Done()
// Cleanup the stream from connection only after the stream handler has completed
if s.acceptStreamGoroutineCompleted {
s.conn.removeStream(s)
}
}

// CloseWrite closes the stream for writing, flushing all data and sending an EOF.
// This function does not free resources, call Close or Reset when done with the
// stream.
Expand All @@ -101,9 +119,16 @@ func (s *Stream) CloseRead() error {
return s.stream.CloseRead()
}

func (s *Stream) remove() {
s.conn.removeStream(s)
s.conn.swarm.refs.Done()
func (s *Stream) completeAcceptStreamGoroutine() {
s.closeMx.Lock()
defer s.closeMx.Unlock()
if s.acceptStreamGoroutineCompleted {
return
}
s.acceptStreamGoroutineCompleted = true
if s.isClosed {
s.conn.removeStream(s)
}
}

// Protocol returns the protocol negotiated on this stream (if set).
Expand Down
84 changes: 84 additions & 0 deletions p2p/test/swarm/swarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package swarm_test

import (
"context"
"io"
"sync"
"testing"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -68,3 +72,83 @@ func TestDialPeerTransientConnection(t *testing.T) {
require.Error(t, err)
require.Nil(t, conn)
}

func TestLimitStreamsWhenHangingHandlers(t *testing.T) {
var partial rcmgr.PartialLimitConfig
const streamLimit = 10
partial.System.Streams = streamLimit
mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(partial.Build(rcmgr.InfiniteLimits)))
require.NoError(t, err)

maddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/0/quic-v1")
require.NoError(t, err)

receiver, err := libp2p.New(
libp2p.ResourceManager(mgr),
libp2p.ListenAddrs(maddr),
)
require.NoError(t, err)
t.Cleanup(func() { receiver.Close() })

var wg sync.WaitGroup
wg.Add(1)

const pid = "/test"
receiver.SetStreamHandler(pid, func(s network.Stream) {
defer s.Close()
s.Write([]byte{42})
wg.Wait()
})

// Open streamLimit streams
success := 0
// we make a lot of tries because identify and identify push take up a few streams
for i := 0; i < 1000 && success < streamLimit; i++ {
mgr, err = rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits))
require.NoError(t, err)

sender, err := libp2p.New(libp2p.ResourceManager(mgr))
require.NoError(t, err)
t.Cleanup(func() { sender.Close() })

sender.Peerstore().AddAddrs(receiver.ID(), receiver.Addrs(), peerstore.PermanentAddrTTL)

s, err := sender.NewStream(context.Background(), receiver.ID(), pid)
if err != nil {
continue
}

var b [1]byte
_, err = io.ReadFull(s, b[:])
if err == nil {
success++
}
sender.Close()
}
require.Equal(t, streamLimit, success)
// We have the maximum number of streams open. Next call should fail.
mgr, err = rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits))
require.NoError(t, err)

sender, err := libp2p.New(libp2p.ResourceManager(mgr))
require.NoError(t, err)
t.Cleanup(func() { sender.Close() })

sender.Peerstore().AddAddrs(receiver.ID(), receiver.Addrs(), peerstore.PermanentAddrTTL)

_, err = sender.NewStream(context.Background(), receiver.ID(), pid)
require.Error(t, err)

// Close the open streams
wg.Done()

// Next call should succeed
require.Eventually(t, func() bool {
s, err := sender.NewStream(context.Background(), receiver.ID(), pid)
if err == nil {
s.Close()
return true
}
return false
}, 5*time.Second, 100*time.Millisecond)
}

0 comments on commit 0362792

Please sign in to comment.