diff --git a/share/empty.go b/share/empty.go index 0cb83e7a95..224479445b 100644 --- a/share/empty.go +++ b/share/empty.go @@ -11,10 +11,16 @@ import ( "github.com/celestiaorg/rsmt2d" ) +// EmptyEDSDataHash returns DataHash of the empty block EDS. +func EmptyEDSDataHash() DataHash { + initEmpty() + return emptyBlockDataHash +} + // EmptyEDSRoots returns AxisRoots of the empty block EDS. func EmptyEDSRoots() *AxisRoots { initEmpty() - return emptyBlockRoot + return emptyBlockRoots } // EmptyEDS returns the EDS of the empty block data square. @@ -30,17 +36,18 @@ func EmptyBlockShares() []Share { } var ( - emptyMu sync.Mutex - emptyBlockRoot *AxisRoots - emptyBlockEDS *rsmt2d.ExtendedDataSquare - emptyBlockShares []Share + emptyMu sync.Mutex + emptyBlockDataHash DataHash + emptyBlockRoots *AxisRoots + emptyBlockEDS *rsmt2d.ExtendedDataSquare + emptyBlockShares []Share ) // initEmpty enables lazy initialization for constant empty block data. func initEmpty() { emptyMu.Lock() defer emptyMu.Unlock() - if emptyBlockRoot != nil { + if emptyBlockRoots != nil { return } @@ -54,16 +61,16 @@ func initEmpty() { } emptyBlockEDS = eds - emptyBlockRoot, err = NewAxisRoots(eds) + emptyBlockRoots, err = NewAxisRoots(eds) if err != nil { panic(fmt.Errorf("failed to create empty DAH: %w", err)) } minDAH := da.MinDataAvailabilityHeader() - if !bytes.Equal(minDAH.Hash(), emptyBlockRoot.Hash()) { + if !bytes.Equal(minDAH.Hash(), emptyBlockRoots.Hash()) { panic(fmt.Sprintf("mismatch in calculated minimum DAH and minimum DAH from celestia-app, "+ - "expected %s, got %s", minDAH.String(), emptyBlockRoot.String())) + "expected %s, got %s", minDAH.String(), emptyBlockRoots.String())) } // precompute Hash, so it's cached internally to avoid potential races - emptyBlockRoot.Hash() + emptyBlockDataHash = emptyBlockRoots.Hash() } diff --git a/share/new_eds/accessor.go b/share/new_eds/accessor.go index 447001e24a..ca0b475bb2 100644 --- a/share/new_eds/accessor.go +++ b/share/new_eds/accessor.go @@ -10,6 +10,9 @@ import ( "github.com/celestiaorg/celestia-node/share/shwap" ) +// EmptyAccessor is an accessor of an empty EDS block. +var EmptyAccessor = &Rsmt2D{ExtendedDataSquare: share.EmptyEDS()} + // Accessor is an interface for accessing extended data square data. type Accessor interface { // Size returns square size of the Accessor. diff --git a/share/root.go b/share/root.go index bad14b472f..e8d16ccf0a 100644 --- a/share/root.go +++ b/share/root.go @@ -38,7 +38,7 @@ func (dh DataHash) String() string { // IsEmptyEDS check whether DataHash corresponds to the root of an empty block EDS. func (dh DataHash) IsEmptyEDS() bool { - return bytes.Equal(EmptyEDSRoots().Hash(), dh) + return bytes.Equal(EmptyEDSDataHash(), dh) } // NewSHA256Hasher returns a new instance of a SHA-256 hasher. diff --git a/store/file/file.go b/store/file/file.go new file mode 100644 index 0000000000..a6dba43ad0 --- /dev/null +++ b/store/file/file.go @@ -0,0 +1,8 @@ +package file + +const ( + // writeBufferSize defines buffer size for optimized batched writes into the file system. 
+ // TODO(@Wondertan): Consider making it configurable + writeBufferSize = 64 << 10 + filePermissions = 0o600 +) diff --git a/store/file/header.go b/store/file/header.go index 9946041f46..4c7b081fbc 100644 --- a/store/file/header.go +++ b/store/file/header.go @@ -2,7 +2,6 @@ package file import ( "encoding/binary" - "errors" "fmt" "io" @@ -37,7 +36,7 @@ type fileType uint8 const ( ods fileType = iota - q1q4 + q4 ) func readHeader(r io.Reader) (*headerV0, error) { @@ -45,9 +44,6 @@ func readHeader(r io.Reader) (*headerV0, error) { var version headerVersion err := binary.Read(r, binary.LittleEndian, &version) if err != nil { - if errors.Is(err, io.EOF) { - return nil, ErrEmptyFile - } return nil, fmt.Errorf("readHeader: %w", err) } @@ -70,6 +66,14 @@ func writeHeader(w io.Writer, h *headerV0) error { return err } +func (h *headerV0) SquareSize() int { + return int(h.squareSize) +} + +func (h *headerV0) ShareSize() int { + return int(h.shareSize) +} + func (h *headerV0) Size() int { // header size + 1 byte for header fileVersion return headerVOSize + 1 diff --git a/store/file/ods.go b/store/file/ods.go index 1e6af4af22..e6e01bd5fc 100644 --- a/store/file/ods.go +++ b/store/file/ods.go @@ -16,20 +16,14 @@ import ( "github.com/celestiaorg/celestia-node/share/shwap" ) -var _ eds.AccessorStreamer = (*ODSFile)(nil) +var _ eds.AccessorStreamer = (*ODS)(nil) -// writeBufferSize defines buffer size for optimized batched writes into the file system. -// TODO(@Wondertan): Consider making it configurable -const writeBufferSize = 64 << 10 - -// ErrEmptyFile signals that the ODS file is empty. -// This helps avoid storing empty block EDSes. -var ErrEmptyFile = errors.New("file is empty") - -type ODSFile struct { - path string - hdr *headerV0 - fl *os.File +// ODS implements eds.Accessor as an FS file. +// It stores the original data square(ODS), which is the first quadrant of EDS, +// and it's metadata in file's header. +type ODS struct { + hdr *headerV0 + fl *os.File lock sync.RWMutex // ods stores an in-memory cache of the original data square to enhance read performance. This @@ -46,93 +40,68 @@ type ODSFile struct { disableCache bool } -// OpenODSFile opens an existing file. File has to be closed after usage. -func OpenODSFile(path string) (*ODSFile, error) { - f, err := os.Open(path) - if err != nil { - return nil, err - } - - h, err := readHeader(f) - if err != nil { - return nil, err - } - - return &ODSFile{ - path: path, - hdr: h, - fl: f, - }, nil -} - -// CreateODSFile creates a new file. File has to be closed after usage. -func CreateODSFile( +// CreateODS creates a new file under given FS path and +// writes the ODS into it out of given EDS. +// It ensures FS is synced after writing finishes. +// It may leave partially written file if any of the writes fail. 
+func CreateODS( path string, roots *share.AxisRoots, eds *rsmt2d.ExtendedDataSquare, -) (*ODSFile, error) { +) error { mod := os.O_RDWR | os.O_CREATE | os.O_EXCL // ensure we fail if already exist - f, err := os.OpenFile(path, mod, 0o666) - if err != nil { - return nil, fmt.Errorf("file create: %w", err) - } - - // buffering gives us ~4x speed up - buf := bufio.NewWriterSize(f, writeBufferSize) - - h, err := writeODSFile(buf, eds, roots) + f, err := os.OpenFile(path, mod, filePermissions) if err != nil { - return nil, fmt.Errorf("writing ODS file: %w", err) + return fmt.Errorf("creating file: %w", err) } - err = buf.Flush() - if err != nil { - return nil, fmt.Errorf("flushing ODS file: %w", err) + hdr := &headerV0{ + fileVersion: fileV0, + fileType: ods, + shareSize: share.Size, + squareSize: uint16(eds.Width()), + datahash: roots.Hash(), } - err = f.Sync() - if err != nil { - return nil, fmt.Errorf("syncing file: %w", err) + err = writeODSFile(f, roots, eds, hdr) + if errClose := f.Close(); errClose != nil { + err = errors.Join(err, fmt.Errorf("closing created ODS file: %w", errClose)) } - return &ODSFile{ - path: path, - fl: f, - hdr: h, - }, nil + return err } -func writeODSFile(w io.Writer, eds *rsmt2d.ExtendedDataSquare, axisRoots *share.AxisRoots) (*headerV0, error) { - // write header - h := &headerV0{ - fileVersion: fileV0, - fileType: ods, - shareSize: share.Size, - squareSize: uint16(eds.Width()), - datahash: axisRoots.Hash(), +// writeQ4File full ODS content into OS File. +func writeODSFile(f *os.File, axisRoots *share.AxisRoots, eds *rsmt2d.ExtendedDataSquare, hdr *headerV0) error { + // buffering gives us ~4x speed up + buf := bufio.NewWriterSize(f, writeBufferSize) + + if err := writeHeader(f, hdr); err != nil { + return fmt.Errorf("writing header: %w", err) } - err := writeHeader(w, h) - if err != nil { - return nil, fmt.Errorf("writing header: %w", err) + + if err := writeAxisRoots(buf, axisRoots); err != nil { + return fmt.Errorf("writing axis roots: %w", err) } - err = writeAxisRoots(w, axisRoots) - if err != nil { - return nil, fmt.Errorf("writing axis roots: %w", err) + if err := writeODS(buf, eds); err != nil { + return fmt.Errorf("writing ODS: %w", err) } - // write quadrants - err = writeQ1(w, eds) - if err != nil { - return nil, fmt.Errorf("writing Q1: %w", err) + if err := buf.Flush(); err != nil { + return fmt.Errorf("flushing ODS file: %w", err) } - return h, nil + if err := f.Sync(); err != nil { + return fmt.Errorf("syncing file: %w", err) + } + + return nil } -// writeQ1 writes the first quadrant of the square to the writer. It writes the quadrant in row-major +// writeODS writes the first quadrant(ODS) of the square to the writer. It writes the quadrant in row-major // order -func writeQ1(w io.Writer, eds *rsmt2d.ExtendedDataSquare) error { +func writeODS(w io.Writer, eds *rsmt2d.ExtendedDataSquare) error { for i := range eds.Width() / 2 { for j := range eds.Width() / 2 { shr := eds.GetCell(i, j) // TODO: Avoid copying inside GetCell @@ -162,36 +131,58 @@ func writeAxisRoots(w io.Writer, roots *share.AxisRoots) error { return nil } -// Size returns square size of the Accessor. -func (f *ODSFile) Size(context.Context) int { - return f.size() +// OpenODS opens an existing ODS file under given FS path. +// It only reads the header with metadata. The other content +// of the File is read lazily. +// If file is empty, the ErrEmptyFile is returned. +// File must be closed after usage. 
+func OpenODS(path string) (*ODS, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + + h, err := readHeader(f) + if err != nil { + return nil, err + } + + return &ODS{ + hdr: h, + fl: f, + }, nil } -func (f *ODSFile) size() int { - return int(f.hdr.squareSize) +// Size returns EDS size stored in file's header. +func (o *ODS) Size(context.Context) int { + return o.size() +} + +func (o *ODS) size() int { + return int(o.hdr.squareSize) } // DataHash returns root hash of Accessor's underlying EDS. -func (f *ODSFile) DataHash(context.Context) (share.DataHash, error) { - return f.hdr.datahash, nil +func (o *ODS) DataHash(context.Context) (share.DataHash, error) { + return o.hdr.datahash, nil } // AxisRoots reads AxisRoots stored in the file. AxisRoots are stored after the header and before the // ODS data. -func (f *ODSFile) AxisRoots(context.Context) (*share.AxisRoots, error) { - roots := make([]byte, f.axisRootsSize()) - n, err := f.fl.ReadAt(roots, int64(f.hdr.Size())) +func (o *ODS) AxisRoots(context.Context) (*share.AxisRoots, error) { + roots := make([]byte, o.axisRootsSize()) + n, err := o.fl.ReadAt(roots, int64(o.hdr.Size())) if err != nil { return nil, fmt.Errorf("reading axis roots: %w", err) } if n != len(roots) { return nil, fmt.Errorf("reading axis roots: expected %d bytes, got %d", len(roots), n) } - rowRoots := make([][]byte, f.size()) - colRoots := make([][]byte, f.size()) - for i := 0; i < f.size(); i++ { + rowRoots := make([][]byte, o.size()) + colRoots := make([][]byte, o.size()) + for i := 0; i < o.size(); i++ { rowRoots[i] = roots[i*share.AxisRootSize : (i+1)*share.AxisRootSize] - colRoots[i] = roots[(f.size()+i)*share.AxisRootSize : (f.size()+i+1)*share.AxisRootSize] + colRoots[i] = roots[(o.size()+i)*share.AxisRootSize : (o.size()+i+1)*share.AxisRootSize] } axisRoots := &share.AxisRoots{ RowRoots: rowRoots, @@ -201,14 +192,14 @@ func (f *ODSFile) AxisRoots(context.Context) (*share.AxisRoots, error) { } // Close closes the file. -func (f *ODSFile) Close() error { - return f.fl.Close() +func (o *ODS) Close() error { + return o.fl.Close() } // Sample returns share and corresponding proof for row and column indices. Implementation can // choose which axis to use for proof. Chosen axis for proof should be indicated in the returned // Sample. -func (f *ODSFile) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) { +func (o *ODS) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) { // Sample proof axis is selected to optimize read performance. // - For the first and second quadrants, we read the row axis because it is more efficient to read // single row than reading full ODS to calculate single column @@ -217,11 +208,11 @@ func (f *ODSFile) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, // - For the fourth quadrant, it does not matter which axis we read because we need to read full ODS // to calculate the sample axisType, axisIdx, shrIdx := rsmt2d.Row, rowIdx, colIdx - if colIdx < f.size()/2 && rowIdx >= f.size()/2 { + if colIdx < o.size()/2 && rowIdx >= o.size()/2 { axisType, axisIdx, shrIdx = rsmt2d.Col, colIdx, rowIdx } - axis, err := f.axis(ctx, axisType, axisIdx) + axis, err := o.axis(ctx, axisType, axisIdx) if err != nil { return shwap.Sample{}, fmt.Errorf("reading axis: %w", err) } @@ -231,11 +222,11 @@ func (f *ODSFile) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, // AxisHalf returns half of shares axis of the given type and index. Side is determined by // implementation. 
Implementations should indicate the side in the returned AxisHalf. -func (f *ODSFile) AxisHalf(_ context.Context, axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error) { +func (o *ODS) AxisHalf(_ context.Context, axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error) { // Read the axis from the file if the axis is a row and from the top half of the square, or if the // axis is a column and from the left half of the square. - if axisIdx < f.size()/2 { - half, err := f.readAxisHalf(axisType, axisIdx) + if axisIdx < o.size()/2 { + half, err := o.readAxisHalf(axisType, axisIdx) if err != nil { return eds.AxisHalf{}, fmt.Errorf("reading axis half: %w", err) } @@ -243,7 +234,7 @@ func (f *ODSFile) AxisHalf(_ context.Context, axisType rsmt2d.Axis, axisIdx int) } // if axis is from the second half of the square, read full ODS and compute the axis half - ods, err := f.readODS() + ods, err := o.readODS() if err != nil { return eds.AxisHalf{}, err } @@ -256,12 +247,12 @@ func (f *ODSFile) AxisHalf(_ context.Context, axisType rsmt2d.Axis, axisIdx int) } // RowNamespaceData returns data for the given namespace and row index. -func (f *ODSFile) RowNamespaceData( +func (o *ODS) RowNamespaceData( ctx context.Context, namespace share.Namespace, rowIdx int, ) (shwap.RowNamespaceData, error) { - shares, err := f.axis(ctx, rsmt2d.Row, rowIdx) + shares, err := o.axis(ctx, rsmt2d.Row, rowIdx) if err != nil { return shwap.RowNamespaceData{}, err } @@ -269,8 +260,8 @@ func (f *ODSFile) RowNamespaceData( } // Shares returns data shares extracted from the Accessor. -func (f *ODSFile) Shares(context.Context) ([]share.Share, error) { - ods, err := f.readODS() +func (o *ODS) Shares(context.Context) ([]share.Share, error) { + ods, err := o.readODS() if err != nil { return nil, err } @@ -279,94 +270,115 @@ func (f *ODSFile) Shares(context.Context) ([]share.Share, error) { // Reader returns binary reader for the file. It reads the shares from the ODS part of the square // row by row. 
-func (f *ODSFile) Reader() (io.Reader, error) { - f.lock.RLock() - ods := f.ods - f.lock.RUnlock() +func (o *ODS) Reader() (io.Reader, error) { + o.lock.RLock() + ods := o.ods + o.lock.RUnlock() if ods != nil { return ods.reader() } - offset := f.sharesOffset() - total := int64(f.hdr.shareSize) * int64(f.size()*f.size()/4) - reader := io.NewSectionReader(f.fl, int64(offset), total) + offset := o.sharesOffset() + total := int64(o.hdr.shareSize) * int64(o.size()*o.size()/4) + reader := io.NewSectionReader(o.fl, int64(offset), total) return reader, nil } -func (f *ODSFile) readAxisHalf(axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error) { - f.lock.RLock() - ods := f.ods - f.lock.RUnlock() +func (o *ODS) axis(ctx context.Context, axisType rsmt2d.Axis, axisIdx int) ([]share.Share, error) { + half, err := o.AxisHalf(ctx, axisType, axisIdx) + if err != nil { + return nil, err + } + + axis, err := half.Extended() + if err != nil { + return nil, fmt.Errorf("extending axis half: %w", err) + } + + return axis, nil +} + +func (o *ODS) readAxisHalf(axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error) { + o.lock.RLock() + ods := o.ods + o.lock.RUnlock() if ods != nil { - return f.ods.axisHalf(axisType, axisIdx) + return o.ods.axisHalf(axisType, axisIdx) } - offset := f.sharesOffset() - switch axisType { - case rsmt2d.Col: - col, err := readCol(f.fl, f.hdr, offset, 0, axisIdx) - return eds.AxisHalf{ - Shares: col, - IsParity: false, - }, err - case rsmt2d.Row: - row, err := readRow(f.fl, f.hdr, offset, 0, axisIdx) - return eds.AxisHalf{ - Shares: row, - IsParity: false, - }, err + axisHalf, err := readAxisHalf( + o.fl, + axisType, + o.hdr.ShareSize(), + o.size(), + o.sharesOffset(), + axisIdx, + ) + if err != nil { + return eds.AxisHalf{}, fmt.Errorf("reading axis half: %w", err) } - return eds.AxisHalf{}, fmt.Errorf("unknown axis") + + return eds.AxisHalf{ + Shares: axisHalf, + IsParity: false, + }, nil } -func (f *ODSFile) sharesOffset() int { - return f.hdr.Size() + f.axisRootsSize() +func (o *ODS) sharesOffset() int { + return o.hdr.Size() + o.axisRootsSize() } -func (f *ODSFile) axisRootsSize() int { +func (o *ODS) axisRootsSize() int { // axis roots are stored in two parts: row roots and column roots, each part has size equal to // the square size. Thus, the total amount of roots is equal to the square size * 2. 
- return share.AxisRootSize * 2 * f.size() + return share.AxisRootSize * 2 * o.size() } -func (f *ODSFile) readODS() (square, error) { - f.lock.RLock() - ods := f.ods - f.lock.RUnlock() +func (o *ODS) readODS() (square, error) { + o.lock.RLock() + ods := o.ods + o.lock.RUnlock() if ods != nil { return ods, nil } // reset file pointer to the beginning of the file shares data - offset := f.hdr.Size() + f.axisRootsSize() - _, err := f.fl.Seek(int64(offset), io.SeekStart) + offset := o.sharesOffset() + _, err := o.fl.Seek(int64(offset), io.SeekStart) if err != nil { return nil, fmt.Errorf("discarding header: %w", err) } - square, err := readSquare(f.fl, share.Size, f.size()) + square, err := readSquare(o.fl, share.Size, o.size()) if err != nil { return nil, fmt.Errorf("reading ODS: %w", err) } - if !f.disableCache { - f.lock.Lock() - f.ods = square - f.lock.Unlock() + if !o.disableCache { + o.lock.Lock() + o.ods = square + o.lock.Unlock() } return square, nil } -func readRow(fl io.ReaderAt, hdr *headerV0, sharesOffset, quadrantIdx, rowIdx int) ([]share.Share, error) { - shrLn := int(hdr.shareSize) - odsLn := int(hdr.squareSize / 2) - quadrantOffset := quadrantIdx * odsLn * odsLn * shrLn - - shares := make([]share.Share, odsLn) +func readAxisHalf(r io.ReaderAt, axisTp rsmt2d.Axis, shrLn, edsLn, offset, axisIdx int) ([]share.Share, error) { + switch axisTp { + case rsmt2d.Row: + return readRowHalf(r, shrLn, edsLn, offset, axisIdx) + case rsmt2d.Col: + return readColHalf(r, shrLn, edsLn, offset, axisIdx) + default: + return nil, fmt.Errorf("unknown axis") + } +} +func readRowHalf(fl io.ReaderAt, shrLn, edsLn, offset, rowIdx int) ([]share.Share, error) { + odsLn := edsLn / 2 rowOffset := rowIdx * odsLn * shrLn - offset := sharesOffset + quadrantOffset + rowOffset + offset = offset + rowOffset + shares := make([]share.Share, odsLn) axsData := make([]byte, odsLn*shrLn) if _, err := fl.ReadAt(axsData, int64(offset)); err != nil { return nil, err @@ -378,15 +390,12 @@ func readRow(fl io.ReaderAt, hdr *headerV0, sharesOffset, quadrantIdx, rowIdx in return shares, nil } -func readCol(fl io.ReaderAt, hdr *headerV0, sharesOffset, quadrantIdx, colIdx int) ([]share.Share, error) { - shrLn := int(hdr.shareSize) - odsLn := int(hdr.squareSize / 2) - quadrantOffset := quadrantIdx * odsLn * odsLn * shrLn - +func readColHalf(fl io.ReaderAt, shrLn, edsLn, offset, colIdx int) ([]share.Share, error) { + odsLn := edsLn / 2 shares := make([]share.Share, odsLn) for i := range shares { pos := colIdx + i*odsLn - offset := sharesOffset + quadrantOffset + pos*shrLn + offset := offset + pos*shrLn shr := make(share.Share, shrLn) if _, err := fl.ReadAt(shr, int64(offset)); err != nil { @@ -396,12 +405,3 @@ func readCol(fl io.ReaderAt, hdr *headerV0, sharesOffset, quadrantIdx, colIdx in } return shares, nil } - -func (f *ODSFile) axis(ctx context.Context, axisType rsmt2d.Axis, axisIdx int) ([]share.Share, error) { - half, err := f.AxisHalf(ctx, axisType, axisIdx) - if err != nil { - return nil, err - } - - return half.Extended() -} diff --git a/store/file/ods_q4.go b/store/file/ods_q4.go new file mode 100644 index 0000000000..fec58f81a5 --- /dev/null +++ b/store/file/ods_q4.go @@ -0,0 +1,183 @@ +package file + +import ( + "context" + "errors" + "fmt" + "io" + "sync" + "sync/atomic" + + "github.com/celestiaorg/rsmt2d" + + "github.com/celestiaorg/celestia-node/share" + eds "github.com/celestiaorg/celestia-node/share/new_eds" + "github.com/celestiaorg/celestia-node/share/shwap" +) + +var _ eds.AccessorStreamer = (*ODSQ4)(nil) + 
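As a reading aid for the two new on-disk formats: an ODS file is laid out as header, then the row and column roots, then the ODS shares in row-major order, while a Q4 file is just a header followed by the Q4 shares. Below is a minimal, illustrative sketch of the offset arithmetic used by sharesOffset, readRowHalf and readColHalf; the constants are hypothetical placeholders, not the real values from headerV0 or the share package.

```go
package main

import "fmt"

// Illustrative sizes only; the real values come from headerV0 and the share package.
const (
	headerSize   = 64  // hypothetical hdr.Size()
	axisRootSize = 90  // hypothetical share.AxisRootSize
	shareSize    = 512 // hypothetical share.Size
)

func main() {
	edsWidth := 32           // extended square width stored in the header
	odsWidth := edsWidth / 2 // original data square width

	// ODS file: header | row roots + column roots | ODS shares (row-major).
	odsSharesStart := headerSize + 2*edsWidth*axisRootSize
	// Q4 file: header | Q4 shares (row-major); no axis roots are stored.
	q4SharesStart := headerSize

	rowIdx, colIdx := 3, 5
	// readRowHalf: a row half is a single contiguous run of odsWidth shares.
	rowHalfOffset := odsSharesStart + rowIdx*odsWidth*shareSize
	// readColHalf: a column half is gathered one share per row, striding odsWidth shares.
	colShareOffset := func(i int) int { return odsSharesStart + (colIdx+i*odsWidth)*shareSize }

	fmt.Println(odsSharesStart, q4SharesStart, rowHalfOffset, colShareOffset(rowIdx))
}
```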
+// ODSQ4 is an Accessor that combines ODS and Q4 files. +// It extends the ODS with the ability to read Q4 of the EDS. +// Reading from the fourth quadrant allows to serve samples from Q2 and Q3 quadrants of the square, +// without reading entire Q1. +type ODSQ4 struct { + ods *ODS + + q4Open func() (*Q4, error) + q4Mu sync.Mutex + q4Opened atomic.Bool + q4 *Q4 +} + +// CreateODSQ4 creates ODS and Q4 files under the given FS paths. +func CreateODSQ4( + pathODS, pathQ4 string, + roots *share.AxisRoots, + eds *rsmt2d.ExtendedDataSquare, +) error { + errCh := make(chan error) + go func() { + // doing this async shaves off ~27% of time for 128 ODS + // for bigger ODSes the discrepancy is even bigger + err := CreateQ4(pathQ4, roots, eds) + if err != nil { + err = fmt.Errorf("сreating Q4 file: %w", err) + } + + errCh <- err + }() + + if err := CreateODS(pathODS, roots, eds); err != nil { + return fmt.Errorf("creating ODS file: %w", err) + } + + err := <-errCh + if err != nil { + return err + } + + return nil +} + +// OpenODSQ4 opens ODS file under the given FS path. The Q4 is opened lazily +// on demand. +func OpenODSQ4(pathODS, pathQ4 string) (*ODSQ4, error) { + ods, err := OpenODS(pathODS) + if err != nil { + return nil, fmt.Errorf("failed to open ODS: %w", err) + } + + return &ODSQ4{ + ods: ods, + q4Open: func() (*Q4, error) { + return OpenQ4(pathQ4) + }, + }, nil +} + +func (odsq4 *ODSQ4) getQ4() (eds.Accessor, error) { + if odsq4.q4Opened.Load() { + return odsq4.q4, nil + } + + odsq4.q4Mu.Lock() + defer odsq4.q4Mu.Unlock() + if odsq4.q4Opened.Load() { + return odsq4.q4, nil + } + + q4, err := odsq4.q4Open() + if err != nil { + return nil, fmt.Errorf("failed to open Q4: %w", err) + } + + odsq4.q4Opened.Store(true) + odsq4.q4 = q4 + return q4, nil +} + +func (odsq4 *ODSQ4) Size(ctx context.Context) int { + return odsq4.ods.Size(ctx) +} + +func (odsq4 *ODSQ4) DataHash(ctx context.Context) (share.DataHash, error) { + return odsq4.ods.DataHash(ctx) +} + +func (odsq4 *ODSQ4) AxisRoots(ctx context.Context) (*share.AxisRoots, error) { + return odsq4.ods.AxisRoots(ctx) +} + +func (odsq4 *ODSQ4) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) { + // use native AxisHalf implementation, to read axis from q4 quadrant when possible + half, err := odsq4.AxisHalf(ctx, rsmt2d.Row, rowIdx) + if err != nil { + return shwap.Sample{}, fmt.Errorf("reading axis: %w", err) + } + shares, err := half.Extended() + if err != nil { + return shwap.Sample{}, fmt.Errorf("extending shares: %w", err) + } + return shwap.SampleFromShares(shares, rsmt2d.Row, rowIdx, colIdx) +} + +func (odsq4 *ODSQ4) AxisHalf(ctx context.Context, axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error) { + size := odsq4.Size(ctx) // TODO(@Wondertan): Should return error. 
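+ // Axes in the first half of the index range (axisIdx < size/2) are original data
+ // and are read from the ODS file; axes in the second half are parity and are
+ // read from the lazily opened Q4 file.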
+ + var acsr eds.Accessor = odsq4.ods + if axisIdx >= size/2 { + q4, err := odsq4.getQ4() + if err != nil { + return eds.AxisHalf{}, err + } + + acsr = q4 + } + + half, err := acsr.AxisHalf(ctx, axisType, axisIdx) + if err != nil { + return eds.AxisHalf{}, fmt.Errorf("reading axis half: %w", err) + } + + return half, nil +} + +func (odsq4 *ODSQ4) RowNamespaceData(ctx context.Context, + namespace share.Namespace, + rowIdx int, +) (shwap.RowNamespaceData, error) { + half, err := odsq4.AxisHalf(ctx, rsmt2d.Row, rowIdx) + if err != nil { + return shwap.RowNamespaceData{}, fmt.Errorf("reading axis: %w", err) + } + shares, err := half.Extended() + if err != nil { + return shwap.RowNamespaceData{}, fmt.Errorf("extending shares: %w", err) + } + return shwap.RowNamespaceDataFromShares(shares, namespace, rowIdx) +} + +func (odsq4 *ODSQ4) Shares(ctx context.Context) ([]share.Share, error) { + return odsq4.ods.Shares(ctx) +} + +func (odsq4 *ODSQ4) Reader() (io.Reader, error) { + return odsq4.ods.Reader() +} + +func (odsq4 *ODSQ4) Close() error { + err := odsq4.ods.Close() + if err != nil { + err = fmt.Errorf("closing ODS file: %w", err) + } + + if odsq4.q4Opened.Load() { + errQ4 := odsq4.q4.Close() + if err != nil { + errQ4 = fmt.Errorf("closing Q4 file: %w", errQ4) + err = errors.Join(err, errQ4) + } + } + return err +} diff --git a/store/file/ods_q4_test.go b/store/file/ods_q4_test.go new file mode 100644 index 0000000000..3f660df535 --- /dev/null +++ b/store/file/ods_q4_test.go @@ -0,0 +1,108 @@ +package file + +import ( + "context" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/libs/rand" + + "github.com/celestiaorg/rsmt2d" + + "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/eds/edstest" + eds "github.com/celestiaorg/celestia-node/share/new_eds" +) + +func TestCreateODSQ4File(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + + edsIn := edstest.RandEDS(t, 8) + odsq4 := createODSQ4File(t, edsIn) + + shares, err := odsq4.Shares(ctx) + require.NoError(t, err) + expected := edsIn.FlattenedODS() + require.Equal(t, expected, shares) + require.NoError(t, odsq4.Close()) +} + +func TestODSQ4File(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + t.Cleanup(cancel) + + ODSSize := 16 + eds.TestSuiteAccessor(ctx, t, createODSAccessor, ODSSize) + eds.TestStreamer(ctx, t, createODSAccessorStreamer, ODSSize) +} + +// BenchmarkAxisFromODSQ4File/Size:32/ProofType:row/squareHalf:0-16 354836 3345 ns/op +// BenchmarkAxisFromODSQ4File/Size:32/ProofType:row/squareHalf:1-16 339547 3187 ns/op +// BenchmarkAxisFromODSQ4File/Size:32/ProofType:col/squareHalf:0-16 69364 16440 ns/op +// BenchmarkAxisFromODSQ4File/Size:32/ProofType:col/squareHalf:1-16 66928 15964 ns/op +// BenchmarkAxisFromODSQ4File/Size:64/ProofType:row/squareHalf:0-16 223290 5184 ns/op +// BenchmarkAxisFromODSQ4File/Size:64/ProofType:row/squareHalf:1-16 194018 5240 ns/op +// BenchmarkAxisFromODSQ4File/Size:64/ProofType:col/squareHalf:0-16 39949 29549 ns/op +// BenchmarkAxisFromODSQ4File/Size:64/ProofType:col/squareHalf:1-16 39356 29912 ns/op +// BenchmarkAxisFromODSQ4File/Size:128/ProofType:row/squareHalf:0-16 134220 8903 ns/op +// BenchmarkAxisFromODSQ4File/Size:128/ProofType:row/squareHalf:1-16 125110 8789 ns/op +// BenchmarkAxisFromODSQ4File/Size:128/ProofType:col/squareHalf:0-16 15075 74996 ns/op +// 
BenchmarkAxisFromODSQ4File/Size:128/ProofType:col/squareHalf:1-16 15530 74855 ns/op +func BenchmarkAxisFromODSQ4File(b *testing.B) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + b.Cleanup(cancel) + + minSize, maxSize := 32, 128 + newFile := func(size int) eds.Accessor { + eds := edstest.RandEDS(b, size) + return createODSQ4File(b, eds) + } + eds.BenchGetHalfAxisFromAccessor(ctx, b, newFile, minSize, maxSize) +} + +// BenchmarkSampleFromODSQ4File/Size:32/quadrant:1-16 14260 82827 ns/op +// BenchmarkSampleFromODSQ4File/Size:32/quadrant:2-16 14281 85465 ns/op +// BenchmarkSampleFromODSQ4File/Size:32/quadrant:3-16 12938 91213 ns/op +// BenchmarkSampleFromODSQ4File/Size:32/quadrant:4-16 12934 94077 ns/op +// BenchmarkSampleFromODSQ4File/Size:64/quadrant:1-16 7497 172978 ns/op +// BenchmarkSampleFromODSQ4File/Size:64/quadrant:2-16 6332 191139 ns/op +// BenchmarkSampleFromODSQ4File/Size:64/quadrant:3-16 5852 214140 ns/op +// BenchmarkSampleFromODSQ4File/Size:64/quadrant:4-16 5899 215875 ns/op +// BenchmarkSampleFromODSQ4File/Size:128/quadrant:1-16 3520 399728 ns/op +// BenchmarkSampleFromODSQ4File/Size:128/quadrant:2-16 3242 410557 ns/op +// BenchmarkSampleFromODSQ4File/Size:128/quadrant:3-16 2590 424491 ns/op +// BenchmarkSampleFromODSQ4File/Size:128/quadrant:4-16 2812 444697 ns/op +func BenchmarkSampleFromODSQ4File(b *testing.B) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + b.Cleanup(cancel) + + minSize, maxSize := 32, 128 + newFile := func(size int) eds.Accessor { + eds := edstest.RandEDS(b, size) + return createODSQ4File(b, eds) + } + eds.BenchGetSampleFromAccessor(ctx, b, newFile, minSize, maxSize) +} + +func createODSAccessorStreamer(t testing.TB, eds *rsmt2d.ExtendedDataSquare) eds.AccessorStreamer { + return createODSFile(t, eds) +} + +func createODSAccessor(t testing.TB, eds *rsmt2d.ExtendedDataSquare) eds.Accessor { + return createODSFile(t, eds) +} + +func createODSQ4File(t testing.TB, eds *rsmt2d.ExtendedDataSquare) *ODSQ4 { + path := t.TempDir() + "/" + strconv.Itoa(rand.Intn(1000)) + roots, err := share.NewAxisRoots(eds) + require.NoError(t, err) + pathODS, pathQ4 := path+".ods", path+".q4" + err = CreateODSQ4(pathODS, pathQ4, roots, eds) + require.NoError(t, err) + odsq4, err := OpenODSQ4(pathODS, pathQ4) + require.NoError(t, err) + return odsq4 +} diff --git a/store/file/ods_test.go b/store/file/ods_test.go index c26517d883..a38e31e205 100644 --- a/store/file/ods_test.go +++ b/store/file/ods_test.go @@ -21,41 +21,26 @@ func TestCreateODSFile(t *testing.T) { t.Cleanup(cancel) edsIn := edstest.RandEDS(t, 8) - roots, err := share.NewAxisRoots(edsIn) - require.NoError(t, err) - path := t.TempDir() + "/" + roots.String() - f, err := CreateODSFile(path, roots, edsIn) + f := createODSFile(t, edsIn) + readRoots, err := share.NewAxisRoots(edsIn) require.NoError(t, err) shares, err := f.Shares(ctx) require.NoError(t, err) + expected := edsIn.FlattenedODS() require.Equal(t, expected, shares) - require.Equal(t, share.DataHash(roots.Hash()), f.hdr.datahash) - readRoots, err := share.NewAxisRoots(edsIn) - require.NoError(t, err) - require.True(t, roots.Equals(readRoots)) - require.NoError(t, f.Close()) - f, err = OpenODSFile(path) + roots, err := f.AxisRoots(ctx) require.NoError(t, err) - shares, err = f.Shares(ctx) - require.NoError(t, err) - require.Equal(t, expected, shares) require.Equal(t, share.DataHash(roots.Hash()), f.hdr.datahash) - readRoots, err = share.NewAxisRoots(edsIn) - require.NoError(t, err) require.True(t, 
roots.Equals(readRoots)) require.NoError(t, f.Close()) } func TestReadODSFromFile(t *testing.T) { eds := edstest.RandEDS(t, 8) - roots, err := share.NewAxisRoots(eds) - require.NoError(t, err) - path := t.TempDir() + "/testfile" - f, err := CreateODSFile(path, roots, eds) - require.NoError(t, err) + f := createODSFile(t, eds) ods, err := f.readODS() require.NoError(t, err) @@ -76,18 +61,18 @@ func TestODSFile(t *testing.T) { eds.TestStreamer(ctx, t, createStreamer, ODSSize) } -// BenchmarkAxisFromODSFile/Size:32/ProofType:row/squareHalf:0-10 460231 2555 ns/op -// BenchmarkAxisFromODSFile/Size:32/ProofType:row/squareHalf:1-10 5320 218609 ns/op -// BenchmarkAxisFromODSFile/Size:32/ProofType:col/squareHalf:0-10 4572247 256.7 ns/op -// BenchmarkAxisFromODSFile/Size:32/ProofType:col/squareHalf:1-10 5170 212567 ns/op -// BenchmarkAxisFromODSFile/Size:64/ProofType:row/squareHalf:0-10 299281 3777 ns/op -// BenchmarkAxisFromODSFile/Size:64/ProofType:row/squareHalf:1-10 1646 661930 ns/op -// BenchmarkAxisFromODSFile/Size:64/ProofType:col/squareHalf:0-10 3318733 359.1 ns/op -// BenchmarkAxisFromODSFile/Size:64/ProofType:col/squareHalf:1-10 1600 648482 ns/op -// BenchmarkAxisFromODSFile/Size:128/ProofType:row/squareHalf:0-10 170642 6347 ns/op -// BenchmarkAxisFromODSFile/Size:128/ProofType:row/squareHalf:1-10 328 3194674 ns/op -// BenchmarkAxisFromODSFile/Size:128/ProofType:col/squareHalf:0-10 1931910 640.9 ns/op -// BenchmarkAxisFromODSFile/Size:128/ProofType:col/squareHalf:1-10 387 3304090 ns/op +// BenchmarkAxisFromODSFile/Size:32/ProofType:row/squareHalf:0-16 382011 3104 ns/op +// BenchmarkAxisFromODSFile/Size:32/ProofType:row/squareHalf:1-16 9320 122408 ns/op +// BenchmarkAxisFromODSFile/Size:32/ProofType:col/squareHalf:0-16 4408911 266.5 ns/op +// BenchmarkAxisFromODSFile/Size:32/ProofType:col/squareHalf:1-16 9488 119472 ns/op +// BenchmarkAxisFromODSFile/Size:64/ProofType:row/squareHalf:0-16 240913 5239 ns/op +// BenchmarkAxisFromODSFile/Size:64/ProofType:row/squareHalf:1-16 1018 1249622 ns/op +// BenchmarkAxisFromODSFile/Size:64/ProofType:col/squareHalf:0-16 2614063 451.8 ns/op +// BenchmarkAxisFromODSFile/Size:64/ProofType:col/squareHalf:1-16 1917 661510 ns/op +// BenchmarkAxisFromODSFile/Size:128/ProofType:row/squareHalf:0-16 119324 10425 ns/op +// BenchmarkAxisFromODSFile/Size:128/ProofType:row/squareHalf:1-16 163 9926752 ns/op +// BenchmarkAxisFromODSFile/Size:128/ProofType:col/squareHalf:0-16 1634124 726.2 ns/op +// BenchmarkAxisFromODSFile/Size:128/ProofType:col/squareHalf:1-16 205 5508394 ns/op func BenchmarkAxisFromODSFile(b *testing.B) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) b.Cleanup(cancel) @@ -100,18 +85,18 @@ func BenchmarkAxisFromODSFile(b *testing.B) { eds.BenchGetHalfAxisFromAccessor(ctx, b, newFile, minSize, maxSize) } -// BenchmarkAxisFromODSFileDisabledCache/Size:32/ProofType:row/squareHalf:0-10 481326 2447 ns/op -// BenchmarkAxisFromODSFileDisabledCache/Size:32/ProofType:row/squareHalf:1-10 5134 218191 ns/op -// BenchmarkAxisFromODSFileDisabledCache/Size:32/ProofType:col/squareHalf:0-10 56260 21109 ns/op -// BenchmarkAxisFromODSFileDisabledCache/Size:32/ProofType:col/squareHalf:1-10 5608 217877 ns/op -// BenchmarkAxisFromODSFileDisabledCache/Size:64/ProofType:row/squareHalf:0-10 321994 3941 ns/op -// BenchmarkAxisFromODSFileDisabledCache/Size:64/ProofType:row/squareHalf:1-10 1237 919419 ns/op -// BenchmarkAxisFromODSFileDisabledCache/Size:64/ProofType:col/squareHalf:0-10 28233 43209 ns/op -// 
BenchmarkAxisFromODSFileDisabledCache/Size:64/ProofType:col/squareHalf:1-10 1334 898654 ns/op -// BenchmarkAxisFromODSFileDisabledCache/Size:128/ProofType:row/squareHalf:0-10 179788 6839 ns/op -// BenchmarkAxisFromODSFileDisabledCache/Size:128/ProofType:row/squareHalf:1-10 310 3935097 ns/op -// BenchmarkAxisFromODSFileDisabledCache/Size:128/ProofType:col/squareHalf:0-10 13867 85854 ns/op -// BenchmarkAxisFromODSFileDisabledCache/Size:128/ProofType:col/squareHalf:1-10 298 3900021 ns/op +// BenchmarkAxisFromODSFileDisabledCache/Size:32/ProofType:row/squareHalf:0-16 378975 3141 ns/op +// BenchmarkAxisFromODSFileDisabledCache/Size:32/ProofType:row/squareHalf:1-16 1026 1175651 ns/op +// BenchmarkAxisFromODSFileDisabledCache/Size:32/ProofType:col/squareHalf:0-16 80200 14721 ns/op +// BenchmarkAxisFromODSFileDisabledCache/Size:32/ProofType:col/squareHalf:1-16 1014 1180527 ns/op +// BenchmarkAxisFromODSFileDisabledCache/Size:64/ProofType:row/squareHalf:0-16 212041 5417 ns/op +// BenchmarkAxisFromODSFileDisabledCache/Size:64/ProofType:row/squareHalf:1-16 253 4205953 ns/op +// BenchmarkAxisFromODSFileDisabledCache/Size:64/ProofType:col/squareHalf:0-16 35289 34033 ns/op +// BenchmarkAxisFromODSFileDisabledCache/Size:64/ProofType:col/squareHalf:1-16 325 3229517 ns/op +// BenchmarkAxisFromODSFileDisabledCache/Size:128/ProofType:row/squareHalf:0-16 132261 8535 ns/op +// BenchmarkAxisFromODSFileDisabledCache/Size:128/ProofType:row/squareHalf:1-16 48 22963229 ns/op +// BenchmarkAxisFromODSFileDisabledCache/Size:128/ProofType:col/squareHalf:0-16 19053 62858 ns/op +// BenchmarkAxisFromODSFileDisabledCache/Size:128/ProofType:col/squareHalf:1-16 48 21185201 ns/op func BenchmarkAxisFromODSFileDisabledCache(b *testing.B) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) b.Cleanup(cancel) @@ -124,18 +109,18 @@ func BenchmarkAxisFromODSFileDisabledCache(b *testing.B) { eds.BenchGetHalfAxisFromAccessor(ctx, b, newFile, minSize, maxSize) } -// BenchmarkSampleFromODSFile/Size:32/quadrant:1-10 10908 104872 ns/op -// BenchmarkSampleFromODSFile/Size:32/quadrant:2-10 9906 104641 ns/op -// BenchmarkSampleFromODSFile/Size:32/quadrant:3-10 8983 123384 ns/op -// BenchmarkSampleFromODSFile/Size:32/quadrant:4-10 3476 343850 ns/op -// BenchmarkSampleFromODSFile/Size:64/quadrant:1-10 5835 200151 ns/op -// BenchmarkSampleFromODSFile/Size:64/quadrant:2-10 5401 201271 ns/op -// BenchmarkSampleFromODSFile/Size:64/quadrant:3-10 4648 239045 ns/op -// BenchmarkSampleFromODSFile/Size:64/quadrant:4-10 1263 895983 ns/op -// BenchmarkSampleFromODSFile/Size:128/quadrant:1-10 2475 409687 ns/op -// BenchmarkSampleFromODSFile/Size:128/quadrant:2-10 2790 411153 ns/op -// BenchmarkSampleFromODSFile/Size:128/quadrant:3-10 2286 487123 ns/op -// BenchmarkSampleFromODSFile/Size:128/quadrant:4-10 321 3698735 ns/op +// BenchmarkSampleFromODSFile/Size:32/quadrant:1-16 13684 87558 ns/op +// BenchmarkSampleFromODSFile/Size:32/quadrant:2-16 13358 85677 ns/op +// BenchmarkSampleFromODSFile/Size:32/quadrant:3-16 10000 102631 ns/op +// BenchmarkSampleFromODSFile/Size:32/quadrant:4-16 5175 222615 ns/op +// BenchmarkSampleFromODSFile/Size:64/quadrant:1-16 7142 173784 ns/op +// BenchmarkSampleFromODSFile/Size:64/quadrant:2-16 6820 171602 ns/op +// BenchmarkSampleFromODSFile/Size:64/quadrant:3-16 5232 201875 ns/op +// BenchmarkSampleFromODSFile/Size:64/quadrant:4-16 1448 1035275 ns/op +// BenchmarkSampleFromODSFile/Size:128/quadrant:1-16 3829 359528 ns/op +// BenchmarkSampleFromODSFile/Size:128/quadrant:2-16 3303 358142 ns/op +// 
BenchmarkSampleFromODSFile/Size:128/quadrant:3-16 2666 431895 ns/op +// BenchmarkSampleFromODSFile/Size:128/quadrant:4-16 183 7347936 ns/op func BenchmarkSampleFromODSFile(b *testing.B) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) b.Cleanup(cancel) @@ -148,18 +133,19 @@ func BenchmarkSampleFromODSFile(b *testing.B) { eds.BenchGetSampleFromAccessor(ctx, b, newFile, minSize, maxSize) } -// BenchmarkSampleFromODSFileDisabledCache/Size:32/quadrant:1-10 11040 106378 ns/op -// BenchmarkSampleFromODSFileDisabledCache/Size:32/quadrant:2-10 9936 106403 ns/op -// BenchmarkSampleFromODSFileDisabledCache/Size:32/quadrant:3-10 8635 124142 ns/op -// BenchmarkSampleFromODSFileDisabledCache/Size:32/quadrant:4-10 1940 596330 ns/op -// BenchmarkSampleFromODSFileDisabledCache/Size:64/quadrant:1-10 5930 199782 ns/op -// BenchmarkSampleFromODSFileDisabledCache/Size:64/quadrant:2-10 5494 201658 ns/op -// BenchmarkSampleFromODSFileDisabledCache/Size:64/quadrant:3-10 4756 237897 ns/op -// BenchmarkSampleFromODSFileDisabledCache/Size:64/quadrant:4-10 638 1874038 ns/op -// BenchmarkSampleFromODSFileDisabledCache/Size:128/quadrant:1-10 2500 408092 ns/op -// BenchmarkSampleFromODSFileDisabledCache/Size:128/quadrant:2-10 2696 410861 ns/op -// BenchmarkSampleFromODSFileDisabledCache/Size:128/quadrant:3-10 2290 490488 ns/op -// BenchmarkSampleFromODSFileDisabledCache/Size:128/quadrant:4-10 159 7660843 ns/op +// BenchmarkSampleFromODSFileDisabledCache/Size:32/quadrant:1 +// BenchmarkSampleFromODSFileDisabledCache/Size:32/quadrant:1-16 13152 85301 ns/op +// BenchmarkSampleFromODSFileDisabledCache/Size:32/quadrant:2-16 14140 84876 ns/op +// BenchmarkSampleFromODSFileDisabledCache/Size:32/quadrant:3-16 11756 102360 ns/op +// BenchmarkSampleFromODSFileDisabledCache/Size:32/quadrant:4-16 985 1292232 ns/op +// BenchmarkSampleFromODSFileDisabledCache/Size:64/quadrant:1-16 7678 172306 ns/op +// BenchmarkSampleFromODSFileDisabledCache/Size:64/quadrant:2-16 5744 176533 ns/op +// BenchmarkSampleFromODSFileDisabledCache/Size:64/quadrant:3-16 6022 207884 ns/op +// BenchmarkSampleFromODSFileDisabledCache/Size:64/quadrant:4-16 304 3881858 ns/op +// BenchmarkSampleFromODSFileDisabledCache/Size:128/quadrant:1-16 3697 355835 ns/op +// BenchmarkSampleFromODSFileDisabledCache/Size:128/quadrant:2-16 3558 360162 ns/op +// BenchmarkSampleFromODSFileDisabledCache/Size:128/quadrant:3-16 3027 410976 ns/op +// BenchmarkSampleFromODSFileDisabledCache/Size:128/quadrant:4-16 54 21796460 ns/op func BenchmarkSampleFromODSFileDisabledCache(b *testing.B) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) b.Cleanup(cancel) @@ -187,21 +173,23 @@ func createCachedStreamer(t testing.TB, eds *rsmt2d.ExtendedDataSquare) eds.Acce return f } -func createODSFile(t testing.TB, eds *rsmt2d.ExtendedDataSquare) *ODSFile { +func createODSFile(t testing.TB, eds *rsmt2d.ExtendedDataSquare) *ODS { path := t.TempDir() + "/" + strconv.Itoa(rand.Intn(1000)) roots, err := share.NewAxisRoots(eds) require.NoError(t, err) - fl, err := CreateODSFile(path, roots, eds) + err = CreateODS(path, roots, eds) require.NoError(t, err) - return fl + ods, err := OpenODS(path) + return ods } func createODSFileDisabledCache(t testing.TB, eds *rsmt2d.ExtendedDataSquare) eds.Accessor { path := t.TempDir() + "/" + strconv.Itoa(rand.Intn(1000)) roots, err := share.NewAxisRoots(eds) require.NoError(t, err) - fl, err := CreateODSFile(path, roots, eds) + err = CreateODS(path, roots, eds) require.NoError(t, err) - fl.disableCache = true - return 
fl + ods, err := OpenODS(path) + ods.disableCache = true + return ods } diff --git a/store/file/q1q4_file.go b/store/file/q1q4_file.go deleted file mode 100644 index d44d7b2ef2..0000000000 --- a/store/file/q1q4_file.go +++ /dev/null @@ -1,174 +0,0 @@ -package file - -import ( - "bufio" - "context" - "fmt" - "io" - - "github.com/celestiaorg/rsmt2d" - - "github.com/celestiaorg/celestia-node/share" - eds "github.com/celestiaorg/celestia-node/share/new_eds" - "github.com/celestiaorg/celestia-node/share/shwap" -) - -var _ eds.AccessorStreamer = (*Q1Q4File)(nil) - -// Q1Q4File represents a file that contains the first and fourth quadrants of an extended data -// square. It extends the ODSFile with the ability to read the fourth quadrant of the square. -// Reading from the fourth quadrant allows to serve samples from Q2 and Q4 quadrants of the square, -// without the need to read entire Q1. -type Q1Q4File struct { - ods *ODSFile -} - -func OpenQ1Q4File(path string) (*Q1Q4File, error) { - ods, err := OpenODSFile(path) - if err != nil { - return nil, err - } - - return &Q1Q4File{ - ods: ods, - }, nil -} - -func CreateQ1Q4File(path string, roots *share.AxisRoots, eds *rsmt2d.ExtendedDataSquare) (*Q1Q4File, error) { - ods, err := CreateODSFile(path, roots, eds) - if err != nil { - return nil, err - } - - // buffering gives us ~4x speed up - buf := bufio.NewWriterSize(ods.fl, writeBufferSize) - - err = writeQ4(buf, eds) - if err != nil { - return nil, fmt.Errorf("writing Q4: %w", err) - } - - err = buf.Flush() - if err != nil { - return nil, fmt.Errorf("flushing Q4: %w", err) - } - - err = ods.fl.Sync() - if err != nil { - return nil, fmt.Errorf("syncing file: %w", err) - } - - return &Q1Q4File{ - ods: ods, - }, nil -} - -// writeQ4 writes the frth quadrant of the square to the writer. 
iIt writes the quadrant in row-major -// order -func writeQ4(w io.Writer, eds *rsmt2d.ExtendedDataSquare) error { - half := eds.Width() / 2 - for i := range half { - for j := range half { - shr := eds.GetCell(i+half, j+half) // TODO: Avoid copying inside GetCell - _, err := w.Write(shr) - if err != nil { - return fmt.Errorf("writing share: %w", err) - } - } - } - return nil -} - -func (f *Q1Q4File) Size(ctx context.Context) int { - return f.ods.Size(ctx) -} - -func (f *Q1Q4File) DataHash(ctx context.Context) (share.DataHash, error) { - return f.ods.DataHash(ctx) -} - -func (f *Q1Q4File) AxisRoots(ctx context.Context) (*share.AxisRoots, error) { - return f.ods.AxisRoots(ctx) -} - -func (f *Q1Q4File) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) { - // use native AxisHalf implementation, to read axis from Q4 quandrant when possible - half, err := f.AxisHalf(ctx, rsmt2d.Row, rowIdx) - if err != nil { - return shwap.Sample{}, fmt.Errorf("reading axis: %w", err) - } - shares, err := half.Extended() - if err != nil { - return shwap.Sample{}, fmt.Errorf("extending shares: %w", err) - } - return shwap.SampleFromShares(shares, rsmt2d.Row, rowIdx, colIdx) -} - -func (f *Q1Q4File) AxisHalf(_ context.Context, axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error) { - if axisIdx < f.ods.size()/2 { - half, err := f.ods.readAxisHalf(axisType, axisIdx) - if err != nil { - return eds.AxisHalf{}, fmt.Errorf("reading axis half: %w", err) - } - return half, nil - } - - return f.readAxisHalfFromQ4(axisType, axisIdx) -} - -func (f *Q1Q4File) RowNamespaceData(ctx context.Context, - namespace share.Namespace, - rowIdx int, -) (shwap.RowNamespaceData, error) { - half, err := f.AxisHalf(ctx, rsmt2d.Row, rowIdx) - if err != nil { - return shwap.RowNamespaceData{}, fmt.Errorf("reading axis: %w", err) - } - shares, err := half.Extended() - if err != nil { - return shwap.RowNamespaceData{}, fmt.Errorf("extending shares: %w", err) - } - return shwap.RowNamespaceDataFromShares(shares, namespace, rowIdx) -} - -func (f *Q1Q4File) Shares(ctx context.Context) ([]share.Share, error) { - return f.ods.Shares(ctx) -} - -func (f *Q1Q4File) Reader() (io.Reader, error) { - return f.ods.Reader() -} - -func (f *Q1Q4File) Close() error { - return f.ods.Close() -} - -func (f *Q1Q4File) readAxisHalfFromQ4(axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error) { - q4idx := axisIdx - f.ods.size()/2 - if q4idx < 0 { - return eds.AxisHalf{}, fmt.Errorf("invalid axis index for Q4: %d", axisIdx) - } - offset := f.ods.sharesOffset() - switch axisType { - case rsmt2d.Col: - shares, err := readCol(f.ods.fl, f.ods.hdr, offset, 1, q4idx) - if err != nil { - return eds.AxisHalf{}, err - } - return eds.AxisHalf{ - Shares: shares, - IsParity: true, - }, nil - case rsmt2d.Row: - shares, err := readRow(f.ods.fl, f.ods.hdr, offset, 1, q4idx) - if err != nil { - return eds.AxisHalf{}, err - } - return eds.AxisHalf{ - Shares: shares, - IsParity: true, - }, nil - default: - return eds.AxisHalf{}, fmt.Errorf("invalid axis type: %d", axisType) - } -} diff --git a/store/file/q1q4_file_test.go b/store/file/q1q4_file_test.go deleted file mode 100644 index 7ab235b82a..0000000000 --- a/store/file/q1q4_file_test.go +++ /dev/null @@ -1,107 +0,0 @@ -package file - -import ( - "context" - "strconv" - "testing" - "time" - - "github.com/stretchr/testify/require" - "github.com/tendermint/tendermint/libs/rand" - - "github.com/celestiaorg/rsmt2d" - - "github.com/celestiaorg/celestia-node/share" - 
"github.com/celestiaorg/celestia-node/share/eds/edstest" - eds "github.com/celestiaorg/celestia-node/share/new_eds" -) - -func TestCreateQ1Q4File(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - t.Cleanup(cancel) - - edsIn := edstest.RandEDS(t, 8) - roots, err := share.NewAxisRoots(edsIn) - require.NoError(t, err) - path := t.TempDir() + "/" + roots.String() - f, err := CreateQ1Q4File(path, roots, edsIn) - require.NoError(t, err) - - shares, err := f.Shares(ctx) - require.NoError(t, err) - expected := edsIn.FlattenedODS() - require.Equal(t, expected, shares) - require.NoError(t, f.Close()) - - f, err = OpenQ1Q4File(path) - require.NoError(t, err) - shares, err = f.Shares(ctx) - require.NoError(t, err) - require.Equal(t, expected, shares) - require.NoError(t, f.Close()) -} - -func TestQ1Q4File(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - t.Cleanup(cancel) - - ODSSize := 16 - eds.TestSuiteAccessor(ctx, t, createQ1Q4File, ODSSize) -} - -// BenchmarkAxisFromQ1Q4File/Size:32/ProofType:row/squareHalf:0-10 481144 2413 ns/op -// BenchmarkAxisFromQ1Q4File/Size:32/ProofType:row/squareHalf:1-10 479437 2431 ns/op -// BenchmarkAxisFromQ1Q4File/Size:32/ProofType:col/squareHalf:0-10 56775 21272 ns/op -// BenchmarkAxisFromQ1Q4File/Size:32/ProofType:col/squareHalf:1-10 57283 20941 ns/op -// BenchmarkAxisFromQ1Q4File/Size:64/ProofType:row/squareHalf:0-10 301357 3870 ns/op -// BenchmarkAxisFromQ1Q4File/Size:64/ProofType:row/squareHalf:1-10 329796 3913 ns/op -// BenchmarkAxisFromQ1Q4File/Size:64/ProofType:col/squareHalf:0-10 28035 42560 ns/op -// BenchmarkAxisFromQ1Q4File/Size:64/ProofType:col/squareHalf:1-10 28179 42447 ns/op -// BenchmarkAxisFromQ1Q4File/Size:128/ProofType:row/squareHalf:0-10 170607 6368 ns/op -// BenchmarkAxisFromQ1Q4File/Size:128/ProofType:row/squareHalf:1-10 184579 6517 ns/op -// BenchmarkAxisFromQ1Q4File/Size:128/ProofType:col/squareHalf:0-10 13952 84004 ns/op -// BenchmarkAxisFromQ1Q4File/Size:128/ProofType:col/squareHalf:1-10 14398 83240 ns/op -func BenchmarkAxisFromQ1Q4File(b *testing.B) { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - b.Cleanup(cancel) - - minSize, maxSize := 32, 128 - newFile := func(size int) eds.Accessor { - eds := edstest.RandEDS(b, size) - return createQ1Q4File(b, eds) - } - eds.BenchGetHalfAxisFromAccessor(ctx, b, newFile, minSize, maxSize) -} - -// BenchmarkSampleFromQ1Q4File/Size:32/quadrant:1-10 11000 103665 ns/op -// BenchmarkSampleFromQ1Q4File/Size:32/quadrant:2-10 9757 105395 ns/op -// BenchmarkSampleFromQ1Q4File/Size:32/quadrant:3-10 8880 119081 ns/op -// BenchmarkSampleFromQ1Q4File/Size:32/quadrant:4-10 9049 118572 ns/op -// BenchmarkSampleFromQ1Q4File/Size:64/quadrant:1-10 5372 200685 ns/op -// BenchmarkSampleFromQ1Q4File/Size:64/quadrant:2-10 5499 200007 ns/op -// BenchmarkSampleFromQ1Q4File/Size:64/quadrant:3-10 4879 233044 ns/op -// BenchmarkSampleFromQ1Q4File/Size:64/quadrant:4-10 4802 232696 ns/op -// BenchmarkSampleFromQ1Q4File/Size:128/quadrant:1-10 2810 398169 ns/op -// BenchmarkSampleFromQ1Q4File/Size:128/quadrant:2-10 2830 407077 ns/op -// BenchmarkSampleFromQ1Q4File/Size:128/quadrant:3-10 2434 478656 ns/op -// BenchmarkSampleFromQ1Q4File/Size:128/quadrant:4-10 2368 478021 ns/op -func BenchmarkSampleFromQ1Q4File(b *testing.B) { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - b.Cleanup(cancel) - - minSize, maxSize := 32, 128 - newFile := func(size int) eds.Accessor { - eds := edstest.RandEDS(b, 
size) - return createQ1Q4File(b, eds) - } - eds.BenchGetSampleFromAccessor(ctx, b, newFile, minSize, maxSize) -} - -func createQ1Q4File(t testing.TB, eds *rsmt2d.ExtendedDataSquare) eds.Accessor { - path := t.TempDir() + "/" + strconv.Itoa(rand.Intn(1000)) - roots, err := share.NewAxisRoots(eds) - require.NoError(t, err) - fl, err := CreateQ1Q4File(path, roots, eds) - require.NoError(t, err) - return fl -} diff --git a/store/file/q4.go b/store/file/q4.go new file mode 100644 index 0000000000..72de55c529 --- /dev/null +++ b/store/file/q4.go @@ -0,0 +1,170 @@ +package file + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "os" + + "github.com/celestiaorg/rsmt2d" + + "github.com/celestiaorg/celestia-node/share" + eds "github.com/celestiaorg/celestia-node/share/new_eds" + "github.com/celestiaorg/celestia-node/share/shwap" +) + +var _ eds.AccessorStreamer = (*Q4)(nil) + +// Q4 implements eds.Accessor as an FS file. +// It stores the Q4 of the EDS and its metadata in file's header. +// It currently implements access only to Q4 +// and can be extended to serve the whole EDS as the need comes. +type Q4 struct { + hdr *headerV0 + file *os.File +} + +// CreateQ4 creates a new file under given FS path and +// writes the Q4 into it out of given EDS. +// It ensures FS is synced after writing finishes. +// It may leave partially written file if any of the writes fail. +func CreateQ4( + path string, + roots *share.AxisRoots, + eds *rsmt2d.ExtendedDataSquare, +) error { + mod := os.O_RDWR | os.O_CREATE | os.O_EXCL // ensure we fail if already exist + f, err := os.OpenFile(path, mod, filePermissions) + if err != nil { + return fmt.Errorf("creating Q4 file: %w", err) + } + + hdr := &headerV0{ + fileVersion: fileV0, + fileType: q4, + shareSize: share.Size, + squareSize: uint16(eds.Width()), + datahash: roots.Hash(), + } + + err = writeQ4File(f, eds, hdr) + if errClose := f.Close(); errClose != nil { + err = errors.Join(err, fmt.Errorf("closing created Q4 file: %w", errClose)) + } + + return err +} + +// writeQ4File full Q4 content into OS File. +func writeQ4File(f *os.File, eds *rsmt2d.ExtendedDataSquare, hdr *headerV0) error { + // buffering gives us ~4x speed up + buf := bufio.NewWriterSize(f, writeBufferSize) + + if err := writeHeader(buf, hdr); err != nil { + return fmt.Errorf("writing Q4 header: %w", err) + } + + if err := writeQ4(buf, eds); err != nil { + return fmt.Errorf("writing Q4: %w", err) + } + + if err := buf.Flush(); err != nil { + return fmt.Errorf("flushing Q4: %w", err) + } + + if err := f.Sync(); err != nil { + return fmt.Errorf("syncing Q4 file: %w", err) + } + + return nil +} + +// writeQ4 writes the forth quadrant of the square to the writer. It writes the quadrant in row-major +// order. +func writeQ4(w io.Writer, eds *rsmt2d.ExtendedDataSquare) error { + half := eds.Width() / 2 + for i := range half { + for j := range half { + shr := eds.GetCell(i+half, j+half) // TODO: Avoid copying inside GetCell + _, err := w.Write(shr) + if err != nil { + return fmt.Errorf("writing share: %w", err) + } + } + } + return nil +} + +// OpenQ4 opens an existing Q4 file under given FS path. +// It only reads the header with metadata. The other content +// of the File is read lazily. +// If file is empty, the ErrEmptyFile is returned. +// File must be closed after usage. 
+func OpenQ4(path string) (*Q4, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + + hdr, err := readHeader(f) + if err != nil { + return nil, err + } + + return &Q4{ + hdr: hdr, + file: f, + }, nil +} + +func (q4 *Q4) Close() error { + return q4.file.Close() +} + +func (q4 *Q4) AxisHalf(ctx context.Context, axisType rsmt2d.Axis, axisIdx int) (eds.AxisHalf, error) { + size := q4.Size(ctx) + q4AxisIdx := axisIdx - size/2 + if q4AxisIdx < 0 { + return eds.AxisHalf{}, fmt.Errorf("invalid axis index for Q4: %d", axisIdx) + } + + axisHalf, err := readAxisHalf(q4.file, axisType, q4.hdr.ShareSize(), size, q4.hdr.Size(), q4AxisIdx) + if err != nil { + return eds.AxisHalf{}, fmt.Errorf("reading axis half: %w", err) + } + + return eds.AxisHalf{ + Shares: axisHalf, + IsParity: true, + }, nil +} + +func (q4 *Q4) Size(context.Context) int { + return q4.hdr.SquareSize() +} + +func (q4 *Q4) DataHash(context.Context) (share.DataHash, error) { + return q4.hdr.datahash, nil +} + +func (q4 *Q4) AxisRoots(context.Context) (*share.AxisRoots, error) { + panic("not implemented") +} + +func (q4 *Q4) Sample(context.Context, int, int) (shwap.Sample, error) { + panic("not implemented") +} + +func (q4 *Q4) RowNamespaceData(context.Context, share.Namespace, int) (shwap.RowNamespaceData, error) { + panic("not implemented") +} + +func (q4 *Q4) Shares(context.Context) ([]share.Share, error) { + panic("not implemented") +} + +func (q4 *Q4) Reader() (io.Reader, error) { + panic("not implemented") +} diff --git a/store/store.go b/store/store.go index 1a6f612888..7d90f9c44f 100644 --- a/store/store.go +++ b/store/store.go @@ -21,15 +21,14 @@ import ( ) var ( - log = logging.Logger("share/eds") - - emptyAccessor = &eds.Rsmt2D{ExtendedDataSquare: share.EmptyEDS()} + log = logging.Logger("edsstore") ) const ( - blocksPath = "blocks" - heightsPath = blocksPath + "/heights" - + blocksPath = "blocks" + heightsPath = blocksPath + "/heights" + odsFileExt = ".ods" + q4FileExt = ".q4" defaultDirPerm = 0o755 ) @@ -51,37 +50,29 @@ type Store struct { // NewStore creates a new EDS Store under the given basepath and datastore. func NewStore(params *Parameters, basePath string) (*Store, error) { - if err := params.Validate(); err != nil { + err := params.Validate() + if err != nil { return nil, err } - // Ensure the blocks folder exists or is created. - blocksFolderPath := filepath.Join(basePath, blocksPath) - if err := ensureFolder(blocksFolderPath); err != nil { - log.Errorf("Failed to ensure the existence of the blocks folder at '%s': %s", blocksFolderPath, err) - return nil, fmt.Errorf("ensure blocks folder '%s': %w", blocksFolderPath, err) - } - - // Ensure the heights folder exists or is created. 
- heightsFolderPath := filepath.Join(basePath, heightsPath) - if err := ensureFolder(heightsFolderPath); err != nil { - log.Errorf("Failed to ensure the existence of the heights folder at '%s': %s", heightsFolderPath, err) - return nil, fmt.Errorf("ensure heights folder '%s': %w", heightsFolderPath, err) + // ensure the blocks dir exists + blocksDir := filepath.Join(basePath, blocksPath) + if err := mkdir(blocksDir); err != nil { + return nil, fmt.Errorf("ensuring blocks directory: %w", err) } - err := ensureEmptyFile(basePath) - if err != nil { - return nil, fmt.Errorf("creating empty file: %w", err) + // ensure the heights dir exists + heightsDir := filepath.Join(basePath, heightsPath) + if err := mkdir(heightsDir); err != nil { + return nil, fmt.Errorf("ensuring heights directory: %w", err) } - var recentCache cache.Cache - recentCache = cache.NoopCache{} + var recentCache cache.Cache = cache.NoopCache{} if params.RecentBlocksCacheSize > 0 { recentCache, err = cache.NewAccessorCache("recent", params.RecentBlocksCacheSize) if err != nil { return nil, fmt.Errorf("failed to create recent eds cache: %w", err) } - } store := &Store{ @@ -89,6 +80,11 @@ func NewStore(params *Parameters, basePath string) (*Store, error) { cache: recentCache, stripLock: newStripLock(1024), } + + if err := store.populateEmptyFile(); err != nil { + return nil, fmt.Errorf("ensuring empty EDS: %w", err) + } + return store, nil } @@ -107,7 +103,7 @@ func (s *Store) Put( if datahash.IsEmptyEDS() { lock := s.stripLock.byHeight(height) lock.Lock() - err := s.ensureHeightLink(roots.Hash(), height) + err := s.linkHeight(datahash, height) lock.Unlock() return err } @@ -145,46 +141,73 @@ func (s *Store) createFile( square *rsmt2d.ExtendedDataSquare, roots *share.AxisRoots, height uint64, -) (exists bool, err error) { - path := s.hashToPath(roots.Hash()) - f, err := file.CreateQ1Q4File(path, roots, square) +) (bool, error) { + pathODS := s.hashToPath(roots.Hash(), odsFileExt) + pathQ4 := s.hashToPath(roots.Hash(), q4FileExt) + + err := file.CreateODSQ4(pathODS, pathQ4, roots, square) if errors.Is(err, os.ErrExist) { + // TODO(@Wondertan): Should we verify that the exist file is correct? 
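+ // the files for this datahash are already stored, so there is nothing to write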
 		return true, nil
 	}
-
-	if err != nil {
-		return false, fmt.Errorf("creating Q1Q4 file: %w", err)
-	}
-
-	err = f.Close()
 	if err != nil {
-		return false, fmt.Errorf("closing created Q1Q4 file: %w", err)
+		return false, errors.Join(
+			fmt.Errorf("creating ODSQ4 file: %w", err),
+			// ensure we don't have partial writes
+			remove(pathODS),
+			remove(pathQ4),
+		)
 	}
 
 	// create hard link with height as name
-	err = s.ensureHeightLink(roots.Hash(), height)
+	err = s.linkHeight(roots.Hash(), height)
 	if err != nil {
-		// remove the file if we failed to create a hard link
-		removeErr := s.removeFile(roots.Hash())
-		return false, fmt.Errorf("creating hard link: %w", errors.Join(err, removeErr))
+		// ensure we don't have partial writes if any operation fails
+		return false, errors.Join(
+			fmt.Errorf("hardlinking height: %w", err),
+			remove(pathODS),
+			remove(pathQ4),
+			s.removeLink(height),
+		)
 	}
 
 	return false, nil
 }
 
-func (s *Store) ensureHeightLink(datahash share.DataHash, height uint64) error {
-	path := s.hashToPath(datahash)
+func (s *Store) linkHeight(datahash share.DataHash, height uint64) error {
 	// create hard link with height as name
-	linkPath := s.heightToPath(height)
-	err := os.Link(path, linkPath)
-	if err != nil && !errors.Is(err, os.ErrExist) {
-		return fmt.Errorf("creating hard link: %w", err)
+	pathOds := s.hashToPath(datahash, odsFileExt)
+	linktoOds := s.heightToPath(height, odsFileExt)
+	pathQ4 := s.hashToPath(datahash, q4FileExt)
+	linktoQ4 := s.heightToPath(height, q4FileExt)
+	return errors.Join(
+		link(pathOds, linktoOds),
+		link(pathQ4, linktoQ4),
+	)
+}
+
+// populateEmptyFile writes a fresh empty EDS file to disk.
+// It overwrites the existing empty file to ensure the on-disk format is always consistent with the
+// canonical in-memory representation.
+func (s *Store) populateEmptyFile() error {
+	pathOds := s.hashToPath(share.EmptyEDSDataHash(), odsFileExt)
+	pathQ4 := s.hashToPath(share.EmptyEDSDataHash(), q4FileExt)
+
+	err := errors.Join(remove(pathOds), remove(pathQ4))
+	if err != nil {
+		return fmt.Errorf("cleaning old empty EDS file: %w", err)
+	}
+
+	err = file.CreateODSQ4(pathOds, pathQ4, share.EmptyEDSRoots(), eds.EmptyAccessor.ExtendedDataSquare)
+	if err != nil {
+		return fmt.Errorf("creating fresh empty EDS file: %w", err)
 	}
+
 	return nil
 }
 
 func (s *Store) GetByHash(ctx context.Context, datahash share.DataHash) (eds.AccessorStreamer, error) {
 	if datahash.IsEmptyEDS() {
-		return emptyAccessor, nil
+		return eds.EmptyAccessor, nil
 	}
 	lock := s.stripLock.byHash(datahash)
 	lock.RLock()
@@ -197,8 +220,8 @@ func (s *Store) GetByHash(ctx context.Context, datahash share.DataHash) (eds.Acc
 }
 
 func (s *Store) getByHash(datahash share.DataHash) (eds.AccessorStreamer, error) {
-	path := s.hashToPath(datahash)
-	return s.openFile(path)
+	path := s.hashToPath(datahash, "")
+	return s.openODSQ4(path)
 }
 
 func (s *Store) GetByHeight(ctx context.Context, height uint64) (eds.AccessorStreamer, error) {
@@ -217,28 +240,31 @@ func (s *Store) getByHeight(height uint64) (eds.AccessorStreamer, error) {
 	if err == nil {
 		return f, nil
 	}
-	path := s.heightToPath(height)
-	return s.openFile(path)
+
+	path := s.heightToPath(height, "")
+	return s.openODSQ4(path)
 }
 
-func (s *Store) openFile(path string) (eds.AccessorStreamer, error) {
-	f, err := file.OpenQ1Q4File(path)
-	if err == nil {
-		return wrapAccessor(f), nil
-	}
-	if os.IsNotExist(err) {
+// openODSQ4 opens ODSQ4 Accessor.
+// It opens the ODS file first, reads its DataHash, and constructs the path for Q4.
+// This is done because Q4 is not indexed (hardlinked), so there is no other way to reach Q4 when only the height is known.
+func (s *Store) openODSQ4(path string) (eds.AccessorStreamer, error) {
+	odsq4, err := file.OpenODSQ4(path+odsFileExt, path+q4FileExt)
+	switch {
+	case errors.Is(err, os.ErrNotExist):
 		return nil, ErrNotFound
+	case err != nil:
+		return nil, fmt.Errorf("opening ODSQ4: %w", err)
 	}
-	if errors.Is(err, file.ErrEmptyFile) {
-		return emptyAccessor, nil
-	}
-	return nil, fmt.Errorf("opening file: %w", err)
+
+	return wrapAccessor(odsq4), nil
 }
 
 func (s *Store) HasByHash(ctx context.Context, datahash share.DataHash) (bool, error) {
 	if datahash.IsEmptyEDS() {
 		return true, nil
 	}
+
 	lock := s.stripLock.byHash(datahash)
 	lock.RLock()
 	defer lock.RUnlock()
@@ -250,8 +276,9 @@ func (s *Store) HasByHash(ctx context.Context, datahash share.DataHash) (bool, e
 }
 
 func (s *Store) hasByHash(datahash share.DataHash) (bool, error) {
-	path := s.hashToPath(datahash)
-	return pathExists(path)
+	// For now, we assume that if ODS exists, the Q4 exists as well.
+	path := s.hashToPath(datahash, odsFileExt)
+	return exists(path)
 }
 
 func (s *Store) HasByHeight(ctx context.Context, height uint64) (bool, error) {
@@ -272,8 +299,9 @@ func (s *Store) hasByHeight(height uint64) (bool, error) {
 		return true, nil
 	}
 
-	path := s.heightToPath(height)
-	return pathExists(path)
+	// For now, we assume that if ODS exists, the Q4 exists as well.
+	pathODS := s.heightToPath(height, odsFileExt)
+	return exists(pathODS)
 }
 
 func (s *Store) Remove(ctx context.Context, height uint64, datahash share.DataHash) error {
@@ -305,13 +333,12 @@ func (s *Store) removeLink(height uint64) error {
 		return fmt.Errorf("removing from cache: %w", err)
 	}
 
-	// remove hard link by height
-	heightPath := s.heightToPath(height)
-	err := os.Remove(heightPath)
-	if err != nil && !errors.Is(err, os.ErrNotExist) {
-		return err
-	}
-	return nil
+	pathODS := s.heightToPath(height, odsFileExt)
+	pathQ4 := s.heightToPath(height, q4FileExt)
+	return errors.Join(
+		remove(pathODS),
+		remove(pathQ4),
+	)
 }
 
 func (s *Store) removeFile(hash share.DataHash) error {
@@ -320,20 +347,20 @@ func (s *Store) removeFile(hash share.DataHash) error {
 		return nil
 	}
 
-	hashPath := s.hashToPath(hash)
-	err := os.Remove(hashPath)
-	if err != nil && !errors.Is(err, os.ErrNotExist) {
-		return err
-	}
-	return nil
+	pathODS := s.hashToPath(hash, odsFileExt)
+	pathQ4 := s.hashToPath(hash, q4FileExt)
+	return errors.Join(
+		remove(pathODS),
+		remove(pathQ4),
+	)
 }
 
-func (s *Store) hashToPath(datahash share.DataHash) string {
-	return filepath.Join(s.basepath, blocksPath, datahash.String())
+func (s *Store) hashToPath(datahash share.DataHash, ext string) string {
+	return filepath.Join(s.basepath, blocksPath, datahash.String()) + ext
 }
 
-func (s *Store) heightToPath(height uint64) string {
-	return filepath.Join(s.basepath, heightsPath, strconv.Itoa(int(height)))
+func (s *Store) heightToPath(height uint64, ext string) string {
	return filepath.Join(s.basepath, heightsPath, strconv.Itoa(int(height))) + ext
 }
 
 func accessorLoader(accessor eds.AccessorStreamer) cache.OpenAccessorFn {
@@ -350,51 +377,39 @@ func wrapAccessor(accessor eds.AccessorStreamer) eds.AccessorStreamer {
 	return accessorStreamer
 }
 
-func ensureFolder(path string) error {
-	info, err := os.Stat(path)
-	if errors.Is(err, os.ErrNotExist) {
-		err = os.Mkdir(path, defaultDirPerm)
-		if err != nil {
-			return fmt.Errorf("creating blocks dir: %w", err)
-		}
-		return nil
- } - if err != nil { - return fmt.Errorf("checking dir: %w", err) +func mkdir(path string) error { + err := os.Mkdir(path, defaultDirPerm) + if err != nil && !errors.Is(err, os.ErrExist) { + return fmt.Errorf("making directory '%s': %w", path, err) } - if !info.IsDir() { - return errors.New("expected dir, got a file") + + return nil +} + +func link(filepath, linkpath string) error { + err := os.Link(filepath, linkpath) + if err != nil && !errors.Is(err, os.ErrExist) { + return fmt.Errorf("creating hardlink (%s -> %s): %w", filepath, linkpath, err) } return nil } -func pathExists(path string) (bool, error) { +func exists(path string) (bool, error) { _, err := os.Stat(path) - if err != nil { - if errors.Is(err, os.ErrNotExist) { - return false, nil - } - return false, err + switch { + case err == nil: + return true, nil + case errors.Is(err, os.ErrNotExist): + return false, nil + default: + return false, fmt.Errorf("checking file existence '%s': %w", path, err) } - return true, nil } -func ensureEmptyFile(basepath string) error { - emptyFile := share.DataHash(share.EmptyEDSRoots().Hash()).String() - path := filepath.Join(basepath, blocksPath, emptyFile) - ok, err := pathExists(path) - if err != nil { - return fmt.Errorf("checking empty file path: %w", err) - } - if ok { - return nil - } - f, err := os.Create(path) - if err != nil { - return fmt.Errorf("creating empty eds file: %w", err) - } - if err = f.Close(); err != nil { - return fmt.Errorf("closing empty eds file: %w", err) +func remove(path string) error { + err := os.Remove(path) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("removing file '%s': %w", path, err) } return nil } diff --git a/store/store_cache.go b/store/store_cache.go index 864d59a47f..4559c91d45 100644 --- a/store/store_cache.go +++ b/store/store_cache.go @@ -45,10 +45,10 @@ func (cs *CachedStore) GetByHeight(ctx context.Context, height uint64) (eds.Acce } func (cs *CachedStore) openFile(height uint64) cache.OpenAccessorFn { - return func(ctx context.Context) (eds.AccessorStreamer, error) { + return func(context.Context) (eds.AccessorStreamer, error) { // open file directly wihout calling GetByHeight of inner getter to // avoid hitting store cache second time - path := cs.store.heightToPath(height) - return cs.store.openFile(path) + path := cs.store.heightToPath(height, "") + return cs.store.openODSQ4(path) } } diff --git a/store/store_test.go b/store/store_test.go index cd1bcf88f4..4694b3da74 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -19,7 +19,8 @@ func TestEDSStore(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) - edsStore, err := NewStore(paramsNoCache(), t.TempDir()) + dir := t.TempDir() + edsStore, err := NewStore(paramsNoCache(), dir) require.NoError(t, err) // disable cache @@ -27,7 +28,6 @@ func TestEDSStore(t *testing.T) { height := atomic.Uint64{} height.Store(100) - // PutRegistersShard tests if Put registers the shard on the underlying DAGStore t.Run("Put", func(t *testing.T) { eds, roots := randomEDS(t) height := height.Add(1) @@ -245,6 +245,15 @@ func TestEDSStore(t *testing.T) { require.NoError(t, f.Close()) } }) + + t.Run("reopen", func(t *testing.T) { + // tests that store can be reopened + err = edsStore.Close() + require.NoError(t, err) + + edsStore, err = NewStore(paramsNoCache(), dir) + require.NoError(t, err) + }) } func BenchmarkStore(b *testing.B) { @@ -255,7 +264,7 @@ func BenchmarkStore(b *testing.B) { roots, err := share.NewAxisRoots(eds) 
 	require.NoError(b, err)
 
-	// BenchmarkStore/bench_put_128-10         	      27	  19209780 ns/op (~19ms)
+	// BenchmarkStore/put_128-16    	     186	   6623266 ns/op
 	b.Run("put 128", func(b *testing.B) {
 		edsStore, err := NewStore(paramsNoCache(), b.TempDir())
 		require.NoError(b, err)
@@ -268,7 +277,7 @@ func BenchmarkStore(b *testing.B) {
 	})
 
 	// read 128 EDSs does not read full EDS, but only the header
-	// BenchmarkStore/bench_read_128-10  	   82766	     14678 ns/op (~14mcs)
+	// BenchmarkStore/open_by_height,_128-16    	 1585693	       747.6 ns/op (~750ns)
 	b.Run("open by height, 128", func(b *testing.B) {
 		edsStore, err := NewStore(paramsNoCache(), b.TempDir())
 		require.NoError(b, err)
@@ -285,7 +294,7 @@ func BenchmarkStore(b *testing.B) {
 		}
 	})
 
-	// BenchmarkStore/open_by_hash,_128-10      	   72921	     16799 ns/op (~16mcs)
+	// BenchmarkStore/open_by_hash,_128-16    	 1240942	       945.9 ns/op (~950ns)
 	b.Run("open by hash, 128", func(b *testing.B) {
 		edsStore, err := NewStore(DefaultParameters(), b.TempDir())
 		require.NoError(b, err)
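
A note on the Q4 accessor introduced above: AxisHalf only serves axes from the parity half of the extended square and rebases the EDS axis index into a Q4-local one. The following is a minimal standalone sketch of that index math; the helper name q4AxisIndex and the 8x8 square size are illustrative only and not part of the codebase.

package main

import "fmt"

// q4AxisIndex mirrors the index math in Q4.AxisHalf: axes in the second half of
// the extended square map into the Q4 file at axisIdx - size/2, while axes in
// the first half are served from the ODS file instead.
func q4AxisIndex(axisIdx, edsSize int) (int, bool) {
	q4Idx := axisIdx - edsSize/2
	if q4Idx < 0 {
		return 0, false // belongs to the ODS half
	}
	return q4Idx, true
}

func main() {
	const edsSize = 8 // a 4x4 ODS extends to an 8x8 EDS
	for _, axisIdx := range []int{0, 3, 4, 7} {
		if q4Idx, ok := q4AxisIndex(axisIdx, edsSize); ok {
			fmt.Printf("EDS axis %d -> Q4 axis %d\n", axisIdx, q4Idx)
		} else {
			fmt.Printf("EDS axis %d is served from the ODS file\n", axisIdx)
		}
	}
}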
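
The store now writes two files per block and indexes heights via hardlinks. Below is a self-contained sketch of that directory layout and of the tolerant-removal pattern used by removeFile/removeLink. It uses only the standard library; the datahash, height, and permissions are placeholder values, and the local remove helper mirrors, rather than calls, the store's own.

package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// Sketch of the on-disk layout: every EDS is written as blocks/<datahash>.ods and
// blocks/<datahash>.q4, and a height is indexed by hardlinking both files under
// blocks/heights/<height> with the same extensions.
func main() {
	base, err := os.MkdirTemp("", "edsstore-sketch")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(base)

	blocksDir := filepath.Join(base, "blocks")
	heightsDir := filepath.Join(blocksDir, "heights")
	for _, dir := range []string{blocksDir, heightsDir} {
		if err := os.Mkdir(dir, 0o755); err != nil {
			panic(err)
		}
	}

	// hypothetical datahash and height, for illustration only
	hash, height := "c0ffee", 42
	pathODS := filepath.Join(blocksDir, hash) + ".ods"
	pathQ4 := filepath.Join(blocksDir, hash) + ".q4"
	linkODS := filepath.Join(heightsDir, fmt.Sprint(height)) + ".ods"
	linkQ4 := filepath.Join(heightsDir, fmt.Sprint(height)) + ".q4"

	// "write" the block files, then index them by height via hardlinks
	for _, p := range []string{pathODS, pathQ4} {
		if err := os.WriteFile(p, []byte("stub"), 0o600); err != nil {
			panic(err)
		}
	}
	if err := errors.Join(os.Link(pathODS, linkODS), os.Link(pathQ4, linkQ4)); err != nil {
		panic(err)
	}

	// cleanup mirrors removeLink/removeFile: tolerate already-missing files and
	// aggregate whatever does fail with errors.Join
	remove := func(p string) error {
		if err := os.Remove(p); err != nil && !errors.Is(err, os.ErrNotExist) {
			return fmt.Errorf("removing %s: %w", p, err)
		}
		return nil
	}
	if err := errors.Join(remove(linkODS), remove(linkQ4), remove(pathODS), remove(pathQ4)); err != nil {
		panic(err)
	}
	fmt.Println("layout created and cleaned up under", base)
}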
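
Finally, a hedged usage sketch of the public store API as it stands after this diff. NewStore, DefaultParameters, HasByHeight, and Close all appear in the code and tests above; the import path is inferred from the module, and the base path and printed output are illustrative only.

package main

import (
	"context"
	"fmt"

	"github.com/celestiaorg/celestia-node/store"
)

func main() {
	ctx := context.Background()

	// NewStore creates the blocks/heights directories and (re)writes the empty
	// EDS files, so it is safe to call on both fresh and existing base paths.
	edsStore, err := store.NewStore(store.DefaultParameters(), "/tmp/eds-store")
	if err != nil {
		panic(err)
	}
	defer edsStore.Close()

	// Height lookups go through the hardlink index; a height that was never
	// Put simply reports false here.
	ok, err := edsStore.HasByHeight(ctx, 42)
	if err != nil {
		panic(err)
	}
	fmt.Println("height 42 stored:", ok)
}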