Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(networking): get relay number of connections from protocol conns/streams #1609

Merged
merged 4 commits into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions tests/v2/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,83 @@ procSuite "Peer Manager":
# but the relay peer is not
node.peerManager.serviceSlots.hasKey(WakuRelayCodec) == false

asyncTest "getNumConnections() returns expected number of connections per protocol":
# Create 4 nodes
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

# Start them with relay + filter
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.mountFilter()))

let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())

# create some connections/streams
require:
# some relay connections
(await nodes[0].peerManager.connectRelay(pInfos[1])) == true
(await nodes[0].peerManager.connectRelay(pInfos[2])) == true
(await nodes[1].peerManager.connectRelay(pInfos[2])) == true

(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[2], WakuFilterCodec)).isSome() == true

# isolated dial creates a relay conn under the hood (libp2p behaviour)
(await nodes[2].peerManager.dialPeer(pInfos[3], WakuFilterCodec)).isSome() == true


# assert physical connections
check:
nodes[0].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 0
nodes[0].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 2
nodes[0].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 0
nodes[0].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 2

nodes[1].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 1
nodes[1].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 1
nodes[1].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1
nodes[1].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 0

nodes[2].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 2
nodes[2].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 1
nodes[2].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1
nodes[2].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 1

nodes[3].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 1
nodes[3].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 0
nodes[3].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1
nodes[3].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 0

asyncTest "getNumStreams() returns expected number of connections per protocol":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

# Start them with relay + filter
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.mountFilter()))

let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())

require:
# multiple streams are multiplexed over a single connection.
# note that a relay connection is created under the hood when dialing a peer (libp2p behaviour)
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true

check:
nodes[0].peerManager.getNumStreams(Direction.In, WakuRelayCodec) == 1
nodes[0].peerManager.getNumStreams(Direction.Out, WakuRelayCodec) == 1
nodes[0].peerManager.getNumStreams(Direction.In, WakuFilterCodec) == 0
nodes[0].peerManager.getNumStreams(Direction.Out, WakuFilterCodec) == 4

nodes[1].peerManager.getNumStreams(Direction.In, WakuRelayCodec) == 1
nodes[1].peerManager.getNumStreams(Direction.Out, WakuRelayCodec) == 1
nodes[1].peerManager.getNumStreams(Direction.In, WakuFilterCodec) == 4
nodes[1].peerManager.getNumStreams(Direction.Out, WakuFilterCodec) == 0

test "selectPeer() returns the correct peer":
# Valid peer id missing the last digit
let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D"
Expand Down
12 changes: 12 additions & 0 deletions waku/common/utils/sequence.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import std/sequtils

proc flatten*[T](a: seq[seq[T]]): seq[T] =
var aFlat = newSeq[T](0)
for subseq in a:
aFlat &= subseq
return aFlat
59 changes: 47 additions & 12 deletions waku/v2/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import
chronos,
chronicles,
metrics,
libp2p/multistream
libp2p/multistream,
libp2p/muxers/muxer
import
../../protocol/waku_relay,
../../utils/peers,
../../utils/heartbeat,
./peer_store/peer_storage,
./waku_peer_store

Expand All @@ -22,7 +24,8 @@ declareCounter waku_peers_dials, "Number of peer dials", ["outcome"]
# TODO: Populate from PeerStore.Source when ready
declarePublicCounter waku_node_conns_initiated, "Number of connections initiated", ["source"]
declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"]
declarePublicGauge waku_connected_peers, "Number of connected peers per direction", ["direction"]
declarePublicGauge waku_connected_peers, "Number of physical connections per direction and protocol", labels = ["direction", "protocol"]
declarePublicGauge waku_streams_peers, "Number of streams per direction and protocol", labels = ["direction", "protocol"]
declarePublicGauge waku_peer_store_size, "Number of peers managed by the peer store"
declarePublicGauge waku_service_peers, "Service peer protocol and multiaddress ", labels = ["protocol", "peerId"]

Expand Down Expand Up @@ -51,6 +54,9 @@ const
# How often the peer store is pruned
PrunePeerStoreInterval = chronos.minutes(5)

# How often the peer store is updated with metrics
UpdateMetricsInterval = chronos.seconds(15)

type
PeerManager* = ref object of RootObj
switch*: Switch
Expand Down Expand Up @@ -252,7 +258,6 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
let direction = if event.initiator: Outbound else: Inbound
pm.peerStore[ConnectionBook][peerId] = Connected
pm.peerStore[DirectionBook][peerId] = direction
waku_connected_peers.inc(1, labelValues=[$direction])

if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), Connected)
Expand All @@ -261,7 +266,6 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
elif event.kind == PeerEventKind.Left:
pm.peerStore[DirectionBook][peerId] = UnknownDirection
pm.peerStore[ConnectionBook][peerId] = CanConnect
waku_connected_peers.dec(1, labelValues=[$pm.peerStore[DirectionBook][peerId]])

