From 0b40fb065b227a099635ea660d067bf601a7e9b7 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Mon, 27 Nov 2023 09:04:49 +0100 Subject: [PATCH 1/5] first step to start supporting store in cbindings --- examples/cbindings/waku_example.c | 7 + library/callback.nim | 5 + library/libwaku.h | 6 + library/libwaku.nim | 47 +++++- .../requests/protocols/store_request.nim | 144 ++++++++++++++++++ .../waku_thread_request.nim | 6 +- library/waku_thread/waku_thread.nim | 1 + 7 files changed, 209 insertions(+), 7 deletions(-) create mode 100644 library/callback.nim create mode 100644 library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim diff --git a/examples/cbindings/waku_example.c b/examples/cbindings/waku_example.c index ec1b8f9d32..2c84d5c692 100644 --- a/examples/cbindings/waku_example.c +++ b/examples/cbindings/waku_example.c @@ -288,6 +288,13 @@ int main(int argc, char** argv) { "/waku/2/default-waku/proto", event_handler, userData) ); + + WAKU_CALL ( waku_store_config(&ctx, + "postgres://postgres:test123@localhost:5432/postgres", + "time:6000000", + event_handler, + userData) ); + show_main_menu(); while(1) { handle_user_input(); diff --git a/library/callback.nim b/library/callback.nim new file mode 100644 index 0000000000..79cb0c0051 --- /dev/null +++ b/library/callback.nim @@ -0,0 +1,5 @@ + +type + WakuCallBack* = proc(callerRet: cint, + msg: ptr cchar, + len: csize_t) {.cdecl, gcsafe.} diff --git a/library/libwaku.h b/library/libwaku.h index bbd7f35155..7424932804 100644 --- a/library/libwaku.h +++ b/library/libwaku.h @@ -83,6 +83,12 @@ int waku_connect(void* ctx, WakuCallBack callback, void* userData); +int waku_store_config(void* ctx, + const char* peerMultiAddr, + unsigned int timeoutMs, + WakuCallBack callback, + void* userData); + #ifdef __cplusplus } #endif diff --git a/library/libwaku.nim b/library/libwaku.nim index 9f6449d535..26483f30bd 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -18,8 +18,10 @@ import ./waku_thread/inter_thread_communication/requests/node_lifecycle_request, ./waku_thread/inter_thread_communication/requests/peer_manager_request, ./waku_thread/inter_thread_communication/requests/protocols/relay_request, + ./waku_thread/inter_thread_communication/requests/protocols/store_request, ./waku_thread/inter_thread_communication/waku_thread_request, - ./alloc + ./alloc, + ./callback ################################################################################ ### Wrapper around the waku node @@ -32,11 +34,6 @@ const RET_OK: cint = 0 const RET_ERR: cint = 1 const RET_MISSING_CALLBACK: cint = 2 -type - WakuCallBack* = proc(callerRet: cint, - msg: ptr cchar, - len: csize_t) {.cdecl, gcsafe.} - ### End of exported types ################################################################################ @@ -348,5 +345,43 @@ proc waku_connect(ctx: ptr ptr Context, return RET_OK +proc waku_store_config(ctx: ptr ptr Context, + dbUrl: cstring, + retentionPolicy: cstring, + callback: WakuCallBack, + userData: pointer): cint + {.dynlib, exportc.} = + + ctx[][].userData = userData + let sendReqRes = waku_thread.sendRequestToWakuThread( + ctx[], + RequestType.STORE, + StoreConfigRequest.createShared(StoreConfigState.STORE_ENABLE, + dbUrl, + retentionPolicy)) + if sendReqRes.isErr(): + let msg = $sendReqRes.error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) + return RET_ERR + + return RET_OK + +proc waku_store_query(ctx: ptr ptr Context, + queryJson: cstring, + peerId: cstring, + timeoutMs: cint, + callback: WakuCallBack, + userData: pointer): cint + {.dynlib, exportc.} = + + ctx[][].userData = userData + + # if sendReqRes.isErr(): + # let msg = $sendReqRes.error + # callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) + # return RET_ERR + + return RET_OK + ### End of exported procs ################################################################################ diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim new file mode 100644 index 0000000000..4c526880fc --- /dev/null +++ b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim @@ -0,0 +1,144 @@ + +import + std/[options,sequtils,strutils] +import + chronicles, + chronos, + stew/results, + stew/shims/net +import + ../../../../../waku/node/waku_node, + ../../../../../waku/waku_archive/driver/builder, + ../../../../../waku/waku_archive/driver, + ../../../../../waku/waku_archive/retention_policy/builder, + ../../../../../waku/waku_archive/retention_policy, + ../../../../alloc, + ../../../../callback + +type + StoreConfigState* = enum + STORE_DISABLE + STORE_ENABLE + +type + StoreReqType* = enum + CONFIG_STORE + REMOTE_QUERY ## to perform a query to another Store node + LOCAL_QUERY ## to retrieve the data from 'self' node + +type + StoreConfigRequest* = object + storeState: StoreConfigState + dbUrl: cstring + retentionPolicy: cstring + +type + StoreQueryRequest* = object + queryJson: cstring + peerAddr: cstring + timeoutMs: cint + storeCallback: WakuCallBack + +type + StoreRequest* = object + operation: StoreReqType + storeReq: pointer + +proc createShared*(T: type StoreRequest, + operation: StoreReqType, + request: pointer): ptr type T = + var ret = createShared(T) + ret[].request = request + return ret + +proc createShared*(T: type StoreConfigRequest, + storeState: StoreConfigState, + dbUrl: cstring, + retentionPolicy: cstring): ptr type T = + var ret = createShared(T) + ret[].storeState = storeState + ret[].dbUrl = dbUrl.alloc() + ret[].retentionPolicy = retentionPolicy.alloc() + return ret + +proc createShared*(T: type StoreQueryRequest, + queryJson: cstring, + peerAddr: cstring, + timeoutMs: cint, + storeCallback: WakuCallBack = nil): ptr type T = + + var ret = createShared(T) + ret[].timeoutMs = timeoutMs + ret[].queryJson = queryJson.alloc() + ret[].peerAddr = peerAddr.alloc() + ret[].storeCallback = storeCallback + return ret + +proc destroyShared(self: ptr StoreConfigRequest) = + deallocShared(self[].dbUrl) + deallocShared(self[].retentionPolicy) + deallocShared(self) + +proc destroyShared(self: ptr StoreQueryRequest) = + deallocShared(self[].queryJson) + deallocShared(self[].peerAddr) + deallocShared(self) + +proc process(request: ptr StoreConfigRequest, + node: ptr WakuNode): Future[Result[string, string]] {.async.} = + + defer: destroyShared(request) + + if request[].storeState == STORE_ENABLE: + ## TODO: the following snippet is duplicated from app.nim + var onErrAction = proc(msg: string) {.gcsafe, closure.} = + ## Action to be taken when an internal error occurs during the node run. + ## e.g. the connection with the database is lost and not recovered. + error "Unrecoverable error occurred", error = msg + + let dbVacuum: bool = false + let dbMigration: bool = false + let maxNumConn: int = 10 ## TODO: get it from user configuration + # Archive setup + let archiveDriverRes = ArchiveDriver.new($request[].dbUrl, + dbVacuum, + dbMigration, + maxNumConn, + onErrAction) + if archiveDriverRes.isErr(): + return err("failed to setup archive driver: " & archiveDriverRes.error) + + let retPolicyRes = RetentionPolicy.new($request[].retentionPolicy) + if retPolicyRes.isErr(): + return err("failed to create retention policy: " & retPolicyRes.error) + + let mountArcRes = node[].mountArchive(archiveDriverRes.get(), + retPolicyRes.get()) + if mountArcRes.isErr(): + return err("failed to mount waku archive protocol: " & mountArcRes.error) + + # Store setup + try: + await node[].mountStore() + except CatchableError: + return err("failed to mount waku store protocol: " & getCurrentExceptionMsg()) + +proc process(self: ptr StoreQueryRequest, + node: ptr WakuNode): Future[Result[string, string]] {.async.} = + defer: destroyShared(self) + +proc process*(self: ptr StoreRequest, + node: ptr WakuNode): Future[Result[string, string]] {.async.} = + + defer: deallocShared(self) + + case self.operation: + of CONFIG_STORE: + return await cast[ptr StoreConfigRequest](self[].storeReq).process(node) + of REMOTE_QUERY: + return await cast[ptr StoreQueryRequest](self[].storeReq).process(node) + of LOCAL_QUERY: + discard + # cast[ptr StoreQueryRequest](request[].reqContent).process(node) + + return ok("") diff --git a/library/waku_thread/inter_thread_communication/waku_thread_request.nim b/library/waku_thread/inter_thread_communication/waku_thread_request.nim index 79ba7797d7..a38c09dba3 100644 --- a/library/waku_thread/inter_thread_communication/waku_thread_request.nim +++ b/library/waku_thread/inter_thread_communication/waku_thread_request.nim @@ -12,13 +12,15 @@ import ../../../waku/node/waku_node, ./requests/node_lifecycle_request, ./requests/peer_manager_request, - ./requests/protocols/relay_request + ./requests/protocols/relay_request, + ./requests/protocols/store_request type RequestType* {.pure.} = enum LIFECYCLE, PEER_MANAGER, RELAY, + STORE, type InterThreadRequest* = object @@ -50,6 +52,8 @@ proc process*(T: type InterThreadRequest, cast[ptr PeerManagementRequest](request[].reqContent).process(node[]) of RELAY: cast[ptr RelayRequest](request[].reqContent).process(node) + of STORE: + cast[ptr StoreRequest](request[].reqContent).process(node) return await retFut diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index 807fb1fe72..b8d99ba783 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -49,6 +49,7 @@ proc run(ctx: ptr Context) {.thread.} = ## and attends library user requests (stop, connect_to, etc.) var node: WakuNode + node.mountStoreClient() while running.load == true: ## Trying to get a request from the libwaku main thread From aa1e52db79262f76f3c0d333b3389e8709c60593 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Fri, 8 Dec 2023 09:12:06 +0100 Subject: [PATCH 2/5] libwaku: add changes to mount store in self-node --- examples/cbindings/waku_example.c | 44 +++++++--- library/waku_thread/config.nim | 78 ++++++++++++++++- .../requests/node_lifecycle_request.nim | 85 +++++++++++++++++++ 3 files changed, 192 insertions(+), 15 deletions(-) diff --git a/examples/cbindings/waku_example.c b/examples/cbindings/waku_example.c index 2c84d5c692..dcb1d74d9e 100644 --- a/examples/cbindings/waku_example.c +++ b/examples/cbindings/waku_example.c @@ -28,6 +28,13 @@ struct ConfigNode { char key[128]; int relay; char peers[2048]; + int store; + char storeNode[2048]; + char storeRetentionPolicy[64]; + char storeDbUrl[256]; + int storeVacuum; + int storeDbMigration; + int storeMaxNumDbConnections; }; // libwaku Context @@ -247,6 +254,14 @@ int main(int argc, char** argv) { cfgNode.port = 60000; cfgNode.relay = 1; + cfgNode.store = 1; + snprintf(cfgNode.storeNode, 2048, ""); + snprintf(cfgNode.storeRetentionPolicy, 64, "time:6000000"); + snprintf(cfgNode.storeDbUrl, 256, "postgres://postgres:test123@localhost:5432/postgres"); + cfgNode.storeVacuum = 0; + cfgNode.storeDbMigration = 0; + cfgNode.storeMaxNumDbConnections = 30; + if (argp_parse(&argp, argc, argv, 0, 0, &cfgNode) == ARGP_ERR_UNKNOWN) { show_help_and_exit(); @@ -254,16 +269,24 @@ int main(int argc, char** argv) { ctx = waku_init(event_handler, userData); - char jsonConfig[1024]; - snprintf(jsonConfig, 1024, "{ \ - \"host\": \"%s\", \ - \"port\": %d, \ - \"key\": \"%s\", \ - \"relay\": %s \ + char jsonConfig[2048]; + snprintf(jsonConfig, 2048, "{ \ + \"host\": \"%s\", \ + \"port\": %d, \ + \"key\": \"%s\", \ + \"relay\": %s, \ + \"store\": %s, \ + \"storeDbUrl\": \"%s\", \ + \"storeRetentionPolicy\": \"%s\", \ + \"storeMaxNumDbConnections\": %d \ }", cfgNode.host, cfgNode.port, cfgNode.key, - cfgNode.relay ? "true":"false"); + cfgNode.relay ? "true":"false", + cfgNode.store ? "true":"false", + cfgNode.storeDbUrl, + cfgNode.storeRetentionPolicy, + cfgNode.storeMaxNumDbConnections); WAKU_CALL( waku_default_pubsub_topic(&ctx, print_default_pubsub_topic, userData) ); WAKU_CALL( waku_version(&ctx, print_waku_version, userData) ); @@ -272,7 +295,6 @@ int main(int argc, char** argv) { printf("Waku Relay enabled: %s\n", cfgNode.relay == 1 ? "YES": "NO"); WAKU_CALL( waku_new(&ctx, jsonConfig, event_handler, userData) ); - waku_set_event_callback(event_handler, userData); waku_start(&ctx, event_handler, userData); @@ -289,12 +311,6 @@ int main(int argc, char** argv) { event_handler, userData) ); - WAKU_CALL ( waku_store_config(&ctx, - "postgres://postgres:test123@localhost:5432/postgres", - "time:6000000", - event_handler, - userData) ); - show_main_menu(); while(1) { handle_user_input(); diff --git a/library/waku_thread/config.nim b/library/waku_thread/config.nim index 6309b67717..29ca7b11c5 100644 --- a/library/waku_thread/config.nim +++ b/library/waku_thread/config.nim @@ -91,6 +91,71 @@ proc parseRelay(jsonNode: JsonNode, return true +proc parseStore(jsonNode: JsonNode, + store: var bool, + storeNode: var string, + storeRetentionPolicy: var string, + storeDbUrl: var string, + storeVacuum: var bool, + storeDbMigration: var bool, + storeMaxNumDbConnections: var int, + jsonResp: var JsonEvent): bool = + + if not jsonNode.contains("store"): + ## the store parameter is not required. By default is is disabled + store = false + return true + + if jsonNode["store"].kind != JsonNodeKind.JBool: + jsonResp = JsonErrorEvent.new("The store config param should be a boolean"); + return false + + store = jsonNode["store"].getBool() + + if jsonNode.contains("storeNode"): + if jsonNode["storeNode"].kind != JsonNodeKind.JString: + jsonResp = JsonErrorEvent.new("The storeNode config param should be a string"); + return false + + storeNode = jsonNode["storeNode"].getStr() + + if jsonNode.contains("storeRetentionPolicy"): + if jsonNode["storeRetentionPolicy"].kind != JsonNodeKind.JString: + jsonResp = JsonErrorEvent.new("The storeRetentionPolicy config param should be a string"); + return false + + storeRetentionPolicy = jsonNode["storeRetentionPolicy"].getStr() + + if jsonNode.contains("storeDbUrl"): + if jsonNode["storeDbUrl"].kind != JsonNodeKind.JString: + jsonResp = JsonErrorEvent.new("The storeDbUrl config param should be a string"); + return false + + storeDbUrl = jsonNode["storeDbUrl"].getStr() + + if jsonNode.contains("storeVacuum"): + if jsonNode["storeVacuum"].kind != JsonNodeKind.JBool: + jsonResp = JsonErrorEvent.new("The storeVacuum config param should be a bool"); + return false + + storeVacuum = jsonNode["storeVacuum"].getBool() + + if jsonNode.contains("storeDbMigration"): + if jsonNode["storeDbMigration"].kind != JsonNodeKind.JBool: + jsonResp = JsonErrorEvent.new("The storeDbMigration config param should be a bool"); + return false + + storeDbMigration = jsonNode["storeDbMigration"].getBool() + + if jsonNode.contains("storeMaxNumDbConnections"): + if jsonNode["storeMaxNumDbConnections"].kind != JsonNodeKind.JInt: + jsonResp = JsonErrorEvent.new("The storeMaxNumDbConnections config param should be an int"); + return false + + storeMaxNumDbConnections = jsonNode["storeMaxNumDbConnections"].getInt() + + return true + proc parseTopics(jsonNode: JsonNode, topics: var seq[string]) = if jsonNode.contains("topics"): for topic in jsonNode["topics"].items: @@ -103,6 +168,13 @@ proc parseConfig*(configNodeJson: string, netConfig: var NetConfig, relay: var bool, topics: var seq[string], + store: var bool, + storeNode: var string, + storeRetentionPolicy: var string, + storeDbUrl: var string, + storeVacuum: var bool, + storeDbMigration: var bool, + storeMaxNumDbConnections: var int, jsonResp: var JsonEvent): bool = if configNodeJson.len == 0: @@ -110,7 +182,6 @@ proc parseConfig*(configNodeJson: string, return false var jsonNode: JsonNode - try: jsonNode = parseJson(configNodeJson) except JsonParsingError: @@ -152,6 +223,11 @@ proc parseConfig*(configNodeJson: string, # topics parseTopics(jsonNode, topics) + # store + if not parseStore(jsonNode, store, storeNode, storeRetentionPolicy, storeDbUrl, + storeVacuum, storeDbMigration, storeMaxNumDbConnections, jsonResp): + return false + let wakuFlags = CapabilitiesBitfield.init( lightpush = false, filter = false, diff --git a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim index cf134ceeb3..bf83ab1a7a 100644 --- a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim @@ -3,6 +3,7 @@ import std/options import chronos, + chronicles, stew/results, stew/shims/net import @@ -17,7 +18,12 @@ import ../../../../waku/node/waku_node, ../../../../waku/node/builder, ../../../../waku/node/config, + ../../../../waku/waku_archive/driver/builder, + ../../../../waku/waku_archive/driver, + ../../../../waku/waku_archive/retention_policy/builder, + ../../../../waku/waku_archive/retention_policy, ../../../../waku/waku_relay/protocol, + ../../../../waku/waku_store, ../../../events/[json_error_event,json_message_event,json_base_event], ../../../alloc, ../../config @@ -46,14 +52,77 @@ proc destroyShared(self: ptr NodeLifecycleRequest) = deallocShared(self[].configJson) deallocShared(self) +proc configureStore(node: WakuNode, + storeNode: string, + storeRetentionPolicy: string, + storeDbUrl: string, + storeVacuum: bool, + storeDbMigration: bool, + storeMaxNumDbConnections: int): + Future[Result[void, string]] {.async.} = + ## This snippet is extracted/duplicated from the app.nim file + + var onErrAction = proc(msg: string) {.gcsafe, closure.} = + ## Action to be taken when an internal error occurs during the node run. + ## e.g. the connection with the database is lost and not recovered. + # error "Unrecoverable error occurred", error = msg + ## TODO: use a callback given as a parameter + discard + + # Archive setup + let archiveDriverRes = ArchiveDriver.new(storeDbUrl, + storeVacuum, + storeDbMigration, + storeMaxNumDbConnections, + onErrAction) + if archiveDriverRes.isErr(): + return err("failed to setup archive driver: " & archiveDriverRes.error) + + let retPolicyRes = RetentionPolicy.new(storeRetentionPolicy) + if retPolicyRes.isErr(): + return err("failed to create retention policy: " & retPolicyRes.error) + + let mountArcRes = node.mountArchive(archiveDriverRes.get(), + retPolicyRes.get()) + if mountArcRes.isErr(): + return err("failed to mount waku archive protocol: " & mountArcRes.error) + + # Store setup + try: + await mountStore(node) + except CatchableError: + return err("failed to mount waku store protocol: " & getCurrentExceptionMsg()) + + mountStoreClient(node) + if storeNode != "": + let storeNodeInfo = parsePeerInfo(storeNode) + if storeNodeInfo.isOk(): + node.peerManager.addServicePeer(storeNodeInfo.value, WakuStoreCodec) + else: + return err("failed to set node waku store peer: " & storeNodeInfo.error) + + return ok() + proc createNode(configJson: cstring): Future[Result[WakuNode, string]] {.async.} = var privateKey: PrivateKey var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"), Port(60000'u16)).value + + ## relay var relay: bool var topics = @[""] + + ## store + var store: bool + var storeNode: string + var storeRetentionPolicy: string + var storeDbUrl: string + var storeVacuum: bool + var storeDbMigration: bool + var storeMaxNumDbConnections: int + var jsonResp: JsonEvent if not parseConfig($configJson, @@ -61,6 +130,13 @@ proc createNode(configJson: cstring): netConfig, relay, topics, + store, + storeNode, + storeRetentionPolicy, + storeDbUrl, + storeVacuum, + storeDbMigration, + storeMaxNumDbConnections, jsonResp): return err($jsonResp) @@ -113,6 +189,15 @@ proc createNode(configJson: cstring): await newNode.mountRelay() newNode.peerManager.start() + if store: + (await newNode.configureStore(storeNode, + storeRetentionPolicy, + storeDbUrl, + storeVacuum, + storeDbMigration, + storeMaxNumDbConnections)).isOkOr: + return err("error configuring store: " & $error) + return ok(newNode) proc process*(self: ptr NodeLifecycleRequest, From 8e396224042052fefc96664006a1264704d62d23 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Fri, 8 Dec 2023 09:12:57 +0100 Subject: [PATCH 3/5] libwaku: remove unnecessary code for store --- library/libwaku.nim | 21 ------ .../requests/protocols/store_request.nim | 69 ------------------- library/waku_thread/waku_thread.nim | 1 - 3 files changed, 91 deletions(-) diff --git a/library/libwaku.nim b/library/libwaku.nim index 26483f30bd..d93c658ab1 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -345,27 +345,6 @@ proc waku_connect(ctx: ptr ptr Context, return RET_OK -proc waku_store_config(ctx: ptr ptr Context, - dbUrl: cstring, - retentionPolicy: cstring, - callback: WakuCallBack, - userData: pointer): cint - {.dynlib, exportc.} = - - ctx[][].userData = userData - let sendReqRes = waku_thread.sendRequestToWakuThread( - ctx[], - RequestType.STORE, - StoreConfigRequest.createShared(StoreConfigState.STORE_ENABLE, - dbUrl, - retentionPolicy)) - if sendReqRes.isErr(): - let msg = $sendReqRes.error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) - return RET_ERR - - return RET_OK - proc waku_store_query(ctx: ptr ptr Context, queryJson: cstring, peerId: cstring, diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim index 4c526880fc..2ae76903ed 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim @@ -2,7 +2,6 @@ import std/[options,sequtils,strutils] import - chronicles, chronos, stew/results, stew/shims/net @@ -15,23 +14,11 @@ import ../../../../alloc, ../../../../callback -type - StoreConfigState* = enum - STORE_DISABLE - STORE_ENABLE - type StoreReqType* = enum - CONFIG_STORE REMOTE_QUERY ## to perform a query to another Store node LOCAL_QUERY ## to retrieve the data from 'self' node -type - StoreConfigRequest* = object - storeState: StoreConfigState - dbUrl: cstring - retentionPolicy: cstring - type StoreQueryRequest* = object queryJson: cstring @@ -51,16 +38,6 @@ proc createShared*(T: type StoreRequest, ret[].request = request return ret -proc createShared*(T: type StoreConfigRequest, - storeState: StoreConfigState, - dbUrl: cstring, - retentionPolicy: cstring): ptr type T = - var ret = createShared(T) - ret[].storeState = storeState - ret[].dbUrl = dbUrl.alloc() - ret[].retentionPolicy = retentionPolicy.alloc() - return ret - proc createShared*(T: type StoreQueryRequest, queryJson: cstring, peerAddr: cstring, @@ -74,55 +51,11 @@ proc createShared*(T: type StoreQueryRequest, ret[].storeCallback = storeCallback return ret -proc destroyShared(self: ptr StoreConfigRequest) = - deallocShared(self[].dbUrl) - deallocShared(self[].retentionPolicy) - deallocShared(self) - proc destroyShared(self: ptr StoreQueryRequest) = deallocShared(self[].queryJson) deallocShared(self[].peerAddr) deallocShared(self) -proc process(request: ptr StoreConfigRequest, - node: ptr WakuNode): Future[Result[string, string]] {.async.} = - - defer: destroyShared(request) - - if request[].storeState == STORE_ENABLE: - ## TODO: the following snippet is duplicated from app.nim - var onErrAction = proc(msg: string) {.gcsafe, closure.} = - ## Action to be taken when an internal error occurs during the node run. - ## e.g. the connection with the database is lost and not recovered. - error "Unrecoverable error occurred", error = msg - - let dbVacuum: bool = false - let dbMigration: bool = false - let maxNumConn: int = 10 ## TODO: get it from user configuration - # Archive setup - let archiveDriverRes = ArchiveDriver.new($request[].dbUrl, - dbVacuum, - dbMigration, - maxNumConn, - onErrAction) - if archiveDriverRes.isErr(): - return err("failed to setup archive driver: " & archiveDriverRes.error) - - let retPolicyRes = RetentionPolicy.new($request[].retentionPolicy) - if retPolicyRes.isErr(): - return err("failed to create retention policy: " & retPolicyRes.error) - - let mountArcRes = node[].mountArchive(archiveDriverRes.get(), - retPolicyRes.get()) - if mountArcRes.isErr(): - return err("failed to mount waku archive protocol: " & mountArcRes.error) - - # Store setup - try: - await node[].mountStore() - except CatchableError: - return err("failed to mount waku store protocol: " & getCurrentExceptionMsg()) - proc process(self: ptr StoreQueryRequest, node: ptr WakuNode): Future[Result[string, string]] {.async.} = defer: destroyShared(self) @@ -133,8 +66,6 @@ proc process*(self: ptr StoreRequest, defer: deallocShared(self) case self.operation: - of CONFIG_STORE: - return await cast[ptr StoreConfigRequest](self[].storeReq).process(node) of REMOTE_QUERY: return await cast[ptr StoreQueryRequest](self[].storeReq).process(node) of LOCAL_QUERY: diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index b8d99ba783..807fb1fe72 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -49,7 +49,6 @@ proc run(ctx: ptr Context) {.thread.} = ## and attends library user requests (stop, connect_to, etc.) var node: WakuNode - node.mountStoreClient() while running.load == true: ## Trying to get a request from the libwaku main thread From a85bf459acadaabd9e95f4db0900f1286298d2ea Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Fri, 8 Dec 2023 09:20:38 +0100 Subject: [PATCH 4/5] libwaku.h: clean code --- library/libwaku.h | 6 ------ 1 file changed, 6 deletions(-) diff --git a/library/libwaku.h b/library/libwaku.h index 7424932804..bbd7f35155 100644 --- a/library/libwaku.h +++ b/library/libwaku.h @@ -83,12 +83,6 @@ int waku_connect(void* ctx, WakuCallBack callback, void* userData); -int waku_store_config(void* ctx, - const char* peerMultiAddr, - unsigned int timeoutMs, - WakuCallBack callback, - void* userData); - #ifdef __cplusplus } #endif From c524bf6c099f89706eca08cf18bb77f074ae3e49 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Fri, 8 Dec 2023 09:22:44 +0100 Subject: [PATCH 5/5] libwaku.nim: add TODO comment --- library/libwaku.nim | 2 ++ 1 file changed, 2 insertions(+) diff --git a/library/libwaku.nim b/library/libwaku.nim index d93c658ab1..c60c752bda 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -355,6 +355,8 @@ proc waku_store_query(ctx: ptr ptr Context, ctx[][].userData = userData + ## TODO: implement the logic that make the "self" node to act as a Store client + # if sendReqRes.isErr(): # let msg = $sendReqRes.error # callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))