sharded peer manager
SionoiS committed Oct 24, 2023
1 parent 944dfda commit 36866cd
Showing 2 changed files with 141 additions and 118 deletions.
251 changes: 133 additions & 118 deletions waku/node/peer_manager/peer_manager.nim
@@ -49,10 +49,10 @@ const
BackoffFactor = 4

# Limit the number of parallel dials
MaxParalelDials = 10
MaxParallelDials = 10

# Delay between consecutive relayConnectivityLoop runs
ConnectivityLoopInterval = chronos.seconds(15)
ConnectivityLoopInterval = chronos.seconds(60)

# How often the peer store is pruned
PrunePeerStoreInterval = chronos.minutes(10)
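
For orientation, these constants feed the dial backoff that canBeConnected checks further down. A minimal sketch of the usual exponential shape, assuming calculateBackoff multiplies an initial backoff by BackoffFactor per failed attempt (the real helper lives elsewhere in this module; the example values are hypothetical):

import std/math
import chronos

proc calculateBackoffSketch(initialBackoffInSec: int,
                            backoffFactor: int,
                            failedAttempts: int): Duration =
  # No failures yet -> no backoff; each further failure
  # multiplies the wait by backoffFactor (4 above).
  if failedAttempts == 0:
    return chronos.seconds(0)
  chronos.seconds(initialBackoffInSec * backoffFactor ^ (failedAttempts - 1))

# e.g. initial 120s, factor 4: waits of 120s, 480s, 1920s, ...
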
@@ -107,8 +107,7 @@ proc insertOrReplace(ps: PeerStorage,
connectedness: Connectedness,
disconnectTime: int64 = 0) =
# Insert peer entry into persistent storage, or replace existing entry with updated info
let res = ps.put(peerId, remotePeerInfo, connectedness, disconnectTime)
if res.isErr:
if (let res = ps.put(peerId, remotePeerInfo, connectedness, disconnectTime); res.isErr()):
warn "failed to store peers", err = res.error
waku_peers_errors.inc(labelValues = ["storage_failure"])
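
The rewritten condition uses Nim's parenthesized statement-list expression, which keeps res scoped to the if while staying visible in its body. A standalone illustration of the idiom, with a hypothetical Result-returning save (results package assumed, as used throughout nwaku):

import results

proc save(v: int): Result[void, string] =
  if v < 0: err("negative") else: ok()

if (let res = save(-1); res.isErr()):
  echo "failed to save: ", res.error   # res is visible in the body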

@@ -160,27 +159,23 @@ proc connectRelay*(pm: PeerManager,
pm.addPeer(peer)

let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
debug "Connecting to relay peer", wireAddr=peer.addrs, peerId=peerId, failedAttempts=failedAttempts
debug "Connecting to relay peer",
wireAddr=peer.addrs, peerId=peerId, failedAttempts=failedAttempts

var deadline = sleepAsync(dialTimeout)
var workfut = pm.switch.connect(peerId, peer.addrs)
var reasonFailed = ""
let fut = pm.switch.connect(peerId, peer.addrs)

try:
await workfut or deadline
if workfut.finished():
if not deadline.finished():
deadline.cancel()
let res = catch:
if await fut.withTimeout(dialTimeout):
waku_peers_dials.inc(labelValues = ["successful"])
waku_node_conns_initiated.inc(labelValues = [source])
pm.peerStore[NumberFailedConnBook][peerId] = 0
return true
else:
reasonFailed = "timed out"
await cancelAndWait(workfut)
except CatchableError as exc:
reasonFailed = "remote peer failed"
else: await cancelAndWait(fut)

let reasonFailed =
if res.isOk: "timed out"
else: res.error.msg

# Dial failed
pm.peerStore[NumberFailedConnBook][peerId] = pm.peerStore[NumberFailedConnBook][peerId] + 1
pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second)
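
The new dial path drops the manual sleepAsync deadline in favour of chronos' withTimeout combined with results' catch template, which converts a raised CatchableError into a Result. A condensed sketch of the pattern against an arbitrary future (proc name and return convention are illustrative):

import chronos, results

proc dialOutcome(fut: Future[void],
                 dialTimeout: Duration): Future[string] {.async.} =
  ## Returns "" on success, otherwise a failure reason.
  let res = catch:
    if await fut.withTimeout(dialTimeout):
      return ""                    # completed within the deadline
    else: await cancelAndWait(fut) # deadline hit: cancel the dial
  # Falling through means failure: either the timeout fired
  # (res.isOk) or the dial future itself raised (res.isErr).
  if res.isOk(): return "timed out"
  return res.error.msg
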
@@ -216,15 +211,15 @@ proc dialPeer(pm: PeerManager,

# Dial Peer
let dialFut = pm.switch.dial(peerId, addrs, proto)
var reasonFailed = ""
try:
if (await dialFut.withTimeout(dialTimeout)):

let res = catch:
if await dialFut.withTimeout(dialTimeout):
return some(dialFut.read())
else:
reasonFailed = "timeout"
await cancelAndWait(dialFut)
except CatchableError as exc:
reasonFailed = "failed"
else: await cancelAndWait(dialFut)

let reasonFailed =
if res.isOk: "timed out"
else: res.error.msg

debug "Dialing peer failed", peerId=peerId, reason=reasonFailed, proto=proto

@@ -255,8 +250,7 @@ proc loadFromStorage(pm: PeerManager) =

amount.inc()

let res = pm.storage.getAll(onData)
if res.isErr:
if (let res = pm.storage.getAll(onData); res.isErr()):
warn "failed to load peers from storage", err = res.error
waku_peers_errors.inc(labelValues = ["storage_load_failure"])
return
@@ -283,108 +277,101 @@ proc canBeConnected*(pm: PeerManager,
let now = Moment.init(getTime().toUnix, Second)
let lastFailed = pm.peerStore[LastFailedConnBook][peerId]
let backoff = calculateBackoff(pm.initialBackoffInSec, pm.backoffFactor, failedAttempts)
if now >= (lastFailed + backoff):
return true
return false

return now >= (lastFailed + backoff)

##################
# Initialisation #
##################

proc getPeerIp(pm: PeerManager, peerId: PeerId): Option[string] =
if pm.switch.connManager.getConnections().hasKey(peerId):
let conns = pm.switch.connManager.getConnections().getOrDefault(peerId)
if conns.len != 0:
let observedAddr = conns[0].connection.observedAddr
let ip = observedAddr.get.getHostname()
if observedAddr.isSome:
# TODO: think if circuit relay ips should be handled differently
let ip = observedAddr.get.getHostname()
return some(ip)
return none(string)
if not pm.switch.connManager.getConnections().hasKey(peerId):
return none(string)

let conns = pm.switch.connManager.getConnections().getOrDefault(peerId)
if conns.len == 0:
return none(string)

let obAddr = conns[0].connection.observedAddr.valueOr:
return none(string)

# TODO: think if circuit relay ips should be handled differently

return some(obAddr.getHostname())
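
The rewrite flattens getPeerIp into guard clauses, with valueOr supplying the early return when no observed address is available. A minimal self-contained illustration of valueOr as a guard (the example proc and values are hypothetical):

import results
import std/strutils

proc firstToken(line: string): Result[string, string] =
  let parts = line.splitWhitespace()
  if parts.len == 0: err("blank line") else: ok(parts[0])

proc hostOf(line: string): string =
  # valueOr unwraps on success; on error its block runs and
  # must leave the enclosing proc.
  let token = firstToken(line).valueOr:
    return "unknown"
  token.toLowerAscii()

echo hostOf("Example.ORG 443")  # -> example.org
echo hostOf("")                 # -> unknown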

# called when a connection i) is created or ii) is closed
proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
case event.kind
of ConnEventKind.Connected:
let direction = if event.incoming: Inbound else: Outbound
#let direction = if event.incoming: Inbound else: Outbound
discard
of ConnEventKind.Disconnected:
discard

proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
# To prevent the metadata protocol from breaking older nodes, for now we
# only disconnect if the cluster id is specified.
if pm.wakuMetadata.clusterId == 0:
return

let res = catch: await pm.switch.dial(peerId, WakuMetadataCodec)

let reason =
if (let conn = res; conn.isOk()):
if (let metadata = (await pm.wakuMetadata.request(conn.get())); metadata.isOk()):
if (let clusterId = metadata.get().clusterId; clusterId.isSome()):
if pm.wakuMetadata.clusterId == clusterId.get():
if metadata.get().shards.anyIt(pm.wakuMetadata.shards.contains(it)):
return
else: "no shards in common"
else: "different clusterId reported: " & $pm.wakuMetadata.clusterId & " vs " & $clusterId
else: "empty clusterId reported"
else: "failed waku metadata codec request"
else: "waku metadata codec not supported"

info "disconnecting from peer", peerId=peerId, reason=reason
asyncSpawn(pm.switch.disconnect(peerId))
pm.peerStore.delete(peerId)
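
The decision chain above boils down to: keep the peer only if it reports our cluster id and at least one shard we also serve. A standalone predicate capturing that rule (the object type is a simplified stand-in for the metadata payload):

import std/sequtils

type NetInfo = object
  clusterId: uint16
  shards: seq[uint16]

proc shardCompatible(local, remote: NetInfo): bool =
  # Same cluster and a non-empty shard intersection.
  local.clusterId == remote.clusterId and
    remote.shards.anyIt(it in local.shards)

echo shardCompatible(
  NetInfo(clusterId: 1, shards: @[0'u16, 4]),
  NetInfo(clusterId: 1, shards: @[4'u16, 5]))  # -> true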

# called when a peer i) first connects to us ii) disconnects all connections from us
proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined:
await pm.onPeerMetadata(peerId)

var direction: PeerDirection
var connectedness: Connectedness

if event.kind == PeerEventKind.Joined:
direction = if event.initiator: Outbound else: Inbound
connectedness = Connected

var clusterOk = false
var reason = ""
# To prevent the metadata protocol from breaking older nodes, for now we
# only disconnect if the cluster id is specified.
if not pm.wakuMetadata.isNil() and pm.wakuMetadata.clusterId != 0:
block wakuMetadata:
var conn: Connection
try:
conn = await pm.switch.dial(peerId, WakuMetadataCodec)
except CatchableError:
reason = "waku metadata codec not supported"
break wakuMetadata

# request metadata from connecting peer
let metadata = (await pm.wakuMetadata.request(conn)).valueOr:
reason = "failed waku metadata codec request"
break wakuMetadata

# does not report any clusterId
let clusterId = metadata.clusterId.valueOr:
reason = "empty clusterId reported"
break wakuMetadata

# drop it if it doesn't match our network id
if pm.wakuMetadata.clusterId != clusterId:
reason = "different clusterId reported: " & $pm.wakuMetadata.clusterId & " vs " & $clusterId
break wakuMetadata

# reaching here means the clusterId matches
clusterOk = true

if not pm.wakuMetadata.isNil() and pm.wakuMetadata.clusterId != 0 and not clusterOk:
info "disconnecting from peer", peerId=peerId, reason=reason
asyncSpawn(pm.switch.disconnect(peerId))
pm.peerStore.delete(peerId)

# TODO: Take action depending on the supported shards as reported by metadata

let ip = pm.getPeerIp(peerId)
if ip.isSome:
pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId)

let peersBehindIp = pm.ipTable[ip.get]
if peersBehindIp.len > pm.colocationLimit:
case event.kind:
of Joined:
direction = if event.initiator: Outbound else: Inbound
connectedness = Connected

if (let ip = pm.getPeerIp(peerId); ip.isSome()):
pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId)

# in theory this should always be one, but just in case
for peerId in peersBehindIp[0..<(peersBehindIp.len - pm.colocationLimit)]:
let peersBehindIp = pm.ipTable[ip.get]

let idx = max((peersBehindIp.len - pm.colocationLimit), 0)
for peerId in peersBehindIp[0..<idx]:
debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip
asyncSpawn(pm.switch.disconnect(peerId))
pm.peerStore.delete(peerId)

elif event.kind == PeerEventKind.Left:
direction = UnknownDirection
connectedness = CanConnect

# note we can't access the peerId ip here as the connection was already closed
for ip, peerIds in pm.ipTable.pairs:
if peerIds.contains(peerId):
pm.ipTable[ip] = pm.ipTable[ip].filterIt(it != peerId)
if pm.ipTable[ip].len == 0:
pm.ipTable.del(ip)
break
of Left:
direction = UnknownDirection
connectedness = CanConnect

# note we can't access the peerId ip here as the connection was already closed
for ip, peerIds in pm.ipTable.pairs:
if peerIds.contains(peerId):
pm.ipTable[ip] = pm.ipTable[ip].filterIt(it != peerId)
if pm.ipTable[ip].len == 0:
pm.ipTable.del(ip)
break

pm.peerStore[ConnectionBook][peerId] = connectedness
pm.peerStore[DirectionBook][peerId] = direction

if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), connectedness, getTime().toUnix)
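
The Joined branch above keeps an ip -> peerIds table and prunes the oldest connections once colocationLimit is exceeded. A self-contained sketch of that bookkeeping, mirroring the max() clamp above (table, limit, and ids are local stand-ins):

import std/tables

const colocationLimit = 1
var ipTable: Table[string, seq[string]]

proc peersToPrune(ip, peerId: string): seq[string] =
  ## Records the peer and returns the ids that now exceed the limit.
  ipTable.mgetOrPut(ip, newSeq[string]()).add(peerId)
  let peersBehindIp = ipTable[ip]
  let excess = max(peersBehindIp.len - colocationLimit, 0)
  peersBehindIp[0 ..< excess]   # oldest entries first

discard peersToPrune("203.0.113.7", "peerA")
echo peersToPrune("203.0.113.7", "peerB")  # -> @["peerA"]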

@@ -619,30 +606,58 @@ proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) =
return (numStreamsIn, numStreamsOut)

proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let (inRelayPeers, _) = pm.connectedPeers(WakuRelayCodec)
let connsToPrune = min(amount, inRelayPeers.len)

for p in inRelayPeers[0..<connsToPrune]:
asyncSpawn(pm.switch.disconnect(p))

proc connectToRelayPeers*(pm: PeerManager) {.async.} =
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
let inPeersTarget = maxConnections - pm.outRelayPeersTarget
var peersToConnect: seq[RemotePeerInfo]
var peersToDisconnect: int

for shard in pm.wakuMetadata.shards.items:
let peers = pm.peerStore.getPeersByShard(uint16(pm.wakuMetadata.clusterId), uint16(shard))

let connectedInPeers = peers.filterIt(
pm.peerStore.isConnected(it.peerId) and it.direction == Inbound)

let connectedOutPeers = peers.filterIt(
pm.peerStore.isConnected(it.peerId) and it.direction == Outbound)

let outPeerDiff = pm.outRelayPeersTarget - connectedOutPeers.len
let inPeerDiff = pm.inRelayPeersTarget - connectedInPeers.len

# TODO: Temporarily disabled. Might be causing connection issues
#if inRelayPeers.len > pm.inRelayPeersTarget:
# await pm.pruneInRelayConns(inRelayPeers.len - pm.inRelayPeersTarget)
if inPeerDiff <= 0:
peersToDisconnect += abs(inPeerDiff)

if outRelayPeers.len >= pm.outRelayPeersTarget:
if outPeerDiff <= 0:
continue

let connectablePeers = peers.filterIt(
(not pm.peerStore.isConnected(it.peerId) and pm.canBeConnected(it.peerId)))

peersToConnect.add(connectablePeers[0..<min(outPeerDiff, connectablePeers.len)])

if peersToConnect.len == 0:
return

let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
let numPeersToConnect = min(outsideBackoffPeers.len, MaxParalelDials)
# Even with duplicates, the target will be reached after a couple of iterations
peersToConnect = peersToConnect.deduplicate()

let chunks = peersToConnect.len div MaxParallelDials
let rem = peersToConnect.len mod MaxParallelDials

for i in 0..<chunks:
let start = i * MaxParallelDials
let stop = (i + 1) * MaxParallelDials
await pm.connectToNodes(peersToConnect[start..<stop])

let start = peersToConnect.len - rem
let stop = peersToConnect.len
await pm.connectToNodes(peersToConnect[start..<stop])

await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect])
await pm.pruneInRelayConns(peersToDisconnect)
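
connectToNodes is now called in batches of at most MaxParallelDials: full chunks first (0 ..< chunks), then the remainder. The chunking arithmetic isolated over plain ints (names are illustrative):

import std/sequtils

const maxParallel = 10

proc batchSizes(total: int): seq[int] =
  # Full chunks of maxParallel, then the remainder (if any).
  let chunks = total div maxParallel
  let rem = total mod maxParallel
  result = repeat(maxParallel, chunks)
  if rem > 0:
    result.add(rem)

echo batchSizes(25)  # -> @[10, 10, 5]
echo batchSizes(20)  # -> @[10, 10]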

proc prunePeerStore*(pm: PeerManager) =
let numPeers = toSeq(pm.peerStore[AddressBook].book.keys).len
@@ -727,7 +742,7 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
# log metrics
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
#let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
let totalConnections = pm.switch.connManager.getConnections().len
@@ -755,4 +770,4 @@ proc start*(pm: PeerManager) =
asyncSpawn pm.logAndMetrics()

proc stop*(pm: PeerManager) =
pm.started = false
pm.started = false
8 changes: 8 additions & 0 deletions waku/node/peer_manager/waku_peer_store.nim
@@ -12,6 +12,8 @@ import

import
../../waku_core,
../../waku_enr/sharding,
../../waku_enr/capabilities,
../../common/utils/sequence

export peerstore, builders
@@ -131,3 +133,9 @@ proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[RemotePeerInfo] =

proc getReachablePeers*(peerStore: PeerStore): seq[RemotePeerInfo] =
return peerStore.peers.filterIt(it.connectedness == CanConnect or it.connectedness == Connected)

proc getPeersByShard*(peerStore: PeerStore, cluster, shard: uint16): seq[RemotePeerInfo] =
return peerStore.peers.filterIt(it.enr.isSome() and it.enr.get().containsShard(cluster, shard))

proc getPeersByCapability*(peerStore: PeerStore, cap: Capabilities): seq[RemotePeerInfo] =
return peerStore.peers.filterIt(it.enr.isSome() and it.enr.get().supportsCapability(cap))
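
A shape-for-shape sketch of how the new accessors filter on the optional ENR. containsShard is the real helper from waku_enr/sharding; the record type and helper body below are simplified stand-ins:

import std/[options, sequtils]

type PeerRec = object
  enr: Option[string]   # stand-in for the ENR record

proc containsShardSketch(enr: string, cluster, shard: uint16): bool =
  # Stand-in for waku_enr/sharding's containsShard
  enr == $cluster & "/" & $shard

proc getPeersByShardSketch(peers: seq[PeerRec],
                           cluster, shard: uint16): seq[PeerRec] =
  peers.filterIt(it.enr.isSome() and
    it.enr.get().containsShardSketch(cluster, shard))

let peers = @[PeerRec(enr: some("1/4")), PeerRec()]
echo getPeersByShardSketch(peers, 1, 4).len  # -> 1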
