Skip to content

Commit

Permalink
fix(libp2p,kademlia): fixes discrepency-kademlia and libp2p peers lists
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed May 14, 2021
1 parent e98c03b commit 4b5e043
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 41 deletions.
110 changes: 69 additions & 41 deletions pkg/p2p/libp2p/libp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,71 +256,77 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
handshakeStream := NewStream(stream)
i, err := s.handshakeService.Handle(ctx, handshakeStream, stream.Conn().RemoteMultiaddr(), peerID)
if err != nil {
s.logger.Debugf("handshake: handle %s: %v", peerID, err)
s.logger.Errorf("unable to handshake with peer %v", peerID)
s.logger.Debugf("stream handler: handshake: handle %s: %v", peerID, err)
s.logger.Errorf("stream handler: handshake: unable to handshake with peer id %v", peerID)
_ = handshakeStream.Reset()
_ = s.host.Network().ClosePeer(peerID)
return
}

blocked, err := s.blocklist.Exists(i.BzzAddress.Overlay)
overlay := i.BzzAddress.Overlay

blocked, err := s.blocklist.Exists(overlay)
if err != nil {
s.logger.Debugf("blocklisting: exists %s: %v", peerID, err)
s.logger.Errorf("internal error while connecting with peer %s", peerID)
s.logger.Debugf("stream handler: blocklisting: exists %s: %v", overlay, err)
s.logger.Errorf("stream handler: internal error while connecting with peer %s", overlay)
_ = handshakeStream.Reset()
_ = s.host.Network().ClosePeer(peerID)
return
}

if blocked {
s.logger.Errorf("blocked connection from blocklisted peer %s", peerID)
s.logger.Errorf("stream handler: blocked connection from blocklisted peer %s", overlay)
_ = handshakeStream.Reset()
_ = s.host.Network().ClosePeer(peerID)
return
}

