From 206309dde9a9a0ac850b2532ef2508f2f1a672de Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Sat, 2 Sep 2023 20:14:53 +1000 Subject: [PATCH 1/3] feat: add DeferredCarWriter extracted from Lassie --- v2/storage/deferred/deferredcarwriter.go | 177 ++++++++++++++ v2/storage/deferred/deferredcarwriter_test.go | 231 ++++++++++++++++++ 2 files changed, 408 insertions(+) create mode 100644 v2/storage/deferred/deferredcarwriter.go create mode 100644 v2/storage/deferred/deferredcarwriter_test.go diff --git a/v2/storage/deferred/deferredcarwriter.go b/v2/storage/deferred/deferredcarwriter.go new file mode 100644 index 00000000..dfcd9527 --- /dev/null +++ b/v2/storage/deferred/deferredcarwriter.go @@ -0,0 +1,177 @@ +package deferred + +import ( + "context" + "io" + "os" + "sync" + + "github.com/ipfs/go-cid" + carv2 "github.com/ipld/go-car/v2" + carstorage "github.com/ipld/go-car/v2/storage" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/linking" + ipldstorage "github.com/ipld/go-ipld-prime/storage" +) + +type putCb struct { + cb func(int) + once bool +} + +var _ ipldstorage.WritableStorage = (*DeferredCarWriter)(nil) +var _ io.Closer = (*DeferredCarWriter)(nil) + +// DeferredCarWriter creates a write-only CAR either to an existing stream or +// to a file designated by a supplied path. CAR content (including header) +// only begins when the first Put() operation is performed. If the output is a +// file, it will be created when the first Put() operation is performed. +// DeferredCarWriter is threadsafe, and can be used concurrently. +// Closing the writer will close, but not delete, the underlying file. +// +// This utility is useful for cases where a CAR will be streamed but an error +// may occur before any content is written. In this case, the CAR file will not +// be created, and the output stream will not be written to. In the case of an +// HTTP server, this means that the client will not receive a CAR header only, +// instead there will be an opportunity to return a proper HTTP error to the +// client. +// +// The OnPut listener can be used to either track each Put() operation, or to +// just track the first Put() operation, which can be useful for setting +// HTTP headers in the assumption that the beginning of a valid CAR is about to +// be streamed. +type DeferredCarWriter struct { + root cid.Cid + outPath string + outStream io.Writer + + lk sync.Mutex + f *os.File + w carstorage.WritableCar + putCb []putCb + opts []carv2.Option +} + +// NewDeferredCarWriterForPath creates a DeferredCarWriter that will write to a +// file designated by the supplied path. The file will only be created on the +// first Put() operation. +// +// No options are supplied to carstorage.NewWritable by default, add +// the car.WriteAsCarV1(true) option to write a CARv1 file. +func NewDeferredCarWriterForPath(root cid.Cid, outPath string, opts ...carv2.Option) *DeferredCarWriter { + return &DeferredCarWriter{root: root, outPath: outPath, opts: opts} +} + +// NewDeferredCarWriterForStream creates a DeferredCarWriter that will write to +// the supplied stream. The stream will only be written to on the first Put() +// operation. +// +// The car.WriteAsCarV1(true) option will be supplied by default to +// carstorage.NewWritable as CARv2 is not a valid streaming format due to the +// header. +func NewDeferredCarWriterForStream(root cid.Cid, outStream io.Writer, opts ...carv2.Option) *DeferredCarWriter { + opts = append([]carv2.Option{carv2.WriteAsCarV1(true)}, opts...) + return &DeferredCarWriter{root: root, outStream: outStream, opts: opts} +} + +// OnPut will call a callback when each Put() operation is started. The argument +// to the callback is the number of bytes being written. If once is true, the +// callback will be removed after the first call. +func (dcw *DeferredCarWriter) OnPut(cb func(int), once bool) { + if dcw.putCb == nil { + dcw.putCb = make([]putCb, 0) + } + dcw.putCb = append(dcw.putCb, putCb{cb: cb, once: once}) +} + +// Has returns false if the key was not already written to the CAR output. +func (dcw *DeferredCarWriter) Has(ctx context.Context, key string) (bool, error) { + dcw.lk.Lock() + defer dcw.lk.Unlock() + + if dcw.w == nil { // shortcut, haven't written anything, don't even initialise + return false, nil + } + + writer, err := dcw.writer() + if err != nil { + return false, err + } + + return writer.Has(ctx, key) +} + +// Put writes the given content to the CAR output stream, creating it if it +// doesn't exist yet. +func (dcw *DeferredCarWriter) Put(ctx context.Context, key string, content []byte) error { + dcw.lk.Lock() + defer dcw.lk.Unlock() + + if dcw.putCb != nil { + // call all callbacks, remove those that were only needed once + for i := 0; i < len(dcw.putCb); i++ { + cb := dcw.putCb[i] + cb.cb(len(content)) + if cb.once { + dcw.putCb = append(dcw.putCb[:i], dcw.putCb[i+1:]...) + i-- + } + } + } + + // first Put() call, initialise writer, which will write a CAR header + writer, err := dcw.writer() + if err != nil { + return err + } + + return writer.Put(ctx, key, content) +} + +// writer() +func (dcw *DeferredCarWriter) writer() (carstorage.WritableCar, error) { + if dcw.w == nil { + outStream := dcw.outStream + if outStream == nil { + openedFile, err := os.OpenFile(dcw.outPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) + if err != nil { + return nil, err + } + dcw.f = openedFile + outStream = openedFile + } + w, err := carstorage.NewWritable(outStream, []cid.Cid{dcw.root}, dcw.opts...) + if err != nil { + return nil, err + } + dcw.w = w + } + return dcw.w, nil +} + +// Close closes the underlying file, if one was created. +func (dcw *DeferredCarWriter) Close() error { + dcw.lk.Lock() + defer dcw.lk.Unlock() + + err := dcw.w.Finalize() + + if dcw.f != nil { + defer func() { dcw.f = nil }() + err2 := dcw.f.Close() + if err == nil { + err = err2 + } + } + return err +} + +// BlockWriteOpener returns a BlockWriteOpener that operates on this storage. +func (dcw *DeferredCarWriter) BlockWriteOpener() linking.BlockWriteOpener { + return func(lctx linking.LinkContext) (io.Writer, linking.BlockWriteCommitter, error) { + wr, wrcommit, err := ipldstorage.PutStream(lctx.Ctx, dcw) + return wr, func(lnk ipld.Link) error { + return wrcommit(lnk.Binary()) + }, err + } +} diff --git a/v2/storage/deferred/deferredcarwriter_test.go b/v2/storage/deferred/deferredcarwriter_test.go new file mode 100644 index 00000000..790cde45 --- /dev/null +++ b/v2/storage/deferred/deferredcarwriter_test.go @@ -0,0 +1,231 @@ +package deferred_test + +import ( + "bytes" + "context" + "fmt" + "io" + "math/rand" + "os" + "sync" + "testing" + + "github.com/ipfs/go-cid" + carv2 "github.com/ipld/go-car/v2" + deferred "github.com/ipld/go-car/v2/storage/deferred" + mh "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" +) + +var rng = rand.New(rand.NewSource(3333)) +var rngLk sync.Mutex + +func TestDeferredCarWriterForPath(t *testing.T) { + req := require.New(t) + + ctx := context.Background() + testCid1, testData1 := randBlock() + testCid2, testData2 := randBlock() + + for version := 1; version <= 2; version++ { + t.Run(fmt.Sprintf("version=%d", version), func(t *testing.T) { + tmpFile := t.TempDir() + "/test.car" + + opts := []carv2.Option{} + if version == 1 { + opts = append(opts, carv2.WriteAsCarV1(true)) + } + cw := deferred.NewDeferredCarWriterForPath(testCid1, tmpFile, opts...) + + _, err := os.Stat(tmpFile) + req.True(os.IsNotExist(err)) + + req.NoError(cw.Put(ctx, testCid1.KeyString(), testData1)) + req.NoError(cw.Put(ctx, testCid2.KeyString(), testData2)) + + stat, err := os.Stat(tmpFile) + req.NoError(err) + req.True(stat.Size() > int64(len(testData1)+len(testData2))) + + req.NoError(cw.Close()) + + // shouldn't be deleted + _, err = os.Stat(tmpFile) + req.NoError(err) + + r, err := os.Open(tmpFile) + req.NoError(err) + t.Cleanup(func() { r.Close() }) + carv2, err := carv2.NewBlockReader(r) + req.NoError(err) + + // compare CAR contents to what we wrote + req.Equal([]cid.Cid{testCid1}, carv2.Roots) + req.Equal(uint64(version), carv2.Version) + + blk, err := carv2.Next() + req.NoError(err) + req.Equal(testCid1, blk.Cid()) + req.Equal(testData1, blk.RawData()) + + blk, err = carv2.Next() + req.NoError(err) + req.Equal(testCid2, blk.Cid()) + req.Equal(testData2, blk.RawData()) + + _, err = carv2.Next() + req.ErrorIs(io.EOF, err) + }) + } +} + +func TestDeferredCarWriter(t *testing.T) { + for _, tc := range []string{"path", "stream"} { + tc := tc + t.Run(tc, func(t *testing.T) { + t.Parallel() + ctx := context.Background() + testCid1, testData1 := randBlock() + testCid2, testData2 := randBlock() + testCid3, _ := randBlock() + + var cw *deferred.DeferredCarWriter + var buf bytes.Buffer + tmpFile := t.TempDir() + "/test.car" + + if tc == "path" { + cw = deferred.NewDeferredCarWriterForPath(testCid1, tmpFile, carv2.WriteAsCarV1(true)) + _, err := os.Stat(tmpFile) + require.True(t, os.IsNotExist(err)) + } else { + cw = deferred.NewDeferredCarWriterForStream(testCid1, &buf) + require.Equal(t, buf.Len(), 0) + } + + has, err := cw.Has(ctx, testCid3.KeyString()) + require.NoError(t, err) + require.False(t, has) + + require.NoError(t, cw.Put(ctx, testCid1.KeyString(), testData1)) + has, err = cw.Has(ctx, testCid1.KeyString()) + require.NoError(t, err) + require.True(t, has) + require.NoError(t, cw.Put(ctx, testCid2.KeyString(), testData2)) + has, err = cw.Has(ctx, testCid1.KeyString()) + require.NoError(t, err) + require.True(t, has) + has, err = cw.Has(ctx, testCid2.KeyString()) + require.NoError(t, err) + require.True(t, has) + has, err = cw.Has(ctx, testCid3.KeyString()) + require.NoError(t, err) + require.False(t, has) + + if tc == "path" { + stat, err := os.Stat(tmpFile) + require.NoError(t, err) + require.True(t, stat.Size() > int64(len(testData1)+len(testData2))) + } else { + require.True(t, buf.Len() > len(testData1)+len(testData2)) + } + + require.NoError(t, cw.Close()) + + var rdr *carv2.BlockReader + if tc == "path" { + r, err := os.Open(tmpFile) + require.NoError(t, err) + rdr, err = carv2.NewBlockReader(r) + require.NoError(t, err) + t.Cleanup(func() { r.Close() }) + } else { + rdr, err = carv2.NewBlockReader(&buf) + require.NoError(t, err) + } + + // compare CAR contents to what we wrote + require.Equal(t, rdr.Roots, []cid.Cid{testCid1}) + require.Equal(t, rdr.Version, uint64(1)) + + blk, err := rdr.Next() + require.NoError(t, err) + require.Equal(t, blk.Cid(), testCid1) + require.Equal(t, blk.RawData(), testData1) + + blk, err = rdr.Next() + require.NoError(t, err) + require.Equal(t, blk.Cid(), testCid2) + require.Equal(t, blk.RawData(), testData2) + + _, err = rdr.Next() + require.ErrorIs(t, err, io.EOF) + }) + } +} + +func TestDeferredCarWriterPutCb(t *testing.T) { + ctx := context.Background() + testCid1, testData1 := randBlock() + testCid2, testData2 := randBlock() + + var buf bytes.Buffer + cw := deferred.NewDeferredCarWriterForStream(testCid1, &buf) + + var pc1 int + cw.OnPut(func(ii int) { + switch pc1 { + case 0: + require.Equal(t, buf.Len(), 0) // called before first write + require.Equal(t, len(testData1), ii) + case 1: + require.Equal(t, len(testData2), ii) + default: + require.Fail(t, "unexpected put callback") + } + pc1++ + }, false) + var pc2 int + cw.OnPut(func(ii int) { + switch pc2 { + case 0: + require.Equal(t, buf.Len(), 0) // called before first write + require.Equal(t, len(testData1), ii) + case 1: + require.Equal(t, len(testData2), ii) + default: + require.Fail(t, "unexpected put callback") + } + pc2++ + }, false) + var pc3 int + cw.OnPut(func(ii int) { + switch pc3 { + case 0: + require.Equal(t, buf.Len(), 0) // called before first write + require.Equal(t, len(testData1), ii) + default: + require.Fail(t, "unexpected put callback") + } + pc3++ + }, true) + + require.NoError(t, cw.Put(ctx, testCid1.KeyString(), testData1)) + require.NoError(t, cw.Put(ctx, testCid2.KeyString(), testData2)) + require.NoError(t, cw.Close()) + + require.Equal(t, 2, pc1) + require.Equal(t, 2, pc2) + require.Equal(t, 1, pc3) +} + +func randBlock() (cid.Cid, []byte) { + data := make([]byte, 1024) + rngLk.Lock() + rng.Read(data) + rngLk.Unlock() + h, err := mh.Sum(data, mh.SHA2_512, -1) + if err != nil { + panic(err) + } + return cid.NewCidV1(cid.Raw, h), data +} From b12674b3b055dd3c25daedda3753652f550b7e7d Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Sat, 2 Sep 2023 20:35:37 +1000 Subject: [PATCH 2/3] fix: switch constructor args to match storage.New*, make roots plural --- v2/storage/deferred/deferredcarwriter.go | 12 ++++++------ v2/storage/deferred/deferredcarwriter_test.go | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/v2/storage/deferred/deferredcarwriter.go b/v2/storage/deferred/deferredcarwriter.go index dfcd9527..d6d5f107 100644 --- a/v2/storage/deferred/deferredcarwriter.go +++ b/v2/storage/deferred/deferredcarwriter.go @@ -41,7 +41,7 @@ var _ io.Closer = (*DeferredCarWriter)(nil) // HTTP headers in the assumption that the beginning of a valid CAR is about to // be streamed. type DeferredCarWriter struct { - root cid.Cid + roots []cid.Cid outPath string outStream io.Writer @@ -58,8 +58,8 @@ type DeferredCarWriter struct { // // No options are supplied to carstorage.NewWritable by default, add // the car.WriteAsCarV1(true) option to write a CARv1 file. -func NewDeferredCarWriterForPath(root cid.Cid, outPath string, opts ...carv2.Option) *DeferredCarWriter { - return &DeferredCarWriter{root: root, outPath: outPath, opts: opts} +func NewDeferredCarWriterForPath(outPath string, roots []cid.Cid, opts ...carv2.Option) *DeferredCarWriter { + return &DeferredCarWriter{roots: roots, outPath: outPath, opts: opts} } // NewDeferredCarWriterForStream creates a DeferredCarWriter that will write to @@ -69,9 +69,9 @@ func NewDeferredCarWriterForPath(root cid.Cid, outPath string, opts ...carv2.Opt // The car.WriteAsCarV1(true) option will be supplied by default to // carstorage.NewWritable as CARv2 is not a valid streaming format due to the // header. -func NewDeferredCarWriterForStream(root cid.Cid, outStream io.Writer, opts ...carv2.Option) *DeferredCarWriter { +func NewDeferredCarWriterForStream(outStream io.Writer, roots []cid.Cid, opts ...carv2.Option) *DeferredCarWriter { opts = append([]carv2.Option{carv2.WriteAsCarV1(true)}, opts...) - return &DeferredCarWriter{root: root, outStream: outStream, opts: opts} + return &DeferredCarWriter{roots: roots, outStream: outStream, opts: opts} } // OnPut will call a callback when each Put() operation is started. The argument @@ -140,7 +140,7 @@ func (dcw *DeferredCarWriter) writer() (carstorage.WritableCar, error) { dcw.f = openedFile outStream = openedFile } - w, err := carstorage.NewWritable(outStream, []cid.Cid{dcw.root}, dcw.opts...) + w, err := carstorage.NewWritable(outStream, dcw.roots, dcw.opts...) if err != nil { return nil, err } diff --git a/v2/storage/deferred/deferredcarwriter_test.go b/v2/storage/deferred/deferredcarwriter_test.go index 790cde45..2a5994f2 100644 --- a/v2/storage/deferred/deferredcarwriter_test.go +++ b/v2/storage/deferred/deferredcarwriter_test.go @@ -35,7 +35,7 @@ func TestDeferredCarWriterForPath(t *testing.T) { if version == 1 { opts = append(opts, carv2.WriteAsCarV1(true)) } - cw := deferred.NewDeferredCarWriterForPath(testCid1, tmpFile, opts...) + cw := deferred.NewDeferredCarWriterForPath(tmpFile, []cid.Cid{testCid1}, opts...) _, err := os.Stat(tmpFile) req.True(os.IsNotExist(err)) @@ -94,11 +94,11 @@ func TestDeferredCarWriter(t *testing.T) { tmpFile := t.TempDir() + "/test.car" if tc == "path" { - cw = deferred.NewDeferredCarWriterForPath(testCid1, tmpFile, carv2.WriteAsCarV1(true)) + cw = deferred.NewDeferredCarWriterForPath(tmpFile, []cid.Cid{testCid1}, carv2.WriteAsCarV1(true)) _, err := os.Stat(tmpFile) require.True(t, os.IsNotExist(err)) } else { - cw = deferred.NewDeferredCarWriterForStream(testCid1, &buf) + cw = deferred.NewDeferredCarWriterForStream(&buf, []cid.Cid{testCid1}) require.Equal(t, buf.Len(), 0) } @@ -169,7 +169,7 @@ func TestDeferredCarWriterPutCb(t *testing.T) { testCid2, testData2 := randBlock() var buf bytes.Buffer - cw := deferred.NewDeferredCarWriterForStream(testCid1, &buf) + cw := deferred.NewDeferredCarWriterForStream(&buf, []cid.Cid{testCid1}) var pc1 int cw.OnPut(func(ii int) { From 0621cff7a75831b5cfc98bedd34f9a93586ebd70 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Mon, 4 Sep 2023 10:43:37 +1000 Subject: [PATCH 3/3] fix: add closed check, expose storage.ErrClosed --- v2/storage/deferred/deferredcarwriter.go | 35 +++++++++++++++---- v2/storage/deferred/deferredcarwriter_test.go | 33 +++++++++++++++++ v2/storage/storage.go | 8 ++--- 3 files changed, 65 insertions(+), 11 deletions(-) diff --git a/v2/storage/deferred/deferredcarwriter.go b/v2/storage/deferred/deferredcarwriter.go index d6d5f107..7d5d3c10 100644 --- a/v2/storage/deferred/deferredcarwriter.go +++ b/v2/storage/deferred/deferredcarwriter.go @@ -29,6 +29,10 @@ var _ io.Closer = (*DeferredCarWriter)(nil) // DeferredCarWriter is threadsafe, and can be used concurrently. // Closing the writer will close, but not delete, the underlying file. // +// DeferredCarWriter only implements the storage.WritableStorage interface and +// is not intended as a general purpose storage implementation. It only supports +// storage Put() and Get() operations. +// // This utility is useful for cases where a CAR will be streamed but an error // may occur before any content is written. In this case, the CAR file will not // be created, and the output stream will not be written to. In the case of an @@ -45,11 +49,12 @@ type DeferredCarWriter struct { outPath string outStream io.Writer - lk sync.Mutex - f *os.File - w carstorage.WritableCar - putCb []putCb - opts []carv2.Option + lk sync.Mutex + f *os.File + closed bool + w carstorage.WritableCar + putCb []putCb + opts []carv2.Option } // NewDeferredCarWriterForPath creates a DeferredCarWriter that will write to a @@ -89,6 +94,10 @@ func (dcw *DeferredCarWriter) Has(ctx context.Context, key string) (bool, error) dcw.lk.Lock() defer dcw.lk.Unlock() + if dcw.closed { + return false, carstorage.ErrClosed + } + if dcw.w == nil { // shortcut, haven't written anything, don't even initialise return false, nil } @@ -107,6 +116,10 @@ func (dcw *DeferredCarWriter) Put(ctx context.Context, key string, content []byt dcw.lk.Lock() defer dcw.lk.Unlock() + if dcw.closed { + return carstorage.ErrClosed + } + if dcw.putCb != nil { // call all callbacks, remove those that were only needed once for i := 0; i < len(dcw.putCb); i++ { @@ -150,11 +163,18 @@ func (dcw *DeferredCarWriter) writer() (carstorage.WritableCar, error) { } // Close closes the underlying file, if one was created. -func (dcw *DeferredCarWriter) Close() error { +func (dcw *DeferredCarWriter) Close() (err error) { dcw.lk.Lock() defer dcw.lk.Unlock() - err := dcw.w.Finalize() + if dcw.closed { + return carstorage.ErrClosed + } + dcw.closed = true + + if dcw.w != nil { + err = dcw.w.Finalize() + } if dcw.f != nil { defer func() { dcw.f = nil }() @@ -163,6 +183,7 @@ func (dcw *DeferredCarWriter) Close() error { err = err2 } } + return err } diff --git a/v2/storage/deferred/deferredcarwriter_test.go b/v2/storage/deferred/deferredcarwriter_test.go index 2a5994f2..e2976d62 100644 --- a/v2/storage/deferred/deferredcarwriter_test.go +++ b/v2/storage/deferred/deferredcarwriter_test.go @@ -12,6 +12,7 @@ import ( "github.com/ipfs/go-cid" carv2 "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/storage" deferred "github.com/ipld/go-car/v2/storage/deferred" mh "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" @@ -218,6 +219,38 @@ func TestDeferredCarWriterPutCb(t *testing.T) { require.Equal(t, 1, pc3) } +func TestDeferredCarWriterWriteAfterClose(t *testing.T) { + req := require.New(t) + + ctx := context.Background() + testCid1, testData1 := randBlock() + testCid2, testData2 := randBlock() + + var buf bytes.Buffer + cw := deferred.NewDeferredCarWriterForStream(&buf, []cid.Cid{testCid1}) + // no writes + req.NoError(cw.Close()) + + req.ErrorIs(cw.Put(ctx, testCid1.KeyString(), testData1), storage.ErrClosed) + _, err := cw.Has(ctx, testCid1.KeyString()) + req.ErrorIs(err, storage.ErrClosed) + req.ErrorIs(cw.Close(), storage.ErrClosed) + + // with writes + + buf = bytes.Buffer{} + cw = deferred.NewDeferredCarWriterForStream(&buf, []cid.Cid{testCid1}) + + req.NoError(cw.Put(ctx, testCid1.KeyString(), testData1)) + req.NoError(cw.Put(ctx, testCid2.KeyString(), testData2)) + req.NoError(cw.Close()) + + req.ErrorIs(cw.Put(ctx, testCid1.KeyString(), testData1), storage.ErrClosed) + _, err = cw.Has(ctx, testCid1.KeyString()) + req.ErrorIs(err, storage.ErrClosed) + req.ErrorIs(cw.Close(), storage.ErrClosed) +} + func randBlock() (cid.Cid, []byte) { data := make([]byte, 1024) rngLk.Lock() diff --git a/v2/storage/storage.go b/v2/storage/storage.go index b98e70d8..a912d7e1 100644 --- a/v2/storage/storage.go +++ b/v2/storage/storage.go @@ -18,7 +18,7 @@ import ( ipldstorage "github.com/ipld/go-ipld-prime/storage" ) -var errClosed = errors.New("cannot use a CARv2 storage after closing") +var ErrClosed = errors.New("cannot use a CAR storage after closing") type ReaderAtWriterAt interface { io.ReaderAt @@ -314,7 +314,7 @@ func (sc *StorageCar) Put(ctx context.Context, keyStr string, data []byte) error defer sc.mu.Unlock() if sc.closed { - return errClosed + return ErrClosed } idx, ok := sc.idx.(*index.InsertionIndex) @@ -361,7 +361,7 @@ func (sc *StorageCar) Has(ctx context.Context, keyStr string) (bool, error) { defer sc.mu.RUnlock() if sc.closed { - return false, errClosed + return false, ErrClosed } if idx, ok := sc.idx.(*index.InsertionIndex); ok && sc.writer != nil { @@ -443,7 +443,7 @@ func (sc *StorageCar) GetStream(ctx context.Context, keyStr string) (io.ReadClos defer sc.mu.RUnlock() if sc.closed { - return nil, errClosed + return nil, ErrClosed } _, offset, size, err := store.FindCid(