Commit: Fix comments
alrevuelta committed Jan 25, 2023
1 parent 8879340 commit c5c70f3
Showing 4 changed files with 27 additions and 42 deletions.
4 changes: 1 addition & 3 deletions apps/wakunode2/wakunode2.nim
@@ -505,11 +505,9 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,
   if conf.keepAlive:
     node.startKeepalive()
 
-  asyncSpawn node.peerManager.serviceConnectivityLoop()
-
   # Maintain relay connections
   if conf.relay:
-    asyncSpawn node.peerManager.relayConnectivityLoop()
+    node.peerManager.start()
 
   return ok()

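Reviewer note: after this change the node start-up path no longer spawns
connectivity loops itself; it only calls the peer manager's new start().
A minimal, self-contained sketch of that start/stop pattern (ManagerSketch,
its loop body, and the intervals are placeholders, not the real nwaku code):

import chronos

type ManagerSketch = ref object
  started: bool

proc connectivityLoop(m: ManagerSketch) {.async.} =
  # A single flag gates the loop, so stop() ends it at the next wakeup
  while m.started:
    echo "maintaining connections"  # stands in for the real dialing logic
    await sleepAsync(1.seconds)

proc start(m: ManagerSketch) =
  m.started = true
  asyncSpawn m.connectivityLoop()

proc stop(m: ManagerSketch) =
  m.started = false

when isMainModule:
  let m = ManagerSketch()
  m.start()
  waitFor sleepAsync(3.seconds)
  m.stop()

With one started flag there is no per-loop boolean to clear on shutdown;
each loop observes the flag and exits on its next iteration.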
54 changes: 18 additions & 36 deletions waku/v2/node/peer_manager/peer_manager.nim
@@ -58,8 +58,7 @@ type
     maxFailedAttempts*: int
     storage: PeerStorage
     serviceSlots*: Table[string, RemotePeerInfo]
-    relayLoopUp*: bool
-    serviceLoopUp*: bool
+    started: bool
 
 ####################
 # Helper functions #
@@ -359,9 +358,8 @@ proc connectToNodes*(pm: PeerManager,
 
 # Ensures a healthy amount of connected relay peers
 proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
-  pm.relayLoopUp = true
-  info "Starting relay connectivity loop"
-  while pm.relayLoopUp:
+  debug "Starting relay connectivity loop"
+  while pm.started:
 
     let maxConnections = pm.switch.connManager.inSema.size
     let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len
@@ -391,52 +389,36 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
 
     await sleepAsync(ConnectivityLoopInterval)
 
-# Ensure we are always connected to the slotted service peers
-proc serviceConnectivityLoop*(pm: PeerManager) {.async.} =
-  pm.serviceLoopUp = true
-  info "Starting service connectivity loop"
-
-  while pm.serviceLoopUp:
-    if pm.serviceSlots.len == 0:
-      warn "No service peers configured, but service loop is running"
-    for serviceProto, servicePeer in pm.serviceSlots.pairs:
-      if pm.peerStore.connectedness(servicePeer.peerId) != Connected:
-        # Attempt to dial peer. Note that service peers do not respect any backoff
-        let conn = await pm.dialPeer(servicePeer.peerId, servicePeer.addrs, serviceProto)
-        if conn.isNone:
-          warn "Could not connect with service peer", peerId=servicePeer.peerId, proto=serviceProto
-
-    # Log a summary of slot peers connected/notconnected
-    let connectedServicePeers = toSeq(pm.serviceSlots.pairs).filterIt(pm.peerStore.connectedness(it[1].peerId) == Connected)
-    if connectedServicePeers.len > 0:
-      info "Connected service peers",
-        servicePeers = connectedServicePeers.mapIt(it[1].addrs),
-        respectiveProtocols = connectedServicePeers.mapIt(it[0])
-
-    let notConnectedServicePeers = toSeq(pm.serviceSlots.pairs).filterIt(pm.peerStore.connectedness(it[1].peerId) != Connected)
-    if notConnectedServicePeers.len > 0:
-      info "Not connected service peers",
-        servicePeers = notConnectedServicePeers.mapIt(it[1].addrs),
-        respectiveProtocols = notConnectedServicePeers.mapIt(it[0])
-
-    await sleepAsync(ServicePeersInterval)
-
 proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
   debug "Selecting peer from peerstore", protocol=proto
 
   # Selects the best peer for a given protocol
-  let peers = pm.peerStore.peers().filterIt(it.protos.contains(proto))
+  let peers = pm.peerStore.getPeersByProtocol(proto)
 
   # No criteria for selecting a peer for WakuRelay, random one
   if proto == WakuRelayCodec:
     # TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
     if peers.len > 0:
       debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto
       return some(peers[0].toRemotePeerInfo())
     debug "No peer found for protocol", protocol=proto
     return none(RemotePeerInfo)
 
   # For other protocols, we select the peer that is slotted for the given protocol
   pm.serviceSlots.withValue(proto, serviceSlot):
     debug "Got peer from service slots", peerId=serviceSlot[].peerId, multi=serviceSlot[].addrs[0], protocol=proto
     return some(serviceSlot[])
 
   # If not slotted, we select a random peer for the given protocol
   if peers.len > 0:
     debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto
     return some(peers[0].toRemotePeerInfo())
   debug "No peer found for protocol", protocol=proto
   return none(RemotePeerInfo)
 
+proc start*(pm: PeerManager) =
+  pm.started = true
+  asyncSpawn pm.relayConnectivityLoop()
+
+proc stop*(pm: PeerManager) =
+  pm.started = false
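
Reviewer note: selectPeer now resolves peers in a fixed priority order: for
WakuRelay, the first stored peer that speaks the codec; for every other
protocol, the slotted service peer first, then any stored peer supporting
the protocol. A standalone sketch of that ordering (the Peer type and the
codec strings are simplified stand-ins, not the real nwaku API):

import std/[options, tables, sequtils]

type Peer = object
  id: string
  protos: seq[string]

const RelayCodec = "/vac/waku/relay/2.0.0"  # assumed value of WakuRelayCodec

proc selectPeer(slots: Table[string, Peer], peers: seq[Peer],
                proto: string): Option[Peer] =
  let candidates = peers.filterIt(proto in it.protos)
  if proto == RelayCodec:
    # No scoring heuristic yet: the first matching peer wins
    if candidates.len > 0:
      return some(candidates[0])
    return none(Peer)
  if proto in slots:
    # A slotted service peer always takes priority
    return some(slots[proto])
  if candidates.len > 0:
    # Fallback: any stored peer that speaks the protocol
    return some(candidates[0])
  none(Peer)

when isMainModule:
  const StoreCodec = "/vac/waku/store/2.0.0-beta4"  # assumed codec string
  var slots = initTable[string, Peer]()
  slots[StoreCodec] = Peer(id: "slotted-store-peer", protos: @[StoreCodec])
  echo selectPeer(slots, @[], StoreCodec)  # picks the slotted peer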
7 changes: 7 additions & 0 deletions waku/v2/node/peer_manager/waku_peer_store.nim
@@ -152,6 +152,10 @@ proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness =
   # TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
   return peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected)
 
+proc isConnected*(peerStore: PeerStore, peerId: PeerID): bool =
+  # Returns `true` if the peer is connected
+  peerStore.connectedness(peerId) == Connected
+
 proc hasPeer*(peerStore: PeerStore, peerId: PeerID, proto: string): bool =
   # Returns `true` if peer is included in manager for the specified protocol
   # TODO: What if peer does not exist in the peerStore?
@@ -170,3 +174,6 @@ proc getPeersByDirection*(peerStore: PeerStore, direction: PeerDirection): seq[StoredInfo] =
 
 proc getNotConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] =
   return peerStore.peers.filterIt(it.connectedness != Connected)
+
+proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[StoredInfo] =
+  return peerStore.peers.filterIt(it.protos.contains(proto))
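
Reviewer note: both helpers are thin wrappers over queries that call sites
previously inlined (see the selectPeer change above). A standalone sketch of
the same shape over a plain seq, with a simplified Connectedness enum rather
than the real PeerStore type:

import std/sequtils

type
  Connectedness = enum NotConnected, Connected
  StoredInfo = object
    peerId: string
    protos: seq[string]
    connectedness: Connectedness

proc isConnected(peers: seq[StoredInfo], peerId: string): bool =
  # Returns true if the given peer is currently connected
  peers.anyIt(it.peerId == peerId and it.connectedness == Connected)

proc getPeersByProtocol(peers: seq[StoredInfo], proto: string): seq[StoredInfo] =
  # Returns every stored peer that advertises the protocol
  peers.filterIt(proto in it.protos)

when isMainModule:
  let peers = @[
    StoredInfo(peerId: "a", protos: @["/vac/waku/relay/2.0.0"],
               connectedness: Connected),
    StoredInfo(peerId: "b", protos: @["/vac/waku/store/2.0.0-beta4"],
               connectedness: NotConnected)
  ]
  echo peers.isConnected("a")                                 # true
  echo peers.getPeersByProtocol("/vac/waku/relay/2.0.0").len  # 1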
4 changes: 1 addition & 3 deletions waku/v2/node/waku_node.nim
@@ -1006,8 +1006,6 @@ proc stop*(node: WakuNode) {.async.} =
   discard await node.stopDiscv5()
 
   await node.switch.stop()
-
-  node.peerManager.serviceLoopUp = false
-  node.peerManager.relayLoopUp = false
+  node.peerManager.stop()
 
   node.started = false
