Skip to content

Commit

Permalink
chore: Cbindings allow mounting the Store protocol from libwaku (#2276)
Browse files Browse the repository at this point in the history
* libwaku: add changes to mount store in self-node
* libwaku: remove unnecessary code for store
  • Loading branch information
Ivansete-status authored Dec 11, 2023
1 parent 281c13a commit 28142f4
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 16 deletions.
39 changes: 31 additions & 8 deletions examples/cbindings/waku_example.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ struct ConfigNode {
char key[128];
int relay;
char peers[2048];
int store;
char storeNode[2048];
char storeRetentionPolicy[64];
char storeDbUrl[256];
int storeVacuum;
int storeDbMigration;
int storeMaxNumDbConnections;
};

// libwaku Context
Expand Down Expand Up @@ -247,23 +254,39 @@ int main(int argc, char** argv) {
cfgNode.port = 60000;
cfgNode.relay = 1;

cfgNode.store = 1;
snprintf(cfgNode.storeNode, 2048, "");
snprintf(cfgNode.storeRetentionPolicy, 64, "time:6000000");
snprintf(cfgNode.storeDbUrl, 256, "postgres://postgres:test123@localhost:5432/postgres");
cfgNode.storeVacuum = 0;
cfgNode.storeDbMigration = 0;
cfgNode.storeMaxNumDbConnections = 30;

if (argp_parse(&argp, argc, argv, 0, 0, &cfgNode)
== ARGP_ERR_UNKNOWN) {
show_help_and_exit();
}

ctx = waku_init(event_handler, userData);

char jsonConfig[1024];
snprintf(jsonConfig, 1024, "{ \
\"host\": \"%s\", \
\"port\": %d, \
\"key\": \"%s\", \
\"relay\": %s \
char jsonConfig[2048];
snprintf(jsonConfig, 2048, "{ \
\"host\": \"%s\", \
\"port\": %d, \
\"key\": \"%s\", \
\"relay\": %s, \
\"store\": %s, \
\"storeDbUrl\": \"%s\", \
\"storeRetentionPolicy\": \"%s\", \
\"storeMaxNumDbConnections\": %d \
}", cfgNode.host,
cfgNode.port,
cfgNode.key,
cfgNode.relay ? "true":"false");
cfgNode.relay ? "true":"false",
cfgNode.store ? "true":"false",
cfgNode.storeDbUrl,
cfgNode.storeRetentionPolicy,
cfgNode.storeMaxNumDbConnections);

WAKU_CALL( waku_default_pubsub_topic(&ctx, print_default_pubsub_topic, userData) );
WAKU_CALL( waku_version(&ctx, print_waku_version, userData) );
Expand All @@ -272,7 +295,6 @@ int main(int argc, char** argv) {
printf("Waku Relay enabled: %s\n", cfgNode.relay == 1 ? "YES": "NO");

WAKU_CALL( waku_new(&ctx, jsonConfig, event_handler, userData) );

waku_set_event_callback(event_handler, userData);
waku_start(&ctx, event_handler, userData);

Expand All @@ -288,6 +310,7 @@ int main(int argc, char** argv) {
"/waku/2/default-waku/proto",
event_handler,
userData) );

show_main_menu();
while(1) {
handle_user_input();
Expand Down
5 changes: 5 additions & 0 deletions library/callback.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

type
WakuCallBack* = proc(callerRet: cint,
msg: ptr cchar,
len: csize_t) {.cdecl, gcsafe.}
28 changes: 22 additions & 6 deletions library/libwaku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import
./waku_thread/inter_thread_communication/requests/node_lifecycle_request,
./waku_thread/inter_thread_communication/requests/peer_manager_request,
./waku_thread/inter_thread_communication/requests/protocols/relay_request,
./waku_thread/inter_thread_communication/requests/protocols/store_request,
./waku_thread/inter_thread_communication/waku_thread_request,
./alloc
./alloc,
./callback

################################################################################
### Wrapper around the waku node
Expand All @@ -32,11 +34,6 @@ const RET_OK: cint = 0
const RET_ERR: cint = 1
const RET_MISSING_CALLBACK: cint = 2

type
WakuCallBack* = proc(callerRet: cint,
msg: ptr cchar,
len: csize_t) {.cdecl, gcsafe.}

### End of exported types
################################################################################

Expand Down Expand Up @@ -348,5 +345,24 @@ proc waku_connect(ctx: ptr ptr Context,

return RET_OK

proc waku_store_query(ctx: ptr ptr Context,
queryJson: cstring,
peerId: cstring,
timeoutMs: cint,
callback: WakuCallBack,
userData: pointer): cint
{.dynlib, exportc.} =

ctx[][].userData = userData

## TODO: implement the logic that make the "self" node to act as a Store client

# if sendReqRes.isErr():
# let msg = $sendReqRes.error
# callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
# return RET_ERR

return RET_OK

### End of exported procs
################################################################################
78 changes: 77 additions & 1 deletion library/waku_thread/config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,71 @@ proc parseRelay(jsonNode: JsonNode,

return true

proc parseStore(jsonNode: JsonNode,
store: var bool,
storeNode: var string,
storeRetentionPolicy: var string,
storeDbUrl: var string,
storeVacuum: var bool,
storeDbMigration: var bool,
storeMaxNumDbConnections: var int,
jsonResp: var JsonEvent): bool =

if not jsonNode.contains("store"):
## the store parameter is not required. By default is is disabled
store = false
return true

if jsonNode["store"].kind != JsonNodeKind.JBool:
jsonResp = JsonErrorEvent.new("The store config param should be a boolean");
return false

store = jsonNode["store"].getBool()

if jsonNode.contains("storeNode"):
if jsonNode["storeNode"].kind != JsonNodeKind.JString:
jsonResp = JsonErrorEvent.new("The storeNode config param should be a string");
return false

storeNode = jsonNode["storeNode"].getStr()

if jsonNode.contains("storeRetentionPolicy"):
if jsonNode["storeRetentionPolicy"].kind != JsonNodeKind.JString:
jsonResp = JsonErrorEvent.new("The storeRetentionPolicy config param should be a string");
return false

storeRetentionPolicy = jsonNode["storeRetentionPolicy"].getStr()

if jsonNode.contains("storeDbUrl"):
if jsonNode["storeDbUrl"].kind != JsonNodeKind.JString:
jsonResp = JsonErrorEvent.new("The storeDbUrl config param should be a string");
return false

storeDbUrl = jsonNode["storeDbUrl"].getStr()

if jsonNode.contains("storeVacuum"):
if jsonNode["storeVacuum"].kind != JsonNodeKind.JBool:
jsonResp = JsonErrorEvent.new("The storeVacuum config param should be a bool");
return false

storeVacuum = jsonNode["storeVacuum"].getBool()

if jsonNode.contains("storeDbMigration"):
if jsonNode["storeDbMigration"].kind != JsonNodeKind.JBool:
jsonResp = JsonErrorEvent.new("The storeDbMigration config param should be a bool");
return false

storeDbMigration = jsonNode["storeDbMigration"].getBool()

if jsonNode.contains("storeMaxNumDbConnections"):
if jsonNode["storeMaxNumDbConnections"].kind != JsonNodeKind.JInt:
jsonResp = JsonErrorEvent.new("The storeMaxNumDbConnections config param should be an int");
return false

storeMaxNumDbConnections = jsonNode["storeMaxNumDbConnections"].getInt()

return true

proc parseTopics(jsonNode: JsonNode, topics: var seq[string]) =
if jsonNode.contains("topics"):
for topic in jsonNode["topics"].items:
Expand All @@ -103,14 +168,20 @@ proc parseConfig*(configNodeJson: string,
netConfig: var NetConfig,
relay: var bool,
topics: var seq[string],
store: var bool,
storeNode: var string,
storeRetentionPolicy: var string,
storeDbUrl: var string,
storeVacuum: var bool,
storeDbMigration: var bool,
storeMaxNumDbConnections: var int,
jsonResp: var JsonEvent): bool =

if configNodeJson.len == 0:
jsonResp = JsonErrorEvent.new("The configNodeJson is empty")
return false

var jsonNode: JsonNode

try:
jsonNode = parseJson(configNodeJson)
except JsonParsingError:
Expand Down Expand Up @@ -152,6 +223,11 @@ proc parseConfig*(configNodeJson: string,
# topics
parseTopics(jsonNode, topics)

# store
if not parseStore(jsonNode, store, storeNode, storeRetentionPolicy, storeDbUrl,
storeVacuum, storeDbMigration, storeMaxNumDbConnections, jsonResp):
return false

let wakuFlags = CapabilitiesBitfield.init(
lightpush = false,
filter = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import
std/options
import
chronos,
chronicles,
stew/results,
stew/shims/net
import
Expand All @@ -17,7 +18,12 @@ import
../../../../waku/node/waku_node,
../../../../waku/node/builder,
../../../../waku/node/config,
../../../../waku/waku_archive/driver/builder,
../../../../waku/waku_archive/driver,
../../../../waku/waku_archive/retention_policy/builder,
../../../../waku/waku_archive/retention_policy,
../../../../waku/waku_relay/protocol,
../../../../waku/waku_store,
../../../events/[json_error_event,json_message_event,json_base_event],
../../../alloc,
../../config
Expand Down Expand Up @@ -46,21 +52,91 @@ proc destroyShared(self: ptr NodeLifecycleRequest) =
deallocShared(self[].configJson)
deallocShared(self)

proc configureStore(node: WakuNode,
storeNode: string,
storeRetentionPolicy: string,
storeDbUrl: string,
storeVacuum: bool,
storeDbMigration: bool,
storeMaxNumDbConnections: int):
Future[Result[void, string]] {.async.} =
## This snippet is extracted/duplicated from the app.nim file

var onErrAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
# error "Unrecoverable error occurred", error = msg
## TODO: use a callback given as a parameter
discard

# Archive setup
let archiveDriverRes = ArchiveDriver.new(storeDbUrl,
storeVacuum,
storeDbMigration,
storeMaxNumDbConnections,
onErrAction)
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)

let retPolicyRes = RetentionPolicy.new(storeRetentionPolicy)
if retPolicyRes.isErr():
return err("failed to create retention policy: " & retPolicyRes.error)

let mountArcRes = node.mountArchive(archiveDriverRes.get(),
retPolicyRes.get())
if mountArcRes.isErr():
return err("failed to mount waku archive protocol: " & mountArcRes.error)

# Store setup
try:
await mountStore(node)
except CatchableError:
return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())

mountStoreClient(node)
if storeNode != "":
let storeNodeInfo = parsePeerInfo(storeNode)
if storeNodeInfo.isOk():
node.peerManager.addServicePeer(storeNodeInfo.value, WakuStoreCodec)
else:
return err("failed to set node waku store peer: " & storeNodeInfo.error)

return ok()

proc createNode(configJson: cstring):
Future[Result[WakuNode, string]] {.async.} =

var privateKey: PrivateKey
var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"),
Port(60000'u16)).value

## relay
var relay: bool
var topics = @[""]

## store
var store: bool
var storeNode: string
var storeRetentionPolicy: string
var storeDbUrl: string
var storeVacuum: bool
var storeDbMigration: bool
var storeMaxNumDbConnections: int

var jsonResp: JsonEvent

if not parseConfig($configJson,
privateKey,
netConfig,
relay,
topics,
store,
storeNode,
storeRetentionPolicy,
storeDbUrl,
storeVacuum,
storeDbMigration,
storeMaxNumDbConnections,
jsonResp):
return err($jsonResp)

Expand Down Expand Up @@ -113,6 +189,15 @@ proc createNode(configJson: cstring):
await newNode.mountRelay()
newNode.peerManager.start()

if store:
(await newNode.configureStore(storeNode,
storeRetentionPolicy,
storeDbUrl,
storeVacuum,
storeDbMigration,
storeMaxNumDbConnections)).isOkOr:
return err("error configuring store: " & $error)

return ok(newNode)

proc process*(self: ptr NodeLifecycleRequest,
Expand Down
Loading

0 comments on commit 28142f4

Please sign in to comment.