Skip to content

Commit

Permalink
Merge cae05f5 into a34c39c
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielmer authored Jan 9, 2024
2 parents a34c39c + cae05f5 commit 9e568f7
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 3 deletions.
60 changes: 59 additions & 1 deletion tests/wakunode_rest/test_rest_admin.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.used.}

import
std/sequtils,
std/[sequtils,strformat],
stew/shims/net,
testutils/unittests,
presto, presto/client as presto_client,
Expand All @@ -10,6 +10,7 @@ import
import
../../waku/waku_core,
../../waku/waku_node,
../../waku/waku_filter_v2/client,
../../waku/node/peer_manager,
../../waku/waku_api/rest/server,
../../waku/waku_api/rest/client,
Expand All @@ -26,13 +27,15 @@ suite "Waku v2 Rest API - Admin":
var node1 {.threadvar.}: WakuNode
var node2 {.threadvar.}: WakuNode
var node3 {.threadvar.}: WakuNode
var peerInfo1 {.threadvar.}: RemotePeerInfo
var peerInfo2 {.threadvar.}: RemotePeerInfo
var peerInfo3 {.threadvar.}: RemotePeerInfo
var restServer {.threadvar.}: RestServerRef
var client{.threadvar.}: RestClientRef

asyncSetup:
node1 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60600))
peerInfo1 = node1.switch.peerInfo
node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60602))
peerInfo2 = node2.switch.peerInfo
node3 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60604))
Expand Down Expand Up @@ -94,3 +97,58 @@ suite "Waku v2 Rest API - Admin":
getRes.status == 200
$getRes.contentType == $MIMETYPE_JSON
getRes.data.len() == 0

asyncTest "Get filter data":
await allFutures(node1.mountFilter(), node2.mountFilterClient(), node3.mountFilterClient())

let
contentFiltersNode2 = @[DefaultContentTopic, ContentTopic("2"), ContentTopic("3")]
contentFiltersNode3 = @[ContentTopic("3"), ContentTopic("4")]
pubsubTopicNode2 = DefaultPubsubTopic
pubsubTopicNode3 = PubsubTopic("/waku/2/custom-waku/proto")

expectedFilterData2 = fmt"(peerId: ""{$peerInfo2}"", filterCriteria:" &
fmt" @[(pubsubTopic: ""{pubsubTopicNode2}"", contentTopic: ""{contentFiltersNode2[0]}""), " &
fmt"(pubsubTopic: ""{pubsubTopicNode2}"", contentTopic: ""{contentFiltersNode2[1]}""), " &
fmt"(pubsubTopic: ""{pubsubTopicNode2}"", contentTopic: ""{contentFiltersNode2[2]}"")]"

expectedFilterData3 = fmt"(peerId: ""{$peerInfo3}"", filterCriteria:" &
fmt" @[(pubsubTopic: ""{pubsubTopicNode3}"", contentTopic: ""{contentFiltersNode3[0]}""), " &
fmt"(pubsubTopic: ""{pubsubTopicNode3}"", contentTopic: ""{contentFiltersNode3[1]}"")]"

let
subscribeResponse2 = await node2.wakuFilterClient.subscribe(
peerInfo1, pubsubTopicNode2, contentFiltersNode2
)
subscribeResponse3 = await node3.wakuFilterClient.subscribe(
peerInfo1, pubsubTopicNode3, contentFiltersNode3
)

assert subscribeResponse2.isOk(), $subscribeResponse2.error
assert subscribeResponse3.isOk(), $subscribeResponse3.error

let getRes = await client.getFilterSubscriptions()

check:
getRes.status == 200
$getRes.contentType == $MIMETYPE_JSON
getRes.data.len() == 2
($getRes.data).contains(expectedFilterData2)
($getRes.data).contains(expectedFilterData3)

asyncTest "Get filter data - no filter subscribers":
await node1.mountFilter()

let getRes = await client.getFilterSubscriptions()

