Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: lightpush rest api #2052

Merged
merged 8 commits into from
Sep 22, 2023
3 changes: 3 additions & 0 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import
../../waku/node/rest/filter/handlers as rest_filter_api,
../../waku/node/rest/store/handlers as rest_store_api,
../../waku/node/rest/health/handlers as rest_health_api,
../../waku/node/rest/lightpush/handlers as rest_lightpush_api,
../../waku/node/jsonrpc/admin/handlers as rpc_admin_api,
../../waku/node/jsonrpc/debug/handlers as rpc_debug_api,
../../waku/node/jsonrpc/filter/handlers as rpc_filter_api,
Expand Down Expand Up @@ -590,6 +591,8 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo
## Store REST API
installStoreApiHandlers(server.router, app.node)

installLightPushRequestHandler(server.router, app.node)

server.start()
info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/"

Expand Down
3 changes: 2 additions & 1 deletion tests/all_tests_waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ import
./wakunode_rest/test_rest_serdes,
./wakunode_rest/test_rest_store,
./wakunode_rest/test_rest_filter,
./wakunode_rest/test_rest_legacy_filter
./wakunode_rest/test_rest_legacy_filter,
./wakunode_rest/test_rest_lightpush

import
./waku_rln_relay/test_waku_rln_relay,
Expand Down
194 changes: 194 additions & 0 deletions tests/wakunode_rest/test_rest_lightpush.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
{.used.}

import
std/sequtils,
stew/byteutils,
stew/shims/net,
testutils/unittests,
presto, presto/client as presto_client,
libp2p/crypto/crypto

import
../../waku/node/message_cache,
../../waku/common/base64,
../../waku/waku_core,
../../waku/waku_node,
../../waku/node/peer_manager,
../../waku/waku_lightpush,
../../waku/node/rest/server,
../../waku/node/rest/client,
../../waku/node/rest/responses,
../../waku/node/rest/lightpush/types,
../../waku/node/rest/lightpush/handlers as lightpush_api,
../../waku/node/rest/lightpush/client as lightpush_api_client,
../../waku/waku_relay,
../testlib/wakucore,
../testlib/wakunode


proc testWakuNode(): WakuNode =
let
privkey = generateSecp256k1Key()
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
port = Port(0)

return newTestWakuNode(privkey, bindIp, port, some(extIp), some(port))


type RestLightPushTest = object
serviceNode: WakuNode
pushNode: WakuNode
consumerNode: WakuNode
restServer: RestServerRef
client: RestClientRef


proc init(T: type RestLightPushTest): Future[T] {.async.} =
var testSetup = RestLightPushTest()
testSetup.serviceNode = testWakuNode()
testSetup.pushNode = testWakuNode()
testSetup.consumerNode = testWakuNode()

await allFutures(testSetup.serviceNode.start(),
testSetup.pushNode.start(),
testSetup.consumerNode.start())

await testSetup.consumerNode.mountRelay()
await testSetup.serviceNode.mountRelay()
await testSetup.serviceNode.mountLightPush()
testSetup.pushNode.mountLightPushClient()


testSetup.serviceNode.peerManager.addServicePeer(
testSetup.consumerNode.peerInfo.toRemotePeerInfo(),
WakuRelayCodec)

await testSetup.serviceNode.connectToNodes(@[testSetup.consumerNode.peerInfo.toRemotePeerInfo()])

testSetup.pushNode.peerManager.addServicePeer(
testSetup.serviceNode.peerInfo.toRemotePeerInfo(),
WakuLightPushCodec)

let restPort = Port(58011)
let restAddress = ValidIpAddress.init("127.0.0.1")
testSetup.restServer = RestServerRef.init(restAddress, restPort).tryGet()

installLightPushRequestHandler(testSetup.restServer.router, testSetup.pushNode)

testSetup.restServer.start()

testSetup.client = newRestHttpClient(initTAddress(restAddress, restPort))

return testSetup


