Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(share): Implement ODSreader #1377

Merged
merged 11 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ require (
github.com/ipfs/go-ipfs-exchange-interface v0.2.0
github.com/ipfs/go-ipfs-exchange-offline v0.3.0
github.com/ipfs/go-ipfs-routing v0.2.1
github.com/ipfs/go-ipld-cbor v0.0.5
github.com/ipfs/go-ipld-format v0.4.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipfs/go-merkledag v0.7.0
Expand Down Expand Up @@ -183,7 +184,6 @@ require (
github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect
github.com/ipfs/go-ipfs-pq v0.0.2 // indirect
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
github.com/ipfs/go-ipld-cbor v0.0.5 // indirect
github.com/ipfs/go-ipld-legacy v0.1.0 // indirect
github.com/ipfs/go-ipns v0.1.2 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
Expand Down
98 changes: 98 additions & 0 deletions share/eds/ods.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package eds

import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"

cbor "github.com/ipfs/go-ipld-cbor"
"github.com/ipld/go-car"
"github.com/ipld/go-car/util"
)

// bufferedODSReader will read odsSquareSize amount of leaves from reader into the buffer.
// It exposes the buffer to be read by io.Reader interface implementation
type bufferedODSReader struct {
carReader *bufio.Reader
// current is the amount of CARv1 encoded leaves that have been read from reader. When current
// reaches odsSquareSize, bufferedODSReader will prevent further reads by returning io.EOF
current, odsSquareSize int
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
buf *bytes.Buffer
}

// ODSReader reads CARv1 encoded data from io.ReadCloser and limits the reader to the CAR header
// and first quadrant (ODS)
func ODSReader(carReader io.Reader) (io.Reader, error) {
if carReader == nil {
return nil, errors.New("eds: can't create ODSReader over nil reader")
}

odsR := &bufferedODSReader{
carReader: bufio.NewReader(carReader),
buf: new(bytes.Buffer),
}

// first LdRead reads the full CAR header to determine amount of shares in the ODS
data, err := util.LdRead(odsR.carReader)
if err != nil {
return nil, fmt.Errorf("reading header: %v", err)
}

var header car.CarHeader
err = cbor.DecodeInto(data, &header)
if err != nil {
return nil, fmt.Errorf("invalid header: %w", err)
}

// car header contains both row roots and col roots which is why
// we divide by 4 to get the ODSWidth
odsWidth := len(header.Roots) / 4
walldiss marked this conversation as resolved.
Show resolved Hide resolved
odsR.odsSquareSize = odsWidth * odsWidth

// NewCarReader will expect to read the header first, so write it first
return odsR, util.LdWrite(odsR.buf, data)
}

func (r *bufferedODSReader) Read(p []byte) (n int, err error) {
// read leafs to the buffer until it has sufficient data to fill provided container or full ods is
// read
for r.current < r.odsSquareSize && r.buf.Len() < len(p) {
if err := r.readLeaf(); err != nil {
return 0, err
}

r.current++
}

// read buffer to slice
return r.buf.Read(p)
}

// readLeaf reads one leaf from reader into bufferedODSReader buffer
func (r *bufferedODSReader) readLeaf() error {
if _, err := r.carReader.Peek(1); err != nil { // no more blocks, likely clean io.EOF
return err
}

l, err := binary.ReadUvarint(r.carReader)
if err != nil {
if err == io.EOF {
return io.ErrUnexpectedEOF // don't silently pretend this is a clean EOF
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
}
return err
}

if l > uint64(util.MaxAllowedSectionSize) { // Don't OOM
return fmt.Errorf("malformed car; header `length`: %v is bigger than %v", l, util.MaxAllowedSectionSize)
}

buf := make([]byte, 8)
n := binary.PutUvarint(buf, l)
r.buf.Write(buf[:n])

_, err = r.buf.ReadFrom(io.LimitReader(r.carReader, int64(l)))
return err
}
94 changes: 94 additions & 0 deletions share/eds/ods_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package eds

import (
"context"
"io"
"testing"

"github.com/ipld/go-car"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/share"
)

// TestODSReader ensures that the reader returned from ODSReader is capable of reading the CAR
// header and ODS.
func TestODSReader(t *testing.T) {
walldiss marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

// launch eds store
edsStore, err := newStore(t)
require.NoError(t, err)
err = edsStore.Start(ctx)
require.NoError(t, err)

// generate random eds data and put it into the store
eds, dah := randomEDS(t)
err = edsStore.Put(ctx, dah, eds)
require.NoError(t, err)

// get CAR reader from store
r, err := edsStore.GetCAR(ctx, dah)
assert.NoError(t, err)

// create ODSReader wrapper based on car reader to limit reads to ODS only
odsR, err := ODSReader(r)
assert.NoError(t, err)

// create CAR reader from ODSReader
carReader, err := car.NewCarReader(odsR)
assert.NoError(t, err)

// validate ODS could be obtained from reader
for i := 0; i < 4; i++ {
for j := 0; j < 4; j++ {
// pick share from original eds
original := eds.GetCell(uint(i), uint(j))

// read block from odsReader based reader
block, err := carReader.Next()
assert.NoError(t, err)

// check that original data from eds is same as data from reader
assert.Equal(t, original, block.RawData()[share.NamespaceSize:])
}
}
Wondertan marked this conversation as resolved.
Show resolved Hide resolved

// Make sure no excess data is available to get from reader
_, err = carReader.Next()
assert.Error(t, io.EOF, err)
}

// TestODSReaderReconstruction ensures that the reader returned from ODSReader provides sufficient
// data for EDS reconstruction
func TestODSReaderReconstruction(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

// launch eds store
edsStore, err := newStore(t)
require.NoError(t, err)
err = edsStore.Start(ctx)
require.NoError(t, err)

// generate random eds data and put it into the store
eds, dah := randomEDS(t)
err = edsStore.Put(ctx, dah, eds)
require.NoError(t, err)

// get CAR reader from store
r, err := edsStore.GetCAR(ctx, dah)
assert.NoError(t, err)

// create ODSReader wrapper based on car reader to limit reads to ODS only
odsR, err := ODSReader(r)
assert.NoError(t, err)

// reconstruct EDS from ODSReader
loaded, err := ReadEDS(ctx, odsR, dah)
assert.NoError(t, err)
require.Equal(t, eds.RowRoots(), loaded.RowRoots())
require.Equal(t, eds.ColRoots(), loaded.ColRoots())
}