Skip to content

Commit

Permalink
fix(libp2p,kademlia): fixes discrepency-kademlia and libp2p peers lists
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed May 12, 2021
1 parent 0b0bb9e commit 06a8c00
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 33 deletions.
97 changes: 64 additions & 33 deletions pkg/p2p/libp2p/libp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,70 +256,80 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
i, err := s.handshakeService.Handle(ctx, handshakeStream, stream.Conn().RemoteMultiaddr(), peerID)
if err != nil {
s.logger.Debugf("handshake: handle %s: %v", peerID, err)
s.logger.Errorf("unable to handshake with peer %v", peerID)
s.logger.Errorf("handshake: unable to handshake with peer id %v", peerID)
_ = handshakeStream.Reset()
_ = s.host.Network().ClosePeer(peerID)
return
}

blocked, err := s.blocklist.Exists(i.BzzAddress.Overlay)
overlay := i.BzzAddress.Overlay

if s.notifier == nil {
s.logger.Warningf("stream handler: notifier is not set for peer %s", overlay)
}

blocked, err := s.blocklist.Exists(overlay)
if err != nil {
s.logger.Debugf("blocklisting: exists %s: %v", peerID, err)
s.logger.Errorf("internal error while connecting with peer %s", peerID)
s.logger.Debugf("stream handler: blocklisting: exists %s: %v", overlay, err)
s.logger.Errorf("stream handler: internal error while connecting with peer %s", overlay)
_ = handshakeStream.Reset()
_ = s.host.Network().ClosePeer(peerID)
return
}

if blocked {
s.logger.Errorf("blocked connection from blocklisted peer %s", peerID)
s.logger.Errorf("stream handler: blocked connection from blocklisted peer %s", overlay)
_ = handshakeStream.Reset()
_ = s.host.Network().ClosePeer(peerID)
return
}

if s.notifier != nil {
if !s.notifier.Pick(p2p.Peer{Address: i.BzzAddress.Overlay}) {
s.logger.Errorf("don't want incoming peer %s. disconnecting", peerID)
if !s.notifier.Pick(p2p.Peer{Address: overlay}) {
s.logger.Warningf("stream handler: don't want incoming peer %s. disconnecting", overlay)
_ = handshakeStream.Reset()
_ = s.host.Network().ClosePeer(peerID)
return
}
}

if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists {
if exists := s.peers.addIfNotExists(stream.Conn(), overlay); exists {
s.logger.Debugf("stream handler: peer %s already exists", overlay)
if err = handshakeStream.FullClose(); err != nil {
s.logger.Debugf("handshake: could not close stream %s: %v", peerID, err)
s.logger.Errorf("unable to handshake with peer %v", peerID)
_ = s.Disconnect(i.BzzAddress.Overlay)
s.logger.Debugf("stream handler: could not close stream %s: %v", overlay, err)
s.logger.Errorf("stream handler: unable to handshake with peer %v", overlay)
_ = s.Disconnect(overlay)
}
return
}

if err = handshakeStream.FullClose(); err != nil {
s.logger.Debugf("handshake: could not close stream %s: %v", peerID, err)
s.logger.Errorf("unable to handshake with peer %v", peerID)
_ = s.Disconnect(i.BzzAddress.Overlay)
s.logger.Debugf("stream handler: could not close stream %s: %v", overlay, err)
s.logger.Errorf("stream handler: unable to handshake with peer %v", overlay)
_ = s.Disconnect(overlay)
return
}

if i.FullNode {
err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress)
if err != nil {
s.logger.Debugf("handshake: addressbook put error %s: %v", peerID, err)
s.logger.Debugf("stream handler: addressbook put error %s: %v", peerID, err)
s.logger.Errorf("unable to persist peer %v", peerID)
_ = s.Disconnect(i.BzzAddress.Overlay)
return
}
}

peer := p2p.Peer{Address: i.BzzAddress.Overlay}
peer := p2p.Peer{Address: overlay}

s.protocolsmu.RLock()
for _, tn := range s.protocols {
if tn.ConnectIn != nil {
if err := tn.ConnectIn(ctx, peer); err != nil {
s.logger.Debugf("connectIn: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, i.BzzAddress.Overlay, err)
s.logger.Debugf("stream handler: connectIn: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, overlay, err)
_ = s.Disconnect(overlay)
s.protocolsmu.RUnlock()
return
}
}
}
Expand All @@ -343,12 +353,18 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
// interface, in addition to the possibility of deciding whether
// a peer connection is wanted prior to adding the peer to the
// peer registry and starting the protocols.
_ = s.Disconnect(i.BzzAddress.Overlay)
_ = s.Disconnect(overlay)
return
}
}

s.metrics.HandledStreamCount.Inc()
if !s.peers.Exists(overlay) {
s.logger.Debugf("libp2p: inbound peer %s does not exist, disconnecting", overlay)
_ = s.Disconnect(overlay)
return
}

s.logger.Debugf("successfully connected to peer %s%s (inbound)", i.BzzAddress.ShortString(), i.LightString())
s.logger.Infof("successfully connected to peer %s%s (inbound)", i.BzzAddress.Overlay, i.LightString())
})
Expand Down Expand Up @@ -379,7 +395,6 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
peerID := streamlibp2p.Conn().RemotePeer()
overlay, found := s.peers.overlay(peerID)
if !found {
_ = s.Disconnect(overlay)
_ = streamlibp2p.Reset()
s.logger.Debugf("overlay address for peer %q not found", peerID)
return
Expand Down Expand Up @@ -537,7 +552,9 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
return nil, fmt.Errorf("handshake: %w", err)
}

