diff --git a/go.mod b/go.mod index 3b3c4546e7..da67d170e6 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/ipfs/go-ipfs-exchange-interface v0.2.0 github.com/ipfs/go-ipfs-exchange-offline v0.3.0 github.com/ipfs/go-ipfs-routing v0.2.1 + github.com/ipfs/go-ipld-cbor v0.0.5 github.com/ipfs/go-ipld-format v0.4.0 github.com/ipfs/go-log/v2 v2.5.1 github.com/ipfs/go-merkledag v0.7.0 @@ -183,7 +184,6 @@ require ( github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect github.com/ipfs/go-ipfs-pq v0.0.2 // indirect github.com/ipfs/go-ipfs-util v0.0.2 // indirect - github.com/ipfs/go-ipld-cbor v0.0.5 // indirect github.com/ipfs/go-ipld-legacy v0.1.0 // indirect github.com/ipfs/go-ipns v0.1.2 // indirect github.com/ipfs/go-log v1.0.5 // indirect diff --git a/share/eds/ods.go b/share/eds/ods.go new file mode 100644 index 0000000000..aa1219d41a --- /dev/null +++ b/share/eds/ods.go @@ -0,0 +1,98 @@ +package eds + +import ( + "bufio" + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + + cbor "github.com/ipfs/go-ipld-cbor" + "github.com/ipld/go-car" + "github.com/ipld/go-car/util" +) + +// bufferedODSReader will read odsSquareSize amount of leaves from reader into the buffer. +// It exposes the buffer to be read by io.Reader interface implementation +type bufferedODSReader struct { + carReader *bufio.Reader + // current is the amount of CARv1 encoded leaves that have been read from reader. When current + // reaches odsSquareSize, bufferedODSReader will prevent further reads by returning io.EOF + current, odsSquareSize int + buf *bytes.Buffer +} + +// ODSReader reads CARv1 encoded data from io.ReadCloser and limits the reader to the CAR header +// and first quadrant (ODS) +func ODSReader(carReader io.Reader) (io.Reader, error) { + if carReader == nil { + return nil, errors.New("eds: can't create ODSReader over nil reader") + } + + odsR := &bufferedODSReader{ + carReader: bufio.NewReader(carReader), + buf: new(bytes.Buffer), + } + + // first LdRead reads the full CAR header to determine amount of shares in the ODS + data, err := util.LdRead(odsR.carReader) + if err != nil { + return nil, fmt.Errorf("reading header: %v", err) + } + + var header car.CarHeader + err = cbor.DecodeInto(data, &header) + if err != nil { + return nil, fmt.Errorf("invalid header: %w", err) + } + + // car header contains both row roots and col roots which is why + // we divide by 4 to get the ODSWidth + odsWidth := len(header.Roots) / 4 + odsR.odsSquareSize = odsWidth * odsWidth + + // NewCarReader will expect to read the header first, so write it first + return odsR, util.LdWrite(odsR.buf, data) +} + +func (r *bufferedODSReader) Read(p []byte) (n int, err error) { + // read leafs to the buffer until it has sufficient data to fill provided container or full ods is + // read + for r.current < r.odsSquareSize && r.buf.Len() < len(p) { + if err := r.readLeaf(); err != nil { + return 0, err + } + + r.current++ + } + + // read buffer to slice + return r.buf.Read(p) +} + +// readLeaf reads one leaf from reader into bufferedODSReader buffer +func (r *bufferedODSReader) readLeaf() error { + if _, err := r.carReader.Peek(1); err != nil { // no more blocks, likely clean io.EOF + return err + } + + l, err := binary.ReadUvarint(r.carReader) + if err != nil { + if err == io.EOF { + return io.ErrUnexpectedEOF // don't silently pretend this is a clean EOF + } + return err + } + + if l > uint64(util.MaxAllowedSectionSize) { // Don't OOM + return fmt.Errorf("malformed car; header `length`: %v is bigger than %v", l, util.MaxAllowedSectionSize) + } + + buf := make([]byte, 8) + n := binary.PutUvarint(buf, l) + r.buf.Write(buf[:n]) + + _, err = r.buf.ReadFrom(io.LimitReader(r.carReader, int64(l))) + return err +} diff --git a/share/eds/ods_test.go b/share/eds/ods_test.go new file mode 100644 index 0000000000..8509fc04b2 --- /dev/null +++ b/share/eds/ods_test.go @@ -0,0 +1,94 @@ +package eds + +import ( + "context" + "io" + "testing" + + "github.com/ipld/go-car" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-node/share" +) + +// TestODSReader ensures that the reader returned from ODSReader is capable of reading the CAR +// header and ODS. +func TestODSReader(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + // launch eds store + edsStore, err := newStore(t) + require.NoError(t, err) + err = edsStore.Start(ctx) + require.NoError(t, err) + + // generate random eds data and put it into the store + eds, dah := randomEDS(t) + err = edsStore.Put(ctx, dah, eds) + require.NoError(t, err) + + // get CAR reader from store + r, err := edsStore.GetCAR(ctx, dah) + assert.NoError(t, err) + + // create ODSReader wrapper based on car reader to limit reads to ODS only + odsR, err := ODSReader(r) + assert.NoError(t, err) + + // create CAR reader from ODSReader + carReader, err := car.NewCarReader(odsR) + assert.NoError(t, err) + + // validate ODS could be obtained from reader + for i := 0; i < 4; i++ { + for j := 0; j < 4; j++ { + // pick share from original eds + original := eds.GetCell(uint(i), uint(j)) + + // read block from odsReader based reader + block, err := carReader.Next() + assert.NoError(t, err) + + // check that original data from eds is same as data from reader + assert.Equal(t, original, block.RawData()[share.NamespaceSize:]) + } + } + + // Make sure no excess data is available to get from reader + _, err = carReader.Next() + assert.Error(t, io.EOF, err) +} + +// TestODSReaderReconstruction ensures that the reader returned from ODSReader provides sufficient +// data for EDS reconstruction +func TestODSReaderReconstruction(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + // launch eds store + edsStore, err := newStore(t) + require.NoError(t, err) + err = edsStore.Start(ctx) + require.NoError(t, err) + + // generate random eds data and put it into the store + eds, dah := randomEDS(t) + err = edsStore.Put(ctx, dah, eds) + require.NoError(t, err) + + // get CAR reader from store + r, err := edsStore.GetCAR(ctx, dah) + assert.NoError(t, err) + + // create ODSReader wrapper based on car reader to limit reads to ODS only + odsR, err := ODSReader(r) + assert.NoError(t, err) + + // reconstruct EDS from ODSReader + loaded, err := ReadEDS(ctx, odsR, dah) + assert.NoError(t, err) + require.Equal(t, eds.RowRoots(), loaded.RowRoots()) + require.Equal(t, eds.ColRoots(), loaded.ColRoots()) +}