Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions codex/node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import std/sequtils
import std/strformat
import std/sugar
import times
import std/streams

import pkg/taskpools
import pkg/questionable
Expand All @@ -37,6 +38,7 @@ import ./merkletree
import ./stores
import ./blockexchange
import ./streams
import ./streams/rangestream
import ./erasure
import ./discovery
import ./contracts
Expand Down Expand Up @@ -325,10 +327,78 @@ proc retrieve*(
if err of AsyncTimeoutError:
return failure(err)

# Not a manifest, must be a single block
return await self.streamSingleBlock(cid)

await self.streamEntireDataset(manifest, cid)

proc retrieveLocalRange*(
self: CodexNodeRef, cid: Cid, rangeStart: int, rangeEnd: int
): Future[?!LPStream] {.async.} =
## Retrieve a specific byte range efficiently using only the local block store.
without manifest =? (await self.fetchManifest(cid)), err:
# Manifest fetch might still go to network if not local, but block reads won't.
return failure(err)

let
totalSize = (if manifest.protected: manifest.originalDatasetSize else: manifest.datasetSize).int
clampedEnd = min(rangeEnd, totalSize - 1) # Ensure end is within bounds
contentLength = if clampedEnd >= rangeStart: clampedEnd - rangeStart + 1 else: 0

if contentLength <= 0:
# Requested range is impossible or zero length
warn "Invalid or zero-length local range requested", cid, rangeStart, rangeEnd, totalSize
let emptyStream = BufferStream.new() # Use BufferStream
await emptyStream.pushEof() # Mark as immediately finished
# Return with correct type signature but preserve instance identity
return success(LPStream(emptyStream))

let stream = RangeStream.new(
self.blockStore(), # Use local store
manifest,
rangeStart,
contentLength,
pad = false # Assuming padding is not needed for local retrieval
)
# Initialize the stream to ensure its objName is set to enable tracing
stream.initStream()
trace "Local range stream created", cid, rangeStart, rangeEnd, contentLength
# Return with correct type signature but preserve instance identity
return success(LPStream(stream))

proc retrieveNetworkRange*(
self: CodexNodeRef, cid: Cid, rangeStart: int, rangeEnd: int
): Future[?!LPStream] {.async.} =
## Retrieve a specific byte range efficiently using the network store.
without manifest =? (await self.fetchManifest(cid)), err:
return failure(err)

let
totalSize = (if manifest.protected: manifest.originalDatasetSize else: manifest.datasetSize).int
clampedEnd = min(rangeEnd, totalSize - 1) # Ensure end is within bounds
contentLength = if clampedEnd >= rangeStart: clampedEnd - rangeStart + 1 else: 0

if contentLength <= 0:
# Requested range is impossible or zero length
warn "Invalid or zero-length network range requested", cid, rangeStart, rangeEnd, totalSize
let emptyStream = BufferStream.new() # Use BufferStream
await emptyStream.pushEof() # Mark as immediately finished
# Return with correct type signature but preserve instance identity
return success(LPStream(emptyStream))

let stream = RangeStream.new(
self.networkStore, # Use network store
manifest,
rangeStart,
contentLength,
pad = false # Assuming padding is not needed here either
)
# Initialize the stream to ensure its objName is set to enable tracing
stream.initStream()
trace "Network range stream created", cid, rangeStart, rangeEnd, contentLength
# Return with correct type signature but preserve instance identity
return success(LPStream(stream))

proc deleteSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} =
if err =? (await self.networkStore.delBlock(cid)).errorOption:
error "Error deleting block", cid, err = err.msg
Expand Down
Loading