From 048a4b3459d78cdd8d721ac7c459d82b2d558c98 Mon Sep 17 00:00:00 2001 From: aloknerurkar Date: Wed, 8 Sep 2021 19:15:01 +0530 Subject: [PATCH] feat: p2p disconnect and blocklist with reason (#2471) --- pkg/accounting/accounting.go | 12 ++++---- pkg/accounting/accounting_test.go | 17 +++++++++-- pkg/debugapi/peer.go | 4 +-- pkg/debugapi/peer_test.go | 6 +++- pkg/node/devnode.go | 2 +- pkg/p2p/libp2p/connections_test.go | 25 +++++++++------ pkg/p2p/libp2p/libp2p.go | 42 +++++++++++++------------- pkg/p2p/libp2p/protocols_test.go | 2 +- pkg/p2p/mock/mock.go | 16 +++++----- pkg/p2p/p2p.go | 4 +-- pkg/p2p/streamtest/streamtest.go | 4 +-- pkg/topology/kademlia/kademlia.go | 10 +++--- pkg/topology/kademlia/kademlia_test.go | 2 +- 13 files changed, 83 insertions(+), 63 deletions(-) diff --git a/pkg/accounting/accounting.go b/pkg/accounting/accounting.go index 3db6c748842..cb145555f76 100644 --- a/pkg/accounting/accounting.go +++ b/pkg/accounting/accounting.go @@ -362,7 +362,7 @@ func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error { acceptedAmount, timestamp, err := a.refreshFunction(context.Background(), peer, paymentAmount, shadowBalance) if err != nil { a.metrics.AccountingDisconnectsEnforceRefreshCount.Inc() - _ = a.blocklist(peer, 1) + _ = a.blocklist(peer, 1, "failed to refresh") return fmt.Errorf("refresh failure: %w", err) } @@ -1020,7 +1020,7 @@ func (d *debitAction) Cleanup() { d.accountingPeer.ghostBalance = new(big.Int).Add(d.accountingPeer.ghostBalance, d.price) if d.accountingPeer.ghostBalance.Cmp(a.disconnectLimit) > 0 { a.metrics.AccountingDisconnectsGhostOverdrawCount.Inc() - _ = a.blocklist(d.peer, 1) + _ = a.blocklist(d.peer, 1, "ghost overdraw") } } } @@ -1047,14 +1047,14 @@ func (a *Accounting) blocklistUntil(peer swarm.Address, multiplier int64) (int64 return kInt, nil } -func (a *Accounting) blocklist(peer swarm.Address, multiplier int64) error { +func (a *Accounting) blocklist(peer swarm.Address, multiplier int64, reason string) error { disconnectFor, err := a.blocklistUntil(peer, multiplier) if err != nil { - return a.p2p.Blocklist(peer, 1*time.Minute) + return a.p2p.Blocklist(peer, 1*time.Minute, reason) } - return a.p2p.Blocklist(peer, time.Duration(disconnectFor)*time.Second) + return a.p2p.Blocklist(peer, time.Duration(disconnectFor)*time.Second, reason) } func (a *Accounting) Connect(peer swarm.Address) { @@ -1140,7 +1140,7 @@ func (a *Accounting) Disconnect(peer swarm.Address) { disconnectFor = int64(60) } accountingPeer.connected = false - _ = a.p2p.Blocklist(peer, time.Duration(disconnectFor)*time.Second) + _ = a.p2p.Blocklist(peer, time.Duration(disconnectFor)*time.Second, "disconnected") } } diff --git a/pkg/accounting/accounting_test.go b/pkg/accounting/accounting_test.go index aa8bf1c2961..c4d13fa957e 100644 --- a/pkg/accounting/accounting_test.go +++ b/pkg/accounting/accounting_test.go @@ -1273,6 +1273,8 @@ func TestAccountingCallPaymentFailureRetries(t *testing.T) { acc.Release(peer1Addr, 1) } +var errInvalidReason = errors.New("invalid blocklist reason") + func TestAccountingGhostOverdraft(t *testing.T) { logger := logging.New(ioutil.Discard, 0) @@ -1283,7 +1285,10 @@ func TestAccountingGhostOverdraft(t *testing.T) { paymentThresholdInRefreshmentSeconds := new(big.Int).Div(testPaymentThreshold, big.NewInt(testRefreshRate)).Uint64() - f := func(s swarm.Address, t time.Duration) error { + f := func(s swarm.Address, t time.Duration, reason string) error { + if reason != "ghost overdraw" { + return errInvalidReason + } blocklistTime = int64(t.Seconds()) return nil } @@ -1354,7 +1359,10 @@ func TestAccountingReconnectBeforeAllowed(t *testing.T) { paymentThresholdInRefreshmentSeconds := new(big.Int).Div(testPaymentThreshold, big.NewInt(testRefreshRate)).Uint64() - f := func(s swarm.Address, t time.Duration) error { + f := func(s swarm.Address, t time.Duration, reason string) error { + if reason != "disconnected" { + return errInvalidReason + } blocklistTime = int64(t.Seconds()) return nil } @@ -1421,7 +1429,10 @@ func TestAccountingResetBalanceAfterReconnect(t *testing.T) { paymentThresholdInRefreshmentSeconds := new(big.Int).Div(testPaymentThreshold, big.NewInt(testRefreshRate)).Uint64() - f := func(s swarm.Address, t time.Duration) error { + f := func(s swarm.Address, t time.Duration, reason string) error { + if reason != "disconnected" { + return errInvalidReason + } blocklistTime = int64(t.Seconds()) return nil } diff --git a/pkg/debugapi/peer.go b/pkg/debugapi/peer.go index 9c10a7c1705..81407f1be62 100644 --- a/pkg/debugapi/peer.go +++ b/pkg/debugapi/peer.go @@ -36,7 +36,7 @@ func (s *Service) peerConnectHandler(w http.ResponseWriter, r *http.Request) { } if err := s.topologyDriver.Connected(r.Context(), p2p.Peer{Address: bzzAddr.Overlay}, true); err != nil { - _ = s.p2p.Disconnect(bzzAddr.Overlay) + _ = s.p2p.Disconnect(bzzAddr.Overlay, "failed to notify topology") s.logger.Debugf("debug api: peer connect handler %s: %v", addr, err) s.logger.Errorf("unable to connect to peer %s", addr) jsonhttp.InternalServerError(w, err) @@ -57,7 +57,7 @@ func (s *Service) peerDisconnectHandler(w http.ResponseWriter, r *http.Request) return } - if err := s.p2p.Disconnect(swarmAddr); err != nil { + if err := s.p2p.Disconnect(swarmAddr, "user requested disconnect"); err != nil { s.logger.Debugf("debug api: peer disconnect %s: %v", addr, err) if errors.Is(err, p2p.ErrPeerNotFound) { jsonhttp.BadRequest(w, "peer not found") diff --git a/pkg/debugapi/peer_test.go b/pkg/debugapi/peer_test.go index d7221ca1cb8..aa591391778 100644 --- a/pkg/debugapi/peer_test.go +++ b/pkg/debugapi/peer_test.go @@ -109,7 +109,11 @@ func TestDisconnect(t *testing.T) { testErr := errors.New("test error") testServer := newTestServer(t, testServerOptions{ - P2P: mock.New(mock.WithDisconnectFunc(func(addr swarm.Address) error { + P2P: mock.New(mock.WithDisconnectFunc(func(addr swarm.Address, reason string) error { + if reason != "user requested disconnect" { + return testErr + } + if addr.Equal(address) { return nil } diff --git a/pkg/node/devnode.go b/pkg/node/devnode.go index bc1a20659a0..25c2f16bdf0 100644 --- a/pkg/node/devnode.go +++ b/pkg/node/devnode.go @@ -310,7 +310,7 @@ func NewDevBee(logger logging.Logger, o *DevOptions) (b *DevBee, err error) { mockP2P.WithConnectFunc(func(ctx context.Context, addr multiaddr.Multiaddr) (address *bzz.Address, err error) { return &bzz.Address{}, nil }), mockP2P.WithDisconnectFunc( - func(overlay swarm.Address) error { + func(swarm.Address, string) error { return nil }, ), mockP2P.WithAddressesFunc( diff --git a/pkg/p2p/libp2p/connections_test.go b/pkg/p2p/libp2p/connections_test.go index 32113c81c45..a08f1ef9dba 100644 --- a/pkg/p2p/libp2p/connections_test.go +++ b/pkg/p2p/libp2p/connections_test.go @@ -28,6 +28,11 @@ import ( ma "github.com/multiformats/go-multiaddr" ) +const ( + testDisconnectMsg = "test disconnect" + testBlocklistMsg = "test blocklist" +) + func TestAddresses(t *testing.T) { s, _ := newService(t, 1, libp2pServiceOpts{}) @@ -59,7 +64,7 @@ func TestConnectDisconnect(t *testing.T) { expectPeers(t, s2, overlay1) expectPeersEventually(t, s1, overlay2) - if err := s2.Disconnect(bzzAddr.Overlay); err != nil { + if err := s2.Disconnect(bzzAddr.Overlay, testDisconnectMsg); err != nil { t.Fatal(err) } @@ -175,14 +180,14 @@ func TestDoubleDisconnect(t *testing.T) { expectPeers(t, s2, overlay1) expectPeersEventually(t, s1, overlay2) - if err := s2.Disconnect(bzzAddr.Overlay); err != nil { + if err := s2.Disconnect(bzzAddr.Overlay, testDisconnectMsg); err != nil { t.Fatal(err) } expectPeers(t, s2) expectPeersEventually(t, s1) - if err := s2.Disconnect(bzzAddr.Overlay); !errors.Is(err, p2p.ErrPeerNotFound) { + if err := s2.Disconnect(bzzAddr.Overlay, testDisconnectMsg); !errors.Is(err, p2p.ErrPeerNotFound) { t.Errorf("got error %v, want %v", err, p2p.ErrPeerNotFound) } @@ -210,7 +215,7 @@ func TestMultipleConnectDisconnect(t *testing.T) { expectPeers(t, s2, overlay1) expectPeersEventually(t, s1, overlay2) - if err := s2.Disconnect(bzzAddr.Overlay); err != nil { + if err := s2.Disconnect(bzzAddr.Overlay, testDisconnectMsg); err != nil { t.Fatal(err) } @@ -225,7 +230,7 @@ func TestMultipleConnectDisconnect(t *testing.T) { expectPeers(t, s2, overlay1) expectPeersEventually(t, s1, overlay2) - if err := s2.Disconnect(bzzAddr.Overlay); err != nil { + if err := s2.Disconnect(bzzAddr.Overlay, testDisconnectMsg); err != nil { t.Fatal(err) } @@ -256,7 +261,7 @@ func TestConnectDisconnectOnAllAddresses(t *testing.T) { expectPeers(t, s2, overlay1) expectPeersEventually(t, s1, overlay2) - if err := s2.Disconnect(bzzAddr.Overlay); err != nil { + if err := s2.Disconnect(bzzAddr.Overlay, testDisconnectMsg); err != nil { t.Fatal(err) } @@ -294,7 +299,7 @@ func TestDoubleConnectOnAllAddresses(t *testing.T) { expectPeers(t, s2, overlay1) expectPeers(t, s1, overlay2) - if err := s2.Disconnect(overlay1); err != nil { + if err := s2.Disconnect(overlay1, testDisconnectMsg); err != nil { t.Fatal(err) } @@ -407,7 +412,7 @@ func TestBlocklisting(t *testing.T) { expectPeers(t, s2, overlay1) expectPeersEventually(t, s1, overlay2) - if err := s2.Blocklist(overlay1, 0); err != nil { + if err := s2.Blocklist(overlay1, 0, testBlocklistMsg); err != nil { t.Fatal(err) } @@ -511,7 +516,7 @@ func TestTopologyNotifier(t *testing.T) { checkAddressbook(t, ab2, overlay1, addr) // s2 disconnects from s1 so s1 disconnect notifiee should be called - if err := s2.Disconnect(bzzAddr.Overlay); err != nil { + if err := s2.Disconnect(bzzAddr.Overlay, testDisconnectMsg); err != nil { t.Fatal(err) } @@ -540,7 +545,7 @@ func TestTopologyNotifier(t *testing.T) { waitAddrSet(t, &n2connectedPeer.Address, &mtx, overlay1) // s1 disconnects from s2 so s2 disconnect notifiee should be called - if err := s1.Disconnect(bzzAddr2.Overlay); err != nil { + if err := s1.Disconnect(bzzAddr2.Overlay, testDisconnectMsg); err != nil { t.Fatal(err) } expectPeers(t, s1) diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index 9d20e74220a..368fc2ab25c 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -342,7 +342,7 @@ func (s *Service) handleIncoming(stream network.Stream) { if err = handshakeStream.FullClose(); err != nil { s.logger.Debugf("stream handler: could not close stream %s: %v", overlay, err) s.logger.Errorf("stream handler: unable to handshake with peer %v", overlay) - _ = s.Disconnect(overlay) + _ = s.Disconnect(overlay, "unable to close handshake stream") } return } @@ -350,7 +350,7 @@ func (s *Service) handleIncoming(stream network.Stream) { if err = handshakeStream.FullClose(); err != nil { s.logger.Debugf("stream handler: could not close stream %s: %v", overlay, err) s.logger.Errorf("stream handler: unable to handshake with peer %v", overlay) - _ = s.Disconnect(overlay) + _ = s.Disconnect(overlay, "could not fully close stream on handshake") return } @@ -359,7 +359,7 @@ func (s *Service) handleIncoming(stream network.Stream) { if err != nil { s.logger.Debugf("stream handler: addressbook put error %s: %v", peerID, err) s.logger.Errorf("stream handler: unable to persist peer %v", peerID) - _ = s.Disconnect(i.BzzAddress.Overlay) + _ = s.Disconnect(i.BzzAddress.Overlay, "unable to persist peer in addressbook") return } } @@ -371,7 +371,7 @@ func (s *Service) handleIncoming(stream network.Stream) { if tn.ConnectIn != nil { if err := tn.ConnectIn(s.ctx, peer); err != nil { s.logger.Debugf("stream handler: connectIn: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, overlay, err) - _ = s.Disconnect(overlay) + _ = s.Disconnect(overlay, "failed to process inbound connection notifier") s.protocolsmu.RUnlock() return } @@ -392,12 +392,12 @@ func (s *Service) handleIncoming(stream network.Stream) { p, err := s.lightNodes.RandomPeer(peer.Address) if err != nil { s.logger.Debugf("stream handler: cant find a peer slot for light node: %v", err) - _ = s.Disconnect(peer.Address) + _ = s.Disconnect(peer.Address, "unable to find peer slot for light node") return } else { s.logger.Tracef("stream handler: kicking away light node %s to make room for %s", p.String(), peer.Address.String()) s.metrics.KickedOutPeersCount.Inc() - _ = s.Disconnect(p) + _ = s.Disconnect(p, "kicking away light node to make room for peer") return } } @@ -413,7 +413,7 @@ func (s *Service) handleIncoming(stream network.Stream) { // interface, in addition to the possibility of deciding whether // a peer connection is wanted prior to adding the peer to the // peer registry and starting the protocols. - _ = s.Disconnect(overlay) + _ = s.Disconnect(overlay, "unable to signal connection notifier") return } // when a full node connects, we gossip about it to the @@ -433,7 +433,7 @@ func (s *Service) handleIncoming(stream network.Stream) { s.metrics.HandledStreamCount.Inc() if !s.peers.Exists(overlay) { s.logger.Warningf("stream handler: inbound peer %s does not exist, disconnecting", overlay) - _ = s.Disconnect(overlay) + _ = s.Disconnect(overlay, "unknown inbound peer") return } @@ -502,14 +502,14 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) { if errors.As(err, &de) { logger.Tracef("libp2p handler(%s): disconnecting %s", p.Name, overlay.String()) _ = stream.Reset() - _ = s.Disconnect(overlay) + _ = s.Disconnect(overlay, de.Error()) logger.Tracef("handler(%s): disconnecting %s due to disconnect error", p.Name, overlay.String()) } var bpe *p2p.BlockPeerError if errors.As(err, &bpe) { _ = stream.Reset() - if err := s.Blocklist(overlay, bpe.Duration()); err != nil { + if err := s.Blocklist(overlay, bpe.Duration(), bpe.Error()); err != nil { logger.Debugf("blocklist: could not blocklist peer %s: %v", peerID, err) logger.Errorf("unable to blocklist peer %v", peerID) } @@ -555,16 +555,16 @@ func (s *Service) NATManager() basichost.NATManager { return s.natManager } -func (s *Service) Blocklist(overlay swarm.Address, duration time.Duration) error { - s.logger.Tracef("libp2p blocklist: peer %s for %v", overlay.String(), duration) +func (s *Service) Blocklist(overlay swarm.Address, duration time.Duration, reason string) error { + s.logger.Tracef("libp2p blocklist: peer %s for %v reason: %s", overlay.String(), duration, reason) if err := s.blocklist.Add(overlay, duration); err != nil { s.metrics.BlocklistedPeerErrCount.Inc() - _ = s.Disconnect(overlay) + _ = s.Disconnect(overlay, "failed blocklisting peer") return fmt.Errorf("blocklist peer %s: %v", overlay, err) } s.metrics.BlocklistedPeerCount.Inc() - _ = s.Disconnect(overlay) + _ = s.Disconnect(overlay, "blocklisting peer") return nil } @@ -652,7 +652,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. if exists := s.peers.addIfNotExists(stream.Conn(), overlay, i.FullNode); exists { if err := handshakeStream.FullClose(); err != nil { - _ = s.Disconnect(overlay) + _ = s.Disconnect(overlay, "failed closing handshake stream after connect") return nil, fmt.Errorf("peer exists, full close: %w", err) } @@ -660,14 +660,14 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. } if err := handshakeStream.FullClose(); err != nil { - _ = s.Disconnect(overlay) + _ = s.Disconnect(overlay, "could not fully close handshake stream after connect") return nil, fmt.Errorf("connect full close %w", err) } if i.FullNode { err = s.addressbook.Put(overlay, *i.BzzAddress) if err != nil { - _ = s.Disconnect(overlay) + _ = s.Disconnect(overlay, "failed storing peer in addressbook") return nil, fmt.Errorf("storing bzz address: %w", err) } } @@ -677,7 +677,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. if tn.ConnectOut != nil { if err := tn.ConnectOut(ctx, p2p.Peer{Address: overlay, FullNode: i.FullNode, EthereumAddress: i.BzzAddress.EthereumAddress}); err != nil { s.logger.Debugf("connectOut: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, overlay, err) - _ = s.Disconnect(overlay) + _ = s.Disconnect(overlay, "failed to process outbound connection notifier") s.protocolsmu.RUnlock() return nil, fmt.Errorf("connectOut: protocol: %s, version:%s: %w", tn.Name, tn.Version, err) } @@ -686,7 +686,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. s.protocolsmu.RUnlock() if !s.peers.Exists(overlay) { - _ = s.Disconnect(overlay) + _ = s.Disconnect(overlay, "outbound peer does not exist") return nil, fmt.Errorf("libp2p connect: peer %s does not exist %w", overlay, p2p.ErrPeerNotFound) } @@ -699,10 +699,10 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. return i.BzzAddress, nil } -func (s *Service) Disconnect(overlay swarm.Address) error { +func (s *Service) Disconnect(overlay swarm.Address, reason string) error { s.metrics.DisconnectCount.Inc() - s.logger.Debugf("libp2p disconnect: disconnecting peer %s", overlay) + s.logger.Debugf("libp2p disconnect: disconnecting peer %s reason: %s", overlay, reason) // found is checked at the bottom of the function found, full, peerID := s.peers.remove(overlay) diff --git a/pkg/p2p/libp2p/protocols_test.go b/pkg/p2p/libp2p/protocols_test.go index 73bcfa49da4..c124dd8bd1e 100644 --- a/pkg/p2p/libp2p/protocols_test.go +++ b/pkg/p2p/libp2p/protocols_test.go @@ -368,7 +368,7 @@ func TestConnectDisconnectEvents(t *testing.T) { expectCounter(t, &dinCount, 0, &countMU) expectCounter(t, &doutCount, 0, &countMU) - if err := s2.Disconnect(overlay1); err != nil { + if err := s2.Disconnect(overlay1, "test disconnect"); err != nil { t.Fatal(err) } diff --git a/pkg/p2p/mock/mock.go b/pkg/p2p/mock/mock.go index 03593c4dfc6..f488392b788 100644 --- a/pkg/p2p/mock/mock.go +++ b/pkg/p2p/mock/mock.go @@ -19,14 +19,14 @@ import ( type Service struct { addProtocolFunc func(p2p.ProtocolSpec) error connectFunc func(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error) - disconnectFunc func(overlay swarm.Address) error + disconnectFunc func(overlay swarm.Address, reason string) error peersFunc func() []p2p.Peer blocklistedPeersFunc func() ([]p2p.Peer, error) addressesFunc func() ([]ma.Multiaddr, error) notifierFunc p2p.PickyNotifier setWelcomeMessageFunc func(string) error getWelcomeMessageFunc func() string - blocklistFunc func(swarm.Address, time.Duration) error + blocklistFunc func(swarm.Address, time.Duration, string) error welcomeMessage string } @@ -45,7 +45,7 @@ func WithConnectFunc(f func(ctx context.Context, addr ma.Multiaddr) (address *bz } // WithDisconnectFunc sets the mock implementation of the Disconnect function -func WithDisconnectFunc(f func(overlay swarm.Address) error) Option { +func WithDisconnectFunc(f func(overlay swarm.Address, reason string) error) Option { return optionFunc(func(s *Service) { s.disconnectFunc = f }) @@ -86,7 +86,7 @@ func WithSetWelcomeMessageFunc(f func(string) error) Option { }) } -func WithBlocklistFunc(f func(swarm.Address, time.Duration) error) Option { +func WithBlocklistFunc(f func(swarm.Address, time.Duration, string) error) Option { return optionFunc(func(s *Service) { s.blocklistFunc = f }) @@ -115,7 +115,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. return s.connectFunc(ctx, addr) } -func (s *Service) Disconnect(overlay swarm.Address) error { +func (s *Service) Disconnect(overlay swarm.Address, reason string) error { if s.disconnectFunc == nil { return errors.New("function Disconnect not configured") } @@ -124,7 +124,7 @@ func (s *Service) Disconnect(overlay swarm.Address) error { s.notifierFunc.Disconnected(p2p.Peer{Address: overlay}) } - return s.disconnectFunc(overlay) + return s.disconnectFunc(overlay, reason) } func (s *Service) Addresses() ([]ma.Multiaddr, error) { @@ -166,11 +166,11 @@ func (s *Service) GetWelcomeMessage() string { func (s *Service) Halt() {} -func (s *Service) Blocklist(overlay swarm.Address, duration time.Duration) error { +func (s *Service) Blocklist(overlay swarm.Address, duration time.Duration, reason string) error { if s.blocklistFunc == nil { return errors.New("function blocklist not configured") } - return s.blocklistFunc(overlay, duration) + return s.blocklistFunc(overlay, duration, reason) } func (s *Service) SetPickyNotifier(f p2p.PickyNotifier) { diff --git a/pkg/p2p/p2p.go b/pkg/p2p/p2p.go index 52ce6a453ec..aea06088ae3 100644 --- a/pkg/p2p/p2p.go +++ b/pkg/p2p/p2p.go @@ -30,10 +30,10 @@ type Service interface { } type Disconnecter interface { - Disconnect(overlay swarm.Address) error + Disconnect(overlay swarm.Address, reason string) error // Blocklist will disconnect a peer and put it on a blocklist (blocking in & out connections) for provided duration // duration 0 is treated as an infinite duration - Blocklist(overlay swarm.Address, duration time.Duration) error + Blocklist(overlay swarm.Address, duration time.Duration, reason string) error } type Halter interface { diff --git a/pkg/p2p/streamtest/streamtest.go b/pkg/p2p/streamtest/streamtest.go index f1e7ad74837..4c75b5def2b 100644 --- a/pkg/p2p/streamtest/streamtest.go +++ b/pkg/p2p/streamtest/streamtest.go @@ -399,7 +399,7 @@ func NewRecorderDisconnecter(r *Recorder) *RecorderDisconnecter { } } -func (r *RecorderDisconnecter) Disconnect(overlay swarm.Address) error { +func (r *RecorderDisconnecter) Disconnect(overlay swarm.Address, _ string) error { r.mu.Lock() defer r.mu.Unlock() @@ -407,7 +407,7 @@ func (r *RecorderDisconnecter) Disconnect(overlay swarm.Address) error { return nil } -func (r *RecorderDisconnecter) Blocklist(overlay swarm.Address, d time.Duration) error { +func (r *RecorderDisconnecter) Blocklist(overlay swarm.Address, d time.Duration, _ string) error { r.mu.Lock() defer r.mu.Unlock() diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index f11b50aeb37..1c8aeff7725 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -533,7 +533,7 @@ func (k *Kad) pruneOversaturatedBins(depth uint8) { newestPeer = peer } } - err := k.p2p.Disconnect(newestPeer) + err := k.p2p.Disconnect(newestPeer, "pruned from oversaturated bin") if err != nil { k.logger.Debugf("prune disconnect fail %v", err) } @@ -783,8 +783,8 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr) return err case !i.Overlay.Equal(peer): - _ = k.p2p.Disconnect(peer) - _ = k.p2p.Disconnect(i.Overlay) + _ = k.p2p.Disconnect(peer, errOverlayMismatch.Error()) + _ = k.p2p.Disconnect(i.Overlay, errOverlayMismatch.Error()) return errOverlayMismatch } @@ -830,7 +830,7 @@ func (k *Kad) Announce(ctx context.Context, peer swarm.Address, fullnode bool) e err := k.discovery.BroadcastPeers(ctx, peer, addrs...) if err != nil { k.logger.Errorf("kademlia: could not broadcast to peer %s", peer) - _ = k.p2p.Disconnect(peer) + _ = k.p2p.Disconnect(peer, "failed broadcasting to peer") } return err @@ -882,7 +882,7 @@ func (k *Kad) Connected(ctx context.Context, peer p2p.Peer, forceConnection bool if err != nil { return err } - _ = k.p2p.Disconnect(randPeer) + _ = k.p2p.Disconnect(randPeer, "kicking out random peer to accommodate node") return k.onConnected(ctx, address) } if !forceConnection { diff --git a/pkg/topology/kademlia/kademlia_test.go b/pkg/topology/kademlia/kademlia_test.go index af55571a1b3..7c89c6fbae9 100644 --- a/pkg/topology/kademlia/kademlia_test.go +++ b/pkg/topology/kademlia/kademlia_test.go @@ -1410,7 +1410,7 @@ func p2pMock(ab addressbook.Interface, signer beeCrypto.Signer, counter, failedC return bzzAddr, nil }), - p2pmock.WithDisconnectFunc(func(swarm.Address) error { + p2pmock.WithDisconnectFunc(func(swarm.Address, string) error { if counter != nil { _ = atomic.AddInt32(counter, -1) }