Skip to content

Commit

Permalink
fix(networking): properly get number of relay connections
Browse files Browse the repository at this point in the history
  • Loading branch information
alrevuelta committed Apr 11, 2023
1 parent 418efca commit 0352e21
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 13 deletions.
79 changes: 78 additions & 1 deletion tests/v2/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ procSuite "Peer Manager":
await allFutures([node1.stop(), node2.stop(), node3.stop()])

# TODO: nwaku/issues/1377
xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
asyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
let
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
Expand Down Expand Up @@ -448,6 +448,83 @@ procSuite "Peer Manager":
# but the relay peer is not
node.peerManager.serviceSlots.hasKey(WakuRelayCodec) == false

asyncTest "getNumConnections() returns expected number of connections per protocol":
# Create 4 nodes
let nodes = toSeq(0..<4).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

# Start them with relay + filter
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.mountFilter()))

let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())

# create some connections/streams
require:
# some relay connections
(await nodes[0].peerManager.connectRelay(pInfos[1])) == true
(await nodes[0].peerManager.connectRelay(pInfos[2])) == true
(await nodes[1].peerManager.connectRelay(pInfos[2])) == true

(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[2], WakuFilterCodec)).isSome() == true

# isolated dial creates a relay conn under the hood (libp2p behaviour)
(await nodes[2].peerManager.dialPeer(pInfos[3], WakuFilterCodec)).isSome() == true


# assert physical connections
check:
nodes[0].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 0
nodes[0].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 2
nodes[0].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 0
nodes[0].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 2

nodes[1].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 1
nodes[1].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 1
nodes[1].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1
nodes[1].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 0

nodes[2].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 2
nodes[2].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 1
nodes[2].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1
nodes[2].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 1

nodes[3].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 1
nodes[3].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 0
nodes[3].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1
nodes[3].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 0

asyncTest "getNumStreams() returns expected number of connections per protocol":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

# Start them with relay + filter
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.mountFilter()))

let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())

require:
# multiple streams are multiplexed over a single connection.
# note that a relay connection is created under the hood when dialing a peer (libp2p behaviour)
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true

check:
nodes[0].peerManager.getNumStreams(Direction.In, WakuRelayCodec) == 1
nodes[0].peerManager.getNumStreams(Direction.Out, WakuRelayCodec) == 1
nodes[0].peerManager.getNumStreams(Direction.In, WakuFilterCodec) == 0
nodes[0].peerManager.getNumStreams(Direction.Out, WakuFilterCodec) == 4

nodes[1].peerManager.getNumStreams(Direction.In, WakuRelayCodec) == 1
nodes[1].peerManager.getNumStreams(Direction.Out, WakuRelayCodec) == 1
nodes[1].peerManager.getNumStreams(Direction.In, WakuFilterCodec) == 4
nodes[1].peerManager.getNumStreams(Direction.Out, WakuFilterCodec) == 0

test "selectPeer() returns the correct peer":
# Valid peer id missing the last digit
let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D"
Expand Down
64 changes: 52 additions & 12 deletions waku/v2/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ import
chronos,
chronicles,
metrics,
libp2p/multistream
libp2p/multistream,
libp2p/muxers/muxer
import
../../protocol/waku_relay,
../../protocol/waku_store/common,
../../utils/peers,
../../utils/heartbeat,
./peer_store/peer_storage,
./waku_peer_store

Expand All @@ -22,7 +25,8 @@ declareCounter waku_peers_dials, "Number of peer dials", ["outcome"]
# TODO: Populate from PeerStore.Source when ready
declarePublicCounter waku_node_conns_initiated, "Number of connections initiated", ["source"]
declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"]
declarePublicGauge waku_connected_peers, "Number of connected peers per direction", ["direction"]
declarePublicGauge waku_connected_peers, "Number of physical connections per direction and protocol", labels = ["direction", "protocol"]
declarePublicGauge waku_streams_peers, "Number of streams per direction and protocol", labels = ["direction", "protocol"]
declarePublicGauge waku_peer_store_size, "Number of peers managed by the peer store"
declarePublicGauge waku_service_peers, "Service peer protocol and multiaddress ", labels = ["protocol", "peerId"]

Expand Down Expand Up @@ -51,6 +55,9 @@ const
# How often the peer store is pruned
PrunePeerStoreInterval = chronos.minutes(5)

# How often the peer store is updated with metrics
UpdateMetricsInterval = chronos.seconds(15)

type
PeerManager* = ref object of RootObj
switch*: Switch
Expand Down Expand Up @@ -252,7 +259,6 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
let direction = if event.initiator: Outbound else: Inbound
pm.peerStore[ConnectionBook][peerId] = Connected
pm.peerStore[DirectionBook][peerId] = direction
waku_connected_peers.inc(1, labelValues=[$direction])

