Skip to content

Commit

Permalink
emit Connectedness notifications from the swarm
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Feb 5, 2023
1 parent f43813f commit 7119beb
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 5 deletions.
31 changes: 27 additions & 4 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ type Swarm struct {
// down before continuing.
refs sync.WaitGroup

eventBus event.Bus
emitter event.Emitter

rcmgr network.ResourceManager

Expand Down Expand Up @@ -167,11 +167,15 @@ type Swarm struct {

// NewSwarm constructs a Swarm.
func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts ...Option) (*Swarm, error) {
emitter, err := eventBus.Emitter(new(event.EvtPeerConnectednessChanged))
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
s := &Swarm{
local: local,
peers: peers,
eventBus: eventBus,
emitter: emitter,
ctx: ctx,
ctxCancel: cancel,
dialTimeout: defaultDialTimeout,
Expand Down Expand Up @@ -207,6 +211,8 @@ func (s *Swarm) Close() error {
func (s *Swarm) close() {
s.ctxCancel()

s.emitter.Close()

// Prevents new connections and/or listeners from being added to the swarm.
s.listeners.Lock()
listeners := s.listeners.m
Expand Down Expand Up @@ -323,6 +329,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
}

c.streams.m = make(map[*Stream]struct{})
isFirstConn := len(s.conns.m[p]) == 0
s.conns.m[p] = append(s.conns.m[p], c)

// Add two swarm refs:
Expand All @@ -340,6 +347,13 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
})
c.notifyLk.Unlock()

if isFirstConn {
s.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: p,
Connectedness: network.Connected,
})
}

c.start()
return c, nil
}
Expand Down Expand Up @@ -610,13 +624,14 @@ func (s *Swarm) StopNotify(f network.Notifiee) {
func (s *Swarm) removeConn(c *Conn) {
p := c.RemotePeer()

var disconnected bool
s.conns.Lock()
defer s.conns.Unlock()
cs := s.conns.m[p]
for i, ci := range cs {
if ci == c {
if len(cs) == 1 {
delete(s.conns.m, p)
disconnected = true
} else {
// NOTE: We're intentionally preserving order.
// This way, connections to a peer are always
Expand All @@ -625,9 +640,17 @@ func (s *Swarm) removeConn(c *Conn) {
cs[len(cs)-1] = nil
s.conns.m[p] = cs[:len(cs)-1]
}
return
break
}
}
s.conns.Unlock()

if disconnected {
s.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: p,
Connectedness: network.NotConnected,
})
}
}

// String returns a string representation of Network.
Expand Down
66 changes: 66 additions & 0 deletions p2p/net/swarm/swarm_event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package swarm_test

import (
"context"
"testing"
"time"

"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
. "github.com/libp2p/go-libp2p/p2p/net/swarm"
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"

ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)

func newSwarmWithSubscription(t *testing.T) (*Swarm, event.Subscription) {
t.Helper()
bus := eventbus.NewBus()
sw := swarmt.GenSwarm(t, swarmt.EventBus(bus))
t.Cleanup(func() { sw.Close() })
sub, err := bus.Subscribe(new(event.EvtPeerConnectednessChanged))
require.NoError(t, err)
t.Cleanup(func() { sub.Close() })
return sw, sub
}

func checkEvent(t *testing.T, sub event.Subscription, expected event.EvtPeerConnectednessChanged) {
t.Helper()
select {
case ev, ok := <-sub.Out():
require.True(t, ok)
evt := ev.(event.EvtPeerConnectednessChanged)
require.Equal(t, expected.Connectedness, evt.Connectedness, "wrong connectedness state")
require.Equal(t, expected.Peer, evt.Peer)
case <-time.After(time.Second):
t.Fatal("didn't get PeerConnectedness event")
}

// check that there are no more events
select {
case <-sub.Out():
t.Fatal("didn't expect any more events")
case <-time.After(50 * time.Millisecond):
return
}
}

func TestConnectednessEventsSingleConn(t *testing.T) {
s1, sub1 := newSwarmWithSubscription(t)
s2, sub2 := newSwarmWithSubscription(t)

s1.Peerstore().AddAddrs(s2.LocalPeer(), []ma.Multiaddr{s2.ListenAddresses()[0]}, time.Hour)
_, err := s1.DialPeer(context.Background(), s2.LocalPeer())
require.NoError(t, err)

checkEvent(t, sub1, event.EvtPeerConnectednessChanged{Peer: s2.LocalPeer(), Connectedness: network.Connected})
checkEvent(t, sub2, event.EvtPeerConnectednessChanged{Peer: s1.LocalPeer(), Connectedness: network.Connected})

for _, c := range s2.ConnsToPeer(s1.LocalPeer()) {
require.NoError(t, c.Close())
}
checkEvent(t, sub1, event.EvtPeerConnectednessChanged{Peer: s2.LocalPeer(), Connectedness: network.NotConnected})
checkEvent(t, sub2, event.EvtPeerConnectednessChanged{Peer: s1.LocalPeer(), Connectedness: network.NotConnected})
}
14 changes: 13 additions & 1 deletion p2p/net/swarm/testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/control"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -36,6 +37,7 @@ type config struct {
connectionGater connmgr.ConnectionGater
sk crypto.PrivKey
swarmOpts []swarm.Option
eventBus event.Bus
clock
}

Expand Down Expand Up @@ -99,6 +101,12 @@ func OptPeerPrivateKey(sk crypto.PrivKey) Option {
}
}

func EventBus(b event.Bus) Option {
return func(_ *testing.T, c *config) {
c.eventBus = b
}
}

// GenUpgrader creates a new connection upgrader for use with this swarm.
func GenUpgrader(t *testing.T, n *swarm.Swarm, connGater connmgr.ConnectionGater, opts ...tptu.Option) transport.Upgrader {
id := n.LocalPeer()
Expand Down Expand Up @@ -141,7 +149,11 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm {
swarmOpts = append(swarmOpts, swarm.WithConnectionGater(cfg.connectionGater))
}

s, err := swarm.NewSwarm(id, ps, eventbus.NewBus(), swarmOpts...)
eventBus := cfg.eventBus
if eventBus == nil {
eventBus = eventbus.NewBus()
}
s, err := swarm.NewSwarm(id, ps, eventBus, swarmOpts...)
require.NoError(t, err)

upgrader := GenUpgrader(t, s, cfg.connectionGater)
Expand Down

0 comments on commit 7119beb

Please sign in to comment.