Skip to content

Commit

Permalink
feat: curate peers shared over px protocol (#1671)
Browse files Browse the repository at this point in the history
  • Loading branch information
alrevuelta authored Apr 19, 2023
1 parent d5ef933 commit 14305c6
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 55 deletions.
3 changes: 0 additions & 3 deletions apps/wakunode2/wakunode2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -577,9 +577,6 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,
except CatchableError:
return err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg())

if conf.peerExchange:
asyncSpawn runPeerExchangeDiscv5Loop(node.wakuPeerExchange)

# retrieve px peers and add the to the peer store
if conf.peerExchangeNode != "":
let desiredOutDegree = node.wakuRelay.parameters.d.uint64()
Expand Down
11 changes: 6 additions & 5 deletions tests/v2/test_waku_peer_exchange.nim
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,16 @@ procSuite "Waku Peer Exchange":
await allFutures([node1.start(), node2.start(), node3.start()])
await allFutures([node1.startDiscv5(), node2.startDiscv5()])

# Give disv5 some time to discover each other
await sleepAsync(5000.millis)

# node2 can be connected, so will be returned by peer exchange
require (await node1.peerManager.connectRelay(node2.switch.peerInfo.toRemotePeerInfo()))

# Mount peer exchange
await node1.mountPeerExchange()
await node3.mountPeerExchange()

# Give the algorithm some time to work its magic
await sleepAsync(3000.millis)

asyncSpawn node1.wakuPeerExchange.runPeerExchangeDiscv5Loop()

let connOpt = await node3.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)
check:
connOpt.isSome
Expand Down
6 changes: 5 additions & 1 deletion waku/v2/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ proc insertOrReplace(ps: PeerStorage,
warn "failed to store peers", err = res.error
waku_peers_errors.inc(labelValues = ["storage_failure"])

proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo) =
proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownOrigin) =
# Adds peer to manager for the specified protocol

if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
Expand All @@ -120,6 +120,10 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo) =

pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey
pm.peerStore[SourceBook][remotePeerInfo.peerId] = origin

if remotePeerInfo.enr.isSome():
pm.peerStore[ENRBook][remotePeerInfo.peerId] = remotePeerInfo.enr.get()

# Add peer to storage. Entry will subsequently be updated with connectedness information
if not pm.storage.isNil:
Expand Down
3 changes: 3 additions & 0 deletions waku/v2/node/peer_manager/waku_peer_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,6 @@ proc getConnectedPeers*(peerStore: PeerStore): seq[RemotePeerInfo] =

proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[RemotePeerInfo] =
return peerStore.peers.filterIt(it.protocols.contains(proto))

proc getReachablePeers*(peerStore: PeerStore): seq[RemotePeerInfo] =
return peerStore.peers.filterIt(it.connectedness == CanConnect or it.connectedness == Connected)
16 changes: 6 additions & 10 deletions waku/v2/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -760,11 +760,7 @@ when defined(rln):
proc mountPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} =
info "mounting waku peer exchange"

var discv5Opt: Option[WakuDiscoveryV5]
if not node.wakuDiscV5.isNil():
discv5Opt = some(node.wakuDiscV5)

node.wakuPeerExchange = WakuPeerExchange.new(node.peerManager, discv5Opt)
node.wakuPeerExchange = WakuPeerExchange.new(node.peerManager)

if node.started:
await node.wakuPeerExchange.start()
Expand All @@ -780,13 +776,13 @@ proc fetchPeerExchangePeers*(node: Wakunode, amount: uint64) {.async, raises: [D
let pxPeersRes = await node.wakuPeerExchange.request(amount)
if pxPeersRes.isOk:
var validPeers = 0
for pi in pxPeersRes.get().peerInfos:
let peers = pxPeersRes.get().peerInfos
for pi in peers:
var record: enr.Record
if enr.fromBytes(record, pi.enr):
# TODO: Add source: PX
node.peerManager.addPeer(record.toRemotePeerInfo().get)
node.peerManager.addPeer(record.toRemotePeerInfo().get, PeerExcahnge)
validPeers += 1
info "Retrieved peer info via peer exchange protocol", validPeers = validPeers
info "Retrieved peer info via peer exchange protocol", validPeers = validPeers, totalPeers = peers.len
else:
warn "Failed to retrieve peer info via peer exchange protocol", error = pxPeersRes.error

Expand Down Expand Up @@ -871,7 +867,7 @@ proc runDiscv5Loop(node: WakuNode) {.async.} =

# Add all peers, new ones and already seen (in case their addresses changed)
for peer in discoveredPeers:
node.peerManager.addPeer(peer)
node.peerManager.addPeer(peer, Discv5)

# Discovery `queryRandom` can have a synchronous fast path for example
# when no peers are in the routing table. Don't run it in continuous loop.
Expand Down
1 change: 1 addition & 0 deletions waku/v2/utils/peers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type
UnknownOrigin,
Discv5,
Static,
PeerExcahnge,
Dns

PeerDirection* = enum
Expand Down
71 changes: 35 additions & 36 deletions waku/v2/waku_peer_exchange/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import
import
../node/peer_manager,
../waku_core,
../utils/heartbeat,
../waku_discv5,
./rpc,
./rpc_codec
Expand All @@ -29,8 +30,8 @@ const
# We add a 64kB safety buffer for protocol overhead.
# 10x-multiplier also for safety
MaxRpcSize* = 10 * MaxWakuMessageSize + 64 * 1024 # TODO what is the expected size of a PX message? As currently specified, it can contain an arbitary number of ENRs...
MaxCacheSize = 1000
CacheCleanWindow = 200
MaxPeersCacheSize = 60
CacheRefreshInterval = 15.minutes

WakuPeerExchangeCodec* = "/vac/waku/peer-exchange/2.0.0-alpha1"

Expand All @@ -47,7 +48,6 @@ type

WakuPeerExchange* = ref object of LPProtocol
peerManager*: PeerManager
wakuDiscv5: Option[WakuDiscoveryV5]
enrCache*: seq[enr.Record] # todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/

proc request*(wpx: WakuPeerExchange, numPeers: uint64, conn: Connection): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async, gcsafe.} =
Expand Down Expand Up @@ -95,42 +95,42 @@ proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], conn: Connection): Fu