blocked, err := s.blocklist.Exists(i.BzzAddress.Overlay)
overlay := i.BzzAddress.Overlay

blocked, err := s.blocklist.Exists(overlay)
if err != nil {
s.logger.Debugf("blocklisting: exists %s: %v", info.ID, err)
s.logger.Errorf("internal error while connecting with peer %s", info.ID)
Expand All @@ -553,52 +570,60 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
return nil, fmt.Errorf("peer blocklisted")
}

if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists {
if exists := s.peers.addIfNotExists(stream.Conn(), overlay); exists {
if err := handshakeStream.FullClose(); err != nil {
_ = s.Disconnect(i.BzzAddress.Overlay)
_ = s.Disconnect(overlay)
return nil, fmt.Errorf("peer exists, full close: %w", err)
}

return i.BzzAddress, nil
}

if err := handshakeStream.FullClose(); err != nil {
_ = s.Disconnect(i.BzzAddress.Overlay)
_ = s.Disconnect(overlay)
return nil, fmt.Errorf("connect full close %w", err)
}

if i.FullNode {
err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress)
err = s.addressbook.Put(overlay, *i.BzzAddress)
if err != nil {
_ = s.Disconnect(i.BzzAddress.Overlay)
_ = s.Disconnect(overlay)
return nil, fmt.Errorf("storing bzz address: %w", err)
}
}

s.protocolsmu.RLock()
for _, tn := range s.protocols {
if tn.ConnectOut != nil {
if err := tn.ConnectOut(ctx, p2p.Peer{Address: i.BzzAddress.Overlay}); err != nil {
s.logger.Debugf("connectOut: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, i.BzzAddress.Overlay, err)
if err := tn.ConnectOut(ctx, p2p.Peer{Address: overlay}); err != nil {
s.logger.Debugf("connectOut: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, overlay, err)
_ = s.Disconnect(overlay)
s.protocolsmu.RUnlock()
return nil, fmt.Errorf("connectOut: protocol %w", err)
}
}
}

s.protocolsmu.RUnlock()

if !s.peers.Exists(overlay) {
s.logger.Debugf("libp2p Connect: peer %s does not exist, disconnecting", overlay)
_ = s.Disconnect(overlay)
return nil, p2p.ErrPeerNotFound
}

s.metrics.CreatedConnectionCount.Inc()

s.logger.Debugf("successfully connected to peer %s%s (outbound)", i.BzzAddress.ShortString(), i.LightString())
s.logger.Infof("successfully connected to peer %s%s (outbound)", i.BzzAddress.Overlay, i.LightString())
s.logger.Infof("successfully connected to peer %s%s (outbound)", overlay, i.LightString())
return i.BzzAddress, nil
}

func (s *Service) Disconnect(overlay swarm.Address) error {
s.metrics.DisconnectCount.Inc()

s.logger.Debugf("libp2p disconnect: disconnecting peer %s", overlay)

found, peerID := s.peers.remove(overlay)
if !found {
return p2p.ErrPeerNotFound
}

_ = s.host.Network().ClosePeer(peerID)

Expand All @@ -612,20 +637,26 @@ func (s *Service) Disconnect(overlay swarm.Address) error {
}
}
}

s.protocolsmu.RUnlock()

if s.notifier != nil {
s.notifier.Disconnected(peer)
}
if s.lightNodes != nil {
s.lightNodes.Disconnected(peer)
}

if !found {
s.logger.Debugf("libp2p disconnect: peer %s not founds", overlay)
return p2p.ErrPeerNotFound
}

return nil
}

// disconnected is a registered peer registry event
func (s *Service) disconnected(address swarm.Address) {

peer := p2p.Peer{Address: address}
s.protocolsmu.RLock()
for _, tn := range s.protocols {
Expand Down
4 changes: 4 additions & 0 deletions pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,7 @@ func (k *Kad) Announce(ctx context.Context, peer swarm.Address) error {

err := k.discovery.BroadcastPeers(ctx, peer, addrs...)
if err != nil {
k.logger.Errorf("kademlia: could not broadcast to peer %s", peer)
_ = k.p2p.Disconnect(peer)
}

Expand Down Expand Up @@ -787,6 +788,9 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {

// Disconnected is called when peer disconnects.
func (k *Kad) Disconnected(peer p2p.Peer) {

k.logger.Debugf("kademlia: disconnected peer %s", peer.Address)

po := swarm.Proximity(k.base.Bytes(), peer.Address.Bytes())
k.connectedPeers.Remove(peer.Address, po)

Expand Down

0 comments on commit 06a8c00

Please sign in to comment.