Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sharded peer manager #2151

Merged
merged 16 commits into from
Dec 7, 2023
6 changes: 2 additions & 4 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}

import std/[strformat, strutils, times, json, options, random]
import std/[strformat, strutils, times, options, random]
import confutils, chronicles, chronos, stew/shims/net as stewNet,
eth/keys, bearssl, stew/[byteutils, results],
nimcrypto/pbkdf2,
metrics,
metrics/chronos_httpserver
import libp2p/[switch, # manage transports, a single entry point for dialing and listening
Expand All @@ -22,11 +21,10 @@ import libp2p/[switch, # manage transports, a single entry poi
peerinfo, # manage the information of a peer, such as peer ID and public / private key
peerid, # Implement how peers interact
protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs
protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS
nameresolving/dnsresolver]# define DNS resolution
import
../../waku/waku_core,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/waku_lightpush/rpc,
../../waku/waku_filter,
../../waku/waku_enr,
Expand Down
2 changes: 1 addition & 1 deletion apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import
../../waku/waku_peer_exchange,
../../waku/waku_rln_relay,
../../waku/waku_store,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/waku_filter,
../../waku/waku_filter_v2,
./wakunode2_validator_signed,
Expand Down
84 changes: 62 additions & 22 deletions tests/test_peer_manager.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.used.}

import
std/[options, sequtils, times],
std/[options, sequtils, times, sugar],
stew/shims/net as stewNet,
testutils/unittests,
chronos,
Expand All @@ -21,10 +21,12 @@ import
../../waku/node/peer_manager/peer_manager,
../../waku/node/peer_manager/peer_store/waku_peer_storage,
../../waku/waku_node,
../../waku/waku_relay,
../../waku/waku_store,
../../waku/waku_filter,
../../waku/waku_lightpush,
../../waku/waku_core,
../../waku/waku_enr/capabilities,
../../waku/waku_relay/protocol,
../../waku/waku_store/common,
../../waku/waku_filter/protocol,
../../waku/waku_lightpush/common,
../../waku/waku_peer_exchange,
../../waku/waku_metadata,
./testlib/common,
Expand Down Expand Up @@ -128,7 +130,6 @@ procSuite "Peer Manager":

await node.stop()


asyncTest "Peer manager keeps track of connections":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
Expand Down Expand Up @@ -225,18 +226,34 @@ procSuite "Peer Manager":
let
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
peerInfo2 = node2.switch.peerInfo
node1 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("127.0.0.1"),
Port(44048),
peerStorage = storage
)
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(34023))

node1.mountMetadata(0).expect("Mounted Waku Metadata")
node2.mountMetadata(0).expect("Mounted Waku Metadata")

await node1.start()
await node2.start()

await node1.mountRelay()
await node2.mountRelay()

let peerInfo2 = node2.switch.peerInfo
var remotePeerInfo2 = peerInfo2.toRemotePeerInfo()
remotePeerInfo2.enr = some(node2.enr)

require:
(await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true
let is12Connected = await node1.peerManager.connectRelay(remotePeerInfo2)
assert is12Connected == true, "Node 1 and 2 not connected"

check:
node1.peerManager.peerStore[AddressBook][remotePeerInfo2.peerId] == remotePeerInfo2.addrs

# wait for the peer store update
await sleepAsync(chronos.milliseconds(500))

check:
Expand All @@ -246,18 +263,28 @@ procSuite "Peer Manager":
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected

# Simulate restart by initialising a new node using the same storage
let
node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
let node3 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("127.0.0.1"),
Port(56037),
peerStorage = storage
)

node3.mountMetadata(0).expect("Mounted Waku Metadata")

await node3.start()

check:
# Node2 has been loaded after "restart", but we have not yet reconnected
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected

await node3.mountRelay()
await node3.peerManager.connectToRelayPeers()

await node3.peerManager.manageRelayPeers()

await sleepAsync(chronos.milliseconds(500))

check:
# Reconnected to node2 after "restart"
Expand Down Expand Up @@ -297,9 +324,9 @@ procSuite "Peer Manager":
topics = @["/waku/2/rs/4/0"],
)

