Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use bad peer removal logic only for lightpush and filter #1252

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions waku/v2/api/filter/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus
mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus),
zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs)))
if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online
mgr.onlineChecker.SetOnline(newStatus)
mgr.NetworkChange()
mgr.logger.Debug("switching from offline to online")
mgr.Lock()
Expand Down
26 changes: 8 additions & 18 deletions waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ const maxFailedAttempts = 5
const prunePeerStoreInterval = 10 * time.Minute
const peerConnectivityLoopSecs = 15
const maxConnsToPeerRatio = 3
const badPeersCleanupInterval = 1 * time.Minute
const maxDialFailures = 2

// 80% relay peers 20% service peers
Expand Down Expand Up @@ -258,32 +257,27 @@ func (pm *PeerManager) Start(ctx context.Context) {
}
}

func (pm *PeerManager) removeBadPeers() {
if !pm.RelayEnabled {
for _, peerID := range pm.host.Peerstore().Peers() {
if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures {
//delete peer from peerStore
pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
pm.RemovePeer(peerID)
}
func (pm *PeerManager) CheckAndRemoveBadPeer(peerID peer.ID) {
if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures &&
pm.peerConnector.onlineChecker.IsOnline() {
if origin, _ := pm.host.Peerstore().(wps.WakuPeerstore).Origin(peerID); origin != wps.Static { // delete only if a peer is discovered and not configured statically.
//delete peer from peerStore
pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
pm.RemovePeer(peerID)
}
}
}

func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
defer utils.LogOnPanic()
t := time.NewTicker(prunePeerStoreInterval)
t1 := time.NewTicker(badPeersCleanupInterval)
defer t.Stop()
defer t1.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
pm.prunePeerStore()
case <-t1.C:
pm.removeBadPeers()
}
}
}
Expand Down Expand Up @@ -749,6 +743,7 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
if err == nil || errors.Is(err, context.Canceled) {
return
}

if pm.peerConnector != nil {
pm.peerConnector.addConnectionBackoff(peerID)
}
Expand All @@ -762,9 +757,4 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
pm.logger.Error("failed to emit DialError", zap.Error(emitterErr))
}
}
if !pm.RelayEnabled && pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) >= maxDialFailures {
//delete peer from peerStore
pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
pm.RemovePeer(peerID)
}
}
5 changes: 5 additions & 0 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-msgio/pbio"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/logging"
Expand Down Expand Up @@ -249,6 +250,10 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
wf.metrics.RecordError(dialFailure)
if wf.pm != nil {
wf.pm.HandleDialError(err, peerID)
if errors.Is(err, swarm.ErrAllDialsFailed) ||
errors.Is(err, swarm.ErrDialBackoff) || errors.Is(err, swarm.ErrNoAddresses) {
wf.pm.CheckAndRemoveBadPeer(peerID)
}
}
return err
}
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/filter_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) {
ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout)
defer cancel()
err := wf.Ping(ctxWithTimeout, peer)
if err != nil {
if err != nil && wf.onlineChecker.IsOnline() {
wf.log.Warn("Filter ping failed towards peer", zap.Stringer("peer", peer), zap.Error(err))
//quickly retry ping again before marking subscription as failure
//Note that PingTimeout is a fraction of PingInterval so this shouldn't cause parallel pings being sent.
Expand Down
5 changes: 5 additions & 0 deletions waku/v2/protocol/lightpush/waku_lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-msgio/pbio"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/logging"
Expand Down Expand Up @@ -198,6 +199,10 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p
wakuLP.metrics.RecordError(dialFailure)
if wakuLP.pm != nil {
wakuLP.pm.HandleDialError(err, peerID)
if errors.Is(err, swarm.ErrAllDialsFailed) ||
errors.Is(err, swarm.ErrDialBackoff) || errors.Is(err, swarm.ErrNoAddresses) {
wakuLP.pm.CheckAndRemoveBadPeer(peerID)
}
}
return nil, err
}
Expand Down
Loading