From a48af4821043d2afea7fcc27fc2e7268de01d074 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Tue, 3 Jun 2025 02:55:08 +0200 Subject: [PATCH 01/22] uses explicit type in "Should retrieve block expiration information" test --- .github/actions/nimbus-build-system/action.yml | 5 +++-- tests/codex/stores/testrepostore.nim | 6 ++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/.github/actions/nimbus-build-system/action.yml b/.github/actions/nimbus-build-system/action.yml index 4ec8af4b5..36f414567 100644 --- a/.github/actions/nimbus-build-system/action.yml +++ b/.github/actions/nimbus-build-system/action.yml @@ -37,6 +37,7 @@ runs: sudo apt-get update -qq sudo DEBIAN_FRONTEND='noninteractive' apt-get install \ --no-install-recommends -yq lcov + sudo apt install libpcre3 - name: APT (Linux i386) if: inputs.os == 'linux' && inputs.cpu == 'i386' @@ -83,7 +84,7 @@ runs: - name: Install gcc 14 on Linux # We don't want to install gcc 14 for coverage (Ubuntu 20.04) - if : ${{ inputs.os == 'linux' && inputs.coverage != 'true' }} + if: ${{ inputs.os == 'linux' && inputs.coverage != 'true' }} shell: ${{ inputs.shell }} {0} run: | # Skip for older Ubuntu versions @@ -202,7 +203,7 @@ runs: - name: Restore Nim toolchain binaries from cache id: nim-cache uses: actions/cache@v4 - if : ${{ inputs.coverage != 'true' }} + if: ${{ inputs.coverage != 'true' }} with: path: NimBinaries key: ${{ inputs.os }}-${{ inputs.cpu }}-nim-${{ inputs.nim_version }}-cache-${{ env.cache_nonce }}-${{ github.run_id }} diff --git a/tests/codex/stores/testrepostore.nim b/tests/codex/stores/testrepostore.nim index 69f38711f..1a2f58ab0 100644 --- a/tests/codex/stores/testrepostore.nim +++ b/tests/codex/stores/testrepostore.nim @@ -293,10 +293,12 @@ asyncchecksuite "RepoStore": test "Should retrieve block expiration information": proc unpack( - beIter: Future[?!SafeAsyncIter[BlockExpiration]] - ): Future[seq[BlockExpiration]] {.async: (raises: [CatchableError]).} = + beIter: Future[?!SafeAsyncIter[BlockExpiration]].Raising([CancelledError]) + ): Future[seq[BlockExpiration]] {.async: (raises: [CancelledError]).} = var expirations = newSeq[BlockExpiration](0) without iter =? (await beIter), err: + info "Failed to get BlockExpiration async iterator, returning empty sequence", + err = err.msg return expirations for beFut in toSeq(iter): if value =? (await beFut): From 293adcc6f6caf4d54945d75cb271f1feb8e41ae0 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Thu, 12 Jun 2025 15:25:22 +0200 Subject: [PATCH 02/22] adds "format-global" to use globally installed nph --- Makefile | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Makefile b/Makefile index f39a3394a..18e1f5f86 100644 --- a/Makefile +++ b/Makefile @@ -233,6 +233,11 @@ format: $(NPH) codex/ $(NPH) tests/ +format-global: + nph *.nim + nph codex/ + nph tests/ + clean-nph: rm -f $(NPH) From c9fba1716944aee811b421700f158a996734b062 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Mon, 16 Jun 2025 18:16:38 +0200 Subject: [PATCH 03/22] removes async iter from queryiterhelper (and the related tests) --- codex/stores/queryiterhelper.nim | 57 ---------------------- tests/codex/stores/testqueryiterhelper.nim | 9 ++-- 2 files changed, 4 insertions(+), 62 deletions(-) diff --git a/codex/stores/queryiterhelper.nim b/codex/stores/queryiterhelper.nim index bbc3be698..699eeeffa 100644 --- a/codex/stores/queryiterhelper.nim +++ b/codex/stores/queryiterhelper.nim @@ -4,47 +4,12 @@ import pkg/chronos import pkg/chronicles import pkg/datastore/typedds -import ../utils/asynciter import ../utils/safeasynciter {.push raises: [].} type KeyVal*[T] = tuple[key: Key, value: T] -proc toAsyncIter*[T]( - queryIter: QueryIter[T], finishOnErr: bool = true -): Future[?!AsyncIter[?!QueryResponse[T]]] {.async: (raises: [CancelledError]).} = - ## Converts `QueryIter[T]` to `AsyncIter[?!QueryResponse[T]]` and automatically - ## runs dispose whenever `QueryIter` finishes or whenever an error occurs (only - ## if the flag finishOnErr is set to true) - ## - - if queryIter.finished: - trace "Disposing iterator" - if error =? (await queryIter.dispose()).errorOption: - return failure(error) - return success(AsyncIter[?!QueryResponse[T]].empty()) - - var errOccurred = false - - proc genNext(): Future[?!QueryResponse[T]] {.async.} = - let queryResOrErr = await queryIter.next() - - if queryResOrErr.isErr: - errOccurred = true - - if queryIter.finished or (errOccurred and finishOnErr): - trace "Disposing iterator" - if error =? (await queryIter.dispose()).errorOption: - return failure(error) - - return queryResOrErr - - proc isFinished(): bool = - queryIter.finished or (errOccurred and finishOnErr) - - AsyncIter[?!QueryResponse[T]].new(genNext, isFinished).success - proc toSafeAsyncIter*[T]( queryIter: QueryIter[T], finishOnErr: bool = true ): Future[?!SafeAsyncIter[QueryResponse[T]]] {.async: (raises: [CancelledError]).} = @@ -79,28 +44,6 @@ proc toSafeAsyncIter*[T]( SafeAsyncIter[QueryResponse[T]].new(genNext, isFinished).success -proc filterSuccess*[T]( - iter: AsyncIter[?!QueryResponse[T]] -): Future[AsyncIter[tuple[key: Key, value: T]]] {.async: (raises: [CancelledError]).} = - ## Filters out any items that are not success - - proc mapping(resOrErr: ?!QueryResponse[T]): Future[?KeyVal[T]] {.async.} = - without res =? resOrErr, error: - error "Error occurred when getting QueryResponse", msg = error.msg - return KeyVal[T].none - - without key =? res.key: - warn "No key for a QueryResponse" - return KeyVal[T].none - - without value =? res.value, error: - error "Error occurred when getting a value from QueryResponse", msg = error.msg - return KeyVal[T].none - - (key: key, value: value).some - - await mapFilter[?!QueryResponse[T], KeyVal[T]](iter, mapping) - proc filterSuccess*[T]( iter: SafeAsyncIter[QueryResponse[T]] ): Future[SafeAsyncIter[tuple[key: Key, value: T]]] {. diff --git a/tests/codex/stores/testqueryiterhelper.nim b/tests/codex/stores/testqueryiterhelper.nim index 4e83dad43..e8f00e9cb 100644 --- a/tests/codex/stores/testqueryiterhelper.nim +++ b/tests/codex/stores/testqueryiterhelper.nim @@ -6,7 +6,7 @@ import pkg/chronos import pkg/datastore/typedds import pkg/datastore/sql/sqliteds import pkg/codex/stores/queryiterhelper -import pkg/codex/utils/asynciter +import pkg/codex/utils/safeasynciter import ../../asynctest import ../helpers @@ -43,15 +43,14 @@ asyncchecksuite "Test QueryIter helper": queryIter.dispose = () => (disposed = true; iterDispose()) let - iter1 = (await toAsyncIter[string](queryIter)).tryGet() + iter1 = (await toSafeAsyncIter[string](queryIter)).tryGet() iter2 = await filterSuccess[string](iter1) var items = initTable[string, string]() for fut in iter2: - let item = await fut - - items[item.key.value] = item.value + if item =? (await fut): + items[item.key.value] = item.value check: items == source From b56099e8ca7ba8f10df6aeec5fcd332360e0eab1 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Mon, 16 Jun 2025 18:17:16 +0200 Subject: [PATCH 04/22] removes import of the legacy asynciter from mockrepostore --- tests/codex/helpers/mockrepostore.nim | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/codex/helpers/mockrepostore.nim b/tests/codex/helpers/mockrepostore.nim index 52e598d96..36784055c 100644 --- a/tests/codex/helpers/mockrepostore.nim +++ b/tests/codex/helpers/mockrepostore.nim @@ -14,7 +14,6 @@ import pkg/questionable import pkg/questionable/results import pkg/codex/stores/repostore -import pkg/codex/utils/asynciter import pkg/codex/utils/safeasynciter type MockRepoStore* = ref object of RepoStore From 6957c9d7826f30715a7dfa5e0dfa578919592e49 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Mon, 16 Jun 2025 18:28:13 +0200 Subject: [PATCH 05/22] remove not needed asynciter import in testcontracts --- tests/codex/node/testcontracts.nim | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/codex/node/testcontracts.nim b/tests/codex/node/testcontracts.nim index e8d9c743e..dc6f194ae 100644 --- a/tests/codex/node/testcontracts.nim +++ b/tests/codex/node/testcontracts.nim @@ -27,7 +27,6 @@ import pkg/codex/discovery import pkg/codex/erasure import pkg/codex/blocktype as bt import pkg/codex/stores/repostore/coders -import pkg/codex/utils/asynciter import pkg/codex/indexingstrategy import pkg/codex/node {.all.} From 91f1ed87369f6f6921a86257db3c1ee8150e3d97 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Mon, 16 Jun 2025 18:49:48 +0200 Subject: [PATCH 06/22] removes last references to legacy asynciter in tests --- tests/codex/testindexingstrategy.nim | 2 +- tests/codex/testutils.nim | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/codex/testindexingstrategy.nim b/tests/codex/testindexingstrategy.nim index 00486849c..cfb8e397b 100644 --- a/tests/codex/testindexingstrategy.nim +++ b/tests/codex/testindexingstrategy.nim @@ -1,7 +1,7 @@ import std/sequtils import pkg/chronos -import pkg/codex/utils/asynciter +import pkg/codex/utils/iter import ../asynctest import ./helpers diff --git a/tests/codex/testutils.nim b/tests/codex/testutils.nim index a5346d481..01acd93ac 100644 --- a/tests/codex/testutils.nim +++ b/tests/codex/testutils.nim @@ -1,7 +1,6 @@ import ./utils/testoptions import ./utils/testkeyutils import ./utils/testasyncstatemachine -import ./utils/testasynciter import ./utils/testsafeasynciter import ./utils/testtimer import ./utils/testtrackedfutures From 235640267a66fd103f573a31c7a3922fc53d08da Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Mon, 16 Jun 2025 18:55:27 +0200 Subject: [PATCH 07/22] removes unneeded asynciter import from treehelper (it uses regular iter and not an async iter) --- codex/stores/treehelper.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codex/stores/treehelper.nim b/codex/stores/treehelper.nim index e1f5d48d8..9c5506f8f 100644 --- a/codex/stores/treehelper.nim +++ b/codex/stores/treehelper.nim @@ -20,7 +20,7 @@ import pkg/questionable import pkg/questionable/results import ./blockstore -import ../utils/asynciter +import ../utils/iter import ../merkletree proc putSomeProofs*( From 4b9d967b9322d86dc1de11553b3ac4f79f56e8dd Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Mon, 16 Jun 2025 19:00:56 +0200 Subject: [PATCH 08/22] removes unneeded asynciter import from indexingstrategy (it uses regular iter and not an async iter) --- codex/indexingstrategy.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codex/indexingstrategy.nim b/codex/indexingstrategy.nim index 99a5d12dd..acfffc1a0 100644 --- a/codex/indexingstrategy.nim +++ b/codex/indexingstrategy.nim @@ -1,6 +1,6 @@ import ./errors import ./utils -import ./utils/asynciter +import ./utils/iter {.push raises: [].} From b158d437299c017d4ff7874f62e551eba1219420 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Tue, 17 Jun 2025 02:27:48 +0200 Subject: [PATCH 09/22] adds missing mapFuture operation to SafeAsyncIter --- codex/utils/safeasynciter.nim | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/codex/utils/safeasynciter.nim b/codex/utils/safeasynciter.nim index d582fec3b..3db0c70f0 100644 --- a/codex/utils/safeasynciter.nim +++ b/codex/utils/safeasynciter.nim @@ -34,6 +34,7 @@ import ./iter ## - next - to get the next item from the async iterator ## - items - to iterate over the async iterator ## - pairs - to iterate over the async iterator and return the index of each item +## - mapFuture - to convert a (raising) Future[T] to a (raising) Future[U] using a function fn: auto -> Future[U] - we use auto to handle both raising and non-raising futures ## - mapAsync - to convert a regular sync iterator (Iter) to an async iter (SafeAsyncIter) ## - map - to convert one async iterator (SafeAsyncIter) to another async iter (SafeAsyncIter) ## - mapFilter - to convert one async iterator (SafeAsyncIter) to another async iter (SafeAsyncIter) and apply filtering at the same time @@ -150,6 +151,12 @@ iterator pairs*[T](self: SafeAsyncIter[T]): auto {.inline.} = yield (i, self.next()) inc(i) +proc mapFuture*[T, U]( + fut: auto, fn: SafeFunction[T, U] +): Future[U] {.async: (raises: [CancelledError]).} = + let t = await fut + await fn(t) + proc mapAsync*[T, U]( iter: Iter[T], fn: SafeFunction[T, ?!U], finishOnErr: bool = true ): SafeAsyncIter[U] = From 793802477c8c60fd6e58fe28edd43945ba228bc0 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Tue, 17 Jun 2025 02:28:29 +0200 Subject: [PATCH 10/22] Removes redundant import of asynciter in node --- codex/node.nim | 1 - 1 file changed, 1 deletion(-) diff --git a/codex/node.nim b/codex/node.nim index e010b0854..eb6f9bb10 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -44,7 +44,6 @@ import ./indexingstrategy import ./utils import ./errors import ./logutils -import ./utils/asynciter import ./utils/trackedfutures export logutils From 9fe75510de7f718e2c8fbbbcd3a47982c5bdbfe2 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Tue, 17 Jun 2025 02:29:04 +0200 Subject: [PATCH 11/22] updates exports related to async iter in utils --- codex/utils.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/codex/utils.nim b/codex/utils.nim index 9cea427e0..93427b439 100644 --- a/codex/utils.nim +++ b/codex/utils.nim @@ -18,10 +18,10 @@ import pkg/chronos import ./utils/asyncheapqueue import ./utils/fileutils -import ./utils/asynciter +import ./utils/iter import ./utils/safeasynciter -export asyncheapqueue, fileutils, asynciter, safeasynciter, chronos +export asyncheapqueue, fileutils, iter, safeasynciter, chronos when defined(posix): import os, posix From 88c6a7291ddea43b3bd229aeb7dfb8b15883dcbb Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Tue, 17 Jun 2025 02:30:21 +0200 Subject: [PATCH 12/22] replaces the old async iter with the new safe async iter in erasure, builder, and treehelper --- codex/erasure/erasure.nim | 102 +++++++++++++++++--------------- codex/slots/builder/builder.nim | 3 +- codex/stores/treehelper.nim | 8 ++- 3 files changed, 61 insertions(+), 52 deletions(-) diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 95516500f..8fb667fb2 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -28,7 +28,6 @@ import ../stores import ../clock import ../blocktype as bt import ../utils -import ../utils/asynciter import ../indexingstrategy import ../errors import ../utils/arrayutils @@ -122,14 +121,19 @@ func indexToPos(steps, idx, step: int): int {.inline.} = proc getPendingBlocks( self: Erasure, manifest: Manifest, indices: seq[int] -): AsyncIter[(?!bt.Block, int)] = +): SafeAsyncIter[(?!bt.Block, int)] = ## Get pending blocks iterator ## + + if indicies.len == 0: + trace "No indicies to fetch blocks for", treeCid = manifest.treeCid + return SafeAsyncIter[(?!bt.Block, int)].empty() + var pendingBlocks: seq[Future[(?!bt.Block, int)]] = @[] proc attachIndex( fut: Future[?!bt.Block], i: int - ): Future[(?!bt.Block, int)] {.async.} = + ): Future[(?!bt.Block, int)] {.async: (raises: [CancelledError]).} = ## avoids closure capture issues return (await fut, i) @@ -141,20 +145,25 @@ proc getPendingBlocks( proc isFinished(): bool = pendingBlocks.len == 0 - proc genNext(): Future[(?!bt.Block, int)] {.async.} = - let completedFut = await one(pendingBlocks) - if (let i = pendingBlocks.find(completedFut); i >= 0): - pendingBlocks.del(i) - return await completedFut - else: - let (_, index) = await completedFut - raise newException( - CatchableError, - "Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " & - $index, - ) + proc genNext(): Future[?!(?!bt.Block, int)] {.async: (raises: [CancelledError]).} = + try: + let completedFut = await one(pendingBlocks) + if (let i = pendingBlocks.find(completedFut); i >= 0): + pendingBlocks.del(i) + return success(await completedFut) + else: + let (_, index) = await completedFut + return failure( + "Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " & + $index + ) + except ValueError as err: + # ValueError is raised by `one` when the pendingBlocks is empty - + # but we check for that at the very beginning - + # thus, if this happens, we raise an assert + raiseAssert("fatal: pendingBlocks is empty - this should never happen") - AsyncIter[(?!bt.Block, int)].new(genNext, isFinished) + SafeAsyncIter[(?!bt.Block, int)].new(genNext, isFinished) proc prepareEncodingData( self: Erasure, @@ -164,7 +173,7 @@ proc prepareEncodingData( data: ref seq[seq[byte]], cids: ref seq[Cid], emptyBlock: seq[byte], -): Future[?!Natural] {.async.} = +): Future[?!Natural] {.async: (raises: [CancelledError, IndexingError]).} = ## Prepare data for encoding ## @@ -178,7 +187,9 @@ proc prepareEncodingData( var resolved = 0 for fut in pendingBlocksIter: - let (blkOrErr, idx) = await fut + let pendingBlocksRes = await fut + without (blkOrErr, idx) =? pendingBlocksRes, err: + return failure(err) without blk =? blkOrErr, err: warn "Failed retrieving a block", treeCid = manifest.treeCid, idx, msg = err.msg return failure(err) @@ -208,7 +219,7 @@ proc prepareDecodingData( parityData: ref seq[seq[byte]], cids: ref seq[Cid], emptyBlock: seq[byte], -): Future[?!(Natural, Natural)] {.async.} = +): Future[?!(Natural, Natural)] {.async: (raises: [CancelledError, IndexingError]).} = ## Prepare data for decoding ## `encoded` - the encoded manifest ## `step` - the current step @@ -235,7 +246,9 @@ proc prepareDecodingData( if resolved >= encoded.ecK: break - let (blkOrErr, idx) = await fut + let pendingBlocksRes = await fut + without (blkOrErr, idx) =? pendingBlocksRes, err: + return failure(err) without blk =? blkOrErr, err: trace "Failed retrieving a block", idx, treeCid = encoded.treeCid, msg = err.msg continue @@ -362,7 +375,7 @@ proc asyncEncode*( proc encodeData( self: Erasure, manifest: Manifest, params: EncodingParams -): Future[?!Manifest] {.async.} = +): Future[?!Manifest] {.async: (raises: [CancelledError]).} = ## Encode blocks pointed to by the protected manifest ## ## `manifest` - the manifest to encode @@ -403,15 +416,12 @@ proc encodeData( trace "Erasure coding data", data = data[].len - try: - if err =? ( - await self.asyncEncode( - manifest.blockSize.int, params.ecK, params.ecM, data, parity - ) - ).errorOption: - return failure(err) - except CancelledError as exc: - raise exc + if err =? ( + await self.asyncEncode( + manifest.blockSize.int, params.ecK, params.ecM, data, parity + ) + ).errorOption: + return failure(err) var idx = params.rounded + step for j in 0 ..< params.ecM: @@ -451,9 +461,9 @@ proc encodeData( except CancelledError as exc: trace "Erasure coding encoding cancelled" raise exc # cancellation needs to be propagated - except CatchableError as exc: - trace "Erasure coding encoding error", exc = exc.msg - return failure(exc) + except IndexingError as err: + trace "Erasure coding encoding indexing error", error = err.msg + return failure(err) proc encode*( self: Erasure, @@ -461,7 +471,7 @@ proc encode*( blocks: Natural, parity: Natural, strategy = SteppedStrategy, -): Future[?!Manifest] {.async.} = +): Future[?!Manifest] {.async: (raises: [CancelledError]).} = ## Encode a manifest into one that is erasure protected. ## ## `manifest` - the original manifest to be encoded @@ -554,7 +564,8 @@ proc asyncDecode*( proc decodeInternal( self: Erasure, encoded: Manifest -): Future[?!(ref seq[Cid], seq[Natural])] {.async.} = +): Future[?!(ref seq[Cid], seq[Natural])] {.async: (raises: [CancelledError]).} = + logScope: steps = encoded.steps rounded_blocks = encoded.rounded @@ -597,15 +608,12 @@ proc decodeInternal( continue trace "Erasure decoding data" - try: - if err =? ( - await self.asyncDecode( - encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered - ) - ).errorOption: - return failure(err) - except CancelledError as exc: - raise exc + if err =? ( + await self.asyncDecode( + encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered + ) + ).errorOption: + return failure(err) for i in 0 ..< encoded.ecK: let idx = i * encoded.steps + step @@ -630,9 +638,9 @@ proc decodeInternal( except CancelledError as exc: trace "Erasure coding decoding cancelled" raise exc # cancellation needs to be propagated - except CatchableError as exc: - trace "Erasure coding decoding error", exc = exc.msg - return failure(exc) + except IndexingError as err: + trace "Erasure coding decoding indexing error", error = err.msg + return failure(err) finally: decoder.release() diff --git a/codex/slots/builder/builder.nim b/codex/slots/builder/builder.nim index 5fbb0fe19..9222df152 100644 --- a/codex/slots/builder/builder.nim +++ b/codex/slots/builder/builder.nim @@ -24,12 +24,11 @@ import ../../utils import ../../stores import ../../manifest import ../../merkletree -import ../../utils/asynciter import ../../indexingstrategy import ../converters -export converters, asynciter +export converters, iter logScope: topics = "codex slotsbuilder" diff --git a/codex/stores/treehelper.nim b/codex/stores/treehelper.nim index 9c5506f8f..96aab34f4 100644 --- a/codex/stores/treehelper.nim +++ b/codex/stores/treehelper.nim @@ -25,7 +25,7 @@ import ../merkletree proc putSomeProofs*( store: BlockStore, tree: CodexTree, iter: Iter[int] -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = without treeCid =? tree.rootCid, err: return failure(err) @@ -51,8 +51,10 @@ proc putSomeProofs*( proc putSomeProofs*( store: BlockStore, tree: CodexTree, iter: Iter[Natural] -): Future[?!void] = +): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} = store.putSomeProofs(tree, iter.map((i: Natural) => i.ord)) -proc putAllProofs*(store: BlockStore, tree: CodexTree): Future[?!void] = +proc putAllProofs*( + store: BlockStore, tree: CodexTree +): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} = store.putSomeProofs(tree, Iter[int].new(0 ..< tree.leavesCount)) From ba04d59192a5c90fd427a24d85616af1429f9ca8 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Tue, 17 Jun 2025 03:24:01 +0200 Subject: [PATCH 13/22] updates erasure.nim after rebasing to use SafeAsyncIter and checked exceptions --- codex/erasure/erasure.nim | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 8fb667fb2..d28aca912 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -125,14 +125,14 @@ proc getPendingBlocks( ## Get pending blocks iterator ## - if indicies.len == 0: + if indices.len == 0: trace "No indicies to fetch blocks for", treeCid = manifest.treeCid return SafeAsyncIter[(?!bt.Block, int)].empty() - var pendingBlocks: seq[Future[(?!bt.Block, int)]] = @[] + var pendingBlocks: seq[Future[(?!bt.Block, int)].Raising([CancelledError])] = @[] proc attachIndex( - fut: Future[?!bt.Block], i: int + fut: Future[?!bt.Block].Raising([CancelledError]), i: int ): Future[(?!bt.Block, int)] {.async: (raises: [CancelledError]).} = ## avoids closure capture issues return (await fut, i) @@ -162,7 +162,7 @@ proc getPendingBlocks( # but we check for that at the very beginning - # thus, if this happens, we raise an assert raiseAssert("fatal: pendingBlocks is empty - this should never happen") - + SafeAsyncIter[(?!bt.Block, int)].new(genNext, isFinished) proc prepareEncodingData( From bbadf63f6b74d93b16bb035df25d9763758854e2 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Tue, 24 Jun 2025 12:48:05 +0200 Subject: [PATCH 14/22] renaming: SafeAsyncIter => AsyncResultIterator --- codex/erasure/erasure.nim | 8 +-- codex/stores/blockstore.nim | 2 +- codex/stores/cachestore.nim | 4 +- codex/stores/networkstore.nim | 2 +- codex/stores/queryiterhelper.nim | 14 ++-- codex/stores/repostore/store.nim | 10 +-- codex/utils/safeasynciter.nim | 76 +++++++++++----------- tests/codex/helpers/mockrepostore.nim | 4 +- tests/codex/stores/testqueryiterhelper.nim | 2 +- tests/codex/stores/testrepostore.nim | 2 +- tests/codex/utils/testsafeasynciter.nim | 32 ++++----- 11 files changed, 78 insertions(+), 78 deletions(-) diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index d28aca912..6a51b9e7f 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -121,13 +121,13 @@ func indexToPos(steps, idx, step: int): int {.inline.} = proc getPendingBlocks( self: Erasure, manifest: Manifest, indices: seq[int] -): SafeAsyncIter[(?!bt.Block, int)] = +): AsyncResultIterator[(?!bt.Block, int)] = ## Get pending blocks iterator ## if indices.len == 0: - trace "No indicies to fetch blocks for", treeCid = manifest.treeCid - return SafeAsyncIter[(?!bt.Block, int)].empty() + trace "No indices to fetch blocks for", treeCid = manifest.treeCid + return AsyncResultIterator[(?!bt.Block, int)].empty() var pendingBlocks: seq[Future[(?!bt.Block, int)].Raising([CancelledError])] = @[] @@ -163,7 +163,7 @@ proc getPendingBlocks( # thus, if this happens, we raise an assert raiseAssert("fatal: pendingBlocks is empty - this should never happen") - SafeAsyncIter[(?!bt.Block, int)].new(genNext, isFinished) + AsyncResultIterator[(?!bt.Block, int)].new(genNext, isFinished) proc prepareEncodingData( self: Erasure, diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index e436577c1..f841f3457 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -154,7 +154,7 @@ method hasBlock*( method listBlocks*( self: BlockStore, blockType = BlockType.Manifest -): Future[?!SafeAsyncIter[Cid]] {.base, async: (raises: [CancelledError]), gcsafe.} = +): Future[?!AsyncResultIterator[Cid]] {.base, async: (raises: [CancelledError]), gcsafe.} = ## Get the list of blocks in the BlockStore. This is an intensive operation ## diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index ff3fd6df9..7e53655ca 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -139,7 +139,7 @@ func cids(self: CacheStore): (iterator (): Cid {.gcsafe, raises: [].}) = method listBlocks*( self: CacheStore, blockType = BlockType.Manifest -): Future[?!SafeAsyncIter[Cid]] {.async: (raises: [CancelledError]).} = +): Future[?!AsyncResultIterator[Cid]] {.async: (raises: [CancelledError]).} = ## Get the list of blocks in the BlockStore. This is an intensive operation ## @@ -152,7 +152,7 @@ method listBlocks*( success(cids()) let iter = await ( - SafeAsyncIter[Cid].new(genNext, isFinished).filter( + AsyncResultIterator[Cid].new(genNext, isFinished).filter( proc(cid: ?!Cid): Future[bool] {.async: (raises: [CancelledError]).} = without cid =? cid, err: trace "Cannot get Cid from the iterator", err = err.msg diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index 06b96b778..f8f5d2af7 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -127,7 +127,7 @@ method ensureExpiry*( method listBlocks*( self: NetworkStore, blockType = BlockType.Manifest -): Future[?!SafeAsyncIter[Cid]] {.async: (raw: true, raises: [CancelledError]).} = +): Future[?!AsyncResultIterator[Cid]] {.async: (raw: true, raises: [CancelledError]).} = self.localStore.listBlocks(blockType) method delBlock*( diff --git a/codex/stores/queryiterhelper.nim b/codex/stores/queryiterhelper.nim index 699eeeffa..21399fa0c 100644 --- a/codex/stores/queryiterhelper.nim +++ b/codex/stores/queryiterhelper.nim @@ -10,10 +10,10 @@ import ../utils/safeasynciter type KeyVal*[T] = tuple[key: Key, value: T] -proc toSafeAsyncIter*[T]( +proc toAsyncResultIterator*[T]( queryIter: QueryIter[T], finishOnErr: bool = true -): Future[?!SafeAsyncIter[QueryResponse[T]]] {.async: (raises: [CancelledError]).} = - ## Converts `QueryIter[T]` to `SafeAsyncIter[QueryResponse[T]]` and automatically +): Future[?!AsyncResultIterator[QueryResponse[T]]] {.async: (raises: [CancelledError]).} = + ## Converts `QueryIter[T]` to `AsyncResultIterator[QueryResponse[T]]` and automatically ## runs dispose whenever `QueryIter` finishes or whenever an error occurs (only ## if the flag finishOnErr is set to true) ## @@ -22,7 +22,7 @@ proc toSafeAsyncIter*[T]( trace "Disposing iterator" if error =? (await queryIter.dispose()).errorOption: return failure(error) - return success(SafeAsyncIter[QueryResponse[T]].empty()) + return success(AsyncResultIterator[QueryResponse[T]].empty()) var errOccurred = false @@ -42,11 +42,11 @@ proc toSafeAsyncIter*[T]( proc isFinished(): bool = queryIter.finished - SafeAsyncIter[QueryResponse[T]].new(genNext, isFinished).success + AsyncResultIterator[QueryResponse[T]].new(genNext, isFinished).success proc filterSuccess*[T]( - iter: SafeAsyncIter[QueryResponse[T]] -): Future[SafeAsyncIter[tuple[key: Key, value: T]]] {. + iter: AsyncResultIterator[QueryResponse[T]] +): Future[AsyncResultIterator[tuple[key: Key, value: T]]] {. async: (raises: [CancelledError]) .} = ## Filters out any items that are not success diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim index bea2971c7..87065ace0 100644 --- a/codex/stores/repostore/store.nim +++ b/codex/stores/repostore/store.nim @@ -295,12 +295,12 @@ method hasBlock*( method listBlocks*( self: RepoStore, blockType = BlockType.Manifest -): Future[?!SafeAsyncIter[Cid]] {.async: (raises: [CancelledError]).} = +): Future[?!AsyncResultIterator[Cid]] {.async: (raises: [CancelledError]).} = ## Get the list of blocks in the RepoStore. ## This is an intensive operation ## - var iter = SafeAsyncIter[Cid]() + var iter = AsyncResultIterator[Cid]() let key = case blockType @@ -346,7 +346,7 @@ proc blockRefCount*(self: RepoStore, cid: Cid): Future[?!Natural] {.async.} = method getBlockExpirations*( self: RepoStore, maxNumber: int, offset: int -): Future[?!SafeAsyncIter[BlockExpiration]] {. +): Future[?!AsyncResultIterator[BlockExpiration]] {. async: (raises: [CancelledError]), base, gcsafe .} = ## Get iterator with block expirations @@ -360,11 +360,11 @@ method getBlockExpirations*( error "Unable to execute block expirations query", err = err.msg return failure(err) - without asyncQueryIter =? (await queryIter.toSafeAsyncIter()), err: + without asyncQueryIter =? (await queryIter.toAsyncResultIterator()), err: error "Unable to convert QueryIter to AsyncIter", err = err.msg return failure(err) - let filteredIter: SafeAsyncIter[KeyVal[BlockMetadata]] = + let filteredIter: AsyncResultIterator[KeyVal[BlockMetadata]] = await asyncQueryIter.filterSuccess() proc mapping( diff --git a/codex/utils/safeasynciter.nim b/codex/utils/safeasynciter.nim index 3db0c70f0..838474652 100644 --- a/codex/utils/safeasynciter.nim +++ b/codex/utils/safeasynciter.nim @@ -17,7 +17,7 @@ import pkg/chronos import ./iter -## SafeAsyncIter[T] is similar to `AsyncIter[Future[T]]` +## AsyncResultIterator[T] is similar to `AsyncIter[Future[T]]` ## but does not throw exceptions others than CancelledError. ## It is thus way easier to use with checked exceptions ## @@ -28,19 +28,19 @@ import ./iter ## - next - allows to set a custom function to be called when the next item is requested ## ## Operations: -## - new - to create a new async iterator (SafeAsyncIter) +## - new - to create a new async iterator (AsyncResultIterator) ## - finish - to finish the async iterator ## - finished - to check if the async iterator is finished ## - next - to get the next item from the async iterator ## - items - to iterate over the async iterator ## - pairs - to iterate over the async iterator and return the index of each item ## - mapFuture - to convert a (raising) Future[T] to a (raising) Future[U] using a function fn: auto -> Future[U] - we use auto to handle both raising and non-raising futures -## - mapAsync - to convert a regular sync iterator (Iter) to an async iter (SafeAsyncIter) -## - map - to convert one async iterator (SafeAsyncIter) to another async iter (SafeAsyncIter) -## - mapFilter - to convert one async iterator (SafeAsyncIter) to another async iter (SafeAsyncIter) and apply filtering at the same time -## - filter - to filter an async iterator (SafeAsyncIter) returning another async iterator (SafeAsyncIter) +## - mapAsync - to convert a regular sync iterator (Iter) to an async iter (AsyncResultIterator) +## - map - to convert one async iterator (AsyncResultIterator) to another async iter (AsyncResultIterator) +## - mapFilter - to convert one async iterator (AsyncResultIterator) to another async iter (AsyncResultIterator) and apply filtering at the same time +## - filter - to filter an async iterator (AsyncResultIterator) returning another async iterator (AsyncResultIterator) ## - delayBy - to delay each item returned by async iter by a given duration -## - empty - to create an empty async iterator (SafeAsyncIter) +## - empty - to create an empty async iterator (AsyncResultIterator) type SafeFunction[T, U] = @@ -48,7 +48,7 @@ type SafeIsFinished = proc(): bool {.raises: [], gcsafe, closure.} SafeGenNext[T] = proc(): Future[T] {.async: (raises: [CancelledError]), gcsafe.} - SafeAsyncIter*[T] = ref object + AsyncResultIterator*[T] = ref object finished: bool next*: SafeGenNext[?!T] @@ -65,20 +65,20 @@ proc flatMap[T, U]( await fn(t) ######################################################################## -## SafeAsyncIter public interface methods +## AsyncResultIterator public interface methods ######################################################################## proc new*[T]( - _: type SafeAsyncIter[T], + _: type AsyncResultIterator[T], genNext: SafeGenNext[?!T], isFinished: IsFinished, finishOnErr: bool = true, -): SafeAsyncIter[T] = +): AsyncResultIterator[T] = ## Creates a new Iter using elements returned by supplier function `genNext`. ## Iter is finished whenever `isFinished` returns true. ## - var iter = SafeAsyncIter[T]() + var iter = AsyncResultIterator[T]() proc next(): Future[?!T] {.async: (raises: [CancelledError]).} = try: @@ -91,7 +91,7 @@ proc new*[T]( iter.finished = true return item else: - return failure("SafeAsyncIter is finished but next item was requested") + return failure("AsyncResultIterator is finished but next item was requested") except CancelledError as err: iter.finished = true raise err @@ -105,11 +105,11 @@ proc new*[T]( # forward declaration proc mapAsync*[T, U]( iter: Iter[T], fn: SafeFunction[T, ?!U], finishOnErr: bool = true -): SafeAsyncIter[U] +): AsyncResultIterator[U] proc new*[U, V: Ordinal]( - _: type SafeAsyncIter[U], slice: HSlice[U, V], finishOnErr: bool = true -): SafeAsyncIter[U] = + _: type AsyncResultIterator[U], slice: HSlice[U, V], finishOnErr: bool = true +): AsyncResultIterator[U] = ## Creates new Iter from a slice ## @@ -122,8 +122,8 @@ proc new*[U, V: Ordinal]( ) proc new*[U, V, S: Ordinal]( - _: type SafeAsyncIter[U], a: U, b: V, step: S = 1, finishOnErr: bool = true -): SafeAsyncIter[U] = + _: type AsyncResultIterator[U], a: U, b: V, step: S = 1, finishOnErr: bool = true +): AsyncResultIterator[U] = ## Creates new Iter in range a..b with specified step (default 1) ## @@ -135,17 +135,17 @@ proc new*[U, V, S: Ordinal]( finishOnErr = finishOnErr, ) -proc finish*[T](self: SafeAsyncIter[T]): void = +proc finish*[T](self: AsyncResultIterator[T]): void = self.finished = true -proc finished*[T](self: SafeAsyncIter[T]): bool = +proc finished*[T](self: AsyncResultIterator[T]): bool = self.finished -iterator items*[T](self: SafeAsyncIter[T]): auto {.inline.} = +iterator items*[T](self: AsyncResultIterator[T]): auto {.inline.} = while not self.finished: yield self.next() -iterator pairs*[T](self: SafeAsyncIter[T]): auto {.inline.} = +iterator pairs*[T](self: AsyncResultIterator[T]): auto {.inline.} = var i = 0 while not self.finished: yield (i, self.next()) @@ -159,27 +159,27 @@ proc mapFuture*[T, U]( proc mapAsync*[T, U]( iter: Iter[T], fn: SafeFunction[T, ?!U], finishOnErr: bool = true -): SafeAsyncIter[U] = - SafeAsyncIter[U].new( +): AsyncResultIterator[U] = + AsyncResultIterator[U].new( genNext = () => fn(iter.next()), isFinished = () => iter.finished(), finishOnErr = finishOnErr, ) proc map*[T, U]( - iter: SafeAsyncIter[T], fn: SafeFunction[?!T, ?!U], finishOnErr: bool = true -): SafeAsyncIter[U] = - SafeAsyncIter[U].new( + iter: AsyncResultIterator[T], fn: SafeFunction[?!T, ?!U], finishOnErr: bool = true +): AsyncResultIterator[U] = + AsyncResultIterator[U].new( genNext = () => iter.next().flatMap(fn), isFinished = () => iter.finished, finishOnErr = finishOnErr, ) proc mapFilter*[T, U]( - iter: SafeAsyncIter[T], + iter: AsyncResultIterator[T], mapPredicate: SafeFunction[?!T, Option[?!U]], finishOnErr: bool = true, -): Future[SafeAsyncIter[U]] {.async: (raises: [CancelledError]).} = +): Future[AsyncResultIterator[U]] {.async: (raises: [CancelledError]).} = var nextU: Option[?!U] proc filter(): Future[void] {.async: (raises: [CancelledError]).} = @@ -199,11 +199,11 @@ proc mapFilter*[T, U]( nextU.isNone await filter() - SafeAsyncIter[U].new(genNext, isFinished, finishOnErr = finishOnErr) + AsyncResultIterator[U].new(genNext, isFinished, finishOnErr = finishOnErr) proc filter*[T]( - iter: SafeAsyncIter[T], predicate: SafeFunction[?!T, bool], finishOnErr: bool = true -): Future[SafeAsyncIter[T]] {.async: (raises: [CancelledError]).} = + iter: AsyncResultIterator[T], predicate: SafeFunction[?!T, bool], finishOnErr: bool = true +): Future[AsyncResultIterator[T]] {.async: (raises: [CancelledError]).} = proc wrappedPredicate( t: ?!T ): Future[Option[?!T]] {.async: (raises: [CancelledError]).} = @@ -215,8 +215,8 @@ proc filter*[T]( await mapFilter[T, T](iter, wrappedPredicate, finishOnErr = finishOnErr) proc delayBy*[T]( - iter: SafeAsyncIter[T], d: Duration, finishOnErr: bool = true -): SafeAsyncIter[T] = + iter: AsyncResultIterator[T], d: Duration, finishOnErr: bool = true +): AsyncResultIterator[T] = ## Delays emitting each item by given duration ## @@ -228,14 +228,14 @@ proc delayBy*[T]( finishOnErr = finishOnErr, ) -proc empty*[T](_: type SafeAsyncIter[T]): SafeAsyncIter[T] = - ## Creates an empty SafeAsyncIter +proc empty*[T](_: type AsyncResultIterator[T]): AsyncResultIterator[T] = + ## Creates an empty AsyncResultIterator ## proc genNext(): Future[?!T] {.async: (raises: [CancelledError]).} = - T.failure("Next item requested from an empty SafeAsyncIter") + T.failure("Next item requested from an empty AsyncResultIterator") proc isFinished(): bool = true - SafeAsyncIter[T].new(genNext, isFinished) + AsyncResultIterator[T].new(genNext, isFinished) diff --git a/tests/codex/helpers/mockrepostore.nim b/tests/codex/helpers/mockrepostore.nim index 36784055c..ec46e767f 100644 --- a/tests/codex/helpers/mockrepostore.nim +++ b/tests/codex/helpers/mockrepostore.nim @@ -32,7 +32,7 @@ method delBlock*( method getBlockExpirations*( self: MockRepoStore, maxNumber: int, offset: int -): Future[?!SafeAsyncIter[BlockExpiration]] {.async: (raises: [CancelledError]).} = +): Future[?!AsyncResultIterator[BlockExpiration]] {.async: (raises: [CancelledError]).} = self.getBeMaxNumber = maxNumber self.getBeOffset = offset @@ -41,7 +41,7 @@ method getBlockExpirations*( limit = min(offset + maxNumber, len(testBlockExpirationsCpy)) let - iter1 = SafeAsyncIter[int].new(offset ..< limit) + iter1 = AsyncResultIterator[int].new(offset ..< limit) iter2 = map[int, BlockExpiration]( iter1, proc(i: ?!int): Future[?!BlockExpiration] {.async: (raises: [CancelledError]).} = diff --git a/tests/codex/stores/testqueryiterhelper.nim b/tests/codex/stores/testqueryiterhelper.nim index e8f00e9cb..6eef4f3bb 100644 --- a/tests/codex/stores/testqueryiterhelper.nim +++ b/tests/codex/stores/testqueryiterhelper.nim @@ -43,7 +43,7 @@ asyncchecksuite "Test QueryIter helper": queryIter.dispose = () => (disposed = true; iterDispose()) let - iter1 = (await toSafeAsyncIter[string](queryIter)).tryGet() + iter1 = (await toAsyncResultIterator[string](queryIter)).tryGet() iter2 = await filterSuccess[string](iter1) var items = initTable[string, string]() diff --git a/tests/codex/stores/testrepostore.nim b/tests/codex/stores/testrepostore.nim index 1a2f58ab0..cd007c83e 100644 --- a/tests/codex/stores/testrepostore.nim +++ b/tests/codex/stores/testrepostore.nim @@ -293,7 +293,7 @@ asyncchecksuite "RepoStore": test "Should retrieve block expiration information": proc unpack( - beIter: Future[?!SafeAsyncIter[BlockExpiration]].Raising([CancelledError]) + beIter: Future[?!AsyncResultIterator[BlockExpiration]].Raising([CancelledError]) ): Future[seq[BlockExpiration]] {.async: (raises: [CancelledError]).} = var expirations = newSeq[BlockExpiration](0) without iter =? (await beIter), err: diff --git a/tests/codex/utils/testsafeasynciter.nim b/tests/codex/utils/testsafeasynciter.nim index 1aeba4d2e..ebdc9eaff 100644 --- a/tests/codex/utils/testsafeasynciter.nim +++ b/tests/codex/utils/testsafeasynciter.nim @@ -7,9 +7,9 @@ import pkg/codex/utils/safeasynciter import ../../asynctest import ../helpers -asyncchecksuite "Test SafeAsyncIter": +asyncchecksuite "Test AsyncResultIterator": test "Should be finished": - let iter = SafeAsyncIter[int].empty() + let iter = AsyncResultIterator[int].empty() check: iter.finished == true @@ -24,7 +24,7 @@ asyncchecksuite "Test SafeAsyncIter": fut.complete(success(intIter.next())) return fut - let iter = SafeAsyncIter[int].new(asyncGen, () => intIter.finished) + let iter = AsyncResultIterator[int].new(asyncGen, () => intIter.finished) var collected: seq[int] for iFut in iter: @@ -37,10 +37,10 @@ asyncchecksuite "Test SafeAsyncIter": check collected == expectedSeq let nextRes = await iter.next() assert nextRes.isFailure - check nextRes.error.msg == "SafeAsyncIter is finished but next item was requested" + check nextRes.error.msg == "AsyncResultIterator is finished but next item was requested" test "getting async iter for simple sync range iterator": - let iter1 = SafeAsyncIter[int].new(0 ..< 5) + let iter1 = AsyncResultIterator[int].new(0 ..< 5) var collected: seq[int] for iFut in iter1: @@ -53,7 +53,7 @@ asyncchecksuite "Test SafeAsyncIter": collected == @[0, 1, 2, 3, 4] test "Should map each item using `map`": - let iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis) + let iter1 = AsyncResultIterator[int].new(0 ..< 5).delayBy(10.millis) let iter2 = map[int, string]( iter1, @@ -77,7 +77,7 @@ asyncchecksuite "Test SafeAsyncIter": test "Should leave only odd items using `filter`": let - iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter1 = AsyncResultIterator[int].new(0 ..< 5).delayBy(10.millis) iter2 = await filter[int]( iter1, proc(i: ?!int): Future[bool] {.async: (raises: [CancelledError]).} = @@ -100,7 +100,7 @@ asyncchecksuite "Test SafeAsyncIter": test "Should leave only odd items using `mapFilter`": let - iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter1 = AsyncResultIterator[int].new(0 ..< 5).delayBy(10.millis) iter2 = await mapFilter[int, string]( iter1, proc(i: ?!int): Future[Option[?!string]] {.async: (raises: [CancelledError]).} = @@ -123,7 +123,7 @@ asyncchecksuite "Test SafeAsyncIter": test "Collecting errors on `map` when finish on error is true": let - iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter1 = AsyncResultIterator[int].new(0 ..< 5).delayBy(10.millis) iter2 = map[int, string]( iter1, proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = @@ -151,7 +151,7 @@ asyncchecksuite "Test SafeAsyncIter": test "Collecting errors on `map` when finish on error is false": let - iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter1 = AsyncResultIterator[int].new(0 ..< 5).delayBy(10.millis) iter2 = map[int, string]( iter1, proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = @@ -180,7 +180,7 @@ asyncchecksuite "Test SafeAsyncIter": test "Collecting errors on `map` when errors are mixed with successes": let - iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter1 = AsyncResultIterator[int].new(0 ..< 5).delayBy(10.millis) iter2 = map[int, string]( iter1, proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = @@ -209,7 +209,7 @@ asyncchecksuite "Test SafeAsyncIter": test "Collecting errors on `mapFilter` when finish on error is true": let - iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter1 = AsyncResultIterator[int].new(0 ..< 5).delayBy(10.millis) iter2 = await mapFilter[int, string]( iter1, proc(i: ?!int): Future[Option[?!string]] {.async: (raises: [CancelledError]).} = @@ -239,7 +239,7 @@ asyncchecksuite "Test SafeAsyncIter": test "Collecting errors on `mapFilter` when finish on error is false": let - iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter1 = AsyncResultIterator[int].new(0 ..< 5).delayBy(10.millis) iter2 = await mapFilter[int, string]( iter1, proc(i: ?!int): Future[Option[?!string]] {.async: (raises: [CancelledError]).} = @@ -270,7 +270,7 @@ asyncchecksuite "Test SafeAsyncIter": test "Collecting errors on `filter` when finish on error is false": let - iter1 = SafeAsyncIter[int].new(0 ..< 5) + iter1 = AsyncResultIterator[int].new(0 ..< 5) iter2 = map[int, string]( iter1, proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = @@ -313,7 +313,7 @@ asyncchecksuite "Test SafeAsyncIter": test "Collecting errors on `filter` when finish on error is true": let - iter1 = SafeAsyncIter[int].new(0 ..< 5) + iter1 = AsyncResultIterator[int].new(0 ..< 5) iter2 = map[int, string]( iter1, proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = @@ -384,7 +384,7 @@ asyncchecksuite "Test SafeAsyncIter": let fut: Future[Option[?!string]].Raising([CancelledError]) = Future[Option[?!string]].Raising([CancelledError]).init("testsafeasynciter") - let iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis) + let iter1 = AsyncResultIterator[int].new(0 ..< 5).delayBy(10.millis) let iter2 = await mapFilter[int, string]( iter1, proc(i: ?!int): Future[Option[?!string]] {.async: (raises: [CancelledError]).} = From f2f7059a7828cfabc20e52126e86e5d8d8294efa Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Tue, 24 Jun 2025 12:55:15 +0200 Subject: [PATCH 15/22] formatting --- codex/erasure/erasure.nim | 1 - codex/stores/blockstore.nim | 4 +++- tests/codex/utils/testsafeasynciter.nim | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 6a51b9e7f..8cfd676ce 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -565,7 +565,6 @@ proc asyncDecode*( proc decodeInternal( self: Erasure, encoded: Manifest ): Future[?!(ref seq[Cid], seq[Natural])] {.async: (raises: [CancelledError]).} = - logScope: steps = encoded.steps rounded_blocks = encoded.rounded diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index f841f3457..2c8967a2d 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -154,7 +154,9 @@ method hasBlock*( method listBlocks*( self: BlockStore, blockType = BlockType.Manifest -): Future[?!AsyncResultIterator[Cid]] {.base, async: (raises: [CancelledError]), gcsafe.} = +): Future[?!AsyncResultIterator[Cid]] {. + base, async: (raises: [CancelledError]), gcsafe +.} = ## Get the list of blocks in the BlockStore. This is an intensive operation ## diff --git a/tests/codex/utils/testsafeasynciter.nim b/tests/codex/utils/testsafeasynciter.nim index ebdc9eaff..d405273f4 100644 --- a/tests/codex/utils/testsafeasynciter.nim +++ b/tests/codex/utils/testsafeasynciter.nim @@ -37,7 +37,8 @@ asyncchecksuite "Test AsyncResultIterator": check collected == expectedSeq let nextRes = await iter.next() assert nextRes.isFailure - check nextRes.error.msg == "AsyncResultIterator is finished but next item was requested" + check nextRes.error.msg == + "AsyncResultIterator is finished but next item was requested" test "getting async iter for simple sync range iterator": let iter1 = AsyncResultIterator[int].new(0 ..< 5) From e1b7086343944ac43a0a9ee3da899bdc9e174c9e Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Tue, 24 Jun 2025 12:58:01 +0200 Subject: [PATCH 16/22] AsyncResultIterator: update internal names --- codex/utils/safeasynciter.nim | 38 ++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/codex/utils/safeasynciter.nim b/codex/utils/safeasynciter.nim index 838474652..95132050f 100644 --- a/codex/utils/safeasynciter.nim +++ b/codex/utils/safeasynciter.nim @@ -17,10 +17,11 @@ import pkg/chronos import ./iter -## AsyncResultIterator[T] is similar to `AsyncIter[Future[T]]` +## AsyncResultIterator[T] is similar to `AsyncIterator[Future[T]]` ## but does not throw exceptions others than CancelledError. -## It is thus way easier to use with checked exceptions -## +## +## Instead of throwing exception, it uses Result to communicate errors ( +## thus the name AsyncResultIterator). ## ## Public interface: ## @@ -43,23 +44,24 @@ import ./iter ## - empty - to create an empty async iterator (AsyncResultIterator) type - SafeFunction[T, U] = + AsyncResultIteratorFunc[T, U] = proc(fut: T): Future[U] {.async: (raises: [CancelledError]), gcsafe, closure.} - SafeIsFinished = proc(): bool {.raises: [], gcsafe, closure.} - SafeGenNext[T] = proc(): Future[T] {.async: (raises: [CancelledError]), gcsafe.} + AsyncResultIteratorIsFinished = proc(): bool {.raises: [], gcsafe, closure.} + AsyncResultIteratorGenNext[T] = + proc(): Future[T] {.async: (raises: [CancelledError]), gcsafe.} AsyncResultIterator*[T] = ref object finished: bool - next*: SafeGenNext[?!T] + next*: AsyncResultIteratorGenNext[?!T] proc flatMap[T, U]( - fut: auto, fn: SafeFunction[?!T, ?!U] + fut: auto, fn: AsyncResultIteratorFunc[?!T, ?!U] ): Future[?!U] {.async: (raises: [CancelledError]).} = let t = await fut await fn(t) proc flatMap[T, U]( - fut: auto, fn: SafeFunction[?!T, Option[?!U]] + fut: auto, fn: AsyncResultIteratorFunc[?!T, Option[?!U]] ): Future[Option[?!U]] {.async: (raises: [CancelledError]).} = let t = await fut await fn(t) @@ -70,7 +72,7 @@ proc flatMap[T, U]( proc new*[T]( _: type AsyncResultIterator[T], - genNext: SafeGenNext[?!T], + genNext: AsyncResultIteratorGenNext[?!T], isFinished: IsFinished, finishOnErr: bool = true, ): AsyncResultIterator[T] = @@ -104,7 +106,7 @@ proc new*[T]( # forward declaration proc mapAsync*[T, U]( - iter: Iter[T], fn: SafeFunction[T, ?!U], finishOnErr: bool = true + iter: Iter[T], fn: AsyncResultIteratorFunc[T, ?!U], finishOnErr: bool = true ): AsyncResultIterator[U] proc new*[U, V: Ordinal]( @@ -152,13 +154,13 @@ iterator pairs*[T](self: AsyncResultIterator[T]): auto {.inline.} = inc(i) proc mapFuture*[T, U]( - fut: auto, fn: SafeFunction[T, U] + fut: auto, fn: AsyncResultIteratorFunc[T, U] ): Future[U] {.async: (raises: [CancelledError]).} = let t = await fut await fn(t) proc mapAsync*[T, U]( - iter: Iter[T], fn: SafeFunction[T, ?!U], finishOnErr: bool = true + iter: Iter[T], fn: AsyncResultIteratorFunc[T, ?!U], finishOnErr: bool = true ): AsyncResultIterator[U] = AsyncResultIterator[U].new( genNext = () => fn(iter.next()), @@ -167,7 +169,9 @@ proc mapAsync*[T, U]( ) proc map*[T, U]( - iter: AsyncResultIterator[T], fn: SafeFunction[?!T, ?!U], finishOnErr: bool = true + iter: AsyncResultIterator[T], + fn: AsyncResultIteratorFunc[?!T, ?!U], + finishOnErr: bool = true, ): AsyncResultIterator[U] = AsyncResultIterator[U].new( genNext = () => iter.next().flatMap(fn), @@ -177,7 +181,7 @@ proc map*[T, U]( proc mapFilter*[T, U]( iter: AsyncResultIterator[T], - mapPredicate: SafeFunction[?!T, Option[?!U]], + mapPredicate: AsyncResultIteratorFunc[?!T, Option[?!U]], finishOnErr: bool = true, ): Future[AsyncResultIterator[U]] {.async: (raises: [CancelledError]).} = var nextU: Option[?!U] @@ -202,7 +206,9 @@ proc mapFilter*[T, U]( AsyncResultIterator[U].new(genNext, isFinished, finishOnErr = finishOnErr) proc filter*[T]( - iter: AsyncResultIterator[T], predicate: SafeFunction[?!T, bool], finishOnErr: bool = true + iter: AsyncResultIterator[T], + predicate: AsyncResultIteratorFunc[?!T, bool], + finishOnErr: bool = true, ): Future[AsyncResultIterator[T]] {.async: (raises: [CancelledError]).} = proc wrappedPredicate( t: ?!T From d42625a4a53c7c59ec4a50a2282a8541ea1fd6df Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Tue, 24 Jun 2025 13:03:46 +0200 Subject: [PATCH 17/22] changes the module name: safeasynciter => asyncresultiterator --- codex/stores/maintenance.nim | 2 +- codex/stores/networkstore.nim | 2 +- codex/stores/queryiterhelper.nim | 2 +- codex/utils.nim | 4 ++-- codex/utils/{safeasynciter.nim => asyncresultiterator.nim} | 0 tests/codex/helpers/mockrepostore.nim | 2 +- tests/codex/stores/testqueryiterhelper.nim | 2 +- tests/codex/stores/testrepostore.nim | 2 +- tests/codex/testutils.nim | 2 +- .../{testsafeasynciter.nim => testasyncresultiterator.nim} | 4 ++-- 10 files changed, 11 insertions(+), 11 deletions(-) rename codex/utils/{safeasynciter.nim => asyncresultiterator.nim} (100%) rename tests/codex/utils/{testsafeasynciter.nim => testasyncresultiterator.nim} (99%) diff --git a/codex/stores/maintenance.nim b/codex/stores/maintenance.nim index 1d1090313..848499339 100644 --- a/codex/stores/maintenance.nim +++ b/codex/stores/maintenance.nim @@ -18,7 +18,7 @@ import pkg/questionable/results import ./repostore import ../utils/timer -import ../utils/safeasynciter +import ../utils/asyncresultiterator import ../clock import ../logutils import ../systemclock diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index f8f5d2af7..becb870f7 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -19,7 +19,7 @@ import ../blockexchange import ../logutils import ../merkletree import ../utils/asyncheapqueue -import ../utils/safeasynciter +import ../utils/asyncresultiterator import ./blockstore export blockstore, blockexchange, asyncheapqueue diff --git a/codex/stores/queryiterhelper.nim b/codex/stores/queryiterhelper.nim index 21399fa0c..5741d4ce2 100644 --- a/codex/stores/queryiterhelper.nim +++ b/codex/stores/queryiterhelper.nim @@ -4,7 +4,7 @@ import pkg/chronos import pkg/chronicles import pkg/datastore/typedds -import ../utils/safeasynciter +import ../utils/asyncresultiterator {.push raises: [].} diff --git a/codex/utils.nim b/codex/utils.nim index 93427b439..8d8aeb223 100644 --- a/codex/utils.nim +++ b/codex/utils.nim @@ -19,9 +19,9 @@ import pkg/chronos import ./utils/asyncheapqueue import ./utils/fileutils import ./utils/iter -import ./utils/safeasynciter +import ./utils/asyncresultiterator -export asyncheapqueue, fileutils, iter, safeasynciter, chronos +export asyncheapqueue, fileutils, iter, asyncresultiterator, chronos when defined(posix): import os, posix diff --git a/codex/utils/safeasynciter.nim b/codex/utils/asyncresultiterator.nim similarity index 100% rename from codex/utils/safeasynciter.nim rename to codex/utils/asyncresultiterator.nim diff --git a/tests/codex/helpers/mockrepostore.nim b/tests/codex/helpers/mockrepostore.nim index ec46e767f..dc63d7664 100644 --- a/tests/codex/helpers/mockrepostore.nim +++ b/tests/codex/helpers/mockrepostore.nim @@ -14,7 +14,7 @@ import pkg/questionable import pkg/questionable/results import pkg/codex/stores/repostore -import pkg/codex/utils/safeasynciter +import pkg/codex/utils/asyncresultiterator type MockRepoStore* = ref object of RepoStore delBlockCids*: seq[Cid] diff --git a/tests/codex/stores/testqueryiterhelper.nim b/tests/codex/stores/testqueryiterhelper.nim index 6eef4f3bb..6f932dbca 100644 --- a/tests/codex/stores/testqueryiterhelper.nim +++ b/tests/codex/stores/testqueryiterhelper.nim @@ -6,7 +6,7 @@ import pkg/chronos import pkg/datastore/typedds import pkg/datastore/sql/sqliteds import pkg/codex/stores/queryiterhelper -import pkg/codex/utils/safeasynciter +import pkg/codex/utils/asyncresultiterator import ../../asynctest import ../helpers diff --git a/tests/codex/stores/testrepostore.nim b/tests/codex/stores/testrepostore.nim index cd007c83e..5dd21306e 100644 --- a/tests/codex/stores/testrepostore.nim +++ b/tests/codex/stores/testrepostore.nim @@ -15,7 +15,7 @@ import pkg/codex/stores import pkg/codex/stores/repostore/operations import pkg/codex/blocktype as bt import pkg/codex/clock -import pkg/codex/utils/safeasynciter +import pkg/codex/utils/asyncresultiterator import pkg/codex/merkletree/codex import ../../asynctest diff --git a/tests/codex/testutils.nim b/tests/codex/testutils.nim index 01acd93ac..a0892ee43 100644 --- a/tests/codex/testutils.nim +++ b/tests/codex/testutils.nim @@ -1,7 +1,7 @@ import ./utils/testoptions import ./utils/testkeyutils import ./utils/testasyncstatemachine -import ./utils/testsafeasynciter +import ./utils/testasyncresultiterator import ./utils/testtimer import ./utils/testtrackedfutures diff --git a/tests/codex/utils/testsafeasynciter.nim b/tests/codex/utils/testasyncresultiterator.nim similarity index 99% rename from tests/codex/utils/testsafeasynciter.nim rename to tests/codex/utils/testasyncresultiterator.nim index d405273f4..7f89f0827 100644 --- a/tests/codex/utils/testsafeasynciter.nim +++ b/tests/codex/utils/testasyncresultiterator.nim @@ -2,7 +2,7 @@ import std/sugar import pkg/questionable import pkg/chronos import pkg/codex/utils/iter -import pkg/codex/utils/safeasynciter +import pkg/codex/utils/asyncresultiterator import ../../asynctest import ../helpers @@ -383,7 +383,7 @@ asyncchecksuite "Test AsyncResultIterator": # cancellation of the async predicate function. let fut: Future[Option[?!string]].Raising([CancelledError]) = - Future[Option[?!string]].Raising([CancelledError]).init("testsafeasynciter") + Future[Option[?!string]].Raising([CancelledError]).init("testasyncresultiterator") let iter1 = AsyncResultIterator[int].new(0 ..< 5).delayBy(10.millis) let iter2 = await mapFilter[int, string]( From c5c1f979c0c3d3003a1f5be4e4c7dd731389a39f Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Tue, 24 Jun 2025 15:16:44 +0200 Subject: [PATCH 18/22] refactors "Iter" and "AsyncIter" to be better separated and having adequate annotations --- codex/utils/asynciter.nim | 131 +++++++++++++++++++++++++------------- codex/utils/iter.nim | 97 ++++++++++++++++++---------- 2 files changed, 152 insertions(+), 76 deletions(-) diff --git a/codex/utils/asynciter.nim b/codex/utils/asynciter.nim index d87ff67f3..12df2f40b 100644 --- a/codex/utils/asynciter.nim +++ b/codex/utils/asynciter.nim @@ -1,3 +1,14 @@ +## Nim-Codex +## Copyright (c) 2025 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + import std/sugar import pkg/questionable @@ -5,43 +16,52 @@ import pkg/chronos import ./iter -export iter - -## AsyncIter[T] is similar to `Iter[Future[T]]` with addition of methods specific to asynchronous processing +## AsyncIter[T] is similar to `Iter[Future[T]]` with +## addition of methods specific to asynchronous processing. ## - -type AsyncIter*[T] = ref object - finished: bool - next*: GenNext[Future[T]] - -proc finish*[T](self: AsyncIter[T]): void = - self.finished = true - -proc finished*[T](self: AsyncIter[T]): bool = - self.finished - -iterator items*[T](self: AsyncIter[T]): Future[T] = - while not self.finished: - yield self.next() - -iterator pairs*[T](self: AsyncIter[T]): tuple[key: int, val: Future[T]] {.inline.} = - var i = 0 - while not self.finished: - yield (i, self.next()) - inc(i) - -proc map*[T, U](fut: Future[T], fn: Function[T, U]): Future[U] {.async.} = - let t = await fut - fn(t) - -proc flatMap*[T, U](fut: Future[T], fn: Function[T, Future[U]]): Future[U] {.async.} = +## Public interface: +## +## Attributes +## - next - allows to set a custom function to be called when the next item is requested +## +## Operations: +## - new - to create a new async iterator (AsyncIter) +## - finish - to finish the async iterator +## - finished - to check if the async iterator is finished +## - next - to get the next item from the async iterator +## - items - to iterate over the async iterator +## - pairs - to iterate over the async iterator and return the index of each item +## - mapFuture - to convert a (raising) Future[T] to a (raising) Future[U] using a function fn: auto -> Future[U] - we use auto to handle both raising and non-raising futures +## - mapAsync - to convert a regular sync iterator (Iter) to an async iterator (AsyncIter) +## - map - to convert one async iterator (AsyncIter) to another async iterator (AsyncIter) +## - mapFilter - to convert one async iterator (AsyncIter) to another async iterator (AsyncIter) and apply filtering at the same time +## - filter - to filter an async iterator (AsyncIter) and return another async iterator (AsyncIter) +## - delayBy - to delay each item returned by async iterator by a given duration +## - empty - to create an empty async iterator (AsyncIter) + +type + AsyncIterFunc[T, U] = + proc(fut: T): Future[U] {.async.} + AsyncIterIsFinished = proc(): bool {.raises: [], gcsafe.} + AsyncIterGenNext[T] = + proc(): Future[T] {.async.} + + AsyncIter*[T] = ref object + finished: bool + next*: AsyncIterGenNext[T] + +proc flatMap[T, U](fut: Future[T], fn: AsyncIterFunc[T, U]): Future[U] {.async.} = let t = await fut await fn(t) +######################################################################## +## AsyncIter public interface methods +######################################################################## + proc new*[T]( _: type AsyncIter[T], - genNext: GenNext[Future[T]], - isFinished: IsFinished, + genNext: AsyncIterGenNext[T], + isFinished: AsyncIterIsFinished, finishOnErr: bool = true, ): AsyncIter[T] = ## Creates a new Iter using elements returned by supplier function `genNext`. @@ -77,8 +97,8 @@ proc new*[T]( iter.next = next return iter -proc mapAsync*[T, U](iter: Iter[T], fn: Function[T, Future[U]]): AsyncIter[U] = - AsyncIter[U].new(genNext = () => fn(iter.next()), isFinished = () => iter.finished()) +# forward declaration +proc mapAsync*[T, U](iter: Iter[T], fn: AsyncIterFunc[T, U]): AsyncIter[U] proc new*[U, V: Ordinal](_: type AsyncIter[U], slice: HSlice[U, V]): AsyncIter[U] = ## Creates new Iter from a slice @@ -104,25 +124,36 @@ proc new*[U, V, S: Ordinal]( i, ) -proc empty*[T](_: type AsyncIter[T]): AsyncIter[T] = - ## Creates an empty AsyncIter - ## +proc finish*[T](self: AsyncIter[T]): void = + self.finished = true - proc genNext(): Future[T] {.raises: [CatchableError].} = - raise newException(CatchableError, "Next item requested from an empty AsyncIter") +proc finished*[T](self: AsyncIter[T]): bool = + self.finished - proc isFinished(): bool = - true +iterator items*[T](self: AsyncIter[T]): Future[T] = + while not self.finished: + yield self.next() + +iterator pairs*[T](self: AsyncIter[T]): tuple[key: int, val: Future[T]] {.inline.} = + var i = 0 + while not self.finished: + yield (i, self.next()) + inc(i) + +proc mapFuture*[T, U](fut: Future[T], fn: AsyncIterFunc[T, U]): Future[U] {.async.} = + let t = await fut + fn(t) - AsyncIter[T].new(genNext, isFinished) +proc mapAsync*[T, U](iter: Iter[T], fn: AsyncIterFunc[T, U]): AsyncIter[U] = + AsyncIter[U].new(genNext = () => fn(iter.next()), isFinished = () => iter.finished()) -proc map*[T, U](iter: AsyncIter[T], fn: Function[T, Future[U]]): AsyncIter[U] = +proc map*[T, U](iter: AsyncIter[T], fn: AsyncIterFunc[T, U]): AsyncIter[U] = AsyncIter[U].new( genNext = () => iter.next().flatMap(fn), isFinished = () => iter.finished ) proc mapFilter*[T, U]( - iter: AsyncIter[T], mapPredicate: Function[T, Future[Option[U]]] + iter: AsyncIter[T], mapPredicate: AsyncIterFunc[T, Option[U]] ): Future[AsyncIter[U]] {.async: (raises: [CancelledError]).} = var nextFutU: Option[Future[U]] @@ -156,7 +187,7 @@ proc mapFilter*[T, U]( AsyncIter[U].new(genNext, isFinished) proc filter*[T]( - iter: AsyncIter[T], predicate: Function[T, Future[bool]] + iter: AsyncIter[T], predicate: AsyncIterFunc[T, bool] ): Future[AsyncIter[T]] {.async: (raises: [CancelledError]).} = proc wrappedPredicate(t: T): Future[Option[T]] {.async.} = if await predicate(t): @@ -176,3 +207,15 @@ proc delayBy*[T](iter: AsyncIter[T], d: Duration): AsyncIter[T] = await sleepAsync(d) t, ) + +proc empty*[T](_: type AsyncIter[T]): AsyncIter[T] = + ## Creates an empty AsyncIter + ## + + proc genNext(): Future[T] {.async.} = + raise newException(CatchableError, "Next item requested from an empty AsyncIter") + + proc isFinished(): bool = + true + + AsyncIter[T].new(genNext, isFinished) \ No newline at end of file diff --git a/codex/utils/iter.nim b/codex/utils/iter.nim index 9afd6c129..56c0f206d 100644 --- a/codex/utils/iter.nim +++ b/codex/utils/iter.nim @@ -1,37 +1,54 @@ +## Nim-Codex +## Copyright (c) 2025 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + import std/sugar import pkg/questionable import pkg/questionable/results +## Public interface: +## +## Attributes +## - next - allows to set a custom function to be called when the next item is requested +## +## Operations: +## - new - to create a new iterator (Iter) +## - finish - to finish the iterator +## - finished - to check if the iterator is finished +## - next - to get the next item from the iterator +## - items - to iterate over the iterator +## - pairs - to iterate over the iterator and return the index of each item +## - map - to convert one iterator (Iter) to another iterator (Iter) +## - mapFilter - to convert one iterator (Iter) to another iterator (Iter) and apply filtering at the same time +## - filter - to filter an iterator (Iter) and return another iterator (Iter) +## - empty - to create an empty async iterator (AsyncIter) + type - Function*[T, U] = proc(fut: T): U {.raises: [CatchableError], gcsafe, closure.} - IsFinished* = proc(): bool {.raises: [], gcsafe, closure.} - GenNext*[T] = proc(): T {.raises: [CatchableError], gcsafe.} + IterFunction[T, U] = proc(value: T): U {.raises: [CatchableError], gcsafe.} + IterIsFinished = proc(): bool {.raises: [], gcsafe.} + IterGenNext[T] = proc(): T {.raises: [CatchableError], gcsafe.} Iterator[T] = iterator (): T + Iter*[T] = ref object finished: bool - next*: GenNext[T] - -proc finish*[T](self: Iter[T]): void = - self.finished = true + next*: IterGenNext[T] -proc finished*[T](self: Iter[T]): bool = - self.finished - -iterator items*[T](self: Iter[T]): T = - while not self.finished: - yield self.next() - -iterator pairs*[T](self: Iter[T]): tuple[key: int, val: T] {.inline.} = - var i = 0 - while not self.finished: - yield (i, self.next()) - inc(i) +######################################################################## +## Iter public interface methods +######################################################################## proc new*[T]( _: type Iter[T], - genNext: GenNext[T], - isFinished: IsFinished, + genNext: IterGenNext[T], + isFinished: IterIsFinished, finishOnErr: bool = true, ): Iter[T] = ## Creates a new Iter using elements returned by supplier function `genNext`. @@ -121,22 +138,26 @@ proc new*[T](_: type Iter[T], iter: Iterator[T]): Iter[T] = tryNext() Iter[T].new(genNext, isFinished) -proc empty*[T](_: type Iter[T]): Iter[T] = - ## Creates an empty Iter - ## +proc finish*[T](self: Iter[T]): void = + self.finished = true - proc genNext(): T {.raises: [CatchableError].} = - raise newException(CatchableError, "Next item requested from an empty Iter") +proc finished*[T](self: Iter[T]): bool = + self.finished - proc isFinished(): bool = - true +iterator items*[T](self: Iter[T]): T = + while not self.finished: + yield self.next() - Iter[T].new(genNext, isFinished) +iterator pairs*[T](self: Iter[T]): tuple[key: int, val: T] {.inline.} = + var i = 0 + while not self.finished: + yield (i, self.next()) + inc(i) -proc map*[T, U](iter: Iter[T], fn: Function[T, U]): Iter[U] = +proc map*[T, U](iter: Iter[T], fn: IterFunction[T, U]): Iter[U] = Iter[U].new(genNext = () => fn(iter.next()), isFinished = () => iter.finished) -proc mapFilter*[T, U](iter: Iter[T], mapPredicate: Function[T, Option[U]]): Iter[U] = +proc mapFilter*[T, U](iter: Iter[T], mapPredicate: IterFunction[T, Option[U]]): Iter[U] = var nextUOrErr: Option[?!U] proc tryFetch(): void = @@ -167,7 +188,7 @@ proc mapFilter*[T, U](iter: Iter[T], mapPredicate: Function[T, Option[U]]): Iter tryFetch() Iter[U].new(genNext, isFinished) -proc filter*[T](iter: Iter[T], predicate: Function[T, bool]): Iter[T] = +proc filter*[T](iter: Iter[T], predicate: IterFunction[T, bool]): Iter[T] = proc wrappedPredicate(t: T): Option[T] = if predicate(t): some(t) @@ -175,3 +196,15 @@ proc filter*[T](iter: Iter[T], predicate: Function[T, bool]): Iter[T] = T.none mapFilter[T, T](iter, wrappedPredicate) + +proc empty*[T](_: type Iter[T]): Iter[T] = + ## Creates an empty Iter + ## + + proc genNext(): T {.raises: [CatchableError].} = + raise newException(CatchableError, "Next item requested from an empty Iter") + + proc isFinished(): bool = + true + + Iter[T].new(genNext, isFinished) \ No newline at end of file From 28ccfb17449f547f37b73a93aeec2ba0e5037824 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Tue, 24 Jun 2025 15:24:55 +0200 Subject: [PATCH 19/22] renaming: AsyncResultIterator => AsyncResultIter to have it consistent with other iterators --- codex/erasure/erasure.nim | 8 +- codex/stores/blockstore.nim | 4 +- codex/stores/cachestore.nim | 4 +- codex/stores/maintenance.nim | 2 +- codex/stores/networkstore.nim | 4 +- codex/stores/queryiterhelper.nim | 16 +-- codex/stores/repostore/store.nim | 10 +- codex/utils.nim | 4 +- codex/utils/asynciter.nim | 8 +- ...resultiterator.nim => asyncresultiter.nim} | 111 +++++++++--------- codex/utils/iter.nim | 6 +- tests/codex/helpers/mockrepostore.nim | 6 +- tests/codex/stores/testqueryiterhelper.nim | 4 +- tests/codex/stores/testrepostore.nim | 4 +- tests/codex/testutils.nim | 2 +- ...ltiterator.nim => testasyncresultiter.nim} | 37 +++--- 16 files changed, 113 insertions(+), 117 deletions(-) rename codex/utils/{asyncresultiterator.nim => asyncresultiter.nim} (60%) rename tests/codex/utils/{testasyncresultiterator.nim => testasyncresultiter.nim} (91%) diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 8cfd676ce..cf14626fe 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -121,13 +121,13 @@ func indexToPos(steps, idx, step: int): int {.inline.} = proc getPendingBlocks( self: Erasure, manifest: Manifest, indices: seq[int] -): AsyncResultIterator[(?!bt.Block, int)] = +): AsyncResultIter[(?!bt.Block, int)] = ## Get pending blocks iterator ## if indices.len == 0: trace "No indices to fetch blocks for", treeCid = manifest.treeCid - return AsyncResultIterator[(?!bt.Block, int)].empty() + return AsyncResultIter[(?!bt.Block, int)].empty() var pendingBlocks: seq[Future[(?!bt.Block, int)].Raising([CancelledError])] = @[] @@ -162,8 +162,8 @@ proc getPendingBlocks( # but we check for that at the very beginning - # thus, if this happens, we raise an assert raiseAssert("fatal: pendingBlocks is empty - this should never happen") - - AsyncResultIterator[(?!bt.Block, int)].new(genNext, isFinished) + + AsyncResultIter[(?!bt.Block, int)].new(genNext, isFinished) proc prepareEncodingData( self: Erasure, diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index 2c8967a2d..1aab695b5 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -154,9 +154,7 @@ method hasBlock*( method listBlocks*( self: BlockStore, blockType = BlockType.Manifest -): Future[?!AsyncResultIterator[Cid]] {. - base, async: (raises: [CancelledError]), gcsafe -.} = +): Future[?!AsyncResultIter[Cid]] {.base, async: (raises: [CancelledError]), gcsafe.} = ## Get the list of blocks in the BlockStore. This is an intensive operation ## diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index 7e53655ca..957aeb121 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -139,7 +139,7 @@ func cids(self: CacheStore): (iterator (): Cid {.gcsafe, raises: [].}) = method listBlocks*( self: CacheStore, blockType = BlockType.Manifest -): Future[?!AsyncResultIterator[Cid]] {.async: (raises: [CancelledError]).} = +): Future[?!AsyncResultIter[Cid]] {.async: (raises: [CancelledError]).} = ## Get the list of blocks in the BlockStore. This is an intensive operation ## @@ -152,7 +152,7 @@ method listBlocks*( success(cids()) let iter = await ( - AsyncResultIterator[Cid].new(genNext, isFinished).filter( + AsyncResultIter[Cid].new(genNext, isFinished).filter( proc(cid: ?!Cid): Future[bool] {.async: (raises: [CancelledError]).} = without cid =? cid, err: trace "Cannot get Cid from the iterator", err = err.msg diff --git a/codex/stores/maintenance.nim b/codex/stores/maintenance.nim index 848499339..d2ff5d77a 100644 --- a/codex/stores/maintenance.nim +++ b/codex/stores/maintenance.nim @@ -18,7 +18,7 @@ import pkg/questionable/results import ./repostore import ../utils/timer -import ../utils/asyncresultiterator +import ../utils/asyncresultiter import ../clock import ../logutils import ../systemclock diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index becb870f7..e8f357740 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -19,7 +19,7 @@ import ../blockexchange import ../logutils import ../merkletree import ../utils/asyncheapqueue -import ../utils/asyncresultiterator +import ../utils/asyncresultiter import ./blockstore export blockstore, blockexchange, asyncheapqueue @@ -127,7 +127,7 @@ method ensureExpiry*( method listBlocks*( self: NetworkStore, blockType = BlockType.Manifest -): Future[?!AsyncResultIterator[Cid]] {.async: (raw: true, raises: [CancelledError]).} = +): Future[?!AsyncResultIter[Cid]] {.async: (raw: true, raises: [CancelledError]).} = self.localStore.listBlocks(blockType) method delBlock*( diff --git a/codex/stores/queryiterhelper.nim b/codex/stores/queryiterhelper.nim index 5741d4ce2..48fbda64b 100644 --- a/codex/stores/queryiterhelper.nim +++ b/codex/stores/queryiterhelper.nim @@ -4,16 +4,16 @@ import pkg/chronos import pkg/chronicles import pkg/datastore/typedds -import ../utils/asyncresultiterator +import ../utils/asyncresultiter {.push raises: [].} type KeyVal*[T] = tuple[key: Key, value: T] -proc toAsyncResultIterator*[T]( +proc toAsyncResultIter*[T]( queryIter: QueryIter[T], finishOnErr: bool = true -): Future[?!AsyncResultIterator[QueryResponse[T]]] {.async: (raises: [CancelledError]).} = - ## Converts `QueryIter[T]` to `AsyncResultIterator[QueryResponse[T]]` and automatically +): Future[?!AsyncResultIter[QueryResponse[T]]] {.async: (raises: [CancelledError]).} = + ## Converts `QueryIter[T]` to `AsyncResultIter[QueryResponse[T]]` and automatically ## runs dispose whenever `QueryIter` finishes or whenever an error occurs (only ## if the flag finishOnErr is set to true) ## @@ -22,7 +22,7 @@ proc toAsyncResultIterator*[T]( trace "Disposing iterator" if error =? (await queryIter.dispose()).errorOption: return failure(error) - return success(AsyncResultIterator[QueryResponse[T]].empty()) + return success(AsyncResultIter[QueryResponse[T]].empty()) var errOccurred = false @@ -42,11 +42,11 @@ proc toAsyncResultIterator*[T]( proc isFinished(): bool = queryIter.finished - AsyncResultIterator[QueryResponse[T]].new(genNext, isFinished).success + AsyncResultIter[QueryResponse[T]].new(genNext, isFinished).success proc filterSuccess*[T]( - iter: AsyncResultIterator[QueryResponse[T]] -): Future[AsyncResultIterator[tuple[key: Key, value: T]]] {. + iter: AsyncResultIter[QueryResponse[T]] +): Future[AsyncResultIter[tuple[key: Key, value: T]]] {. async: (raises: [CancelledError]) .} = ## Filters out any items that are not success diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim index 87065ace0..1fcec2fa8 100644 --- a/codex/stores/repostore/store.nim +++ b/codex/stores/repostore/store.nim @@ -295,12 +295,12 @@ method hasBlock*( method listBlocks*( self: RepoStore, blockType = BlockType.Manifest -): Future[?!AsyncResultIterator[Cid]] {.async: (raises: [CancelledError]).} = +): Future[?!AsyncResultIter[Cid]] {.async: (raises: [CancelledError]).} = ## Get the list of blocks in the RepoStore. ## This is an intensive operation ## - var iter = AsyncResultIterator[Cid]() + var iter = AsyncResultIter[Cid]() let key = case blockType @@ -346,7 +346,7 @@ proc blockRefCount*(self: RepoStore, cid: Cid): Future[?!Natural] {.async.} = method getBlockExpirations*( self: RepoStore, maxNumber: int, offset: int -): Future[?!AsyncResultIterator[BlockExpiration]] {. +): Future[?!AsyncResultIter[BlockExpiration]] {. async: (raises: [CancelledError]), base, gcsafe .} = ## Get iterator with block expirations @@ -360,11 +360,11 @@ method getBlockExpirations*( error "Unable to execute block expirations query", err = err.msg return failure(err) - without asyncQueryIter =? (await queryIter.toAsyncResultIterator()), err: + without asyncQueryIter =? (await queryIter.toAsyncResultIter()), err: error "Unable to convert QueryIter to AsyncIter", err = err.msg return failure(err) - let filteredIter: AsyncResultIterator[KeyVal[BlockMetadata]] = + let filteredIter: AsyncResultIter[KeyVal[BlockMetadata]] = await asyncQueryIter.filterSuccess() proc mapping( diff --git a/codex/utils.nim b/codex/utils.nim index 8d8aeb223..013b6a5b2 100644 --- a/codex/utils.nim +++ b/codex/utils.nim @@ -19,9 +19,9 @@ import pkg/chronos import ./utils/asyncheapqueue import ./utils/fileutils import ./utils/iter -import ./utils/asyncresultiterator +import ./utils/asyncresultiter -export asyncheapqueue, fileutils, iter, asyncresultiterator, chronos +export asyncheapqueue, fileutils, iter, asyncresultiter, chronos when defined(posix): import os, posix diff --git a/codex/utils/asynciter.nim b/codex/utils/asynciter.nim index 12df2f40b..7b0c42f6f 100644 --- a/codex/utils/asynciter.nim +++ b/codex/utils/asynciter.nim @@ -40,11 +40,9 @@ import ./iter ## - empty - to create an empty async iterator (AsyncIter) type - AsyncIterFunc[T, U] = - proc(fut: T): Future[U] {.async.} + AsyncIterFunc[T, U] = proc(fut: T): Future[U] {.async.} AsyncIterIsFinished = proc(): bool {.raises: [], gcsafe.} - AsyncIterGenNext[T] = - proc(): Future[T] {.async.} + AsyncIterGenNext[T] = proc(): Future[T] {.async.} AsyncIter*[T] = ref object finished: bool @@ -218,4 +216,4 @@ proc empty*[T](_: type AsyncIter[T]): AsyncIter[T] = proc isFinished(): bool = true - AsyncIter[T].new(genNext, isFinished) \ No newline at end of file + AsyncIter[T].new(genNext, isFinished) diff --git a/codex/utils/asyncresultiterator.nim b/codex/utils/asyncresultiter.nim similarity index 60% rename from codex/utils/asyncresultiterator.nim rename to codex/utils/asyncresultiter.nim index 95132050f..0920e41dd 100644 --- a/codex/utils/asyncresultiterator.nim +++ b/codex/utils/asyncresultiter.nim @@ -17,11 +17,11 @@ import pkg/chronos import ./iter -## AsyncResultIterator[T] is similar to `AsyncIterator[Future[T]]` +## AsyncResultIter[T] is similar to `AsyncIterator[Future[T]]` ## but does not throw exceptions others than CancelledError. ## ## Instead of throwing exception, it uses Result to communicate errors ( -## thus the name AsyncResultIterator). +## thus the name AsyncResultIter). ## ## Public interface: ## @@ -29,58 +29,57 @@ import ./iter ## - next - allows to set a custom function to be called when the next item is requested ## ## Operations: -## - new - to create a new async iterator (AsyncResultIterator) +## - new - to create a new async iterator (AsyncResultIter) ## - finish - to finish the async iterator ## - finished - to check if the async iterator is finished ## - next - to get the next item from the async iterator ## - items - to iterate over the async iterator ## - pairs - to iterate over the async iterator and return the index of each item ## - mapFuture - to convert a (raising) Future[T] to a (raising) Future[U] using a function fn: auto -> Future[U] - we use auto to handle both raising and non-raising futures -## - mapAsync - to convert a regular sync iterator (Iter) to an async iter (AsyncResultIterator) -## - map - to convert one async iterator (AsyncResultIterator) to another async iter (AsyncResultIterator) -## - mapFilter - to convert one async iterator (AsyncResultIterator) to another async iter (AsyncResultIterator) and apply filtering at the same time -## - filter - to filter an async iterator (AsyncResultIterator) returning another async iterator (AsyncResultIterator) -## - delayBy - to delay each item returned by async iter by a given duration -## - empty - to create an empty async iterator (AsyncResultIterator) +## - mapAsync - to convert a regular sync iterator (Iter) to an async iterator (AsyncResultIter) +## - map - to convert one async iterator (AsyncResultIter) to another async iterator (AsyncResultIter) +## - mapFilter - to convert one async iterator (AsyncResultIter) to another async iterator (AsyncResultIter) and apply filtering at the same time +## - filter - to filter an async iterator (AsyncResultIter) and return another async iterator (AsyncResultIter) +## - delayBy - to delay each item returned by async iterator by a given duration +## - empty - to create an empty async iterator (AsyncResultIter) type - AsyncResultIteratorFunc[T, U] = - proc(fut: T): Future[U] {.async: (raises: [CancelledError]), gcsafe, closure.} - AsyncResultIteratorIsFinished = proc(): bool {.raises: [], gcsafe, closure.} - AsyncResultIteratorGenNext[T] = - proc(): Future[T] {.async: (raises: [CancelledError]), gcsafe.} + AsyncResultIterFunc[T, U] = + proc(fut: T): Future[U] {.async: (raises: [CancelledError]).} + AsyncResultIterIsFinished = proc(): bool {.raises: [], gcsafe.} + AsyncResultIterGenNext[T] = proc(): Future[T] {.async: (raises: [CancelledError]).} - AsyncResultIterator*[T] = ref object + AsyncResultIter*[T] = ref object finished: bool - next*: AsyncResultIteratorGenNext[?!T] + next*: AsyncResultIterGenNext[?!T] proc flatMap[T, U]( - fut: auto, fn: AsyncResultIteratorFunc[?!T, ?!U] + fut: auto, fn: AsyncResultIterFunc[?!T, ?!U] ): Future[?!U] {.async: (raises: [CancelledError]).} = let t = await fut await fn(t) proc flatMap[T, U]( - fut: auto, fn: AsyncResultIteratorFunc[?!T, Option[?!U]] + fut: auto, fn: AsyncResultIterFunc[?!T, Option[?!U]] ): Future[Option[?!U]] {.async: (raises: [CancelledError]).} = let t = await fut await fn(t) ######################################################################## -## AsyncResultIterator public interface methods +## AsyncResultIter public interface methods ######################################################################## proc new*[T]( - _: type AsyncResultIterator[T], - genNext: AsyncResultIteratorGenNext[?!T], - isFinished: IsFinished, + _: type AsyncResultIter[T], + genNext: AsyncResultIterGenNext[?!T], + isFinished: AsyncResultIterIsFinished, finishOnErr: bool = true, -): AsyncResultIterator[T] = +): AsyncResultIter[T] = ## Creates a new Iter using elements returned by supplier function `genNext`. ## Iter is finished whenever `isFinished` returns true. ## - var iter = AsyncResultIterator[T]() + var iter = AsyncResultIter[T]() proc next(): Future[?!T] {.async: (raises: [CancelledError]).} = try: @@ -93,7 +92,7 @@ proc new*[T]( iter.finished = true return item else: - return failure("AsyncResultIterator is finished but next item was requested") + return failure("AsyncResultIter is finished but next item was requested") except CancelledError as err: iter.finished = true raise err @@ -106,12 +105,12 @@ proc new*[T]( # forward declaration proc mapAsync*[T, U]( - iter: Iter[T], fn: AsyncResultIteratorFunc[T, ?!U], finishOnErr: bool = true -): AsyncResultIterator[U] + iter: Iter[T], fn: AsyncResultIterFunc[T, ?!U], finishOnErr: bool = true +): AsyncResultIter[U] proc new*[U, V: Ordinal]( - _: type AsyncResultIterator[U], slice: HSlice[U, V], finishOnErr: bool = true -): AsyncResultIterator[U] = + _: type AsyncResultIter[U], slice: HSlice[U, V], finishOnErr: bool = true +): AsyncResultIter[U] = ## Creates new Iter from a slice ## @@ -124,8 +123,8 @@ proc new*[U, V: Ordinal]( ) proc new*[U, V, S: Ordinal]( - _: type AsyncResultIterator[U], a: U, b: V, step: S = 1, finishOnErr: bool = true -): AsyncResultIterator[U] = + _: type AsyncResultIter[U], a: U, b: V, step: S = 1, finishOnErr: bool = true +): AsyncResultIter[U] = ## Creates new Iter in range a..b with specified step (default 1) ## @@ -137,53 +136,53 @@ proc new*[U, V, S: Ordinal]( finishOnErr = finishOnErr, ) -proc finish*[T](self: AsyncResultIterator[T]): void = +proc finish*[T](self: AsyncResultIter[T]): void = self.finished = true -proc finished*[T](self: AsyncResultIterator[T]): bool = +proc finished*[T](self: AsyncResultIter[T]): bool = self.finished -iterator items*[T](self: AsyncResultIterator[T]): auto {.inline.} = +iterator items*[T](self: AsyncResultIter[T]): auto {.inline.} = while not self.finished: yield self.next() -iterator pairs*[T](self: AsyncResultIterator[T]): auto {.inline.} = +iterator pairs*[T](self: AsyncResultIter[T]): auto {.inline.} = var i = 0 while not self.finished: yield (i, self.next()) inc(i) proc mapFuture*[T, U]( - fut: auto, fn: AsyncResultIteratorFunc[T, U] + fut: auto, fn: AsyncResultIterFunc[T, U] ): Future[U] {.async: (raises: [CancelledError]).} = let t = await fut await fn(t) proc mapAsync*[T, U]( - iter: Iter[T], fn: AsyncResultIteratorFunc[T, ?!U], finishOnErr: bool = true -): AsyncResultIterator[U] = - AsyncResultIterator[U].new( + iter: Iter[T], fn: AsyncResultIterFunc[T, ?!U], finishOnErr: bool = true +): AsyncResultIter[U] = + AsyncResultIter[U].new( genNext = () => fn(iter.next()), isFinished = () => iter.finished(), finishOnErr = finishOnErr, ) proc map*[T, U]( - iter: AsyncResultIterator[T], - fn: AsyncResultIteratorFunc[?!T, ?!U], + iter: AsyncResultIter[T], + fn: AsyncResultIterFunc[?!T, ?!U], finishOnErr: bool = true, -): AsyncResultIterator[U] = - AsyncResultIterator[U].new( +): AsyncResultIter[U] = + AsyncResultIter[U].new( genNext = () => iter.next().flatMap(fn), isFinished = () => iter.finished, finishOnErr = finishOnErr, ) proc mapFilter*[T, U]( - iter: AsyncResultIterator[T], - mapPredicate: AsyncResultIteratorFunc[?!T, Option[?!U]], + iter: AsyncResultIter[T], + mapPredicate: AsyncResultIterFunc[?!T, Option[?!U]], finishOnErr: bool = true, -): Future[AsyncResultIterator[U]] {.async: (raises: [CancelledError]).} = +): Future[AsyncResultIter[U]] {.async: (raises: [CancelledError]).} = var nextU: Option[?!U] proc filter(): Future[void] {.async: (raises: [CancelledError]).} = @@ -203,13 +202,13 @@ proc mapFilter*[T, U]( nextU.isNone await filter() - AsyncResultIterator[U].new(genNext, isFinished, finishOnErr = finishOnErr) + AsyncResultIter[U].new(genNext, isFinished, finishOnErr = finishOnErr) proc filter*[T]( - iter: AsyncResultIterator[T], - predicate: AsyncResultIteratorFunc[?!T, bool], + iter: AsyncResultIter[T], + predicate: AsyncResultIterFunc[?!T, bool], finishOnErr: bool = true, -): Future[AsyncResultIterator[T]] {.async: (raises: [CancelledError]).} = +): Future[AsyncResultIter[T]] {.async: (raises: [CancelledError]).} = proc wrappedPredicate( t: ?!T ): Future[Option[?!T]] {.async: (raises: [CancelledError]).} = @@ -221,8 +220,8 @@ proc filter*[T]( await mapFilter[T, T](iter, wrappedPredicate, finishOnErr = finishOnErr) proc delayBy*[T]( - iter: AsyncResultIterator[T], d: Duration, finishOnErr: bool = true -): AsyncResultIterator[T] = + iter: AsyncResultIter[T], d: Duration, finishOnErr: bool = true +): AsyncResultIter[T] = ## Delays emitting each item by given duration ## @@ -234,14 +233,14 @@ proc delayBy*[T]( finishOnErr = finishOnErr, ) -proc empty*[T](_: type AsyncResultIterator[T]): AsyncResultIterator[T] = - ## Creates an empty AsyncResultIterator +proc empty*[T](_: type AsyncResultIter[T]): AsyncResultIter[T] = + ## Creates an empty AsyncResultIter ## proc genNext(): Future[?!T] {.async: (raises: [CancelledError]).} = - T.failure("Next item requested from an empty AsyncResultIterator") + T.failure("Next item requested from an empty AsyncResultIter") proc isFinished(): bool = true - AsyncResultIterator[T].new(genNext, isFinished) + AsyncResultIter[T].new(genNext, isFinished) diff --git a/codex/utils/iter.nim b/codex/utils/iter.nim index 56c0f206d..607332c55 100644 --- a/codex/utils/iter.nim +++ b/codex/utils/iter.nim @@ -157,7 +157,9 @@ iterator pairs*[T](self: Iter[T]): tuple[key: int, val: T] {.inline.} = proc map*[T, U](iter: Iter[T], fn: IterFunction[T, U]): Iter[U] = Iter[U].new(genNext = () => fn(iter.next()), isFinished = () => iter.finished) -proc mapFilter*[T, U](iter: Iter[T], mapPredicate: IterFunction[T, Option[U]]): Iter[U] = +proc mapFilter*[T, U]( + iter: Iter[T], mapPredicate: IterFunction[T, Option[U]] +): Iter[U] = var nextUOrErr: Option[?!U] proc tryFetch(): void = @@ -207,4 +209,4 @@ proc empty*[T](_: type Iter[T]): Iter[T] = proc isFinished(): bool = true - Iter[T].new(genNext, isFinished) \ No newline at end of file + Iter[T].new(genNext, isFinished) diff --git a/tests/codex/helpers/mockrepostore.nim b/tests/codex/helpers/mockrepostore.nim index dc63d7664..2ad4d26dc 100644 --- a/tests/codex/helpers/mockrepostore.nim +++ b/tests/codex/helpers/mockrepostore.nim @@ -14,7 +14,7 @@ import pkg/questionable import pkg/questionable/results import pkg/codex/stores/repostore -import pkg/codex/utils/asyncresultiterator +import pkg/codex/utils/asyncresultiter type MockRepoStore* = ref object of RepoStore delBlockCids*: seq[Cid] @@ -32,7 +32,7 @@ method delBlock*( method getBlockExpirations*( self: MockRepoStore, maxNumber: int, offset: int -): Future[?!AsyncResultIterator[BlockExpiration]] {.async: (raises: [CancelledError]).} = +): Future[?!AsyncResultIter[BlockExpiration]] {.async: (raises: [CancelledError]).} = self.getBeMaxNumber = maxNumber self.getBeOffset = offset @@ -41,7 +41,7 @@ method getBlockExpirations*( limit = min(offset + maxNumber, len(testBlockExpirationsCpy)) let - iter1 = AsyncResultIterator[int].new(offset ..< limit) + iter1 = AsyncResultIter[int].new(offset ..< limit) iter2 = map[int, BlockExpiration]( iter1, proc(i: ?!int): Future[?!BlockExpiration] {.async: (raises: [CancelledError]).} = diff --git a/tests/codex/stores/testqueryiterhelper.nim b/tests/codex/stores/testqueryiterhelper.nim index 6f932dbca..7fd6d71b6 100644 --- a/tests/codex/stores/testqueryiterhelper.nim +++ b/tests/codex/stores/testqueryiterhelper.nim @@ -6,7 +6,7 @@ import pkg/chronos import pkg/datastore/typedds import pkg/datastore/sql/sqliteds import pkg/codex/stores/queryiterhelper -import pkg/codex/utils/asyncresultiterator +import pkg/codex/utils/asyncresultiter import ../../asynctest import ../helpers @@ -43,7 +43,7 @@ asyncchecksuite "Test QueryIter helper": queryIter.dispose = () => (disposed = true; iterDispose()) let - iter1 = (await toAsyncResultIterator[string](queryIter)).tryGet() + iter1 = (await toAsyncResultIter[string](queryIter)).tryGet() iter2 = await filterSuccess[string](iter1) var items = initTable[string, string]() diff --git a/tests/codex/stores/testrepostore.nim b/tests/codex/stores/testrepostore.nim index 5dd21306e..c4cc123fa 100644 --- a/tests/codex/stores/testrepostore.nim +++ b/tests/codex/stores/testrepostore.nim @@ -15,7 +15,7 @@ import pkg/codex/stores import pkg/codex/stores/repostore/operations import pkg/codex/blocktype as bt import pkg/codex/clock -import pkg/codex/utils/asyncresultiterator +import pkg/codex/utils/asyncresultiter import pkg/codex/merkletree/codex import ../../asynctest @@ -293,7 +293,7 @@ asyncchecksuite "RepoStore": test "Should retrieve block expiration information": proc unpack( - beIter: Future[?!AsyncResultIterator[BlockExpiration]].Raising([CancelledError]) + beIter: Future[?!AsyncResultIter[BlockExpiration]].Raising([CancelledError]) ): Future[seq[BlockExpiration]] {.async: (raises: [CancelledError]).} = var expirations = newSeq[BlockExpiration](0) without iter =? (await beIter), err: diff --git a/tests/codex/testutils.nim b/tests/codex/testutils.nim index a0892ee43..148464e95 100644 --- a/tests/codex/testutils.nim +++ b/tests/codex/testutils.nim @@ -1,7 +1,7 @@ import ./utils/testoptions import ./utils/testkeyutils import ./utils/testasyncstatemachine -import ./utils/testasyncresultiterator +import ./utils/testasyncresultiter import ./utils/testtimer import ./utils/testtrackedfutures diff --git a/tests/codex/utils/testasyncresultiterator.nim b/tests/codex/utils/testasyncresultiter.nim similarity index 91% rename from tests/codex/utils/testasyncresultiterator.nim rename to tests/codex/utils/testasyncresultiter.nim index 7f89f0827..54d2d99ca 100644 --- a/tests/codex/utils/testasyncresultiterator.nim +++ b/tests/codex/utils/testasyncresultiter.nim @@ -2,14 +2,14 @@ import std/sugar import pkg/questionable import pkg/chronos import pkg/codex/utils/iter -import pkg/codex/utils/asyncresultiterator +import pkg/codex/utils/asyncresultiter import ../../asynctest import ../helpers -asyncchecksuite "Test AsyncResultIterator": +asyncchecksuite "Test AsyncResultIter": test "Should be finished": - let iter = AsyncResultIterator[int].empty() + let iter = AsyncResultIter[int].empty() check: iter.finished == true @@ -24,7 +24,7 @@ asyncchecksuite "Test AsyncResultIterator": fut.complete(success(intIter.next())) return fut - let iter = AsyncResultIterator[int].new(asyncGen, () => intIter.finished) + let iter = AsyncResultIter[int].new(asyncGen, () => intIter.finished) var collected: seq[int] for iFut in iter: @@ -37,11 +37,10 @@ asyncchecksuite "Test AsyncResultIterator": check collected == expectedSeq let nextRes = await iter.next() assert nextRes.isFailure - check nextRes.error.msg == - "AsyncResultIterator is finished but next item was requested" + check nextRes.error.msg == "AsyncResultIter is finished but next item was requested" test "getting async iter for simple sync range iterator": - let iter1 = AsyncResultIterator[int].new(0 ..< 5) + let iter1 = AsyncResultIter[int].new(0 ..< 5) var collected: seq[int] for iFut in iter1: @@ -54,7 +53,7 @@ asyncchecksuite "Test AsyncResultIterator": collected == @[0, 1, 2, 3, 4] test "Should map each item using `map`": - let iter1 = AsyncResultIterator[int].new(0 ..< 5).delayBy(10.millis) + let iter1 = AsyncResultIter[int].new(0 ..< 5).delayBy(10.millis) let iter2 = map[int, string]( iter1, @@ -78,7 +77,7 @@ asyncchecksuite "Test AsyncResultIterator": test "Should leave only odd items using `filter`": let - iter1 = AsyncResultIterator[int].new(0 ..< 5).delayBy(10.millis) + iter1 = AsyncResultIter[int].new(0 ..< 5).delayBy(10.millis) iter2 = await filter[int]( iter1, proc(i: ?!int): Future[bool] {.async: (raises: [CancelledError]).} = @@ -101,7 +100,7 @@ asyncchecksuite "Test AsyncResultIterator": test "Should leave only odd items using `mapFilter`": let - iter1 = AsyncResultIterator[int].new(0 ..< 5).delayBy(10.millis) + iter1 = AsyncResultIter[int].new(0 ..< 5).delayBy(10.millis) iter2 = await mapFilter[int, string]( iter1, proc(i: ?!int): Future[Option[?!string]] {.async: (raises: [CancelledError]).} = @@ -124,7 +123,7 @@ asyncchecksuite "Test AsyncResultIterator": test "Collecting errors on `map` when finish on error is true": let - iter1 = AsyncResultIterator[int].new(0 ..< 5).delayBy(10.millis) + iter1 = AsyncResultIter[int].new(0 ..< 5).delayBy(10.millis) iter2 = map[int, string]( iter1, proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = @@ -152,7 +151,7 @@ asyncchecksuite "Test AsyncResultIterator": test "Collecting errors on `map` when finish on error is false": let - iter1 = AsyncResultIterator[int].new(0 ..< 5).delayBy(10.millis) + iter1 = AsyncResultIter[int].new(0 ..< 5).delayBy(10.millis) iter2 = map[int, string]( iter1, proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = @@ -181,7 +180,7 @@ asyncchecksuite "Test AsyncResultIterator": test "Collecting errors on `map` when errors are mixed with successes": let - iter1 = AsyncResultIterator[int].new(0 ..< 5).delayBy(10.millis) + iter1 = AsyncResultIter[int].new(0 ..< 5).delayBy(10.millis) iter2 = map[int, string]( iter1, proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = @@ -210,7 +209,7 @@ asyncchecksuite "Test AsyncResultIterator": test "Collecting errors on `mapFilter` when finish on error is true": let - iter1 = AsyncResultIterator[int].new(0 ..< 5).delayBy(10.millis) + iter1 = AsyncResultIter[int].new(0 ..< 5).delayBy(10.millis) iter2 = await mapFilter[int, string]( iter1, proc(i: ?!int): Future[Option[?!string]] {.async: (raises: [CancelledError]).} = @@ -240,7 +239,7 @@ asyncchecksuite "Test AsyncResultIterator": test "Collecting errors on `mapFilter` when finish on error is false": let - iter1 = AsyncResultIterator[int].new(0 ..< 5).delayBy(10.millis) + iter1 = AsyncResultIter[int].new(0 ..< 5).delayBy(10.millis) iter2 = await mapFilter[int, string]( iter1, proc(i: ?!int): Future[Option[?!string]] {.async: (raises: [CancelledError]).} = @@ -271,7 +270,7 @@ asyncchecksuite "Test AsyncResultIterator": test "Collecting errors on `filter` when finish on error is false": let - iter1 = AsyncResultIterator[int].new(0 ..< 5) + iter1 = AsyncResultIter[int].new(0 ..< 5) iter2 = map[int, string]( iter1, proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = @@ -314,7 +313,7 @@ asyncchecksuite "Test AsyncResultIterator": test "Collecting errors on `filter` when finish on error is true": let - iter1 = AsyncResultIterator[int].new(0 ..< 5) + iter1 = AsyncResultIter[int].new(0 ..< 5) iter2 = map[int, string]( iter1, proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = @@ -383,9 +382,9 @@ asyncchecksuite "Test AsyncResultIterator": # cancellation of the async predicate function. let fut: Future[Option[?!string]].Raising([CancelledError]) = - Future[Option[?!string]].Raising([CancelledError]).init("testasyncresultiterator") + Future[Option[?!string]].Raising([CancelledError]).init("testasyncresultiter") - let iter1 = AsyncResultIterator[int].new(0 ..< 5).delayBy(10.millis) + let iter1 = AsyncResultIter[int].new(0 ..< 5).delayBy(10.millis) let iter2 = await mapFilter[int, string]( iter1, proc(i: ?!int): Future[Option[?!string]] {.async: (raises: [CancelledError]).} = From 61783dba19c5128a3f83c2f1c016bef4c4333587 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Wed, 25 Jun 2025 13:16:22 +0200 Subject: [PATCH 20/22] enables "withLogFile" for repair from remote store only integration test --- .../integration/30_minutes/testslotrepair.nim | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/tests/integration/30_minutes/testslotrepair.nim b/tests/integration/30_minutes/testslotrepair.nim index f7d8dba7b..fa070cb22 100644 --- a/tests/integration/30_minutes/testslotrepair.nim +++ b/tests/integration/30_minutes/testslotrepair.nim @@ -162,15 +162,17 @@ marketplacesuite(name = "SP Slot Repair", stopOnRequestFail = true): test "repair from local and remote store", NodeConfigs( - clients: CodexConfigs.init(nodes = 1) - # .debug() - # .withLogTopics("node", "erasure") - .some, - providers: CodexConfigs.init(nodes = 3) - # .debug() - # .withLogFile() - # .withLogTopics("marketplace", "sales", "statemachine", "reservations") - .some, + clients: CodexConfigs + .init(nodes = 1) + # .debug() + # .withLogTopics("node", "erasure") + .withLogFile().some, + providers: CodexConfigs + .init(nodes = 3) + # .debug() + .withLogFile() + # .withLogTopics("marketplace", "sales", "statemachine", "reservations") + .some, ): let client0 = clients()[0] let provider0 = providers()[0] @@ -241,7 +243,7 @@ marketplacesuite(name = "SP Slot Repair", stopOnRequestFail = true): NodeConfigs( clients: CodexConfigs.init(nodes = 1) # .debug() - # .withLogFile() + # .withLogFile() # .withLogTopics("node", "erasure") .some, providers: CodexConfigs.init(nodes = 3) From 4383211b8d5520d2bbf72c962c435f04a728c2de Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Wed, 25 Jun 2025 15:42:17 +0200 Subject: [PATCH 21/22] applies review comments: one liners when accessing pendingBlocksIter --- codex/erasure/erasure.nim | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index cf14626fe..a875847c0 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -187,8 +187,7 @@ proc prepareEncodingData( var resolved = 0 for fut in pendingBlocksIter: - let pendingBlocksRes = await fut - without (blkOrErr, idx) =? pendingBlocksRes, err: + without (blkOrErr, idx) =? (await fut), err: return failure(err) without blk =? blkOrErr, err: warn "Failed retrieving a block", treeCid = manifest.treeCid, idx, msg = err.msg @@ -246,8 +245,7 @@ proc prepareDecodingData( if resolved >= encoded.ecK: break - let pendingBlocksRes = await fut - without (blkOrErr, idx) =? pendingBlocksRes, err: + without (blkOrErr, idx) =? (await fut), err: return failure(err) without blk =? blkOrErr, err: trace "Failed retrieving a block", idx, treeCid = encoded.treeCid, msg = err.msg From f6a0e9dcf870e34380a24b82c56a592ae19a7e0f Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Wed, 25 Jun 2025 19:10:52 +0200 Subject: [PATCH 22/22] review: makes handing ValueError in getPendingBlocks cleaner --- codex/erasure/erasure.nim | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index a875847c0..d678e70dd 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -125,10 +125,6 @@ proc getPendingBlocks( ## Get pending blocks iterator ## - if indices.len == 0: - trace "No indices to fetch blocks for", treeCid = manifest.treeCid - return AsyncResultIter[(?!bt.Block, int)].empty() - var pendingBlocks: seq[Future[(?!bt.Block, int)].Raising([CancelledError])] = @[] proc attachIndex( @@ -158,10 +154,8 @@ proc getPendingBlocks( $index ) except ValueError as err: - # ValueError is raised by `one` when the pendingBlocks is empty - - # but we check for that at the very beginning - - # thus, if this happens, we raise an assert - raiseAssert("fatal: pendingBlocks is empty - this should never happen") + # ValueError is raised by `one` when the pendingBlocks is empty + return failure("iterator finished (pendingBlocks empty) but genNext() was called") AsyncResultIter[(?!bt.Block, int)].new(genNext, isFinished)