if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), Connected)
Expand All @@ -261,7 +267,6 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
elif event.kind == PeerEventKind.Left:
pm.peerStore[DirectionBook][peerId] = UnknownDirection
pm.peerStore[ConnectionBook][peerId] = CanConnect
waku_connected_peers.dec(1, labelValues=[$pm.peerStore[DirectionBook][peerId]])

if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
Expand Down Expand Up @@ -428,27 +433,48 @@ proc connectToNodes*(pm: PeerManager,
# later.
await sleepAsync(chronos.seconds(5))

# Returns the amount of physical connections for a given direction
# containing at least one stream with the given protocol.
proc getNumConnections*(pm: PeerManager, dir: Direction, protocol: string): int =
var numConns = 0
for peerId, muxers in pm.switch.connManager.getConnections():
for peerConn in muxers:
let streams = peerConn.getStreams()
if peerConn.connection.transportDir == dir:
if streams.anyIt(it.protocol == protocol):
numConns += 1
return numConns

proc getNumStreams*(pm: PeerManager, dir: Direction, protocol: string): int =
var numConns = 0
for peerId, muxers in pm.switch.connManager.getConnections():
for peerConn in muxers:
for stream in peerConn.getStreams():
if stream.protocol == protocol and stream.dir == dir:
numConns += 1
return numConns

proc connectToRelayPeers*(pm: PeerManager) {.async.} =
let maxConnections = pm.switch.connManager.inSema.size
let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len
let numOutPeers = pm.switch.connectedPeers(lpstream.Direction.Out).len
let numConPeers = numInPeers + numOutPeers

# TODO: Enforce a given in/out peers ratio
let inRelayPeers = pm.getNumConnections(Direction.In, WakuRelayCodec)
let outRelayPeers = pm.getNumConnections(Direction.Out, WakuRelayCodec)
let totalRelayPeers = inRelayPeers + outRelayPeers

# Leave some room for service peers
if numConPeers >= (maxConnections - 5):
if totalRelayPeers >= (maxConnections - 5):
return

# TODO: Track only relay connections (nwaku/issues/1566)
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.peerStore.canBeConnected(it.peerId,
pm.initialBackoffInSec,
pm.backoffFactor))
let numPeersToConnect = min(min(maxConnections - numConPeers, outsideBackoffPeers.len), MaxParalelDials)
let numPeersToConnect = min(min(maxConnections - totalRelayPeers, outsideBackoffPeers.len), MaxParalelDials)

info "Relay peer connections",
connectedPeers = numConPeers,
inRelayConns = inRelayPeers,
outRelayConns = outRelayPeers,
totalRelayConns = totalRelayPeers,
targetConnectedPeers = maxConnections,
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len
Expand Down Expand Up @@ -530,8 +556,22 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
await pm.connectToRelayPeers()
await sleepAsync(ConnectivityLoopInterval)

proc updateMetrics(pm: PeerManager) {.async.} =
# TODO: Hardcoded due to circular dependency
let WakuFilterCodec = "/vac/waku/filter/2.0.0-beta1"
let WakuLightPushCodec = "/vac/waku/lightpush/2.0.0-beta1"

heartbeat "Scheduling updateMetrics run", UpdateMetricsInterval:
for dir in @[Direction.In, Direction.Out]:
for proto in @[WakuRelayCodec, WakuStoreCodec, WakuFilterCodec, WakuLightPushCodec]:
let protoDirConns = pm.getNumConnections(dir, proto)
let protoDirStreams = pm.getNumStreams(dir, proto)
waku_connected_peers.set(protoDirConns.float64, labelValues = [$dir, proto])
waku_streams_peers.set(protoDirStreams.float64, labelValues = [$dir, proto])

proc start*(pm: PeerManager) =
pm.started = true
asyncSpawn pm.updateMetrics()
asyncSpawn pm.relayConnectivityLoop()
asyncSpawn pm.prunePeerStoreLoop()

Expand Down
29 changes: 29 additions & 0 deletions waku/v2/utils/heartbeat.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import sequtils
import chronos, chronicles

# Taken from: https://github.com/status-im/nim-libp2p/blob/master/libp2p/utils/heartbeat.nim

template heartbeat*(name: string, interval: Duration, body: untyped): untyped =
var nextHeartbeat = Moment.now()
while true:
body

nextHeartbeat += interval
let now = Moment.now()
if nextHeartbeat < now:
let
delay = now - nextHeartbeat
itv = interval
if delay > itv:
info "Missed multiple heartbeats", heartbeat = name,
delay = delay, hinterval = itv
else:
debug "Missed heartbeat", heartbeat = name,
delay = delay, hinterval = itv
nextHeartbeat = now + itv
await sleepAsync(nextHeartbeat - now)

0 comments on commit 0352e21

Please sign in to comment.