From b5e550935ec1057088cf79a2673e3445bcce74c6 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Wed, 20 Dec 2023 15:23:41 +0100 Subject: [PATCH] fix: Revert "feat: shard aware peer management (#2151)" (#2312) This reverts commit dba9820c1fa00f414f18d57f7a3ff38b67d2bb1a. We need to revert this commit because the waku-simulator stopped working. i.e. the nodes couldn't establish connections among them: https://github.com/waku-org/waku-simulator/tree/054ba9e33f4fdcdb590bcfe760a5254069c5cb9f Also, the following js-waku test fails due to this commit: "same cluster, different shard: nodes connect" * waku_lightpush/protocol.nim: minor changes to make it compile after revert --- apps/chat2/chat2.nim | 6 +- apps/wakunode2/app.nim | 2 +- tests/test_peer_manager.nim | 118 +++---- tests/test_waku_lightpush.nim | 1 - tests/test_wakunode_lightpush.nim | 8 +- tests/testlib/wakunode.nim | 12 +- tests/wakunode_rest/test_rest_lightpush.nim | 3 +- waku/node/peer_manager/peer_manager.nim | 323 ++++++++------------ waku/node/peer_manager/waku_peer_store.nim | 21 +- waku/node/waku_node.nim | 3 +- waku/waku_api/rest/admin/handlers.nim | 2 +- waku/waku_api/rest/lightpush/handlers.nim | 4 +- waku/waku_core/peers.nim | 15 +- waku/waku_lightpush/client.nim | 2 +- waku/waku_lightpush/common.nim | 21 -- waku/waku_lightpush/protocol.nim | 23 +- waku/waku_metadata/protocol.nim | 2 +- 17 files changed, 219 insertions(+), 347 deletions(-) delete mode 100644 waku/waku_lightpush/common.nim diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index 1f290c8f67..828e7fbbd1 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -9,9 +9,10 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} -import std/[strformat, strutils, times, options, random] +import std/[strformat, strutils, times, json, options, random] import confutils, chronicles, chronos, stew/shims/net as stewNet, eth/keys, bearssl, stew/[byteutils, results], + nimcrypto/pbkdf2, metrics, metrics/chronos_httpserver import libp2p/[switch, # manage transports, a single entry point for dialing and listening @@ -21,10 +22,11 @@ import libp2p/[switch, # manage transports, a single entry poi peerinfo, # manage the information of a peer, such as peer ID and public / private key peerid, # Implement how peers interact protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs + protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS nameresolving/dnsresolver]# define DNS resolution import ../../waku/waku_core, - ../../waku/waku_lightpush/common, + ../../waku/waku_lightpush, ../../waku/waku_lightpush/rpc, ../../waku/waku_filter, ../../waku/waku_enr, diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index c5f562f742..ec3e8cc658 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -53,7 +53,7 @@ import ../../waku/waku_peer_exchange, ../../waku/waku_rln_relay, ../../waku/waku_store, - ../../waku/waku_lightpush/common, + ../../waku/waku_lightpush, ../../waku/waku_filter, ../../waku/waku_filter_v2, ./wakunode2_validator_signed, diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index 4f7a6598fb..8576f2725a 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -1,7 +1,7 @@ {.used.} import - std/[options, sequtils, times, sugar], + std/[options, sequtils, times], stew/shims/net 
as stewNet, testutils/unittests, chronos, @@ -21,12 +21,10 @@ import ../../waku/node/peer_manager/peer_manager, ../../waku/node/peer_manager/peer_store/waku_peer_storage, ../../waku/waku_node, - ../../waku/waku_core, - ../../waku/waku_enr/capabilities, - ../../waku/waku_relay/protocol, - ../../waku/waku_store/common, - ../../waku/waku_filter/protocol, - ../../waku/waku_lightpush/common, + ../../waku/waku_relay, + ../../waku/waku_store, + ../../waku/waku_filter, + ../../waku/waku_lightpush, ../../waku/waku_peer_exchange, ../../waku/waku_metadata, ./testlib/common, @@ -37,7 +35,7 @@ import procSuite "Peer Manager": asyncTest "connectRelay() works": # Create 2 nodes - let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))) + let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) await allFutures(nodes.mapIt(it.start())) let connOk = await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo()) @@ -50,7 +48,7 @@ procSuite "Peer Manager": asyncTest "dialPeer() works": # Create 2 nodes - let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))) + let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) await allFutures(nodes.mapIt(it.start())) await allFutures(nodes.mapIt(it.mountRelay())) @@ -78,7 +76,7 @@ procSuite "Peer Manager": asyncTest "dialPeer() fails gracefully": # Create 2 nodes and start them - let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))) + let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) await allFutures(nodes.mapIt(it.start())) await allFutures(nodes.mapIt(it.mountRelay())) @@ -101,7 +99,7 @@ procSuite "Peer Manager": asyncTest "Adding, selecting and filtering peers work": let - node = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + node = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)) # Create filter peer filterLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet() @@ -130,9 +128,10 @@ procSuite "Peer Manager": await node.stop() + asyncTest "Peer manager keeps track of connections": # Create 2 nodes - let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))) + let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) await allFutures(nodes.mapIt(it.start())) await allFutures(nodes.mapIt(it.mountRelay())) @@ -176,7 +175,7 @@ procSuite "Peer Manager": asyncTest "Peer manager updates failed peers correctly": # Create 2 nodes - let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))) + let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) await allFutures(nodes.mapIt(it.start())) await allFutures(nodes.mapIt(it.mountRelay())) @@ -226,34 +225,18 @@ procSuite "Peer Manager": let database = SqliteDatabase.new(":memory:")[] storage = WakuPeerStorage.new(database)[] - node1 = newTestWakuNode( - generateSecp256k1Key(), - parseIpAddress("127.0.0.1"), - Port(44048), - peerStorage = storage - ) - node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(34023)) - - node1.mountMetadata(0).expect("Mounted Waku Metadata") - 
node2.mountMetadata(0).expect("Mounted Waku Metadata") + node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage) + node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)) + peerInfo2 = node2.switch.peerInfo await node1.start() await node2.start() await node1.mountRelay() await node2.mountRelay() - - let peerInfo2 = node2.switch.peerInfo - var remotePeerInfo2 = peerInfo2.toRemotePeerInfo() - remotePeerInfo2.enr = some(node2.enr) - - let is12Connected = await node1.peerManager.connectRelay(remotePeerInfo2) - assert is12Connected == true, "Node 1 and 2 not connected" - check: - node1.peerManager.peerStore[AddressBook][remotePeerInfo2.peerId] == remotePeerInfo2.addrs - - # wait for the peer store update + require: + (await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true await sleepAsync(chronos.milliseconds(500)) check: @@ -263,17 +246,10 @@ procSuite "Peer Manager": node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected # Simulate restart by initialising a new node using the same storage - let node3 = newTestWakuNode( - generateSecp256k1Key(), - parseIpAddress("127.0.0.1"), - Port(56037), - peerStorage = storage - ) - - node3.mountMetadata(0).expect("Mounted Waku Metadata") + let + node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage) await node3.start() - check: # Node2 has been loaded after "restart", but we have not yet reconnected node3.peerManager.peerStore.peers().len == 1 @@ -281,10 +257,7 @@ procSuite "Peer Manager": node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected await node3.mountRelay() - - await node3.peerManager.manageRelayPeers() - - await sleepAsync(chronos.milliseconds(500)) + await node3.peerManager.connectToRelayPeers() check: # Reconnected to node2 after "restart" @@ -302,7 +275,7 @@ procSuite "Peer Manager": # different network node1 = newTestWakuNode( generateSecp256k1Key(), - parseIpAddress("0.0.0.0"), + ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId3, topics = @["/waku/2/rs/3/0"], @@ -311,22 +284,22 @@ procSuite "Peer Manager": # same network node2 = newTestWakuNode( generateSecp256k1Key(), - parseIpAddress("0.0.0.0"), + ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId4, topics = @["/waku/2/rs/4/0"], ) node3 = newTestWakuNode( generateSecp256k1Key(), - parseIpAddress("0.0.0.0"), + ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId4, topics = @["/waku/2/rs/4/0"], ) - node1.mountMetadata(clusterId3).expect("Mounted Waku Metadata") - node2.mountMetadata(clusterId4).expect("Mounted Waku Metadata") - node3.mountMetadata(clusterId4).expect("Mounted Waku Metadata") + discard node1.mountMetadata(clusterId3) + discard node2.mountMetadata(clusterId4) + discard node3.mountMetadata(clusterId4) # Start nodes await allFutures([node1.start(), node2.start(), node3.start()]) @@ -345,13 +318,14 @@ procSuite "Peer Manager": conn2.isNone conn3.isSome + # TODO: nwaku/issues/1377 xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers": let database = SqliteDatabase.new(":memory:")[] storage = WakuPeerStorage.new(database)[] - node1 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0), peerStorage = storage) - node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), 
peerStorage = storage) + node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)) peerInfo2 = node2.switch.peerInfo betaCodec = "/vac/waku/relay/2.0.0-beta2" stableCodec = "/vac/waku/relay/2.0.0" @@ -375,7 +349,7 @@ procSuite "Peer Manager": # Simulate restart by initialising a new node using the same storage let - node3 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0), peerStorage = storage) + node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage) await node3.mountRelay() node3.wakuRelay.codec = stableCodec @@ -403,28 +377,14 @@ procSuite "Peer Manager": asyncTest "Peer manager connects to all peers supporting a given protocol": # Create 4 nodes - let nodes = - toSeq(0..<4) - .mapIt( - newTestWakuNode( - nodeKey = generateSecp256k1Key(), - bindIp = parseIpAddress("0.0.0.0"), - bindPort = Port(0), - wakuFlags = some(CapabilitiesBitfield.init(@[Relay])) - ) - ) + let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) # Start them - discard nodes.mapIt(it.mountMetadata(0)) - await allFutures(nodes.mapIt(it.mountRelay())) await allFutures(nodes.mapIt(it.start())) + await allFutures(nodes.mapIt(it.mountRelay())) # Get all peer infos - let peerInfos = collect: - for i in 0..nodes.high: - let peerInfo = nodes[i].switch.peerInfo.toRemotePeerInfo() - peerInfo.enr = some(nodes[i].enr) - peerInfo + let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo()) # Add all peers (but self) to node 0 nodes[0].peerManager.addPeer(peerInfos[1]) @@ -432,7 +392,7 @@ procSuite "Peer Manager": nodes[0].peerManager.addPeer(peerInfos[3]) # Connect to relay peers - await nodes[0].peerManager.manageRelayPeers() + await nodes[0].peerManager.connectToRelayPeers() check: # Peerstore track all three peers @@ -457,7 +417,7 @@ procSuite "Peer Manager": asyncTest "Peer store keeps track of incoming connections": # Create 4 nodes - let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))) + let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) # Start them await allFutures(nodes.mapIt(it.start())) @@ -520,7 +480,7 @@ procSuite "Peer Manager": let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D" let - node = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + node = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)) peers = toSeq(1..5) .mapIt( parsePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & $it) @@ -562,7 +522,7 @@ procSuite "Peer Manager": asyncTest "connectedPeers() returns expected number of connections per protocol": # Create 4 nodes - let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))) + let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) # Start them with relay + filter await allFutures(nodes.mapIt(it.start())) @@ -613,7 +573,7 @@ procSuite "Peer Manager": asyncTest "getNumStreams() returns expected number of connections per protocol": # Create 2 nodes - let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))) + let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) # Start them with relay + filter await 
allFutures(nodes.mapIt(it.start())) @@ -839,7 +799,7 @@ procSuite "Peer Manager": asyncTest "colocationLimit is enforced by pruneConnsByIp()": # Create 5 nodes - let nodes = toSeq(0..<5).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))) + let nodes = toSeq(0..<5).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) # Start them with relay + filter await allFutures(nodes.mapIt(it.start())) diff --git a/tests/test_waku_lightpush.nim b/tests/test_waku_lightpush.nim index 76312272ff..de125ac974 100644 --- a/tests/test_waku_lightpush.nim +++ b/tests/test_waku_lightpush.nim @@ -11,7 +11,6 @@ import ../../waku/node/peer_manager, ../../waku/waku_core, ../../waku/waku_lightpush, - ../../waku/waku_lightpush/common, ../../waku/waku_lightpush/client, ../../waku/waku_lightpush/protocol_metrics, ../../waku/waku_lightpush/rpc, diff --git a/tests/test_wakunode_lightpush.nim b/tests/test_wakunode_lightpush.nim index 95a8ce874a..6e986fa1a1 100644 --- a/tests/test_wakunode_lightpush.nim +++ b/tests/test_wakunode_lightpush.nim @@ -4,12 +4,16 @@ import std/options, stew/shims/net as stewNet, testutils/unittests, - chronos + chronicles, + chronos, + libp2p/crypto/crypto, + libp2p/switch import ../../waku/waku_core, - ../../waku/waku_lightpush/common, + ../../waku/waku_lightpush, ../../waku/node/peer_manager, ../../waku/waku_node, + ./testlib/common, ./testlib/wakucore, ./testlib/wakunode diff --git a/tests/testlib/wakunode.nim b/tests/testlib/wakunode.nim index 4a3179bcab..274e30efc4 100644 --- a/tests/testlib/wakunode.nim +++ b/tests/testlib/wakunode.nim @@ -32,8 +32,7 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf = dnsAddrsNameServers: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")], nat: "any", maxConnections: 50, - clusterId: 1.uint32, - topics: @["/waku/2/rs/1/0"], + topics: @[], relay: true ) @@ -56,8 +55,8 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey, dns4DomainName = none(string), discv5UdpPort = none(Port), agentString = none(string), - clusterId: uint32 = 1.uint32, - topics: seq[string] = @["/waku/2/rs/1/0"], + clusterId: uint32 = 2.uint32, + topics: seq[string] = @["/waku/2/rs/2/0"], peerStoreCapacity = none(int)): WakuNode = var resolvedExtIp = extIp @@ -67,10 +66,7 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey, if (extIp.isSome() or dns4DomainName.isSome()) and extPort.isNone(): some(Port(60000)) else: extPort - var conf = defaultTestWakuNodeConf() - - conf.clusterId = clusterId - conf.topics = topics + let conf = defaultTestWakuNodeConf() if dns4DomainName.isSome() and extIp.isNone(): # If there's an error resolving the IP, an exception is thrown and test fails diff --git a/tests/wakunode_rest/test_rest_lightpush.nim b/tests/wakunode_rest/test_rest_lightpush.nim index 876be7c642..edeeed02ba 100644 --- a/tests/wakunode_rest/test_rest_lightpush.nim +++ b/tests/wakunode_rest/test_rest_lightpush.nim @@ -10,10 +10,11 @@ import import ../../waku/waku_api/message_cache, + ../../waku/common/base64, ../../waku/waku_core, ../../waku/waku_node, ../../waku/node/peer_manager, - ../../waku/waku_lightpush/common, + ../../waku/waku_lightpush, ../../waku/waku_api/rest/server, ../../waku/waku_api/rest/client, ../../waku/waku_api/rest/responses, diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 0edddad7b5..ef28058604 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -18,7 +18,6 @@ import ../../waku_core, ../../waku_relay, 
../../waku_enr/sharding, - ../../waku_enr/capabilities, ../../waku_metadata, ./peer_store/peer_storage, ./waku_peer_store @@ -51,7 +50,7 @@ const BackoffFactor = 4 # Limit the amount of paralel dials - MaxParallelDials = 10 + MaxParalelDials = 10 # Delay between consecutive relayConnectivityLoop runs ConnectivityLoopInterval = chronos.minutes(1) @@ -117,21 +116,22 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownO # Do not attempt to manage our unmanageable self return + # ...public key + var publicKey: PublicKey + discard remotePeerInfo.peerId.extractPublicKey(publicKey) + if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and - pm.peerStore[KeyBook][remotePeerInfo.peerId] == remotePeerInfo.publicKey and + pm.peerStore[KeyBook][remotePeerInfo.peerId] == publicKey and pm.peerStore[ENRBook][remotePeerInfo.peerId].raw.len > 0: # Peer already managed and ENR info is already saved return trace "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs - + pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs - pm.peerStore[KeyBook][remotePeerInfo.peerId] = remotePeerInfo.publicKey + pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey pm.peerStore[SourceBook][remotePeerInfo.peerId] = origin - - if remotePeerInfo.protocols.len > 0: - pm.peerStore[ProtoBook][remotePeerInfo.peerId] = remotePeerInfo.protocols - + if remotePeerInfo.enr.isSome(): pm.peerStore[ENRBook][remotePeerInfo.peerId] = remotePeerInfo.enr.get() @@ -159,31 +159,27 @@ proc connectRelay*(pm: PeerManager, pm.addPeer(peer) let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId] - trace "Connecting to relay peer", - wireAddr=peer.addrs, peerId=peerId, failedAttempts=failedAttempts + trace "Connecting to relay peer", wireAddr=peer.addrs, peerId=peerId, failedAttempts=failedAttempts var deadline = sleepAsync(dialTimeout) - let workfut = pm.switch.connect(peerId, peer.addrs) - - # Can't use catch: with .withTimeout() in this case - let res = catch: await workfut or deadline - - let reasonFailed = - if not workfut.finished(): - await workfut.cancelAndWait() - "timed out" - elif res.isErr(): res.error.msg - else: + var workfut = pm.switch.connect(peerId, peer.addrs) + var reasonFailed = "" + + try: + await workfut or deadline + if workfut.finished(): if not deadline.finished(): - await deadline.cancelAndWait() - + deadline.cancel() waku_peers_dials.inc(labelValues = ["successful"]) waku_node_conns_initiated.inc(labelValues = [source]) - pm.peerStore[NumberFailedConnBook][peerId] = 0 - return true - + else: + reasonFailed = "timed out" + await cancelAndWait(workfut) + except CatchableError as exc: + reasonFailed = "remote peer failed" + # Dial failed pm.peerStore[NumberFailedConnBook][peerId] = pm.peerStore[NumberFailedConnBook][peerId] + 1 pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second) @@ -219,15 +215,15 @@ proc dialPeer(pm: PeerManager, # Dial Peer let dialFut = pm.switch.dial(peerId, addrs, proto) - - let res = catch: - if await dialFut.withTimeout(dialTimeout): + var reasonFailed = "" + try: + if (await dialFut.withTimeout(dialTimeout)): return some(dialFut.read()) - else: await cancelAndWait(dialFut) - - let reasonFailed = - if res.isOk: "timed out" - else: res.error.msg + else: + reasonFailed = "timeout" + await cancelAndWait(dialFut) + except CatchableError as exc: + reasonFailed = "failed" trace "Dialing peer failed", peerId=peerId, reason=reasonFailed, proto=proto @@ -298,108 
+294,105 @@ proc canBeConnected*(pm: PeerManager, let now = Moment.init(getTime().toUnix, Second) let lastFailed = pm.peerStore[LastFailedConnBook][peerId] let backoff = calculateBackoff(pm.initialBackoffInSec, pm.backoffFactor, failedAttempts) - - return now >= (lastFailed + backoff) + if now >= (lastFailed + backoff): + return true + return false ################## # Initialisation # ################## proc getPeerIp(pm: PeerManager, peerId: PeerId): Option[string] = - if not pm.switch.connManager.getConnections().hasKey(peerId): - return none(string) - - let conns = pm.switch.connManager.getConnections().getOrDefault(peerId) - if conns.len == 0: - return none(string) - - let obAddr = conns[0].connection.observedAddr.valueOr: - return none(string) - - # TODO: think if circuit relay ips should be handled differently - - return some(obAddr.getHostname()) + if pm.switch.connManager.getConnections().hasKey(peerId): + let conns = pm.switch.connManager.getConnections().getOrDefault(peerId) + if conns.len != 0: + let observedAddr = conns[0].connection.observedAddr + let ip = observedAddr.get.getHostname() + if observedAddr.isSome: + # TODO: think if circuit relay ips should be handled differently + let ip = observedAddr.get.getHostname() + return some(ip) + return none(string) # called when a connection i) is created or ii) is closed proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} = case event.kind - of ConnEventKind.Connected: - #let direction = if event.incoming: Inbound else: Outbound - discard - of ConnEventKind.Disconnected: - discard - -proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = - # To prevent metadata protocol from breaking prev nodes, by now we only - # disconnect if the clusterid is specified. 
- if pm.wakuMetadata.clusterId == 0: - return - - let res = catch: await pm.switch.dial(peerId, WakuMetadataCodec) - - var reason: string - block guardClauses: - let conn = res.valueOr: - reason = "dial failed: " & error.msg - break guardClauses - - let metadata = (await pm.wakuMetadata.request(conn)).valueOr: - reason = "waku metatdata request failed: " & error - break guardClauses - - let clusterId = metadata.clusterId.valueOr: - reason = "empty cluster-id reported" - break guardClauses - - if pm.wakuMetadata.clusterId != clusterId: - reason = "different clusterId reported: " & $pm.wakuMetadata.clusterId & " vs " & $clusterId - break guardClauses - - if not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it)): - reason = "no shards in common" - break guardClauses - - return - - info "disconnecting from peer", peerId=peerId, reason=reason - asyncSpawn(pm.switch.disconnect(peerId)) - pm.peerStore.delete(peerId) + of ConnEventKind.Connected: + let direction = if event.incoming: Inbound else: Outbound + discard + of ConnEventKind.Disconnected: + discard # called when a peer i) first connects to us ii) disconnects all connections from us proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = - if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined: - await pm.onPeerMetadata(peerId) - var direction: PeerDirection var connectedness: Connectedness - case event.kind: - of Joined: - direction = if event.initiator: Outbound else: Inbound - connectedness = Connected - - if (let ip = pm.getPeerIp(peerId); ip.isSome()): - pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId) - + if event.kind == PeerEventKind.Joined: + direction = if event.initiator: Outbound else: Inbound + connectedness = Connected + + var clusterOk = false + var reason = "" + # To prevent metadata protocol from breaking prev nodes, by now we only + # disconnect if the clusterid is specified. + if not pm.wakuMetadata.isNil() and pm.wakuMetadata.clusterId != 0: + block wakuMetadata: + var conn: Connection + try: + conn = await pm.switch.dial(peerId, WakuMetadataCodec) + except CatchableError: + reason = "waku metadata codec not supported: " & getCurrentExceptionMsg() + break wakuMetadata + + # request metadata from connecting peer + let metadata = (await pm.wakuMetadata.request(conn)).valueOr: + reason = "failed waku metadata codec request" + break wakuMetadata + + # does not report any clusterId + let clusterId = metadata.clusterId.valueOr: + reason = "empty clusterId reported" + break wakuMetadata + + # drop it if it doesnt match our network id + if pm.wakuMetadata.clusterId != clusterId: + reason = "different clusterId reported: " & $pm.wakuMetadata.clusterId & " vs " & $clusterId + break wakuMetadata + + # reaching here means the clusterId matches + clusterOk = true + + if not pm.wakuMetadata.isNil() and pm.wakuMetadata.clusterId != 0 and not clusterOk: + info "disconnecting from peer", peerId=peerId, reason=reason + asyncSpawn(pm.switch.disconnect(peerId)) + pm.peerStore.delete(peerId) + + # TODO: Take action depending on the supported shards as reported by metadata + + let ip = pm.getPeerIp(peerId) + if ip.isSome: + pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId) + + let peersBehindIp = pm.ipTable[ip.get] + if peersBehindIp.len > pm.colocationLimit: # in theory this should always be one, but just in case - let peersBehindIp = pm.ipTable[ip.get] - - let idx = max((peersBehindIp.len - pm.colocationLimit), 0) - for peerId in peersBehindIp[0.. 
pm.inRelayPeersTarget: + # await pm.pruneInRelayConns(inRelayPeers.len - pm.inRelayPeersTarget) - for shard in pm.wakuMetadata.shards.items: - # Filter out peer not on this shard - let connectedInPeers = inPeers.filterIt( - pm.peerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard))) - - let connectedOutPeers = outPeers.filterIt( - pm.peerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard))) - - # Calculate the difference between current values and targets - let inPeerDiff = connectedInPeers.len - inTarget - let outPeerDiff = outTarget - connectedOutPeers.len - - if inPeerDiff > 0: - peersToDisconnect += inPeerDiff - - if outPeerDiff <= 0: - continue - - # Get all peers for this shard - var connectablePeers = pm.peerStore.getPeersByShard( - uint16(pm.wakuMetadata.clusterId), uint16(shard)) - - let shardCount = connectablePeers.len - - connectablePeers.keepItIf( - not pm.peerStore.isConnected(it.peerId) and - pm.canBeConnected(it.peerId)) - - let connectableCount = connectablePeers.len - - connectablePeers.keepItIf(pm.peerStore.hasCapability(it.peerId, Relay)) - - let relayCount = connectablePeers.len - - trace "Sharded Peer Management", - shard = shard, - connectable = $connectableCount & "/" & $shardCount, - relayConnectable = $relayCount & "/" & $shardCount, - relayInboundTarget = $connectedInPeers.len & "/" & $inTarget, - relayOutboundTarget = $connectedOutPeers.len & "/" & $outTarget - - let length = min(outPeerDiff, connectablePeers.len) - for peer in connectablePeers[0..= pm.outRelayPeersTarget: return - let uniquePeers = toSeq(peersToConnect).mapIt(pm.peerStore.get(it)) + let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs)) + let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId)) + let numPeersToConnect = min(outsideBackoffPeers.len, MaxParalelDials) - # Connect to all nodes - for i in countup(0, uniquePeers.len, MaxParallelDials): - let stop = min(i + MaxParallelDials, uniquePeers.len) - trace "Connecting to Peers", peerIds = $uniquePeers[i..
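Illustrative sketch (not part of the patch): the connectRelay() logic that this revert restores in waku/node/peer_manager/peer_manager.nim races the switch.connect() future against a sleepAsync() deadline and cancels whichever loses. Below is a minimal, self-contained Nim sketch of that pattern, assuming only chronos; connectPeer() is a hypothetical stand-in for pm.switch.connect(peerId, peer.addrs), and the real code's metrics and failed-attempt bookkeeping are reduced to a boolean result.

import chronos

proc connectPeer(): Future[void] {.async.} =
  # Hypothetical stand-in for pm.switch.connect(peerId, peer.addrs);
  # here it simply completes after a short delay.
  await sleepAsync(chronos.milliseconds(100))

proc connectWithTimeout(dialTimeout: chronos.Duration): Future[bool] {.async.} =
  # Race the dial against a deadline, mirroring the restored connectRelay():
  # whichever future is still pending when the other finishes gets cancelled.
  var deadline = sleepAsync(dialTimeout)
  var workfut = connectPeer()
  try:
    await workfut or deadline
    if workfut.finished():
      if not deadline.finished():
        deadline.cancel()
      return true                  # dial succeeded within the timeout
    else:
      await cancelAndWait(workfut) # deadline fired first
      return false                 # timed out
  except CatchableError:
    return false                   # remote peer failed

when isMainModule:
  echo waitFor connectWithTimeout(chronos.seconds(1))

The same cancel-the-loser structure appears in the restored dialPeer() above, where dialFut.withTimeout(dialTimeout) plays the role of the explicit deadline future.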