Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a48af48
uses explicit type in "Should retrieve block expiration information" …
marcinczenko Jun 3, 2025
293adcc
adds "format-global" to use globally installed nph
marcinczenko Jun 12, 2025
c9fba17
removes async iter from queryiterhelper (and the related tests)
marcinczenko Jun 16, 2025
b56099e
removes import of the legacy asynciter from mockrepostore
marcinczenko Jun 16, 2025
6957c9d
remove not needed asynciter import in testcontracts
marcinczenko Jun 16, 2025
91f1ed8
removes last references to legacy asynciter in tests
marcinczenko Jun 16, 2025
2356402
removes unneeded asynciter import from treehelper (it uses regular it…
marcinczenko Jun 16, 2025
4b9d967
removes unneeded asynciter import from indexingstrategy (it uses regu…
marcinczenko Jun 16, 2025
b158d43
adds missing mapFuture operation to SafeAsyncIter
marcinczenko Jun 17, 2025
7938024
Removes redundant import of asynciter in node
marcinczenko Jun 17, 2025
9fe7551
updates exports related to async iter in utils
marcinczenko Jun 17, 2025
88c6a72
replaces the old async iter with the new safe async iter in erasure, …
marcinczenko Jun 17, 2025
ba04d59
updates erasure.nim after rebasing to use SafeAsyncIter and checked e…
marcinczenko Jun 17, 2025
bbadf63
renaming: SafeAsyncIter => AsyncResultIterator
marcinczenko Jun 24, 2025
f2f7059
formatting
marcinczenko Jun 24, 2025
e1b7086
AsyncResultIterator: update internal names
marcinczenko Jun 24, 2025
d42625a
changes the module name: safeasynciter => asyncresultiterator
marcinczenko Jun 24, 2025
c5c1f97
refactors "Iter" and "AsyncIter" to be better separated and having ad…
marcinczenko Jun 24, 2025
28ccfb1
renaming: AsyncResultIterator => AsyncResultIter to have it consisten…
marcinczenko Jun 24, 2025
61783db
enables "withLogFile" for repair from remote store only integration test
marcinczenko Jun 25, 2025
4383211
applies review comments: one liners when accessing pendingBlocksIter
marcinczenko Jun 25, 2025
f6a0e9d
review: makes handing ValueError in getPendingBlocks cleaner
marcinczenko Jun 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/actions/nimbus-build-system/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ runs:
sudo apt-get update -qq
sudo DEBIAN_FRONTEND='noninteractive' apt-get install \
--no-install-recommends -yq lcov
sudo apt install libpcre3

- name: APT (Linux i386)
if: inputs.os == 'linux' && inputs.cpu == 'i386'
Expand Down Expand Up @@ -83,7 +84,7 @@ runs:

- name: Install gcc 14 on Linux
# We don't want to install gcc 14 for coverage (Ubuntu 20.04)
if : ${{ inputs.os == 'linux' && inputs.coverage != 'true' }}
if: ${{ inputs.os == 'linux' && inputs.coverage != 'true' }}
shell: ${{ inputs.shell }} {0}
run: |
# Skip for older Ubuntu versions
Expand Down Expand Up @@ -202,7 +203,7 @@ runs:
- name: Restore Nim toolchain binaries from cache
id: nim-cache
uses: actions/cache@v4
if : ${{ inputs.coverage != 'true' }}
if: ${{ inputs.coverage != 'true' }}
with:
path: NimBinaries
key: ${{ inputs.os }}-${{ inputs.cpu }}-nim-${{ inputs.nim_version }}-cache-${{ env.cache_nonce }}-${{ github.run_id }}
Expand Down
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ format:
$(NPH) codex/
$(NPH) tests/

format-global:
nph *.nim
nph codex/
nph tests/

clean-nph:
rm -f $(NPH)

Expand Down
97 changes: 48 additions & 49 deletions codex/erasure/erasure.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import ../clock
import ../blocktype as bt
import ../utils
import ../utils/asynciter
import ../indexingstrategy
import ../errors
import ../utils/arrayutils
Expand Down Expand Up @@ -122,14 +121,15 @@

proc getPendingBlocks(
self: Erasure, manifest: Manifest, indices: seq[int]
): AsyncIter[(?!bt.Block, int)] =
): AsyncResultIter[(?!bt.Block, int)] =
## Get pending blocks iterator
##
var pendingBlocks: seq[Future[(?!bt.Block, int)]] = @[]

var pendingBlocks: seq[Future[(?!bt.Block, int)].Raising([CancelledError])] = @[]

proc attachIndex(
fut: Future[?!bt.Block], i: int
): Future[(?!bt.Block, int)] {.async.} =
fut: Future[?!bt.Block].Raising([CancelledError]), i: int
): Future[(?!bt.Block, int)] {.async: (raises: [CancelledError]).} =
## avoids closure capture issues
return (await fut, i)