if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
Expand Down Expand Up @@ -428,27 +432,48 @@ proc connectToNodes*(pm: PeerManager,
# later.
await sleepAsync(chronos.seconds(5))

# Returns the amount of physical connections for a given direction
# containing at least one stream with the given protocol.
proc getNumConnections*(pm: PeerManager, dir: Direction, protocol: string): int =
var numConns = 0
for peerId, muxers in pm.switch.connManager.getConnections():
for peerConn in muxers:
let streams = peerConn.getStreams()
if peerConn.connection.transportDir == dir:
if streams.anyIt(it.protocol == protocol):
numConns += 1
return numConns

proc getNumStreams*(pm: PeerManager, dir: Direction, protocol: string): int =
var numConns = 0
for peerId, muxers in pm.switch.connManager.getConnections():
for peerConn in muxers:
for stream in peerConn.getStreams():
if stream.protocol == protocol and stream.dir == dir:
numConns += 1
return numConns

proc connectToRelayPeers*(pm: PeerManager) {.async.} =
let maxConnections = pm.switch.connManager.inSema.size
let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len
let numOutPeers = pm.switch.connectedPeers(lpstream.Direction.Out).len
let numConPeers = numInPeers + numOutPeers

# TODO: Enforce a given in/out peers ratio
let inRelayPeers = pm.getNumConnections(Direction.In, WakuRelayCodec)
let outRelayPeers = pm.getNumConnections(Direction.Out, WakuRelayCodec)
let totalRelayPeers = inRelayPeers + outRelayPeers

# Leave some room for service peers
if numConPeers >= (maxConnections - 5):
if totalRelayPeers >= (maxConnections - 5):
return

# TODO: Track only relay connections (nwaku/issues/1566)
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.peerStore.canBeConnected(it.peerId,
pm.initialBackoffInSec,
pm.backoffFactor))
let numPeersToConnect = min(min(maxConnections - numConPeers, outsideBackoffPeers.len), MaxParalelDials)
let numPeersToConnect = min(min(maxConnections - totalRelayPeers, outsideBackoffPeers.len), MaxParalelDials)

info "Relay peer connections",
connectedPeers = numConPeers,
inRelayConns = inRelayPeers,
outRelayConns = outRelayPeers,
totalRelayConns = totalRelayPeers,
targetConnectedPeers = maxConnections,
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len
Expand Down Expand Up @@ -530,8 +555,18 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
await pm.connectToRelayPeers()
await sleepAsync(ConnectivityLoopInterval)

proc updateMetrics(pm: PeerManager) {.async.} =
heartbeat "Scheduling updateMetrics run", UpdateMetricsInterval:
for dir in @[Direction.In, Direction.Out]:
for proto in pm.peerStore.getWakuProtos():
let protoDirConns = pm.getNumConnections(dir, proto)
let protoDirStreams = pm.getNumStreams(dir, proto)
waku_connected_peers.set(protoDirConns.float64, labelValues = [$dir, proto])
waku_streams_peers.set(protoDirStreams.float64, labelValues = [$dir, proto])

proc start*(pm: PeerManager) =
pm.started = true
asyncSpawn pm.updateMetrics()
asyncSpawn pm.relayConnectivityLoop()
asyncSpawn pm.prunePeerStoreLoop()

Expand Down
13 changes: 11 additions & 2 deletions waku/v2/node/peer_manager/waku_peer_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ else:
{.push raises: [].}

import
std/[tables, sequtils, sets, options, times, math],
std/[tables, sequtils, sets, options, times, math, strutils],
chronos,
eth/p2p/discoveryv5/enr,
libp2p/builders,
libp2p/peerstore

import
../../utils/peers
../../utils/peers,
../../../common/utils/sequence

export peerstore, builders

Expand Down Expand Up @@ -90,6 +91,14 @@ proc get*(peerStore: PeerStore,
numberFailedConn: peerStore[NumberFailedConnBook][peerId]
)

proc getWakuProtos*(peerStore: PeerStore): seq[string] =
## Get the waku protocols of all the stored peers.
let wakuProtocols = toSeq(peerStore[ProtoBook].book.values())
.flatten()
.deduplicate()
.filterIt(it.startsWith("/vac/waku"))
return wakuProtocols

# TODO: Rename peers() to getPeersByProtocol()
proc peers*(peerStore: PeerStore): seq[RemotePeerInfo] =
## Get all the stored information of every peer.
Expand Down
29 changes: 29 additions & 0 deletions waku/v2/utils/heartbeat.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import sequtils
import chronos, chronicles

# Taken from: https://github.com/status-im/nim-libp2p/blob/master/libp2p/utils/heartbeat.nim

template heartbeat*(name: string, interval: Duration, body: untyped): untyped =
var nextHeartbeat = Moment.now()
while true:
body

nextHeartbeat += interval
let now = Moment.now()
if nextHeartbeat < now:
let
delay = now - nextHeartbeat
itv = interval
if delay > itv:
info "Missed multiple heartbeats", heartbeat = name,
delay = delay, hinterval = itv
else:
debug "Missed heartbeat", heartbeat = name,
delay = delay, hinterval = itv
nextHeartbeat = now + itv
await sleepAsync(nextHeartbeat - now)