check:
getRes.status == 200
$getRes.contentType == $MIMETYPE_JSON
getRes.data.len() == 0

asyncTest "Get filter data - filter not mounted":
let getRes = await client.getFilterSubscriptionsFilterNotMounted()

check:
getRes.status == 400
getRes.data == "Error: Filter Protocol is not mounted to the node"
8 changes: 8 additions & 0 deletions waku/waku_api/rest/admin/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,11 @@ proc getPeers*():
proc postPeers*(body: seq[string]):
RestResponse[string]
{.rest, endpoint: "/admin/v1/peers", meth: HttpMethod.MethodPost.}

proc getFilterSubscriptions*():
RestResponse[seq[FilterSubscription]]
{.rest, endpoint: "/admin/v1/filter/subscriptions", meth: HttpMethod.MethodGet.}

proc getFilterSubscriptionsFilterNotMounted*():
RestResponse[string]
{.rest, endpoint: "/admin/v1/filter/subscriptions", meth: HttpMethod.MethodGet.}
28 changes: 26 additions & 2 deletions waku/waku_api/rest/admin/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ else:
{.push raises: [].}

import
std/strformat,
std/sequtils,
std/[strformat,sequtils,sets,tables],
stew/byteutils,
chronicles,
json_serialization,
Expand All @@ -32,6 +31,7 @@ logScope:
topics = "waku node rest admin api"

const ROUTE_ADMIN_V1_PEERS* = "/admin/v1/peers"
const ROUTE_ADMIN_V1_FILTER_SUBS* = "/admin/v1/filter/subscriptions"

type PeerProtocolTuple = tuple[multiaddr: string, protocol: string, connected: bool]

Expand Down Expand Up @@ -111,6 +111,30 @@ proc installAdminV1PostPeersHandler(router: var RestRouter, node: WakuNode) =

return RestApiResponse.ok()

proc installAdminV1GetFilterSubsHandler(router: var RestRouter, node: WakuNode) =
router.api(MethodGet, ROUTE_ADMIN_V1_FILTER_SUBS) do () -> RestApiResponse:

if node.wakuFilter.isNil():
return RestApiResponse.badRequest("Error: Filter Protocol is not mounted to the node")

var
subscriptions: seq[FilterSubscription] = @[]
filterCriteria: seq[FilterTopic]

for (peerId, criteria) in node.wakuFilter.subscriptions.pairs():
filterCriteria = criteria.toSeq().mapIt(FilterTopic(pubsubTopic: it[0],
contentTopic: it[1]))

subscriptions.add(FilterSubscription(peerId: $peerId, filterCriteria: filterCriteria))

let resp = RestApiResponse.jsonResponse(subscriptions, status=Http200)
if resp.isErr():
error "An error ocurred while building the json respose: ", error=resp.error
return RestApiResponse.internalServerError(fmt("An error ocurred while building the json respose: {resp.error}"))

return resp.get()

proc installAdminApiHandlers*(router: var RestRouter, node: WakuNode) =
installAdminV1GetPeersHandler(router, node)
installAdminV1PostPeersHandler(router, node)
installAdminV1GetFilterSubsHandler(router, node)
41 changes: 41 additions & 0 deletions waku/waku_api/rest/admin/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,26 @@ paths:
description: Cannot connect to one or more peers.
'5XX':
description: Unexpected error.
/admin/v1/filter/subscriptions:
get:
summary: Get filter protocol subscribers
description: Retrieve information about the serving filter subscriptions
operationId: getFilterInfo
tags:
- admin
responses:
'200':
description: Information about subscribed filter peers and topics
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/FilterSubscription'
'400':
description: Filter Protocol is not mounted to the node
'5XX':
description: Unexpected error.

components:
schemas:
Expand All @@ -72,3 +92,24 @@ components:
type: string
connected:
type: boolean

