Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Cbindings allow mounting the Store protocol from libwaku #2276

Merged
merged 6 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions examples/cbindings/waku_example.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ struct ConfigNode {
char key[128];
int relay;
char peers[2048];
int store;
char storeNode[2048];
char storeRetentionPolicy[64];
char storeDbUrl[256];
int storeVacuum;
int storeDbMigration;
int storeMaxNumDbConnections;
};

// libwaku Context
Expand Down Expand Up @@ -247,23 +254,39 @@ int main(int argc, char** argv) {
cfgNode.port = 60000;
cfgNode.relay = 1;

cfgNode.store = 1;
snprintf(cfgNode.storeNode, 2048, "");
snprintf(cfgNode.storeRetentionPolicy, 64, "time:6000000");
snprintf(cfgNode.storeDbUrl, 256, "postgres://postgres:test123@localhost:5432/postgres");
cfgNode.storeVacuum = 0;
cfgNode.storeDbMigration = 0;
cfgNode.storeMaxNumDbConnections = 30;

if (argp_parse(&argp, argc, argv, 0, 0, &cfgNode)
== ARGP_ERR_UNKNOWN) {
show_help_and_exit();
}

ctx = waku_init(event_handler, userData);

char jsonConfig[1024];
snprintf(jsonConfig, 1024, "{ \
\"host\": \"%s\", \
\"port\": %d, \
\"key\": \"%s\", \
\"relay\": %s \
char jsonConfig[2048];
snprintf(jsonConfig, 2048, "{ \
\"host\": \"%s\", \
\"port\": %d, \
\"key\": \"%s\", \
\"relay\": %s, \
\"store\": %s, \
\"storeDbUrl\": \"%s\", \
\"storeRetentionPolicy\": \"%s\", \
\"storeMaxNumDbConnections\": %d \
}", cfgNode.host,
cfgNode.port,
cfgNode.key,
cfgNode.relay ? "true":"false");
cfgNode.relay ? "true":"false",
cfgNode.store ? "true":"false",
cfgNode.storeDbUrl,
cfgNode.storeRetentionPolicy,
cfgNode.storeMaxNumDbConnections);

WAKU_CALL( waku_default_pubsub_topic(&ctx, print_default_pubsub_topic, userData) );
WAKU_CALL( waku_version(&ctx, print_waku_version, userData) );
Expand All @@ -272,7 +295,6 @@ int main(int argc, char** argv) {
printf("Waku Relay enabled: %s\n", cfgNode.relay == 1 ? "YES": "NO");

WAKU_CALL( waku_new(&ctx, jsonConfig, event_handler, userData) );

waku_set_event_callback(event_handler, userData);
waku_start(&ctx, event_handler, userData);

Expand All @@ -288,6 +310,7 @@ int main(int argc, char** argv) {
"/waku/2/default-waku/proto",
event_handler,
userData) );

show_main_menu();
while(1) {
handle_user_input();
Expand Down
5 changes: 5 additions & 0 deletions library/callback.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

type
WakuCallBack* = proc(callerRet: cint,
msg: ptr cchar,
len: csize_t) {.cdecl, gcsafe.}
28 changes: 22 additions & 6 deletions library/libwaku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import
./waku_thread/inter_thread_communication/requests/node_lifecycle_request,
./waku_thread/inter_thread_communication/requests/peer_manager_request,
./waku_thread/inter_thread_communication/requests/protocols/relay_request,
./waku_thread/inter_thread_communication/requests/protocols/store_request,
./waku_thread/inter_thread_communication/waku_thread_request,
./alloc
./alloc,
./callback

################################################################################
### Wrapper around the waku node
Expand All @@ -32,11 +34,6 @@ const RET_OK: cint = 0
const RET_ERR: cint = 1
const RET_MISSING_CALLBACK: cint = 2

type
WakuCallBack* = proc(callerRet: cint,
msg: ptr cchar,
len: csize_t) {.cdecl, gcsafe.}

### End of exported types
################################################################################

Expand Down Expand Up @@ -348,5 +345,24 @@ proc waku_connect(ctx: ptr ptr Context,

return RET_OK

proc waku_store_query(ctx: ptr ptr Context,
queryJson: cstring,
peerId: cstring,
timeoutMs: cint,
callback: WakuCallBack,
userData: pointer): cint
{.dynlib, exportc.} =

ctx[][].userData = userData

## TODO: implement the logic that make the "self" node to act as a Store client

# if sendReqRes.isErr():
# let msg = $sendReqRes.error
# callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
# return RET_ERR

return RET_OK

### End of exported procs
################################################################################
78 changes: 77 additions & 1 deletion library/waku_thread/config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,71 @@ proc parseRelay(jsonNode: JsonNode,

return true

proc parseStore(jsonNode: JsonNode,
store: var bool,
storeNode: var string,
storeRetentionPolicy: var string,
storeDbUrl: var string,
storeVacuum: var bool,
storeDbMigration: var bool,
storeMaxNumDbConnections: var int,
jsonResp: var JsonEvent): bool =

if not jsonNode.contains("store"):
## the store parameter is not required. By default is is disabled
store = false
return true

if jsonNode["store"].kind != JsonNodeKind.JBool:
jsonResp = JsonErrorEvent.new("The store config param should be a boolean");
return false

store = jsonNode["store"].getBool()

if jsonNode.contains("storeNode"):
if jsonNode["storeNode"].kind != JsonNodeKind.JString:
jsonResp = JsonErrorEvent.new("The storeNode config param should be a string");
return false

storeNode = jsonNode["storeNode"].getStr()

if jsonNode.contains("storeRetentionPolicy"):
if jsonNode["storeRetentionPolicy"].kind != JsonNodeKind.JString:
jsonResp = JsonErrorEvent.new("The storeRetentionPolicy config param should be a string");
return false

storeRetentionPolicy = jsonNode["storeRetentionPolicy"].getStr()

if jsonNode.contains("storeDbUrl"):
if jsonNode["storeDbUrl"].kind != JsonNodeKind.JString:
jsonResp = JsonErrorEvent.new("The storeDbUrl config param should be a string");
return false

storeDbUrl = jsonNode["storeDbUrl"].getStr()

if jsonNode.contains("storeVacuum"):
if jsonNode["storeVacuum"].kind != JsonNodeKind.JBool:
jsonResp = JsonErrorEvent.new("The storeVacuum config param should be a bool");
return false

storeVacuum = jsonNode["storeVacuum"].getBool()

if jsonNode.contains("storeDbMigration"):
if jsonNode["storeDbMigration"].kind != JsonNodeKind.JBool:
jsonResp = JsonErrorEvent.new("The storeDbMigration config param should be a bool");
return false

storeDbMigration = jsonNode["storeDbMigration"].getBool()

if jsonNode.contains("storeMaxNumDbConnections"):
if jsonNode["storeMaxNumDbConnections"].kind != JsonNodeKind.JInt:
jsonResp = JsonErrorEvent.new("The storeMaxNumDbConnections config param should be an int");
return false

storeMaxNumDbConnections = jsonNode["storeMaxNumDbConnections"].getInt()

return true

proc parseTopics(jsonNode: JsonNode, topics: var seq[string]) =
if jsonNode.contains("topics"):
for topic in jsonNode["topics"].items:
Expand All @@ -103,14 +168,20 @@ proc parseConfig*(configNodeJson: string,
netConfig: var NetConfig,
relay: var bool,
topics: var seq[string],
store: var bool,
storeNode: var string,
storeRetentionPolicy: var string,
storeDbUrl: var string,
storeVacuum: var bool,
storeDbMigration: var bool,
storeMaxNumDbConnections: var int,
jsonResp: var JsonEvent): bool =

if configNodeJson.len == 0:
jsonResp = JsonErrorEvent.new("The configNodeJson is empty")
return false

var jsonNode: JsonNode

try:
jsonNode = parseJson(configNodeJson)
except JsonParsingError:
Expand Down Expand Up @@ -152,6 +223,11 @@ proc parseConfig*(configNodeJson: string,
# topics
parseTopics(jsonNode, topics)

# store
if not parseStore(jsonNode, store, storeNode, storeRetentionPolicy, storeDbUrl,
storeVacuum, storeDbMigration, storeMaxNumDbConnections, jsonResp):
return false

let wakuFlags = CapabilitiesBitfield.init(
lightpush = false,
filter = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import
std/options
import
chronos,
chronicles,
stew/results,
stew/shims/net
import
Expand All @@ -17,7 +18,12 @@ import
../../../../waku/node/waku_node,
../../../../waku/node/builder,
../../../../waku/node/config,
../../../../waku/waku_archive/driver/builder,
../../../../waku/waku_archive/driver,
../../../../waku/waku_archive/retention_policy/builder,
../../../../waku/waku_archive/retention_policy,
../../../../waku/waku_relay/protocol,
../../../../waku/waku_store,
../../../events/[json_error_event,json_message_event,json_base_event],
../../../alloc,
../../config
Expand Down Expand Up @@ -46,21 +52,91 @@ proc destroyShared(self: ptr NodeLifecycleRequest) =
deallocShared(self[].configJson)
deallocShared(self)

proc configureStore(node: WakuNode,
storeNode: string,
storeRetentionPolicy: string,
storeDbUrl: string,
storeVacuum: bool,
storeDbMigration: bool,
storeMaxNumDbConnections: int):
Future[Result[void, string]] {.async.} =
## This snippet is extracted/duplicated from the app.nim file

var onErrAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
# error "Unrecoverable error occurred", error = msg
## TODO: use a callback given as a parameter
discard

# Archive setup
let archiveDriverRes = ArchiveDriver.new(storeDbUrl,
storeVacuum,
storeDbMigration,
storeMaxNumDbConnections,
onErrAction)
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)

let retPolicyRes = RetentionPolicy.new(storeRetentionPolicy)
if retPolicyRes.isErr():
return err("failed to create retention policy: " & retPolicyRes.error)

let mountArcRes = node.mountArchive(archiveDriverRes.get(),
retPolicyRes.get())
if mountArcRes.isErr():
return err("failed to mount waku archive protocol: " & mountArcRes.error)

# Store setup
try:
await mountStore(node)
except CatchableError:
return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())

mountStoreClient(node)
if storeNode != "":
let storeNodeInfo = parsePeerInfo(storeNode)
if storeNodeInfo.isOk():
node.peerManager.addServicePeer(storeNodeInfo.value, WakuStoreCodec)
else:
return err("failed to set node waku store peer: " & storeNodeInfo.error)

return ok()

proc createNode(configJson: cstring):
Future[Result[WakuNode, string]] {.async.} =

var privateKey: PrivateKey
var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"),
Port(60000'u16)).value

## relay
var relay: bool
var topics = @[""]

## store
var store: bool
var storeNode: string
var storeRetentionPolicy: string
var storeDbUrl: string
var storeVacuum: bool
var storeDbMigration: bool
var storeMaxNumDbConnections: int

var jsonResp: JsonEvent

if not parseConfig($configJson,
privateKey,
netConfig,
relay,
topics,
store,
storeNode,
storeRetentionPolicy,
storeDbUrl,
storeVacuum,
storeDbMigration,
storeMaxNumDbConnections,
jsonResp):
return err($jsonResp)

Expand Down Expand Up @@ -113,6 +189,15 @@ proc createNode(configJson: cstring):
await newNode.mountRelay()
newNode.peerManager.start()

if store:
(await newNode.configureStore(storeNode,
storeRetentionPolicy,
storeDbUrl,
storeVacuum,
storeDbMigration,
storeMaxNumDbConnections)).isOkOr:
return err("error configuring store: " & $error)

return ok(newNode)

proc process*(self: ptr NodeLifecycleRequest,
Expand Down
Loading
Loading