From 9a17f9d01df7771a8a61695cf1d21a0cf6e29e1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Thu, 16 Feb 2023 16:34:22 +0100 Subject: [PATCH 1/2] ReadWrite: add an alternative FinalizeReadOnly+Close flow The goal is to be able to keep reading blocks and the index after safely finalizing the file on disk, typically for indexing purposes. --- v2/blockstore/readwrite.go | 78 +++++++++++++++++++++++++++------ v2/blockstore/readwrite_test.go | 64 +++++++++++++++++++++++++-- 2 files changed, 125 insertions(+), 17 deletions(-) diff --git a/v2/blockstore/readwrite.go b/v2/blockstore/readwrite.go index 3f5f7841..dc220ec0 100644 --- a/v2/blockstore/readwrite.go +++ b/v2/blockstore/readwrite.go @@ -18,6 +18,10 @@ import ( var _ blockstore.Blockstore = (*ReadWrite)(nil) +var ( + errFinalized = fmt.Errorf("cannot write in a carv2 blockstore after finalize") +) + // ReadWrite implements a blockstore that stores blocks in CARv2 format. // Blocks put into the blockstore can be read back once they are successfully written. // This implementation is preferable for a write-heavy workload. @@ -35,6 +39,8 @@ type ReadWrite struct { idx *store.InsertionIndex header carv2.Header + finalized bool // also protected by ronly.mu + opts carv2.Options } @@ -109,10 +115,11 @@ func OpenReadWriteFile(f *os.File, roots []cid.Cid, opts ...carv2.Option) (*Read // Instantiate block store. // Set the header fileld before applying options since padding options may modify header. 
rwbs := &ReadWrite{ - f: f, - idx: store.NewInsertionIndex(), - header: carv2.NewHeader(0), - opts: carv2.ApplyOptions(opts...), + f: f, + idx: store.NewInsertionIndex(), + header: carv2.NewHeader(0), + opts: carv2.ApplyOptions(opts...), + finalized: false, } rwbs.ronly.opts = rwbs.opts @@ -186,6 +193,9 @@ func (b *ReadWrite) PutMany(ctx context.Context, blks []blocks.Block) error { if b.ronly.closed { return errClosed } + if b.finalized { + return errFinalized + } for _, bl := range blks { c := bl.Cid() @@ -227,30 +237,70 @@ func (b *ReadWrite) Discard() { // Finalize finalizes this blockstore by writing the CARv2 header, along with flattened index // for more efficient subsequent read. +// This is the equivalent to calling FinalizeReadOnly and Close. // After this call, the blockstore can no longer be used. func (b *ReadWrite) Finalize() error { + b.ronly.mu.Lock() + defer b.ronly.mu.Unlock() + + if err := b.finalizeReadOnlyWithoutMutex(); err != nil { + return err + } + if err := b.closeWithoutMutex(); err != nil { + return err + } + return nil +} + +// FinalizeReadOnly finalizes this blockstore by writing the CARv2 header, along with flattened index +// for more efficient subsequent read, but keeps it open read-only. +// This call should be complemented later by a call to Close. +func (b *ReadWrite) FinalizeReadOnly() error { + b.ronly.mu.Lock() + defer b.ronly.mu.Unlock() + + return b.finalizeReadOnlyWithoutMutex() +} + +func (b *ReadWrite) finalizeReadOnlyWithoutMutex() error { if b.opts.WriteAsCarV1 { // all blocks are already properly written to the CARv1 inner container and there's // no additional finalization required at the end of the file for a complete v1 - b.ronly.Close() + b.finalized = true return nil } - b.ronly.mu.Lock() - defer b.ronly.mu.Unlock() - if b.ronly.closed { // Allow duplicate Finalize calls, just like Close. // Still error, just like ReadOnly.Close; it should be discarded. 
- return fmt.Errorf("called Finalize on a closed blockstore") + return fmt.Errorf("called Finalize or FinalizeReadOnly on a closed blockstore") + } + if b.finalized { + return fmt.Errorf("called Finalize or FinalizeReadOnly on an already finalized blockstore") } - // Note that we can't use b.Close here, as that tries to grab the same - // mutex we're holding here. - defer b.ronly.closeWithoutMutex() + b.finalized = true - if err := store.Finalize(b.f, b.header, b.idx, uint64(b.dataWriter.Position()), b.opts.StoreIdentityCIDs, b.opts.IndexCodec); err != nil { - return err + return store.Finalize(b.f, b.header, b.idx, uint64(b.dataWriter.Position()), b.opts.StoreIdentityCIDs, b.opts.IndexCodec) +} + +// Close closes the blockstore. +// After this call, the blockstore can no longer be used. +func (b *ReadWrite) Close() error { + b.ronly.mu.Lock() + defer b.ronly.mu.Unlock() + + return b.closeWithoutMutex() +} + +func (b *ReadWrite) closeWithoutMutex() error { + if !b.opts.WriteAsCarV1 && !b.finalized { + return fmt.Errorf("called Close without FinalizeReadOnly first") + } + if b.ronly.closed { + // Allow duplicate Close calls + // Still error, just like ReadOnly.Close; it should be discarded. 
+ return fmt.Errorf("called Close on a closed blockstore") } if err := b.ronly.closeWithoutMutex(); err != nil { diff --git a/v2/blockstore/readwrite_test.go b/v2/blockstore/readwrite_test.go index 04da36fe..45fcb21a 100644 --- a/v2/blockstore/readwrite_test.go +++ b/v2/blockstore/readwrite_test.go @@ -577,9 +577,6 @@ func TestReadWritePanicsOnlyWhenFinalized(t *testing.T) { require.NoError(t, subject.Finalize()) require.Error(t, subject.Finalize()) - _, ok := (interface{})(subject).(io.Closer) - require.False(t, ok) - _, err = subject.Get(ctx, oneTestBlockCid) require.Error(t, err) _, err = subject.GetSize(ctx, anotherTestBlockCid) @@ -1027,3 +1024,64 @@ func TestBlockstore_IdentityCidWithEmptyDataIsIndexed(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, count) } + +func TestBlockstoreFinalizeReadOnly(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + root := blocks.NewBlock([]byte("foo")) + + p := filepath.Join(t.TempDir(), "readwrite.car") + bs, err := blockstore.OpenReadWrite(p, []cid.Cid{root.Cid()}) + require.NoError(t, err) + + err = bs.Put(ctx, root) + require.NoError(t, err) + + roots, err := bs.Roots() + require.NoError(t, err) + _, err = bs.Has(ctx, roots[0]) + require.NoError(t, err) + _, err = bs.Get(ctx, roots[0]) + require.NoError(t, err) + _, err = bs.GetSize(ctx, roots[0]) + require.NoError(t, err) + _, err = bs.AllKeysChan(ctx) + require.NoError(t, err) + + // soft finalize, we can still read, but not write + err = bs.FinalizeReadOnly() + require.NoError(t, err) + + _, err = bs.Roots() + require.NoError(t, err) + _, err = bs.Has(ctx, roots[0]) + require.NoError(t, err) + _, err = bs.Get(ctx, roots[0]) + require.NoError(t, err) + _, err = bs.GetSize(ctx, roots[0]) + require.NoError(t, err) + _, err = bs.AllKeysChan(ctx) + require.NoError(t, err) + + err = bs.Put(ctx, root) + require.Error(t, err) + + // final close, nothing works anymore + err = bs.Close() + require.NoError(t, 
err) + + _, err = bs.Roots() + require.Error(t, err) + _, err = bs.Has(ctx, roots[0]) + require.Error(t, err) + _, err = bs.Get(ctx, roots[0]) + require.Error(t, err) + _, err = bs.GetSize(ctx, roots[0]) + require.Error(t, err) + _, err = bs.AllKeysChan(ctx) + require.Error(t, err) + + err = bs.Put(ctx, root) + require.Error(t, err) +} From b52aa84f0694deea25485482bc109dd1f617033a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Tue, 7 Mar 2023 12:15:23 +0100 Subject: [PATCH 2/2] blockstore: try to close during Finalize(), even in case of previous error Co-authored-by: Rod Vagg --- v2/blockstore/readwrite.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/v2/blockstore/readwrite.go b/v2/blockstore/readwrite.go index dc220ec0..afd079d3 100644 --- a/v2/blockstore/readwrite.go +++ b/v2/blockstore/readwrite.go @@ -243,11 +243,10 @@ func (b *ReadWrite) Finalize() error { b.ronly.mu.Lock() defer b.ronly.mu.Unlock() - if err := b.finalizeReadOnlyWithoutMutex(); err != nil { - return err - } - if err := b.closeWithoutMutex(); err != nil { - return err + for _, err := range []error{b.finalizeReadOnlyWithoutMutex(), b.closeWithoutMutex()} { + if err != nil { + return err + } } return nil }