Skip to content

Commit

Permalink
bug: move canBeConnected to PeerManager and check for potential overf…
Browse files Browse the repository at this point in the history
…low (#1670)
  • Loading branch information
vpavlin authored Apr 14, 2023
1 parent 73cbafa commit d5c2770
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 75 deletions.
92 changes: 82 additions & 10 deletions tests/v2/test_peer_manager.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.used.}

import
std/[options, sequtils],
std/[options, sequtils, times],
stew/shims/net as stewNet,
testutils/unittests,
chronos,
Expand Down Expand Up @@ -195,20 +195,14 @@ procSuite "Peer Manager":
conn1Ok == false

# Right after failing there is a backoff period
nodes[0].peerManager.peerStore.canBeConnected(
nonExistentPeer.peerId,
nodes[0].peerManager.initialBackoffInSec,
nodes[0].peerManager.backoffFactor) == false
nodes[0].peerManager.canBeConnected(nonExistentPeer.peerId) == false

# We wait the first backoff period
await sleepAsync(2100.milliseconds)
await sleepAsync(chronos.milliseconds(2100))

# And backoff period is over
check:
nodes[0].peerManager.peerStore.canBeConnected(
nodes[1].peerInfo.peerId,
nodes[0].peerManager.initialBackoffInSec,
nodes[0].peerManager.backoffFactor) == true
nodes[0].peerManager.canBeConnected(nodes[1].peerInfo.peerId) == true

# After a successful connection, the number of failed connections is reset
nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] = 4
Expand Down Expand Up @@ -655,3 +649,81 @@ procSuite "Peer Manager":
not pm.peerStore.peers.anyIt(it.peerId == peers[0].peerId)
not pm.peerStore.peers.anyIt(it.peerId == peers[1].peerId)
not pm.peerStore.peers.anyIt(it.peerId == peers[2].peerId)

asyncTest "canBeConnected() returns correct value":
let pm = PeerManager.new(
switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.build(),
initialBackoffInSec = 1, # with InitialBackoffInSec = 1 backoffs are: 1, 2, 4, 8secs.
backoffFactor = 2,
maxFailedAttempts = 10,
storage = nil)
var p1: PeerId
require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1")


# new peer with no errors can be connected
check:
pm.canBeConnected(p1) == true

# peer with ONE error that just failed
pm.peerStore[NumberFailedConnBook][p1] = 1
pm.peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second)
# we cant connect right now
check:
pm.canBeConnected(p1) == false

# but we can after the first backoff of 1 seconds
await sleepAsync(chronos.milliseconds(1200))
check:
pm.canBeConnected(p1) == true

# peer with TWO errors, we can connect until 2 seconds have passed
pm.peerStore[NumberFailedConnBook][p1] = 2
pm.peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second)

# cant be connected after 1 second
await sleepAsync(chronos.milliseconds(1000))
check:
pm.canBeConnected(p1) == false

# can be connected after 2 seconds
await sleepAsync(chronos.milliseconds(1200))
check:
pm.canBeConnected(p1) == true

# can't be connected if failed attempts are equal to maxFailedAttempts
pm.maxFailedAttempts = 2
check:
pm.canBeConnected(p1) == false

test "peer manager must fail if max backoff is over a week":
# Should result in overflow exception
expect(Defect):
let pm = PeerManager.new(
switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.build(),
maxFailedAttempts = 150,
storage = nil)

# Should result in backoff > 1 week
expect(Defect):
let pm = PeerManager.new(
switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.build(),
maxFailedAttempts = 10,
storage = nil)

let pm = PeerManager.new(
switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.build(),
maxFailedAttempts = 5,
storage = nil)
39 changes: 0 additions & 39 deletions tests/v2/test_peer_store_extended.nim
Original file line number Diff line number Diff line change
Expand Up @@ -318,42 +318,3 @@ suite "Extended nim-libp2p Peer Store":
peerStore[DisconnectBook][p1] == 0
peerStore[SourceBook][p1] == default(PeerOrigin)
peerStore[DirectionBook][p1] == default(PeerDirection)

asyncTest "canBeConnected() returns correct value":
let peerStore = PeerStore.new(nil, capacity = 5)
var p1: PeerId
require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1")

# with InitialBackoffInSec = 1 backoffs are: 1, 2, 4, 8secs.
let initialBackoffInSec = 1
let backoffFactor = 2

# new peer with no errors can be connected
check:
peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == true

# peer with ONE error that just failed
peerStore[NumberFailedConnBook][p1] = 1
peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second)
# we cant connect right now
check:
peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == false

