Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReadWrite: add an alternative FinalizeReadOnly+Close flow #376

Merged
merged 2 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 63 additions & 14 deletions v2/blockstore/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (

var _ blockstore.Blockstore = (*ReadWrite)(nil)

var (
errFinalized = fmt.Errorf("cannot write in a carv2 blockstore after finalize")
)

// ReadWrite implements a blockstore that stores blocks in CARv2 format.
// Blocks put into the blockstore can be read back once they are successfully written.
// This implementation is preferable for a write-heavy workload.
Expand All @@ -35,6 +39,8 @@ type ReadWrite struct {
idx *store.InsertionIndex
header carv2.Header

finalized bool // also protected by ronly.mu

opts carv2.Options
}

Expand Down Expand Up @@ -109,10 +115,11 @@ func OpenReadWriteFile(f *os.File, roots []cid.Cid, opts ...carv2.Option) (*Read
// Instantiate block store.
// Set the header fileld before applying options since padding options may modify header.
rwbs := &ReadWrite{
f: f,
idx: store.NewInsertionIndex(),
header: carv2.NewHeader(0),
opts: carv2.ApplyOptions(opts...),
f: f,
idx: store.NewInsertionIndex(),
header: carv2.NewHeader(0),
opts: carv2.ApplyOptions(opts...),
finalized: false,
}
rwbs.ronly.opts = rwbs.opts

Expand Down Expand Up @@ -186,6 +193,9 @@ func (b *ReadWrite) PutMany(ctx context.Context, blks []blocks.Block) error {
if b.ronly.closed {
return errClosed
}
if b.finalized {
return errFinalized
}

for _, bl := range blks {
c := bl.Cid()
Expand Down Expand Up @@ -227,30 +237,69 @@ func (b *ReadWrite) Discard() {

// Finalize finalizes this blockstore by writing the CARv2 header, along with flattened index
// for more efficient subsequent read.
// This is the equivalent to calling FinalizeReadOnly and Close.
// After this call, the blockstore can no longer be used.
func (b *ReadWrite) Finalize() error {
b.ronly.mu.Lock()
defer b.ronly.mu.Unlock()

for _, err := range []error{b.finalizeReadOnlyWithoutMutex(), b.closeWithoutMutex()} {
if err != nil {
return err
}
}
return nil
}

// Finalize finalizes this blockstore by writing the CARv2 header, along with flattened index
// for more efficient subsequent read, but keep it open read-only.
// This call should be complemented later by a call to Close.
func (b *ReadWrite) FinalizeReadOnly() error {
b.ronly.mu.Lock()
defer b.ronly.mu.Unlock()

return b.finalizeReadOnlyWithoutMutex()
}

func (b *ReadWrite) finalizeReadOnlyWithoutMutex() error {
if b.opts.WriteAsCarV1 {
// all blocks are already properly written to the CARv1 inner container and there's
// no additional finalization required at the end of the file for a complete v1
b.ronly.Close()
b.finalized = true
return nil
}

b.ronly.mu.Lock()
defer b.ronly.mu.Unlock()

if b.ronly.closed {
// Allow duplicate Finalize calls, just like Close.
// Still error, just like ReadOnly.Close; it should be discarded.
return fmt.Errorf("called Finalize on a closed blockstore")
return fmt.Errorf("called Finalize or FinalizeReadOnly on a closed blockstore")
}
if b.finalized {
return fmt.Errorf("called Finalize or FinalizeReadOnly on an already finalized blockstore")
}

// Note that we can't use b.Close here, as that tries to grab the same
// mutex we're holding here.
defer b.ronly.closeWithoutMutex()
b.finalized = true

if err := store.Finalize(b.f, b.header, b.idx, uint64(b.dataWriter.Position()), b.opts.StoreIdentityCIDs, b.opts.IndexCodec); err != nil {
return err
return store.Finalize(b.f, b.header, b.idx, uint64(b.dataWriter.Position()), b.opts.StoreIdentityCIDs, b.opts.IndexCodec)
}

// Close closes the blockstore.
// After this call, the blockstore can no longer be used.
func (b *ReadWrite) Close() error {
b.ronly.mu.Lock()
defer b.ronly.mu.Unlock()

return b.closeWithoutMutex()
}

func (b *ReadWrite) closeWithoutMutex() error {
if !b.opts.WriteAsCarV1 && !b.finalized {
return fmt.Errorf("called Close without FinalizeReadOnly first")
}
if b.ronly.closed {
// Allow duplicate Close calls
// Still error, just like ReadOnly.Close; it should be discarded.
return fmt.Errorf("called Close on a closed blockstore")
}

if err := b.ronly.closeWithoutMutex(); err != nil {
Expand Down
64 changes: 61 additions & 3 deletions v2/blockstore/readwrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,9 +577,6 @@ func TestReadWritePanicsOnlyWhenFinalized(t *testing.T) {
require.NoError(t, subject.Finalize())
require.Error(t, subject.Finalize())

_, ok := (interface{})(subject).(io.Closer)
require.False(t, ok)

_, err = subject.Get(ctx, oneTestBlockCid)
require.Error(t, err)
_, err = subject.GetSize(ctx, anotherTestBlockCid)
Expand Down Expand Up @@ -1027,3 +1024,64 @@ func TestBlockstore_IdentityCidWithEmptyDataIsIndexed(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, count)
}

func TestBlockstoreFinalizeReadOnly(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

root := blocks.NewBlock([]byte("foo"))

p := filepath.Join(t.TempDir(), "readwrite.car")
bs, err := blockstore.OpenReadWrite(p, []cid.Cid{root.Cid()})
require.NoError(t, err)

err = bs.Put(ctx, root)
require.NoError(t, err)

roots, err := bs.Roots()
require.NoError(t, err)
_, err = bs.Has(ctx, roots[0])
require.NoError(t, err)
_, err = bs.Get(ctx, roots[0])
require.NoError(t, err)
_, err = bs.GetSize(ctx, roots[0])
require.NoError(t, err)
_, err = bs.AllKeysChan(ctx)
require.NoError(t, err)

// soft finalize, we can still read, but not write
err = bs.FinalizeReadOnly()
require.NoError(t, err)

_, err = bs.Roots()
require.NoError(t, err)
_, err = bs.Has(ctx, roots[0])
require.NoError(t, err)
_, err = bs.Get(ctx, roots[0])
require.NoError(t, err)
_, err = bs.GetSize(ctx, roots[0])
require.NoError(t, err)
_, err = bs.AllKeysChan(ctx)
require.NoError(t, err)

err = bs.Put(ctx, root)
require.Error(t, err)

// final close, nothing works anymore
err = bs.Close()
require.NoError(t, err)

_, err = bs.Roots()
require.Error(t, err)
_, err = bs.Has(ctx, roots[0])
require.Error(t, err)
_, err = bs.Get(ctx, roots[0])
require.Error(t, err)
_, err = bs.GetSize(ctx, roots[0])
require.Error(t, err)
_, err = bs.AllKeysChan(ctx)
require.Error(t, err)

err = bs.Put(ctx, root)
require.Error(t, err)
}