sharded peer manager
Fix possible out-of-bounds & logic error

Filter peers per protocol & rename proc

Fix out-of-bounds & refactor dialling

Fix catching raise vs. timeout & test fixes

Fix test to connect to all peers per proto

Fix test

Div target per shard count

Logging & stuff

Fixes

Log peer count

More logs

Remove protobook override & clean up

Fix relay peer management & logs

Misc fixes
SionoiS committed Nov 27, 2023
1 parent c301e88 commit 7580a6a
Showing 6 changed files with 251 additions and 145 deletions.
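For context on the "Filter peers per protocol" and "Div target per shard count" items in the commit message, the sketch below illustrates the general idea only: the relay connection target is divided by the number of shards the node serves, and only not-yet-connected peers advertising a given shard are kept as dial candidates for it. It is a minimal, self-contained illustration, not the nwaku implementation, and every name in it (`DemoPeer`, `dialCandidatesPerShard`) is an assumption.

```nim
import std/[sequtils, tables]

type
  DemoPeer = object
    id: string          # peer identifier
    shards: seq[uint16] # shards the peer advertises (via its ENR in the real code)
    connected: bool     # already connected over relay?

proc dialCandidatesPerShard(peers: seq[DemoPeer], target: int): Table[uint16, seq[string]] =
  ## Group peers by shard, split the global target across shards,
  ## and return the not-yet-connected peers to dial for each shard.
  var byShard = initTable[uint16, seq[DemoPeer]]()
  for p in peers:
    for s in p.shards:
      byShard.mgetOrPut(s, @[]).add(p)

  # Divide the target per shard count, keeping at least one slot per shard.
  let perShard = max(1, target div max(1, byShard.len))

  result = initTable[uint16, seq[string]]()
  for shard, shardPeers in byShard:
    let notConnected = shardPeers.filterIt(not it.connected)
    let needed = perShard - (shardPeers.len - notConnected.len)
    if needed <= 0 or notConnected.len == 0:
      continue
    result[shard] = notConnected[0 ..< min(needed, notConnected.len)].mapIt(it.id)

when isMainModule:
  let peers = @[
    DemoPeer(id: "a", shards: @[0'u16], connected: true),
    DemoPeer(id: "b", shards: @[0'u16]),
    DemoPeer(id: "c", shards: @[1'u16])
  ]
  echo dialCandidatesPerShard(peers, 4)
```

In the real code the shard list would come from the peer's ENR, which is why the updated tests below attach `node.enr` to each `RemotePeerInfo` before adding it to the peer manager.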
60 changes: 43 additions & 17 deletions tests/test_peer_manager.nim
@@ -1,7 +1,7 @@
{.used.}

import
std/[options, sequtils, times],
std/[options, sequtils, times, sugar],
stew/shims/net as stewNet,
testutils/unittests,
chronos,
@@ -21,6 +21,7 @@ import
../../waku/node/peer_manager/peer_manager,
../../waku/node/peer_manager/peer_store/waku_peer_storage,
../../waku/waku_node,
../../waku/waku_core/topics,
../../waku/waku_relay,
../../waku/waku_store,
../../waku/waku_filter,
@@ -128,7 +129,6 @@ procSuite "Peer Manager":

await node.stop()


asyncTest "Peer manager keeps track of connections":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
@@ -225,19 +225,30 @@ procSuite "Peer Manager":
let
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
node1 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
Port(0),
peerStorage = storage
)
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
peerInfo2 = node2.switch.peerInfo

node1.mountMetadata(0).expect("Mounted Waku Metadata")
node2.mountMetadata(0).expect("Mounted Waku Metadata")

await node1.start()
await node2.start()

await node1.mountRelay()
await node2.mountRelay()

# Setup sharding info
var remotePeerInfo2 = peerInfo2.toRemotePeerInfo()
remotePeerInfo2.enr = some(node2.enr)

require:
(await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true
await sleepAsync(chronos.milliseconds(500))
let isConnected = await node1.peerManager.connectRelay(remotePeerInfo2)
assert isConnected == true, "Node 1 and 2 not connected"

check:
# Currently connected to node2
@@ -246,18 +257,29 @@ procSuite "Peer Manager":
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected

# Simulate restart by initialising a new node using the same storage
let
node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
let node3 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
Port(0),
peerStorage = storage
)

node3.mountMetadata(0).expect("Mounted Waku Metadata")

await node3.start()

await node3.mountRelay()

check:
# Node2 has been loaded after "restart", but we have not yet reconnected
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected

await node3.mountRelay()
await node3.peerManager.connectToRelayPeers()
await node3.peerManager.manageRelayPeers()

# Can't work because .manageRelayPeers() requires sharding information
# but the ENR is not saved in storage

check:
# Reconnected to node2 after "restart"
@@ -297,9 +319,9 @@ procSuite "Peer Manager":
topics = @["/waku/2/rs/4/0"],
)

discard node1.mountMetadata(clusterId3)
discard node2.mountMetadata(clusterId4)
discard node3.mountMetadata(clusterId4)
node1.mountMetadata(clusterId3).expect("Mounted Waku Metadata")
node2.mountMetadata(clusterId4).expect("Mounted Waku Metadata")
node3.mountMetadata(clusterId4).expect("Mounted Waku Metadata")

# Start nodes
await allFutures([node1.start(), node2.start(), node3.start()])
Expand All @@ -318,7 +340,6 @@ procSuite "Peer Manager":
conn2.isNone
conn3.isSome


# TODO: nwaku/issues/1377
xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
let
@@ -380,19 +401,24 @@ procSuite "Peer Manager":
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

# Start them
await allFutures(nodes.mapIt(it.start()))
discard nodes.mapIt(it.mountMetadata(0))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.start()))

# Get all peer infos
let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
let peerInfos = collect:
for i in 0..nodes.high:
let peerInfo = nodes[i].switch.peerInfo.toRemotePeerInfo()
peerInfo.enr = some(nodes[i].enr)
peerInfo

# Add all peers (but self) to node 0
nodes[0].peerManager.addPeer(peerInfos[1])
nodes[0].peerManager.addPeer(peerInfos[2])
nodes[0].peerManager.addPeer(peerInfos[3])

# Connect to relay peers
await nodes[0].peerManager.connectToRelayPeers()
await nodes[0].peerManager.manageRelayPeers()

check:
# Peerstore track all three peers
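The tests above attach each node's ENR to its `RemotePeerInfo` before adding it, because `manageRelayPeers()` needs sharding information that it reads from the ENR (which is also why the "restart" test notes that reconnection cannot work while the ENR is not persisted). The fragment below, assembled from calls that appear in the diff rather than a standalone runnable program, shows that pattern:

```nim
# Pattern from the tests above (fragment, not standalone):
# attach the ENR so manageRelayPeers() can read the peer's shard info.
var remotePeerInfo2 = peerInfo2.toRemotePeerInfo()
remotePeerInfo2.enr = some(node2.enr)

node1.peerManager.addPeer(remotePeerInfo2)
await node1.peerManager.manageRelayPeers()
```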
4 changes: 3 additions & 1 deletion tests/testlib/wakunode.nim
@@ -66,7 +66,9 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
if (extIp.isSome() or dns4DomainName.isSome()) and extPort.isNone(): some(Port(60000))
else: extPort

let conf = defaultTestWakuNodeConf()
var conf = defaultTestWakuNodeConf()

conf.topics = topics

if dns4DomainName.isSome() and extIp.isNone():
# If there's an error resolving the IP, an exception is thrown and test fails
