diff --git a/codex/node.nim b/codex/node.nim index fb653c0d7..4cb7c5a49 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -14,6 +14,7 @@ import std/sequtils import std/strformat import std/sugar import times +import std/streams import pkg/taskpools import pkg/questionable @@ -37,6 +38,7 @@ import ./merkletree import ./stores import ./blockexchange import ./streams +import ./streams/rangestream import ./erasure import ./discovery import ./contracts @@ -325,10 +327,78 @@ proc retrieve*( if err of AsyncTimeoutError: return failure(err) + # Not a manifest, must be a single block return await self.streamSingleBlock(cid) await self.streamEntireDataset(manifest, cid) +proc retrieveLocalRange*( + self: CodexNodeRef, cid: Cid, rangeStart: int, rangeEnd: int +): Future[?!LPStream] {.async.} = + ## Retrieve a specific byte range efficiently using only the local block store. + without manifest =? (await self.fetchManifest(cid)), err: + # Manifest fetch might still go to network if not local, but block reads won't. + return failure(err) + + let + totalSize = (if manifest.protected: manifest.originalDatasetSize else: manifest.datasetSize).int + clampedEnd = min(rangeEnd, totalSize - 1) # Ensure end is within bounds + contentLength = if clampedEnd >= rangeStart: clampedEnd - rangeStart + 1 else: 0 + + if contentLength <= 0: + # Requested range is impossible or zero length + warn "Invalid or zero-length local range requested", cid, rangeStart, rangeEnd, totalSize + let emptyStream = BufferStream.new() # Use BufferStream + await emptyStream.pushEof() # Mark as immediately finished + # Return with correct type signature but preserve instance identity + return success(LPStream(emptyStream)) + + let stream = RangeStream.new( + self.blockStore(), # Use local store + manifest, + rangeStart, + contentLength, + pad = false # Assuming padding is not needed for local retrieval + ) + # Initialize the stream to ensure its objName is set to enable tracing + stream.initStream() + trace "Local range stream created", cid, rangeStart, rangeEnd, contentLength + # Return with correct type signature but preserve instance identity + return success(LPStream(stream)) + +proc retrieveNetworkRange*( + self: CodexNodeRef, cid: Cid, rangeStart: int, rangeEnd: int +): Future[?!LPStream] {.async.} = + ## Retrieve a specific byte range efficiently using the network store. + without manifest =? (await self.fetchManifest(cid)), err: + return failure(err) + + let + totalSize = (if manifest.protected: manifest.originalDatasetSize else: manifest.datasetSize).int + clampedEnd = min(rangeEnd, totalSize - 1) # Ensure end is within bounds + contentLength = if clampedEnd >= rangeStart: clampedEnd - rangeStart + 1 else: 0 + + if contentLength <= 0: + # Requested range is impossible or zero length + warn "Invalid or zero-length network range requested", cid, rangeStart, rangeEnd, totalSize + let emptyStream = BufferStream.new() # Use BufferStream + await emptyStream.pushEof() # Mark as immediately finished + # Return with correct type signature but preserve instance identity + return success(LPStream(emptyStream)) + + let stream = RangeStream.new( + self.networkStore, # Use network store + manifest, + rangeStart, + contentLength, + pad = false # Assuming padding is not needed here either + ) + # Initialize the stream to ensure its objName is set to enable tracing + stream.initStream() + trace "Network range stream created", cid, rangeStart, rangeEnd, contentLength + # Return with correct type signature but preserve instance identity + return success(LPStream(stream)) + proc deleteSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} = if err =? (await self.networkStore.delBlock(cid)).errorOption: error "Error deleting block", cid, err = err.msg diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 0d9e5d802..e84ad0f7a 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -15,6 +15,7 @@ push: import std/sequtils import std/mimetypes import std/os +import std/strformat import pkg/questionable import pkg/questionable/results @@ -39,6 +40,8 @@ import ../manifest import ../streams/asyncstreamwrapper import ../stores import ../utils/options +import ../streams/seekablestream +import ../streams/rangestream import ./coders import ./json @@ -46,6 +49,45 @@ import ./json logScope: topics = "codex restapi" +proc parseRangeHeader(rangeHeader: string): Result[(int, Option[int]), string] = + ## Parses a "Range: bytes=start-end" or "Range: bytes=start-" header. + ## Returns Ok((start, end)) or Err(message). + ## 'end' is inclusive. If '-' is used for end, returns none. + ## Very basic implementation, only supports single ranges starting with "bytes=". + if not rangeHeader.startsWith("bytes="): + return err("Invalid Range header format: Does not start with 'bytes='") + + let parts = rangeHeader[6..^1].split('-') + if parts.len != 2: + return err("Invalid Range header format: Expected 'start-end' or 'start-'") + + let startStr = parts[0].strip() + let endStr = parts[1].strip() + + var startPos: int + try: + startPos = parseInt(startStr) + if startPos < 0: + return err("Invalid Range header format: Start position cannot be negative") + except ValueError: + return err("Invalid Range header format: Invalid start position number") + + if endStr == "": + # Format "bytes=start-" + return ok((startPos, none(int))) + else: + # Format "bytes=start-end" + var endPos: int + try: + endPos = parseInt(endStr) + except ValueError: + return err("Invalid Range header format: Invalid end position number") + + if endPos < startPos: + return err("Invalid Range header format: End position cannot be less than start position") + + return ok((startPos, some(endPos))) + declareCounter(codex_api_uploads, "codex API uploads") declareCounter(codex_api_downloads, "codex API downloads") @@ -71,18 +113,30 @@ proc isPending(resp: HttpResponseRef): bool = ## sendBody(resp: HttpResponseRef, ...) twice, which is illegal. return resp.getResponseState() == HttpResponseState.Empty +proc getDefaultRangeError(): Result[(int, Option[int]), string] = + ## Returns a default error Result for range parsing. + return err("No range requested") + proc retrieveCid( - node: CodexNodeRef, cid: Cid, local: bool = true, resp: HttpResponseRef + node: CodexNodeRef, cid: Cid, local: bool = true, resp: HttpResponseRef, range: Result[(int, Option[int]), string] = getDefaultRangeError() ): Future[void] {.async: (raises: [CancelledError, HttpWriteError]).} = - ## Download a file from the node in a streaming - ## manner + ## Download a file from the node in a streaming manner. + ## Supports HTTP Range requests (e.g., "Range: bytes=100-200" or "Range: bytes=100-"). + ## Invalid range headers are ignored, resulting in a full download. ## var stream: LPStream - - var bytes = 0 + var sentBytes = 0 + var isRangeRequest = false + var rangeStart = 0 + var rangeEnd = 0 + var responseFinishedOrFailed = false # Flag to track response state + try: - without stream =? (await node.retrieve(cid, local)), error: + # Always indicate acceptance of range requests + resp.setHeader("Accept-Ranges", "bytes") + + without manifest =? (await node.fetchManifest(cid)), error: if error of BlockNotFoundError: resp.status = Http404 await resp.sendBody( @@ -94,13 +148,7 @@ proc retrieveCid( await resp.sendBody(error.msg) return - # It is ok to fetch again the manifest because it will hit the cache - without manifest =? (await node.fetchManifest(cid)), err: - error "Failed to fetch manifest", err = err.msg - resp.status = Http404 - await resp.sendBody(err.msg) - return - + # Set content type and disposition headers if manifest.mimetype.isSome: resp.setHeader("Content-Type", manifest.mimetype.get()) else: @@ -117,37 +165,173 @@ proc retrieveCid( # For erasure-coded datasets, we need to return the _original_ length; i.e., # the length of the non-erasure-coded dataset, as that's what we will be # returning to the client. - let contentLength = - if manifest.protected: manifest.originalDatasetSize else: manifest.datasetSize - resp.setHeader("Content-Length", $(contentLength.int)) - - await resp.prepare(HttpResponseStreamType.Plain) + let totalSize = + (if manifest.protected: manifest.originalDatasetSize else: manifest.datasetSize).int + + rangeStart = 0 + rangeEnd = totalSize - 1 # Inclusive + isRangeRequest = false + + if range.isOk: + let (startReq, endReqOpt) = range.get() + # Validate the requested range + if startReq < totalSize: + isRangeRequest = true + rangeStart = startReq + if endReq =? endReqOpt: + # bytes=start-end (inclusive end) + rangeEnd = min(endReq, totalSize - 1) + else: + # bytes=start- (rangeEnd remains totalSize - 1) + rangeEnd = totalSize - 1 + + debug "Range request", start=rangeStart, endPos=rangeEnd, totalSize=totalSize + + # Ensure end >= start after validation/clamping + if rangeEnd < rangeStart: + # Requested range is impossible (e.g., start=100, end=50, totalSize=1000) + # or fully outside the content (e.g., start=1000, totalSize=500) + # Respond with 416 Range Not Satisfiable + warn "Invalid range request", start=rangeStart, endPos=rangeEnd, totalSize=totalSize + resp.status = Http416 + resp.setHeader("Content-Range", "bytes */" & $totalSize) + await resp.sendBody("Requested range not satisfiable") + return - while not stream.atEof: - var - buff = newSeqUninitialized[byte](DefaultBlockSize.int) - len = await stream.readOnce(addr buff[0], buff.len) + let contentLength = rangeEnd - rangeStart + 1 + + # Set appropriate headers for the response + if isRangeRequest: + resp.status = Http206 + resp.setHeader("Content-Range", "bytes " & $rangeStart & "-" & $rangeEnd & "/" & $totalSize) + resp.setHeader("Content-Length", $contentLength) + + # Get the appropriate range stream from the node + var rangeStreamResult: Future[?!LPStream] + if local: + trace "Requesting local range stream", cid=cid, start=rangeStart, endPos=rangeEnd + rangeStreamResult = node.retrieveLocalRange(cid, rangeStart, rangeEnd) + else: + trace "Requesting network range stream", cid=cid, start=rangeStart, endPos=rangeEnd + rangeStreamResult = node.retrieveNetworkRange(cid, rangeStart, rangeEnd) + + let awaitedResult = await rangeStreamResult + if awaitedResult.isErr: + let error = awaitedResult.error + error "Failed to create range stream", cid=cid, error=error.msg, local=local + resp.status = Http500 + await resp.sendBody("Internal error: Failed to create range stream") + responseFinishedOrFailed = true + return + + let rangestream = awaitedResult.get() + stream = rangestream + debug "Assigned rangestream to stream in retrieveCid", streamType = $typeof(stream), objectId = (if stream.isNil: "nil-oid" else: $stream.oid), isNil = stream.isNil + else: + # Full request - get the entire file + without fullStream =? (await node.retrieve(cid, local)), error: + resp.status = Http500 + await resp.sendBody(error.msg) + return + + stream = fullStream + debug "Assigned fullStream to stream in retrieveCid", streamType = $typeof(stream), objectId = (if stream.isNil: "nil-oid" else: $stream.oid), isNil = stream.isNil - buff.setLen(len) - if buff.len <= 0: - break + await resp.prepare(HttpResponseStreamType.Plain) - bytes += buff.len + # *** CRASH HAPPENS SOMEWHERE BETWEEN stream assignment AND HERE (or during resp.prepare) *** + + var bytesToSend = contentLength + debug "Preparing to send data", cid=cid, bytesToSend=bytesToSend, streamIsNil=stream.isNil, streamAtEofInitial= (not stream.isNil and stream.atEof), isRangeRequest=isRangeRequest, rangeStartReport=rangeStart, rangeEndReport=rangeEnd, totalSizeReport=totalSize + try: # <-- Start of inner streaming try block + while bytesToSend > 0 and not stream.atEof: + var + buff = newSeqUninitialized[byte](DefaultBlockSize.int) + maxRead = min(buff.len, bytesToSend) + + debug "Attempting stream.readOnce", cid=cid, maxRead=maxRead, currentBytesToSend=bytesToSend, streamAtEofBeforeRead=stream.atEof + + var readLen = await stream.readOnce(addr buff[0], maxRead) + + debug "Stream readOnce returned", cid=cid, readLen=readLen, requestedRead=maxRead, streamAtEofAfterRead=stream.atEof, currentSentBytes=sentBytes + + buff.setLen(readLen) + if buff.len <= 0: + debug "Stream read returned 0 or negative, or buff became empty. Breaking loop.", cid=cid, readLen=readLen, buffLen=buff.len, remainingBytesToSend=bytesToSend + break + + sentBytes += buff.len + bytesToSend -= buff.len + + await resp.send(addr buff[0], buff.len) + + if bytesToSend > 0 and not stream.atEof: + warn "Stream ended prematurely while sending content", cid=cid, expected=contentLength, sent=sentBytes, missing=bytesToSend + # Consider setting responseFinishedOrFailed = true here? Or let finish() handle it? + + responseFinishedOrFailed = true + await resp.finish() + codex_api_downloads.inc() + except HttpWriteError as writeErr: + responseFinishedOrFailed = true + warn "Client disconnected during download (inner try)", cid=cid, sent=sentBytes, expected=contentLength, error=writeErr.msg + except CancelledError as streamCancelledErr: + responseFinishedOrFailed = true + warn "Streaming cancelled (inner try)", cid=cid, sent=sentBytes, error=streamCancelledErr.msg + raise streamCancelledErr + except CatchableError as streamErr: + responseFinishedOrFailed = true + warn "Error during streaming (inner try)", cid=cid, sent=sentBytes, error=streamErr.msg + # Attempt to send a 500 if the response is still pending + if resp.isPending(): + resp.status = Http500 + await resp.sendBody(streamErr.msg) - await resp.send(addr buff[0], buff.len) - await resp.finish() - codex_api_downloads.inc() + except AssertionDefect as assertExc: + # ADDED: Catch AssertionDefect specifically + responseFinishedOrFailed = true + let excTypeStr = $typeof(assertExc) + warn "AssertionDefect in retrieveCid (outer try)", cid=cid, errorContext="AssertionDefect", excMsg=assertExc.msg, excType=excTypeStr + if resp.isPending(): + try: + resp.status = Http500 + await resp.sendBody("Assertion Failed: " & assertExc.msg) + except HttpWriteError: + warn "Unable to send error response (outer AssertionDefect), client likely disconnected", cid=cid except CancelledError as exc: + responseFinishedOrFailed = true + warn "retrieveCid cancelled (outer try)", cid=cid, error=exc.msg raise exc - except CatchableError as exc: - warn "Error streaming blocks", exc = exc.msg - resp.status = Http500 + except Exception as exc: # Broaden catch from CatchableError to Exception (This is the intended change) + responseFinishedOrFailed = true # Set it unconditionally here + let excTypeStr = $typeof(exc) + warn "Error in retrieveCid (outer try)", cid=cid, errorContext="Outer Exception", excMsg=exc.msg, excType=excTypeStr if resp.isPending(): - await resp.sendBody(exc.msg) + try: + resp.status = Http500 + await resp.sendBody(exc.msg) + except HttpWriteError: + warn "Unable to send error response (outer try), client likely disconnected", cid=cid finally: - info "Sent bytes", cid = cid, bytes - if not stream.isNil: - await stream.close() + # Determine stream type for logging + let streamType = if stream.isNil: "nil" else: $typeof(stream) + info "Finally block reached", cid=cid, streamType=streamType, isStreamNil=(stream.isNil), responseFinishedOrFailed=responseFinishedOrFailed + + # Original info log + info "Sent bytes", cid=cid, bytes=sentBytes, local=local, rangeRequested=isRangeRequest, rangeStart=(if isRangeRequest: rangeStart else: 0), rangeEnd=(if isRangeRequest: rangeEnd else: 0) + + # Safely close the stream only if it wasn't already handled by finish/failure + if not stream.isNil and not responseFinishedOrFailed: + info "Attempting to close potentially orphaned stream", cid=cid, streamType=streamType + try: + await stream.close() + info "Orphaned stream closed successfully", cid=cid, streamType=streamType + except CatchableError as closeExc: # Reverted to CatchableError + discard + elif not stream.isNil and responseFinishedOrFailed: + trace "Skipping stream.close() because response finished or failed.", cid=cid, streamType=streamType + elif stream.isNil: + trace "Finally stream check", msg="Stream was nil in finally block, nothing to close." proc buildCorsHeaders( httpMethod: string, allowedOrigin: Option[string] @@ -282,7 +466,16 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute resp.setCorsHeaders("GET", corsOrigin) resp.setHeader("Access-Control-Headers", "X-Requested-With") - await node.retrieveCid(cid.get(), local = true, resp = resp) + # Parse Range header if present + var requestedRange: Result[(int, Option[int]), string] = getDefaultRangeError() + let rangeHeader = request.headers.getString("Range", "") + if rangeHeader != "": + requestedRange = parseRangeHeader(rangeHeader) + if requestedRange.isErr: + warn "Invalid Range header received", header = rangeHeader, error = requestedRange.error + requestedRange = getDefaultRangeError() # Reset to indicate no valid range + + await retrieveCid(node, cid.get(), local = true, resp = resp, range = requestedRange) router.api(MethodDelete, "/api/codex/v1/data/{cid}") do( cid: Cid, resp: HttpResponseRef @@ -329,12 +522,9 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute router.api(MethodGet, "/api/codex/v1/data/{cid}/network/stream") do( cid: Cid, resp: HttpResponseRef ) -> RestApiResponse: + var headers = buildCorsHeaders("GET", allowedOrigin) ## Download a file from the network in a streaming ## manner - ## - - var headers = buildCorsHeaders("GET", allowedOrigin) - if cid.isErr: return RestApiResponse.error(Http400, $cid.error(), headers = headers) @@ -342,8 +532,16 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute resp.setCorsHeaders("GET", corsOrigin) resp.setHeader("Access-Control-Headers", "X-Requested-With") - resp.setHeader("Access-Control-Expose-Headers", "Content-Disposition") - await node.retrieveCid(cid.get(), local = false, resp = resp) + # Parse Range header if present + var requestedRange: Result[(int, Option[int]), string] = getDefaultRangeError() + let rangeHeader = request.headers.getString("Range", "") + if rangeHeader != "": + requestedRange = parseRangeHeader(rangeHeader) + if requestedRange.isErr: + warn "Invalid Range header received", header = rangeHeader, error = requestedRange.error + requestedRange = getDefaultRangeError() # Reset to indicate no valid range + + await retrieveCid(node, cid.get(), local = false, resp = resp, range = requestedRange) router.api(MethodGet, "/api/codex/v1/data/{cid}/network/manifest") do( cid: Cid, resp: HttpResponseRef diff --git a/codex/streams/rangestream.nim b/codex/streams/rangestream.nim new file mode 100644 index 000000000..7abedfc8c --- /dev/null +++ b/codex/streams/rangestream.nim @@ -0,0 +1,193 @@ +## Nim-Codex +## Copyright (c) 2021-2025 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/sequtils +import pkg/chronos +import pkg/libbacktrace +import pkg/questionable +import pkg/questionable/results +# import pkg/stew/results as stew_results # Removed deprecated import + +import ../blocktype as bt +# import ./asyncstreamwrapper # No longer needed +import ./storestream +import ./seekablestream +import ../logutils +import ../stores +import ../manifest + +logScope: + topics = "codex rangestream" + +type + RangeStream* = ref object of SeekableStream + store: BlockStore + manifest: Manifest + streamStartPos: int # Start position in original dataset + streamLength: int # Length of the streamed range + currentPos: int # Current position relative to streamStartPos (0 to streamLength-1) + leftToProcess: int # Bytes left to process from the stream + storeStream: StoreStream # Underlying stream for reading data + pad: bool + +proc getBlocksForRange( + self: RangeStream, + offset: int, + length: int +): Future[?!seq[int]] {.async.} = + ## Get the block indices needed to satisfy a range request + ## Note: This function seems unused within RangeStream itself. + ## Keeping it for now in case it's used externally. + let + blockSize = self.manifest.blockSize.int + firstBlock = offset div blockSize + lastBlock = min((offset + length - 1) div blockSize, self.manifest.blocksCount - 1) + + var blockIndices: seq[int] = @[] + for i in firstBlock..lastBlock: + blockIndices.add(i) + + return success(blockIndices) + +proc new*( + T: type RangeStream, + store: BlockStore, + manifest: Manifest, + startPos: int, + length: int, + pad: bool = false +): RangeStream = + ## Create a range stream that efficiently retrieves only the necessary blocks + ## for the requested byte range + + let stream = RangeStream( + store: store, + manifest: manifest, + streamStartPos: startPos, + streamLength: length, + currentPos: 0, + leftToProcess: length, + storeStream: nil, # Initialize lazily + pad: pad + ) + + # Initialize the base LPStream object + stream.initStream() + + return stream + +method atEof*(self: RangeStream): bool {.raises: [].} = + self.leftToProcess <= 0 + +# Helper proc to initialize the underlying storeStream if needed +proc ensureStoreStream(self: RangeStream) = + if self.storeStream.isNil: + self.storeStream = StoreStream.new(self.store, self.manifest, self.pad) + # Set initial position + self.storeStream.setPos(self.streamStartPos + self.currentPos) # Start at current absolute position + debug "RangeStream initialized underlying StoreStream", startPos=self.streamStartPos, currentPos=self.currentPos, storeStreamPos=self.storeStream.offset + +method read*( + self: RangeStream, pbytes: pointer, nbytes: int +): Future[int] {.async: (raises: [CancelledError, LPStreamError]), base.} = + ## Read bytes from the specified range within the underlying data. + + if self.atEof: + return 0 + + # Ensure underlying stream is ready + self.ensureStoreStream() + + # Read only as many bytes as needed for the range + let bytesToRead = min(nbytes, self.leftToProcess) + if bytesToRead == 0: + return 0 + + var readBytes = 0 + try: + # Use StoreStream's async readOnce + readBytes = await self.storeStream.readOnce(pbytes, bytesToRead) + trace "RangeStream read bytes", got=readBytes, requested=bytesToRead, left=self.leftToProcess-readBytes + except LPStreamEOFError: + # StoreStream reached EOF unexpectedly before range end? Log and treat as 0 bytes read for the range. + warn "StoreStream EOF encountered while reading range", requested=bytesToRead, got=0, rangeLeft=self.leftToProcess + readBytes = 0 + except LPStreamError as exc: + warn "StoreStream error while reading range", msg=exc.msg, requested=bytesToRead, rangeLeft=self.leftToProcess + raise exc + + if readBytes > 0: + self.currentPos += readBytes + self.leftToProcess -= readBytes + elif self.leftToProcess > 0: + # If readOnce returned 0 but we still expected bytes, it's effectively EOF for the range + debug "RangeStream read 0 bytes, marking as EOF", left=self.leftToProcess + self.leftToProcess = 0 # Mark as EOF + + return readBytes + +method close*( + self: RangeStream +): Future[void] {.async: (raises: [])} = + ## Close the RangeStream and its underlying StoreStream if initialized. + trace "Closing RangeStream", currentPos=self.currentPos, left=self.leftToProcess + if not self.storeStream.isNil: + await self.storeStream.close() # Use the async close + self.storeStream = nil # Clear the reference + # Call base close implementation + await procCall LPStream(self).closeImpl() + +method readOnce*( + self: RangeStream, pbytes: pointer, nbytes: int +): Future[int] {.async: (raises: [CancelledError, LPStreamError])} = + # Delegate to the main read method + return await self.read(pbytes, nbytes) + +method write*( + self: RangeStream, msg: seq[byte] +): Future[void] {.async: (raises: [CancelledError, LPStreamError])} = + # Range streams are read-only + raise newException(LPStreamError, "RangeStream is read-only") + +method getPosition*(self: RangeStream): int {.base.} = + ## Get the current position within the defined range (0 to streamLength-1) + return self.currentPos + +method setPos*(self: RangeStream, pos: int): bool {.base.} = # No longer async + ## Set the position within the defined range (0 to streamLength-1) + + # Validate position is within the allowed range [0, streamLength) + if pos < 0 or pos >= self.streamLength: + warn "Attempted to seek outside RangeStream bounds", requested=pos, length=self.streamLength + return false + + # Ensure underlying stream is ready + self.ensureStoreStream() + + # Calculate absolute position in the original dataset + let absolutePos = self.streamStartPos + pos + + # Set position in underlying StoreStream (synchronous) + self.storeStream.setPos(absolutePos) + + # Update RangeStream state + self.currentPos = pos + self.leftToProcess = self.streamLength - pos + + debug "RangeStream position set", rangePos=pos, absPos=absolutePos, left=self.leftToProcess + return true + +method truncate*(self: RangeStream, size: int): Future[bool] {.async: (raises: [CancelledError, LPStreamError]), base.} = + # Range streams are read-only + raise newException(LPStreamError, "RangeStream is read-only") + # return false # Previous behavior + +method getLengthSync*(self: RangeStream): int {.base.} = + ## Get the total length of the defined range. + return self.streamLength \ No newline at end of file diff --git a/codex/streams/seekablestream.nim b/codex/streams/seekablestream.nim index c48ec28f0..08549a657 100644 --- a/codex/streams/seekablestream.nim +++ b/codex/streams/seekablestream.nim @@ -21,7 +21,9 @@ type SeekableStream* = ref object of LPStream offset*: int method `size`*(self: SeekableStream): int {.base.} = - raiseAssert("method unimplemented") + # Base implementation returns -1 to indicate unknown size + # Subclasses should override this method to provide the actual size + return -1 proc setPos*(self: SeekableStream, pos: int) = self.offset = pos