Skip to content

Commit

Permalink
Merge branch 'master' into chore-add-size-retention-policy
Browse files Browse the repository at this point in the history
  • Loading branch information
ABresting authored Oct 9, 2023
2 parents 5022d3e + 2c5eb42 commit 7a4ebbd
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 40 deletions.
39 changes: 37 additions & 2 deletions tests/test_waku_lightpush.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{.used.}

import
std/options,
std/strscans,
testutils/unittests,
chronicles,
chronos,
Expand All @@ -10,10 +12,11 @@ import
../../waku/waku_core,
../../waku/waku_lightpush,
../../waku/waku_lightpush/client,
../../waku/waku_lightpush/protocol_metrics,
../../waku/waku_lightpush/rpc,
./testlib/common,
./testlib/wakucore


proc newTestWakuLightpushNode(switch: Switch, handler: PushMessageHandler): Future[WakuLightPush] {.async.} =
let
peerManager = PeerManager.new(switch)
Expand Down Expand Up @@ -115,4 +118,36 @@ suite "Waku Lightpush":
requestError == error

## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
await allFutures(clientSwitch.stop(), serverSwitch.stop())

asyncTest "incorrectly encoded request should return an erring response":
## Setup
let
serverSwitch = newTestSwitch()
handler = proc(peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
## this handler will never be called: request must fail earlier
return ok()
server = await newTestWakuLightpushNode(serverSwitch, handler)

## Given
let
fakeBuffer = @[byte(42)]
fakePeerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()

## When
let
pushRpcResponse = await server.handleRequest(fakePeerId, fakeBuffer)
requestId = pushRpcResponse.requestId

## Then
check:
requestId == ""
pushRpcResponse.response.isSome()

let resp = pushRpcResponse.response.get()

check:
resp.isSuccess == false
resp.info.isSome()
## the error message should start with decodeRpcFailure
scanf(resp.info.get(), decodeRpcFailure)
75 changes: 37 additions & 38 deletions waku/waku_lightpush/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -35,47 +35,46 @@ type
peerManager*: PeerManager
pushHandler*: PushMessageHandler

proc initProtocolHandler*(wl: WakuLightPush) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let buffer = await conn.readLp(MaxRpcSize.int)
let reqDecodeRes = PushRPC.decode(buffer)
if reqDecodeRes.isErr():
error "failed to decode rpc"
waku_lightpush_errors.inc(labelValues = [decodeRpcFailure])
return

let req = reqDecodeRes.get()
if req.request.isNone():
error "invalid lightpush rpc received", error=emptyRequestBodyFailure
waku_lightpush_errors.inc(labelValues = [emptyRequestBodyFailure])
return
proc handleRequest*(wl: WakuLightPush, peerId: PeerId, buffer: seq[byte]): Future[PushRPC] {.async.} =
let reqDecodeRes = PushRPC.decode(buffer)
var
isSuccess = false
pushResponseInfo = ""
requestId = ""

if reqDecodeRes.isErr():
pushResponseInfo = decodeRpcFailure & ": " & $reqDecodeRes.error
elif reqDecodeRes.get().request.isNone():
pushResponseInfo = emptyRequestBodyFailure
else:
let pushRpcRequest = reqDecodeRes.get();

requestId = pushRpcRequest.requestId

waku_lightpush_messages.inc(labelValues = ["PushRequest"])
let
pubSubTopic = req.request.get().pubSubTopic
message = req.request.get().message
debug "push request", peerId=conn.peerId, requestId=req.requestId, pubsubTopic=pubsubTopic

var response: PushResponse
var handleRes: WakuLightPushResult[void]
try:
handleRes = await wl.pushHandler(conn.peerId, pubsubTopic, message)
except Exception:
response = PushResponse(is_success: false, info: some(getCurrentExceptionMsg()))
waku_lightpush_errors.inc(labelValues = [messagePushFailure])
error "pushed message handling failed", error= getCurrentExceptionMsg()


if handleRes.isOk():
response = PushResponse(is_success: true, info: some("OK"))
else:
response = PushResponse(is_success: false, info: some(handleRes.error))
waku_lightpush_errors.inc(labelValues = [messagePushFailure])
error "pushed message handling failed", error=handleRes.error

let rpc = PushRPC(requestId: req.requestId, response: some(response))
await conn.writeLp(rpc.encode().buffer)
request = pushRpcRequest.request

pubSubTopic = request.get().pubSubTopic
message = request.get().message
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
debug "push request", peerId=peerId, requestId=requestId, pubsubTopic=pubsubTopic

let handleRes = await wl.pushHandler(peerId, pubsubTopic, message)
isSuccess = handleRes.isOk()
pushResponseInfo = (if isSuccess: "OK" else: handleRes.error)

if not isSuccess:
waku_lightpush_errors.inc(labelValues = [pushResponseInfo])
error "failed to push message", error=pushResponseInfo
let response = PushResponse(isSuccess: isSuccess, info: some(pushResponseInfo))
let rpc = PushRPC(requestId: requestId, response: some(response))
return rpc

proc initProtocolHandler*(wl: WakuLightPush) =
proc handle(conn: Connection, proto: string) {.async.} =
let buffer = await conn.readLp(MaxRpcSize.int)
let rpc = await handleRequest(wl, conn.peerId, buffer)
await conn.writeLp(rpc.encode().buffer)
wl.handler = handle
wl.codec = WakuLightPushCodec

Expand Down

0 comments on commit 7a4ebbd

Please sign in to comment.