Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: API discovery handler #2109

Merged
merged 2 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import
../../waku/node/peer_manager/peer_store/waku_peer_storage,
../../waku/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
../../waku/waku_api/message_cache,
../../waku/waku_api/cache_handlers,
../../waku/waku_api/handlers,
../../waku/waku_api/rest/server,
../../waku/waku_api/rest/debug/handlers as rest_debug_api,
../../waku/waku_api/rest/relay/handlers as rest_relay_api,
Expand Down Expand Up @@ -679,18 +679,38 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo
rest_legacy_filter_api.installLegacyFilterRestApiHandlers(server.router, app.node, legacyFilterCache)

let filterCache = rest_filter_api.MessageCache.init()
rest_filter_api.installFilterRestApiHandlers(server.router, app.node, filterCache)

let filterDiscoHandler =
if app.wakuDiscv5.isSome():
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Filter))
else: none(DiscoveryHandler)

rest_filter_api.installFilterRestApiHandlers(
server.router,
app.node,
filterCache,
filterDiscoHandler,
)
else:
notInstalledTab["filter"] = "/filter endpoints are not available. Please check your configuration: --filternode"


## Store REST API
installStoreApiHandlers(server.router, app.node)
let storeDiscoHandler =
if app.wakuDiscv5.isSome():
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Store))
else: none(DiscoveryHandler)

installStoreApiHandlers(server.router, app.node, storeDiscoHandler)

## Light push API
if conf.lightpushnode != "" and
app.node.wakuLightpushClient != nil:
rest_lightpush_api.installLightPushRequestHandler(server.router, app.node)
let lightDiscoHandler =
if app.wakuDiscv5.isSome():
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Lightpush))
else: none(DiscoveryHandler)

rest_lightpush_api.installLightPushRequestHandler(server.router, app.node, lightDiscoHandler)
else:
notInstalledTab["lightpush"] = "/lightpush endpoints are not available. Please check your configuration: --lightpushnode"

Expand Down
2 changes: 1 addition & 1 deletion tests/wakunode_rest/test_rest_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ procSuite "Waku v2 Rest API - Store":
response.status == 412
$response.contentType == $MIMETYPE_TEXT
response.data.messages.len == 0
response.data.error_message.get == "Missing known store-peer node"
response.data.error_message.get == NoPeerNoDiscError.errobj.message

# Now add the storenode from "config"
node.peerManager.addServicePeer(remotePeerInfo,
Expand Down
23 changes: 0 additions & 23 deletions waku/waku_api/cache_handlers.nim

This file was deleted.

50 changes: 50 additions & 0 deletions waku/waku_api/handlers.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import
chronos,
chronicles,
std/[options, sequtils],
stew/results
import
../waku_discv5,
../waku_relay,
../waku_core,
./message_cache

### Discovery

type DiscoveryHandler* = proc(): Future[Result[Option[RemotePeerInfo], string]] {.async, closure.}

proc defaultDiscoveryHandler*(discv5: WakuDiscoveryV5, cap: Capabilities): DiscoveryHandler =
proc(): Future[Result[Option[RemotePeerInfo], string]] {.async, closure.} =
#Discv5 is already filtering peers by shards no need to pass a predicate.
let findPeers = discv5.findRandomPeers()

if not await findPeers.withTimeout(60.seconds):
return err("discovery process timed out!")

var peers = findPeers.read()

peers.keepItIf(it.supportsCapability(cap))

if peers.len == 0:
return ok(none(RemotePeerInfo))

let remotePeerInfo = peers[0].toRemotePeerInfo().valueOr:
return err($error)

return ok(some(remotePeerInfo))

### Message Cache

proc messageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler =
return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} =
cache.addMessage(PubSubTopic(pubsubTopic), msg)

proc autoMessageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler =
return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} =
if cache.isSubscribed(msg.contentTopic):
cache.addMessage(msg.contentTopic, msg)
2 changes: 1 addition & 1 deletion waku/waku_api/jsonrpc/relay/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import
../../../waku_rln_relay/rln/wrappers,
../../../waku_node,
../../message_cache,
../../cache_handlers,
../../handlers,
../message

from std/times import getTime
Expand Down
146 changes: 94 additions & 52 deletions waku/waku_api/rest/filter/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import
../../../waku_filter_v2/client as filter_protocol_client,
../../../waku_filter_v2/common as filter_protocol_type,
../../message_cache,
../../handlers,
../serdes,
../responses,
./types
Expand Down Expand Up @@ -145,11 +146,18 @@ proc makeRestResponse(requestId: string, protocolClientRes: filter_protocol_type

return resp.get()

proc filterPostPutSubscriptionRequestHandler(node: WakuNode,
contentBody: Option[ContentBody],
cache: MessageCache):
Future[RestApiResponse]
{.async.} =
const NoPeerNoDiscoError = FilterSubscribeError.serviceUnavailable(
"No suitable service peer & no discovery method")

const NoPeerNoneFoundError = FilterSubscribeError.serviceUnavailable(
"No suitable service peer & none discovered")

proc filterPostPutSubscriptionRequestHandler(
node: WakuNode,
contentBody: Option[ContentBody],
cache: MessageCache,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
): Future[RestApiResponse] {.async.} =
## handles any filter subscription requests, adds or modifies.

let decodedBody = decodeRequestBody[FilterSubscribeRequest](contentBody)
Expand All @@ -159,14 +167,17 @@ proc filterPostPutSubscriptionRequestHandler(node: WakuNode,

let req: FilterSubscribeRequest = decodedBody.value()

let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
let peer = node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let handler = discHandler.valueOr:
return makeRestResponse(req.requestId, NoPeerNoDiscoError)

let peerOp = (await handler()).valueOr:
return RestApiResponse.internalServerError($error)

if peerOpt.isNone():
return makeRestResponse(req.requestId, FilterSubscribeError.serviceUnavailable("No suitable peers"))
peerOp.valueOr:
return makeRestResponse(req.requestId, NoPeerNoneFoundError)

let subFut = node.filterSubscribe(req.pubsubTopic,
req.contentFilters,
peerOpt.get())
let subFut = node.filterSubscribe(req.pubsubTopic, req.contentFilters, peer)

if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to subscribe to contentFilters do to timeout!"
Expand All @@ -178,29 +189,36 @@ proc filterPostPutSubscriptionRequestHandler(node: WakuNode,

return makeRestResponse(req.requestId, subFut.read())

proc installFilterPostSubscriptionsHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
proc installFilterPostSubscriptionsHandler(
router: var RestRouter,
node: WakuNode,
cache: MessageCache,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a pubsubTopic
debug "post", ROUTE_FILTER_SUBSCRIPTIONS, contentBody

let response = await filterPostPutSubscriptionRequestHandler(node, contentBody, cache)
return response
return await filterPostPutSubscriptionRequestHandler(node, contentBody, cache, discHandler)

proc installFilterPutSubscriptionsHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
proc installFilterPutSubscriptionsHandler(
router: var RestRouter,
node: WakuNode,
cache: MessageCache,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
router.api(MethodPut, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Modifies a subscribtion of a node to a list of contentTopics of a pubsubTopic
debug "put", ROUTE_FILTER_SUBSCRIPTIONS, contentBody

let response = await filterPostPutSubscriptionRequestHandler(node, contentBody, cache)
return response
return await filterPostPutSubscriptionRequestHandler(node, contentBody, cache, discHandler)

proc installFilterDeleteSubscriptionsHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
proc installFilterDeleteSubscriptionsHandler(
router: var RestRouter,
node: WakuNode,
cache: MessageCache,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a PubSub topic
debug "delete", ROUTE_FILTER_SUBSCRIPTIONS, contentBody
Expand All @@ -213,13 +231,18 @@ proc installFilterDeleteSubscriptionsHandler(router: var RestRouter,

let req: FilterUnsubscribeRequest = decodedBody.value()

let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
let peer = node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let handler = discHandler.valueOr:
return makeRestResponse(req.requestId, NoPeerNoDiscoError)

if peerOpt.isNone():
return makeRestResponse(req.requestId,
FilterSubscribeError.serviceUnavailable("No suitable peers"))
let peerOp = (await handler()).valueOr:
return RestApiResponse.internalServerError($error)

peerOp.valueOr:
return makeRestResponse(req.requestId, NoPeerNoneFoundError)

let unsubFut = node.filterUnsubscribe(req.pubsubTopic, req.contentFilters, peer)

let unsubFut = node.filterUnsubscribe(req.pubsubTopic, req.contentFilters, peerOpt.get())
if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to unsubscribe from contentFilters due to timeout!"
return makeRestResponse(req.requestId,
Expand All @@ -233,9 +256,12 @@ proc installFilterDeleteSubscriptionsHandler(router: var RestRouter,
# Successfully unsubscribed from all requested contentTopics
return makeRestResponse(req.requestId, unsubFut.read())

proc installFilterDeleteAllSubscriptionsHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
proc installFilterDeleteAllSubscriptionsHandler(
router: var RestRouter,
node: WakuNode,
cache: MessageCache,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
router.api(MethodDelete, ROUTE_FILTER_ALL_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a PubSub topic
debug "delete", ROUTE_FILTER_ALL_SUBSCRIPTIONS, contentBody
Expand All @@ -248,13 +274,18 @@ proc installFilterDeleteAllSubscriptionsHandler(router: var RestRouter,

let req: FilterUnsubscribeAllRequest = decodedBody.value()

let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
let peer = node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let handler = discHandler.valueOr:
return makeRestResponse(req.requestId, NoPeerNoDiscoError)

if peerOpt.isNone():
return makeRestResponse(req.requestId,
FilterSubscribeError.serviceUnavailable("No suitable peers"))
let peerOp = (await handler()).valueOr:
return RestApiResponse.internalServerError($error)

peerOp.valueOr:
return makeRestResponse(req.requestId, NoPeerNoneFoundError)

let unsubFut = node.filterUnsubscribeAll(peerOpt.get())
let unsubFut = node.filterUnsubscribeAll(peer)

if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to unsubscribe from contentFilters due to timeout!"
return makeRestResponse(req.requestId,
Expand All @@ -268,18 +299,26 @@ proc installFilterDeleteAllSubscriptionsHandler(router: var RestRouter,

const ROUTE_FILTER_SUBSCRIBER_PING* = "/filter/v2/subscriptions/{requestId}"

proc installFilterPingSubscriberHandler(router: var RestRouter,
node: WakuNode) =
proc installFilterPingSubscriberHandler(
router: var RestRouter,
node: WakuNode,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
router.api(MethodGet, ROUTE_FILTER_SUBSCRIBER_PING) do (requestId: string) -> RestApiResponse:
## Checks if a node has valid subscription or not.
debug "get", ROUTE_FILTER_SUBSCRIBER_PING, requestId

let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)
if peerOpt.isNone():
return makeRestResponse(requestId.get(),
FilterSubscribeError.serviceUnavailable("No suitable remote filter peers"))
let peer = node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let handler = discHandler.valueOr:
return makeRestResponse(requestId.get(), NoPeerNoDiscoError)

let peerOp = (await handler()).valueOr:
return RestApiResponse.internalServerError($error)

peerOp.valueOr:
return makeRestResponse(requestId.get(), NoPeerNoneFoundError)

let pingFutRes = node.wakuFilterClient.ping(peerOpt.get())
let pingFutRes = node.wakuFilterClient.ping(peer)

if not await pingFutRes.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to ping filter service peer due to timeout!"
Expand Down Expand Up @@ -325,12 +364,15 @@ proc installFilterGetMessagesHandler(router: var RestRouter,

return resp.get()

proc installFilterRestApiHandlers*(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
installFilterPingSubscriberHandler(router, node)
installFilterPostSubscriptionsHandler(router, node, cache)
installFilterPutSubscriptionsHandler(router, node, cache)
installFilterDeleteSubscriptionsHandler(router, node, cache)
installFilterDeleteAllSubscriptionsHandler(router, node, cache)
proc installFilterRestApiHandlers*(
router: var RestRouter,
node: WakuNode,
cache: MessageCache,
discHandler: Option[DiscoveryHandler] = none(DiscoveryHandler),
) =
installFilterPingSubscriberHandler(router, node, discHandler)
installFilterPostSubscriptionsHandler(router, node, cache, discHandler)
installFilterPutSubscriptionsHandler(router, node, cache, discHandler)
installFilterDeleteSubscriptionsHandler(router, node, cache, discHandler)
installFilterDeleteAllSubscriptionsHandler(router, node, cache, discHandler)
installFilterGetMessagesHandler(router, node, cache)
Loading