Skip to content

Commit

Permalink
Release v0.27.7 (#2374)
Browse files Browse the repository at this point in the history
* fix: in the swarm move Connectedness emit after releasing conns (#2373)

* fix: in the swarm move Connectedness emit after releasing conns

go-libp2p-kad-dht now listens to both EvtPeerIdentificationCompleted and EvtPeerConnectednessChanged,
and its EvtPeerIdentificationCompleted handler calls .ConnsToPeer in order to do some filtering.

However, this can deadlock: if the swarm is trying to emit an EvtPeerConnectednessChanged
while the subscriber is processing an EvtPeerIdentificationCompleted, the subscriber is stuck on
s.conns.RLock() while the swarm won't release that lock before having sent EvtPeerConnectednessChanged.
Deadlock!

I haven't confirmed that this fixes my bug, given that it takes time to reproduce; I'll start up a new experiment soon.

* Fix other deadlock and add a test

* Make test a little faster

* Bind on localhost

---------

Co-authored-by: Marco Munizaga <git@marcopolo.io>

* Release version v0.27.7

* identify: set stream deadlines for Identify and Identify Push streams (#2382)

---------

Co-authored-by: Jorropo <jorropo.pgm@gmail.com>
Co-authored-by: Marten Seemann <martenseemann@gmail.com>
  • Loading branch information
3 people authored Jun 19, 2023
1 parent 2df518f commit 68ad5ea
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 33 deletions.
53 changes: 32 additions & 21 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,12 +329,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
}

c.streams.m = make(map[*Stream]struct{})
if len(s.conns.m[p]) == 0 { // first connection
s.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: p,
Connectedness: network.Connected,
})
}
isFirstConnection := len(s.conns.m[p]) == 0
s.conns.m[p] = append(s.conns.m[p], c)

// Add two swarm refs:
Expand All @@ -347,6 +342,15 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
c.notifyLk.Lock()
s.conns.Unlock()

// Emit event after releasing `s.conns` lock so that a consumer can still
// use swarm methods that need the `s.conns` lock.
if isFirstConnection {
s.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: p,
Connectedness: network.Connected,
})
}

s.notifyAll(func(f network.Notifiee) {
f.Connected(s, c)
})
Expand Down Expand Up @@ -626,25 +630,32 @@ func (s *Swarm) removeConn(c *Conn) {
p := c.RemotePeer()

s.conns.Lock()
defer s.conns.Unlock()

cs := s.conns.m[p]

if len(cs) == 1 {
delete(s.conns.m, p)
s.conns.Unlock()

// Emit event after releasing `s.conns` lock so that a consumer can still
// use swarm methods that need the `s.conns` lock.
s.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: p,
Connectedness: network.NotConnected,
})
return
}

defer s.conns.Unlock()

for i, ci := range cs {
if ci == c {
if len(cs) == 1 {
delete(s.conns.m, p)
s.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: p,
Connectedness: network.NotConnected,
})
} else {
// NOTE: We're intentionally preserving order.
// This way, connections to a peer are always
// sorted oldest to newest.
copy(cs[i:], cs[i+1:])
cs[len(cs)-1] = nil
s.conns.m[p] = cs[:len(cs)-1]
}
// NOTE: We're intentionally preserving order.
// This way, connections to a peer are always
// sorted oldest to newest.
copy(cs[i:], cs[i+1:])
cs[len(cs)-1] = nil
s.conns.m[p] = cs[:len(cs)-1]
break
}
}
Expand Down
49 changes: 49 additions & 0 deletions p2p/net/swarm/swarm_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,52 @@ func TestConnectednessEventsSingleConn(t *testing.T) {
checkEvent(t, sub1, event.EvtPeerConnectednessChanged{Peer: s2.LocalPeer(), Connectedness: network.NotConnected})
checkEvent(t, sub2, event.EvtPeerConnectednessChanged{Peer: s1.LocalPeer(), Connectedness: network.NotConnected})
}

