Skip to content

Commit

Permalink
Merge 9268abb into a3cd2a1
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status authored Sep 10, 2024
2 parents a3cd2a1 + 9268abb commit 3c9680e
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 10 deletions.
6 changes: 5 additions & 1 deletion examples/cbindings/waku_example.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void event_handler(int callerRet, const char* msg, size_t len, void* userData) {
exit(1);
}
else if (callerRet == RET_OK) {
printf("Receiving message %s\n", msg);
printf("Receiving event: %s\n", msg);
}
}

Expand Down Expand Up @@ -326,6 +326,10 @@ int main(int argc, char** argv) {
event_handler,
userData) );

WAKU_CALL( waku_get_peerids_from_peerstore(ctx,
event_handler,
userData) );

show_main_menu();
while(1) {
handle_user_input();
Expand Down
15 changes: 15 additions & 0 deletions library/libwaku.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#define __libwaku__

#include <stddef.h>
#include <stdint.h>

// The possible returned values for the functions that return int
#define RET_OK 0
Expand Down Expand Up @@ -114,6 +115,15 @@ int waku_connect(void* ctx,
WakuCallBack callback,
void* userData);

int waku_get_peerids_from_peerstore(void* ctx,
WakuCallBack callback,
void* userData);

int waku_get_peerids_by_protocol(void* ctx,
const char* protocol,
WakuCallBack callback,
void* userData);

int waku_listen_addresses(void* ctx,
WakuCallBack callback,
void* userData);
Expand Down Expand Up @@ -150,6 +160,11 @@ int waku_get_my_enr(void* ctx,
WakuCallBack callback,
void* userData);

int waku_peer_exchange_request(void* ctx,
int numPeers,
WakuCallBack callback,
void* userData);

#ifdef __cplusplus
}
#endif
Expand Down
56 changes: 56 additions & 0 deletions library/libwaku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,46 @@ proc waku_connect(

return RET_OK

proc waku_get_peerids_from_peerstore(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
checkLibwakuParams(ctx, callback, userData)

let connRes = waku_thread.sendRequestToWakuThread(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(
PeerManagementMsgType.GET_ALL_PEER_IDS
),
)
if connRes.isErr():
let msg = $connRes.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

let msg = $connRes.value
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_OK

proc waku_get_peerids_by_protocol(
ctx: ptr WakuContext, protocol: cstring, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
checkLibwakuParams(ctx, callback, userData)

let connRes = waku_thread.sendRequestToWakuThread(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createGetPeerIdsByProtocolRequest($protocol),
)
if connRes.isErr():
let msg = $connRes.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

let msg = $connRes.value
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_OK

proc waku_store_query(
ctx: ptr WakuContext,
jsonQuery: cstring,
Expand Down Expand Up @@ -658,5 +698,21 @@ proc waku_stop_discv5(

return RET_OK

proc waku_peer_exchange_request(
ctx: ptr WakuContext, numPeers: uint64, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
checkLibwakuParams(ctx, callback, userData)

let discoveredPeers = waku_thread.sendRequestToWakuThread(
ctx, RequestType.DISCOVERY, DiscoveryRequest.createPeerExchangeRequest(numPeers)
).valueOr:
let msg = $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

let msg = $discoveredPeers
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_OK

### End of exported procs
################################################################################
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import
../../../../waku/factory/waku,
../../../../waku/discovery/waku_dnsdisc,
../../../../waku/discovery/waku_discv5,
../../../../waku/waku_peer_exchange,
../../../../waku/waku_core/peers,
../../../../waku/node/waku_node,
../../../alloc

type DiscoveryMsgType* = enum
GET_BOOTSTRAP_NODES
UPDATE_DISCV5_BOOTSTRAP_NODES
START_DISCV5
STOP_DISCV5
PEER_EXCHANGE

type DiscoveryRequest* = object
operation: DiscoveryMsgType
Expand All @@ -24,20 +27,25 @@ type DiscoveryRequest* = object
## used in UPDATE_DISCV5_BOOTSTRAP_NODES
nodes: cstring

## used in PEER_EXCHANGE
numPeers: uint64

proc createShared(
T: type DiscoveryRequest,
op: DiscoveryMsgType,
enrTreeUrl: cstring,
nameDnsServer: cstring,
timeoutMs: cint,
nodes: cstring,
numPeers: uint64,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].enrTreeUrl = enrTreeUrl.alloc()
ret[].nameDnsServer = nameDnsServer.alloc()
ret[].timeoutMs = timeoutMs
ret[].nodes = nodes.alloc()
ret[].numPeers = numPeers
return ret

proc createRetrieveBootstrapNodesRequest*(
Expand All @@ -47,22 +55,28 @@ proc createRetrieveBootstrapNodesRequest*(
nameDnsServer: cstring,
timeoutMs: cint,
): ptr type T =
return T.createShared(op, enrTreeUrl, nameDnsServer, timeoutMs, "")
return T.createShared(op, enrTreeUrl, nameDnsServer, timeoutMs, "", 0)

proc createUpdateBootstrapNodesRequest*(
T: type DiscoveryRequest, op: DiscoveryMsgType, nodes: cstring
): ptr type T =
return T.createShared(op, "", "", 0, nodes)
return T.createShared(op, "", "", 0, nodes, 0)

proc createDiscV5StartRequest*(T: type DiscoveryRequest): ptr type T =
return T.createShared(START_DISCV5, "", "", 0, "")
return T.createShared(START_DISCV5, "", "", 0, "", 0)

proc createDiscV5StopRequest*(T: type DiscoveryRequest): ptr type T =
return T.createShared(STOP_DISCV5, "", "", 0, "")
return T.createShared(STOP_DISCV5, "", "", 0, "", 0)

proc createPeerExchangeRequest*(
T: type DiscoveryRequest, numPeers: uint64
): ptr type T =
return T.createShared(PEER_EXCHANGE, "", "", 0, "", numPeers)

proc destroyShared(self: ptr DiscoveryRequest) =
deallocShared(self[].enrTreeUrl)
deallocShared(self[].nameDnsServer)
deallocShared(self[].nodes)
deallocShared(self)

proc retrieveBootstrapNodes(
Expand All @@ -87,6 +101,11 @@ proc updateDiscv5BootstrapNodes(nodes: string, waku: ptr Waku): Result[void, str
return err("error in updateDiscv5BootstrapNodes: " & $error)
return ok()

proc performPeerExchangeRequestTo(
numPeers: uint64, waku: ptr Waku
): Future[Result[int, string]] {.async.} =
return await waku.node.fetchPeerExchangePeers(numPeers)

proc process*(
self: ptr DiscoveryRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
Expand All @@ -112,6 +131,11 @@ proc process*(
of UPDATE_DISCV5_BOOTSTRAP_NODES:
updateDiscv5BootstrapNodes($self[].nodes, waku).isOkOr:
return err($error)

return ok("discovery request processed correctly")
of PEER_EXCHANGE:
let numValidPeers = (await performPeerExchangeRequestTo(self[].numPeers, waku)).valueOr:
return err("error calling performPeerExchangeRequestTo: " & $error)
return ok($numValidPeers)

return err("discovery request not handled")
Original file line number Diff line number Diff line change
@@ -1,29 +1,49 @@
import std/[sequtils, strutils]
import chronicles, chronos, results
import ../../../../waku/factory/waku, ../../../../waku/node/waku_node, ../../../alloc
import
../../../../waku/factory/waku,
../../../../waku/node/waku_node,
../../../alloc,
../../../../waku/node/peer_manager

type PeerManagementMsgType* = enum
type PeerManagementMsgType* {.pure.} = enum
CONNECT_TO
GET_ALL_PEER_IDS
GET_PEER_IDS_BY_PROTOCOL

type PeerManagementRequest* = object
operation: PeerManagementMsgType
peerMultiAddr: cstring
dialTimeout: Duration
protocol: cstring

proc createShared*(
T: type PeerManagementRequest,
op: PeerManagementMsgType,
peerMultiAddr: string,
dialTimeout: Duration,
peerMultiAddr = "",
dialTimeout = chronos.milliseconds(0), ## arbitrary Duration as not all ops needs dialTimeout
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].peerMultiAddr = peerMultiAddr.alloc()
ret[].dialTimeout = dialTimeout
return ret

proc createGetPeerIdsByProtocolRequest*(
T: type PeerManagementRequest, protocol = ""
): ptr type T =
var ret = createShared(T)
ret[].operation = PeerManagementMsgType.GET_PEER_IDS_BY_PROTOCOL
ret[].protocol = protocol.alloc()
return ret

proc destroyShared(self: ptr PeerManagementRequest) =
deallocShared(self[].peerMultiAddr)
if not isNil(self[].peerMultiAddr):
deallocShared(self[].peerMultiAddr)

if not isNil(self[].protocol):
deallocShared(self[].protocol)

deallocShared(self)

proc connectTo(
Expand Down Expand Up @@ -53,5 +73,14 @@ proc process*(
let ret = waku.node.connectTo($self[].peerMultiAddr, self[].dialTimeout)
if ret.isErr():
return err(ret.error)
of GET_ALL_PEER_IDS:
## returns a comma-separated string of peerIDs
let peerIDs = waku.node.peerManager.peerStore.peers().mapIt($it.peerId).join(",")
return ok(peerIDs)
of GET_PEER_IDS_BY_PROTOCOL:
## returns a comma-separated string of peerIDs that mount the given protocol
let (inPeers, outPeers) = waku.node.peerManager.connectedPeers($self[].protocol)
let allPeerIDs = inPeers & outPeers
return ok(allPeerIDs.mapIt(it.hex()).join(","))

return ok("")
7 changes: 7 additions & 0 deletions waku/waku_relay/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,13 @@ proc getNumConnectedPeers*(
## Returns the number of connected peers and subscribed to the passed pubsub topic.
## The 'gossipsub' atribute is defined in the GossipSub ref object.

if pubsubTopic == "":
## Return all the connected peers
var numConnPeers = 0
for k, v in w.gossipsub:
numConnPeers.inc(v.len)
return ok(numConnPeers)

if not w.gossipsub.hasKey(pubsubTopic):
return err(
"getNumConnectedPeers - there is no gossipsub peer for the given pubsub topic: " &
Expand Down

0 comments on commit 3c9680e

Please sign in to comment.