diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index 7dfa9a343e..89e9c435dc 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -105,13 +105,13 @@ procSuite "Peer Manager": node.peerManager.peerStore.peers().len == 3 node.peerManager.peerStore.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and it.addrs.contains(filterLoc) and - it.protos.contains(WakuFilterCodec)) + it.protocols.contains(WakuFilterCodec)) node.peerManager.peerStore.peers(WakuSwapCodec).allIt(it.peerId == swapPeer.peerId and it.addrs.contains(swapLoc) and - it.protos.contains(WakuSwapCodec)) + it.protocols.contains(WakuSwapCodec)) node.peerManager.peerStore.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and it.addrs.contains(storeLoc) and - it.protos.contains(WakuStoreCodec)) + it.protocols.contains(WakuStoreCodec)) await node.stop() @@ -270,7 +270,7 @@ procSuite "Peer Manager": # Currently connected to node2 node1.peerManager.peerStore.peers().len == 1 node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node1.peerManager.peerStore.peers().anyIt(it.protos.contains(node2.wakuRelay.codec)) + node1.peerManager.peerStore.peers().anyIt(it.protocols.contains(node2.wakuRelay.codec)) node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected # Simulate restart by initialising a new node using the same storage @@ -286,7 +286,7 @@ procSuite "Peer Manager": # Node2 has been loaded after "restart", but we have not yet reconnected node3.peerManager.peerStore.peers().len == 1 node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node3.peerManager.peerStore.peers().anyIt(it.protos.contains(betaCodec)) + node3.peerManager.peerStore.peers().anyIt(it.protocols.contains(betaCodec)) node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected await node3.start() # This should trigger a reconnect @@ -295,8 +295,8 @@ procSuite "Peer Manager": # Reconnected to node2 after "restart" node3.peerManager.peerStore.peers().len == 1 node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node3.peerManager.peerStore.peers().anyIt(it.protos.contains(betaCodec)) - node3.peerManager.peerStore.peers().anyIt(it.protos.contains(stableCodec)) + node3.peerManager.peerStore.peers().anyIt(it.protocols.contains(betaCodec)) + node3.peerManager.peerStore.peers().anyIt(it.protocols.contains(stableCodec)) node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected await allFutures([node1.stop(), node2.stop(), node3.stop()]) diff --git a/tests/v2/test_peer_storage.nim b/tests/v2/test_peer_storage.nim index eeef11da86..2ad57497cf 100644 --- a/tests/v2/test_peer_storage.nim +++ b/tests/v2/test_peer_storage.nim @@ -22,64 +22,86 @@ suite "Peer Storage": peerKey = generateEcdsaKey() peer = PeerInfo.new(peerKey, @[peerLoc]) peerProto = "/waku/2/default-waku/codec" - stored = StoredInfo(peerId: peer.peerId, addrs: @[peerLoc], protos: @[peerProto], publicKey: peerKey.getPublicKey().tryGet()) - conn = Connectedness.CanConnect + connectedness = Connectedness.CanConnect disconn = 999999 + stored = RemotePeerInfo( + peerId: peer.peerId, + addrs: @[peerLoc], + protocols: @[peerProto], + publicKey: peerKey.getPublicKey().tryGet(), + connectedness: connectedness, + disconnectTime: disconn) defer: storage.close() # Test insert and retrieve - discard storage.put(peer.peerId, stored, conn, disconn) + require storage.put(peer.peerId, stored, connectedness, disconn).isOk var responseCount = 0 - # flags to check data matches what was stored (default true) - var peerIdFlag, storedInfoFlag, connectednessFlag, disconnectFlag: bool - proc data(peerId: PeerID, storedInfo: StoredInfo, + # Fetched variables from callback + var resPeerId: PeerId + var resStoredInfo: RemotePeerInfo + var resConnectedness: Connectedness + var resDisconnect: int64 + + proc data(peerId: PeerID, storedInfo: RemotePeerInfo, connectedness: Connectedness, disconnectTime: int64) {.raises: [Defect].} = responseCount += 1 # Note: cannot use `check` within `{.raises: [Defect].}` block # @TODO: /Nim/lib/pure/unittest.nim(577, 16) Error: can raise an unlisted exception: Exception # These flags are checked outside this block. - peerIdFlag = peerId == peer.peerId - storedInfoFlag = storedInfo == stored - connectednessFlag = connectedness == conn - disconnectFlag = disconnectTime == disconn + resPeerId = peerId + resStoredInfo = storedInfo + resConnectedness = connectedness + resDisconnect = disconnectTime let res = storage.getAll(data) check: res.isErr == false responseCount == 1 - peerIdFlag - storedInfoFlag - connectednessFlag - disconnectFlag + resPeerId == peer.peerId + resStoredInfo.peerId == peer.peerId + resStoredInfo.addrs == @[peerLoc] + resStoredInfo.protocols == @[peerProto] + resStoredInfo.publicKey == peerKey.getPublicKey().tryGet() + # TODO: For compatibility, we don't store connectedness and disconnectTime + #resStoredInfo.connectedness == connectedness + #resStoredInfo.disconnectTime == disconn + resConnectedness == Connectedness.CanConnect + resDisconnect == disconn # Test replace and retrieve (update an existing entry) - discard storage.put(peer.peerId, stored, Connectedness.CannotConnect, disconn + 10) + require storage.put(peer.peerId, stored, Connectedness.CannotConnect, disconn + 10).isOk responseCount = 0 - proc replacedData(peerId: PeerID, storedInfo: StoredInfo, + proc replacedData(peerId: PeerID, storedInfo: RemotePeerInfo, connectedness: Connectedness, disconnectTime: int64) {.raises: [Defect].} = responseCount += 1 # Note: cannot use `check` within `{.raises: [Defect].}` block # @TODO: /Nim/lib/pure/unittest.nim(577, 16) Error: can raise an unlisted exception: Exception # These flags are checked outside this block. - peerIdFlag = peerId == peer.peerId - storedInfoFlag = storedInfo == stored - connectednessFlag = connectedness == CannotConnect - disconnectFlag = disconnectTime == disconn + 10 + resPeerId = peerId + resStoredInfo = storedInfo + resConnectedness = connectedness + resDisconnect = disconnectTime let repRes = storage.getAll(replacedData) check: repRes.isErr == false responseCount == 1 - peerIdFlag - storedInfoFlag - connectednessFlag - disconnectFlag + resPeerId == peer.peerId + resStoredInfo.peerId == peer.peerId + resStoredInfo.addrs == @[peerLoc] + resStoredInfo.protocols == @[peerProto] + resStoredInfo.publicKey == peerKey.getPublicKey().tryGet() + # TODO: For compatibility, we don't store connectedness and disconnectTime + #resStoredInfo.connectedness == connectedness + #resStoredInfo.disconnectTime == disconn + resConnectedness == Connectedness.CannotConnect + resDisconnect == disconn + 10 diff --git a/tests/v2/test_peer_store_extended.nim b/tests/v2/test_peer_store_extended.nim index 3307a83a70..39a3acd39b 100644 --- a/tests/v2/test_peer_store_extended.nim +++ b/tests/v2/test_peer_store_extended.nim @@ -103,37 +103,37 @@ suite "Extended nim-libp2p Peer Store": test "get() returns the correct StoredInfo for a given PeerId": # When - let storedInfoPeer1 = peerStore.get(p1) - let storedInfoPeer6 = peerStore.get(p6) + let peer1 = peerStore.get(p1) + let peer6 = peerStore.get(p6) # Then check: # regression on nim-libp2p fields - storedInfoPeer1.peerId == p1 - storedInfoPeer1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()] - storedInfoPeer1.protos == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"] - storedInfoPeer1.agent == "nwaku" - storedInfoPeer1.protoVersion == "protoVersion1" + peer1.peerId == p1 + peer1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()] + peer1.protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"] + peer1.agent == "nwaku" + peer1.protoVersion == "protoVersion1" # our extended fields - storedInfoPeer1.connectedness == Connected - storedInfoPeer1.disconnectTime == 0 - storedInfoPeer1.origin == Discv5 - storedInfoPeer1.numberFailedConn == 1 - storedInfoPeer1.lastFailedConn == Moment.init(1001, Second) + peer1.connectedness == Connected + peer1.disconnectTime == 0 + peer1.origin == Discv5 + peer1.numberFailedConn == 1 + peer1.lastFailedConn == Moment.init(1001, Second) check: # fields are empty, not part of the peerstore - storedInfoPeer6.peerId == p6 - storedInfoPeer6.addrs.len == 0 - storedInfoPeer6.protos.len == 0 - storedInfoPeer6.agent == default(string) - storedInfoPeer6.protoVersion == default(string) - storedInfoPeer6.connectedness == default(Connectedness) - storedInfoPeer6.disconnectTime == default(int) - storedInfoPeer6.origin == default(PeerOrigin) - storedInfoPeer6.numberFailedConn == default(int) - storedInfoPeer6.lastFailedConn == default(Moment) + peer6.peerId == p6 + peer6.addrs.len == 0 + peer6.protocols.len == 0 + peer6.agent == default(string) + peer6.protoVersion == default(string) + peer6.connectedness == default(Connectedness) + peer6.disconnectTime == default(int) + peer6.origin == default(PeerOrigin) + peer6.numberFailedConn == default(int) + peer6.lastFailedConn == default(Moment) test "peers() returns all StoredInfo of the PeerStore": # When @@ -153,7 +153,7 @@ suite "Extended nim-libp2p Peer Store": check: # regression on nim-libp2p fields p3.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()] - p3.protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"] + p3.protocols == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"] p3.agent == "gowaku" p3.protoVersion == "protoVersion3" @@ -180,7 +180,7 @@ suite "Extended nim-libp2p Peer Store": # Only p3 supports that protocol lpPeers.len == 1 lpPeers.anyIt(it.peerId == p3) - lpPeers[0].protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"] + lpPeers[0].protocols == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"] test "peers() returns all StoredInfo matching a given protocolMatcher": # When @@ -197,28 +197,25 @@ suite "Extended nim-libp2p Peer Store": pMatcherStorePeers.anyIt(it.peerId == p5) check: - pMatcherStorePeers.filterIt(it.peerId == p1)[0].protos == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"] - pMatcherStorePeers.filterIt(it.peerId == p2)[0].protos == @["/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0"] - pMatcherStorePeers.filterIt(it.peerId == p3)[0].protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"] - pMatcherStorePeers.filterIt(it.peerId == p5)[0].protos == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"] + pMatcherStorePeers.filterIt(it.peerId == p1)[0].protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"] + pMatcherStorePeers.filterIt(it.peerId == p2)[0].protocols == @["/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0"] + pMatcherStorePeers.filterIt(it.peerId == p3)[0].protocols == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"] + pMatcherStorePeers.filterIt(it.peerId == p5)[0].protocols == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"] check: pMatcherSwapPeers.len == 1 pMatcherSwapPeers.anyIt(it.peerId == p5) - pMatcherSwapPeers[0].protos == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"] + pMatcherSwapPeers[0].protocols == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"] test "toRemotePeerInfo() converts a StoredInfo to a RemotePeerInfo": # Given - let storedInfoPeer1 = peerStore.get(p1) - - # When - let remotePeerInfo1 = storedInfoPeer1.toRemotePeerInfo() + let peer1 = peerStore.get(p1) # Then check: - remotePeerInfo1.peerId == p1 - remotePeerInfo1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()] - remotePeerInfo1.protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"] + peer1.peerId == p1 + peer1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()] + peer1.protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"] test "connectedness() returns the connection status of a given PeerId": check: diff --git a/waku/v2/node/jsonrpc/admin/handlers.nim b/waku/v2/node/jsonrpc/admin/handlers.nim index 7bd0d0ef42..aaa2c910c9 100644 --- a/waku/v2/node/jsonrpc/admin/handlers.nim +++ b/waku/v2/node/jsonrpc/admin/handlers.nim @@ -38,13 +38,6 @@ proc constructMultiaddrStr*(remotePeerInfo: RemotePeerInfo): string = return "" constructMultiaddrStr(remotePeerInfo.addrs[0], remotePeerInfo.peerId) -proc constructMultiaddrStr*(storedInfo: StoredInfo): string = - # Constructs a multiaddress with both location (wire) address and p2p identity - if storedInfo.addrs.len == 0: - return "" - constructMultiaddrStr(storedInfo.addrs[0], storedInfo.peerId) - - proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = rpcsrv.rpc("post_waku_v2_admin_v1_peers") do (peers: seq[string]) -> bool: diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index a9b78acfac..c354ca12a7 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -78,11 +78,11 @@ proc protocolMatcher*(codec: string): Matcher = proc insertOrReplace(ps: PeerStorage, peerId: PeerID, - storedInfo: StoredInfo, + remotePeerInfo: RemotePeerInfo, connectedness: Connectedness, disconnectTime: int64 = 0) = # Insert peer entry into persistent storage, or replace existing entry with updated info - let res = ps.put(peerId, storedInfo, connectedness, disconnectTime) + let res = ps.put(peerId, remotePeerInfo, connectedness, disconnectTime) if res.isErr: warn "failed to store peers", err = res.error waku_peers_errors.inc(labelValues = ["storage_failure"]) @@ -137,24 +137,24 @@ proc dialPeer(pm: PeerManager, peerId: PeerID, proc loadFromStorage(pm: PeerManager) = debug "loading peers from storage" # Load peers from storage, if available - proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64) = - trace "loading peer", peerId= $peerId, storedInfo= $storedInfo, connectedness=connectedness + proc onData(peerId: PeerID, remotePeerInfo: RemotePeerInfo, connectedness: Connectedness, disconnectTime: int64) = + trace "loading peer", peerId=peerId, connectedness=connectedness if peerId == pm.switch.peerInfo.peerId: # Do not manage self return # nim-libp2p books - pm.peerStore[AddressBook][peerId] = storedInfo.addrs - pm.peerStore[ProtoBook][peerId] = storedInfo.protos - pm.peerStore[KeyBook][peerId] = storedInfo.publicKey - pm.peerStore[AgentBook][peerId] = storedInfo.agent - pm.peerStore[ProtoVersionBook][peerId] = storedInfo.protoVersion + pm.peerStore[AddressBook][peerId] = remotePeerInfo.addrs + pm.peerStore[ProtoBook][peerId] = remotePeerInfo.protocols + pm.peerStore[KeyBook][peerId] = remotePeerInfo.publicKey + pm.peerStore[AgentBook][peerId] = remotePeerInfo.agent + pm.peerStore[ProtoVersionBook][peerId] = remotePeerInfo.protoVersion # custom books pm.peerStore[ConnectionBook][peerId] = NotConnected # Reset connectedness state pm.peerStore[DisconnectBook][peerId] = disconnectTime - pm.peerStore[SourceBook][peerId] = storedInfo.origin + pm.peerStore[SourceBook][peerId] = remotePeerInfo.origin let res = pm.storage.getAll(onData) if res.isErr: @@ -461,7 +461,7 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] = # TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned if peers.len > 0: debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto - return some(peers[0].toRemotePeerInfo()) + return some(peers[0]) debug "No peer found for protocol", protocol=proto return none(RemotePeerInfo) @@ -473,7 +473,7 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] = # If not slotted, we select a random peer for the given protocol if peers.len > 0: debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto - return some(peers[0].toRemotePeerInfo()) + return some(peers[0]) debug "No peer found for protocol", protocol=proto return none(RemotePeerInfo) diff --git a/waku/v2/node/peer_manager/peer_store/peer_storage.nim b/waku/v2/node/peer_manager/peer_store/peer_storage.nim index 25ad660367..d41c959fe2 100644 --- a/waku/v2/node/peer_manager/peer_store/peer_storage.nim +++ b/waku/v2/node/peer_manager/peer_store/peer_storage.nim @@ -7,24 +7,25 @@ else: import stew/results import - ../waku_peer_store + ../waku_peer_store, + ../../../utils/peers ## This module defines a peer storage interface. Implementations of ## PeerStorage are used to store and retrieve peers type PeerStorage* = ref object of RootObj - + PeerStorageResult*[T] = Result[T, string] - DataProc* = proc(peerId: PeerID, storedInfo: StoredInfo, + DataProc* = proc(peerId: PeerID, remotePeerInfo: RemotePeerInfo, connectedness: Connectedness, disconnectTime: int64) {.closure, raises: [Defect].} # PeerStorage interface method put*(db: PeerStorage, peerId: PeerID, - storedInfo: StoredInfo, + remotePeerInfo: RemotePeerInfo, connectedness: Connectedness, disconnectTime: int64): PeerStorageResult[void] {.base.} = discard -method getAll*(db: PeerStorage, onData: DataProc): PeerStorageResult[bool] {.base.} = discard \ No newline at end of file +method getAll*(db: PeerStorage, onData: DataProc): PeerStorageResult[bool] {.base.} = discard diff --git a/waku/v2/node/peer_manager/peer_store/waku_peer_storage.nim b/waku/v2/node/peer_manager/peer_store/waku_peer_storage.nim index 4515150cf8..7abe05c5f2 100644 --- a/waku/v2/node/peer_manager/peer_store/waku_peer_storage.nim +++ b/waku/v2/node/peer_manager/peer_store/waku_peer_storage.nim @@ -5,13 +5,14 @@ else: import - std/sets, + std/sets, stew/results, sqlite3_abi, libp2p/protobuf/minprotobuf import ../../../../common/sqlite, ../waku_peer_store, + ../../../utils/peers, ./peer_storage export sqlite @@ -25,11 +26,11 @@ type # Protobuf Serialisation # ########################## -proc init*(T: type StoredInfo, buffer: seq[byte]): ProtoResult[T] = +proc init*(T: type RemotePeerInfo, buffer: seq[byte]): ProtoResult[T] = var multiaddrSeq: seq[MultiAddress] protoSeq: seq[string] - storedInfo = StoredInfo() + storedInfo = RemotePeerInfo() var pb = initProtoBuffer(buffer) @@ -37,25 +38,27 @@ proc init*(T: type StoredInfo, buffer: seq[byte]): ProtoResult[T] = discard ? pb.getRepeatedField(2, multiaddrSeq) discard ? pb.getRepeatedField(3, protoSeq) discard ? pb.getField(4, storedInfo.publicKey) - + + # TODO: Store the rest of parameters such as connectedness and disconnectTime + storedInfo.addrs = multiaddrSeq - storedInfo.protos = protoSeq + storedInfo.protocols = protoSeq ok(storedInfo) -proc encode*(storedInfo: StoredInfo): PeerStorageResult[ProtoBuffer] = +proc encode*(remotePeerInfo: RemotePeerInfo): PeerStorageResult[ProtoBuffer] = var pb = initProtoBuffer() - pb.write(1, storedInfo.peerId) - - for multiaddr in storedInfo.addrs.items: + pb.write(1, remotePeerInfo.peerId) + + for multiaddr in remotePeerInfo.addrs.items: pb.write(2, multiaddr) - - for proto in storedInfo.protos.items: + + for proto in remotePeerInfo.protocols.items: pb.write(3, proto) - + try: - pb.write(4, storedInfo.publicKey) + pb.write(4, remotePeerInfo.publicKey) except ResultError[CryptoError] as e: return err("Failed to encode public key") @@ -69,13 +72,15 @@ proc new*(T: type WakuPeerStorage, db: SqliteDatabase): PeerStorageResult[T] = ## Misconfiguration can lead to nil DB if db.isNil(): return err("db not initialized") - + ## Create the "Peer" table ## It contains: ## - peer id as primary key, stored as a blob ## - stored info (serialised protobuf), stored as a blob ## - last known enumerated connectedness state, stored as an integer ## - disconnect time in epoch seconds, if applicable + + # TODO: connectedness and disconnectTime are now stored in the storedInfo type let createStmt = db.prepareStmt(""" CREATE TABLE IF NOT EXISTS Peer ( @@ -102,19 +107,19 @@ proc new*(T: type WakuPeerStorage, db: SqliteDatabase): PeerStorageResult[T] = ).expect("this is a valid statement") ## General initialization - + ok(WakuPeerStorage(database: db, replaceStmt: replaceStmt)) method put*(db: WakuPeerStorage, peerId: PeerID, - storedInfo: StoredInfo, + remotePeerInfo: RemotePeerInfo, connectedness: Connectedness, disconnectTime: int64): PeerStorageResult[void] = ## Adds a peer to storage or replaces existing entry if it already exists - let encoded = storedInfo.encode() + let encoded = remotePeerInfo.encode() if encoded.isErr: return err("failed to encode: " & encoded.error()) @@ -129,7 +134,7 @@ method getAll*(db: WakuPeerStorage, onData: peer_storage.DataProc): PeerStorageR ## Retrieves all peers from storage var gotPeers = false - proc peer(s: ptr sqlite3_stmt) {.raises: [Defect, LPError, ResultError[ProtoError]].} = + proc peer(s: ptr sqlite3_stmt) {.raises: [Defect, LPError, ResultError[ProtoError]].} = gotPeers = true let # Peer ID @@ -139,7 +144,7 @@ method getAll*(db: WakuPeerStorage, onData: peer_storage.DataProc): PeerStorageR # Stored Info sTo = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1)) sToL = sqlite3_column_bytes(s, 1) - storedInfo = StoredInfo.init(@(toOpenArray(sTo, 0, sToL - 1))).tryGet() + storedInfo = RemotePeerInfo.init(@(toOpenArray(sTo, 0, sToL - 1))).tryGet() # Connectedness connectedness = Connectedness(sqlite3_column_int(s, 2)) # DisconnectTime @@ -152,13 +157,13 @@ method getAll*(db: WakuPeerStorage, onData: peer_storage.DataProc): PeerStorageR queryResult = db.database.query("SELECT peerId, storedInfo, connectedness, disconnectTime FROM Peer", peer) except LPError, ResultError[ProtoError]: return err("failed to extract peer from query result") - + if queryResult.isErr: return err("failed") ok gotPeers -proc close*(db: WakuPeerStorage) = +proc close*(db: WakuPeerStorage) = ## Closes the database. db.replaceStmt.dispose() - db.database.close() \ No newline at end of file + db.database.close() diff --git a/waku/v2/node/peer_manager/waku_peer_store.nim b/waku/v2/node/peer_manager/waku_peer_store.nim index 460543c00e..d1f575f949 100644 --- a/waku/v2/node/peer_manager/waku_peer_store.nim +++ b/waku/v2/node/peer_manager/waku_peer_store.nim @@ -6,6 +6,7 @@ else: import std/[tables, sequtils, sets, options, times, math], chronos, + eth/p2p/discoveryv5/enr, libp2p/builders, libp2p/peerstore @@ -15,26 +16,6 @@ import export peerstore, builders type - Connectedness* = enum - # NotConnected: default state for a new peer. No connection and no further information on connectedness. - NotConnected, - # CannotConnect: attempted to connect to peer, but failed. - CannotConnect, - # CanConnect: was recently connected to peer and disconnected gracefully. - CanConnect, - # Connected: actively connected to peer. - Connected - - PeerOrigin* = enum - UnknownOrigin, - Discv5, - Static, - Dns - - PeerDirection* = enum - UnknownDirection, - Inbound, - Outbound # Keeps track of the Connectedness state of a peer ConnectionBook* = ref object of PeerBook[Connectedness] @@ -54,22 +35,8 @@ type # Direction DirectionBook* = ref object of PeerBook[PeerDirection] - StoredInfo* = object - # Taken from nim-libp2 - peerId*: PeerId - addrs*: seq[MultiAddress] - protos*: seq[string] - publicKey*: PublicKey - agent*: string - protoVersion*: string - - # Extended custom fields - connectedness*: Connectedness - disconnectTime*: int64 - origin*: PeerOrigin - direction*: PeerDirection - lastFailedConn*: Moment - numberFailedConn*: int + # ENR Book + ENRBook* = ref object of PeerBook[enr.Record] ################## # Peer Store API # @@ -103,16 +70,16 @@ proc delete*(peerStore: PeerStore, peerStore.del(peerId) proc get*(peerStore: PeerStore, - peerId: PeerID): StoredInfo = + peerId: PeerID): RemotePeerInfo = ## Get the stored information of a given peer. - StoredInfo( - # Taken from nim-libp2 + RemotePeerInfo( peerId: peerId, addrs: peerStore[AddressBook][peerId], - protos: peerStore[ProtoBook][peerId], - publicKey: peerStore[KeyBook][peerId], + enr: if peerStore[ENRBook][peerId] != default(enr.Record): some(peerStore[ENRBook][peerId]) else: none(enr.Record), + protocols: peerStore[ProtoBook][peerId], agent: peerStore[AgentBook][peerId], protoVersion: peerStore[ProtoVersionBook][peerId], + publicKey: peerStore[KeyBook][peerId], # Extended custom fields connectedness: peerStore[ConnectionBook][peerId], @@ -124,7 +91,7 @@ proc get*(peerStore: PeerStore, ) # TODO: Rename peers() to getPeersByProtocol() -proc peers*(peerStore: PeerStore): seq[StoredInfo] = +proc peers*(peerStore: PeerStore): seq[RemotePeerInfo] = ## Get all the stored information of every peer. let allKeys = concat(toSeq(peerStore[AddressBook].book.keys()), toSeq(peerStore[ProtoBook].book.keys()), @@ -132,19 +99,13 @@ proc peers*(peerStore: PeerStore): seq[StoredInfo] = return allKeys.mapIt(peerStore.get(it)) -proc peers*(peerStore: PeerStore, proto: string): seq[StoredInfo] = +proc peers*(peerStore: PeerStore, proto: string): seq[RemotePeerInfo] = # Return the known info for all peers registered on the specified protocol - peerStore.peers.filterIt(it.protos.contains(proto)) + peerStore.peers.filterIt(it.protocols.contains(proto)) -proc peers*(peerStore: PeerStore, protocolMatcher: Matcher): seq[StoredInfo] = +proc peers*(peerStore: PeerStore, protocolMatcher: Matcher): seq[RemotePeerInfo] = # Return the known info for all peers matching the provided protocolMatcher - peerStore.peers.filterIt(it.protos.anyIt(protocolMatcher(it))) - -proc toRemotePeerInfo*(storedInfo: StoredInfo): RemotePeerInfo = - RemotePeerInfo.init(peerId = storedInfo.peerId, - addrs = toSeq(storedInfo.addrs), - protocols = toSeq(storedInfo.protos)) - + peerStore.peers.filterIt(it.protocols.anyIt(protocolMatcher(it))) proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness = # Return the connection state of the given, managed peer @@ -159,7 +120,7 @@ proc isConnected*(peerStore: PeerStore, peerId: PeerID): bool = proc hasPeer*(peerStore: PeerStore, peerId: PeerID, proto: string): bool = # Returns `true` if peer is included in manager for the specified protocol # TODO: What if peer does not exist in the peerStore? - peerStore.get(peerId).protos.contains(proto) + peerStore.get(peerId).protocols.contains(proto) proc hasPeers*(peerStore: PeerStore, proto: string): bool = # Returns `true` if the peerstore has any peer for the specified protocol @@ -169,14 +130,14 @@ proc hasPeers*(peerStore: PeerStore, protocolMatcher: Matcher): bool = # Returns `true` if the peerstore has any peer matching the protocolMatcher toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(protocolMatcher(it))) -proc getPeersByDirection*(peerStore: PeerStore, direction: PeerDirection): seq[StoredInfo] = +proc getPeersByDirection*(peerStore: PeerStore, direction: PeerDirection): seq[RemotePeerInfo] = return peerStore.peers.filterIt(it.direction == direction) -proc getNotConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] = +proc getNotConnectedPeers*(peerStore: PeerStore): seq[RemotePeerInfo] = return peerStore.peers.filterIt(it.connectedness != Connected) -proc getConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] = +proc getConnectedPeers*(peerStore: PeerStore): seq[RemotePeerInfo] = return peerStore.peers.filterIt(it.connectedness == Connected) -proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[StoredInfo] = - return peerStore.peers.filterIt(it.protos.contains(proto)) +proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[RemotePeerInfo] = + return peerStore.peers.filterIt(it.protocols.contains(proto)) diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index 91d2aeb1b6..7b81d7ccaa 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -585,7 +585,7 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: C let remotePeer = when peer is string: parseRemotePeerInfo(peer) else: peer - info "registering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer + info "registering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer.peerId # Add handler wrapper to store the message when pushed, when relay is disabled and filter enabled # TODO: Move this logic to wakunode2 app @@ -612,7 +612,7 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: let remotePeer = when peer is string: parseRemotePeerInfo(peer) else: peer - info "deregistering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer + info "deregistering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer.peerId let unsubRes = await node.wakuFilterClient.unsubscribe(pubsubTopic, contentTopics, peer=remotePeer) if unsubRes.isOk(): @@ -854,7 +854,7 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMe if node.wakuLightpushClient.isNil(): return err("waku lightpush client is nil") - debug "publishing message with lightpush", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, peer=peer + debug "publishing message with lightpush", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, peer=peer.peerId return await node.wakuLightpushClient.publish(pubsubTopic, message, peer) @@ -980,7 +980,6 @@ proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} = # First get a list of connected peer infos let peers = node.peerManager.peerStore.peers() .filterIt(it.connectedness == Connected) - .mapIt(it.toRemotePeerInfo()) for peer in peers: try: diff --git a/waku/v2/utils/peers.nim b/waku/v2/utils/peers.nim index 0c233a6e0b..ac4ab52593 100644 --- a/waku/v2/utils/peers.nim +++ b/waku/v2/utils/peers.nim @@ -5,7 +5,8 @@ else: # Collection of utilities related to Waku peers import - std/[options, sequtils, strutils], + std/[options, sequtils, strutils, times], + chronos, stew/results, stew/shims/net, eth/keys, @@ -18,6 +19,32 @@ import peerinfo, routing_record] +#import +# ../node/peer_manager/waku_peer_store +# todo organize this + +type + Connectedness* = enum + # NotConnected: default state for a new peer. No connection and no further information on connectedness. + NotConnected, + # CannotConnect: attempted to connect to peer, but failed. + CannotConnect, + # CanConnect: was recently connected to peer and disconnected gracefully. + CanConnect, + # Connected: actively connected to peer. + Connected + + PeerOrigin* = enum + UnknownOrigin, + Discv5, + Static, + Dns + + PeerDirection* = enum + UnknownDirection, + Inbound, + Outbound + type RemotePeerInfo* = ref object of RootObj peerId*: PeerID @@ -25,6 +52,16 @@ type enr*: Option[enr.Record] protocols*: seq[string] + agent*: string + protoVersion*: string + publicKey*: crypto.PublicKey + connectedness*: Connectedness + disconnectTime*: int64 + origin*: PeerOrigin + direction*: PeerDirection + lastFailedConn*: Moment + numberFailedConn*: int + func `$`*(remotePeerInfo: RemotePeerInfo): string = $remotePeerInfo.peerId