if s.notifier != nil {
if !s.notifier.Pick(p2p.Peer{Address: i.BzzAddress.Overlay}) {
s.logger.Errorf("don't want incoming peer %s. disconnecting", peerID)
if !s.notifier.Pick(p2p.Peer{Address: overlay}) {
s.logger.Warningf("stream handler: don't want incoming peer %s. disconnecting", overlay)
_ = handshakeStream.Reset()
_ = s.host.Network().ClosePeer(peerID)
return
}
}

if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists {
if exists := s.peers.addIfNotExists(stream.Conn(), overlay); exists {
s.logger.Debugf("stream handler: peer %s already exists", overlay)
if err = handshakeStream.FullClose(); err != nil {
s.logger.Debugf("handshake: could not close stream %s: %v", peerID, err)
s.logger.Errorf("unable to handshake with peer %v", peerID)
_ = s.Disconnect(i.BzzAddress.Overlay)
s.logger.Debugf("stream handler: could not close stream %s: %v", overlay, err)
s.logger.Errorf("stream handler: unable to handshake with peer %v", overlay)
_ = s.Disconnect(overlay)
}
return
}

if err = handshakeStream.FullClose(); err != nil {
s.logger.Debugf("handshake: could not close stream %s: %v", peerID, err)
s.logger.Errorf("unable to handshake with peer %v", peerID)
_ = s.Disconnect(i.BzzAddress.Overlay)
s.logger.Debugf("stream handler: could not close stream %s: %v", overlay, err)
s.logger.Errorf("stream handler: unable to handshake with peer %v", overlay)
_ = s.Disconnect(overlay)
return
}

if i.FullNode {
err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress)
if err != nil {
s.logger.Debugf("handshake: addressbook put error %s: %v", peerID, err)
s.logger.Errorf("unable to persist peer %v", peerID)
s.logger.Debugf("stream handler: addressbook put error %s: %v", peerID, err)
s.logger.Errorf("stream handler: unable to persist peer %v", peerID)
_ = s.Disconnect(i.BzzAddress.Overlay)
return
}
}

peer := p2p.Peer{Address: i.BzzAddress.Overlay}
peer := p2p.Peer{Address: overlay}

s.protocolsmu.RLock()
for _, tn := range s.protocols {
if tn.ConnectIn != nil {
if err := tn.ConnectIn(ctx, peer); err != nil {
s.logger.Debugf("connectIn: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, i.BzzAddress.Overlay, err)
s.logger.Debugf("stream handler: connectIn: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, overlay, err)
_ = s.Disconnect(overlay)
s.protocolsmu.RUnlock()
return
}
}
}
Expand All @@ -331,10 +337,10 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
s.lightNodes.Connected(ctx, peer)
//light node announces explicitly
if err := s.notifier.Announce(ctx, peer.Address); err != nil {
s.logger.Debugf("notifier.Announce: %s: %v", peer.Address.String(), err)
s.logger.Debugf("stream handler: notifier.Announce: %s: %v", peer.Address.String(), err)
}
} else if err := s.notifier.Connected(ctx, peer); err != nil { // full node announces implicitly
s.logger.Debugf("notifier.Connected: peer disconnected: %s: %v", i.BzzAddress.Overlay, err)
s.logger.Debugf("stream handler: notifier.Connected: peer disconnected: %s: %v", i.BzzAddress.Overlay, err)
// note: this cannot be unit tested since the node
// waiting on handshakeStream.FullClose() on the other side
// might actually get a stream reset when we disconnect here
Expand All @@ -344,14 +350,20 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
// interface, in addition to the possibility of deciding whether
// a peer connection is wanted prior to adding the peer to the
// peer registry and starting the protocols.
_ = s.Disconnect(i.BzzAddress.Overlay)
_ = s.Disconnect(overlay)
return
}
}

s.metrics.HandledStreamCount.Inc()
s.logger.Debugf("successfully connected to peer %s%s (inbound)", i.BzzAddress.ShortString(), i.LightString())
s.logger.Infof("successfully connected to peer %s%s (inbound)", i.BzzAddress.Overlay, i.LightString())
if !s.peers.Exists(overlay) {
s.logger.Warningf("stream handler: inbound peer %s does not exist, disconnecting", overlay)
_ = s.Disconnect(overlay)
return
}

s.logger.Debugf("stream handler: successfully connected to peer %s%s (inbound)", i.BzzAddress.ShortString(), i.LightString())
s.logger.Infof("stream handler: successfully connected to peer %s%s (inbound)", i.BzzAddress.Overlay, i.LightString())
})

