diff --git a/Makefile b/Makefile index f39a3394a..70cf930ea 100644 --- a/Makefile +++ b/Makefile @@ -232,6 +232,7 @@ format: $(NPH) *.nim $(NPH) codex/ $(NPH) tests/ + $(NPH) library/ clean-nph: rm -f $(NPH) @@ -242,4 +243,29 @@ print-nph-path: clean: | clean-nph +################ +## C Bindings ## +################ +.PHONY: libcodex + +STATIC ?= 0 + +ifneq ($(strip $(CODEX_LIB_PARAMS)),) +NIM_PARAMS := $(NIM_PARAMS) $(CODEX_LIB_PARAMS) +endif + +libcodex: + $(MAKE) deps + rm -f build/libcodex* + +ifeq ($(STATIC), 1) + echo -e $(BUILD_MSG) "build/$@.a" && \ + $(ENV_SCRIPT) nim libcodexStatic $(NIM_PARAMS) -d:LeopardCmakeFlags="\"-DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_BUILD_TYPE=Release\"" codex.nims +else ifeq ($(detected_OS),Windows) + echo -e $(BUILD_MSG) "build/$@.dll" && \ + $(ENV_SCRIPT) nim libcodexDynamic $(NIM_PARAMS) -d:LeopardCmakeFlags="-G \"MSYS Makefiles\" -DCMAKE_BUILD_TYPE=Release" codex.nims +else + echo -e $(BUILD_MSG) "build/$@.so" && \ + $(ENV_SCRIPT) nim libcodexDynamic $(NIM_PARAMS) -d:LeopardCmakeFlags="\"-DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_BUILD_TYPE=Release\"" codex.nims +endif endif # "variables.mk" was not included diff --git a/README.md b/README.md index 2a15051f5..97139beef 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,56 @@ To get acquainted with Codex, consider: The client exposes a REST API that can be used to interact with the clients. Overview of the API can be found on [api.codex.storage](https://api.codex.storage). +## Bindings + +Codex provides a C API that can be wrapped by other languages. The bindings is located in the `library` folder. +Currently, only a Go binding is included. + +### Build the C library + +```bash +make libcodex +``` + +This produces the shared library under `build/`. 
+ +### Run the Go example + +Build the Go example: + +```bash +go build -o codex-go examples/golang/codex.go +``` + +Export the library path: + +```bash +export LD_LIBRARY_PATH=build +``` + +Run the example: + +```bash +./codex-go +``` + +### Static vs Dynamic build + +By default, Codex builds a dynamic library (`libcodex.so`), which you can load at runtime. +If you prefer a static library (`libcodex.a`), set the `STATIC` flag: + +```bash +# Build dynamic (default) +make libcodex + +# Build static +make STATIC=1 libcodex +``` + +### Limitation + +Callbacks must be fast and non-blocking; otherwise, the working thread will hang and prevent other requests from being processed. + ## Contributing and development Feel free to dive in, contributions are welcomed! Open an issue or submit PRs. diff --git a/build.nims b/build.nims index 886603210..f78010526 100644 --- a/build.nims +++ b/build.nims @@ -25,6 +25,27 @@ proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") = exec(cmd) +proc buildLibrary(name: string, srcDir = "./", params = "", `type` = "dynamic") = + if not dirExists "build": + mkDir "build" + + if `type` == "dynamic": + let lib_name = (when defined(windows): name & ".dll" else: name & ".so") + exec "nim c" & " --out:build/" & lib_name & " --threads:on --app:lib --opt:size --noMain --mm:refc --header --d:metrics " & + "--nimMainPrefix:libcodex -d:noSignalHandler " & + "-d:LeopardExtraCompilerFlags=-fPIC " & + "-d:chronicles_runtime_filtering " & + "-d:chronicles_log_level=TRACE " & + params & " " & srcDir & name & ".nim" + else: + exec "nim c" & " --out:build/" & name + ".a --threads:on --app:staticlib --opt:size --noMain --mm:refc --header --d:metrics " & + "--nimMainPrefix:libcodex -d:noSignalHandler " & + "-d:LeopardExtraCompilerFlags=-fPIC " & + "-d:chronicles_runtime_filtering " & + "-d:chronicles_log_level=TRACE " & + params & " " & srcDir & name & ".nim" + proc test(name: string, srcDir = "tests/", params = "", lang = "c") = 
buildBinary name, srcDir, params exec "build/" & name @@ -121,3 +142,23 @@ task showCoverage, "open coverage html": echo " ======== Opening HTML coverage report in browser... ======== " if findExe("open") != "": exec("open coverage/report/index.html") + +task libcodexDynamic, "Generate bindings": + var params = "" + when compiles(commandLineParams): + for param in commandLineParams(): + if param.len > 0 and param.startsWith("-"): + params.add " " & param + + let name = "libcodex" + buildLibrary name, "library/", params, "dynamic" + +task libcodexStatic, "Generate bindings": + var params = "" + when compiles(commandLineParams): + for param in commandLineParams(): + if param.len > 0 and param.startsWith("-"): + params.add " " & param + + let name = "libcodex" + buildLibrary name, "library/", params, "static" diff --git a/codex.nim b/codex.nim index 7749bdee2..b3e40608b 100644 --- a/codex.nim +++ b/codex.nim @@ -54,6 +54,16 @@ when isMainModule: , ) config.setupLogging() + + try: + updateLogLevel(config.logLevel) + except ValueError as err: + try: + stderr.write "Invalid value for --log-level. " & err.msg & "\n" + except IOError: + echo "Invalid value for --log-level. 
" & err.msg + quit QuitFailure + config.setupMetrics() if not (checkAndCreateDataDir((config.dataDir).string)): @@ -94,7 +104,7 @@ when isMainModule: ## Ctrl+C handling proc doShutdown() = - shutdown = server.stop() + shutdown = server.shutdown() state = CodexStatus.Stopping notice "Stopping Codex" diff --git a/codex/clock.nim b/codex/clock.nim index c02e04aa3..5c63d15e9 100644 --- a/codex/clock.nim +++ b/codex/clock.nim @@ -1,3 +1,5 @@ +{.push raises: [].} + import pkg/chronos import pkg/stew/endians2 import pkg/upraises @@ -11,7 +13,9 @@ type method now*(clock: Clock): SecondsSince1970 {.base, gcsafe, upraises: [].} = raiseAssert "not implemented" -method waitUntil*(clock: Clock, time: SecondsSince1970) {.base, async.} = +method waitUntil*( + clock: Clock, time: SecondsSince1970 +) {.base, async: (raises: [CancelledError]).} = raiseAssert "not implemented" method start*(clock: Clock) {.base, async.} = diff --git a/codex/codex.nim b/codex/codex.nim index 813574641..3f0b2e5ef 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -61,6 +61,15 @@ type CodexPrivateKey* = libp2p.PrivateKey # alias EthWallet = ethers.Wallet +func config*(self: CodexServer): CodexConf = + return self.config + +func node*(self: CodexServer): CodexNodeRef = + return self.codexNode + +func repoStore*(self: CodexServer): RepoStore = + return self.repoStore + proc waitForSync(provider: Provider): Future[void] {.async.} = var sleepTime = 1 trace "Checking sync state of Ethereum provider..." 
@@ -160,8 +169,8 @@ proc bootstrapInteractions(s: CodexServer): Future[void] {.async.} = proc start*(s: CodexServer) {.async.} = trace "Starting codex node", config = $s.config - await s.repoStore.start() + s.maintenance.start() await s.codexNode.switch.start() @@ -175,27 +184,49 @@ proc start*(s: CodexServer) {.async.} = await s.bootstrapInteractions() await s.codexNode.start() - s.restServer.start() + + if s.restServer != nil: + s.restServer.start() proc stop*(s: CodexServer) {.async.} = notice "Stopping codex node" - let res = await noCancel allFinishedFailed[void]( + var futures = @[ - s.restServer.stop(), s.codexNode.switch.stop(), s.codexNode.stop(), s.repoStore.stop(), s.maintenance.stop(), ] - ) + + if s.restServer != nil: + futures.add(s.restServer.stop()) + + let res = await noCancel allFinishedFailed[void](futures) if res.failure.len > 0: error "Failed to stop codex node", failures = res.failure.len raiseAssert "Failed to stop codex node" +proc close*(s: CodexServer) {.async.} = + var futures = @[s.codexNode.close(), s.repoStore.close()] + + let res = await noCancel allFinishedFailed[void](futures) + if not s.taskpool.isNil: - s.taskpool.shutdown() + try: + s.taskpool.shutdown() + except Exception as exc: + error "Failed to stop the taskpool", failures = res.failure.len + raiseAssert("Failure in taskpool shutdown:" & exc.msg) + + if res.failure.len > 0: + error "Failed to close codex node", failures = res.failure.len + raiseAssert "Failed to close codex node" + +proc shutdown*(server: CodexServer) {.async.} = + await server.stop() + await server.close() proc new*( T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey @@ -295,7 +326,7 @@ proc new*( ) peerStore = PeerCtxStore.new() - pendingBlocks = PendingBlocksManager.new() + pendingBlocks = PendingBlocksManager.new(retries = config.blockRetries) advertiser = Advertiser.new(repoStore, discovery) blockDiscovery = DiscoveryEngine.new(repoStore, peerStore, network, discovery, pendingBlocks) @@ 
-320,10 +351,13 @@ proc new*( taskPool = taskpool, ) + var restServer: RestServerRef = nil + + if config.apiBindAddress.isSome: restServer = RestServerRef .new( codexNode.initRestApi(config, repoStore, config.apiCorsAllowedOrigin), - initTAddress(config.apiBindAddress, config.apiPort), + initTAddress(config.apiBindAddress.get(), config.apiPort), bufferSize = (1024 * 64), maxRequestBodySize = int.high, ) diff --git a/codex/conf.nim b/codex/conf.nim index 77ef96caa..206a3e042 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -34,6 +34,7 @@ import pkg/libp2p import pkg/ethers import pkg/questionable import pkg/questionable/results +import pkg/stew/base64 import ./codextypes import ./discovery @@ -46,13 +47,14 @@ import ./utils/natutils from ./contracts/config import DefaultRequestCacheSize, DefaultMaxPriorityFeePerGas from ./validationconfig import MaxSlots, ValidationGroups +from ./blockexchange/engine/pendingblocks import DefaultBlockRetries export units, net, codextypes, logutils, completeCmdArg, parseCmdArg, NatConfig export ValidationGroups, MaxSlots export DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockInterval, DefaultNumBlocksPerInterval, - DefaultRequestCacheSize, DefaultMaxPriorityFeePerGas + DefaultRequestCacheSize, DefaultMaxPriorityFeePerGas, DefaultBlockRetries type ThreadCount* = distinct Natural @@ -202,8 +204,10 @@ type .}: string apiBindAddress* {. - desc: "The REST API bind address", defaultValue: "127.0.0.1", name: "api-bindaddr" - .}: string + desc: "The REST API bind address", + defaultValue: "127.0.0.1".some, + name: "api-bindaddr" + .}: Option[string] apiPort* {. desc: "The REST Api port", @@ -261,6 +265,13 @@ type name: "block-mn" .}: int + blockRetries* {. + desc: "Number of times to retry fetching a block before giving up", + defaultValue: DefaultBlockRetries, + defaultValueDesc: $DefaultBlockRetries, + name: "block-retries" + .}: int + cacheSize* {. 
desc: "The size of the block cache, 0 disables the cache - " & @@ -474,7 +485,7 @@ func prover*(self: CodexConf): bool = self.persistence and self.persistenceCmd == PersistenceCmd.prover proc getCodexVersion(): string = - let tag = strip(staticExec("git tag")) + let tag = strip(staticExec("git describe --tags --abbrev=0")) if tag.isEmptyOrWhitespace: return "untagged build" return tag @@ -510,55 +521,73 @@ proc parseCmdArg*( if res.isOk: ma = res.get() else: - warn "Invalid MultiAddress", input = input, error = res.error() + fatal "Invalid MultiAddress", input = input, error = res.error() quit QuitFailure except LPError as exc: - warn "Invalid MultiAddress uri", uri = input, error = exc.msg + fatal "Invalid MultiAddress uri", uri = input, error = exc.msg quit QuitFailure ma -proc parseCmdArg*(T: type ThreadCount, input: string): T {.upraises: [ValueError].} = - let count = parseInt(input) - if count != 0 and count < 2: - warn "Invalid number of threads", input = input +proc parse*(T: type ThreadCount, p: string): Result[ThreadCount, string] = + try: + let count = parseInt(p) + if count != 0 and count < 2: + return err("Invalid number of threads: " & p) + return ok(ThreadCount(count)) + except ValueError as e: + return err("Invalid number of threads: " & p & ", error=" & e.msg) + +proc parseCmdArg*(T: type ThreadCount, input: string): T = + let val = ThreadCount.parse(input) + if val.isErr: + fatal "Cannot parse the thread count.", input = input, error = val.error() quit QuitFailure - ThreadCount(count) + return val.get() -proc parseCmdArg*(T: type SignedPeerRecord, uri: string): T = +proc parse*(T: type SignedPeerRecord, p: string): Result[SignedPeerRecord, string] = var res: SignedPeerRecord try: - if not res.fromURI(uri): - warn "Invalid SignedPeerRecord uri", uri = uri - quit QuitFailure - except LPError as exc: - warn "Invalid SignedPeerRecord uri", uri = uri, error = exc.msg - quit QuitFailure - except CatchableError as exc: - warn "Invalid SignedPeerRecord 
uri", uri = uri, error = exc.msg + if not res.fromURI(p): + return err("The uri is not a valid SignedPeerRecord: " & p) + return ok(res) + except LPError, Base64Error: + let e = getCurrentException() + return err(e.msg) + +proc parseCmdArg*(T: type SignedPeerRecord, uri: string): T = + let res = SignedPeerRecord.parse(uri) + if res.isErr: + fatal "Cannot parse the signed peer.", error = res.error(), input = uri quit QuitFailure - res + return res.get() -func parseCmdArg*(T: type NatConfig, p: string): T {.raises: [ValueError].} = +func parse*(T: type NatConfig, p: string): Result[NatConfig, string] = case p.toLowerAscii of "any": - NatConfig(hasExtIp: false, nat: NatStrategy.NatAny) + return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatAny)) of "none": - NatConfig(hasExtIp: false, nat: NatStrategy.NatNone) + return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatNone)) of "upnp": - NatConfig(hasExtIp: false, nat: NatStrategy.NatUpnp) + return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatUpnp)) of "pmp": - NatConfig(hasExtIp: false, nat: NatStrategy.NatPmp) + return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatPmp)) else: if p.startsWith("extip:"): try: let ip = parseIpAddress(p[6 ..^ 1]) - NatConfig(hasExtIp: true, extIp: ip) + return ok(NatConfig(hasExtIp: true, extIp: ip)) except ValueError: let error = "Not a valid IP address: " & p[6 ..^ 1] - raise newException(ValueError, error) + return err(error) else: - let error = "Not a valid NAT option: " & p - raise newException(ValueError, error) + return err("Not a valid NAT option: " & p) + +proc parseCmdArg*(T: type NatConfig, p: string): T = + let res = NatConfig.parse(p) + if res.isErr: + fatal "Cannot parse the NAT config.", error = res.error(), input = p + quit QuitFailure + return res.get() proc completeCmdArg*(T: type NatConfig, val: string): seq[string] = return @[] @@ -566,19 +595,25 @@ proc completeCmdArg*(T: type NatConfig, val: string): seq[string] = proc parseCmdArg*(T: type EthAddress, 
address: string): T = EthAddress.init($address).get() -proc parseCmdArg*(T: type NBytes, val: string): T = +func parse*(T: type NBytes, p: string): Result[NBytes, string] = var num = 0'i64 - let count = parseSize(val, num, alwaysBin = true) + let count = parseSize(p, num, alwaysBin = true) if count == 0: - warn "Invalid number of bytes", nbytes = val + return err("Invalid number of bytes: " & p) + return ok(NBytes(num)) + +proc parseCmdArg*(T: type NBytes, val: string): T = + let res = NBytes.parse(val) + if res.isErr: + fatal "Cannot parse NBytes.", error = res.error(), input = val quit QuitFailure - NBytes(num) + return res.get() proc parseCmdArg*(T: type Duration, val: string): T = var dur: Duration let count = parseDuration(val, dur) if count == 0: - warn "Cannot parse duration", dur = dur + fatal "Cannot parse duration", input = val quit QuitFailure dur @@ -595,7 +630,7 @@ proc readValue*(r: var TomlReader, val: var SignedPeerRecord) = try: val = SignedPeerRecord.parseCmdArg(uri) except LPError as err: - warn "Invalid SignedPeerRecord uri", uri = uri, error = err.msg + fatal "Invalid SignedPeerRecord uri", uri = uri, error = err.msg quit QuitFailure proc readValue*(r: var TomlReader, val: var MultiAddress) = @@ -607,7 +642,7 @@ proc readValue*(r: var TomlReader, val: var MultiAddress) = if res.isOk: val = res.get() else: - warn "Invalid MultiAddress", input = input, error = res.error() + fatal "Invalid MultiAddress", input = input, error = res.error() quit QuitFailure proc readValue*( @@ -779,15 +814,6 @@ proc setupLogging*(conf: CodexConf) = else: defaultChroniclesStream.outputs[0].writer = writer - try: - updateLogLevel(conf.logLevel) - except ValueError as err: - try: - stderr.write "Invalid value for --log-level. " & err.msg & "\n" - except IOError: - echo "Invalid value for --log-level. 
" & err.msg - quit QuitFailure - proc setupMetrics*(config: CodexConf) = if config.metricsEnabled: let metricsAddress = config.metricsAddress diff --git a/codex/contracts/clock.nim b/codex/contracts/clock.nim index b78635399..1d4f57ba5 100644 --- a/codex/contracts/clock.nim +++ b/codex/contracts/clock.nim @@ -1,3 +1,5 @@ +{.push raises: [].} + import std/times import pkg/ethers import pkg/questionable @@ -72,7 +74,9 @@ method now*(clock: OnChainClock): SecondsSince1970 = doAssert clock.started, "clock should be started before calling now()" return toUnix(getTime() + clock.offset) -method waitUntil*(clock: OnChainClock, time: SecondsSince1970) {.async.} = +method waitUntil*( + clock: OnChainClock, time: SecondsSince1970 +) {.async: (raises: [CancelledError]).} = while (let difference = time - clock.now(); difference > 0): clock.newBlock.clear() discard await clock.newBlock.wait().withTimeout(chronos.seconds(difference)) diff --git a/codex/node.nim b/codex/node.nim index e010b0854..a6282d6c5 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -81,6 +81,7 @@ type BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {. gcsafe, async: (raises: [CancelledError]) .} + OnBlockStoredProc = proc(chunk: seq[byte]): void {.gcsafe, raises: [].} func switch*(self: CodexNodeRef): Switch = return self.switch @@ -403,6 +404,7 @@ proc store*( filename: ?string = string.none, mimetype: ?string = string.none, blockSize = DefaultBlockSize, + onBlockStored: OnBlockStoredProc = nil, ): Future[?!Cid] {.async.} = ## Save stream contents as dataset with given blockSize ## to nodes's BlockStore, and return Cid of its manifest @@ -432,6 +434,9 @@ proc store*( if err =? 
(await self.networkStore.putBlock(blk)).errorOption: error "Unable to store block", cid = blk.cid, err = err.msg return failure(&"Unable to store block {blk.cid}") + + if not onBlockStored.isNil: + onBlockStored(chunk) except CancelledError as exc: raise exc except CatchableError as exc: @@ -871,6 +876,7 @@ proc stop*(self: CodexNodeRef) {.async.} = if not self.clock.isNil: await self.clock.stop() +proc close*(self: CodexNodeRef) {.async.} = if not self.networkStore.isNil: await self.networkStore.close diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim index bea2971c7..c6fbdbc5f 100644 --- a/codex/stores/repostore/store.nim +++ b/codex/stores/repostore/store.nim @@ -428,7 +428,6 @@ proc start*( ): Future[void] {.async: (raises: [CancelledError, CodexError]).} = ## Start repo ## - if self.started: trace "Repo already started" return @@ -450,6 +449,5 @@ proc stop*(self: RepoStore): Future[void] {.async: (raises: []).} = return trace "Stopping repo" - await self.close() self.started = false diff --git a/codex/validation.nim b/codex/validation.nim index 58a0e6b73..d9f8fb5e7 100644 --- a/codex/validation.nim +++ b/codex/validation.nim @@ -80,7 +80,7 @@ proc removeSlotsThatHaveEnded(validation: Validation) {.async.} = proc markProofAsMissing( validation: Validation, slotId: SlotId, period: Period -) {.async.} = +) {.async: (raises: [CancelledError]).} = logScope: currentPeriod = validation.getCurrentPeriod() @@ -91,18 +91,18 @@ proc markProofAsMissing( else: let inDowntime {.used.} = await validation.market.inDowntime(slotId) trace "Proof not missing", checkedPeriod = period, inDowntime - except CancelledError: - raise + except CancelledError as e: + raise e except CatchableError as e: error "Marking proof as missing failed", msg = e.msg -proc markProofsAsMissing(validation: Validation) {.async.} = +proc markProofsAsMissing(validation: Validation) {.async: (raises: [CancelledError]).} = let slots = validation.slots for slotId in slots: let 
previousPeriod = validation.getCurrentPeriod() - 1 await validation.markProofAsMissing(slotId, previousPeriod) -proc run(validation: Validation) {.async: (raises: []).} = +proc run(validation: Validation) {.async: (raises: [CancelledError]).} = trace "Validation started" try: while true: diff --git a/examples/golang/README.md b/examples/golang/README.md new file mode 100644 index 000000000..30a129328 --- /dev/null +++ b/examples/golang/README.md @@ -0,0 +1,24 @@ + +## Pre-requisite + +libcodex.so is needed to be compiled and present in build folder. + +## Compilation + +From the codex root folder: + +```code +go build -o codex-go examples/golang/codex.go +``` + +## Run +From the codex root folder: + + +```code +export LD_LIBRARY_PATH=build +``` + +```code +./codex-go +``` diff --git a/examples/golang/codex.go b/examples/golang/codex.go new file mode 100644 index 000000000..3384105bf --- /dev/null +++ b/examples/golang/codex.go @@ -0,0 +1,1357 @@ +package main + +/* + #cgo LDFLAGS: -L../../build/ -lcodex + #cgo LDFLAGS: -L../../ -Wl,-rpath,../../ + + #include "../../library/libcodex.h" + #include + #include + #include + + void libcodexNimMain(void); + static void codex_host_init_once(void){ + static int done; + if (!__atomic_exchange_n(&done, 1, __ATOMIC_SEQ_CST)) libcodexNimMain(); + } + + extern void globalEventCallback(int ret, char* msg, size_t len, void* userData); + + typedef struct { + int ret; + char* msg; + size_t len; + uintptr_t h; + } Resp; + + static void* allocResp(uintptr_t h) { + Resp* r = (Resp*)calloc(1, sizeof(Resp)); + r->h = h; + return r; + } + + static void freeResp(void* resp) { + if (resp != NULL) { + free(resp); + } + } + + static int getRet(void* resp) { + if (resp == NULL) { + return 0; + } + Resp* m = (Resp*) resp; + return m->ret; + } + + static char* getMyCharPtr(void* resp) { + if (resp == NULL) { + return NULL; + } + Resp* m = (Resp*) resp; + return m->msg; + } + + static size_t getMyCharLen(void* resp) { + if (resp == NULL) { + 
return 0; + } + Resp* m = (Resp*) resp; + return m->len; + } + + // resp must be set != NULL in case interest on retrieving data from the callback + void callback(int ret, char* msg, size_t len, void* resp); + + static void* cGoCodexNew(const char* configJson, void* resp) { + void* ret = codex_new(configJson, (CodexCallback) callback, resp); + return ret; + } + + static int cGoCodexVersion(void* codexCtx, void* resp) { + return codex_version(codexCtx, (CodexCallback) callback, resp); + } + + static int cGoCodexRevision(void* codexCtx, void* resp) { + return codex_revision(codexCtx, (CodexCallback) callback, resp); + } + + static int cGoCodexRepo(void* codexCtx, void* resp) { + return codex_repo(codexCtx, (CodexCallback) callback, resp); + } + + static int cGoCodexDebug(void* codexCtx, void* resp) { + return codex_debug(codexCtx, (CodexCallback) callback, resp); + } + + static int cGoCodexSpr(void* codexCtx, void* resp) { + return codex_spr(codexCtx, (CodexCallback) callback, resp); + } + + static int cGoCodexPeerId(void* codexCtx, void* resp) { + return codex_peer_id(codexCtx, (CodexCallback) callback, resp); + } + + static int cGoCodexLogLevel(void* codexCtx, char* logLevel, void* resp) { + return codex_log_level(codexCtx, logLevel, (CodexCallback) callback, resp); + } + + static int cGoCodexConnect(void* codexCtx, char* peerId, const char** peerAddresses, uintptr_t peerAddressesSize, void* resp) { + return codex_connect(codexCtx, peerId, peerAddresses, peerAddressesSize, (CodexCallback) callback, resp); + } + + static int cGoCodexPeerDebug(void* codexCtx, char* peerId, void* resp) { + return codex_peer_debug(codexCtx, peerId, (CodexCallback) callback, resp); + } + + static int cGoCodexUploadInit(void* codexCtx, char* filepath, size_t chunkSize, void* resp) { + return codex_upload_init(codexCtx, filepath, chunkSize, (CodexCallback) callback, resp); + } + + static int cGoCodexUploadChunk(void* codexCtx, char* sessionId, const uint8_t* chunk, size_t len, void* resp) 
{ + return codex_upload_chunk(codexCtx, sessionId, chunk, len, (CodexCallback) callback, resp); + } + + static int cGoCodexUploadFinalize(void* codexCtx, char* sessionId, void* resp) { + return codex_upload_finalize(codexCtx, sessionId, (CodexCallback) callback, resp); + } + + static int cGoCodexUploadCancel(void* codexCtx, char* sessionId, void* resp) { + return codex_upload_cancel(codexCtx, sessionId, (CodexCallback) callback, resp); + } + + static int cGoCodexUploadFile(void* codexCtx, char* sessionId, void* resp) { + return codex_upload_file(codexCtx, sessionId, (CodexCallback) callback, resp); + } + + static int cGoCodexDownloadInit(void* codexCtx, char* cid, size_t chunkSize, bool local, void* resp) { + return codex_download_init(codexCtx, cid, chunkSize, local, (CodexCallback) callback, resp); + } + + static int cGoCodexDownloadChunk(void* codexCtx, char* cid, void* resp) { + return codex_download_chunk(codexCtx, cid, (CodexCallback) callback, resp); + } + + static int cGoCodexDownloadStream(void* codexCtx, char* cid, size_t chunkSize, bool local, const char* filepath, void* resp) { + return codex_download_stream(codexCtx, cid, chunkSize, local, filepath, (CodexCallback) callback, resp); + } + + static int cGoCodexDownloadCancel(void* codexCtx, char* cid, void* resp) { + return codex_download_cancel(codexCtx, cid, (CodexCallback) callback, resp); + } + + static int cGoCodexDownloadManifest(void* codexCtx, char* cid, void* resp) { + return codex_download_manifest(codexCtx, cid, (CodexCallback) callback, resp); + } + + static int cGoCodexStorageList(void* codexCtx, void* resp) { + return codex_storage_list(codexCtx, (CodexCallback) callback, resp); + } + + static int cGoCodexStorageFetch(void* codexCtx, char* cid, void* resp) { + return codex_storage_fetch(codexCtx, cid, (CodexCallback) callback, resp); + } + + static int cGoCodexStorageSpace(void* codexCtx, void* resp) { + return codex_storage_space(codexCtx, (CodexCallback) callback, resp); + } + + static 
int cGoCodexStorageDelete(void* codexCtx, char* cid, void* resp) { + return codex_storage_delete(codexCtx, cid, (CodexCallback) callback, resp); + } + + static int cGoCodexStart(void* codexCtx, void* resp) { + return codex_start(codexCtx, (CodexCallback) callback, resp); + } + + static int cGoCodexStop(void* codexCtx, void* resp) { + return codex_stop(codexCtx, (CodexCallback) callback, resp); + } + + static int cGoCodexDestroy(void* codexCtx, void* resp) { + return codex_destroy(codexCtx, (CodexCallback) callback, resp); + } + + static void cGoCodexSetEventCallback(void* codexCtx) { + // The 'globalEventCallback' Go function is shared amongst all possible Codex instances. + + // Given that the 'globalEventCallback' is shared, we pass again the + // codexCtx instance but in this case is needed to pick up the correct method + // that will handle the event. + + // In other words, for every call the libcodex makes to globalEventCallback, + // the 'userData' parameter will bring the context of the node that registered + // that globalEventCallback. + + // This technique is needed because cgo only allows to export Go functions and not methods. 
+ + codex_set_event_callback(codexCtx, (CodexCallback) globalEventCallback, codexCtx); + } + +*/ +import "C" +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "os" + "os/signal" + "path" + "runtime/cgo" + "sync" + "syscall" + "unsafe" +) + +type LogLevel string + +const ( + Trace LogLevel = "TRACE" + Debug LogLevel = "DEBUG" + Info LogLevel = "INFO" + Notice LogLevel = "NOTICE" + Warn LogLevel = "WARN" + Error LogLevel = "ERROR" + Fatal LogLevel = "FATAL" +) + +type LogFormat string + +const ( + LogFormatAuto LogFormat = "auto" + LogFormatColors LogFormat = "colors" + LogFormatNoColors LogFormat = "nocolors" + LogFormatJSON LogFormat = "json" +) + +type RepoKind string + +const ( + FS RepoKind = "fs" + SQLite RepoKind = "sqlite" + LevelDb RepoKind = "leveldb" +) + +type CodexConfig struct { + LogFormat LogFormat `json:"log-format,omitempty"` + MetricsEnabled bool `json:"metrics,omitempty"` + MetricsAddress string `json:"metrics-address,omitempty"` + DataDir string `json:"data-dir,omitempty"` + ListenAddrs []string `json:"listen-addrs,omitempty"` + Nat string `json:"nat,omitempty"` + DiscoveryPort int `json:"disc-port,omitempty"` + NetPrivKeyFile string `json:"net-privkey,omitempty"` + BootstrapNodes []byte `json:"bootstrap-node,omitempty"` + MaxPeers int `json:"max-peers,omitempty"` + NumThreads int `json:"num-threads,omitempty"` + AgentString string `json:"agent-string,omitempty"` + RepoKind RepoKind `json:"repo-kind,omitempty"` + StorageQuota int `json:"storage-quota,omitempty"` + BlockTtl int `json:"block-ttl,omitempty"` + BlockMaintenanceInterval int `json:"block-mi,omitempty"` + BlockMaintenanceNumberOfBlocks int `json:"block-mn,omitempty"` + CacheSize int `json:"cache-size,omitempty"` + LogFile string `json:"log-file,omitempty"` +} + +type RestPeerRecord struct { + PeerId string `json:"peerId"` + SeqNo int `json:"seqNo"` + Addresses []string `json:"addresses,omitempty"` +} + +type RestNode struct { + NodeId string `json:"nodeId"` + 
PeerId string `json:"peerId"` + Record string `json:"record"` + Address *string `json:"address"` + Seen bool `json:"seen"` +} + +type RestRoutingTable struct { + LocalNode RestNode `json:"localNode"` + Nodes []RestNode `json:"nodes"` +} + +type CodexDebugInfo struct { + ID string `json:"id"` + Addrs []string `json:"addrs"` + Spr string `json:"spr"` + AnnounceAddresses []string `json:"announceAddresses"` + Table RestRoutingTable `json:"table"` +} + +type CodexNode struct { + ctx unsafe.Pointer +} + +const defaultBlockSize = 1024 * 64 + +type OnUploadProgressFunc func(read, total int, percent float64, err error) + +type ChunckSize int + +type CodexUploadOptions struct { + filepath string + chunkSize ChunckSize + onProgress OnUploadProgressFunc +} + +func (c ChunckSize) valOrDefault() int { + if c == 0 { + return defaultBlockSize + } + + return int(c) +} + +func (c ChunckSize) toSizeT() C.size_t { + return C.size_t(c.valOrDefault()) +} + +type CodexDownloadStreamOptions = struct { + filepath string + chunkSize ChunckSize + onProgress OnUploadProgressFunc + writer io.Writer + local bool + datasetSize int + datasetSizeAuto bool +} + +type CodexDownloadInitOptions = struct { + local bool + chunkSize ChunckSize +} + +type bridgeCtx struct { + wg *sync.WaitGroup + h cgo.Handle + resp unsafe.Pointer + result string + err error + + // Callback used for receiving progress updates during upload/download. + // + // For the upload, the bytes parameter indicates the number of bytes uploaded. + // If the chunk size is superior or equal to the blocksize (passed in init function), + // the callback will be called when a block is put in the store. + // Otherwise, it will be called when a chunk is pushed into the stream. + // + // For the download, the bytes is the size of the chunk received, and the chunk + // is the actual chunk of data received. 
+ onProgress func(bytes int, chunk []byte) +} + +type CodexManifest struct { + Cid string + TreeCid string `json:"treeCid"` + DatasetSize int `json:"datasetSize"` + BlockSize int `json:"blockSize"` + Filename string `json:"filename"` + Mimetype string `json:"mimetype"` + Protected bool `json:"protected"` +} + +type CodexManifestWithCid struct { + Cid string `json:"cid"` + Manifest CodexManifest `json:"manifest"` +} + +type CodexSpace struct { + TotalBlocks int `json:"totalBlocks"` + QuotaMaxBytes int64 `json:"quotaMaxBytes"` + QuotaUsedBytes int64 `json:"quotaUsedBytes"` + QuotaReservedBytes int64 `json:"quotaReservedBytes"` +} + +func newBridgeCtx() *bridgeCtx { + bridge := &bridgeCtx{} + bridge.wg = &sync.WaitGroup{} + bridge.wg.Add(1) + bridge.h = cgo.NewHandle(bridge) + bridge.resp = C.allocResp(C.uintptr_t(uintptr(bridge.h))) + + return bridge +} + +func (b *bridgeCtx) free() { + if b.h > 0 { + b.h.Delete() + b.h = 0 + } + + if b.resp != nil { + C.freeResp(b.resp) + b.resp = nil + } +} + +func (b *bridgeCtx) CallError(name string) error { + return fmt.Errorf("Failed the call to %s. 
Returned code: %d.", name, C.getRet(b.resp)) +} + +func (b *bridgeCtx) wait() (string, error) { + b.wg.Wait() + + return b.result, b.err +} + +func getReaderSize(r io.Reader) int64 { + switch v := r.(type) { + case *os.File: + stat, err := v.Stat() + if err != nil { + return 0 + } + return stat.Size() + case *bytes.Buffer: + return int64(v.Len()) + default: + return 0 + } +} + +//export callback +func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { + if resp == nil { + return + } + + m := (*C.Resp)(resp) + m.ret = ret + m.msg = msg + m.len = len + + if m.h == 0 { + return + } + + h := cgo.Handle(m.h) + + if h == 0 { + return + } + + if v, ok := h.Value().(*bridgeCtx); ok { + switch ret { + case C.RET_PROGRESS: + if v.onProgress == nil { + return + } + + if msg != nil { + chunk := C.GoBytes(unsafe.Pointer(msg), C.int(len)) + v.onProgress(int(C.int(len)), chunk) + } else { + v.onProgress(int(C.int(len)), nil) + } + case C.RET_OK: + retMsg := C.GoStringN(msg, C.int(len)) + v.result = retMsg + v.err = nil + + if v.wg != nil { + v.wg.Done() + } + case C.RET_ERR: + retMsg := C.GoStringN(msg, C.int(len)) + v.err = errors.New(retMsg) + + if v.wg != nil { + v.wg.Done() + } + } + } +} + +func CodexNew(config CodexConfig) (*CodexNode, error) { + bridge := newBridgeCtx() + defer bridge.free() + + jsonConfig, err := json.Marshal(config) + + if err != nil { + return nil, err + } + + cJsonConfig := C.CString(string(jsonConfig)) + defer C.free(unsafe.Pointer(cJsonConfig)) + + ctx := C.cGoCodexNew(cJsonConfig, bridge.resp) + + if _, err := bridge.wait(); err != nil { + return nil, bridge.err + } + + return &CodexNode{ctx: ctx}, bridge.err +} + +func (self CodexNode) CodexVersion() (string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + if C.cGoCodexVersion(self.ctx, bridge.resp) != C.RET_OK { + return "", bridge.CallError("cGoCodexVersion") + } + + return bridge.wait() +} + +func (self CodexNode) CodexRevision() (string, error) { + bridge := 
newBridgeCtx() + defer bridge.free() + + if C.cGoCodexRevision(self.ctx, bridge.resp) != C.RET_OK { + return "", bridge.CallError("cGoCodexRevision") + } + + return bridge.wait() +} + +func (self CodexNode) CodexRepo() (string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + if C.cGoCodexRepo(self.ctx, bridge.resp) != C.RET_OK { + return "", bridge.CallError("cGoCodexRepo") + } + + return bridge.wait() +} + +func (self CodexNode) CodexDebug() (CodexDebugInfo, error) { + var info CodexDebugInfo + + bridge := newBridgeCtx() + defer bridge.free() + + if C.cGoCodexDebug(self.ctx, bridge.resp) != C.RET_OK { + return info, bridge.CallError("cGoCodexDebug") + } + + value, err := bridge.wait() + if err != nil { + return info, err + } + + err = json.Unmarshal([]byte(value), &info) + + return info, err +} + +func (self CodexNode) CodexSpr() (string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + if C.cGoCodexSpr(self.ctx, bridge.resp) != C.RET_OK { + return "", bridge.CallError("cGoCodexSpr") + } + + return bridge.wait() +} + +func (self CodexNode) CodexPeerId() (string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + if C.cGoCodexPeerId(self.ctx, bridge.resp) != C.RET_OK { + return "", bridge.CallError("cGoCodexPeerId") + } + + return bridge.wait() +} + +func (self CodexNode) CodexLogLevel(logLevel LogLevel) error { + bridge := newBridgeCtx() + defer bridge.free() + + var cLogLevel = C.CString(fmt.Sprintf("%s", logLevel)) + defer C.free(unsafe.Pointer(cLogLevel)) + + if C.cGoCodexLogLevel(self.ctx, cLogLevel, bridge.resp) != C.RET_OK { + return bridge.CallError("cGoCodexLogLevel") + } + + _, err := bridge.wait() + + return err +} + +func (self CodexNode) CodexConnect(peerId string, peerAddresses []string) error { + bridge := newBridgeCtx() + defer bridge.free() + + var cPeerId = C.CString(peerId) + defer C.free(unsafe.Pointer(cPeerId)) + + if len(peerAddresses) > 0 { + var cAddresses = make([]*C.char, len(peerAddresses)) + for i, 
addr := range peerAddresses { + cAddresses[i] = C.CString(addr) + defer C.free(unsafe.Pointer(cAddresses[i])) + } + + if C.cGoCodexConnect(self.ctx, cPeerId, &cAddresses[0], C.uintptr_t(len(peerAddresses)), bridge.resp) != C.RET_OK { + return bridge.CallError("cGoCodexConnect") + } + } else { + if C.cGoCodexConnect(self.ctx, cPeerId, nil, 0, bridge.resp) != C.RET_OK { + return bridge.CallError("cGoCodexConnect") + } + } + + _, err := bridge.wait() + return err +} + +func (self CodexNode) CodexPeerDebug(peerId string) (RestPeerRecord, error) { + var record RestPeerRecord + + bridge := newBridgeCtx() + defer bridge.free() + + var cPeerId = C.CString(peerId) + defer C.free(unsafe.Pointer(cPeerId)) + + if C.cGoCodexPeerDebug(self.ctx, cPeerId, bridge.resp) != C.RET_OK { + return record, bridge.CallError("cGoCodexPeerDebug") + } + + value, err := bridge.wait() + if err != nil { + return record, err + } + + err = json.Unmarshal([]byte(value), &record) + + return record, err +} + +func (self CodexNode) CodexUploadInit(options *CodexUploadOptions) (string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + var cFilename = C.CString(options.filepath) + defer C.free(unsafe.Pointer(cFilename)) + + if C.cGoCodexUploadInit(self.ctx, cFilename, options.chunkSize.toSizeT(), bridge.resp) != C.RET_OK { + return "", bridge.CallError("cGoCodexUploadInit") + } + + return bridge.wait() +} + +func (self CodexNode) CodexUploadChunk(sessionId string, chunk []byte) error { + bridge := newBridgeCtx() + defer bridge.free() + + var cSessionId = C.CString(sessionId) + defer C.free(unsafe.Pointer(cSessionId)) + + var cChunkPtr *C.uint8_t + if len(chunk) > 0 { + cChunkPtr = (*C.uint8_t)(unsafe.Pointer(&chunk[0])) + } + + if C.cGoCodexUploadChunk(self.ctx, cSessionId, cChunkPtr, C.size_t(len(chunk)), bridge.resp) != C.RET_OK { + return bridge.CallError("cGoCodexUploadChunk") + } + + _, err := bridge.wait() + + return err +} + +func (self CodexNode) CodexUploadFinalize(sessionId string) 
(string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + var cSessionId = C.CString(sessionId) + defer C.free(unsafe.Pointer(cSessionId)) + + if C.cGoCodexUploadFinalize(self.ctx, cSessionId, bridge.resp) != C.RET_OK { + return "", bridge.CallError("cGoCodexUploadFinalize") + } + + return bridge.wait() +} + +func (self CodexNode) CodexUploadCancel(sessionId string) error { + bridge := newBridgeCtx() + defer bridge.free() + + var cSessionId = C.CString(sessionId) + defer C.free(unsafe.Pointer(cSessionId)) + + if C.cGoCodexUploadCancel(self.ctx, cSessionId, bridge.resp) != C.RET_OK { + return bridge.CallError("cGoCodexUploadCancel") + } + + _, err := bridge.wait() + + return err +} + +func (self CodexNode) CodexUploadReader(options CodexUploadOptions, r io.Reader) (string, error) { + sessionId, err := self.CodexUploadInit(&options) + + if err != nil { + return "", err + } + + buf := make([]byte, options.chunkSize.valOrDefault()) + total := 0 + var size int64 + + if options.onProgress != nil { + size = getReaderSize(r) + } + + for { + n, err := r.Read(buf) + + if err == io.EOF { + break + } + + if err != nil { + if cancelErr := self.CodexUploadCancel(sessionId); cancelErr != nil { + return "", fmt.Errorf("failed to upload chunk %v and failed to cancel upload session %v", err, cancelErr) + } + + return "", err + } + + if n == 0 { + break + } + + if err := self.CodexUploadChunk(sessionId, buf[:n]); err != nil { + if cancelErr := self.CodexUploadCancel(sessionId); cancelErr != nil { + return "", fmt.Errorf("failed to upload chunk %v and failed to cancel upload session %v", err, cancelErr) + } + + return "", err + } + + total += n + if options.onProgress != nil && size > 0 { + percent := float64(total) / float64(size) * 100.0 + // The last block could be a bit over the size due to padding + // on the chunk size. 
+ if percent > 100.0 { + percent = 100.0 + } + options.onProgress(n, total, percent, nil) + } + } + + return self.CodexUploadFinalize(sessionId) +} + +func (self CodexNode) CodexUploadReaderAsync(options CodexUploadOptions, r io.Reader, onDone func(cid string, err error)) { + go func() { + cid, err := self.CodexUploadReader(options, r) + onDone(cid, err) + }() +} + +func (self CodexNode) CodexUploadFile(options CodexUploadOptions) (string, error) { + bridge := newBridgeCtx() + defer bridge.free() + + if options.onProgress != nil { + stat, err := os.Stat(options.filepath) + + if err != nil { + return "", err + } + + size := stat.Size() + total := 0 + + if size > 0 { + bridge.onProgress = func(read int, _ []byte) { + if read == 0 { + return + } + + total += read + + percent := float64(total) / float64(size) * 100.0 + // The last block could be a bit over the size due to padding + // on the chunk size. + if percent > 100.0 { + percent = 100.0 + } + + options.onProgress(read, int(size), percent, nil) + } + } + } + + sessionId, err := self.CodexUploadInit(&options) + if err != nil { + return "", err + } + + var cSessionId = C.CString(sessionId) + defer C.free(unsafe.Pointer(cSessionId)) + + if C.cGoCodexUploadFile(self.ctx, cSessionId, bridge.resp) != C.RET_OK { + return "", bridge.CallError("cGoCodexUploadFile") + } + + return bridge.wait() +} + +func (self CodexNode) CodexUploadFileAsync(options CodexUploadOptions, onDone func(cid string, err error)) { + go func() { + cid, err := self.CodexUploadFile(options) + onDone(cid, err) + }() +} + +func (self CodexNode) CodexDownloadManifest(cid string) (CodexManifest, error) { + bridge := newBridgeCtx() + defer bridge.free() + + var cCid = C.CString(cid) + defer C.free(unsafe.Pointer(cCid)) + + if C.cGoCodexDownloadManifest(self.ctx, cCid, bridge.resp) != C.RET_OK { + return CodexManifest{}, bridge.CallError("cGoCodexDownloadManifest") + } + + val, err := bridge.wait() + if err != nil { + return CodexManifest{}, err + } + + 
manifest := CodexManifest{Cid: cid} + err = json.Unmarshal([]byte(val), &manifest) + if err != nil { + return CodexManifest{}, err + } + + return manifest, nil +} + +func (self CodexNode) CodexDownloadStream(cid string, options CodexDownloadStreamOptions) error { + bridge := newBridgeCtx() + defer bridge.free() + + if options.datasetSizeAuto { + manifest, err := self.CodexDownloadManifest(cid) + + if err != nil { + return err + } + + options.datasetSize = manifest.DatasetSize + } + + total := 0 + bridge.onProgress = func(read int, chunk []byte) { + if read == 0 { + return + } + + if options.writer != nil { + w := options.writer + if _, err := w.Write(chunk); err != nil { + if options.onProgress != nil { + options.onProgress(0, 0, 0.0, err) + } + } + } + + total += read + + if options.onProgress != nil { + var percent = 0.0 + if options.datasetSize > 0 { + percent = float64(total) / float64(options.datasetSize) * 100.0 + } + + options.onProgress(read, total, percent, nil) + } + } + + var cCid = C.CString(cid) + defer C.free(unsafe.Pointer(cCid)) + + var cFilepath = C.CString(options.filepath) + defer C.free(unsafe.Pointer(cFilepath)) + + var cLocal = C.bool(options.local) + + if C.cGoCodexDownloadStream(self.ctx, cCid, options.chunkSize.toSizeT(), cLocal, cFilepath, bridge.resp) != C.RET_OK { + return bridge.CallError("cGoCodexDownloadLocal") + } + + _, err := bridge.wait() + + return err +} + +func (self CodexNode) CodexDownloadInit(cid string, options CodexDownloadInitOptions) error { + bridge := newBridgeCtx() + defer bridge.free() + + var cCid = C.CString(cid) + defer C.free(unsafe.Pointer(cCid)) + + var cLocal = C.bool(options.local) + + if C.cGoCodexDownloadInit(self.ctx, cCid, options.chunkSize.toSizeT(), cLocal, bridge.resp) != C.RET_OK { + return bridge.CallError("cGoCodexDownloadInit") + } + + _, err := bridge.wait() + + return err +} + +func (self CodexNode) CodexDownloadChunk(cid string) ([]byte, error) { + bridge := newBridgeCtx() + defer bridge.free() 
+ + var bytes []byte + + bridge.onProgress = func(read int, chunk []byte) { + bytes = chunk + } + + var cCid = C.CString(cid) + defer C.free(unsafe.Pointer(cCid)) + + if C.cGoCodexDownloadChunk(self.ctx, cCid, bridge.resp) != C.RET_OK { + return nil, bridge.CallError("cGoCodexDownloadChunk") + } + + if _, err := bridge.wait(); err != nil { + return nil, err + } + + return bytes, nil +} + +func (self CodexNode) CodexDownloadCancel(cid string) error { + bridge := newBridgeCtx() + defer bridge.free() + + var cCid = C.CString(cid) + defer C.free(unsafe.Pointer(cCid)) + + if C.cGoCodexDownloadCancel(self.ctx, cCid, bridge.resp) != C.RET_OK { + return bridge.CallError("cGoCodexDownloadCancel") + } + + _, err := bridge.wait() + + return err +} + +func (self CodexNode) CodexStorageList() ([]CodexManifest, error) { + bridge := newBridgeCtx() + defer bridge.free() + + if C.cGoCodexStorageList(self.ctx, bridge.resp) != C.RET_OK { + return nil, bridge.CallError("cGoCodexStorageList") + } + value, err := bridge.wait() + if err != nil { + return nil, err + } + + var items []CodexManifestWithCid + err = json.Unmarshal([]byte(value), &items) + if err != nil { + return nil, err + } + + var list []CodexManifest + for _, item := range items { + item.Manifest.Cid = item.Cid + list = append(list, item.Manifest) + } + + return list, err +} + +func (self CodexNode) CodexStorageFetch(cid string) (CodexManifest, error) { + bridge := newBridgeCtx() + defer bridge.free() + + var cCid = C.CString(cid) + defer C.free(unsafe.Pointer(cCid)) + + if C.cGoCodexStorageFetch(self.ctx, cCid, bridge.resp) != C.RET_OK { + return CodexManifest{}, bridge.CallError("cGoCodexStorageFetch") + } + + value, err := bridge.wait() + if err != nil { + return CodexManifest{}, err + } + + var manifest CodexManifest + err = json.Unmarshal([]byte(value), &manifest) + if err != nil { + return CodexManifest{}, err + } + + manifest.Cid = cid + + return manifest, nil +} + +func (self CodexNode) CodexStorageSpace() 
(CodexSpace, error) { + var space CodexSpace + + bridge := newBridgeCtx() + defer bridge.free() + + if C.cGoCodexStorageSpace(self.ctx, bridge.resp) != C.RET_OK { + return space, bridge.CallError("cGoCodexStorageSpace") + } + + value, err := bridge.wait() + if err != nil { + return space, err + } + + err = json.Unmarshal([]byte(value), &space) + + return space, err +} + +func (self CodexNode) CodexStorageDelete(cid string) error { + bridge := newBridgeCtx() + defer bridge.free() + + var cCid = C.CString(cid) + defer C.free(unsafe.Pointer(cCid)) + + if C.cGoCodexStorageDelete(self.ctx, cCid, bridge.resp) != C.RET_OK { + return bridge.CallError("cGoCodexStorageDelete") + } + + _, err := bridge.wait() + return err +} + +func (self CodexNode) CodexStart() error { + bridge := newBridgeCtx() + defer bridge.free() + + if C.cGoCodexStart(self.ctx, bridge.resp) != C.RET_OK { + return bridge.CallError("cGoCodexStart") + } + + _, err := bridge.wait() + + return err +} + +func (self CodexNode) CodexStartAsync(onDone func(error)) { + go func() { + err := self.CodexStart() + onDone(err) + }() +} + +func (self CodexNode) CodexStop() error { + bridge := newBridgeCtx() + + if C.cGoCodexStop(self.ctx, bridge.resp) != C.RET_OK { + return bridge.CallError("cGoCodexStop") + } + + _, err := bridge.wait() + return err +} + +func (self CodexNode) CodexDestroy() error { + bridge := newBridgeCtx() + + if C.cGoCodexDestroy(self.ctx, bridge.resp) != C.RET_OK { + return bridge.CallError("cGoCodexDestroy") + } + + _, err := bridge.wait() + return err +} + +//export globalEventCallback +func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData unsafe.Pointer) { + // This is shared among all Golang instances + + self := CodexNode{ctx: userData} + self.MyEventCallback(callerRet, msg, len) +} + +func (self CodexNode) MyEventCallback(callerRet C.int, msg *C.char, len C.size_t) { + log.Println("Event received:", C.GoStringN(msg, C.int(len))) +} + +func (self CodexNode) 
CodexSetEventCallback() { + // Notice that the events for self node are handled by the 'MyEventCallback' method + C.cGoCodexSetEventCallback(self.ctx) +} + +func main() { + config := CodexConfig{} + + node, err := CodexNew(config) + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex created.") + + node.CodexSetEventCallback() + + version, err := node.CodexVersion() + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex version:", version) + + revision, err := node.CodexRevision() + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex revision:", revision) + + repo, err := node.CodexRepo() + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex repo:", repo) + + log.Println("Starting Codex...") + + err = node.CodexStart() + + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex started...") + + // for i := 0; i < 150; i++ { + + debug, err := node.CodexDebug() + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + pretty, err := json.MarshalIndent(debug, "", " ") + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println(string(pretty)) + + spr, err := node.CodexSpr() + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex SPR:", spr) + + peerId, err := node.CodexPeerId() + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex Peer Id:", peerId) + + err = node.CodexLogLevel(Trace) + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex Log Level set to TRACE") + + sessionId, err := node.CodexUploadInit(&CodexUploadOptions{filepath: "hello.txt"}) + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + log.Println("Codex Upload Init sessionId:", sessionId) + + err = node.CodexUploadChunk(sessionId, []byte("Hello ")) + if err != nil { + log.Fatal("Error 
happened:", err.Error()) + } + + err = node.CodexUploadChunk(sessionId, []byte("World!")) + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + cid, err := node.CodexUploadFinalize(sessionId) + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex Upload Finalized, cid:", cid) + + buf := bytes.NewBuffer([]byte("Hello World!")) + cid, err = node.CodexUploadReader(CodexUploadOptions{filepath: "hello.txt", onProgress: func(read, total int, percent float64, err error) { + if err != nil { + log.Fatalf("Error happened during upload: %v\n", err) + } + + log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent) + }}, buf) + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex Upload Finalized from reader, cid:", cid) + + current, err := os.Getwd() + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + // Choose a big file to see the progress logs + filepath := path.Join(current, "examples", "golang", "hello.txt") + //filepath := path.Join(current, "examples", "golang", "discord-0.0.109.deb") + + options := CodexUploadOptions{filepath: filepath, onProgress: func(read, total int, percent float64, err error) { + if err != nil { + log.Fatalf("Error happened during upload: %v\n", err) + } + + log.Printf("Uploaded %d bytes, total %d bytes (%.2f%%)\n", read, total, percent) + }} + + cid, err = node.CodexUploadFile(options) + + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex Upload File finalized, cid: .", cid) + + f, err := os.Create("hello.loaded.txt") + if err != nil { + log.Fatal(err) + } + defer f.Close() + + if err := node.CodexDownloadStream(cid, + CodexDownloadStreamOptions{writer: f, filepath: "hello.reloaded.txt", + onProgress: func(read, total int, percent float64, err error) { + log.Println("Downloaded", read, "bytes. 
Total:", total, "bytes (", percent, "%)") + }, + }); err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex Download finished.") + + // log.Println("Codex Download Init starting... attempt", i+1) + + if err := node.CodexDownloadInit(cid, CodexDownloadInitOptions{local: true}); err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex Download Init finished.") + + // log.Println("Codex Download Chunk starting... attempt", i+1) + + chunk, err := node.CodexDownloadChunk(cid) + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex Download Chunk finished. Size:", len(chunk)) + + if err := node.CodexDownloadCancel(cid); err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex Download Cancel finished.") + + manifest, err := node.CodexDownloadManifest(cid) + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Manifest content:", manifest) + + manifests, err := node.CodexStorageList() + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Storage List content:", manifests) + + manifest, err = node.CodexStorageFetch(cid) + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Storage Fetch content:", manifest) + + space, err := node.CodexStorageSpace() + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Storage Space content:", space) + + if err := node.CodexStorageDelete(cid); err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Storage Delete finished.") + // } + + // err = node.CodexConnect(peerId, []string{}) + // if err != nil { + // log.Fatal("Error happened:", err.Error()) + // } + + // log.Println("Codex connecting to self...") + + // val, err := node.CodexPeerDebug(peerId) + // if err != nil { + // log.Fatal("Error happened:", err.Error()) + // } + + // log.Println("Codex debugging self...", val) + + // Wait for a 
SIGINT or SIGTERM signal + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) + <-ch + + log.Println("Stopping the node...") + + err = node.CodexStop() + + if err != nil { + log.Fatal("Error happened:", err.Error()) + } + + log.Println("Codex stopped...") + + log.Println("Destroying the node...") + + err = node.CodexDestroy() + if err != nil { + log.Fatal("Error happened:", err.Error()) + } +} diff --git a/examples/golang/hello.txt b/examples/golang/hello.txt new file mode 100644 index 000000000..c57eff55e --- /dev/null +++ b/examples/golang/hello.txt @@ -0,0 +1 @@ +Hello World! \ No newline at end of file diff --git a/library/README.md b/library/README.md new file mode 100644 index 000000000..db6423fed --- /dev/null +++ b/library/README.md @@ -0,0 +1,37 @@ +# Codex Library + +Codex exposes a C binding that serves as a stable contract, making it straightforward to integrate Codex into other languages such as Go. + +The implementation was inspired by [nim-library-template](https://github.com/logos-co/nim-library-template) +and by the [nwaku](https://github.com/waku-org/nwaku/tree/master/library) library. + +The source code contains detailed comments to explain the threading and callback flow. +The diagram below summarizes the lifecycle: context creation, request execution, and shutdown. 
+ +```mermaid +sequenceDiagram + autonumber + actor App as App/User + participant Go as Go Wrapper + participant C as C API (libcodex.h) + participant Ctx as CodexContext + participant Thr as Worker Thread + participant Eng as CodexServer + + App->>Go: Start + Go->>C: codex_start_node + C->>Ctx: enqueue request + C->>Ctx: fire signal + Ctx->>Thr: wake worker + Thr->>Ctx: dequeue request + Thr-->>Ctx: ACK + Ctx-->>C: forward ACK + C-->>Go: RET OK + Go->>App: Unblock + Thr->>Eng: execute (async) + Eng-->>Thr: result ready + Thr-->>Ctx: callback + Ctx-->>C: forward callback + C-->>Go: forward callback + Go-->>App: done +``` \ No newline at end of file diff --git a/library/alloc.nim b/library/alloc.nim new file mode 100644 index 000000000..1a6f118b5 --- /dev/null +++ b/library/alloc.nim @@ -0,0 +1,42 @@ +## Can be shared safely between threads +type SharedSeq*[T] = tuple[data: ptr UncheckedArray[T], len: int] + +proc alloc*(str: cstring): cstring = + # Byte allocation from the given address. + # There should be the corresponding manual deallocation with deallocShared ! + if str.isNil(): + var ret = cast[cstring](allocShared(1)) # Allocate memory for the null terminator + ret[0] = '\0' # Set the null terminator + return ret + + let ret = cast[cstring](allocShared(len(str) + 1)) + copyMem(ret, str, len(str) + 1) + return ret + +proc alloc*(str: string): cstring = + ## Byte allocation from the given address. + ## There should be the corresponding manual deallocation with deallocShared ! 
+  var ret = cast[cstring](allocShared(str.len + 1))
+  let s = cast[seq[char]](str)
+  for i in 0 ..< str.len:
+    ret[i] = s[i]
+  ret[str.len] = '\0'
+  return ret
+
+proc allocSharedSeq*[T](s: seq[T]): SharedSeq[T] =
+  ## Copies `s` into shared memory so it can safely cross a thread boundary.
+  ## Must be released with the corresponding deallocSharedSeq.
+  let data = allocShared(sizeof(T) * s.len)
+  if s.len != 0:
+    # Fix: copy the full payload. The previous code copied only `s.len`
+    # bytes, truncating the data whenever sizeof(T) > 1.
+    copyMem(data, unsafeAddr s[0], sizeof(T) * s.len)
+  return (cast[ptr UncheckedArray[T]](data), s.len)
+
+proc deallocSharedSeq*[T](s: var SharedSeq[T]) =
+  deallocShared(s.data)
+  s.len = 0
+
+proc toSeq*[T](s: SharedSeq[T]): seq[T] =
+  ## Creates a seq[T] from a SharedSeq[T]. No explicit dealloc is required
+  ## as seq[T] is a GC managed type.
+  var ret = newSeq[T]()
+  for i in 0 ..< s.len:
+    ret.add(s.data[i])
+  return ret
diff --git a/library/codex_context.nim b/library/codex_context.nim
new file mode 100644
index 000000000..f1b6b1a58
--- /dev/null
+++ b/library/codex_context.nim
@@ -0,0 +1,225 @@
+## This file defines the Codex context and its thread flow:
+## 1. Client enqueues a request and signals the Codex thread.
+## 2. The Codex thread dequeues the request and sends an ack (reqReceivedSignal).
+## 3. The Codex thread executes the request asynchronously.
+## 4. On completion, the Codex thread invokes the client callback with the result and userData.
+
+{.pragma: exported, exportc, cdecl, raises: [].}
+{.pragma: callback, cdecl, raises: [], gcsafe.}
+{.passc: "-fPIC".}
+
+import std/[options, locks, atomics]
+import chronicles
+import chronos
+import chronos/threadsync
+import taskpools/channels_spsc_single
+import ./ffi_types
+import ./codex_thread_requests/[codex_thread_request]
+
+from ../codex/codex import CodexServer
+
+logScope:
+  topics = "codexlib"
+
+type CodexContext* = object
+  thread: Thread[(ptr CodexContext)]
+
+  # This lock is only necessary while we use a SP Channel and while the signalling
+  # between threads assumes that there aren't concurrent requests.
+ # Rearchitecting the signaling + migrating to a MP Channel will allow us to receive + # requests concurrently and spare us the need of locks + lock: Lock + + # Channel to send requests to the Codex thread. + # Requests will be popped from this channel. + reqChannel: ChannelSPSCSingle[ptr CodexThreadRequest] + + # To notify the Codex thread that a request is ready + reqSignal: ThreadSignalPtr + + # To notify the client thread that the request was received. + # It is acknowledgment signal (handshake). + reqReceivedSignal: ThreadSignalPtr + + # Custom state attached by the client to a request, + # returned when its callback is invoked + userData*: pointer + + # Function called by the library to notify the client of global events + eventCallback*: pointer + + # Custom state attached by the client to the context, + # returned with every event callback + eventUserData*: pointer + + # Set to false to stop the Codex thread (during codex_destroy) + running: Atomic[bool] + +template callEventCallback(ctx: ptr CodexContext, eventName: string, body: untyped) = + ## Template used to notify the client of global events + ## Example: onConnectionChanged, onProofMissing, etc. 
+ if isNil(ctx[].eventCallback): + error eventName & " - eventCallback is nil" + return + + foreignThreadGc: + try: + let event = body + cast[CodexCallback](ctx[].eventCallback)( + RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData + ) + except CatchableError: + let msg = + "Exception " & eventName & " when calling 'eventCallBack': " & + getCurrentExceptionMsg() + cast[CodexCallback](ctx[].eventCallback)( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData + ) + +proc sendRequestToCodexThread*( + ctx: ptr CodexContext, + reqType: RequestType, + reqContent: pointer, + callback: CodexCallback, + userData: pointer, + timeout = InfiniteDuration, +): Result[void, string] = + ctx.lock.acquire() + + defer: + ctx.lock.release() + + let req = CodexThreadRequest.createShared(reqType, reqContent, callback, userData) + + # Send the request to the Codex thread + let sentOk = ctx.reqChannel.trySend(req) + if not sentOk: + deallocShared(req) + return err("Failed to send request to the codex thread: " & $req[]) + + # Notify the Codex thread that a request is available + let fireSyncRes = ctx.reqSignal.fireSync() + if fireSyncRes.isErr(): + deallocShared(req) + return err( + "Failed to send request to the codex thread: unable to fireSync: " & + $fireSyncRes.error + ) + + if fireSyncRes.get() == false: + deallocShared(req) + return err("Failed to send request to the codex thread: fireSync timed out.") + + # Wait until the Codex Thread properly received the request + let res = ctx.reqReceivedSignal.waitSync(timeout) + if res.isErr(): + deallocShared(req) + return err( + "Failed to send request to the codex thread: unable to receive reqReceivedSignal signal." + ) + + ## Notice that in case of "ok", the deallocShared(req) is performed by the Codex Thread in the + ## process proc. See the 'codex_thread_request.nim' module for more details. 
+ ok() + +proc runCodex(ctx: ptr CodexContext) {.async: (raises: []).} = + var codex: CodexServer + + while true: + try: + # Wait until a request is available + await ctx.reqSignal.wait() + except Exception as e: + error "Failure in run codex thread while waiting for reqSignal.", error = e.msg + continue + + # If codex_destroy was called, exit the loop + if ctx.running.load == false: + break + + var request: ptr CodexThreadRequest + + # Pop a request from the channel + let recvOk = ctx.reqChannel.tryRecv(request) + if not recvOk: + error "Failure in run codex: unable to receive request in codex thread." + continue + + # yield immediately to the event loop + # with asyncSpawn only, the code will be executed + # synchronously until the first await + asyncSpawn ( + proc() {.async.} = + await sleepAsync(0) + await CodexThreadRequest.process(request, addr codex) + )() + + # Notify the main thread that we picked up the request + let fireRes = ctx.reqReceivedSignal.fireSync() + if fireRes.isErr(): + error "Failure in run codex: unable to fire back to requester thread.", + error = fireRes.error + +proc run(ctx: ptr CodexContext) {.thread.} = + waitFor runCodex(ctx) + +proc createCodexContext*(): Result[ptr CodexContext, string] = + ## This proc is called from the main thread and it creates + ## the Codex working thread. + + # Allocates a CodexContext in shared memory (for the main thread) + var ctx = createShared(CodexContext, 1) + + # This signal is used by the main side to wake the Codex thread + # when a new request is enqueued. + ctx.reqSignal = ThreadSignalPtr.new().valueOr: + return + err("Failed to create a context: unable to create reqSignal ThreadSignalPtr.") + + # Used to let the caller know that the Codex thread has + # acknowledged / picked up a request (like a handshake). + ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: + return err( + "Failed to create codex context: unable to create reqReceivedSignal ThreadSignalPtr." 
+ ) + + # Protects shared state inside CodexContext + ctx.lock.initLock() + + # Codex thread will loop until codex_destroy is called + ctx.running.store(true) + + try: + createThread(ctx.thread, run, ctx) + except ValueError, ResourceExhaustedError: + freeShared(ctx) + return err( + "Failed to create codex context: unable to create thread: " & + getCurrentExceptionMsg() + ) + + return ok(ctx) + +proc destroyCodexContext*(ctx: ptr CodexContext): Result[void, string] = + # Signal the Codex thread to stop + ctx.running.store(false) + + # Wake the worker up if it's waiting + let signaledOnTime = ctx.reqSignal.fireSync().valueOr: + return err("Failed to destroy codex context: " & $error) + + if not signaledOnTime: + return err( + "Failed to destroy codex context: unable to get signal reqSignal on time in destroyCodexContext." + ) + + # Wait for the thread to finish + joinThread(ctx.thread) + + # Clean up + ctx.lock.deinitLock() + ?ctx.reqSignal.close() + ?ctx.reqReceivedSignal.close() + freeShared(ctx) + + return ok() diff --git a/library/codex_thread_requests/codex_thread_request.nim b/library/codex_thread_requests/codex_thread_request.nim new file mode 100644 index 000000000..c2eab9ad0 --- /dev/null +++ b/library/codex_thread_requests/codex_thread_request.nim @@ -0,0 +1,123 @@ +## This file contains the base message request type that will be handled. +## The requests are created by the main thread and processed by +## the Codex Thread. 
+
+import std/json
+import results
+import chronos
+import ../ffi_types
+import ./requests/node_lifecycle_request
+import ./requests/node_info_request
+import ./requests/node_debug_request
+import ./requests/node_p2p_request
+import ./requests/node_upload_request
+import ./requests/node_download_request
+import ./requests/node_storage_request
+
+from ../../codex/codex import CodexServer
+
+type RequestType* {.pure.} = enum
+  LIFECYCLE
+  INFO
+  DEBUG
+  P2P
+  UPLOAD
+  DOWNLOAD
+  STORAGE
+
+type CodexThreadRequest* = object
+  reqType: RequestType
+
+  # Request payload: points at one of the Node*Request shared objects and is
+  # cast according to reqType in `process` below.
+  reqContent: pointer
+
+  # Callback to notify the client thread of the result
+  callback: CodexCallback
+
+  # Custom state attached by the client to the request,
+  # returned when its callback is invoked.
+  userData: pointer
+
+proc createShared*(
+    T: type CodexThreadRequest,
+    reqType: RequestType,
+    reqContent: pointer,
+    callback: CodexCallback,
+    userData: pointer,
+): ptr type T =
+  ## Allocates a CodexThreadRequest in shared memory so it can cross the
+  ## thread boundary; it is freed by the Codex thread once handled.
+  var ret = createShared(T)
+  ret[].reqType = reqType
+  ret[].reqContent = reqContent
+  ret[].callback = callback
+  ret[].userData = userData
+  return ret
+
+# NOTE: User callbacks are executed on the working thread.
+# They must be fast and non-blocking; otherwise this thread will be blocked
+# and no further requests can be processed.
+# We can improve this by dispatching the callbacks to a thread pool or
+# moving to a MP channel.
+# See: https://github.com/codex-storage/nim-codex/pull/1322#discussion_r2340708316
+proc handleRes[T: string | void | seq[byte]](
+    res: Result[T, string], request: ptr CodexThreadRequest
+) =
+  ## Handles the Result responses, which can either be Result[string, string] or
+  ## Result[void, string].
+ defer: + deallocShared(request) + + if res.isErr(): + foreignThreadGc: + let msg = $res.error + request[].callback( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return + + foreignThreadGc: + var msg: cstring = "" + when T is string: + msg = res.get().cstring() + request[].callback( + RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return + +proc process*( + T: type CodexThreadRequest, request: ptr CodexThreadRequest, codex: ptr CodexServer +) {.async: (raises: []).} = + ## Processes the request in the Codex thread. + ## Dispatch to the appropriate request handler based on reqType. + let retFut = + case request[].reqType + of LIFECYCLE: + cast[ptr NodeLifecycleRequest](request[].reqContent).process(codex) + of INFO: + cast[ptr NodeInfoRequest](request[].reqContent).process(codex) + of RequestType.DEBUG: + cast[ptr NodeDebugRequest](request[].reqContent).process(codex) + of P2P: + cast[ptr NodeP2PRequest](request[].reqContent).process(codex) + of STORAGE: + cast[ptr NodeStorageRequest](request[].reqContent).process(codex) + of DOWNLOAD: + let onChunk = proc(bytes: seq[byte]) = + if bytes.len > 0: + request[].callback( + RET_PROGRESS, + cast[ptr cchar](unsafeAddr bytes[0]), + cast[csize_t](bytes.len), + request[].userData, + ) + + cast[ptr NodeDownloadRequest](request[].reqContent).process(codex, onChunk) + of UPLOAD: + let onBlockReceived = proc(bytes: int) = + request[].callback(RET_PROGRESS, nil, cast[csize_t](bytes), request[].userData) + + cast[ptr NodeUploadRequest](request[].reqContent).process(codex, onBlockReceived) + + handleRes(await retFut, request) + +proc `$`*(self: CodexThreadRequest): string = + return $self.reqType diff --git a/library/codex_thread_requests/requests/node_debug_request.nim b/library/codex_thread_requests/requests/node_debug_request.nim new file mode 100644 index 000000000..1015bacb0 --- /dev/null +++ b/library/codex_thread_requests/requests/node_debug_request.nim @@ -0,0 
+1,102 @@ +{.push raises: [].} + +## This file contains the debug info available with Codex. +## The DEBUG type will return info about the P2P node. +## The PEER type is available only with codex_enable_api_debug_peers flag. +## It will return info about a specific peer if available. + +import std/[options] +import chronos +import chronicles +import codexdht/discv5/spr +import ../../alloc +import ../../../codex/conf +import ../../../codex/rest/json +import ../../../codex/node + +from ../../../codex/codex import CodexServer, node + +logScope: + topics = "codexlib codexlibdebug" + +type NodeDebugMsgType* = enum + DEBUG + PEER + +type NodeDebugRequest* = object + operation: NodeDebugMsgType + peerId: cstring + +proc createShared*( + T: type NodeDebugRequest, op: NodeDebugMsgType, peerId: cstring = "" +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].peerId = peerId.alloc() + return ret + +proc destroyShared(self: ptr NodeDebugRequest) = + deallocShared(self[].peerId) + deallocShared(self) + +proc getDebug( + codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + let node = codex[].node + let table = RestRoutingTable.init(node.discovery.protocol.routingTable) + + let json = + %*{ + "id": $node.switch.peerInfo.peerId, + "addrs": node.switch.peerInfo.addrs.mapIt($it), + "spr": + if node.discovery.dhtRecord.isSome: node.discovery.dhtRecord.get.toURI else: "", + "announceAddresses": node.discovery.announceAddrs, + "table": table, + } + + return ok($json) + +proc getPeer( + codex: ptr CodexServer, peerId: cstring +): Future[Result[string, string]] {.async: (raises: []).} = + when codex_enable_api_debug_peers: + let node = codex[].node + let res = PeerId.init($peerId) + if res.isErr: + return err("Failed to get peer: invalid peer ID " & $peerId & ": " & $res.error()) + + let id = res.get() + + try: + let peerRecord = await node.findPeer(id) + if peerRecord.isNone: + return err("Failed to get peer: peer not found") + + 
return ok($ %RestPeerRecord.init(peerRecord.get())) + except CancelledError: + return err("Failed to get peer: operation cancelled") + except CatchableError as e: + return err("Failed to get peer: " & e.msg) + else: + return err("Failed to get peer: peer debug API is disabled") + +proc process*( + self: ptr NodeDebugRequest, codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + defer: + destroyShared(self) + + case self.operation + of NodeDebugMsgType.DEBUG: + let res = (await getDebug(codex)) + if res.isErr: + error "Failed to get DEBUG.", error = res.error + return err($res.error) + return res + of NodeDebugMsgType.PEER: + let res = (await getPeer(codex, self.peerId)) + if res.isErr: + error "Failed to get PEER.", error = res.error + return err($res.error) + return res diff --git a/library/codex_thread_requests/requests/node_download_request.nim b/library/codex_thread_requests/requests/node_download_request.nim new file mode 100644 index 000000000..39883b017 --- /dev/null +++ b/library/codex_thread_requests/requests/node_download_request.nim @@ -0,0 +1,320 @@ +{.push raises: [].} + +## This file contains the download request. +## A session is created for each download identified by the CID, +## allowing to resume, pause and cancel the download (using chunks). +## +## There are two ways to download a file: +## 1. Via chunks: the cid parameter is the CID of the file to download. Steps are: +## - INIT: initializes the download session +## - CHUNK: downloads the next chunk of the file +## - CANCEL: cancels the download session +## 2. Via stream. +## - STREAM: downloads the file in a streaming manner, calling +## the onChunk handler for each chunk and / or writing to a file if filepath is set. +## Cancel is supported in this mode because the worker will be busy +## downloading the file so it cannot pickup another request to cancel the download. 
+ +import std/[options, streams] +import chronos +import chronicles +import libp2p/stream/[lpstream] +import serde/json as serde +import ../../alloc +import ../../../codex/units +import ../../../codex/codextypes + +from ../../../codex/codex import CodexServer, node +from ../../../codex/node import retrieve, fetchManifest +from ../../../codex/rest/json import `%`, RestContent +from libp2p import Cid, init, `$` + +logScope: + topics = "codexlib codexlibdownload" + +type NodeDownloadMsgType* = enum + INIT + CHUNK + STREAM + CANCEL + MANIFEST + +type OnChunkHandler = proc(bytes: seq[byte]): void {.gcsafe, raises: [].} + +type NodeDownloadRequest* = object + operation: NodeDownloadMsgType + cid: cstring + chunkSize: csize_t + local: bool + filepath: cstring + +type + DownloadSessionId* = string + DownloadSessionCount* = int + DownloadSession* = object + stream: LPStream + chunkSize: int + +var downloadSessions {.threadvar.}: Table[DownloadSessionId, DownloadSession] + +proc createShared*( + T: type NodeDownloadRequest, + op: NodeDownloadMsgType, + cid: cstring = "", + chunkSize: csize_t = 0, + local: bool = false, + filepath: cstring = "", +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].cid = cid.alloc() + ret[].chunkSize = chunkSize + ret[].local = local + ret[].filepath = filepath.alloc() + + return ret + +proc destroyShared(self: ptr NodeDownloadRequest) = + deallocShared(self[].cid) + deallocShared(self[].filepath) + deallocShared(self) + +proc init( + codex: ptr CodexServer, cCid: cstring = "", chunkSize: csize_t = 0, local: bool +): Future[Result[string, string]] {.async: (raises: []).} = + ## Init a new session to download the file identified by cid. + ## + ## If the session already exists, do nothing and return ok. + ## Meaning that a cid can only have one active download session. + ## If the chunckSize is 0, the default block size will be used. + ## If local is true, the file will be retrived from the local store. 
+ + if downloadSessions.contains($cCid): + return ok("Download session already exists.") + + let cid = Cid.init($cCid) + if cid.isErr: + return err("Failed to download locally: cannot parse cid: " & $cCid) + + let node = codex[].node + var stream: LPStream + + try: + let res = await node.retrieve(cid.get(), local) + if res.isErr(): + return err("Failed to init the download: " & res.error.msg) + stream = res.get() + except CancelledError: + downloadSessions.del($cCid) + return err("Failed to init the download: download cancelled.") + + let blockSize = if chunkSize.int > 0: chunkSize.int else: DefaultBlockSize.int + downloadSessions[$cCid] = DownloadSession(stream: stream, chunkSize: blockSize) + + return ok("") + +proc chunk( + codex: ptr CodexServer, cid: cstring = "", onChunk: OnChunkHandler +): Future[Result[string, string]] {.async: (raises: []).} = + ## Download the next chunk of the file identified by cid. + ## The chunk is passed to the onChunk handler. + ## + ## If the stream is at EOF, return ok with empty string. + ## + ## If an error is raised while reading the stream, the session is deleted + ## and an error is returned. 
+ + if not downloadSessions.contains($cid): + return err("Failed to download chunk: no session for cid " & $cid) + + var session: DownloadSession + try: + session = downloadSessions[$cid] + except KeyError: + return err("Failed to download chunk: no session for cid " & $cid) + + let stream = session.stream + if stream.atEof: + return ok("") + + let chunkSize = session.chunkSize + var buf = newSeq[byte](chunkSize) + + try: + let read = await stream.readOnce(addr buf[0], buf.len) + buf.setLen(read) + except LPStreamError as e: + await stream.close() + downloadSessions.del($cid) + return err("Failed to download chunk: " & $e.msg) + except CancelledError: + await stream.close() + downloadSessions.del($cid) + return err("Failed to download chunk: download cancelled.") + + if buf.len <= 0: + return err("Failed to download chunk: no data") + + onChunk(buf) + + return ok("") + +proc streamData( + codex: ptr CodexServer, + cid: Cid, + local: bool, + onChunk: OnChunkHandler, + chunkSize: csize_t, + filepath: cstring, +): Future[Result[string, string]] {.async: (raises: [CancelledError]).} = + let node = codex[].node + + let res = await node.retrieve(cid, local = local) + if res.isErr(): + return err("Failed to retrieve CID: " & res.error.msg) + + let stream = res.get() + + if stream.atEof: + return err("Failed to retrieve CID: empty stream.") + + let blockSize = if chunkSize.int > 0: chunkSize.int else: DefaultBlockSize.int + var buf = newSeq[byte](blockSize) + var read = 0 + var outputStream: OutputStreamHandle + var filedest: string = $filepath + + try: + if filepath != "": + outputStream = filedest.fileOutput() + + while not stream.atEof: + let read = await stream.readOnce(addr buf[0], buf.len) + buf.setLen(read) + + if buf.len <= 0: + break + + onChunk(buf) + + if outputStream != nil: + outputStream.write(buf) + + if outputStream != nil: + outputStream.close() + except LPStreamError as e: + return err("Failed to stream file: " & $e.msg) + except IOError as e: + return 
err("Failed to write to file: " & $e.msg) + finally: + await stream.close() + downloadSessions.del($cid) + + return ok("") + +proc stream( + codex: ptr CodexServer, + cCid: cstring, + chunkSize: csize_t, + local: bool, + filepath: cstring, + onChunk: OnChunkHandler, +): Future[Result[string, string]] {.raises: [], async: (raises: []).} = + ## Stream the file identified by cid, calling the onChunk handler for each chunk + ## and / or writing to a file if filepath is set. + ## + ## If local is true, the file will be retrived from the local store. + + let node = codex[].node + + let cid = Cid.init($cCid) + if cid.isErr: + return err("Failed to download locally: cannot parse cid: " & $cCid) + + try: + let res = await codex.streamData(cid.get(), local, onChunk, chunkSize, filepath) + if res.isErr: + return err($res.error) + except CancelledError: + return err("Failed to download locally: download cancelled.") + + return ok("") + +proc cancel( + codex: ptr CodexServer, cCid: cstring +): Future[Result[string, string]] {.raises: [], async: (raises: []).} = + ## Cancel the download session identified by cid. + ## This operation is not supported when using the stream mode, + ## because the worker will be busy downloading the file. 
+ + if not downloadSessions.contains($cCid): + return err("Failed to download chunk: no session for cid " & $cCid) + + var session: DownloadSession + try: + session = downloadSessions[$cCid] + except KeyError: + return err("Failed to download chunk: no session for cid " & $cCid) + + let stream = session.stream + await stream.close() + downloadSessions.del($cCid) + + return ok("") + +proc manifest( + codex: ptr CodexServer, cCid: cstring +): Future[Result[string, string]] {.raises: [], async: (raises: []).} = + let cid = Cid.init($cCid) + if cid.isErr: + return err("Failed to fetch manifest: cannot parse cid: " & $cCid) + + try: + let node = codex[].node + let manifest = await node.fetchManifest(cid.get()) + if manifest.isErr: + return err("Failed to fetch manifest: " & manifest.error.msg) + + return ok(serde.toJson(manifest.get())) + except CancelledError: + return err("Failed to fetch manifest: download cancelled.") + +proc process*( + self: ptr NodeDownloadRequest, codex: ptr CodexServer, onChunk: OnChunkHandler +): Future[Result[string, string]] {.async: (raises: []).} = + defer: + destroyShared(self) + + case self.operation + of NodeDownloadMsgType.INIT: + let res = (await init(codex, self.cid, self.chunkSize, self.local)) + if res.isErr: + error "Failed to INIT.", error = res.error + return err($res.error) + return res + of NodeDownloadMsgType.CHUNK: + let res = (await chunk(codex, self.cid, onChunk)) + if res.isErr: + error "Failed to CHUNK.", error = res.error + return err($res.error) + return res + of NodeDownloadMsgType.STREAM: + let res = ( + await stream(codex, self.cid, self.chunkSize, self.local, self.filepath, onChunk) + ) + if res.isErr: + error "Failed to STREAM.", error = res.error + return err($res.error) + return res + of NodeDownloadMsgType.CANCEL: + let res = (await cancel(codex, self.cid)) + if res.isErr: + error "Failed to CANCEL.", error = res.error + return err($res.error) + return res + of NodeDownloadMsgType.MANIFEST: + let res = (await 
manifest(codex, self.cid)) + if res.isErr: + error "Failed to MANIFEST.", error = res.error + return err($res.error) + return res diff --git a/library/codex_thread_requests/requests/node_info_request.nim b/library/codex_thread_requests/requests/node_info_request.nim new file mode 100644 index 000000000..2e397fcfe --- /dev/null +++ b/library/codex_thread_requests/requests/node_info_request.nim @@ -0,0 +1,76 @@ +## This file contains the lifecycle request type that will be handled. + +import std/[options] +import chronos +import chronicles +import confutils +import codexdht/discv5/spr +import ../../../codex/conf +import ../../../codex/rest/json +import ../../../codex/node + +from ../../../codex/codex import CodexServer, config, node + +logScope: + topics = "codexlib codexlibinfo" + +type NodeInfoMsgType* = enum + REPO + SPR + PEERID + +type NodeInfoRequest* = object + operation: NodeInfoMsgType + +proc createShared*(T: type NodeInfoRequest, op: NodeInfoMsgType): ptr type T = + var ret = createShared(T) + ret[].operation = op + return ret + +proc destroyShared(self: ptr NodeInfoRequest) = + deallocShared(self) + +proc getRepo( + codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + return ok($(codex[].config.dataDir)) + +proc getSpr( + codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + let spr = codex[].node.discovery.dhtRecord + if spr.isNone: + return err("Failed to get SPR: no SPR record found.") + + return ok(spr.get.toURI) + +proc getPeerId( + codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + return ok($codex[].node.switch.peerInfo.peerId) + +proc process*( + self: ptr NodeInfoRequest, codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + defer: + destroyShared(self) + + case self.operation + of REPO: + let res = (await getRepo(codex)) + if res.isErr: + error "Failed to get REPO.", error = res.error + return err($res.error) + 
return res + of SPR: + let res = (await getSpr(codex)) + if res.isErr: + error "Failed to get SPR.", error = res.error + return err($res.error) + return res + of PEERID: + let res = (await getPeerId(codex)) + if res.isErr: + error "Failed to get PEERID.", error = res.error + return err($res.error) + return res diff --git a/library/codex_thread_requests/requests/node_lifecycle_request.nim b/library/codex_thread_requests/requests/node_lifecycle_request.nim new file mode 100644 index 000000000..0e462aa02 --- /dev/null +++ b/library/codex_thread_requests/requests/node_lifecycle_request.nim @@ -0,0 +1,188 @@ +## This file contains the lifecycle request type that will be handled. +## CREATE_NODE: create a new Codex node with the provided config.json. +## START_NODE: start the provided Codex node. +## STOP_NODE: stop the provided Codex node. + +import std/[options, json, strutils, net, os] +import codexdht/discv5/spr +import stew/shims/parseutils +import contractabi/address +import chronos +import chronicles +import results +import confutils +import confutils/std/net +import confutils/defs +import libp2p +import json_serialization +import json_serialization/std/[options, net] +import ../../alloc +import ../../../codex/conf +import ../../../codex/utils +import ../../../codex/utils/[keyutils, fileutils] +import ../../../codex/units + +from ../../../codex/codex import CodexServer, new, start, stop, close + +logScope: + topics = "codexlib codexliblifecycle" + +type NodeLifecycleMsgType* = enum + CREATE_NODE + START_NODE + STOP_NODE + CLOSE_NODE + +proc readValue*[T: InputFile | InputDir | OutPath | OutDir | OutFile]( + r: var JsonReader, val: var T +) = + val = T(r.readValue(string)) + +proc readValue*(r: var JsonReader, val: var MultiAddress) = + val = MultiAddress.init(r.readValue(string)).get() + +proc readValue*(r: var JsonReader, val: var NatConfig) = + let res = NatConfig.parse(r.readValue(string)) + if res.isErr: + raise + newException(SerializationError, "Cannot parse 
the NAT config: " & res.error()) + val = res.get() + +proc readValue*(r: var JsonReader, val: var SignedPeerRecord) = + let res = SignedPeerRecord.parse(r.readValue(string)) + if res.isErr: + raise + newException(SerializationError, "Cannot parse the signed peer: " & res.error()) + val = res.get() + +proc readValue*(r: var JsonReader, val: var ThreadCount) = + let res = ThreadCount.parse(r.readValue(string)) + if res.isErr: + raise + newException(SerializationError, "Cannot parse the thread count: " & res.error()) + val = res.get() + +proc readValue*(r: var JsonReader, val: var NBytes) = + let res = NBytes.parse(r.readValue(string)) + if res.isErr: + raise newException(SerializationError, "Cannot parse the NBytes: " & res.error()) + val = res.get() + +proc readValue*(r: var JsonReader, val: var Duration) = + var dur: Duration + let input = r.readValue(string) + let count = parseDuration(input, dur) + if count == 0: + raise newException(SerializationError, "Cannot parse the duration: " & input) + val = dur + +proc readValue*(r: var JsonReader, val: var EthAddress) = + val = EthAddress.init(r.readValue(string)).get() + +type NodeLifecycleRequest* = object + operation: NodeLifecycleMsgType + configJson: cstring + +proc createShared*( + T: type NodeLifecycleRequest, op: NodeLifecycleMsgType, configJson: cstring = "" +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].configJson = configJson.alloc() + return ret + +proc destroyShared(self: ptr NodeLifecycleRequest) = + deallocShared(self[].configJson) + deallocShared(self) + +proc createCodex( + configJson: cstring +): Future[Result[CodexServer, string]] {.async: (raises: []).} = + var conf: CodexConf + + try: + conf = CodexConf.load( + version = codexFullVersion, + envVarsPrefix = "codex", + cmdLine = @[], + secondarySources = proc( + config: CodexConf, sources: auto + ) {.gcsafe, raises: [ConfigurationError].} = + if configJson.len > 0: + sources.addConfigFileContent(Json, $(configJson)) + , + 
) + except ConfigurationError as e: + return err("Failed to create codex: unable to load configuration: " & e.msg) + + conf.setupLogging() + conf.setupMetrics() + + if not (checkAndCreateDataDir((conf.dataDir).string)): + # We are unable to access/create data folder or data folder's + # permissions are insecure. + return err( + "Failed to create codex: unable to access/create data folder or data folder's permissions are insecure." + ) + + if not (checkAndCreateDataDir((conf.dataDir / "repo"))): + # We are unable to access/create data folder or data folder's + # permissions are insecure. + return err( + "Failed to create codex: unable to access/create data folder or data folder's permissions are insecure." + ) + + let keyPath = + if isAbsolute(conf.netPrivKeyFile): + conf.netPrivKeyFile + else: + conf.dataDir / conf.netPrivKeyFile + let privateKey = setupKey(keyPath) + if privateKey.isErr: + return err("Failed to create codex: unable to get the private key.") + let pk = privateKey.get() + + conf.apiBindAddress = string.none + + let server = + try: + CodexServer.new(conf, pk) + except Exception as exc: + return err("Failed to create codex: " & exc.msg) + + return ok(server) + +proc process*( + self: ptr NodeLifecycleRequest, codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + defer: + destroyShared(self) + + case self.operation + of CREATE_NODE: + codex[] = ( + await createCodex( + self.configJson # , self.appCallbacks + ) + ).valueOr: + error "Failed to CREATE_NODE.", error = error + return err($error) + of START_NODE: + try: + await codex[].start() + except Exception as e: + error "Failed to START_NODE.", error = e.msg + return err(e.msg) + of STOP_NODE: + try: + await codex[].stop() + except Exception as e: + error "Failed to STOP_NODE.", error = e.msg + return err(e.msg) + of CLOSE_NODE: + try: + await codex[].close() + except Exception as e: + error "Failed to STOP_NODE.", error = e.msg + return err(e.msg) + return ok("") diff 
--git a/library/codex_thread_requests/requests/node_p2p_request.nim b/library/codex_thread_requests/requests/node_p2p_request.nim new file mode 100644 index 000000000..3bdbbf973 --- /dev/null +++ b/library/codex_thread_requests/requests/node_p2p_request.nim @@ -0,0 +1,95 @@ +{.push raises: [].} + +## This file contains the P2p request type that will be handled. +## CONNECT: connect to a peer with the provided peer ID and optional addresses. + +import std/[options] +import chronos +import chronicles +import libp2p +import ../../alloc +import ../../../codex/node + +from ../../../codex/codex import CodexServer, node + +logScope: + topics = "codexlib codexlibp2p" + +type NodeP2PMsgType* = enum + CONNECT + +type NodeP2PRequest* = object + operation: NodeP2PMsgType + peerId: cstring + peerAddresses: seq[cstring] + +proc createShared*( + T: type NodeP2PRequest, + op: NodeP2PMsgType, + peerId: cstring = "", + peerAddresses: seq[cstring] = @[], +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].peerId = peerId.alloc() + ret[].peerAddresses = peerAddresses + return ret + +proc destroyShared(self: ptr NodeP2PRequest) = + deallocShared(self[].peerId) + deallocShared(self) + +proc connect( + codex: ptr CodexServer, peerId: cstring, peerAddresses: seq[cstring] = @[] +): Future[Result[string, string]] {.async: (raises: []).} = + let node = codex[].node + let res = PeerId.init($peerId) + if res.isErr: + return err("Failed to connect to peer: invalid peer ID: " & $res.error()) + + let id = res.get() + + let addresses = + if peerAddresses.len > 0: + var addrs: seq[MultiAddress] + for addrStr in peerAddresses: + let res = MultiAddress.init($addrStr) + if res.isOk: + addrs.add(res[]) + else: + return err("Failed to connect to peer: invalid address: " & $addrStr) + addrs + else: + try: + let peerRecord = await node.findPeer(id) + if peerRecord.isNone: + return err("Failed to connect to peer: peer not found.") + + peerRecord.get().addresses.mapIt(it.address) + 
except CancelledError: + return err("Failed to connect to peer: operation cancelled.") + except CatchableError as e: + return err("Failed to connect to peer: " & $e.msg) + + try: + await node.connect(id, addresses) + except CancelledError: + return err("Failed to connect to peer: operation cancelled.") + except CatchableError as e: + return err("Failed to connect to peer: " & $e.msg) + + return ok("") + +proc process*( + self: ptr NodeP2PRequest, codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + defer: + destroyShared(self) + + case self.operation + of NodeP2PMsgType.CONNECT: + let res = (await connect(codex, self.peerId, self.peerAddresses)) + if res.isErr: + error "Failed to CONNECT.", error = res.error + return err($res.error) + return res diff --git a/library/codex_thread_requests/requests/node_storage_request.nim b/library/codex_thread_requests/requests/node_storage_request.nim new file mode 100644 index 000000000..74d05c4f6 --- /dev/null +++ b/library/codex_thread_requests/requests/node_storage_request.nim @@ -0,0 +1,158 @@ +{.push raises: [].} + +## This file contains the node storage request. +## 4 operations are available: +## - LIST: list all manifests stored in the node. +## - DELETE: Deletes either a single block or an entire dataset from the local node. +## - FETCH: download a file from the network to the local node. +## - SPACE: get the amount of space used by the local node. 
+ +import std/[options] +import chronos +import chronicles +import libp2p/stream/[lpstream] +import serde/json as serde +import ../../alloc +import ../../../codex/units +import ../../../codex/manifest +import ../../../codex/stores/repostore + +from ../../../codex/codex import CodexServer, node, repoStore +from ../../../codex/node import + iterateManifests, fetchManifest, fetchDatasetAsyncTask, delete +from libp2p import Cid, init, `$` + +logScope: + topics = "codexlib codexlibstorage" + +type NodeStorageMsgType* = enum + LIST + DELETE + FETCH + SPACE + +type NodeStorageRequest* = object + operation: NodeStorageMsgType + cid: cstring + +type StorageSpace = object + totalBlocks* {.serialize.}: Natural + quotaMaxBytes* {.serialize.}: NBytes + quotaUsedBytes* {.serialize.}: NBytes + quotaReservedBytes* {.serialize.}: NBytes + +proc createShared*( + T: type NodeStorageRequest, op: NodeStorageMsgType, cid: cstring = "" +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].cid = cid.alloc() + + return ret + +proc destroyShared(self: ptr NodeStorageRequest) = + deallocShared(self[].cid) + deallocShared(self) + +type ManifestWithCid = object + cid {.serialize.}: string + manifest {.serialize.}: Manifest + +proc list( + codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + var manifests = newSeq[ManifestWithCid]() + proc onManifest(cid: Cid, manifest: Manifest) {.raises: [], gcsafe.} = + manifests.add(ManifestWithCid(cid: $cid, manifest: manifest)) + + try: + let node = codex[].node + await node.iterateManifests(onManifest) + except CancelledError: + return err("Failed to list manifests: cancelled operation.") + except CatchableError as err: + return err("Failed to list manifest: : " & err.msg) + + return ok(serde.toJson(manifests)) + +proc delete( + codex: ptr CodexServer, cCid: cstring +): Future[Result[string, string]] {.async: (raises: []).} = + let cid = Cid.init($cCid) + if cid.isErr: + return err("Failed to delete 
the data: cannot parse cid: " & $cCid) + + let node = codex[].node + try: + let res = await node.delete(cid.get()) + if res.isErr: + return err("Failed to delete the data: " & res.error.msg) + except CancelledError: + return err("Failed to delete the data: cancelled operation.") + except CatchableError as err: + return err("Failed to delete the data: " & err.msg) + + return ok("") + +proc fetch( + codex: ptr CodexServer, cCid: cstring +): Future[Result[string, string]] {.async: (raises: []).} = + let cid = Cid.init($cCid) + if cid.isErr: + return err("Failed to fetch the data: cannot parse cid: " & $cCid) + + try: + let node = codex[].node + let manifest = await node.fetchManifest(cid.get()) + if manifest.isErr: + return err("Failed to fetch the data: " & manifest.error.msg) + + node.fetchDatasetAsyncTask(manifest.get()) + + return ok(serde.toJson(manifest.get())) + except CancelledError: + return err("Failed to fetch the data: download cancelled.") + +proc space( + codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + let repoStore = codex[].repoStore + let space = StorageSpace( + totalBlocks: repoStore.totalBlocks, + quotaMaxBytes: repoStore.quotaMaxBytes, + quotaUsedBytes: repoStore.quotaUsedBytes, + quotaReservedBytes: repoStore.quotaReservedBytes, + ) + return ok(serde.toJson(space)) + +proc process*( + self: ptr NodeStorageRequest, codex: ptr CodexServer +): Future[Result[string, string]] {.async: (raises: []).} = + defer: + destroyShared(self) + + case self.operation + of NodeStorageMsgType.LIST: + let res = (await list(codex)) + if res.isErr: + error "Failed to LIST.", error = res.error + return err($res.error) + return res + of NodeStorageMsgType.DELETE: + let res = (await delete(codex, self.cid)) + if res.isErr: + error "Failed to DELETE.", error = res.error + return err($res.error) + return res + of NodeStorageMsgType.FETCH: + let res = (await fetch(codex, self.cid)) + if res.isErr: + error "Failed to FETCH.", error = 
res.error + return err($res.error) + return res + of NodeStorageMsgType.SPACE: + let res = (await space(codex)) + if res.isErr: + error "Failed to SPACE.", error = res.error + return err($res.error) + return res diff --git a/library/codex_thread_requests/requests/node_upload_request.nim b/library/codex_thread_requests/requests/node_upload_request.nim new file mode 100644 index 000000000..fc2537987 --- /dev/null +++ b/library/codex_thread_requests/requests/node_upload_request.nim @@ -0,0 +1,374 @@ +{.push raises: [].} + +## This file contains the upload request. +## A session is created for each upload allowing to resume, +## pause and cancel uploads (using chunks). +## +## There are two ways to upload a file: +## 1. Via chunks: the filepath parameter is the data filename. Steps are: +## - INIT: creates a new upload session and returns its ID. +## - CHUNK: sends a chunk of data to the upload session. +## - FINALIZE: finalizes the upload and returns the CID of the uploaded file. +## - CANCEL: cancels the upload session. +## +## 2. Directly from a file path: the filepath has to be absolute. +## - INIT: creates a new upload session and returns its ID +## - FILE: starts the upload and returns the CID of the uploaded file +## Cancel is not supported in this mode because the worker will be busy +## uploading the file so it cannot pickup another request to cancel the upload. 
+ +import std/[options, os, mimetypes] +import chronos +import chronicles +import questionable +import questionable/results +import faststreams/inputs +import libp2p/stream/[bufferstream, lpstream] +import ../../alloc +import ../../../codex/units +import ../../../codex/codextypes + +from ../../../codex/codex import CodexServer, node +from ../../../codex/node import store +from libp2p import Cid, `$` + +logScope: + topics = "codexlib codexlibupload" + +type NodeUploadMsgType* = enum + INIT + CHUNK + FINALIZE + CANCEL + FILE + +type OnProgressHandler = proc(bytes: int): void {.gcsafe, raises: [].} + +type NodeUploadRequest* = object + operation: NodeUploadMsgType + sessionId: cstring + filepath: cstring + chunk: seq[byte] + chunkSize: csize_t + +type + UploadSessionId* = string + UploadSessionCount* = int + UploadSession* = object + stream: BufferStream + fut: Future[?!Cid] + filepath: string + chunkSize: int + onProgress: OnProgressHandler + +var uploadSessions {.threadvar.}: Table[UploadSessionId, UploadSession] +var nexUploadSessionCount {.threadvar.}: UploadSessionCount + +proc createShared*( + T: type NodeUploadRequest, + op: NodeUploadMsgType, + sessionId: cstring = "", + filepath: cstring = "", + chunk: seq[byte] = @[], + chunkSize: csize_t = 0, +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].sessionId = sessionId.alloc() + ret[].filepath = filepath.alloc() + ret[].chunk = chunk + ret[].chunkSize = chunkSize + + return ret + +proc destroyShared(self: ptr NodeUploadRequest) = + deallocShared(self[].filepath) + deallocShared(self[].sessionId) + deallocShared(self) + +proc init( + codex: ptr CodexServer, filepath: cstring = "", chunkSize: csize_t = 0 +): Future[Result[string, string]] {.async: (raises: []).} = + ## Init a new session upload and return its ID. + ## The session contains the future corresponding to the + ## `node.store` call. 
+ ## The filepath can be: + ## - the filename when uploading via chunks + ## - the absolute path to a file when uploading directly. + ## The mimetype is deduced from the filename extension. + ## + ## The chunkSize matches by default the block size used to store the file. + ## + ## A callback `onBlockStore` is provided to `node.store` to + ## report the progress of the upload. This callback will check + ## that an `onProgress` handler is set in the session + ## and call it with the number of bytes stored each time a block + ## is stored. + + var filenameOpt, mimetypeOpt = string.none + + if isAbsolute($filepath): + if not fileExists($filepath): + return err( + "Failed to create an upload session, the filepath does not exist: " & $filepath + ) + + if filepath != "": + let (_, name, ext) = splitFile($filepath) + + filenameOpt = (name & ext).some + + if ext != "": + let extNoDot = + if ext.len > 0: + ext[1 ..^ 1] + else: + "" + let mime = newMimetypes() + let mimetypeStr = mime.getMimetype(extNoDot, "") + + mimetypeOpt = if mimetypeStr == "": string.none else: mimetypeStr.some + + let sessionId = $nexUploadSessionCount + nexUploadSessionCount.inc() + + let stream = BufferStream.new() + let lpStream = LPStream(stream) + let node = codex[].node + + let onBlockStored = proc(chunk: seq[byte]): void {.gcsafe, raises: [].} = + try: + if uploadSessions.contains($sessionId): + let session = uploadSessions[$sessionId] + if session.onProgress != nil: + session.onProgress(chunk.len) + except KeyError: + error "Failed to push progress update, session is not found: ", + sessionId = $sessionId + + let blockSize = + if chunkSize.NBytes > 0.NBytes: chunkSize.NBytes else: DefaultBlockSize + let fut = node.store(lpStream, filenameOpt, mimetypeOpt, blockSize, onBlockStored) + + uploadSessions[sessionId] = UploadSession( + stream: stream, fut: fut, filepath: $filepath, chunkSize: blockSize.int + ) + + return ok(sessionId) + +proc chunk( + codex: ptr CodexServer, sessionId: cstring, chunk: 
seq[byte] +): Future[Result[string, string]] {.async: (raises: []).} = + ## Upload a chunk of data to the session identified by sessionId. + ## The chunk is pushed to the BufferStream of the session. + ## If the chunk size is equal or greater than the session chunkSize, + ## the `onProgress` callback is temporarily set to receive the progress + ## from `onBlockStored` callback. This provide a way to report progress + ## precisely when a block is stored. + ## If the chunk size is smaller than the session chunkSize, + ## the `onProgress` callback is not set because the LPStream will + ## wait until enough data is received to form a block before storing it. + ## The wrapper may then report the progress because the data is in the stream + ## but not yet stored. + + if not uploadSessions.contains($sessionId): + return err("Failed to upload the chunk, the session is not found: " & $sessionId) + + var fut = newFuture[void]() + + try: + let session = uploadSessions[$sessionId] + + if chunk.len >= session.chunkSize: + uploadSessions[$sessionId].onProgress = proc( + bytes: int + ): void {.gcsafe, raises: [].} = + fut.complete() + await session.stream.pushData(chunk) + else: + fut = session.stream.pushData(chunk) + + await fut + + uploadSessions[$sessionId].onProgress = nil + except KeyError: + return err("Failed to upload the chunk, the session is not found: " & $sessionId) + except LPError as e: + return err("Failed to upload the chunk, stream error: " & $e.msg) + except CancelledError: + return err("Failed to upload the chunk, operation cancelled.") + except CatchableError as e: + return err("Failed to upload the chunk: " & $e.msg) + finally: + if not fut.finished(): + fut.cancelSoon() + + return ok("") + +proc finalize( + codex: ptr CodexServer, sessionId: cstring +): Future[Result[string, string]] {.async: (raises: []).} = + ## Finalize the upload session identified by sessionId. + ## This closes the BufferStream and waits for the `node.store` future + ## to complete. 
It returns the CID of the uploaded file. + + if not uploadSessions.contains($sessionId): + return + err("Failed to finalize the upload session, session not found: " & $sessionId) + + var session: UploadSession + try: + session = uploadSessions[$sessionId] + await session.stream.pushEof() + + let res = await session.fut + if res.isErr: + return err("Failed to finalize the upload session: " & res.error().msg) + + return ok($res.get()) + except KeyError: + return + err("Failed to finalize the upload session, invalid session ID: " & $sessionId) + except LPStreamError as e: + return err("Failed to finalize the upload session, stream error: " & $e.msg) + except CancelledError: + return err("Failed to finalize the upload session, operation cancelled") + except CatchableError as e: + return err("Failed to finalize the upload session: " & $e.msg) + finally: + if uploadSessions.contains($sessionId): + uploadSessions.del($sessionId) + + if session.fut != nil and not session.fut.finished(): + session.fut.cancelSoon() + +proc cancel( + codex: ptr CodexServer, sessionId: cstring +): Future[Result[string, string]] {.async: (raises: []).} = + ## Cancel the upload session identified by sessionId. + ## This cancels the `node.store` future and removes the session + ## from the table. + ## This operation is not supported when uploading file because + ## the worker will be busy uploading the file so it cannot pickup + ## another request to cancel the upload. 
+ + if not uploadSessions.contains($sessionId): + return err("Failed to cancel the upload session, session not found: " & $sessionId) + + try: + let session = uploadSessions[$sessionId] + session.fut.cancelSoon() + except KeyError: + return err("Failed to cancel the upload session, invalid session ID: " & $sessionId) + + uploadSessions.del($sessionId) + + return ok("") + +proc streamFile( + filepath: string, stream: BufferStream, chunkSize: int +): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = + ## Streams a file from the given filepath using faststream. + ## fsMultiSync cannot be used with chronos because of this warning: + ## Warning: chronos backend uses nested calls to `waitFor` which + ## is not supported by chronos - it is not recommended to use it until + ## this has been resolved. + ## + ## Ideally when it is solved, we should use fsMultiSync or find a way to use async + ## file I/O with chronos, see https://github.com/status-im/nim-chronos/issues/501. + + try: + let inputStreamHandle = filepath.fileInput() + let inputStream = inputStreamHandle.implicitDeref + + var buf = newSeq[byte](chunkSize) + while inputStream.readable: + let read = inputStream.readIntoEx(buf) + if read == 0: + break + await stream.pushData(buf[0 ..< read]) + # let byt = inputStream.read + # await stream.pushData(@[byt]) + return ok() + except IOError, OSError, LPStreamError: + let e = getCurrentException() + return err("Failed to stream the file: " & $e.msg) + +proc file( + codex: ptr CodexServer, sessionId: cstring, onProgress: OnProgressHandler +): Future[Result[string, string]] {.async: (raises: []).} = + ## Starts the file upload for the session identified by sessionId. + ## Will call finalize when done and return the CID of the uploaded file. + ## + ## The onProgress callback is called with the number of bytes + ## to report the progress of the upload. 
+ + if not uploadSessions.contains($sessionId): + return err("Failed to upload the file, invalid session ID: " & $sessionId) + + var session: UploadSession + + try: + uploadSessions[$sessionId].onProgress = onProgress + session = uploadSessions[$sessionId] + + let res = await streamFile(session.filepath, session.stream, session.chunkSize) + if res.isErr: + return err("Failed to upload the file: " & res.error) + + return await codex.finalize(sessionId) + except KeyError: + return err("Failed to upload the file, the session is not found: " & $sessionId) + except LPStreamError, IOError: + let e = getCurrentException() + return err("Failed to upload the file: " & $e.msg) + except CancelledError: + return err("Failed to upload the file, the operation is cancelled.") + except CatchableError as e: + return err("Failed to upload the file: " & $e.msg) + finally: + if uploadSessions.contains($sessionId): + uploadSessions.del($sessionId) + + if session.fut != nil and not session.fut.finished(): + session.fut.cancelSoon() + +proc process*( + self: ptr NodeUploadRequest, + codex: ptr CodexServer, + onUploadProgress: OnProgressHandler = nil, +): Future[Result[string, string]] {.async: (raises: []).} = + defer: + destroyShared(self) + + case self.operation + of NodeUploadMsgType.INIT: + let res = (await init(codex, self.filepath, self.chunkSize)) + if res.isErr: + error "Failed to INIT.", error = res.error + return err($res.error) + return res + of NodeUploadMsgType.CHUNK: + let res = (await chunk(codex, self.sessionId, self.chunk)) + if res.isErr: + error "Failed to CHUNK.", error = res.error + return err($res.error) + return res + of NodeUploadMsgType.FINALIZE: + let res = (await finalize(codex, self.sessionId)) + if res.isErr: + error "Failed to FINALIZE.", error = res.error + return err($res.error) + return res + of NodeUploadMsgType.CANCEL: + let res = (await cancel(codex, self.sessionId)) + if res.isErr: + error "Failed to CANCEL.", error = res.error + return 
err($res.error) + return res + of NodeUploadMsgType.FILE: + let res = (await file(codex, self.sessionId, onUploadProgress)) + if res.isErr: + error "Failed to FILE.", error = res.error + return err($res.error) + return res diff --git a/library/events/json_base_event.nim b/library/events/json_base_event.nim new file mode 100644 index 000000000..743444ed2 --- /dev/null +++ b/library/events/json_base_event.nim @@ -0,0 +1,14 @@ +# JSON Event definition +# +# This file defines de JsonEvent type, which serves as the base +# for all event types in the library +# +# Reference specification: +# https://github.com/vacp2p/rfc/blob/master/content/docs/rfcs/36/README.md#jsonsignal-type + +type JsonEvent* = ref object of RootObj + eventType* {.requiresInit.}: string + +method `$`*(jsonEvent: JsonEvent): string {.base.} = + discard + # All events should implement this diff --git a/library/ffi_types.nim b/library/ffi_types.nim new file mode 100644 index 000000000..1a865eaf1 --- /dev/null +++ b/library/ffi_types.nim @@ -0,0 +1,62 @@ +# FFI Types and Utilities +# +# This file defines the core types and utilities for the library's foreign +# function interface (FFI), enabling interoperability with external code. + +################################################################################ +### Exported types +import results + +type CodexCallback* = proc( + callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer +) {.cdecl, gcsafe, raises: [].} + +const RET_OK*: cint = 0 +const RET_ERR*: cint = 1 +const RET_MISSING_CALLBACK*: cint = 2 +const RET_PROGRESS*: cint = 3 + +## Returns RET_OK as acknowledgment and call the callback +## with RET_OK code and the provided message. +proc success*(callback: CodexCallback, msg: string, userData: pointer): cint = + callback(RET_OK, cast[ptr cchar](msg), cast[csize_t](len(msg)), userData) + + return RET_OK + +## Returns RET_ERR as acknowledgment and call the callback +## with RET_ERR code and the provided message. 
+proc error*(callback: CodexCallback, msg: string, userData: pointer): cint = + let msg = "libcodex error: " & msg + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + + return RET_ERR + +## Returns RET_OK as acknowledgment if the result is ok. +## If not, return RET_ERR and call the callback with the error message. +proc okOrError*[T]( + callback: CodexCallback, res: Result[T, string], userData: pointer +): cint = + if res.isOk: + return RET_OK + + return callback.error($res.error, userData) + +### End of exported types +################################################################################ + +################################################################################ +### FFI utils + +template foreignThreadGc*(body: untyped) = + when declared(setupForeignThreadGc): + setupForeignThreadGc() + + body + + when declared(tearDownForeignThreadGc): + tearDownForeignThreadGc() + +type onDone* = proc() + +### End of FFI utils +################################################################################ diff --git a/library/libcodex.h b/library/libcodex.h new file mode 100644 index 000000000..e63c2ad10 --- /dev/null +++ b/library/libcodex.h @@ -0,0 +1,200 @@ +/** +* libcodex.h - C Interface for Example Library +* +* This header provides the public API for libcodex +* +* To see the auto-generated header by Nim, run `make libcodex` from the +* repository root. 
The generated file will be created at: +* nimcache/release/libcodex/libcodex.h +*/ + +#ifndef __libcodex__ +#define __libcodex__ + +#include +#include + +// The possible returned values for the functions that return int +#define RET_OK 0 +#define RET_ERR 1 +#define RET_MISSING_CALLBACK 2 +#define RET_PROGRESS 3 + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void (*CodexCallback) (int callerRet, const char* msg, size_t len, void* userData); + +void* codex_new( + const char* configJson, + CodexCallback callback, + void* userData); + +int codex_version( + void* ctx, + CodexCallback callback, + void* userData); + +int codex_revision( + void* ctx, + CodexCallback callback, + void* userData); + +int codex_repo( + void* ctx, + CodexCallback callback, + void* userData); + +int codex_debug( + void* ctx, + CodexCallback callback, + void* userData); + +int codex_spr( + void* ctx, + CodexCallback callback, + void* userData); + +int codex_peer_id( + void* ctx, + CodexCallback callback, + void* userData); + +int codex_log_level( + void* ctx, + const char* logLevel, + CodexCallback callback, + void* userData); + +int codex_connect( + void* ctx, + const char* peerId, + const char** peerAddresses, + size_t peerAddressesSize, + CodexCallback callback, + void* userData); + +int codex_peer_debug( + void* ctx, + const char* peerId, + CodexCallback callback, + void* userData); + + +int codex_upload_init( + void* ctx, + const char* filepath, + size_t chunkSize, + CodexCallback callback, + void* userData); + +int codex_upload_chunk( + void* ctx, + const char* sessionId, + const uint8_t* chunk, + size_t len, + CodexCallback callback, + void* userData); + +int codex_upload_finalize( + void* ctx, + const char* sessionId, + CodexCallback callback, + void* userData); + +int codex_upload_cancel( + void* ctx, + const char* sessionId, + CodexCallback callback, + void* userData); + +int codex_upload_file( + void* ctx, + const char* sessionId, + CodexCallback callback, + void* userData); + 
+int codex_download_stream( + void* ctx, + const char* cid, + size_t chunkSize, + bool local, + const char* filepath, + CodexCallback callback, + void* userData); + +int codex_download_init( + void* ctx, + const char* cid, + size_t chunkSize, + bool local, + CodexCallback callback, + void* userData); + +int codex_download_chunk( + void* ctx, + const char* cid, + CodexCallback callback, + void* userData); + +int codex_download_cancel( + void* ctx, + const char* cid, + CodexCallback callback, + void* userData); + +int codex_download_manifest( + void* ctx, + const char* cid, + CodexCallback callback, + void* userData); + +int codex_storage_list( + void* ctx, + CodexCallback callback, + void* userData); + +int codex_storage_space( + void* ctx, + CodexCallback callback, + void* userData); + +int codex_storage_delete( + void* ctx, + const char* cid, + CodexCallback callback, + void* userData); + +int codex_storage_fetch( + void* ctx, + const char* cid, + CodexCallback callback, + void* userData); + +int codex_start(void* ctx, + CodexCallback callback, + void* userData); + +int codex_stop(void* ctx, + CodexCallback callback, + void* userData); + +int codex_close(void* ctx, + CodexCallback callback, + void* userData); + +// Destroys an instance of a codex node created with codex_new +int codex_destroy(void* ctx, + CodexCallback callback, + void* userData); + +void codex_set_event_callback(void* ctx, + CodexCallback callback, + void* userData); + +#ifdef __cplusplus +} +#endif + +#endif /* __libcodex__ */ \ No newline at end of file diff --git a/library/libcodex.nim b/library/libcodex.nim new file mode 100644 index 000000000..839137ba4 --- /dev/null +++ b/library/libcodex.nim @@ -0,0 +1,550 @@ +# libcodex.nim - C-exported interface for the Codex shared library +# +# This file implements the public C API for libcodex. +# It acts as the bridge between C programs and the internal Nim implementation. 
+# +# This file defines: +# - Initialization logic for the Nim runtime (once per process) +# - Thread-safe exported procs callable from C +# - Callback registration and invocation for asynchronous communication + +# cdecl is C declaration calling convention. +# It’s the standard way C compilers expect functions to behave: +# 1- Caller cleans up the stack after the call +# 2- Symbol names are exported in a predictable way +# In other termes, it is a glue that makes Nim functions callable as normal C functions. +{.pragma: exported, exportc, cdecl, raises: [].} +{.pragma: callback, cdecl, raises: [], gcsafe.} + +# Ensure code is position-independent so it can be built into a shared library (.so). +# In other terms, the code that can run no matter where it’s placed in memory. +{.passc: "-fPIC".} + +when defined(linux): + # Define the canonical name for this library + {.passl: "-Wl,-soname,libcodex.so".} + +import std/[atomics] +import chronicles +import chronos +import chronos/threadsync +import ./codex_context +import ./codex_thread_requests/codex_thread_request +import ./codex_thread_requests/requests/node_lifecycle_request +import ./codex_thread_requests/requests/node_info_request +import ./codex_thread_requests/requests/node_debug_request +import ./codex_thread_requests/requests/node_p2p_request +import ./codex_thread_requests/requests/node_upload_request +import ./codex_thread_requests/requests/node_download_request +import ./codex_thread_requests/requests/node_storage_request +import ./ffi_types + +from ../codex/conf import codexVersion, updateLogLevel + +logScope: + topics = "codexlib" + +template checkLibcodexParams*( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +) = + if not isNil(ctx): + ctx[].userData = userData + + if isNil(callback): + return RET_MISSING_CALLBACK + +# From Nim doc: +# "the C targets require you to initialize Nim's internals, which is done calling a NimMain function." 
+# "The name NimMain can be influenced via the --nimMainPrefix:prefix switch." +# "Use --nimMainPrefix:MyLib and the function to call is named MyLibNimMain." +proc libcodexNimMain() {.importc.} + +# Atomic flag to prevent multiple initializations +var initialized: Atomic[bool] + +if defined(android): + # Redirect chronicles to Android System logs + when compiles(defaultChroniclesStream.outputs[0].writer): + defaultChroniclesStream.outputs[0].writer = proc( + logLevel: LogLevel, msg: LogOutputStr + ) {.raises: [].} = + echo logLevel, msg + +# Initializes the Nim runtime and foreign-thread GC +proc initializeLibrary() {.exported.} = + if not initialized.exchange(true): + ## Every Nim library must call `NimMain()` once + libcodexNimMain() + when declared(setupForeignThreadGc): + setupForeignThreadGc() + when declared(nimGC_setStackBottom): + var locals {.volatile, noinit.}: pointer + locals = addr(locals) + nimGC_setStackBottom(locals) + +proc codex_new( + configJson: cstring, callback: CodexCallback, userData: pointer +): pointer {.dynlib, exported.} = + initializeLibrary() + + if isNil(callback): + error "Failed to create codex instance: the callback is missing." 
+ return nil + + var ctx = codex_context.createCodexContext().valueOr: + let msg = $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return nil + + ctx.userData = userData + + let reqContent = + NodeLifecycleRequest.createShared(NodeLifecycleMsgType.CREATE_NODE, configJson) + + codex_context.sendRequestToCodexThread( + ctx, RequestType.LIFECYCLE, reqContent, callback, userData + ).isOkOr: + let msg = $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return nil + + return ctx + +proc codex_version( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + callback( + RET_OK, + cast[ptr cchar](conf.codexVersion), + cast[csize_t](len(conf.codexVersion)), + userData, + ) + + return RET_OK + +proc codex_revision( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + callback( + RET_OK, + cast[ptr cchar](conf.codexRevision), + cast[csize_t](len(conf.codexRevision)), + userData, + ) + + return RET_OK + +proc codex_repo( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.REPO) + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.INFO, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_debug( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = NodeDebugRequest.createShared(NodeDebugMsgType.DEBUG) + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.DEBUG, reqContent, callback, userData + ) 
+ + return callback.okOrError(res, userData) + +proc codex_spr( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.SPR) + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.INFO, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_peer_id( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.PEERID) + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.INFO, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +## Set the log level of the library at runtime. +## It uses updateLogLevel which is a synchronous proc and +## cannot be used inside an async context because of gcsafe issue. 
+proc codex_log_level( + ctx: ptr CodexContext, logLevel: cstring, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + try: + updateLogLevel($logLevel) + except ValueError as e: + return callback.error(e.msg, userData) + + return callback.success("", userData) + +proc codex_connect( + ctx: ptr CodexContext, + peerId: cstring, + peerAddressesPtr: ptr cstring, + peerAddressesLength: csize_t, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + var peerAddresses = newSeq[cstring](peerAddressesLength) + let peers = cast[ptr UncheckedArray[cstring]](peerAddressesPtr) + for i in 0 ..< peerAddressesLength: + peerAddresses[i] = peers[i] + + let reqContent = NodeP2PRequest.createShared( + NodeP2PMsgType.CONNECT, peerId = peerId, peerAddresses = peerAddresses + ) + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.P2P, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_peer_debug( + ctx: ptr CodexContext, peerId: cstring, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = NodeDebugRequest.createShared(NodeDebugMsgType.PEER, peerId = peerId) + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.DEBUG, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_close( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = NodeLifecycleRequest.createShared(NodeLifecycleMsgType.CLOSE_NODE) + var res = codex_context.sendRequestToCodexThread( + ctx, RequestType.LIFECYCLE, reqContent, callback, userData + ) + if res.isErr: + return 
callback.error(res.error, userData) + + return callback.okOrError(res, userData) + +proc codex_destroy( + ctx: ptr CodexContext, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let res = codex_context.destroyCodexContext(ctx) + if res.isErr: + return callback.error(res.error, userData) + + return RET_OK + +proc codex_upload_init( + ctx: ptr CodexContext, + filepath: cstring, + chunkSize: csize_t, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = NodeUploadRequest.createShared( + NodeUploadMsgType.INIT, filepath = filepath, chunkSize = chunkSize + ) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.UPLOAD, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_upload_chunk( + ctx: ptr CodexContext, + sessionId: cstring, + data: ptr byte, + len: csize_t, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let chunk = newSeq[byte](len) + copyMem(addr chunk[0], data, len) + + let reqContent = NodeUploadRequest.createShared( + NodeUploadMsgType.CHUNK, sessionId = sessionId, chunk = chunk + ) + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.UPLOAD, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_upload_finalize( + ctx: ptr CodexContext, + sessionId: cstring, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = + NodeUploadRequest.createShared(NodeUploadMsgType.FINALIZE, sessionId = sessionId) + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.UPLOAD, reqContent, callback, userData + ) + + return 
callback.okOrError(res, userData) + +proc codex_upload_cancel( + ctx: ptr CodexContext, + sessionId: cstring, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = + NodeUploadRequest.createShared(NodeUploadMsgType.CANCEL, sessionId = sessionId) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.UPLOAD, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_upload_file( + ctx: ptr CodexContext, + sessionId: cstring, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let reqContent = + NodeUploadRequest.createShared(NodeUploadMsgType.FILE, sessionId = sessionId) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.UPLOAD, reqContent, callback, userData + ) + + return callback.okOrError(res, userData) + +proc codex_download_init( + ctx: ptr CodexContext, + cid: cstring, + chunkSize: csize_t, + local: bool, + callback: CodexCallback, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let req = NodeDownloadRequest.createShared( + NodeDownloadMsgType.INIT, cid = cid, chunkSize = chunkSize, local = local + ) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.DOWNLOAD, req, callback, userData + ) + + result = callback.okOrError(res, userData) + +proc codex_download_chunk( + ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibcodexParams(ctx, callback, userData) + + let req = NodeDownloadRequest.createShared(NodeDownloadMsgType.CHUNK, cid = cid) + + let res = codex_context.sendRequestToCodexThread( + ctx, RequestType.DOWNLOAD, req, callback, userData + ) + + result = callback.okOrError(res, 
# ---------------------------------------------------------------------------
# C ABI bindings (library/libcodex.nim) — reconstructed from a
# whitespace-flattened diff hunk; the '+' diff markers were removed.
# Every exported proc follows the same contract:
#   1. ensure the library is initialised (initializeLibrary),
#   2. validate ctx/callback/userData (checkLibcodexParams may return early),
#   3. build a shared-heap request object (createShared),
#   4. queue it on the Codex worker thread and report through the callback.
# Callbacks must be fast and non-blocking: a slow callback stalls the
# worker thread and blocks all other requests.
# ---------------------------------------------------------------------------

template dispatchToCodexThread(
    ctx: ptr CodexContext,
    reqType: RequestType,
    req: untyped,
    callback: CodexCallback,
    userData: pointer,
): cint =
  ## Shared tail of every binding: queue `req` on the Codex worker thread
  ## and translate the enqueue outcome through `okOrError`.
  let res =
    codex_context.sendRequestToCodexThread(ctx, reqType, req, callback, userData)
  callback.okOrError(res, userData)

proc codex_download_stream(
    ctx: ptr CodexContext,
    cid: cstring,
    chunkSize: csize_t,
    local: bool,
    filepath: cstring,
    callback: CodexCallback,
    userData: pointer,
): cint {.dynlib, exportc.} =
  ## Stream the dataset identified by `cid` in `chunkSize` pieces.
  ## `local` presumably restricts the download to locally stored blocks and
  ## `filepath` is presumably the on-disk destination — confirm against
  ## NodeDownloadRequest.
  initializeLibrary()
  checkLibcodexParams(ctx, callback, userData)
  let req = NodeDownloadRequest.createShared(
    NodeDownloadMsgType.STREAM,
    cid = cid,
    chunkSize = chunkSize,
    local = local,
    filepath = filepath,
  )
  dispatchToCodexThread(ctx, RequestType.DOWNLOAD, req, callback, userData)

proc codex_download_cancel(
    ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
  ## Cancel an in-flight download of `cid`.
  initializeLibrary()
  checkLibcodexParams(ctx, callback, userData)
  let req = NodeDownloadRequest.createShared(NodeDownloadMsgType.CANCEL, cid = cid)
  dispatchToCodexThread(ctx, RequestType.DOWNLOAD, req, callback, userData)

proc codex_download_manifest(
    ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
  ## Request the manifest of the dataset identified by `cid`.
  initializeLibrary()
  checkLibcodexParams(ctx, callback, userData)
  let req = NodeDownloadRequest.createShared(NodeDownloadMsgType.MANIFEST, cid = cid)
  dispatchToCodexThread(ctx, RequestType.DOWNLOAD, req, callback, userData)

proc codex_storage_list(
    ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
  ## List the node's stored content.
  initializeLibrary()
  checkLibcodexParams(ctx, callback, userData)
  let req = NodeStorageRequest.createShared(NodeStorageMsgType.LIST)
  dispatchToCodexThread(ctx, RequestType.STORAGE, req, callback, userData)

proc codex_storage_space(
    ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
  ## Query the node's storage-space information.
  initializeLibrary()
  checkLibcodexParams(ctx, callback, userData)
  let req = NodeStorageRequest.createShared(NodeStorageMsgType.SPACE)
  dispatchToCodexThread(ctx, RequestType.STORAGE, req, callback, userData)

proc codex_storage_delete(
    ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
  ## Delete the locally stored dataset identified by `cid`.
  initializeLibrary()
  checkLibcodexParams(ctx, callback, userData)
  let req = NodeStorageRequest.createShared(NodeStorageMsgType.DELETE, cid = cid)
  dispatchToCodexThread(ctx, RequestType.STORAGE, req, callback, userData)

proc codex_storage_fetch(
    ctx: ptr CodexContext, cid: cstring, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
  ## Fetch the dataset identified by `cid` into local storage.
  initializeLibrary()
  checkLibcodexParams(ctx, callback, userData)
  let req = NodeStorageRequest.createShared(NodeStorageMsgType.FETCH, cid = cid)
  dispatchToCodexThread(ctx, RequestType.STORAGE, req, callback, userData)

proc codex_start(
    ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
  ## Start the Codex node owned by `ctx`.
  initializeLibrary()
  checkLibcodexParams(ctx, callback, userData)
  let req: ptr NodeLifecycleRequest =
    NodeLifecycleRequest.createShared(NodeLifecycleMsgType.START_NODE)
  dispatchToCodexThread(ctx, RequestType.LIFECYCLE, req, callback, userData)

proc codex_stop(
    ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
): cint {.dynlib, exportc.} =
  ## Stop the Codex node owned by `ctx`.
  initializeLibrary()
  checkLibcodexParams(ctx, callback, userData)
  let req: ptr NodeLifecycleRequest =
    NodeLifecycleRequest.createShared(NodeLifecycleMsgType.STOP_NODE)
  dispatchToCodexThread(ctx, RequestType.LIFECYCLE, req, callback, userData)

proc codex_set_event_callback(
    ctx: ptr CodexContext, callback: CodexCallback, userData: pointer
) {.dynlib, exportc.} =
  ## Register the callback invoked for asynchronous node events.
  ## Stored as an untyped pointer on the context; no validation is
  ## performed here (unlike the request procs above).
  initializeLibrary()
  ctx[].eventCallback = cast[pointer](callback)
  ctx[].eventUserData = userData

# --- tests/codex/helpers/mockclock.nim (post-patch hunk) -------------------

method waitUntil*(
    clock: MockClock, time: SecondsSince1970
) {.async: (raises: [CancelledError]).} =
  ## Park the caller until the mock clock is advanced past `time`.
  ## Completes immediately when `time` is not in the future.
  try:
    if time > clock.now():
      let future = newFuture[void]()
      clock.waiting.add(Waiting(until: time, future: future))
      await future
  except CancelledError as e:
    # Propagate cancellation, as declared in the `raises` annotation.
    raise e
  except Exception:
    # Deliberately swallowed so nothing but CancelledError escapes the
    # declared raises set. (Fixed: original bound an unused `e` here.)
    discard

# --- tests/integration/codexprocess.nim (post-patch hunk) ------------------

proc apiUrl*(node: CodexProcess): string =
  ## Base REST endpoint of this Codex process, derived from its CLI args.
  ## NOTE(review): `apiBindAddress.get()` assumes the option is always set
  ## once the config is loaded — confirm CodexConf provides a default.
  let config = CodexConf.load(cmdLine = node.arguments, quitOnFailure = false)
  return
    "http://" & config.apiBindAddress.get() & ":" & $config.apiPort & "/api/codex/v1"
node.client: