Skip to content

Commit

Permalink
Merge e532e21 into e72bb7e
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status authored Jun 10, 2024
2 parents e72bb7e + e532e21 commit f2ccdcc
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 23 deletions.
12 changes: 6 additions & 6 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
return

proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
debug "waku.relay received",
notice "waku.relay received",
my_peer_id = node.peerId,
pubsubTopic = topic,
msg_hash = topic.computeMessageHash(msg).to0xHex(),
Expand Down Expand Up @@ -356,10 +356,10 @@ proc publish*(
#TODO instead of discard return error when 0 peers received the message
discard await node.wakuRelay.publish(pubsubTopic, message)

trace "waku.relay published",
notice "waku.relay published",
peerId = node.peerId,
pubsubTopic = pubsubTopic,
hash = pubsubTopic.computeMessageHash(message).to0xHex(),
msg_hash = pubsubTopic.computeMessageHash(message).to0xHex(),
publishTime = getNowInNanosecondTime()

return ok()
Expand Down Expand Up @@ -952,7 +952,7 @@ proc mountLightPush*(
if publishedCount == 0:
## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
debug "Lightpush request has not been published to any peers",
notice "Lightpush request has not been published to any peers",
msg_hash = msgHash

return ok()
Expand Down Expand Up @@ -994,15 +994,15 @@ proc lightpushPublish*(
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
let msgHash = pubsubTopic.computeMessageHash(message).to0xHex()
if not node.wakuLightpushClient.isNil():
debug "publishing message with lightpush",
notice "publishing message with lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,
msg_hash = msgHash
return await node.wakuLightpushClient.publish(pubsubTopic, message, peer)

if not node.wakuLightPush.isNil():
debug "publishing message with self hosted lightpush",
notice "publishing message with self hosted lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,
Expand Down
4 changes: 2 additions & 2 deletions waku/waku_archive/archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ proc handleMessage*(
else:
getNanosecondTime(getTime().toUnixFloat())

trace "handling message",
notice "archive handling message",
msg_hash = msgHashHex,
pubsubTopic = pubsubTopic,
contentTopic = msg.contentTopic,
Expand All @@ -117,7 +117,7 @@ proc handleMessage*(
waku_archive_errors.inc(labelValues = [insertFailure])
error "failed to insert message", error = error

debug "message archived",
notice "message archived",
msg_hash = msgHashHex,
pubsubTopic = pubsubTopic,
contentTopic = msg.contentTopic,
Expand Down
14 changes: 8 additions & 6 deletions waku/waku_filter_v2/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ proc handleSubscribeRequest*(
return FilterSubscribeResponse.ok(request.requestId)

proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
trace "pushing message to subscribed peer", peer = peer
trace "pushing message to subscribed peer", peer_id = shortLog(peer)

if not wf.peerManager.peerStore.hasPeer(peer, WakuFilterPushCodec):
# Check that peer has not been removed from peer store
Expand All @@ -176,7 +176,7 @@ proc pushToPeers(
let msgHash =
messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex()

debug "pushing message to subscribed peers",
notice "pushing message to subscribed peers",
pubsubTopic = messagePush.pubsubTopic,
contentTopic = messagePush.wakuMessage.contentTopic,
target_peer_ids = targetPeerIds,
Expand Down Expand Up @@ -216,7 +216,7 @@ proc handleMessage*(
) {.async.} =
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()

debug "handling message", pubsubTopic = pubsubTopic, msg_hash = msgHash
notice "handling message", pubsubTopic = pubsubTopic, msg_hash = msgHash

let handleMessageStartTime = Moment.now()

Expand All @@ -225,8 +225,10 @@ proc handleMessage*(
let subscribedPeers =
wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
if subscribedPeers.len == 0:
debug "no subscribed peers found",
pubsubTopic = pubsubTopic, contentTopic = message.contentTopic
notice "no subscribed peers found",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
msg_hash = msgHash
return

let messagePush = MessagePush(pubsubTopic: pubsubTopic, wakuMessage: message)
Expand All @@ -242,7 +244,7 @@ proc handleMessage*(
target_peer_ids = subscribedPeers.mapIt(shortLog(it))
waku_filter_errors.inc(labelValues = [pushTimeoutFailure])
else:
debug "pushed message succesfully to all subscribers",
notice "pushed message succesfully to all subscribers",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
msg_hash = msgHash,
Expand Down
6 changes: 3 additions & 3 deletions waku/waku_lightpush/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ proc handleRequest*(
pubSubTopic = request.get().pubSubTopic
message = request.get().message
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
debug "push request",
peerId = peerId,
notice "lightpush request",
peer_id = peerId,
requestId = requestId,
pubsubTopic = pubsubTopic,
hash = pubsubTopic.computeMessageHash(message).to0xHex()
msg_hash = pubsubTopic.computeMessageHash(message).to0xHex()

let handleRes = await wl.pushHandler(peerId, pubsubTopic, message)
isSuccess = handleRes.isOk()
Expand Down
14 changes: 8 additions & 6 deletions waku/waku_relay/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,17 @@ proc generateOrderedValidator(w: WakuRelay): auto {.gcsafe.} =
pubsubTopic = pubsubTopic, error = $error
return ValidationResult.Reject

let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex()

# now sequentially validate the message
for (validator, _) in w.wakuValidators:
for (validator, errorMessage) in w.wakuValidators:
let validatorRes = await validator(pubsubTopic, msg)

if validatorRes != ValidationResult.Accept:
let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex()
error "protocol generateOrderedValidator reject waku validator",
msg_hash = msgHash, pubsubTopic = pubsubTopic, validatorRes = validatorRes
msg_hash = msgHash,
pubsubTopic = pubsubTopic,
validatorRes = validatorRes,
error = errorMessage

return validatorRes

Expand Down Expand Up @@ -305,8 +307,8 @@ proc publish*(
w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[int] {.async.} =
let data = message.encode().buffer
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()

debug "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
notice "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic

return await procCall GossipSub(w).publish(pubsubTopic, data)

0 comments on commit f2ccdcc

Please sign in to comment.