h.Network().SetConnHandler(func(_ network.Conn) {
Expand Down Expand Up @@ -380,7 +392,6 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
peerID := streamlibp2p.Conn().RemotePeer()
overlay, found := s.peers.overlay(peerID)
if !found {
_ = s.Disconnect(overlay)
_ = streamlibp2p.Reset()
s.logger.Debugf("overlay address for peer %q not found", peerID)
return
Expand Down Expand Up @@ -544,7 +555,9 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
return nil, p2p.ErrDialLightNode
}

blocked, err := s.blocklist.Exists(i.BzzAddress.Overlay)
overlay := i.BzzAddress.Overlay

blocked, err := s.blocklist.Exists(overlay)
if err != nil {
s.logger.Debugf("blocklisting: exists %s: %v", info.ID, err)
s.logger.Errorf("internal error while connecting with peer %s", info.ID)
Expand All @@ -560,50 +573,59 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
return nil, fmt.Errorf("peer blocklisted")
}

if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists {
if exists := s.peers.addIfNotExists(stream.Conn(), overlay); exists {
if err := handshakeStream.FullClose(); err != nil {
_ = s.Disconnect(i.BzzAddress.Overlay)
_ = s.Disconnect(overlay)
return nil, fmt.Errorf("peer exists, full close: %w", err)
}

return i.BzzAddress, nil
}

if err := handshakeStream.FullClose(); err != nil {
_ = s.Disconnect(i.BzzAddress.Overlay)
_ = s.Disconnect(overlay)
return nil, fmt.Errorf("connect full close %w", err)
}

err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress)
if err != nil {
_ = s.Disconnect(i.BzzAddress.Overlay)
return nil, fmt.Errorf("storing bzz address: %w", err)
if i.FullNode {
err = s.addressbook.Put(overlay, *i.BzzAddress)
if err != nil {
_ = s.Disconnect(overlay)
return nil, fmt.Errorf("storing bzz address: %w", err)
}
}

s.protocolsmu.RLock()
for _, tn := range s.protocols {
if tn.ConnectOut != nil {
if err := tn.ConnectOut(ctx, p2p.Peer{Address: i.BzzAddress.Overlay}); err != nil {
s.logger.Debugf("connectOut: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, i.BzzAddress.Overlay, err)
if err := tn.ConnectOut(ctx, p2p.Peer{Address: overlay}); err != nil {
s.logger.Debugf("connectOut: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, overlay, err)
_ = s.Disconnect(overlay)
s.protocolsmu.RUnlock()
return nil, fmt.Errorf("connectOut: protocol: %s, version:%s: %w", tn.Name, tn.Version, err)
}
}
}

s.protocolsmu.RUnlock()

if !s.peers.Exists(overlay) {
_ = s.Disconnect(overlay)
return nil, fmt.Errorf("libp2p connect: peer %s does not exist %w", overlay, p2p.ErrPeerNotFound)
}

s.metrics.CreatedConnectionCount.Inc()

s.logger.Debugf("successfully connected to peer %s%s (outbound)", i.BzzAddress.ShortString(), i.LightString())
s.logger.Infof("successfully connected to peer %s%s (outbound)", i.BzzAddress.Overlay, i.LightString())
s.logger.Infof("successfully connected to peer %s%s (outbound)", overlay, i.LightString())
return i.BzzAddress, nil
}

func (s *Service) Disconnect(overlay swarm.Address) error {
s.metrics.DisconnectCount.Inc()

s.logger.Debugf("libp2p disconnect: disconnecting peer %s", overlay)

found, peerID := s.peers.remove(overlay)
if !found {
return p2p.ErrPeerNotFound
}

_ = s.host.Network().ClosePeer(peerID)

Expand All @@ -617,20 +639,26 @@ func (s *Service) Disconnect(overlay swarm.Address) error {
}
}
}

s.protocolsmu.RUnlock()

if s.notifier != nil {
s.notifier.Disconnected(peer)
}
if s.lightNodes != nil {
s.lightNodes.Disconnected(peer)
}

if !found {
s.logger.Debugf("libp2p disconnect: peer %s not found", overlay)
return p2p.ErrPeerNotFound
}

return nil
}

// disconnected is a registered peer registry event
func (s *Service) disconnected(address swarm.Address) {

peer := p2p.Peer{Address: address}
s.protocolsmu.RLock()
for _, tn := range s.protocols {
Expand Down
4 changes: 4 additions & 0 deletions pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,7 @@ func (k *Kad) Announce(ctx context.Context, peer swarm.Address) error {

err := k.discovery.BroadcastPeers(ctx, peer, addrs...)
if err != nil {
k.logger.Errorf("kademlia: could not broadcast to peer %s", peer)
_ = k.p2p.Disconnect(peer)
}

Expand Down Expand Up @@ -807,6 +808,9 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {

// Disconnected is called when peer disconnects.
func (k *Kad) Disconnected(peer p2p.Peer) {

k.logger.Debugf("kademlia: disconnected peer %s", peer.Address)

po := swarm.Proximity(k.base.Bytes(), peer.Address.Bytes())
k.connectedPeers.Remove(peer.Address, po)

Expand Down

0 comments on commit 4b5e043

Please sign in to comment.