return ok()

proc cleanCache(wpx: WakuPeerExchange) {.gcsafe.} =
wpx.enrCache.delete(0..CacheCleanWindow-1)
proc getEnrsFromCache(wpx: WakuPeerExchange, numPeers: uint64): seq[enr.Record] {.gcsafe.} =
if wpx.enrCache.len() == 0:
debug "peer exchange ENR cache is empty"
return @[]

proc runPeerExchangeDiscv5Loop*(wpx: WakuPeerExchange) {.async, gcsafe.} =
## Runs a discv5 loop adding new peers to the px peer cache
if wpx.wakuDiscv5.isNone():
warn "Trying to run discovery v5 (for PX) while it's disabled"
return
# copy and shuffle
randomize()
var shuffledCache = wpx.enrCache
shuffledCache.shuffle()

info "Starting peer exchange discovery v5 loop"
# return numPeers or less if cache is smaller
return shuffledCache[0..<min(shuffledCache.len.int, numPeers.int)]

while wpx.wakuDiscv5.get().listening:
trace "Running px discv5 discovery loop"
let discoveredPeers = await wpx.wakuDiscv5.get().findRandomPeers()
info "Discovered px peers via discv5", count=discoveredPeers.get().len()
if discoveredPeers.isOk():
for dp in discoveredPeers.get():
if dp.enr.isSome() and not wpx.enrCache.contains(dp.enr.get()):
wpx.enrCache.add(dp.enr.get())
proc populateEnrCache(wpx: WakuPeerExchange) =
# share only peers that i) are reachable ii) come from discv5
let withEnr = wpx.peerManager.peerStore
.getReachablePeers()
.filterIt(it.origin == Discv5)
.filterIt(it.enr.isSome)

if wpx.enrCache.len() >= MaxCacheSize:
wpx.cleanCache()
# either what we have or max cache size
var newEnrCache = newSeq[enr.Record](0)
for i in 0..<min(withEnr.len, MaxPeersCacheSize):
newEnrCache.add(withEnr[i].enr.get())

## This loop "competes" with the loop in wakunode2
## For the purpose of collecting px peers, 30 sec intervals should be enough
await sleepAsync(30.seconds)
# swap cache for new
wpx.enrCache = newEnrCache

proc getEnrsFromCache(wpx: WakuPeerExchange, numPeers: uint64): seq[enr.Record] {.gcsafe.} =
randomize()
if wpx.enrCache.len() == 0:
debug "peer exchange ENR cache is empty"
return @[]
for i in 0..<min(numPeers, wpx.enrCache.len().uint64()):
let ri = rand(0..<wpx.enrCache.len())
# TODO: Note that duplicated peers can be returned here
result.add(wpx.enrCache[ri])
proc updatePxEnrCache(wpx: WakuPeerExchange) {.async.} =
# try more aggressively to fill the cache at startup
while wpx.enrCache.len < MaxPeersCacheSize:
wpx.populateEnrCache()
await sleepAsync(5.seconds)

heartbeat "Updating px enr cache", CacheRefreshInterval:
wpx.populateEnrCache()

proc initProtocolHandler(wpx: WakuPeerExchange) =
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
Expand Down Expand Up @@ -159,11 +159,10 @@ proc initProtocolHandler(wpx: WakuPeerExchange) =
wpx.codec = WakuPeerExchangeCodec

proc new*(T: type WakuPeerExchange,
peerManager: PeerManager,
wakuDiscv5: Option[WakuDiscoveryV5] = none(WakuDiscoveryV5)): T =
peerManager: PeerManager): T =
let wpx = WakuPeerExchange(
peerManager: peerManager,
wakuDiscv5: wakuDiscv5
)
wpx.initProtocolHandler()
asyncSpawn wpx.updatePxEnrCache()
return wpx

0 comments on commit 14305c6

Please sign in to comment.