discard node1.mountMetadata(clusterId3)
discard node2.mountMetadata(clusterId4)
discard node3.mountMetadata(clusterId4)
node1.mountMetadata(clusterId3).expect("Mounted Waku Metadata")
node2.mountMetadata(clusterId4).expect("Mounted Waku Metadata")
node3.mountMetadata(clusterId4).expect("Mounted Waku Metadata")

# Start nodes
await allFutures([node1.start(), node2.start(), node3.start()])
Expand All @@ -318,7 +345,6 @@ procSuite "Peer Manager":
conn2.isNone
conn3.isSome


# TODO: nwaku/issues/1377
xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
let
Expand Down Expand Up @@ -377,22 +403,36 @@ procSuite "Peer Manager":

asyncTest "Peer manager connects to all peers supporting a given protocol":
# Create 4 nodes
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
let nodes =
toSeq(0..<4)
.mapIt(
newTestWakuNode(
nodeKey = generateSecp256k1Key(),
bindIp = ValidIpAddress.init("0.0.0.0"),
bindPort = Port(0),
wakuFlags = some(CapabilitiesBitfield.init(@[Relay]))
)
)

# Start them
await allFutures(nodes.mapIt(it.start()))
discard nodes.mapIt(it.mountMetadata(0))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.start()))

# Get all peer infos
let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
let peerInfos = collect:
for i in 0..nodes.high:
let peerInfo = nodes[i].switch.peerInfo.toRemotePeerInfo()
peerInfo.enr = some(nodes[i].enr)
peerInfo

# Add all peers (but self) to node 0
nodes[0].peerManager.addPeer(peerInfos[1])
nodes[0].peerManager.addPeer(peerInfos[2])
nodes[0].peerManager.addPeer(peerInfos[3])

# Connect to relay peers
await nodes[0].peerManager.connectToRelayPeers()
await nodes[0].peerManager.manageRelayPeers()

check:
# Peerstore track all three peers
Expand Down
1 change: 1 addition & 0 deletions tests/test_waku_lightpush.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import
../../waku/node/peer_manager,
../../waku/waku_core,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/waku_lightpush/client,
../../waku/waku_lightpush/protocol_metrics,
../../waku/waku_lightpush/rpc,
Expand Down
8 changes: 2 additions & 6 deletions tests/test_wakunode_lightpush.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@ import
std/options,
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
chronos,
libp2p/crypto/crypto,
libp2p/switch
chronos
import
../../waku/waku_core,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/node/peer_manager,
../../waku/waku_node,
./testlib/common,
./testlib/wakucore,
./testlib/wakunode

Expand Down
12 changes: 8 additions & 4 deletions tests/testlib/wakunode.nim
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
dnsAddrsNameServers: @[ValidIpAddress.init("1.1.1.1"), ValidIpAddress.init("1.0.0.1")],
nat: "any",
maxConnections: 50,
topics: @[],
clusterId: 1.uint32,
topics: @["/waku/2/rs/1/0"],
relay: true
)

Expand All @@ -55,8 +56,8 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
dns4DomainName = none(string),
discv5UdpPort = none(Port),
agentString = none(string),
clusterId: uint32 = 2.uint32,
topics: seq[string] = @["/waku/2/rs/2/0"],
clusterId: uint32 = 1.uint32,
topics: seq[string] = @["/waku/2/rs/1/0"],
peerStoreCapacity = none(int)): WakuNode =

var resolvedExtIp = extIp
Expand All @@ -66,7 +67,10 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
if (extIp.isSome() or dns4DomainName.isSome()) and extPort.isNone(): some(Port(60000))
else: extPort

let conf = defaultTestWakuNodeConf()
var conf = defaultTestWakuNodeConf()

conf.clusterId = clusterId
conf.topics = topics

if dns4DomainName.isSome() and extIp.isNone():
# If there's an error resolving the IP, an exception is thrown and test fails
Expand Down
3 changes: 1 addition & 2 deletions tests/wakunode_rest/test_rest_lightpush.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ import

import
../../waku/waku_api/message_cache,
../../waku/common/base64,
../../waku/waku_core,
../../waku/waku_node,
../../waku/node/peer_manager,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/waku_api/rest/server,
../../waku/waku_api/rest/client,
../../waku/waku_api/rest/responses,
Expand Down
Loading
Loading