FilterSubscription:
type: object
required:
- peerId
- filterCriteria
properties:
peerId:
type: string
filterCriteria:
type: array
items:
type: object
required:
- pubsubTopic
- contentTopic
properties:
pubsubTopic:
type: string
contentTopic:
type: string
84 changes: 84 additions & 0 deletions waku/waku_api/rest/admin/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ type

type WakuPeers* = seq[WakuPeer]

type
FilterTopic* = object
pubsubTopic*: string
contentTopic*: string

type
FilterSubscription* = object
peerId*: string
filterCriteria*: seq[FilterTopic]

#### Serialization and deserialization

proc writeValue*(writer: var JsonWriter[RestJson], value: ProtocolState)
Expand All @@ -41,6 +51,20 @@ proc writeValue*(writer: var JsonWriter[RestJson], value: WakuPeer)
writer.writeField("protocols", value.protocols)
writer.endRecord()

proc writeValue*(writer: var JsonWriter[RestJson], value: FilterTopic)
{.raises: [IOError].} =
writer.beginRecord()
writer.writeField("pubsubTopic", value.pubsubTopic)
writer.writeField("contentTopic", value.contentTopic)
writer.endRecord()

proc writeValue*(writer: var JsonWriter[RestJson], value: FilterSubscription)
{.raises: [IOError].} =
writer.beginRecord()
writer.writeField("peerId", value.peerId)
writer.writeField("filterCriteria", value.filterCriteria)
writer.endRecord()

proc readValue*(reader: var JsonReader[RestJson], value: var ProtocolState)
{.gcsafe, raises: [SerializationError, IOError].} =
var
Expand Down Expand Up @@ -101,6 +125,66 @@ proc readValue*(reader: var JsonReader[RestJson], value: var WakuPeer)
protocols: protocols.get()
)

proc readValue*(reader: var JsonReader[RestJson], value: var FilterTopic)
{.gcsafe, raises: [SerializationError, IOError].} =
var
pubsubTopic: Option[string]
contentTopic: Option[string]

for fieldName in readObjectFields(reader):
case fieldName
of "pubsubTopic":
if pubsubTopic.isSome():
reader.raiseUnexpectedField("Multiple `pubsubTopic` fields found", "FilterTopic")
pubsubTopic = some(reader.readValue(string))
of "contentTopic":
if contentTopic.isSome():
reader.raiseUnexpectedField("Multiple `contentTopic` fields found", "FilterTopic")
contentTopic = some(reader.readValue(string))
else:
unrecognizedFieldWarning()

if pubsubTopic.isNone():
reader.raiseUnexpectedValue("Field `pubsubTopic` is missing")

if contentTopic.isNone():
reader.raiseUnexpectedValue("Field `contentTopic` are missing")

value = FilterTopic(
pubsubTopic: pubsubTopic.get(),
contentTopic: contentTopic.get()
)

proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscription)
{.gcsafe, raises: [SerializationError, IOError].} =
var
peerId: Option[string]
filterCriteria: Option[seq[FilterTopic]]

for fieldName in readObjectFields(reader):
case fieldName
of "peerId":
if peerId.isSome():
reader.raiseUnexpectedField("Multiple `peerId` fields found", "FilterSubscription")
peerId = some(reader.readValue(string))
of "filterCriteria":
if filterCriteria.isSome():
reader.raiseUnexpectedField("Multiple `filterCriteria` fields found", "FilterSubscription")
filterCriteria = some(reader.readValue(seq[FilterTopic]))
else:
unrecognizedFieldWarning()

if peerId.isNone():
reader.raiseUnexpectedValue("Field `peerId` is missing")

if filterCriteria.isNone():
reader.raiseUnexpectedValue("Field `filterCriteria` are missing")

value = FilterSubscription(
peerId: peerId.get(),
filterCriteria: filterCriteria.get()
)

## Utility for populating WakuPeers and ProtocolState
func `==`*(a, b: ProtocolState): bool {.inline.} =
return a.protocol == b.protocol
Expand Down

0 comments on commit 9e568f7

Please sign in to comment.