Skip to content

Commit

Permalink
feat: p2p disconnect and blocklist with reason (#2471)
Browse files Browse the repository at this point in the history
  • Loading branch information
aloknerurkar authored Sep 8, 2021
1 parent bece49b commit 048a4b3
Show file tree
Hide file tree
Showing 13 changed files with 83 additions and 63 deletions.
12 changes: 6 additions & 6 deletions pkg/accounting/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error {
acceptedAmount, timestamp, err := a.refreshFunction(context.Background(), peer, paymentAmount, shadowBalance)
if err != nil {
a.metrics.AccountingDisconnectsEnforceRefreshCount.Inc()
_ = a.blocklist(peer, 1)
_ = a.blocklist(peer, 1, "failed to refresh")
return fmt.Errorf("refresh failure: %w", err)
}

Expand Down Expand Up @@ -1020,7 +1020,7 @@ func (d *debitAction) Cleanup() {
d.accountingPeer.ghostBalance = new(big.Int).Add(d.accountingPeer.ghostBalance, d.price)
if d.accountingPeer.ghostBalance.Cmp(a.disconnectLimit) > 0 {
a.metrics.AccountingDisconnectsGhostOverdrawCount.Inc()
_ = a.blocklist(d.peer, 1)
_ = a.blocklist(d.peer, 1, "ghost overdraw")
}
}
}
Expand All @@ -1047,14 +1047,14 @@ func (a *Accounting) blocklistUntil(peer swarm.Address, multiplier int64) (int64
return kInt, nil
}

func (a *Accounting) blocklist(peer swarm.Address, multiplier int64) error {
func (a *Accounting) blocklist(peer swarm.Address, multiplier int64, reason string) error {

disconnectFor, err := a.blocklistUntil(peer, multiplier)
if err != nil {
return a.p2p.Blocklist(peer, 1*time.Minute)
return a.p2p.Blocklist(peer, 1*time.Minute, reason)
}

return a.p2p.Blocklist(peer, time.Duration(disconnectFor)*time.Second)
return a.p2p.Blocklist(peer, time.Duration(disconnectFor)*time.Second, reason)
}

func (a *Accounting) Connect(peer swarm.Address) {
Expand Down Expand Up @@ -1140,7 +1140,7 @@ func (a *Accounting) Disconnect(peer swarm.Address) {
disconnectFor = int64(60)
}
accountingPeer.connected = false
_ = a.p2p.Blocklist(peer, time.Duration(disconnectFor)*time.Second)
_ = a.p2p.Blocklist(peer, time.Duration(disconnectFor)*time.Second, "disconnected")
}
}

Expand Down
17 changes: 14 additions & 3 deletions pkg/accounting/accounting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1273,6 +1273,8 @@ func TestAccountingCallPaymentFailureRetries(t *testing.T) {
acc.Release(peer1Addr, 1)
}

var errInvalidReason = errors.New("invalid blocklist reason")

func TestAccountingGhostOverdraft(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)

Expand All @@ -1283,7 +1285,10 @@ func TestAccountingGhostOverdraft(t *testing.T) {

paymentThresholdInRefreshmentSeconds := new(big.Int).Div(testPaymentThreshold, big.NewInt(testRefreshRate)).Uint64()

f := func(s swarm.Address, t time.Duration) error {
f := func(s swarm.Address, t time.Duration, reason string) error {
if reason != "ghost overdraw" {
return errInvalidReason
}
blocklistTime = int64(t.Seconds())
return nil
}
Expand Down Expand Up @@ -1354,7 +1359,10 @@ func TestAccountingReconnectBeforeAllowed(t *testing.T) {

paymentThresholdInRefreshmentSeconds := new(big.Int).Div(testPaymentThreshold, big.NewInt(testRefreshRate)).Uint64()

f := func(s swarm.Address, t time.Duration) error {
f := func(s swarm.Address, t time.Duration, reason string) error {
if reason != "disconnected" {
return errInvalidReason
}
blocklistTime = int64(t.Seconds())
return nil
}
Expand Down Expand Up @@ -1421,7 +1429,10 @@ func TestAccountingResetBalanceAfterReconnect(t *testing.T) {

paymentThresholdInRefreshmentSeconds := new(big.Int).Div(testPaymentThreshold, big.NewInt(testRefreshRate)).Uint64()

f := func(s swarm.Address, t time.Duration) error {
f := func(s swarm.Address, t time.Duration, reason string) error {
if reason != "disconnected" {
return errInvalidReason
}
blocklistTime = int64(t.Seconds())
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/debugapi/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (s *Service) peerConnectHandler(w http.ResponseWriter, r *http.Request) {
}

if err := s.topologyDriver.Connected(r.Context(), p2p.Peer{Address: bzzAddr.Overlay}, true); err != nil {
_ = s.p2p.Disconnect(bzzAddr.Overlay)
_ = s.p2p.Disconnect(bzzAddr.Overlay, "failed to notify topology")
s.logger.Debugf("debug api: peer connect handler %s: %v", addr, err)
s.logger.Errorf("unable to connect to peer %s", addr)
jsonhttp.InternalServerError(w, err)
Expand All @@ -57,7 +57,7 @@ func (s *Service) peerDisconnectHandler(w http.ResponseWriter, r *http.Request)
return
}

if err := s.p2p.Disconnect(swarmAddr); err != nil {
if err := s.p2p.Disconnect(swarmAddr, "user requested disconnect"); err != nil {
s.logger.Debugf("debug api: peer disconnect %s: %v", addr, err)
if errors.Is(err, p2p.ErrPeerNotFound) {
jsonhttp.BadRequest(w, "peer not found")
Expand Down
6 changes: 5 additions & 1 deletion pkg/debugapi/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,11 @@ func TestDisconnect(t *testing.T) {
testErr := errors.New("test error")

testServer := newTestServer(t, testServerOptions{
P2P: mock.New(mock.WithDisconnectFunc(func(addr swarm.Address) error {
P2P: mock.New(mock.WithDisconnectFunc(func(addr swarm.Address, reason string) error {
if reason != "user requested disconnect" {
return testErr
}

if addr.Equal(address) {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/node/devnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func NewDevBee(logger logging.Logger, o *DevOptions) (b *DevBee, err error) {
mockP2P.WithConnectFunc(func(ctx context.Context, addr multiaddr.Multiaddr) (address *bzz.Address, err error) {
return &bzz.Address{}, nil
}), mockP2P.WithDisconnectFunc(
func(overlay swarm.Address) error {
func(swarm.Address, string) error {
return nil
},
), mockP2P.WithAddressesFunc(
Expand Down
25 changes: 15 additions & 10 deletions pkg/p2p/libp2p/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ import (
ma "github.com/multiformats/go-multiaddr"
)

const (
testDisconnectMsg = "test disconnect"
testBlocklistMsg = "test blocklist"
)

func TestAddresses(t *testing.T) {
s, _ := newService(t, 1, libp2pServiceOpts{})

Expand Down Expand Up @@ -59,7 +64,7 @@ func TestConnectDisconnect(t *testing.T) {
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)

if err := s2.Disconnect(bzzAddr.Overlay); err != nil {
if err := s2.Disconnect(bzzAddr.Overlay, testDisconnectMsg); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -175,14 +180,14 @@ func TestDoubleDisconnect(t *testing.T) {
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)

if err := s2.Disconnect(bzzAddr.Overlay); err != nil {
if err := s2.Disconnect(bzzAddr.Overlay, testDisconnectMsg); err != nil {
t.Fatal(err)
}

expectPeers(t, s2)
expectPeersEventually(t, s1)

if err := s2.Disconnect(bzzAddr.Overlay); !errors.Is(err, p2p.ErrPeerNotFound) {
if err := s2.Disconnect(bzzAddr.Overlay, testDisconnectMsg); !errors.Is(err, p2p.ErrPeerNotFound) {
t.Errorf("got error %v, want %v", err, p2p.ErrPeerNotFound)
}

Expand Down Expand Up @@ -210,7 +215,7 @@ func TestMultipleConnectDisconnect(t *testing.T) {
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)

if err := s2.Disconnect(bzzAddr.Overlay); err != nil {
if err := s2.Disconnect(bzzAddr.Overlay, testDisconnectMsg); err != nil {
t.Fatal(err)
}

Expand All @@ -225,7 +230,7 @@ func TestMultipleConnectDisconnect(t *testing.T) {
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)

if err := s2.Disconnect(bzzAddr.Overlay); err != nil {
if err := s2.Disconnect(bzzAddr.Overlay, testDisconnectMsg); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -256,7 +261,7 @@ func TestConnectDisconnectOnAllAddresses(t *testing.T) {
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)

if err := s2.Disconnect(bzzAddr.Overlay); err != nil {
if err := s2.Disconnect(bzzAddr.Overlay, testDisconnectMsg); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -294,7 +299,7 @@ func TestDoubleConnectOnAllAddresses(t *testing.T) {
expectPeers(t, s2, overlay1)
expectPeers(t, s1, overlay2)

if err := s2.Disconnect(overlay1); err != nil {
if err := s2.Disconnect(overlay1, testDisconnectMsg); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -407,7 +412,7 @@ func TestBlocklisting(t *testing.T) {
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)

if err := s2.Blocklist(overlay1, 0); err != nil {
if err := s2.Blocklist(overlay1, 0, testBlocklistMsg); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -511,7 +516,7 @@ func TestTopologyNotifier(t *testing.T) {
checkAddressbook(t, ab2, overlay1, addr)

// s2 disconnects from s1 so s1 disconnect notifiee should be called
if err := s2.Disconnect(bzzAddr.Overlay); err != nil {
if err := s2.Disconnect(bzzAddr.Overlay, testDisconnectMsg); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -540,7 +545,7 @@ func TestTopologyNotifier(t *testing.T) {
waitAddrSet(t, &n2connectedPeer.Address, &mtx, overlay1)

// s1 disconnects from s2 so s2 disconnect notifiee should be called
if err := s1.Disconnect(bzzAddr2.Overlay); err != nil {
if err := s1.Disconnect(bzzAddr2.Overlay, testDisconnectMsg); err != nil {
t.Fatal(err)
}
expectPeers(t, s1)
Expand Down
42 changes: 21 additions & 21 deletions pkg/p2p/libp2p/libp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,15 +342,15 @@ func (s *Service) handleIncoming(stream network.Stream) {
if err = handshakeStream.FullClose(); err != nil {
s.logger.Debugf("stream handler: could not close stream %s: %v", overlay, err)
s.logger.Errorf("stream handler: unable to handshake with peer %v", overlay)
_ = s.Disconnect(overlay)
_ = s.Disconnect(overlay, "unable to close handshake stream")
}
return
}

if err = handshakeStream.FullClose(); err != nil {
s.logger.Debugf("stream handler: could not close stream %s: %v", overlay, err)
s.logger.Errorf("stream handler: unable to handshake with peer %v", overlay)
_ = s.Disconnect(overlay)
_ = s.Disconnect(overlay, "could not fully close stream on handshake")
return
}

Expand All @@ -359,7 +359,7 @@ func (s *Service) handleIncoming(stream network.Stream) {
if err != nil {
s.logger.Debugf("stream handler: addressbook put error %s: %v", peerID, err)
s.logger.Errorf("stream handler: unable to persist peer %v", peerID)
_ = s.Disconnect(i.BzzAddress.Overlay)
_ = s.Disconnect(i.BzzAddress.Overlay, "unable to persist peer in addressbook")
return
}
}
Expand All @@ -371,7 +371,7 @@ func (s *Service) handleIncoming(stream network.Stream) {
if tn.ConnectIn != nil {
if err := tn.ConnectIn(s.ctx, peer); err != nil {
s.logger.Debugf("stream handler: connectIn: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, overlay, err)
_ = s.Disconnect(overlay)
_ = s.Disconnect(overlay, "failed to process inbound connection notifier")
s.protocolsmu.RUnlock()
return
}
Expand All @@ -392,12 +392,12 @@ func (s *Service) handleIncoming(stream network.Stream) {
p, err := s.lightNodes.RandomPeer(peer.Address)
if err != nil {
s.logger.Debugf("stream handler: cant find a peer slot for light node: %v", err)
_ = s.Disconnect(peer.Address)
_ = s.Disconnect(peer.Address, "unable to find peer slot for light node")
return
} else {
s.logger.Tracef("stream handler: kicking away light node %s to make room for %s", p.String(), peer.Address.String())
s.metrics.KickedOutPeersCount.Inc()
_ = s.Disconnect(p)
_ = s.Disconnect(p, "kicking away light node to make room for peer")
return
}
}
Expand All @@ -413,7 +413,7 @@ func (s *Service) handleIncoming(stream network.Stream) {
// interface, in addition to the possibility of deciding whether
// a peer connection is wanted prior to adding the peer to the
// peer registry and starting the protocols.
_ = s.Disconnect(overlay)
_ = s.Disconnect(overlay, "unable to signal connection notifier")
return
}
// when a full node connects, we gossip about it to the
Expand All @@ -433,7 +433,7 @@ func (s *Service) handleIncoming(stream network.Stream) {
s.metrics.HandledStreamCount.Inc()
if !s.peers.Exists(overlay) {
s.logger.Warningf("stream handler: inbound peer %s does not exist, disconnecting", overlay)
_ = s.Disconnect(overlay)
_ = s.Disconnect(overlay, "unknown inbound peer")
return
}

Expand Down Expand Up @@ -502,14 +502,14 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
if errors.As(err, &de) {
logger.Tracef("libp2p handler(%s): disconnecting %s", p.Name, overlay.String())
_ = stream.Reset()
_ = s.Disconnect(overlay)
_ = s.Disconnect(overlay, de.Error())
logger.Tracef("handler(%s): disconnecting %s due to disconnect error", p.Name, overlay.String())
}

var bpe *p2p.BlockPeerError
if errors.As(err, &bpe) {
_ = stream.Reset()
if err := s.Blocklist(overlay, bpe.Duration()); err != nil {
if err := s.Blocklist(overlay, bpe.Duration(), bpe.Error()); err != nil {
logger.Debugf("blocklist: could not blocklist peer %s: %v", peerID, err)
logger.Errorf("unable to blocklist peer %v", peerID)
}
Expand Down Expand Up @@ -555,16 +555,16 @@ func (s *Service) NATManager() basichost.NATManager {
return s.natManager
}

func (s *Service) Blocklist(overlay swarm.Address, duration time.Duration) error {
s.logger.Tracef("libp2p blocklist: peer %s for %v", overlay.String(), duration)
func (s *Service) Blocklist(overlay swarm.Address, duration time.Duration, reason string) error {
s.logger.Tracef("libp2p blocklist: peer %s for %v reason: %s", overlay.String(), duration, reason)
if err := s.blocklist.Add(overlay, duration); err != nil {
s.metrics.BlocklistedPeerErrCount.Inc()
_ = s.Disconnect(overlay)
_ = s.Disconnect(overlay, "failed blocklisting peer")
return fmt.Errorf("blocklist peer %s: %v", overlay, err)
}
s.metrics.BlocklistedPeerCount.Inc()

_ = s.Disconnect(overlay)
_ = s.Disconnect(overlay, "blocklisting peer")
return nil
}

Expand Down Expand Up @@ -652,22 +652,22 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.

if exists := s.peers.addIfNotExists(stream.Conn(), overlay, i.FullNode); exists {
if err := handshakeStream.FullClose(); err != nil {
_ = s.Disconnect(overlay)
_ = s.Disconnect(overlay, "failed closing handshake stream after connect")
return nil, fmt.Errorf("peer exists, full close: %w", err)
}

return i.BzzAddress, nil
}

if err := handshakeStream.FullClose(); err != nil {
_ = s.Disconnect(overlay)
_ = s.Disconnect(overlay, "could not fully close handshake stream after connect")
return nil, fmt.Errorf("connect full close %w", err)
}

if i.FullNode {
err = s.addressbook.Put(overlay, *i.BzzAddress)
if err != nil {
_ = s.Disconnect(overlay)
_ = s.Disconnect(overlay, "failed storing peer in addressbook")
return nil, fmt.Errorf("storing bzz address: %w", err)
}
}
Expand All @@ -677,7 +677,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
if tn.ConnectOut != nil {
if err := tn.ConnectOut(ctx, p2p.Peer{Address: overlay, FullNode: i.FullNode, EthereumAddress: i.BzzAddress.EthereumAddress}); err != nil {
s.logger.Debugf("connectOut: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, overlay, err)
_ = s.Disconnect(overlay)
_ = s.Disconnect(overlay, "failed to process outbound connection notifier")
s.protocolsmu.RUnlock()
return nil, fmt.Errorf("connectOut: protocol: %s, version:%s: %w", tn.Name, tn.Version, err)
}
Expand All @@ -686,7 +686,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
s.protocolsmu.RUnlock()

if !s.peers.Exists(overlay) {
_ = s.Disconnect(overlay)
_ = s.Disconnect(overlay, "outbound peer does not exist")
return nil, fmt.Errorf("libp2p connect: peer %s does not exist %w", overlay, p2p.ErrPeerNotFound)
}

Expand All @@ -699,10 +699,10 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
return i.BzzAddress, nil
}

func (s *Service) Disconnect(overlay swarm.Address) error {
func (s *Service) Disconnect(overlay swarm.Address, reason string) error {
s.metrics.DisconnectCount.Inc()

s.logger.Debugf("libp2p disconnect: disconnecting peer %s", overlay)
s.logger.Debugf("libp2p disconnect: disconnecting peer %s reason: %s", overlay, reason)

// found is checked at the bottom of the function
found, full, peerID := s.peers.remove(overlay)
Expand Down
Loading

0 comments on commit 048a4b3

Please sign in to comment.