From 7119bebb931c7abbe37808b8527ac28c9b7091dd Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 28 May 2022 10:50:15 +0200 Subject: [PATCH] emit Connectedness notifications from the swarm --- p2p/net/swarm/swarm.go | 31 +++++++++++++-- p2p/net/swarm/swarm_event_test.go | 66 +++++++++++++++++++++++++++++++ p2p/net/swarm/testing/testing.go | 14 ++++++- 3 files changed, 106 insertions(+), 5 deletions(-) create mode 100644 p2p/net/swarm/swarm_event_test.go diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index a49c3b9873..c9c9b7fc4e 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -112,7 +112,7 @@ type Swarm struct { // down before continuing. refs sync.WaitGroup - eventBus event.Bus + emitter event.Emitter rcmgr network.ResourceManager @@ -167,11 +167,15 @@ type Swarm struct { // NewSwarm constructs a Swarm. func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts ...Option) (*Swarm, error) { + emitter, err := eventBus.Emitter(new(event.EvtPeerConnectednessChanged)) + if err != nil { + return nil, err + } ctx, cancel := context.WithCancel(context.Background()) s := &Swarm{ local: local, peers: peers, - eventBus: eventBus, + emitter: emitter, ctx: ctx, ctxCancel: cancel, dialTimeout: defaultDialTimeout, @@ -207,6 +211,8 @@ func (s *Swarm) Close() error { func (s *Swarm) close() { s.ctxCancel() + s.emitter.Close() + // Prevents new connections and/or listeners from being added to the swarm. s.listeners.Lock() listeners := s.listeners.m @@ -323,6 +329,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, } c.streams.m = make(map[*Stream]struct{}) + isFirstConn := len(s.conns.m[p]) == 0 s.conns.m[p] = append(s.conns.m[p], c) // Add two swarm refs: @@ -340,6 +347,13 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, }) c.notifyLk.Unlock() + if isFirstConn { + s.emitter.Emit(event.EvtPeerConnectednessChanged{ + Peer: p, + Connectedness: network.Connected, + }) + } + c.start() return c, nil } @@ -610,13 +624,14 @@ func (s *Swarm) StopNotify(f network.Notifiee) { func (s *Swarm) removeConn(c *Conn) { p := c.RemotePeer() + var disconnected bool s.conns.Lock() - defer s.conns.Unlock() cs := s.conns.m[p] for i, ci := range cs { if ci == c { if len(cs) == 1 { delete(s.conns.m, p) + disconnected = true } else { // NOTE: We're intentionally preserving order. // This way, connections to a peer are always @@ -625,9 +640,17 @@ func (s *Swarm) removeConn(c *Conn) { cs[len(cs)-1] = nil s.conns.m[p] = cs[:len(cs)-1] } - return + break } } + s.conns.Unlock() + + if disconnected { + s.emitter.Emit(event.EvtPeerConnectednessChanged{ + Peer: p, + Connectedness: network.NotConnected, + }) + } } // String returns a string representation of Network. diff --git a/p2p/net/swarm/swarm_event_test.go b/p2p/net/swarm/swarm_event_test.go new file mode 100644 index 0000000000..d7eba966c4 --- /dev/null +++ b/p2p/net/swarm/swarm_event_test.go @@ -0,0 +1,66 @@ +package swarm_test + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" + . "github.com/libp2p/go-libp2p/p2p/net/swarm" + swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" + + ma "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" +) + +func newSwarmWithSubscription(t *testing.T) (*Swarm, event.Subscription) { + t.Helper() + bus := eventbus.NewBus() + sw := swarmt.GenSwarm(t, swarmt.EventBus(bus)) + t.Cleanup(func() { sw.Close() }) + sub, err := bus.Subscribe(new(event.EvtPeerConnectednessChanged)) + require.NoError(t, err) + t.Cleanup(func() { sub.Close() }) + return sw, sub +} + +func checkEvent(t *testing.T, sub event.Subscription, expected event.EvtPeerConnectednessChanged) { + t.Helper() + select { + case ev, ok := <-sub.Out(): + require.True(t, ok) + evt := ev.(event.EvtPeerConnectednessChanged) + require.Equal(t, expected.Connectedness, evt.Connectedness, "wrong connectedness state") + require.Equal(t, expected.Peer, evt.Peer) + case <-time.After(time.Second): + t.Fatal("didn't get PeerConnectedness event") + } + + // check that there are no more events + select { + case <-sub.Out(): + t.Fatal("didn't expect any more events") + case <-time.After(50 * time.Millisecond): + return + } +} + +func TestConnectednessEventsSingleConn(t *testing.T) { + s1, sub1 := newSwarmWithSubscription(t) + s2, sub2 := newSwarmWithSubscription(t) + + s1.Peerstore().AddAddrs(s2.LocalPeer(), []ma.Multiaddr{s2.ListenAddresses()[0]}, time.Hour) + _, err := s1.DialPeer(context.Background(), s2.LocalPeer()) + require.NoError(t, err) + + checkEvent(t, sub1, event.EvtPeerConnectednessChanged{Peer: s2.LocalPeer(), Connectedness: network.Connected}) + checkEvent(t, sub2, event.EvtPeerConnectednessChanged{Peer: s1.LocalPeer(), Connectedness: network.Connected}) + + for _, c := range s2.ConnsToPeer(s1.LocalPeer()) { + require.NoError(t, c.Close()) + } + checkEvent(t, sub1, event.EvtPeerConnectednessChanged{Peer: s2.LocalPeer(), Connectedness: network.NotConnected}) + checkEvent(t, sub2, event.EvtPeerConnectednessChanged{Peer: s1.LocalPeer(), Connectedness: network.NotConnected}) +} diff --git a/p2p/net/swarm/testing/testing.go b/p2p/net/swarm/testing/testing.go index 69b107cdb6..b052a993ad 100644 --- a/p2p/net/swarm/testing/testing.go +++ b/p2p/net/swarm/testing/testing.go @@ -8,6 +8,7 @@ import ( "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/control" "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/metrics" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -36,6 +37,7 @@ type config struct { connectionGater connmgr.ConnectionGater sk crypto.PrivKey swarmOpts []swarm.Option + eventBus event.Bus clock } @@ -99,6 +101,12 @@ func OptPeerPrivateKey(sk crypto.PrivKey) Option { } } +func EventBus(b event.Bus) Option { + return func(_ *testing.T, c *config) { + c.eventBus = b + } +} + // GenUpgrader creates a new connection upgrader for use with this swarm. func GenUpgrader(t *testing.T, n *swarm.Swarm, connGater connmgr.ConnectionGater, opts ...tptu.Option) transport.Upgrader { id := n.LocalPeer() @@ -141,7 +149,11 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm { swarmOpts = append(swarmOpts, swarm.WithConnectionGater(cfg.connectionGater)) } - s, err := swarm.NewSwarm(id, ps, eventbus.NewBus(), swarmOpts...) + eventBus := cfg.eventBus + if eventBus == nil { + eventBus = eventbus.NewBus() + } + s, err := swarm.NewSwarm(id, ps, eventBus, swarmOpts...) require.NoError(t, err) upgrader := GenUpgrader(t, s, cfg.connectionGater)