diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index 3b458cc917..97dbec7475 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -38,6 +38,7 @@ import ../../waku/waku_api/rest/lightpush/handlers as rest_lightpush_api, ../../waku/waku_api/rest/store/handlers as rest_store_api, ../../waku/waku_api/rest/health/handlers as rest_health_api, + ../../waku/waku_api/rest/admin/handlers as rest_admin_api, ../../waku/waku_api/jsonrpc/admin/handlers as rpc_admin_api, ../../waku/waku_api/jsonrpc/debug/handlers as rpc_debug_api, ../../waku/waku_api/jsonrpc/filter/handlers as rpc_filter_api, @@ -567,6 +568,9 @@ proc startApp*(app: App): Future[AppResult[void]] {.async.} = proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNodeConf): AppResult[RestServerRef] = let server = ? newRestHttpServer(address, port) + ## Admin REST API + installAdminApiHandlers(server.router, app.node) + ## Debug REST API installDebugApiHandlers(server.router, app.node) diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index 2922817700..c527d7f63e 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -99,7 +99,8 @@ import ./wakunode_rest/test_rest_store, ./wakunode_rest/test_rest_filter, ./wakunode_rest/test_rest_legacy_filter, - ./wakunode_rest/test_rest_lightpush + ./wakunode_rest/test_rest_lightpush, + ./wakunode_rest/test_rest_admin import ./waku_rln_relay/test_waku_rln_relay, diff --git a/tests/wakunode_rest/test_all.nim b/tests/wakunode_rest/test_all.nim index 5c568820aa..620ae8a708 100644 --- a/tests/wakunode_rest/test_all.nim +++ b/tests/wakunode_rest/test_all.nim @@ -9,4 +9,5 @@ import ./test_rest_relay_serdes, ./test_rest_relay, ./test_rest_serdes, - ./test_rest_store + ./test_rest_store, + ./test_rest_admin diff --git a/tests/wakunode_rest/test_rest_admin.nim b/tests/wakunode_rest/test_rest_admin.nim new file mode 100644 index 0000000000..42aea1b173 --- /dev/null +++ b/tests/wakunode_rest/test_rest_admin.nim @@ -0,0 +1,96 @@ +{.used.} + +import + std/sequtils, + stew/shims/net, + testutils/unittests, + presto, presto/client as presto_client, + libp2p/crypto/crypto + +import + ../../waku/waku_core, + ../../waku/waku_node, + ../../waku/node/peer_manager, + ../../waku/waku_api/rest/server, + ../../waku/waku_api/rest/client, + ../../waku/waku_api/rest/responses, + ../../waku/waku_api/rest/admin/types, + ../../waku/waku_api/rest/admin/handlers as admin_api, + ../../waku/waku_api/rest/admin/client as admin_api_client, + ../../waku/waku_relay, + ../testlib/wakucore, + ../testlib/wakunode, + ../testlib/testasync + +suite "Waku v2 Rest API - Admin": + var node1 {.threadvar.}: WakuNode + var node2 {.threadvar.}: WakuNode + var node3 {.threadvar.}: WakuNode + var peerInfo2 {.threadvar.}: RemotePeerInfo + var peerInfo3 {.threadvar.}: RemotePeerInfo + var restServer {.threadvar.}: RestServerRef + var client{.threadvar.}: RestClientRef + + asyncSetup: + node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(60600)) + node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(60602)) + peerInfo2 = node2.switch.peerInfo + node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(60604)) + peerInfo3 = node3.switch.peerInfo + + await allFutures(node1.start(), node2.start(), node3.start()) + await allFutures(node1.mountRelay(), node2.mountRelay(), node3.mountRelay()) + + let restPort = Port(58011) + let restAddress = ValidIpAddress.init("127.0.0.1") + restServer = RestServerRef.init(restAddress, restPort).tryGet() + + installAdminApiHandlers(restServer.router, node1) + + restServer.start() + + client = newRestHttpClient(initTAddress(restAddress, restPort)) + + asyncTearDown: + await restServer.stop() + await restServer.closeWait() + await allFutures(node1.stop(), node2.stop(), node3.stop()) + + asyncTest "Set and get remote peers": + # Connect to nodes 2 and 3 using the Admin API + let postRes = await client.postPeers(@[constructMultiaddrStr(peerInfo2), + constructMultiaddrStr(peerInfo3)]) + + check: + postRes.status == 200 + + # Verify that newly connected peers are being managed + let getRes = await client.getPeers() + + check: + getRes.status == 200 + $getRes.contentType == $MIMETYPE_JSON + getRes.data.len() == 2 + # Check peer 2 + getRes.data.anyIt(it.protocols.find(WakuRelayCodec) >= 0 and + it.multiaddr == constructMultiaddrStr(peerInfo2)) + # Check peer 3 + getRes.data.anyIt(it.protocols.find(WakuRelayCodec) >= 0 and + it.multiaddr == constructMultiaddrStr(peerInfo3)) + + asyncTest "Set wrong peer": + let nonExistentPeer = "/ip4/0.0.0.0/tcp/10000/p2p/16Uiu2HAm6HZZr7aToTvEBPpiys4UxajCTU97zj5v7RNR2gbniy1D" + let postRes = await client.postPeers(@[nonExistentPeer]) + + check: + postRes.status == 400 + $postRes.contentType == $MIMETYPE_TEXT + postRes.data == "Failed to connect to peer at index: 0 - " & nonExistentPeer + + # Verify that newly connected peers are being managed + let getRes = await client.getPeers() + + check: + getRes.status == 200 + $getRes.contentType == $MIMETYPE_JSON + getRes.data.len() == 0 diff --git a/waku/waku_api/jsonrpc/admin/handlers.nim b/waku/waku_api/jsonrpc/admin/handlers.nim index c89fb05de0..599a725af7 100644 --- a/waku/waku_api/jsonrpc/admin/handlers.nim +++ b/waku/waku_api/jsonrpc/admin/handlers.nim @@ -14,30 +14,13 @@ import ../../../waku_relay, ../../../waku_node, ../../../node/peer_manager, + ../../../waku_core, ./types logScope: topics = "waku node jsonrpc admin_api" - - -proc constructMultiaddrStr*(wireaddr: MultiAddress, peerId: PeerId): string = - # Constructs a multiaddress with both wire address and p2p identity - $wireaddr & "/p2p/" & $peerId - -proc constructMultiaddrStr*(peerInfo: PeerInfo): string = - # Constructs a multiaddress with both location (wire) address and p2p identity - if peerInfo.listenAddrs.len == 0: - return "" - constructMultiaddrStr(peerInfo.listenAddrs[0], peerInfo.peerId) - -proc constructMultiaddrStr*(remotePeerInfo: RemotePeerInfo): string = - # Constructs a multiaddress with both location (wire) address and p2p identity - if remotePeerInfo.addrs.len == 0: - return "" - constructMultiaddrStr(remotePeerInfo.addrs[0], remotePeerInfo.peerId) - proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = rpcsrv.rpc("post_waku_v2_admin_v1_peers") do (peers: seq[string]) -> bool: diff --git a/waku/waku_api/rest/admin/client.nim b/waku/waku_api/rest/admin/client.nim new file mode 100644 index 0000000000..9980cadae8 --- /dev/null +++ b/waku/waku_api/rest/admin/client.nim @@ -0,0 +1,62 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + chronicles, + json_serialization, + json_serialization/std/options, + presto/[route, client], + stew/byteutils + +import + ../serdes, + ../responses, + ./types + +export types + + +logScope: + topics = "waku node rest admin api" + +proc decodeBytes*(t: typedesc[seq[WakuPeer]], data: openArray[byte], + contentType: Opt[ContentTypeData]): RestResult[seq[WakuPeer]] = + if MediaType.init($contentType) != MIMETYPE_JSON: + error "Unsupported response contentType value", contentType = contentType + return err("Unsupported response contentType") + + let decoded = decodeFromJsonBytes(seq[WakuPeer], data).valueOr: + return err("Invalid response from server, could not decode.") + + return ok(decoded) + +proc decodeBytes*(t: typedesc[string], value: openArray[byte], + contentType: Opt[ContentTypeData]): RestResult[string] = + if MediaType.init($contentType) != MIMETYPE_TEXT: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + var res: string + if len(value) > 0: + res = newString(len(value)) + copyMem(addr res[0], unsafeAddr value[0], len(value)) + return ok(res) + +proc encodeBytes*(value: seq[string], + contentType: string): RestResult[seq[byte]] = + if MediaType.init(contentType) != MIMETYPE_JSON: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + let encoded = ?encodeIntoJsonBytes(value) + return ok(encoded) + +proc getPeers*(): + RestResponse[seq[WakuPeer]] + {.rest, endpoint: "/admin/v1/peers", meth: HttpMethod.MethodGet.} + +proc postPeers*(body: seq[string]): + RestResponse[string] + {.rest, endpoint: "/admin/v1/peers", meth: HttpMethod.MethodPost.} diff --git a/waku/waku_api/rest/admin/handlers.nim b/waku/waku_api/rest/admin/handlers.nim new file mode 100644 index 0000000000..ec295fe3ae --- /dev/null +++ b/waku/waku_api/rest/admin/handlers.nim @@ -0,0 +1,113 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/strformat, + std/sequtils, + stew/byteutils, + chronicles, + json_serialization, + presto/route, + libp2p/[peerinfo, switch] + +import + ../../../waku_core, + ../../../waku_store, + ../../../waku_filter, + ../../../waku_relay, + ../../../waku_node, + ../../../node/peer_manager, + ../responses, + ../serdes, + ./types + +export types + +logScope: + topics = "waku node rest admin api" + +const ROUTE_ADMIN_V1_PEERS* = "/admin/v1/peers" + +type PeerProtocolTuple = tuple[multiaddr: string, protocol: string, connected: bool] + +func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiResponse] = + if contentBody.isNone(): + return err(RestApiResponse.badRequest("Missing content body")) + + let reqBodyContentType = MediaType.init($contentBody.get().contentType) + if reqBodyContentType != MIMETYPE_JSON: + return err(RestApiResponse.badRequest("Wrong Content-Type, expected application/json")) + + let reqBodyData = contentBody.get().data + + let requestResult = decodeFromJsonBytes(T, reqBodyData) + if requestResult.isErr(): + return err(RestApiResponse.badRequest("Invalid content body, could not decode. " & + $requestResult.error)) + + return ok(requestResult.get()) + +proc tuplesToWakuPeers(peers: var WakuPeers, peersTup: seq[PeerProtocolTuple]) = + for peer in peersTup: + peers.add(peer.multiaddr, peer.protocol, peer.connected) + + +proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = + router.api(MethodGet, ROUTE_ADMIN_V1_PEERS) do () -> RestApiResponse: + var peers: WakuPeers = @[] + + if not node.wakuRelay.isNil(): + # Map managed peers to WakuPeers and add to return list + let relayPeers = node.peerManager + .peerStore.peers(WakuRelayCodec) + .mapIt(( + multiaddr: constructMultiaddrStr(it), + protocol: WakuRelayCodec, + connected: it.connectedness == Connectedness.Connected) + ) + tuplesToWakuPeers(peers, relayPeers) + + if not node.wakuFilterLegacy.isNil(): + # Map WakuFilter peers to WakuPeers and add to return list + let filterPeers = node.peerManager.peerStore.peers(WakuLegacyFilterCodec) + .mapIt((multiaddr: constructMultiaddrStr(it), + protocol: WakuLegacyFilterCodec, + connected: it.connectedness == Connectedness.Connected)) + tuplesToWakuPeers(peers, filterPeers) + + if not node.wakuStore.isNil(): + # Map WakuStore peers to WakuPeers and add to return list + let storePeers = node.peerManager.peerStore + .peers(WakuStoreCodec) + .mapIt((multiaddr: constructMultiaddrStr(it), + protocol: WakuStoreCodec, + connected: it.connectedness == Connectedness.Connected)) + tuplesToWakuPeers(peers, storePeers) + + let resp = RestApiResponse.jsonResponse(peers, status=Http200) + if resp.isErr(): + error "An error ocurred while building the json respose: ", error=resp.error + return RestApiResponse.internalServerError(fmt("An error ocurred while building the json respose: {resp.error}")) + + return resp.get() + +proc installAdminV1PostPeersHandler(router: var RestRouter, node: WakuNode) = + router.api(MethodPost, ROUTE_ADMIN_V1_PEERS) do (contentBody: Option[ContentBody]) -> RestApiResponse: + + let peers: seq[string] = decodeRequestBody[seq[string]](contentBody).valueOr: + return RestApiResponse.badRequest(fmt("Failed to decode request: {error}")) + + for i, peer in peers: + let peerInfo = parsePeerInfo(peer).valueOr: + return RestApiResponse.badRequest(fmt("Couldn't parse remote peer info: {error}")) + + if not (await node.peerManager.connectRelay(peerInfo, source="rest")): + return RestApiResponse.badRequest(fmt("Failed to connect to peer at index: {i} - {peer}")) + + return RestApiResponse.ok() + +proc installAdminApiHandlers*(router: var RestRouter, node: WakuNode) = + installAdminV1GetPeersHandler(router, node) + installAdminV1PostPeersHandler(router, node) diff --git a/waku/waku_api/rest/admin/openapi.yaml b/waku/waku_api/rest/admin/openapi.yaml new file mode 100644 index 0000000000..2ce64e46c6 --- /dev/null +++ b/waku/waku_api/rest/admin/openapi.yaml @@ -0,0 +1,74 @@ +openapi: 3.0.3 +info: + title: Waku V2 node REST API + version: 1.0.0 + contact: + name: VAC Team + url: https://forum.vac.dev/ + +tags: + - name: admin + description: Admin REST API for WakuV2 node + +paths: + /admin/v1/peers: + get: + summary: Get connected peers info + description: Retrieve information about connected peers. + operationId: getPeerInfo + tags: + - admin + responses: + '200': + description: Information about a Waku v2 node. + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/WakuPeer' + '5XX': + description: Unexpected error. + post: + summary: Adds new peer(s) to connect with + description: Adds new peer(s) to connect with. + operationId: postPeerInfo + tags: + - admin + requestBody: + content: + application/json: + schema: + type: array + items: + type: string + responses: + '200': + description: Ok + '400': + description: Cannot connect to one or more peers. + '5XX': + description: Unexpected error. + +components: + schemas: + WakuPeer: + type: object + required: + - multiaddr + - protocols + properties: + multiaddr: + type: string + protocols: + type: array + items: + type: object + required: + - protocol + - connected + properties: + protocol: + type: string + connected: + type: boolean diff --git a/waku/waku_api/rest/admin/types.nim b/waku/waku_api/rest/admin/types.nim new file mode 100644 index 0000000000..ab6b8f22c0 --- /dev/null +++ b/waku/waku_api/rest/admin/types.nim @@ -0,0 +1,132 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + chronicles, + json_serialization, + json_serialization/std/options, + json_serialization/lexer +import + ../serdes + +#### Types + +type + ProtocolState* = object + protocol*: string + connected*: bool + +type + WakuPeer* = object + multiaddr*: string + protocols*: seq[ProtocolState] + +type WakuPeers* = seq[WakuPeer] + +#### Serialization and deserialization + +proc writeValue*(writer: var JsonWriter[RestJson], value: ProtocolState) + {.raises: [IOError].} = + writer.beginRecord() + writer.writeField("protocol", value.protocol) + writer.writeField("connected", value.connected) + writer.endRecord() + +proc writeValue*(writer: var JsonWriter[RestJson], value: WakuPeer) + {.raises: [IOError].} = + writer.beginRecord() + writer.writeField("multiaddr", value.multiaddr) + writer.writeField("protocols", value.protocols) + writer.endRecord() + +proc readValue*(reader: var JsonReader[RestJson], value: var ProtocolState) + {.gcsafe, raises: [SerializationError, IOError].} = + var + protocol: Option[string] + connected: Option[bool] + + for fieldName in readObjectFields(reader): + case fieldName + of "protocol": + if protocol.isSome(): + reader.raiseUnexpectedField("Multiple `protocol` fields found", "ProtocolState") + protocol = some(reader.readValue(string)) + of "connected": + if connected.isSome(): + reader.raiseUnexpectedField("Multiple `connected` fields found", "ProtocolState") + connected = some(reader.readValue(bool)) + else: + unrecognizedFieldWarning() + + if connected.isNone(): + reader.raiseUnexpectedValue("Field `connected` is missing") + + if protocol.isNone(): + reader.raiseUnexpectedValue("Field `protocol` is missing") + + value = ProtocolState( + protocol: protocol.get(), + connected: connected.get() + ) + +proc readValue*(reader: var JsonReader[RestJson], value: var WakuPeer) + {.gcsafe, raises: [SerializationError, IOError].} = + var + multiaddr: Option[string] + protocols: Option[seq[ProtocolState]] + + for fieldName in readObjectFields(reader): + case fieldName + of "multiaddr": + if multiaddr.isSome(): + reader.raiseUnexpectedField("Multiple `multiaddr` fields found", "WakuPeer") + multiaddr = some(reader.readValue(string)) + of "protocols": + if protocols.isSome(): + reader.raiseUnexpectedField("Multiple `protocols` fields found", "WakuPeer") + protocols = some(reader.readValue(seq[ProtocolState])) + else: + unrecognizedFieldWarning() + + if multiaddr.isNone(): + reader.raiseUnexpectedValue("Field `multiaddr` is missing") + + if protocols.isNone(): + reader.raiseUnexpectedValue("Field `protocols` are missing") + + value = WakuPeer( + multiaddr: multiaddr.get(), + protocols: protocols.get() + ) + +## Utility for populating WakuPeers and ProtocolState +func `==`*(a, b: ProtocolState): bool {.inline.} = + return a.protocol == b.protocol + +func `==`*(a: ProtocolState, b: string): bool {.inline.} = + return a.protocol == b + +func `==`*(a, b: WakuPeer): bool {.inline.} = + return a.multiaddr == b.multiaddr + +proc add*(peers: var WakuPeers, multiaddr: string, protocol: string, connected: bool) = + var + peer: WakuPeer = WakuPeer( + multiaddr: multiaddr, + protocols: @[ProtocolState( + protocol: protocol, + connected: connected + )] + ) + let idx = peers.find(peer) + + if idx < 0: + peers.add(peer) + else: + peers[idx].protocols.add(ProtocolState( + protocol: protocol, + connected: connected + )) + diff --git a/waku/waku_core.nim b/waku/waku_core.nim index ac68e4df0d..4491047e3a 100644 --- a/waku/waku_core.nim +++ b/waku/waku_core.nim @@ -3,11 +3,13 @@ import ./waku_core/time, ./waku_core/message, ./waku_core/peers, - ./waku_core/subscription + ./waku_core/subscription, + ./waku_core/multiaddrstr export topics, time, message, peers, - subscription + subscription, + multiaddrstr diff --git a/waku/waku_core/multiaddrstr.nim b/waku/waku_core/multiaddrstr.nim new file mode 100644 index 0000000000..0d568519cc --- /dev/null +++ b/waku/waku_core/multiaddrstr.nim @@ -0,0 +1,27 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + libp2p/[peerinfo, switch] + +import + ./peers + +proc constructMultiaddrStr*(wireaddr: MultiAddress, peerId: PeerId): string = + # Constructs a multiaddress with both wire address and p2p identity + return $wireaddr & "/p2p/" & $peerId + +proc constructMultiaddrStr*(peerInfo: PeerInfo): string = + # Constructs a multiaddress with both location (wire) address and p2p identity + if peerInfo.listenAddrs.len == 0: + return "" + return constructMultiaddrStr(peerInfo.listenAddrs[0], peerInfo.peerId) + +proc constructMultiaddrStr*(remotePeerInfo: RemotePeerInfo): string = + # Constructs a multiaddress with both location (wire) address and p2p identity + if remotePeerInfo.addrs.len == 0: + return "" + return constructMultiaddrStr(remotePeerInfo.addrs[0], remotePeerInfo.peerId) +