diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index 392a00ed..2d55f1ca 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -14,6 +14,7 @@ import ( cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p/core/connmgr" + "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -85,6 +86,8 @@ type impl struct { // inbound messages from the network are forwarded to the receiver receivers []Receiver + + cancel context.CancelFunc } type streamMessageSender struct { @@ -353,16 +356,87 @@ func (bsnet *impl) Start(r ...Receiver) { bsnet.connectEvtMgr = newConnectEventManager(connectionListeners...) } for _, proto := range bsnet.supportedProtocols { + log.Debugf("setting up handler for protocol: %s", proto) bsnet.host.SetStreamHandler(proto, bsnet.handleNewStream) } + + // first, subscribe to libp2p events that indicate a change in connection state + sub, err := bsnet.host.EventBus().Subscribe([]interface{}{ + &event.EvtPeerProtocolsUpdated{}, + &event.EvtPeerIdentificationCompleted{}, + }) + if err != nil { + panic(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + bsnet.cancel = cancel + + go bsnet.peerUpdatedSubscription(ctx, sub) + + // next, add any peers with existing connections that support bitswap protocols + for _, conn := range bsnet.host.Network().Conns() { + peerID := conn.RemotePeer() + if bsnet.peerSupportsBitswap(peerID) { + log.Debugf("connecting to existing peer: %s", peerID) + bsnet.connectEvtMgr.Connected(peerID) + } + } + + // finally, listen for disconnects and start processing the events bsnet.host.Network().Notify((*netNotifiee)(bsnet)) bsnet.connectEvtMgr.Start() - } func (bsnet *impl) Stop() { bsnet.connectEvtMgr.Stop() bsnet.host.Network().StopNotify((*netNotifiee)(bsnet)) + bsnet.cancel() +} + +func (bsnet *impl) peerUpdatedSubscription(ctx context.Context, sub event.Subscription) { + for { + select { + case <-ctx.Done(): + return + case evt := <-sub.Out(): + if peu, ok := evt.(event.EvtPeerProtocolsUpdated); ok { + if bsnet.hasBitswapProtocol(peu.Added) { + log.Debugf("connecting to peer with updated protocol list: %s", peu.Peer) + bsnet.connectEvtMgr.Connected(peu.Peer) + continue + } + + if bsnet.hasBitswapProtocol(peu.Removed) && !bsnet.peerSupportsBitswap(peu.Peer) { + log.Debugf("disconnecting from peer with updated protocol list: %s", peu.Peer) + bsnet.connectEvtMgr.Disconnected(peu.Peer) + } + continue + } + + if peic, ok := evt.(event.EvtPeerIdentificationCompleted); ok { + if bsnet.peerSupportsBitswap(peic.Peer) { + log.Debugf("connecting to peer with new identification: %s", peic.Peer) + bsnet.connectEvtMgr.Connected(peic.Peer) + } + } + } + } +} + +func (bsnet *impl) peerSupportsBitswap(peerID peer.ID) bool { + protocols, err := bsnet.host.Peerstore().SupportsProtocols(peerID, protocol.ConvertToStrings(bsnet.supportedProtocols)...) + return err == nil && len(protocols) > 0 +} + +func (bsnet *impl) hasBitswapProtocol(protos []protocol.ID) bool { + for _, p := range protos { + switch p { + case bsnet.protocolBitswap, bsnet.protocolBitswapOneOne, bsnet.protocolBitswapOneZero, bsnet.protocolBitswapNoVers: + return true + } + } + return false } func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error { @@ -450,22 +524,16 @@ func (nn *netNotifiee) impl() *impl { return (*impl)(nn) } -func (nn *netNotifiee) Connected(n network.Network, v network.Conn) { - // ignore transient connections - if v.Stat().Transient { - return - } - - nn.impl().connectEvtMgr.Connected(v.RemotePeer()) -} func (nn *netNotifiee) Disconnected(n network.Network, v network.Conn) { // Only record a "disconnect" when we actually disconnect. if n.Connectedness(v.RemotePeer()) == network.Connected { return } + log.Debugf("peer disconnected: %s", v.RemotePeer()) nn.impl().connectEvtMgr.Disconnected(v.RemotePeer()) } +func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {} func (nn *netNotifiee) OpenedStream(n network.Network, s network.Stream) {} func (nn *netNotifiee) ClosedStream(n network.Network, v network.Stream) {} func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {}