Skip to content

Commit

Permalink
fix: Revert "feat: shard aware peer management (#2151)" (#2312)
Browse files Browse the repository at this point in the history
This reverts commit dba9820.

We need to revert this commit because
the waku-simulator stopped working. i.e. the nodes couldn't establish
connections among them: https://github.com/waku-org/waku-simulator/tree/054ba9e33f4fdcdb590bcfe760a5254069c5cb9f

Also, the following js-waku test fails due to this commit:
"same cluster, different shard: nodes connect"

* waku_lightpush/protocol.nim: minor changes to make it compile after revert
  • Loading branch information
Ivansete-status authored Dec 20, 2023
1 parent a1b27ed commit 32668f4
Show file tree
Hide file tree
Showing 17 changed files with 219 additions and 347 deletions.
6 changes: 4 additions & 2 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}

import std/[strformat, strutils, times, options, random]
import std/[strformat, strutils, times, json, options, random]
import confutils, chronicles, chronos, stew/shims/net as stewNet,
eth/keys, bearssl, stew/[byteutils, results],
nimcrypto/pbkdf2,
metrics,
metrics/chronos_httpserver
import libp2p/[switch, # manage transports, a single entry point for dialing and listening
Expand All @@ -21,10 +22,11 @@ import libp2p/[switch, # manage transports, a single entry poi
peerinfo, # manage the information of a peer, such as peer ID and public / private key
peerid, # Implement how peers interact
protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs
protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS
nameresolving/dnsresolver]# define DNS resolution
import
../../waku/waku_core,
../../waku/waku_lightpush/common,
../../waku/waku_lightpush,
../../waku/waku_lightpush/rpc,
../../waku/waku_filter,
../../waku/waku_enr,
Expand Down
2 changes: 1 addition & 1 deletion apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import
../../waku/waku_peer_exchange,
../../waku/waku_rln_relay,
../../waku/waku_store,
../../waku/waku_lightpush/common,
../../waku/waku_lightpush,
../../waku/waku_filter,
../../waku/waku_filter_v2,
./wakunode2_validator_signed,
Expand Down
118 changes: 39 additions & 79 deletions tests/test_peer_manager.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.used.}

import
std/[options, sequtils, times, sugar],
std/[options, sequtils, times],
stew/shims/net as stewNet,
testutils/unittests,
chronos,
Expand All @@ -21,12 +21,10 @@ import
../../waku/node/peer_manager/peer_manager,
../../waku/node/peer_manager/peer_store/waku_peer_storage,
../../waku/waku_node,
../../waku/waku_core,
../../waku/waku_enr/capabilities,
../../waku/waku_relay/protocol,
../../waku/waku_store/common,
../../waku/waku_filter/protocol,
../../waku/waku_lightpush/common,
../../waku/waku_relay,
../../waku/waku_store,
../../waku/waku_filter,
../../waku/waku_lightpush,
../../waku/waku_peer_exchange,
../../waku/waku_metadata,
./testlib/common,
Expand All @@ -37,7 +35,7 @@ import
procSuite "Peer Manager":
asyncTest "connectRelay() works":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
await allFutures(nodes.mapIt(it.start()))

let connOk = await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo())
Expand All @@ -50,7 +48,7 @@ procSuite "Peer Manager":

asyncTest "dialPeer() works":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
Expand Down Expand Up @@ -78,7 +76,7 @@ procSuite "Peer Manager":

asyncTest "dialPeer() fails gracefully":
# Create 2 nodes and start them
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))

Expand All @@ -101,7 +99,7 @@ procSuite "Peer Manager":

asyncTest "Adding, selecting and filtering peers work":
let
node = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))

# Create filter peer
filterLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet()
Expand Down Expand Up @@ -130,9 +128,10 @@ procSuite "Peer Manager":

await node.stop()


asyncTest "Peer manager keeps track of connections":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
Expand Down Expand Up @@ -176,7 +175,7 @@ procSuite "Peer Manager":

asyncTest "Peer manager updates failed peers correctly":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
Expand Down Expand Up @@ -226,34 +225,18 @@ procSuite "Peer Manager":
let
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
node1 = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("127.0.0.1"),
Port(44048),
peerStorage = storage
)
node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(34023))

node1.mountMetadata(0).expect("Mounted Waku Metadata")
node2.mountMetadata(0).expect("Mounted Waku Metadata")
node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
peerInfo2 = node2.switch.peerInfo

await node1.start()
await node2.start()

await node1.mountRelay()
await node2.mountRelay()

let peerInfo2 = node2.switch.peerInfo
var remotePeerInfo2 = peerInfo2.toRemotePeerInfo()
remotePeerInfo2.enr = some(node2.enr)

let is12Connected = await node1.peerManager.connectRelay(remotePeerInfo2)
assert is12Connected == true, "Node 1 and 2 not connected"

check:
node1.peerManager.peerStore[AddressBook][remotePeerInfo2.peerId] == remotePeerInfo2.addrs

# wait for the peer store update
require:
(await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true
await sleepAsync(chronos.milliseconds(500))

check:
Expand All @@ -263,28 +246,18 @@ procSuite "Peer Manager":
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected

# Simulate restart by initialising a new node using the same storage
let node3 = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("127.0.0.1"),
Port(56037),
peerStorage = storage
)

node3.mountMetadata(0).expect("Mounted Waku Metadata")
let
node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)

await node3.start()

check:
# Node2 has been loaded after "restart", but we have not yet reconnected
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected

await node3.mountRelay()

await node3.peerManager.manageRelayPeers()

await sleepAsync(chronos.milliseconds(500))
await node3.peerManager.connectToRelayPeers()

check:
# Reconnected to node2 after "restart"
Expand All @@ -302,7 +275,7 @@ procSuite "Peer Manager":
# different network
node1 = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
ValidIpAddress.init("0.0.0.0"),
Port(0),
clusterId = clusterId3,
topics = @["/waku/2/rs/3/0"],
Expand All @@ -311,22 +284,22 @@ procSuite "Peer Manager":
# same network
node2 = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
ValidIpAddress.init("0.0.0.0"),
Port(0),
clusterId = clusterId4,
topics = @["/waku/2/rs/4/0"],
)
node3 = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
ValidIpAddress.init("0.0.0.0"),
Port(0),
clusterId = clusterId4,
topics = @["/waku/2/rs/4/0"],
)

node1.mountMetadata(clusterId3).expect("Mounted Waku Metadata")
node2.mountMetadata(clusterId4).expect("Mounted Waku Metadata")
node3.mountMetadata(clusterId4).expect("Mounted Waku Metadata")
discard node1.mountMetadata(clusterId3)
discard node2.mountMetadata(clusterId4)
discard node3.mountMetadata(clusterId4)

# Start nodes
await allFutures([node1.start(), node2.start(), node3.start()])
Expand All @@ -345,13 +318,14 @@ procSuite "Peer Manager":
conn2.isNone
conn3.isSome


# TODO: nwaku/issues/1377
xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
let
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
node1 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0), peerStorage = storage)
node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
peerInfo2 = node2.switch.peerInfo
betaCodec = "/vac/waku/relay/2.0.0-beta2"
stableCodec = "/vac/waku/relay/2.0.0"
Expand All @@ -375,7 +349,7 @@ procSuite "Peer Manager":

# Simulate restart by initialising a new node using the same storage
let
node3 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0), peerStorage = storage)
node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)

await node3.mountRelay()
node3.wakuRelay.codec = stableCodec
Expand Down Expand Up @@ -403,36 +377,22 @@ procSuite "Peer Manager":

asyncTest "Peer manager connects to all peers supporting a given protocol":
# Create 4 nodes
let nodes =
toSeq(0..<4)
.mapIt(
newTestWakuNode(
nodeKey = generateSecp256k1Key(),
bindIp = parseIpAddress("0.0.0.0"),
bindPort = Port(0),
wakuFlags = some(CapabilitiesBitfield.init(@[Relay]))
)
)
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

# Start them
discard nodes.mapIt(it.mountMetadata(0))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))

# Get all peer infos
let peerInfos = collect:
for i in 0..nodes.high:
let peerInfo = nodes[i].switch.peerInfo.toRemotePeerInfo()
peerInfo.enr = some(nodes[i].enr)
peerInfo
let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())

# Add all peers (but self) to node 0
nodes[0].peerManager.addPeer(peerInfos[1])
nodes[0].peerManager.addPeer(peerInfos[2])
nodes[0].peerManager.addPeer(peerInfos[3])

# Connect to relay peers
await nodes[0].peerManager.manageRelayPeers()
await nodes[0].peerManager.connectToRelayPeers()

check:
# Peerstore track all three peers
Expand All @@ -457,7 +417,7 @@ procSuite "Peer Manager":

asyncTest "Peer store keeps track of incoming connections":
# Create 4 nodes
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

# Start them
await allFutures(nodes.mapIt(it.start()))
Expand Down Expand Up @@ -520,7 +480,7 @@ procSuite "Peer Manager":
let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D"

let
node = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
peers = toSeq(1..5)
.mapIt(
parsePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & $it)
Expand Down Expand Up @@ -562,7 +522,7 @@ procSuite "Peer Manager":

asyncTest "connectedPeers() returns expected number of connections per protocol":
# Create 4 nodes
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

# Start them with relay + filter
await allFutures(nodes.mapIt(it.start()))
Expand Down Expand Up @@ -613,7 +573,7 @@ procSuite "Peer Manager":

asyncTest "getNumStreams() returns expected number of connections per protocol":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

# Start them with relay + filter
await allFutures(nodes.mapIt(it.start()))
Expand Down Expand Up @@ -839,7 +799,7 @@ procSuite "Peer Manager":

asyncTest "colocationLimit is enforced by pruneConnsByIp()":
# Create 5 nodes
let nodes = toSeq(0..<5).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
let nodes = toSeq(0..<5).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

# Start them with relay + filter
await allFutures(nodes.mapIt(it.start()))
Expand Down
1 change: 0 additions & 1 deletion tests/test_waku_lightpush.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import
../../waku/node/peer_manager,
../../waku/waku_core,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/waku_lightpush/client,
../../waku/waku_lightpush/protocol_metrics,
../../waku/waku_lightpush/rpc,
Expand Down
8 changes: 6 additions & 2 deletions tests/test_wakunode_lightpush.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ import
std/options,
stew/shims/net as stewNet,
testutils/unittests,
chronos
chronicles,
chronos,
libp2p/crypto/crypto,
libp2p/switch
import
../../waku/waku_core,
../../waku/waku_lightpush/common,
../../waku/waku_lightpush,
../../waku/node/peer_manager,
../../waku/waku_node,
./testlib/common,
./testlib/wakucore,
./testlib/wakunode

Expand Down
12 changes: 4 additions & 8 deletions tests/testlib/wakunode.nim
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
dnsAddrsNameServers: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
nat: "any",
maxConnections: 50,
clusterId: 1.uint32,
topics: @["/waku/2/rs/1/0"],
topics: @[],
relay: true
)

Expand All @@ -56,8 +55,8 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
dns4DomainName = none(string),
discv5UdpPort = none(Port),
agentString = none(string),
clusterId: uint32 = 1.uint32,
topics: seq[string] = @["/waku/2/rs/1/0"],
clusterId: uint32 = 2.uint32,
topics: seq[string] = @["/waku/2/rs/2/0"],
peerStoreCapacity = none(int)): WakuNode =

var resolvedExtIp = extIp
Expand All @@ -67,10 +66,7 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
if (extIp.isSome() or dns4DomainName.isSome()) and extPort.isNone(): some(Port(60000))
else: extPort

var conf = defaultTestWakuNodeConf()

conf.clusterId = clusterId
conf.topics = topics
let conf = defaultTestWakuNodeConf()

if dns4DomainName.isSome() and extIp.isNone():
# If there's an error resolving the IP, an exception is thrown and test fails
Expand Down
Loading

0 comments on commit 32668f4

Please sign in to comment.