Expand All @@ -141,20 +141,23 @@
proc isFinished(): bool =
pendingBlocks.len == 0

proc genNext(): Future[(?!bt.Block, int)] {.async.} =
let completedFut = await one(pendingBlocks)
if (let i = pendingBlocks.find(completedFut); i >= 0):
pendingBlocks.del(i)
return await completedFut
else:
let (_, index) = await completedFut
raise newException(
CatchableError,
"Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " &
$index,
)
proc genNext(): Future[?!(?!bt.Block, int)] {.async: (raises: [CancelledError]).} =
try:
let completedFut = await one(pendingBlocks)
if (let i = pendingBlocks.find(completedFut); i >= 0):
pendingBlocks.del(i)
return success(await completedFut)
else:
let (_, index) = await completedFut
return failure(
"Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " &
$index
)
except ValueError as err:
# ValueError is raised by `one` when the pendingBlocks is empty
return failure("iterator finished (pendingBlocks empty) but genNext() was called")

AsyncIter[(?!bt.Block, int)].new(genNext, isFinished)
AsyncResultIter[(?!bt.Block, int)].new(genNext, isFinished)

proc prepareEncodingData(
self: Erasure,
Expand All @@ -164,7 +167,7 @@
data: ref seq[seq[byte]],
cids: ref seq[Cid],
emptyBlock: seq[byte],
): Future[?!Natural] {.async.} =
): Future[?!Natural] {.async: (raises: [CancelledError, IndexingError]).} =
## Prepare data for encoding
##

Expand All @@ -178,7 +181,8 @@

var resolved = 0
for fut in pendingBlocksIter:
let (blkOrErr, idx) = await fut
without (blkOrErr, idx) =? (await fut), err:
return failure(err)

Check warning on line 185 in codex/erasure/erasure.nim

View check run for this annotation

Codecov / codecov/patch

codex/erasure/erasure.nim#L185

Added line #L185 was not covered by tests
without blk =? blkOrErr, err:
warn "Failed retrieving a block", treeCid = manifest.treeCid, idx, msg = err.msg
return failure(err)
Expand Down Expand Up @@ -208,7 +212,7 @@
parityData: ref seq[seq[byte]],
cids: ref seq[Cid],
emptyBlock: seq[byte],
): Future[?!(Natural, Natural)] {.async.} =
): Future[?!(Natural, Natural)] {.async: (raises: [CancelledError, IndexingError]).} =
## Prepare data for decoding
## `encoded` - the encoded manifest
## `step` - the current step
Expand All @@ -235,7 +239,8 @@
if resolved >= encoded.ecK:
break

let (blkOrErr, idx) = await fut
without (blkOrErr, idx) =? (await fut), err:
return failure(err)

Check warning on line 243 in codex/erasure/erasure.nim

View check run for this annotation

Codecov / codecov/patch

codex/erasure/erasure.nim#L243

Added line #L243 was not covered by tests
without blk =? blkOrErr, err:
trace "Failed retrieving a block", idx, treeCid = encoded.treeCid, msg = err.msg
continue
Expand Down Expand Up @@ -362,7 +367,7 @@

proc encodeData(
self: Erasure, manifest: Manifest, params: EncodingParams
): Future[?!Manifest] {.async.} =
): Future[?!Manifest] {.async: (raises: [CancelledError]).} =
## Encode blocks pointed to by the protected manifest
##
## `manifest` - the manifest to encode
Expand Down Expand Up @@ -403,15 +408,12 @@

trace "Erasure coding data", data = data[].len

try:
if err =? (
await self.asyncEncode(
manifest.blockSize.int, params.ecK, params.ecM, data, parity
)
).errorOption:
return failure(err)
except CancelledError as exc:
raise exc
if err =? (
await self.asyncEncode(
manifest.blockSize.int, params.ecK, params.ecM, data, parity
)
).errorOption:
return failure(err)

var idx = params.rounded + step
for j in 0 ..< params.ecM:
Expand Down Expand Up @@ -451,17 +453,17 @@
except CancelledError as exc:
trace "Erasure coding encoding cancelled"
raise exc # cancellation needs to be propagated
except CatchableError as exc:
trace "Erasure coding encoding error", exc = exc.msg
return failure(exc)
except IndexingError as err:
trace "Erasure coding encoding indexing error", error = err.msg

Check warning on line 457 in codex/erasure/erasure.nim

View check run for this annotation

Codecov / codecov/patch

codex/erasure/erasure.nim#L456-L457

Added lines #L456 - L457 were not covered by tests
return failure(err)

proc encode*(
self: Erasure,
manifest: Manifest,
blocks: Natural,
parity: Natural,
strategy = SteppedStrategy,
): Future[?!Manifest] {.async.} =
): Future[?!Manifest] {.async: (raises: [CancelledError]).} =
## Encode a manifest into one that is erasure protected.
##
## `manifest` - the original manifest to be encoded
Expand Down Expand Up @@ -554,7 +556,7 @@

