Skip to content

Commit

Permalink
feat: add new metadata protocol (#2062)
Browse files Browse the repository at this point in the history
  • Loading branch information
alrevuelta authored Oct 11, 2023
1 parent 25d6e52 commit d5c3ade
Show file tree
Hide file tree
Showing 12 changed files with 408 additions and 13 deletions.
5 changes: 5 additions & 0 deletions apps/wakunode2/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ type
name: "log-format" .}: logging.LogFormat

## General node config
clusterId* {.
desc: "Cluster id that the node is running in. Node in a different cluster id is disconnected."
defaultValue: 0
name: "cluster-id" }: uint32

agentString* {.
defaultValue: "nwaku",
desc: "Node agent string which is used as identifier in network"
Expand Down
11 changes: 6 additions & 5 deletions apps/wakunode2/internal_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ proc validateExtMultiAddrs*(vals: seq[string]):
return ok(multiaddrs)

proc dnsResolve*(domain: string, conf: WakuNodeConf): Future[Result[string, string]] {.async} =

# Use conf's DNS servers
var nameServers: seq[TransportAddress]
for ip in conf.dnsAddrsNameServers:
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53

let dnsResolver = DnsResolver.new(nameServers)

# Resolve domain IP
Expand Down Expand Up @@ -93,18 +93,19 @@ proc networkConfiguration*(conf: WakuNodeConf,
if dns4DomainName.isSome() and extIp.isNone():
try:
let dnsRes = waitFor dnsResolve(conf.dns4DomainName, conf)

if dnsRes.isErr():
return err($dnsRes.error) # Pass error down the stack

extIp = some(ValidIpAddress.init(dnsRes.get()))
except CatchableError:
return err("Could not update extIp to resolved DNS IP: " & getCurrentExceptionMsg())

# Wrap in none because NetConfig does not have a default constructor
# TODO: We could change bindIp in NetConfig to be something less restrictive
# than ValidIpAddress, which doesn't allow default construction
let netConfigRes = NetConfig.init(
clusterId = conf.clusterId,
bindIp = conf.listenAddress,
bindPort = Port(uint16(conf.tcpPort) + conf.portsShift),
extIp = extIp,
Expand Down
44 changes: 44 additions & 0 deletions tests/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import
../../waku/waku_filter,
../../waku/waku_lightpush,
../../waku/waku_peer_exchange,
../../waku/waku_metadata,
./testlib/common,
./testlib/testutils,
./testlib/wakucore,
Expand All @@ -38,6 +39,8 @@ procSuite "Peer Manager":
await allFutures(nodes.mapIt(it.start()))

let connOk = await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo())
await sleepAsync(chronos.milliseconds(500))

check:
connOk == true
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[1].peerInfo.peerId)
Expand All @@ -53,6 +56,8 @@ procSuite "Peer Manager":

# Dial node2 from node1
let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
await sleepAsync(chronos.milliseconds(500))

# Check connection
check:
conn.isSome()
Expand Down Expand Up @@ -145,6 +150,7 @@ procSuite "Peer Manager":
let nonExistentPeer = nonExistentPeerRes.value
require:
(await nodes[0].peerManager.connectRelay(nonExistentPeer)) == false
await sleepAsync(chronos.milliseconds(500))

check:
# Cannot connect to node2
Expand All @@ -153,6 +159,8 @@ procSuite "Peer Manager":
# Successful connection
require:
(await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo())) == true
await sleepAsync(chronos.milliseconds(500))

check:
# Currently connected to node2
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connected
Expand Down Expand Up @@ -229,6 +237,8 @@ procSuite "Peer Manager":

require:
(await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true
await sleepAsync(chronos.milliseconds(500))

check:
# Currently connected to node2
node1.peerManager.peerStore.peers().len == 1
Expand Down Expand Up @@ -257,6 +267,36 @@ procSuite "Peer Manager":

await allFutures([node1.stop(), node2.stop(), node3.stop()])

asyncTest "Peer manager drops connections to peers on different networks":
  ## Nodes configured with different cluster ids must refuse metadata
  ## connections to each other, while same-cluster nodes connect fine.
  let clusterId1 = 1.uint32
  let clusterId2 = 2.uint32

  let
    # node on a different cluster than the other two
    node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId1)

    # two nodes sharing the same cluster
    node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId2)
    node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId2)

  # Start nodes
  await allFutures([node1.start(), node2.start(), node3.start()])

  # 1->2 (fails: different cluster ids)
  let conn1 = await node1.peerManager.dialPeer(node2.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec)

  # 1->3 (fails: different cluster ids)
  let conn2 = await node1.peerManager.dialPeer(node3.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec)

  # 2->3 (succeeds: same cluster id)
  let conn3 = await node2.peerManager.dialPeer(node3.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec)

  check:
    conn1.isNone
    conn2.isNone
    conn3.isSome

  # Stop all nodes so this test does not leak resources into the next one
  # (consistent with the neighboring tests in this suite, e.g. the one above).
  await allFutures([node1.stop(), node2.stop(), node3.stop()])

# TODO: nwaku/issues/1377
xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
let
Expand Down Expand Up @@ -370,6 +410,8 @@ procSuite "Peer Manager":
(await nodes[2].peerManager.connectRelay(peerInfos[0])) == true
(await nodes[3].peerManager.connectRelay(peerInfos[0])) == true

await sleepAsync(chronos.milliseconds(500))

check:
# Peerstore track all three peers
nodes[0].peerManager.peerStore.peers().len == 3
Expand Down Expand Up @@ -749,13 +791,15 @@ procSuite "Peer Manager":
# 2 in connections
discard await nodes[1].peerManager.connectRelay(pInfos[0])
discard await nodes[2].peerManager.connectRelay(pInfos[0])
await sleepAsync(chronos.milliseconds(500))

# but one is pruned
check nodes[0].peerManager.switch.connManager.getConnections().len == 1

# 2 out connections
discard await nodes[0].peerManager.connectRelay(pInfos[3])
discard await nodes[0].peerManager.connectRelay(pInfos[4])
await sleepAsync(chronos.milliseconds(500))

# they are also pruned
check nodes[0].peerManager.switch.connManager.getConnections().len == 1
Expand Down
50 changes: 50 additions & 0 deletions tests/test_waku_metadata.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{.used.}

import
std/[options, sequtils, tables],
testutils/unittests,
chronos,
chronicles,
stew/shims/net,
libp2p/switch,
libp2p/peerId,
libp2p/crypto/crypto,
libp2p/multistream,
libp2p/muxers/muxer,
eth/keys,
eth/p2p/discoveryv5/enr
import
../../waku/waku_node,
../../waku/node/peer_manager,
../../waku/waku_discv5,
../../waku/waku_metadata,
./testlib/wakucore,
./testlib/wakunode


procSuite "Waku Metadata Protocol":

  # TODO: Add tests with shards when ready
  asyncTest "request() returns the supported metadata of the peer":
    ## A node's metadata protocol must report the cluster id it was
    ## configured with when queried by a peer.
    let clusterId = 10.uint32

    # Two nodes configured for the same cluster
    let
      node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId)
      node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId)

    # Start both nodes
    await allFutures([node1.start(), node2.start()])

    # Open a metadata stream from node2 towards node1
    let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec)
    require:
      connOpt.isSome

    # Ask node1 for its metadata over the established connection
    let metadataRes = await node2.wakuMetadata.request(connOpt.get())

    # Abort here if the request itself failed
    require:
      metadataRes.isOk

    check:
      metadataRes.get().clusterId.get() == clusterId
45 changes: 45 additions & 0 deletions tests/test_waku_protobufs.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{.used.}

import
std/[options, sequtils, tables],
testutils/unittests,
chronos,
chronicles
import
../../waku/waku_metadata,
../../waku/waku_metadata/rpc,
./testlib/wakucore,
./testlib/wakunode


procSuite "Waku Protobufs":
  # TODO: Missing test coverage in many encode/decode protobuf functions

  test "WakuMetadataResponse":
    ## Round-trip a metadata response through the protobuf codec and
    ## verify every field survives encode/decode.
    let original = WakuMetadataResponse(
      clusterId: some(7),
      shards: @[10, 23, 33],
    )

    let encoded = original.encode()
    let decoded = WakuMetadataResponse.decode(encoded.buffer)

    check:
      decoded.isOk()
      decoded.get().clusterId.get() == original.clusterId.get()
      decoded.get().shards == original.shards

  test "WakuMetadataRequest":
    ## Round-trip a metadata request through the protobuf codec and
    ## verify every field survives encode/decode.
    let original = WakuMetadataRequest(
      clusterId: some(5),
      shards: @[100, 2, 0],
    )

    let encoded = original.encode()
    let decoded = WakuMetadataRequest.decode(encoded.buffer)

    check:
      decoded.isOk()
      decoded.get().clusterId.get() == original.clusterId.get()
      decoded.get().shards == original.shards

10 changes: 6 additions & 4 deletions tests/testlib/wakunode.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,21 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
dns4DomainName = none(string),
discv5UdpPort = none(Port),
agentString = none(string),
clusterId: uint32 = 0.uint32,
peerStoreCapacity = none(int)): WakuNode =

var resolvedExtIp = extIp

# Update extPort to default value if it's missing and there's an extIp or a DNS domain
# Update extPort to default value if it's missing and there's an extIp or a DNS domain
let extPort = if (extIp.isSome() or dns4DomainName.isSome()) and
extPort.isNone():
some(Port(60000))
else:
extPort

if dns4DomainName.isSome() and extIp.isNone():
let conf = defaultTestWakuNodeConf()
# If there's an error resolving the IP, an exception is thrown and test fails
# If there's an error resolving the IP, an exception is thrown and test fails
let dnsRes = waitFor dnsResolve(dns4DomainName.get(), conf)
if dnsRes.isErr():
raise newException(Defect, $dnsRes.error)
Expand All @@ -76,6 +77,7 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,

let netConfigRes = NetConfig.init(
bindIp = bindIp,
clusterId = clusterId,
bindPort = bindPort,
extIp = resolvedExtIp,
extPort = extPort,
Expand Down
3 changes: 3 additions & 0 deletions waku/node/config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import

type NetConfig* = object
hostAddress*: MultiAddress
clusterId*: uint32
wsHostAddress*: Option[MultiAddress]
hostExtAddress*: Option[MultiAddress]
wsExtAddress*: Option[MultiAddress]
Expand Down Expand Up @@ -69,6 +70,7 @@ proc init*(T: type NetConfig,
wssEnabled: bool = false,
dns4DomainName = none(string),
discv5UdpPort = none(Port),
clusterId: uint32 = 0,
wakuFlags = none(CapabilitiesBitfield)): NetConfigResult =
## Initialize and validate waku node network configuration

Expand Down Expand Up @@ -137,6 +139,7 @@ proc init*(T: type NetConfig,

ok(NetConfig(
hostAddress: hostAddress,
clusterId: clusterId,
wsHostAddress: wsHostAddress,
hostExtAddress: hostExtAddress,
wsExtAddress: wsExtAddress,
Expand Down
Loading

0 comments on commit d5c3ade

Please sign in to comment.