Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve node metrics #831

Merged
merged 2 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tests/v2/test_jsonrpc_waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,8 @@ procSuite "Waku v2 JSON-RPC API":
node3.mountRelay()

# Dial nodes 2 and 3 from node1
await node1.dialPeer(constructMultiaddrStr(peerInfo2))
await node1.dialPeer(constructMultiaddrStr(peerInfo3))
await node1.connectToNodes(@[constructMultiaddrStr(peerInfo2)])
await node1.connectToNodes(@[constructMultiaddrStr(peerInfo3)])

# RPC server setup
let
Expand Down
58 changes: 37 additions & 21 deletions waku/v2/node/wakunode2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ when defined(rln):
web3,
../protocol/waku_rln_relay/[rln, waku_rln_relay_utils]

declarePublicCounter waku_node_messages, "number of messages received", ["type"]
declarePublicCounter waku_node_messages, "number of messages received", ["type", "contentTopic"]
declarePublicGauge waku_node_filters, "number of content filter subscriptions"
declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"]
declarePublicCounter waku_node_conns_initiated, "number of connections initiated by this node", ["source"]

logScope:
topics = "wakunode"
Expand Down Expand Up @@ -251,7 +252,10 @@ proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) =
if (not node.wakuStore.isNil):
await node.wakuStore.handleMessage(topic, msg.value())

waku_node_messages.inc(labelValues = ["relay"])
# Increase message counter
let ctLabel = if msg.value().contentTopic.len > 0: msg.value().contentTopic
else: "none"
waku_node_messages.inc(labelValues = ["relay", ctLabel])

let wakuRelay = node.wakuRelay

Expand Down Expand Up @@ -433,7 +437,11 @@ proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {
info "push received"
for message in msg.messages:
node.filters.notify(message, requestId) # Trigger filter handlers on a light node
waku_node_messages.inc(labelValues = ["filter"])

# Increase message counter
let ctLabel = if message.contentTopic.len > 0: message.contentTopic
else: "none"
waku_node_messages.inc(labelValues = ["filter", ctLabel])

node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler, filterTimeout)
node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterCodec))
Expand Down Expand Up @@ -721,15 +729,20 @@ proc startKeepalive*(node: WakuNode) =
asyncSpawn node.keepaliveLoop(defaultKeepalive)

## Helpers
proc dialPeer*(n: WakuNode, address: string) {.async.} =
info "dialPeer", address = address
# XXX: This turns ipfs into p2p, not quite sure why
let remotePeer = parseRemotePeerInfo(address)

info "Dialing peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
proc connectToNode(n: WakuNode, remotePeer: RemotePeerInfo, source = "api") {.async.} =
## `source` indicates source of node addrs (static config, api call, discovery, etc)
info "Connecting to node", remotePeer = remotePeer, source = source

# NOTE This is dialing on WakuRelay protocol specifically
discard await n.peerManager.dialPeer(remotePeer, WakuRelayCodec)
info "Post peerManager dial"
info "Attempting dial", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
let connOpt = await n.peerManager.dialPeer(remotePeer, WakuRelayCodec)

if connOpt.isSome():
info "Successfully connected to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
waku_node_conns_initiated.inc(labelValues = [source])
else:
error "Failed to connect to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
waku_node_errors.inc(labelValues = ["conn_init_failure"])

proc setStorePeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
info "Set store peer", address = address
Expand All @@ -752,11 +765,12 @@ proc setLightPushPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueErr

n.wakuLightPush.setPeer(remotePeer)

proc connectToNodes*(n: WakuNode, nodes: seq[string]) {.async.} =
proc connectToNodes*(n: WakuNode, nodes: seq[string], source = "api") {.async.} =
## `source` indicates source of node addrs (static config, api call, discovery, etc)
info "connectToNodes", len = nodes.len

for nodeId in nodes:
info "connectToNodes", node = nodeId
# XXX: This seems...brittle
await dialPeer(n, nodeId)
await connectToNode(n, parseRemotePeerInfo(nodeId), source)

# The issue seems to be around peers not being fully connected when
# trying to subscribe. So what we do is sleep to guarantee nodes are
Expand All @@ -766,10 +780,12 @@ proc connectToNodes*(n: WakuNode, nodes: seq[string]) {.async.} =
# later.
await sleepAsync(5.seconds)

proc connectToNodes*(n: WakuNode, nodes: seq[RemotePeerInfo]) {.async.} =
proc connectToNodes*(n: WakuNode, nodes: seq[RemotePeerInfo], source = "api") {.async.} =
## `source` indicates source of node addrs (static config, api call, discovery, etc)
info "connectToNodes", len = nodes.len

for remotePeerInfo in nodes:
info "connectToNodes", peer = remotePeerInfo
discard await n.peerManager.dialPeer(remotePeerInfo, WakuRelayCodec)
await connectToNode(n, remotePeerInfo, source)

# The issue seems to be around peers not being fully connected when
# trying to subscribe. So what we do is sleep to guarantee nodes are
Expand Down Expand Up @@ -804,7 +820,7 @@ proc runDiscv5Loop(node: WakuNode) {.async.} =

if newPeers.len > 0:
debug "Connecting to newly discovered peers", count=newPeers.len()
await connectToNodes(node, newPeers)
await connectToNodes(node, newPeers, "discv5")

# Discovery `queryRandom` can have a synchronous fast path for example
# when no peers are in the routing table. Don't run it in continuous loop.
Expand Down Expand Up @@ -1113,7 +1129,7 @@ when isMainModule:

# Connect to configured static nodes
if conf.staticnodes.len > 0:
waitFor connectToNodes(node, conf.staticnodes)
waitFor connectToNodes(node, conf.staticnodes, "static")

# Connect to discovered nodes
if conf.dnsDiscovery and conf.dnsDiscoveryUrl != "":
Expand All @@ -1136,7 +1152,7 @@ when isMainModule:
let discoveredPeers = wakuDnsDiscovery.get().findPeers()
if discoveredPeers.isOk:
info "Connecting to discovered peers"
waitFor connectToNodes(node, discoveredPeers.get())
waitFor connectToNodes(node, discoveredPeers.get(), "dnsdisc")
else:
warn "Failed to init Waku DNS discovery"

Expand Down