From c5c70f35a43875b214e580f1c026ecac214f22cd Mon Sep 17 00:00:00 2001 From: alrevuelta Date: Wed, 25 Jan 2023 13:56:57 +0100 Subject: [PATCH] Fix comments --- apps/wakunode2/wakunode2.nim | 4 +- waku/v2/node/peer_manager/peer_manager.nim | 54 +++++++------------ waku/v2/node/peer_manager/waku_peer_store.nim | 7 +++ waku/v2/node/waku_node.nim | 4 +- 4 files changed, 27 insertions(+), 42 deletions(-) diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index 132077de4d..4b58f59056 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -505,11 +505,9 @@ proc startNode(node: WakuNode, conf: WakuNodeConf, if conf.keepAlive: node.startKeepalive() - asyncSpawn node.peerManager.serviceConnectivityLoop() - # Maintain relay connections if conf.relay: - asyncSpawn node.peerManager.relayConnectivityLoop() + node.peerManager.start() return ok() diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index 6b651ab105..05b65740f3 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -58,8 +58,7 @@ type maxFailedAttempts*: int storage: PeerStorage serviceSlots*: Table[string, RemotePeerInfo] - relayLoopUp*: bool - serviceLoopUp*: bool + started: bool #################### # Helper functions # @@ -359,9 +358,8 @@ proc connectToNodes*(pm: PeerManager, # Ensures a healthy amount of connected relay peers proc relayConnectivityLoop*(pm: PeerManager) {.async.} = - pm.relayLoopUp = true - info "Starting relay connectivity loop" - while pm.relayLoopUp: + debug "Starting relay connectivity loop" + while pm.started: let maxConnections = pm.switch.connManager.inSema.size let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len @@ -391,52 +389,36 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} = await sleepAsync(ConnectivityLoopInterval) -# Ensure we are always connected to the slotted service peers -proc serviceConnectivityLoop*(pm: PeerManager) {.async.} = - pm.serviceLoopUp = true - info "Starting service connectivity loop" - - while pm.serviceLoopUp: - if pm.serviceSlots.len == 0: - warn "No service peers configured, but service loop is running" - for serviceProto, servicePeer in pm.serviceSlots.pairs: - if pm.peerStore.connectedness(servicePeer.peerId) != Connected: - # Attempt to dial peer. Note that service peers do not respect any backoff - let conn = await pm.dialPeer(servicePeer.peerId, servicePeer.addrs, serviceProto) - if conn.isNone: - warn "Could not connect with service peer", peerId=servicePeer.peerId, proto=serviceProto - - # Log a summary of slot peers connected/notconnected - let connectedServicePeers = toSeq(pm.serviceSlots.pairs).filterIt(pm.peerStore.connectedness(it[1].peerId) == Connected) - if connectedServicePeers.len > 0: - info "Connected service peers", - servicePeers = connectedServicePeers.mapIt(it[1].addrs), - respectiveProtocols = connectedServicePeers.mapIt(it[0]) - - let notConnectedServicePeers = toSeq(pm.serviceSlots.pairs).filterIt(pm.peerStore.connectedness(it[1].peerId) != Connected) - if notConnectedServicePeers.len > 0: - info "Not connected service peers", - servicePeers = notConnectedServicePeers.mapIt(it[1].addrs), - respectiveProtocols = notConnectedServicePeers.mapIt(it[0]) - - await sleepAsync(ServicePeersInterval) - proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] = + debug "Selecting peer from peerstore", protocol=proto + # Selects the best peer for a given protocol - let peers = pm.peerStore.peers().filterIt(it.protos.contains(proto)) + let peers = pm.peerStore.getPeersByProtocol(proto) # No criteria for selecting a peer for WakuRelay, random one if proto == WakuRelayCodec: # TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned if peers.len > 0: + debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto return some(peers[0].toRemotePeerInfo()) + debug "No peer found for protocol", protocol=proto return none(RemotePeerInfo) # For other protocols, we select the peer that is slotted for the given protocol pm.serviceSlots.withValue(proto, serviceSlot): + debug "Got peer from service slots", peerId=serviceSlot[].peerId, multi=serviceSlot[].addrs[0], protocol=proto return some(serviceSlot[]) # If not slotted, we select a random peer for the given protocol if peers.len > 0: + debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto return some(peers[0].toRemotePeerInfo()) + debug "No peer found for protocol", protocol=proto return none(RemotePeerInfo) + +proc start*(pm: PeerManager) = + pm.started = true + asyncSpawn pm.relayConnectivityLoop() + +proc stop*(pm: PeerManager) = + pm.started = false diff --git a/waku/v2/node/peer_manager/waku_peer_store.nim b/waku/v2/node/peer_manager/waku_peer_store.nim index 14717366bd..d8d026ad6f 100644 --- a/waku/v2/node/peer_manager/waku_peer_store.nim +++ b/waku/v2/node/peer_manager/waku_peer_store.nim @@ -152,6 +152,10 @@ proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness = # TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts return peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected) +proc isConnected*(peerStore: PeerStore, peerId: PeerID): bool = + # Returns `true` if the peer is connected + peerStore.connectedness(peerId) == Connected + proc hasPeer*(peerStore: PeerStore, peerId: PeerID, proto: string): bool = # Returns `true` if peer is included in manager for the specified protocol # TODO: What if peer does not exist in the peerStore? @@ -170,3 +174,6 @@ proc getPeersByDirection*(peerStore: PeerStore, direction: PeerDirection): seq[S proc getNotConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] = return peerStore.peers.filterIt(it.connectedness != Connected) + +proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[StoredInfo] = + return peerStore.peers.filterIt(it.protos.contains(proto)) diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index 424b272b25..6d51800b93 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -1006,8 +1006,6 @@ proc stop*(node: WakuNode) {.async.} = discard await node.stopDiscv5() await node.switch.stop() - - node.peerManager.serviceLoopUp = false - node.peerManager.relayLoopUp = false + node.peerManager.stop() node.started = false