proc decodeInternal(
self: Erasure, encoded: Manifest
): Future[?!(ref seq[Cid], seq[Natural])] {.async.} =
): Future[?!(ref seq[Cid], seq[Natural])] {.async: (raises: [CancelledError]).} =
logScope:
steps = encoded.steps
rounded_blocks = encoded.rounded
Expand Down Expand Up @@ -597,15 +599,12 @@
continue

trace "Erasure decoding data"
try:
if err =? (
await self.asyncDecode(
encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered
)
).errorOption:
return failure(err)
except CancelledError as exc:
raise exc
if err =? (
await self.asyncDecode(
encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered
)
).errorOption:
return failure(err)

for i in 0 ..< encoded.ecK:
let idx = i * encoded.steps + step
Expand All @@ -630,9 +629,9 @@
except CancelledError as exc:
trace "Erasure coding decoding cancelled"
raise exc # cancellation needs to be propagated
except CatchableError as exc:
trace "Erasure coding decoding error", exc = exc.msg
return failure(exc)
except IndexingError as err:
trace "Erasure coding decoding indexing error", error = err.msg
return failure(err)

Check warning on line 634 in codex/erasure/erasure.nim

View check run for this annotation

Codecov / codecov/patch

codex/erasure/erasure.nim#L632-L634

Added lines #L632 - L634 were not covered by tests
finally:
decoder.release()

Expand Down
2 changes: 1 addition & 1 deletion codex/indexingstrategy.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import ./errors
import ./utils
import ./utils/asynciter
import ./utils/iter

{.push raises: [].}

Expand Down
1 change: 0 additions & 1 deletion codex/node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import ./indexingstrategy
import ./utils
import ./errors
import ./logutils
import ./utils/asynciter
import ./utils/trackedfutures

export logutils
Expand Down
3 changes: 1 addition & 2 deletions codex/slots/builder/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ import ../../utils
import ../../stores
import ../../manifest
import ../../merkletree
import ../../utils/asynciter
import ../../indexingstrategy

import ../converters

export converters, asynciter
export converters, iter

logScope:
topics = "codex slotsbuilder"
Expand Down
2 changes: 1 addition & 1 deletion codex/stores/blockstore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ method hasBlock*(

method listBlocks*(
self: BlockStore, blockType = BlockType.Manifest
): Future[?!SafeAsyncIter[Cid]] {.base, async: (raises: [CancelledError]), gcsafe.} =
): Future[?!AsyncResultIter[Cid]] {.base, async: (raises: [CancelledError]), gcsafe.} =
## Get the list of blocks in the BlockStore. This is an intensive operation
##

Expand Down
4 changes: 2 additions & 2 deletions codex/stores/cachestore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func cids(self: CacheStore): (iterator (): Cid {.gcsafe, raises: [].}) =

method listBlocks*(
self: CacheStore, blockType = BlockType.Manifest
): Future[?!SafeAsyncIter[Cid]] {.async: (raises: [CancelledError]).} =
): Future[?!AsyncResultIter[Cid]] {.async: (raises: [CancelledError]).} =
## Get the list of blocks in the BlockStore. This is an intensive operation
##

Expand All @@ -152,7 +152,7 @@ method listBlocks*(
success(cids())

let iter = await (
SafeAsyncIter[Cid].new(genNext, isFinished).filter(
AsyncResultIter[Cid].new(genNext, isFinished).filter(
proc(cid: ?!Cid): Future[bool] {.async: (raises: [CancelledError]).} =
without cid =? cid, err:
trace "Cannot get Cid from the iterator", err = err.msg
Expand Down
2 changes: 1 addition & 1 deletion codex/stores/maintenance.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import pkg/questionable/results

import ./repostore
import ../utils/timer
import ../utils/safeasynciter
import ../utils/asyncresultiter
import ../clock
import ../logutils
import ../systemclock
Expand Down
4 changes: 2 additions & 2 deletions codex/stores/networkstore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import ../logutils
import ../merkletree
import ../utils/asyncheapqueue
import ../utils/safeasynciter
import ../utils/asyncresultiter
import ./blockstore

export blockstore, blockexchange, asyncheapqueue
Expand Down Expand Up @@ -127,7 +127,7 @@

method listBlocks*(
self: NetworkStore, blockType = BlockType.Manifest
): Future[?!SafeAsyncIter[Cid]] {.async: (raw: true, raises: [CancelledError]).} =
): Future[?!AsyncResultIter[Cid]] {.async: (raw: true, raises: [CancelledError]).} =

Check warning on line 130 in codex/stores/networkstore.nim

View check run for this annotation

Codecov / codecov/patch

codex/stores/networkstore.nim#L130

Added line #L130 was not covered by tests
self.localStore.listBlocks(blockType)

method delBlock*(
Expand Down
Loading
Loading