Skip to content

Commit

Permalink
sharded peer manager
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Jan 3, 2024
1 parent b0c7805 commit fee20c7
Show file tree
Hide file tree
Showing 17 changed files with 331 additions and 203 deletions.
6 changes: 2 additions & 4 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}

import std/[strformat, strutils, times, json, options, random]
import std/[strformat, strutils, times, options, random]
import confutils, chronicles, chronos, stew/shims/net as stewNet,
eth/keys, bearssl, stew/[byteutils, results],
nimcrypto/pbkdf2,
metrics,
metrics/chronos_httpserver
import libp2p/[switch, # manage transports, a single entry point for dialing and listening
Expand All @@ -22,11 +21,10 @@ import libp2p/[switch, # manage transports, a single entry poi
peerinfo, # manage the information of a peer, such as peer ID and public / private key
peerid, # Implement how peers interact
protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs
protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS
nameresolving/dnsresolver]# define DNS resolution
import
../../waku/waku_core,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/waku_lightpush/rpc,
../../waku/waku_filter,
../../waku/waku_enr,
Expand Down
2 changes: 1 addition & 1 deletion apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import
../../waku/waku_peer_exchange,
../../waku/waku_rln_relay,
../../waku/waku_store,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/waku_filter,
../../waku/waku_filter_v2,
./wakunode2_validator_signed,
Expand Down
84 changes: 62 additions & 22 deletions tests/test_peer_manager.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.used.}

import
std/[options, sequtils, times],
std/[options, sequtils, times, sugar],
stew/shims/net as stewNet,
testutils/unittests,
chronos,
Expand All @@ -21,10 +21,12 @@ import
../../waku/node/peer_manager/peer_manager,
../../waku/node/peer_manager/peer_store/waku_peer_storage,
../../waku/waku_node,
../../waku/waku_relay,
../../waku/waku_store,
../../waku/waku_filter,
../../waku/waku_lightpush,
../../waku/waku_core,
../../waku/waku_enr/capabilities,
../../waku/waku_relay/protocol,
../../waku/waku_store/common,
../../waku/waku_filter/protocol,
../../waku/waku_lightpush/common,
../../waku/waku_peer_exchange,
../../waku/waku_metadata,
./testlib/common,
Expand Down Expand Up @@ -128,7 +130,6 @@ procSuite "Peer Manager":

await node.stop()


asyncTest "Peer manager keeps track of connections":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
Expand Down Expand Up @@ -225,18 +226,34 @@ procSuite "Peer Manager":
let
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
peerInfo2 = node2.switch.peerInfo
node1 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("127.0.0.1"),
Port(44048),
peerStorage = storage
)
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(34023))

node1.mountMetadata(0).expect("Mounted Waku Metadata")
node2.mountMetadata(0).expect("Mounted Waku Metadata")

await node1.start()
await node2.start()

await node1.mountRelay()
await node2.mountRelay()

let peerInfo2 = node2.switch.peerInfo
var remotePeerInfo2 = peerInfo2.toRemotePeerInfo()
remotePeerInfo2.enr = some(node2.enr)

require:
(await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true
let is12Connected = await node1.peerManager.connectRelay(remotePeerInfo2)
assert is12Connected == true, "Node 1 and 2 not connected"

check:
node1.peerManager.peerStore[AddressBook][remotePeerInfo2.peerId] == remotePeerInfo2.addrs

# wait for the peer store update
await sleepAsync(chronos.milliseconds(500))

check:
Expand All @@ -246,18 +263,28 @@ procSuite "Peer Manager":
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected

# Simulate restart by initialising a new node using the same storage
let
node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
let node3 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("127.0.0.1"),
Port(56037),
peerStorage = storage
)

node3.mountMetadata(0).expect("Mounted Waku Metadata")

await node3.start()

check:
# Node2 has been loaded after "restart", but we have not yet reconnected
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected

await node3.mountRelay()
await node3.peerManager.connectToRelayPeers()

await node3.peerManager.manageRelayPeers()

await sleepAsync(chronos.milliseconds(500))

check:
# Reconnected to node2 after "restart"
Expand Down Expand Up @@ -297,9 +324,9 @@ procSuite "Peer Manager":
topics = @["/waku/2/rs/4/0"],
)

discard node1.mountMetadata(clusterId3)
discard node2.mountMetadata(clusterId4)
discard node3.mountMetadata(clusterId4)
node1.mountMetadata(clusterId3).expect("Mounted Waku Metadata")
node2.mountMetadata(clusterId4).expect("Mounted Waku Metadata")
node3.mountMetadata(clusterId4).expect("Mounted Waku Metadata")

# Start nodes
await allFutures([node1.start(), node2.start(), node3.start()])
Expand All @@ -318,7 +345,6 @@ procSuite "Peer Manager":
conn2.isNone
conn3.isSome


# TODO: nwaku/issues/1377
xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
let
Expand Down Expand Up @@ -377,22 +403,36 @@ procSuite "Peer Manager":

asyncTest "Peer manager connects to all peers supporting a given protocol":
# Create 4 nodes
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
let nodes =
toSeq(0..<4)
.mapIt(
newTestWakuNode(
nodeKey = generateSecp256k1Key(),
bindIp = ValidIpAddress.init("0.0.0.0"),
bindPort = Port(0),
wakuFlags = some(CapabilitiesBitfield.init(@[Relay]))
)
)

# Start them
await allFutures(nodes.mapIt(it.start()))
discard nodes.mapIt(it.mountMetadata(0))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.start()))

# Get all peer infos
let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
let peerInfos = collect:
for i in 0..nodes.high:
let peerInfo = nodes[i].switch.peerInfo.toRemotePeerInfo()
peerInfo.enr = some(nodes[i].enr)
peerInfo

# Add all peers (but self) to node 0
nodes[0].peerManager.addPeer(peerInfos[1])
nodes[0].peerManager.addPeer(peerInfos[2])
nodes[0].peerManager.addPeer(peerInfos[3])

# Connect to relay peers
await nodes[0].peerManager.connectToRelayPeers()
await nodes[0].peerManager.manageRelayPeers()

check:
# Peerstore track all three peers
Expand Down
1 change: 1 addition & 0 deletions tests/test_waku_lightpush.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import
../../waku/node/peer_manager,
../../waku/waku_core,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/waku_lightpush/client,
../../waku/waku_lightpush/protocol_metrics,
../../waku/waku_lightpush/rpc,
Expand Down
8 changes: 2 additions & 6 deletions tests/test_wakunode_lightpush.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@ import
std/options,
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
chronos,
libp2p/crypto/crypto,
libp2p/switch
chronos
import
../../waku/waku_core,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/node/peer_manager,
../../waku/waku_node,
./testlib/common,
./testlib/wakucore,
./testlib/wakunode

Expand Down
14 changes: 9 additions & 5 deletions tests/testlib/wakunode.nim
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
dnsAddrsNameServers: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
nat: "any",
maxConnections: 50,
topics: @[],
relay: true,
maxMessageSize: "1024 KiB"
clusterId: 1.uint32,
topics: @["/waku/2/rs/1/0"],
relay: true
)

proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
Expand All @@ -56,8 +57,8 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
dns4DomainName = none(string),
discv5UdpPort = none(Port),
agentString = none(string),
clusterId: uint32 = 2.uint32,
topics: seq[string] = @["/waku/2/rs/2/0"],
clusterId: uint32 = 1.uint32,
topics: seq[string] = @["/waku/2/rs/1/0"],
peerStoreCapacity = none(int)): WakuNode =

var resolvedExtIp = extIp
Expand All @@ -67,7 +68,10 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
if (extIp.isSome() or dns4DomainName.isSome()) and extPort.isNone(): some(Port(60000))
else: extPort

let conf = defaultTestWakuNodeConf()
var conf = defaultTestWakuNodeConf()

conf.clusterId = clusterId
conf.topics = topics

if dns4DomainName.isSome() and extIp.isNone():
# If there's an error resolving the IP, an exception is thrown and test fails
Expand Down
3 changes: 1 addition & 2 deletions tests/wakunode_rest/test_rest_lightpush.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ import

import
../../waku/waku_api/message_cache,
../../waku/common/base64,
../../waku/waku_core,
../../waku/waku_node,
../../waku/node/peer_manager,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/waku_api/rest/server,
../../waku/waku_api/rest/client,
../../waku/waku_api/rest/responses,
Expand Down
Loading

0 comments on commit fee20c7

Please sign in to comment.