Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Improve peer connection handling #590

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 77 additions & 9 deletions network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -85,6 +86,8 @@ type impl struct {

// inbound messages from the network are forwarded to the receiver
receivers []Receiver

cancel context.CancelFunc
}

type streamMessageSender struct {
Expand Down Expand Up @@ -353,16 +356,87 @@ func (bsnet *impl) Start(r ...Receiver) {
bsnet.connectEvtMgr = newConnectEventManager(connectionListeners...)
}
for _, proto := range bsnet.supportedProtocols {
log.Debugf("setting up handler for protocol: %s", proto)
bsnet.host.SetStreamHandler(proto, bsnet.handleNewStream)
}

// first, subscribe to libp2p events that indicate a change in connection state
sub, err := bsnet.host.EventBus().Subscribe([]interface{}{
&event.EvtPeerProtocolsUpdated{},
&event.EvtPeerIdentificationCompleted{},
})
if err != nil {
panic(err)
}

ctx, cancel := context.WithCancel(context.Background())
bsnet.cancel = cancel

go bsnet.peerUpdatedSubscription(ctx, sub)

// next, add any peers with existing connections that support bitswap protocols
for _, conn := range bsnet.host.Network().Conns() {
peerID := conn.RemotePeer()
if bsnet.peerSupportsBitswap(peerID) {
log.Debugf("connecting to existing peer: %s", peerID)
bsnet.connectEvtMgr.Connected(peerID)
}
}

// finally, listen for disconnects and start processing the events
bsnet.host.Network().Notify((*netNotifiee)(bsnet))
bsnet.connectEvtMgr.Start()

}

func (bsnet *impl) Stop() {
bsnet.connectEvtMgr.Stop()
bsnet.host.Network().StopNotify((*netNotifiee)(bsnet))
bsnet.cancel()
}

func (bsnet *impl) peerUpdatedSubscription(ctx context.Context, sub event.Subscription) {
for {
select {
case <-ctx.Done():
return
case evt := <-sub.Out():
if peu, ok := evt.(event.EvtPeerProtocolsUpdated); ok {
if bsnet.hasBitswapProtocol(peu.Added) {
log.Debugf("connecting to peer with updated protocol list: %s", peu.Peer)
bsnet.connectEvtMgr.Connected(peu.Peer)
continue
}

if bsnet.hasBitswapProtocol(peu.Removed) && !bsnet.peerSupportsBitswap(peu.Peer) {
log.Debugf("disconnecting from peer with updated protocol list: %s", peu.Peer)
bsnet.connectEvtMgr.Disconnected(peu.Peer)
}
continue
}

if peic, ok := evt.(event.EvtPeerIdentificationCompleted); ok {
Comment on lines +403 to +417
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use a type switch

if bsnet.peerSupportsBitswap(peic.Peer) {
log.Debugf("connecting to peer with new identification: %s", peic.Peer)
bsnet.connectEvtMgr.Connected(peic.Peer)
}
}
}
}
}

func (bsnet *impl) peerSupportsBitswap(peerID peer.ID) bool {
protocols, err := bsnet.host.Peerstore().SupportsProtocols(peerID, protocol.ConvertToStrings(bsnet.supportedProtocols)...)
return err == nil && len(protocols) > 0
}

func (bsnet *impl) hasBitswapProtocol(protos []protocol.ID) bool {
for _, p := range protos {
switch p {
case bsnet.protocolBitswap, bsnet.protocolBitswapOneOne, bsnet.protocolBitswapOneZero, bsnet.protocolBitswapNoVers:
return true
}
}
return false
}

func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error {
Expand Down Expand Up @@ -450,22 +524,16 @@ func (nn *netNotifiee) impl() *impl {
return (*impl)(nn)
}

func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
// ignore transient connections
if v.Stat().Transient {
return
}

nn.impl().connectEvtMgr.Connected(v.RemotePeer())
}
func (nn *netNotifiee) Disconnected(n network.Network, v network.Conn) {
// Only record a "disconnect" when we actually disconnect.
if n.Connectedness(v.RemotePeer()) == network.Connected {
return
}

log.Debugf("peer disconnected: %s", v.RemotePeer())
nn.impl().connectEvtMgr.Disconnected(v.RemotePeer())
}
func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {}
func (nn *netNotifiee) OpenedStream(n network.Network, s network.Stream) {}
func (nn *netNotifiee) ClosedStream(n network.Network, v network.Stream) {}
func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {}
Expand Down