From 06a8c00010df667dcf9fb6f083bce7ea9f6a4ee4 Mon Sep 17 00:00:00 2001 From: Esad Akar Date: Fri, 7 May 2021 18:32:00 +0300 Subject: [PATCH] fix(libp2p,kademlia): fixes discrepency-kademlia and libp2p peers lists --- pkg/p2p/libp2p/libp2p.go | 97 ++++++++++++++++++++----------- pkg/topology/kademlia/kademlia.go | 4 ++ 2 files changed, 68 insertions(+), 33 deletions(-) diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index b33a67c57c8..dc1301c9bec 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -256,70 +256,80 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay i, err := s.handshakeService.Handle(ctx, handshakeStream, stream.Conn().RemoteMultiaddr(), peerID) if err != nil { s.logger.Debugf("handshake: handle %s: %v", peerID, err) - s.logger.Errorf("unable to handshake with peer %v", peerID) + s.logger.Errorf("handshake: unable to handshake with peer id %v", peerID) _ = handshakeStream.Reset() _ = s.host.Network().ClosePeer(peerID) return } - blocked, err := s.blocklist.Exists(i.BzzAddress.Overlay) + overlay := i.BzzAddress.Overlay + + if s.notifier == nil { + s.logger.Warningf("stream handler: notifier is not set for peer %s", overlay) + } + + blocked, err := s.blocklist.Exists(overlay) if err != nil { - s.logger.Debugf("blocklisting: exists %s: %v", peerID, err) - s.logger.Errorf("internal error while connecting with peer %s", peerID) + s.logger.Debugf("stream handler: blocklisting: exists %s: %v", overlay, err) + s.logger.Errorf("stream handler: internal error while connecting with peer %s", overlay) _ = handshakeStream.Reset() _ = s.host.Network().ClosePeer(peerID) return } if blocked { - s.logger.Errorf("blocked connection from blocklisted peer %s", peerID) + s.logger.Errorf("stream handler: blocked connection from blocklisted peer %s", overlay) _ = handshakeStream.Reset() _ = s.host.Network().ClosePeer(peerID) return } if s.notifier != nil { - if !s.notifier.Pick(p2p.Peer{Address: i.BzzAddress.Overlay}) { - s.logger.Errorf("don't want incoming peer %s. disconnecting", peerID) + if !s.notifier.Pick(p2p.Peer{Address: overlay}) { + s.logger.Warningf("stream handler: don't want incoming peer %s. disconnecting", overlay) _ = handshakeStream.Reset() _ = s.host.Network().ClosePeer(peerID) return } } - if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists { + if exists := s.peers.addIfNotExists(stream.Conn(), overlay); exists { + s.logger.Debugf("stream handler: peer %s already exists", overlay) if err = handshakeStream.FullClose(); err != nil { - s.logger.Debugf("handshake: could not close stream %s: %v", peerID, err) - s.logger.Errorf("unable to handshake with peer %v", peerID) - _ = s.Disconnect(i.BzzAddress.Overlay) + s.logger.Debugf("stream handler: could not close stream %s: %v", overlay, err) + s.logger.Errorf("stream handler: unable to handshake with peer %v", overlay) + _ = s.Disconnect(overlay) } return } if err = handshakeStream.FullClose(); err != nil { - s.logger.Debugf("handshake: could not close stream %s: %v", peerID, err) - s.logger.Errorf("unable to handshake with peer %v", peerID) - _ = s.Disconnect(i.BzzAddress.Overlay) + s.logger.Debugf("stream handler: could not close stream %s: %v", overlay, err) + s.logger.Errorf("stream handler: unable to handshake with peer %v", overlay) + _ = s.Disconnect(overlay) return } if i.FullNode { err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress) if err != nil { - s.logger.Debugf("handshake: addressbook put error %s: %v", peerID, err) + s.logger.Debugf("stream handler: addressbook put error %s: %v", peerID, err) s.logger.Errorf("unable to persist peer %v", peerID) _ = s.Disconnect(i.BzzAddress.Overlay) return } } - peer := p2p.Peer{Address: i.BzzAddress.Overlay} + peer := p2p.Peer{Address: overlay} s.protocolsmu.RLock() for _, tn := range s.protocols { if tn.ConnectIn != nil { if err := tn.ConnectIn(ctx, peer); err != nil { - s.logger.Debugf("connectIn: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, i.BzzAddress.Overlay, err) + s.logger.Debugf("stream handler: connectIn: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, overlay, err) + _ = s.Disconnect(overlay) + s.protocolsmu.RUnlock() + return } } } @@ -343,12 +353,18 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay // interface, in addition to the possibility of deciding whether // a peer connection is wanted prior to adding the peer to the // peer registry and starting the protocols. - _ = s.Disconnect(i.BzzAddress.Overlay) + _ = s.Disconnect(overlay) return } } s.metrics.HandledStreamCount.Inc() + if !s.peers.Exists(overlay) { + s.logger.Debugf("libp2p: inbound peer %s does not exist, disconnecting", overlay) + _ = s.Disconnect(overlay) + return + } + s.logger.Debugf("successfully connected to peer %s%s (inbound)", i.BzzAddress.ShortString(), i.LightString()) s.logger.Infof("successfully connected to peer %s%s (inbound)", i.BzzAddress.Overlay, i.LightString()) }) @@ -379,7 +395,6 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) { peerID := streamlibp2p.Conn().RemotePeer() overlay, found := s.peers.overlay(peerID) if !found { - _ = s.Disconnect(overlay) _ = streamlibp2p.Reset() s.logger.Debugf("overlay address for peer %q not found", peerID) return @@ -537,7 +552,9 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. return nil, fmt.Errorf("handshake: %w", err) } - blocked, err := s.blocklist.Exists(i.BzzAddress.Overlay) + overlay := i.BzzAddress.Overlay + + blocked, err := s.blocklist.Exists(overlay) if err != nil { s.logger.Debugf("blocklisting: exists %s: %v", info.ID, err) s.logger.Errorf("internal error while connecting with peer %s", info.ID) @@ -553,9 +570,9 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. return nil, fmt.Errorf("peer blocklisted") } - if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists { + if exists := s.peers.addIfNotExists(stream.Conn(), overlay); exists { if err := handshakeStream.FullClose(); err != nil { - _ = s.Disconnect(i.BzzAddress.Overlay) + _ = s.Disconnect(overlay) return nil, fmt.Errorf("peer exists, full close: %w", err) } @@ -563,14 +580,14 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. } if err := handshakeStream.FullClose(); err != nil { - _ = s.Disconnect(i.BzzAddress.Overlay) + _ = s.Disconnect(overlay) return nil, fmt.Errorf("connect full close %w", err) } if i.FullNode { - err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress) + err = s.addressbook.Put(overlay, *i.BzzAddress) if err != nil { - _ = s.Disconnect(i.BzzAddress.Overlay) + _ = s.Disconnect(overlay) return nil, fmt.Errorf("storing bzz address: %w", err) } } @@ -578,27 +595,35 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. s.protocolsmu.RLock() for _, tn := range s.protocols { if tn.ConnectOut != nil { - if err := tn.ConnectOut(ctx, p2p.Peer{Address: i.BzzAddress.Overlay}); err != nil { - s.logger.Debugf("connectOut: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, i.BzzAddress.Overlay, err) + if err := tn.ConnectOut(ctx, p2p.Peer{Address: overlay}); err != nil { + s.logger.Debugf("connectOut: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, overlay, err) + _ = s.Disconnect(overlay) + s.protocolsmu.RUnlock() + return nil, fmt.Errorf("connectOut: protocol %w", err) } } } - s.protocolsmu.RUnlock() + if !s.peers.Exists(overlay) { + s.logger.Debugf("libp2p Connect: peer %s does not exist, disconnecting", overlay) + _ = s.Disconnect(overlay) + return nil, p2p.ErrPeerNotFound + } + s.metrics.CreatedConnectionCount.Inc() s.logger.Debugf("successfully connected to peer %s%s (outbound)", i.BzzAddress.ShortString(), i.LightString()) - s.logger.Infof("successfully connected to peer %s%s (outbound)", i.BzzAddress.Overlay, i.LightString()) + s.logger.Infof("successfully connected to peer %s%s (outbound)", overlay, i.LightString()) return i.BzzAddress, nil } func (s *Service) Disconnect(overlay swarm.Address) error { s.metrics.DisconnectCount.Inc() + + s.logger.Debugf("libp2p disconnect: disconnecting peer %s", overlay) + found, peerID := s.peers.remove(overlay) - if !found { - return p2p.ErrPeerNotFound - } _ = s.host.Network().ClosePeer(peerID) @@ -612,8 +637,8 @@ func (s *Service) Disconnect(overlay swarm.Address) error { } } } - s.protocolsmu.RUnlock() + if s.notifier != nil { s.notifier.Disconnected(peer) } @@ -621,11 +646,17 @@ func (s *Service) Disconnect(overlay swarm.Address) error { s.lightNodes.Disconnected(peer) } + if !found { + s.logger.Debugf("libp2p disconnect: peer %s not founds", overlay) + return p2p.ErrPeerNotFound + } + return nil } // disconnected is a registered peer registry event func (s *Service) disconnected(address swarm.Address) { + peer := p2p.Peer{Address: address} s.protocolsmu.RLock() for _, tn := range s.protocols { diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index 099f0081bab..702eb92438c 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -701,6 +701,7 @@ func (k *Kad) Announce(ctx context.Context, peer swarm.Address) error { err := k.discovery.BroadcastPeers(ctx, peer, addrs...) if err != nil { + k.logger.Errorf("kademlia: could not broadcast to peer %s", peer) _ = k.p2p.Disconnect(peer) } @@ -787,6 +788,9 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error { // Disconnected is called when peer disconnects. func (k *Kad) Disconnected(peer p2p.Peer) { + + k.logger.Debugf("kademlia: disconnected peer %s", peer.Address) + po := swarm.Proximity(k.base.Bytes(), peer.Address.Bytes()) k.connectedPeers.Remove(peer.Address, po)