set msg_hash info logs under a new compilation flag 'log_msg_hash'
This is needed to help the Distributed Systems Testing (DST) team reduce the
volume of logs when analysing thousands of nodes. With this flag, the DST
team can set the log level to INFO, which avoids printing too many logs.
Ivansete-status committed May 29, 2024 · 1 parent 25a1aa9 · commit c9879a8
Showing 6 changed files with 88 additions and 67 deletions.
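
Every file in this commit applies the same pattern: Nim's compile-time "when defined(...)" guard. Below is a minimal, self-contained sketch of that mechanism; it is not taken from the commit, and handleMessage plus the echo calls are illustrative stand-ins for the structured info log statements in the real code.

proc handleMessage(msgHash: string) =
  when defined(log_msg_hash):
    # This branch only exists in binaries compiled with -d:log_msg_hash;
    # without the define the statement is removed at compile time, not
    # merely silenced at runtime.
    echo "msg_hash log: ", msgHash
  echo "message handled"

handleMessage("0xdeadbeef")

With the Makefile change below, passing LOG_HASH=1 to make appends -d:log_msg_hash to NIM_PARAMS, so the guarded log statements are compiled in; builds without the flag omit them entirely.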
4 changes: 4 additions & 0 deletions Makefile
@@ -130,6 +130,10 @@ ifeq ($(POSTGRES), 1)
NIM_PARAMS := $(NIM_PARAMS) -d:postgres -d:nimDebugDlOpen
endif

ifeq ($(LOG_HASH), 1)
NIM_PARAMS := $(NIM_PARAMS) -d:log_msg_hash
endif

clean: | clean-libbacktrace


53 changes: 29 additions & 24 deletions waku/node/waku_node.nim
@@ -225,12 +225,13 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
return

proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
debug "waku.relay received",
my_peer_id = node.peerId,
pubsubTopic = topic,
msg_hash = topic.computeMessageHash(msg).to0xHex(),
receivedTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len
when defined(log_msg_hash):
info "waku.relay received",
my_peer_id = node.peerId,
pubsubTopic = topic,
msg_hash = topic.computeMessageHash(msg).to0xHex(),
receivedTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len

let msgSizeKB = msg.payload.len / 1000

@@ -356,11 +357,12 @@ proc publish*(
#TODO instead of discard return error when 0 peers received the message
discard await node.wakuRelay.publish(pubsubTopic, message)

trace "waku.relay published",
peerId = node.peerId,
pubsubTopic = pubsubTopic,
hash = pubsubTopic.computeMessageHash(message).to0xHex(),
publishTime = getNowInNanosecondTime()
when defined(log_msg_hash):
info "waku.relay published",
peerId = node.peerId,
pubsubTopic = pubsubTopic,
msg_hash = pubsubTopic.computeMessageHash(message).to0xHex(),
publishTime = getNowInNanosecondTime()

return ok()

@@ -951,9 +953,10 @@ proc mountLightPush*(

if publishedCount == 0:
## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
debug "Lightpush request has not been published to any peers",
msg_hash = msgHash
when defined(log_msg_hash):
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
info "Lightpush request has not been published to any peers",
msg_hash = msgHash

return ok()

@@ -994,19 +997,21 @@ proc lightpushPublish*(
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
let msgHash = pubsubTopic.computeMessageHash(message).to0xHex()
if not node.wakuLightpushClient.isNil():
debug "publishing message with lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,
msg_hash = msgHash
when defined(log_msg_hash):
info "publishing message with lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,
msg_hash = msgHash
return await node.wakuLightpushClient.publish(pubsubTopic, message, peer)

if not node.wakuLightPush.isNil():
debug "publishing message with self hosted lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,
msg_hash = msgHash
when defined(log_msg_hash):
info "publishing message with self hosted lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,
msg_hash = msgHash
return await node.wakuLightPush.handleSelfLightPushRequest(pubsubTopic, message)

if pubsubTopic.isSome():
30 changes: 16 additions & 14 deletions waku/waku_archive/archive.nim
@@ -103,27 +103,29 @@ proc handleMessage*(
else:
getNanosecondTime(getTime().toUnixFloat())

trace "handling message",
msg_hash = msgHashHex,
pubsubTopic = pubsubTopic,
contentTopic = msg.contentTopic,
msgTimestamp = msg.timestamp,
usedTimestamp = msgTimestamp,
digest = msgDigestHex
when defined(log_msg_hash):
info "archive handling message",
msg_hash = msgHashHex,
pubsubTopic = pubsubTopic,
contentTopic = msg.contentTopic,
msgTimestamp = msg.timestamp,
usedTimestamp = msgTimestamp,
digest = msgDigestHex

let insertStartTime = getTime().toUnixFloat()

(await self.driver.put(pubsubTopic, msg, msgDigest, msgHash, msgTimestamp)).isOkOr:
waku_archive_errors.inc(labelValues = [insertFailure])
error "failed to insert message", error = error

debug "message archived",
msg_hash = msgHashHex,
pubsubTopic = pubsubTopic,
contentTopic = msg.contentTopic,
msgTimestamp = msg.timestamp,
usedTimestamp = msgTimestamp,
digest = msgDigestHex
when defined(log_msg_hash):
info "message archived",
msg_hash = msgHashHex,
pubsubTopic = pubsubTopic,
contentTopic = msg.contentTopic,
msgTimestamp = msg.timestamp,
usedTimestamp = msgTimestamp,
digest = msgDigestHex

let insertDuration = getTime().toUnixFloat() - insertStartTime
waku_archive_insert_duration_seconds.observe(insertDuration)
42 changes: 24 additions & 18 deletions waku/waku_filter_v2/protocol.nim
@@ -152,7 +152,7 @@ proc handleSubscribeRequest*(
return FilterSubscribeResponse.ok(request.requestId)

proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
trace "pushing message to subscribed peer", peer = peer
trace "pushing message to subscribed peer", peer_id = shortLog(peer)

if not wf.peerManager.peerStore.hasPeer(peer, WakuFilterPushCodec):
# Check that peer has not been removed from peer store
@@ -172,15 +172,16 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
proc pushToPeers(
wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush
) {.async.} =
let targetPeerIds = peers.mapIt(shortLog(it))
let msgHash =
messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex()
when defined(log_msg_hash):
let targetPeerIds = peers.mapIt(shortLog(it))
let msgHash =
messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex()

debug "pushing message to subscribed peers",
pubsubTopic = messagePush.pubsubTopic,
contentTopic = messagePush.wakuMessage.contentTopic,
target_peer_ids = targetPeerIds,
msg_hash = msgHash
info "pushing message to subscribed peers",
pubsubTopic = messagePush.pubsubTopic,
contentTopic = messagePush.wakuMessage.contentTopic,
target_peer_ids = targetPeerIds,
msg_hash = msgHash

let bufferToPublish = messagePush.encode().buffer

@@ -216,7 +217,8 @@ proc handleMessage*(
) {.async.} =
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()

debug "handling message", pubsubTopic = pubsubTopic, msg_hash = msgHash
when defined(log_msg_hash):
info "handling message", pubsubTopic = pubsubTopic, msg_hash = msgHash

let handleMessageStartTime = Moment.now()

@@ -225,8 +227,11 @@ proc handleMessage*(
let subscribedPeers =
wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
if subscribedPeers.len == 0:
debug "no subscribed peers found",
pubsubTopic = pubsubTopic, contentTopic = message.contentTopic
when defined(log_msg_hash):
info "no subscribed peers found",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
msg_hash = msgHash
return

let messagePush = MessagePush(pubsubTopic: pubsubTopic, wakuMessage: message)
@@ -242,12 +247,13 @@ proc handleMessage*(
target_peer_ids = subscribedPeers.mapIt(shortLog(it))
waku_filter_errors.inc(labelValues = [pushTimeoutFailure])
else:
debug "pushed message succesfully to all subscribers",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
msg_hash = msgHash,
numPeers = subscribedPeers.len,
target_peer_ids = subscribedPeers.mapIt(shortLog(it))
when defined(log_msg_hash):
info "pushed message succesfully to all subscribers",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
msg_hash = msgHash,
numPeers = subscribedPeers.len,
target_peer_ids = subscribedPeers.mapIt(shortLog(it))

let
handleMessageDuration = Moment.now() - handleMessageStartTime
11 changes: 6 additions & 5 deletions waku/waku_lightpush/protocol.nim
@@ -50,11 +50,12 @@ proc handleRequest*(
pubSubTopic = request.get().pubSubTopic
message = request.get().message
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
debug "push request",
peerId = peerId,
requestId = requestId,
pubsubTopic = pubsubTopic,
hash = pubsubTopic.computeMessageHash(message).to0xHex()
when defined(log_msg_hash):
info "lightpush request",
peer_id = peerId,
requestId = requestId,
pubsubTopic = pubsubTopic,
msg_hash = pubsubTopic.computeMessageHash(message).to0xHex()

let handleRes = await wl.pushHandler(peerId, pubsubTopic, message)
isSuccess = handleRes.isOk()
15 changes: 9 additions & 6 deletions waku/waku_relay/protocol.nim
@@ -206,15 +206,17 @@ proc generateOrderedValidator(w: WakuRelay): auto {.gcsafe.} =
pubsubTopic = pubsubTopic, error = $error
return ValidationResult.Reject

let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex()

# now sequentially validate the message
for (validator, _) in w.wakuValidators:
for (validator, errorMessage) in w.wakuValidators:
let validatorRes = await validator(pubsubTopic, msg)

if validatorRes != ValidationResult.Accept:
let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex()
error "protocol generateOrderedValidator reject waku validator",
msg_hash = msgHash, pubsubTopic = pubsubTopic, validatorRes = validatorRes
msg_hash = msgHash,
pubsubTopic = pubsubTopic,
validatorRes = validatorRes,
error = errorMessage

return validatorRes

@@ -305,8 +307,9 @@ proc publish*(
w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[int] {.async.} =
let data = message.encode().buffer
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()

debug "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic
when defined(log_msg_hash):
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
info "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic

return await procCall GossipSub(w).publish(pubsubTopic, data)