proc shutdown(self: RestLightPushTest) {.async.} =
await self.restServer.stop()
await self.restServer.closeWait()
await allFutures(self.serviceNode.stop(), self.pushNode.stop())


suite "Waku v2 Rest API - lightpush":
asyncTest "Push message request":
# Given
let restLightPushTest = await RestLightPushTest.init()

restLightPushTest.consumerNode.subscribe(DefaultPubsubTopic)
restLightPushTest.serviceNode.subscribe(DefaultPubsubTopic)
require:
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1

# When
let message : RelayWakuMessage = fakeWakuMessage(contentTopic = DefaultContentTopic,
payload = toBytes("TEST-1")).toRelayWakuMessage()

let requestBody = PushRequest(pubsubTopic: some(DefaultPubsubTopic),
message: message)
let response = await restLightPushTest.client.sendPushRequest(requestBody)

echo "response", $response

# Then
check:
response.status == 200
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May we have any "400" or "500" test cases as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm working on those. Just not added yet.

$response.contentType == $MIMETYPE_TEXT

await restLightPushTest.shutdown()

asyncTest "Push message bad-request":
# Given
let restLightPushTest = await RestLightPushTest.init()

restLightPushTest.serviceNode.subscribe(DefaultPubsubTopic)
require:
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1

# When
let badMessage1 : RelayWakuMessage = fakeWakuMessage(contentTopic = DefaultContentTopic,
payload = toBytes("")).toRelayWakuMessage()
let badRequestBody1 = PushRequest(pubsubTopic: some(DefaultPubsubTopic),
message: badMessage1)

let badMessage2 : RelayWakuMessage = fakeWakuMessage(contentTopic = "",
payload = toBytes("Sthg")).toRelayWakuMessage()
let badRequestBody2 = PushRequest(pubsubTopic: some(DefaultPubsubTopic),
message: badMessage2)

let badRequestBody3 = PushRequest(pubsubTopic: none(PubsubTopic),
message: badMessage2)

var response: RestResponse[string]

response = await restLightPushTest.client.sendPushRequest(badRequestBody1)

echo "response", $response

# Then
check:
response.status == 400
$response.contentType == $MIMETYPE_TEXT
NagyZoltanPeter marked this conversation as resolved.
Show resolved Hide resolved
response.data.startsWith("Invalid content body")


# when
response = await restLightPushTest.client.sendPushRequest(badRequestBody2)

# Then
check:
response.status == 400
$response.contentType == $MIMETYPE_TEXT
response.data.startsWith("Invalid content body")

# when
response = await restLightPushTest.client.sendPushRequest(badRequestBody3)

# Then
check:
response.status == 400
$response.contentType == $MIMETYPE_TEXT
response.data.startsWith("Invalid content body")

await restLightPushTest.shutdown()

asyncTest "Push message request service not available":
# Given
let restLightPushTest = await RestLightPushTest.init()

# When
let message : RelayWakuMessage = fakeWakuMessage(contentTopic = DefaultContentTopic,
payload = toBytes("TEST-1")).toRelayWakuMessage()

let requestBody = PushRequest(pubsubTopic: some("NoExistTopic"),
message: message)
let response = await restLightPushTest.client.sendPushRequest(requestBody)

echo "response", $response

# Then
check:
response.status == 503
$response.contentType == $MIMETYPE_TEXT
response.data == "Failed to request a message push: Can not publish to any peers"

await restLightPushTest.shutdown()
5 changes: 3 additions & 2 deletions tests/wakunode_rest/test_rest_relay_serdes.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ suite "Waku v2 Rest API - Relay - serialization":
test "optional fields are not provided":
# Given
let payload = base64.encode("MESSAGE")
let jsonBytes = toBytes("{\"payload\":\"" & $payload & "\"}")
let jsonBytes = toBytes("{\"payload\":\"" & $payload & "\",\"contentTopic\":\"some/topic\"}")

