Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize retrieval from Filecoin #404

Merged
merged 19 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ require (
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipfs/go-merkledag v0.11.0
github.com/ipfs/go-unixfs v0.4.5
github.com/ipfs/go-unixfsnode v1.8.0
github.com/ipfs/go-unixfsnode v1.9.0
github.com/ipld/go-car v0.6.1
github.com/ipld/go-car/v2 v2.13.1
github.com/ipld/go-codec-dagpb v1.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,8 @@ github.com/ipfs/go-peertaskqueue v0.8.1 h1:YhxAs1+wxb5jk7RvS0LHdyiILpNmRIRnZVzte
github.com/ipfs/go-peertaskqueue v0.8.1/go.mod h1:Oxxd3eaK279FxeydSPPVGHzbwVeHjatZ2GA8XD+KbPU=
github.com/ipfs/go-unixfs v0.4.5 h1:wj8JhxvV1G6CD7swACwSKYa+NgtdWC1RUit+gFnymDU=
github.com/ipfs/go-unixfs v0.4.5/go.mod h1:BIznJNvt/gEx/ooRMI4Us9K8+qeGO7vx1ohnbk8gjFg=
github.com/ipfs/go-unixfsnode v1.8.0 h1:yCkakzuE365glu+YkgzZt6p38CSVEBPgngL9ZkfnyQU=
github.com/ipfs/go-unixfsnode v1.8.0/go.mod h1:HxRu9HYHOjK6HUqFBAi++7DVoWAHn0o4v/nZ/VA+0g8=
github.com/ipfs/go-unixfsnode v1.9.0 h1:ubEhQhr22sPAKO2DNsyVBW7YB/zA8Zkif25aBvz8rc8=
github.com/ipfs/go-unixfsnode v1.9.0/go.mod h1:HxRu9HYHOjK6HUqFBAi++7DVoWAHn0o4v/nZ/VA+0g8=
github.com/ipfs/go-verifcid v0.0.2 h1:XPnUv0XmdH+ZIhLGKg6U2vaPaRDXb9urMyNVCE7uvTs=
github.com/ipfs/go-verifcid v0.0.2/go.mod h1:40cD9x1y4OWnFXbLNJYRe7MpNvWlMn3LZAG5Wb4xnPU=
github.com/ipld/go-car v0.6.1 h1:blWbEHf1j62JMWFIqWE//YR0m7k5ZMw0AuUOU5hjrH8=
Expand Down
1 change: 1 addition & 0 deletions handler/file/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

type FilecoinRetriever interface {
Retrieve(ctx context.Context, c cid.Cid, rangeStart int64, rangeEnd int64, sps []string, out io.Writer) error
RetrieveReader(ctx context.Context, c cid.Cid, rangeStart int64, rangeEnd int64, sps []string) (io.ReadCloser, error)
}

type Handler interface {
Expand Down
57 changes: 57 additions & 0 deletions handler/file/range_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package file

import (
"errors"
"io"
)

// rangeReader reads data from one individually retrievable file range.
type rangeReader struct {
// offset is the absolute offset within file where the next read will get
// data from.
offset int64
reader io.ReadCloser
remaining int64
}

func (rr *rangeReader) writeToN(w io.Writer, readLen int64) (int64, error) {
var read int64
for readLen > 0 {
if rr.remaining == 0 {
return read, io.EOF
}
var n int64
var err error

if readLen >= rr.remaining {
// Copy all remaining bytes.
n, err = io.Copy(w, rr.reader)
} else {
// Copy requested number of bytes.
n, err = io.CopyN(w, rr.reader, readLen)
}
if err != nil && !errors.Is(err, io.EOF) {
return 0, err
gammazero marked this conversation as resolved.
Show resolved Hide resolved
}
if n == 0 {
// Must have been EOF.
rr.remaining = 0
return read, io.EOF
}
rr.offset += n
rr.remaining -= n
readLen -= n
read += n
}
return read, nil
}

func (rr *rangeReader) close() error {
var err error
if rr.reader != nil {
rr.remaining = 0
err = rr.reader.Close()
rr.reader = nil
}
return err
}
122 changes: 111 additions & 11 deletions handler/file/retrieve.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,12 @@ type filecoinReader struct {
offset int64
size int64
id uint64

// Reads remaining data from current range.
rangeReader *rangeReader
}

func (r *filecoinReader) Read(p []byte) (int, error) {
logger.Infof("buffer size: %v", len(p))

buf := bytes.NewBuffer(p)
buf.Reset()

if r.offset >= r.size {
return 0, io.EOF
}
Expand All @@ -114,13 +112,75 @@ func (r *filecoinReader) Read(p []byte) (int, error) {
readLen = remainingBytes
}

buf := bytes.NewBuffer(p)
buf.Reset()

n, err := r.writeToN(buf, readLen)
return int(n), err
}

// WriteTo is implemented in order to directly handle io.Copy operations
// rather than allow small, separate Read operations.
func (r *filecoinReader) WriteTo(w io.Writer) (int64, error) {
gammazero marked this conversation as resolved.
Show resolved Hide resolved
if r.offset >= r.size {
return 0, io.EOF
}
// Read all remaining bytes and write them to w.
return r.writeToN(w, r.size-r.offset)
}

func (r *filecoinReader) writeToN(w io.Writer, readLen int64) (int64, error) {
gammazero marked this conversation as resolved.
Show resolved Hide resolved
var read int64
// If there is a rangeReader from the previous read that can be used to
// continue reading more data, then use it instead of doing another
// findFileRanges and Retrieve for more reads from this same range.
if r.rangeReader != nil {
// If continuing from the previous read, keep reading from this rangeReader.
if r.offset == r.rangeReader.offset {
// Reading data leftover from previous read into w.
n, err := r.rangeReader.writeToN(w, readLen)
if err != nil && !errors.Is(err, io.EOF) {
return 0, err
}
r.offset += n
readLen -= n
read += n
if r.rangeReader.remaining == 0 {
// No data left in range reader.
r.rangeReader.close()
r.rangeReader = nil
}
if readLen == 0 {
// Read all requested data from leftover in rangeReader.
return read, nil
}
// No more leftover data to read, but readLen additional bytes
// still needed. Will read more data from next range(s).
} else {
// Trying to read from outside of rangeReader's range. Must have
// seeked out of current range. Close rangeReader and read new
// range.
r.rangeReader.close()
r.rangeReader = nil
}
}

// Get next range(s) to read from.

// Get all ranges, from current offset, that are covered by readLen.
fileRanges, err := findFileRanges(r.db, r.id, r.offset, r.offset+readLen)
if err != nil {
return 0, fmt.Errorf("failed to retrieve file range deals: %w", err)
}

read := 0
var rr *rangeReader

// Read from each range until readLen bytes read.
for _, fileRange := range fileRanges {
if rr != nil {
rr.close()
rr = nil
}
if readLen == 0 {
// this shouldn't happen
logger.Warnw("retrieval reader retrieved file ranges beyond end of range", "fileRangeStart", fileRange.Offset, "fileRangeEnd", fileRange.Offset+fileRange.Length)
Expand All @@ -135,30 +195,63 @@ func (r *filecoinReader) Read(p []byte) (int, error) {
if rangeReadLen > remainingRange {
rangeReadLen = remainingRange
}
// Range starts at fileRange.Offset, has total length fileRange.Length,
// and has remainingRange bytes left to read. Now read rangeReadLen
// bytes of the remaining bytes this range.

if fileRange.JobID == nil {
return read, UnableToServeRangeError{Start: r.offset, End: r.offset + rangeReadLen, Err: ErrNoJobRecord}
}
providers, err := findProviders(r.db, *fileRange.JobID)
if err != nil || len(providers) == 0 {
return read, UnableToServeRangeError{Start: r.offset, End: r.offset + rangeReadLen, Err: ErrNoFilecoinDeals}
}
err = r.retriever.Retrieve(r.ctx, cid.Cid(fileRange.CID), offsetInRange, offsetInRange+rangeReadLen, providers, buf)

// Get a reader that reads until the end of the range.
rd, err := r.retriever.RetrieveReader(r.ctx, cid.Cid(fileRange.CID), offsetInRange, offsetInRange+remainingRange, providers)
if err != nil {
return read, UnableToServeRangeError{
Start: r.offset,
End: r.offset + rangeReadLen,
Err: fmt.Errorf("unable to retrieve data from filecoin: %w", err),
}
}
r.offset += rangeReadLen
readLen -= rangeReadLen
read += int(rangeReadLen)
rr = &rangeReader{
offset: r.offset,
reader: rd,
remaining: remainingRange,
}

// Reading readLen of the remaining bytes in this range.
n, err := rr.writeToN(w, readLen)
if err != nil && !errors.Is(err, io.EOF) {
rr.close()
return 0, err
}
r.offset += n
readLen -= n
read += n
}

// check for missing file ranges at the end
if readLen > 0 {
if rr != nil {
rr.close()
}
return read, UnableToServeRangeError{Start: r.offset, End: r.offset + readLen, Err: ErrNoFileRangeRecord}
}

if rr != nil {
// Some unread data left over in this range. Save it for next read.
if rr.remaining != 0 {
// Saving leftover rangeReader with rr.remaining bytes left.
r.rangeReader = rr
} else {
// Leftover rangeReader has 0 bytes remaining.
rr.close()
}
}

return read, nil
}

Expand All @@ -172,6 +265,8 @@ func (r *filecoinReader) Seek(offset int64, whence int) (int64, error) {
newOffset = r.offset + offset
case io.SeekEnd:
newOffset = r.size + offset
gammazero marked this conversation as resolved.
Show resolved Hide resolved
default:
return 0, errors.New("unknown seek mode")
}

if newOffset > r.size {
Expand All @@ -184,7 +279,12 @@ func (r *filecoinReader) Seek(offset int64, whence int) (int64, error) {
}

func (r *filecoinReader) Close() error {
return nil
var err error
if r.rangeReader != nil {
err = r.rangeReader.close()
r.rangeReader = nil
}
return err
}

func findFileRanges(db *gorm.DB, id uint64, startRange int64, endRange int64) ([]model.FileRange, error) {
Expand Down
Loading