// TestNoDeadlockWhenConsumingConnectednessEvents checks that a slow consumer of
// EvtPeerConnectednessChanged events can call swarm methods that need the
// swarm's conns lock (here ConnsToPeer) while the swarm keeps connecting and
// disconnecting. The test has no explicit assertion on the events themselves:
// it passes simply by running to completion instead of hanging.
func TestNoDeadlockWhenConsumingConnectednessEvents(t *testing.T) {
	dialerEventBus := eventbus.NewBus()
	dialer := swarmt.GenSwarm(t, swarmt.OptDialOnly, swarmt.EventBus(dialerEventBus))
	defer dialer.Close()

	// The listener is generated dial-only and then explicitly told to listen
	// on a loopback QUIC address with an OS-assigned port.
	listener := swarmt.GenSwarm(t, swarmt.OptDialOnly)
	// Close the listener swarm too, so its listener does not outlive the test.
	defer listener.Close()
	addrsToListen := []ma.Multiaddr{
		ma.StringCast("/ip4/127.0.0.1/udp/0/quic-v1"),
	}
	require.NoError(t, listener.Listen(addrsToListen...))
	listenedAddrs := listener.ListenAddresses()

	dialer.Peerstore().AddAddrs(listener.LocalPeer(), listenedAddrs, time.Hour)

	sub, err := dialerEventBus.Subscribe(new(event.EvtPeerConnectednessChanged))
	require.NoError(t, err)
	// Release the subscription when the test ends. This defer is registered
	// before cancel, so it runs after the consumer has been told to stop.
	defer sub.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// A slow consumer
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-sub.Out():
				time.Sleep(100 * time.Millisecond)
				// Do something with the swarm that needs the conns lock
				_ = dialer.ConnsToPeer(listener.LocalPeer())
				time.Sleep(100 * time.Millisecond)
			}
		}
	}()

	for i := 0; i < 10; i++ {
		// Connect and disconnect to trigger a bunch of events
		_, err := dialer.DialPeer(context.Background(), listener.LocalPeer())
		require.NoError(t, err)
		dialer.ClosePeer(listener.LocalPeer())
	}

	// The test should finish without deadlocking
}
11 changes: 6 additions & 5 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ const ServiceName = "libp2p.identify"

const maxPushConcurrency = 32

// StreamReadTimeout is the read timeout on all incoming Identify family streams.
var StreamReadTimeout = 60 * time.Second
var Timeout = 60 * time.Second // timeout on all incoming Identify interactions

const (
legacyIDSize = 2 * 1024 // 2k Bytes
Expand Down Expand Up @@ -416,11 +415,14 @@ func (ids *idService) IdentifyWait(c network.Conn) <-chan struct{} {
}

func (ids *idService) identifyConn(c network.Conn) error {
s, err := c.NewStream(network.WithUseTransient(context.TODO(), "identify"))
ctx, cancel := context.WithTimeout(context.Background(), Timeout)
defer cancel()
s, err := c.NewStream(network.WithUseTransient(ctx, "identify"))
if err != nil {
log.Debugw("error opening identify stream", "peer", c.RemotePeer(), "error", err)
return err
}
s.SetDeadline(time.Now().Add(Timeout))

if err := s.SetProtocol(ID); err != nil {
log.Warnf("error setting identify protocol for stream: %s", err)
Expand All @@ -439,6 +441,7 @@ func (ids *idService) identifyConn(c network.Conn) error {

// handlePush handles an incoming identify push stream. It first puts a
// deadline of Timeout from now on the stream, then hands the stream to
// handleIdentifyResponse with isPush set to true.
func (ids *idService) handlePush(s network.Stream) {
	deadline := time.Now().Add(Timeout)
	// The error returned by SetDeadline is discarded.
	_ = s.SetDeadline(deadline)
	ids.handleIdentifyResponse(s, true)
}

Expand Down Expand Up @@ -500,8 +503,6 @@ func (ids *idService) handleIdentifyResponse(s network.Stream, isPush bool) erro
}
defer s.Scope().ReleaseMemory(signedIDSize)

_ = s.SetReadDeadline(time.Now().Add(StreamReadTimeout))

c := s.Conn()

r := pbio.NewDelimitedReader(s, signedIDSize)
Expand Down
12 changes: 6 additions & 6 deletions p2p/protocol/identify/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,10 +804,10 @@ func TestLargePushMessage(t *testing.T) {
}

func TestIdentifyResponseReadTimeout(t *testing.T) {
timeout := identify.StreamReadTimeout
identify.StreamReadTimeout = 100 * time.Millisecond
timeout := identify.Timeout
identify.Timeout = 100 * time.Millisecond
defer func() {
identify.StreamReadTimeout = timeout
identify.Timeout = timeout
}()

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -850,10 +850,10 @@ func TestIdentifyResponseReadTimeout(t *testing.T) {
}

func TestIncomingIDStreamsTimeout(t *testing.T) {
timeout := identify.StreamReadTimeout
identify.StreamReadTimeout = 100 * time.Millisecond
timeout := identify.Timeout
identify.Timeout = 100 * time.Millisecond
defer func() {
identify.StreamReadTimeout = timeout
identify.Timeout = timeout
}()

ctx, cancel := context.WithCancel(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion version.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"version": "v0.27.6"
"version": "v0.27.7"
}

0 comments on commit 68ad5ea

Please sign in to comment.