# When
let res = decodeFromJsonBytes(RelayWakuMessage, jsonBytes, requireAllFields = true)
Expand All @@ -29,7 +29,8 @@ suite "Waku v2 Rest API - Relay - serialization":
let value = res.get()
check:
value.payload == payload
value.contentTopic.isNone()
value.contentTopic.isSome()
value.contentTopic.get() == "some/topic"
value.version.isNone()
value.timestamp.isNone()

Expand Down
49 changes: 49 additions & 0 deletions waku/node/rest/lightpush/client.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import
json,
std/sets,
stew/byteutils,
strformat,
chronicles,
json_serialization,
json_serialization/std/options,
presto/[route, client, common]
import
../../../waku_core,
../serdes,
../responses,
./types

export types

logScope:
topics = "waku node rest client v2"

proc encodeBytes*(value: PushRequest,
contentType: string): RestResult[seq[byte]] =
if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")

let encoded = ?encodeIntoJsonBytes(value)
return ok(encoded)

proc decodeBytes*(t: typedesc[string], value: openarray[byte],
contentType: Opt[ContentTypeData]): RestResult[string] =
if MediaType.init($contentType) != MIMETYPE_TEXT:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")

var res: string
if len(value) > 0:
res = newString(len(value))
copyMem(addr res[0], unsafeAddr value[0], len(value))
return ok(res)
Ivansete-status marked this conversation as resolved.
Show resolved Hide resolved
Ivansete-status marked this conversation as resolved.
Show resolved Hide resolved

proc sendPushRequest*(body: PushRequest):
RestResponse[string]
{.rest, endpoint: "/lightpush/v1/message", meth: HttpMethod.MethodPost.}
86 changes: 86 additions & 0 deletions waku/node/rest/lightpush/handlers.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import
std/strformat,
std/sequtils,
stew/byteutils,
chronicles,
json_serialization,
json_serialization/std/options,
presto/route,
presto/common

import
../../../waku_core,
../../peer_manager,
../../waku_node,
../../waku/waku_lightpush,
../serdes,
../responses,
./types

export types

logScope:
topics = "waku node rest lightpush api"

const futTimeoutForPushRequestProcessing* = 5.seconds

#### Request handlers

const ROUTE_LIGHTPUSH* = "/lightpush/v1/message"

func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiResponse] =
if contentBody.isNone():
return err(RestApiResponse.badRequest("Missing content body"))

let reqBodyContentType = MediaType.init($contentBody.get().contentType)
if reqBodyContentType != MIMETYPE_JSON:
return err(RestApiResponse.badRequest("Wrong Content-Type, expected application/json"))

let reqBodyData = contentBody.get().data

let requestResult = decodeFromJsonBytes(T, reqBodyData)
if requestResult.isErr():
return err(RestApiResponse.badRequest("Invalid content body, could not decode. " &
$requestResult.error))

return ok(requestResult.get())

proc installLightPushRequestHandler*(router: var RestRouter,
node: WakuNode) =

router.api(MethodPost, ROUTE_LIGHTPUSH) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Send a request to push a waku message
debug "post", ROUTE_LIGHTPUSH, contentBody

let decodedBody = decodeRequestBody[PushRequest](contentBody)

if decodedBody.isErr():
return decodedBody.error()

let req: PushRequest = decodedBody.value()
let msg = req.message.toWakuMessage()

if msg.isErr():
return RestApiResponse.badRequest("Invalid message: {msg.error}")

let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec)
if peerOpt.isNone():
return RestApiResponse.serviceUnavailable("No suitable remote lightpush peers")

let subFut = node.lightpushPublish(req.pubsubTopic,
msg.value(),
peerOpt.get())

if not await subFut.withTimeout(futTimeoutForPushRequestProcessing):
error "Failed to request a message push due to timeout!"
return RestApiResponse.serviceUnavailable("Push request timed out")

if subFut.value().isErr():
return RestApiResponse.serviceUnavailable(fmt("Failed to request a message push: {subFut.value().error}"))

return RestApiResponse.ok()
Loading
Loading