# but we can after the first backoff of 1 seconds
await sleepAsync(1200)
check:
peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == true

# peer with TWO errors, we can connect until 2 seconds have passed
peerStore[NumberFailedConnBook][p1] = 2
peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second)

# cant be connected after 1 second
await sleepAsync(1000)
check:
peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == false

# can be connected after 2 seconds
await sleepAsync(1200)
check:
peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == true
45 changes: 41 additions & 4 deletions waku/v2/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ else:


import
std/[options, sets, sequtils, times, strutils],
std/[options, sets, sequtils, times, strutils, math],
chronos,
chronicles,
metrics,
Expand Down Expand Up @@ -78,6 +78,13 @@ proc protocolMatcher*(codec: string): Matcher =

return match

proc calculateBackoff(initialBackoffInSec: int,
backoffFactor: int,
failedAttempts: int): timer.Duration =
if failedAttempts == 0:
return chronos.seconds(0)
return chronos.seconds(initialBackoffInSec*(backoffFactor^(failedAttempts-1)))

####################
# Helper functions #
####################
Expand Down Expand Up @@ -235,6 +242,30 @@ proc loadFromStorage(pm: PeerManager) =
else:
debug "successfully queried peer storage"

proc canBeConnected*(pm: PeerManager,
peerId: PeerId): bool =
# Returns if we can try to connect to this peer, based on past failed attempts
# It uses an exponential backoff. Each connection attempt makes us
# wait more before trying again.
let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]

# if it never errored, we can try to connect
if failedAttempts == 0:
return true

# if there are too many failed attempts, do not reconnect
if failedAttempts >= pm.maxFailedAttempts:
return false

# If it errored we wait an exponential backoff from last connection
# the more failed attempts, the greater the backoff since last attempt
let now = Moment.init(getTime().toUnix, Second)
let lastFailed = pm.peerStore[LastFailedConnBook][peerId]
let backoff = calculateBackoff(pm.initialBackoffInSec, pm.backoffFactor, failedAttempts)
if now >= (lastFailed + backoff):
return true
return false

##################
# Initialisation #
##################
Expand Down Expand Up @@ -286,12 +317,20 @@ proc new*(T: type PeerManager,
maxConnections = maxConnections
raise newException(Defect, "Max number of connections can't be greater than PeerManager capacity")

# attempt to calculate max backoff to prevent potential overflows or unreasonably high values
let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, maxFailedAttempts)
if backoff.weeks() > 1:
error "Max backoff time can't be over 1 week",
maxBackoff=backoff
raise newException(Defect, "Max backoff time can't be over 1 week")

let pm = PeerManager(switch: switch,
peerStore: switch.peerStore,
storage: storage,
initialBackoffInSec: initialBackoffInSec,
backoffFactor: backoffFactor,
maxFailedAttempts: maxFailedAttempts)

proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
onConnEvent(pm, peerId, event)

Expand Down Expand Up @@ -467,9 +506,7 @@ proc connectToRelayPeers*(pm: PeerManager) {.async.} =

# TODO: Track only relay connections (nwaku/issues/1566)
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.peerStore.canBeConnected(it.peerId,
pm.initialBackoffInSec,
pm.backoffFactor))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
let numPeersToConnect = min(min(maxConnections - totalRelayPeers, outsideBackoffPeers.len), MaxParalelDials)

info "Relay peer connections",
Expand Down
22 changes: 0 additions & 22 deletions waku/v2/node/peer_manager/waku_peer_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -43,28 +43,6 @@ type
# Peer Store API #
##################

proc canBeConnected*(peerStore: PeerStore,
peerId: PeerId,
initialBackoffInSec: int,
backoffFactor: int): bool =
# Returns if we can try to connect to this peer, based on past failed attempts
# It uses an exponential backoff. Each connection attempt makes us
# wait more before trying again.
let failedAttempts = peerStore[NumberFailedConnBook][peerId]

# if it never errored, we can try to connect
if failedAttempts == 0:
return true

# If it errored we wait an exponential backoff from last connection
# the more failed attemps, the greater the backoff since last attempt
let now = Moment.init(getTime().toUnix, Second)
let lastFailed = peerStore[LastFailedConnBook][peerId]
let backoff = chronos.seconds(initialBackoffInSec*(backoffFactor^(failedAttempts-1)))
if now >= (lastFailed + backoff):
return true
return false

proc delete*(peerStore: PeerStore,
peerId: PeerId) =
# Delete all the information of a given peer.
Expand Down

0 comments on commit d5c2770

Please sign in to comment.