diff --git a/go.mod b/go.mod index bb65be4a9d..d3a5771c23 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/golang/mock v1.6.0 github.com/gorilla/mux v1.8.0 github.com/hashicorp/go-retryablehttp v0.7.4 - github.com/hashicorp/golang-lru v1.0.2 + github.com/hashicorp/golang-lru/v2 v2.0.5 github.com/imdario/mergo v0.3.16 github.com/ipfs/boxo v0.12.0 github.com/ipfs/go-block-format v0.2.0 @@ -185,8 +185,8 @@ require ( github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-safetemp v1.0.0 // indirect github.com/hashicorp/go-version v1.6.0 // indirect + github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/hashicorp/golang-lru/arc/v2 v2.0.5 // indirect - github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/hdevalence/ed25519consensus v0.0.0-20220222234857-c00d1f31bab3 // indirect github.com/holiman/uint256 v1.2.3 // indirect diff --git a/nodebuilder/store_test.go b/nodebuilder/store_test.go index 8a39849060..8ea56a280f 100644 --- a/nodebuilder/store_test.go +++ b/nodebuilder/store_test.go @@ -154,6 +154,7 @@ func TestStoreRestart(t *testing.T) { require.NoError(t, err) _, err = eds.ReadEDS(ctx, odsReader, h) require.NoError(t, err) + require.NoError(t, edsReader.Close()) } } diff --git a/share/eds/accessor_cache.go b/share/eds/accessor_cache.go deleted file mode 100644 index 9f70178be6..0000000000 --- a/share/eds/accessor_cache.go +++ /dev/null @@ -1,178 +0,0 @@ -package eds - -import ( - "context" - "errors" - "fmt" - "reflect" - "sync" - - "github.com/filecoin-project/dagstore" - "github.com/filecoin-project/dagstore/shard" - lru "github.com/hashicorp/golang-lru" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" -) - -var ( - defaultCacheSize = 128 - errCacheMiss = errors.New("accessor not found in blockstore cache") -) - -// accessorWithBlockstore is the value that we store in the blockstore cache -type accessorWithBlockstore struct { - sa *dagstore.ShardAccessor - // blockstore is stored separately because each access to the blockstore over the shard accessor - // reopens the underlying CAR. - bs dagstore.ReadBlockstore -} - -type blockstoreCache struct { - // stripedLocks prevents simultaneous RW access to the blockstore cache for a shard. Instead - // of using only one lock or one lock per key, we stripe the shard keys across 256 locks. 256 is - // chosen because it 0-255 is the range of values we get looking at the last byte of the key. - stripedLocks [256]sync.Mutex - // caches the blockstore for a given shard for shard read affinity i.e. - // further reads will likely be from the same shard. Maps (shard key -> blockstore). - cache *lru.Cache - - metrics *cacheMetrics -} - -func newBlockstoreCache(cacheSize int) (*blockstoreCache, error) { - bc := &blockstoreCache{} - // instantiate the blockstore cache - bslru, err := lru.NewWithEvict(cacheSize, bc.evictFn()) - if err != nil { - return nil, fmt.Errorf("failed to instantiate blockstore cache: %w", err) - } - bc.cache = bslru - return bc, nil -} - -func (bc *blockstoreCache) evictFn() func(_ interface{}, val interface{}) { - return func(_ interface{}, val interface{}) { - // ensure we close the blockstore for a shard when it's evicted so dagstore can gc it. 
- abs, ok := val.(*accessorWithBlockstore) - if !ok { - panic(fmt.Sprintf( - "casting value from cache to accessorWithBlockstore: %s", - reflect.TypeOf(val), - )) - } - - err := abs.sa.Close() - if err != nil { - log.Errorf("couldn't close accessor after cache eviction: %s", err) - } - bc.metrics.observeEvicted(err != nil) - } -} - -func (bc *blockstoreCache) Remove(key shard.Key) bool { - lk := &bc.stripedLocks[shardKeyToStriped(key)] - lk.Lock() - defer lk.Unlock() - - return bc.cache.Remove(key) -} - -// Get retrieves the blockstore for a given shard key from the cache. If the blockstore is not in -// the cache, it returns an errCacheMiss -func (bc *blockstoreCache) Get(shardContainingCid shard.Key) (*accessorWithBlockstore, error) { - lk := &bc.stripedLocks[shardKeyToStriped(shardContainingCid)] - lk.Lock() - defer lk.Unlock() - - return bc.unsafeGet(shardContainingCid) -} - -func (bc *blockstoreCache) unsafeGet(shardContainingCid shard.Key) (*accessorWithBlockstore, error) { - // We've already ensured that the given shard has the cid/multihash we are looking for. - val, ok := bc.cache.Get(shardContainingCid) - if !ok { - return nil, errCacheMiss - } - - accessor, ok := val.(*accessorWithBlockstore) - if !ok { - panic(fmt.Sprintf( - "casting value from cache to accessorWithBlockstore: %s", - reflect.TypeOf(val), - )) - } - return accessor, nil -} - -// Add adds a blockstore for a given shard key to the cache. -func (bc *blockstoreCache) Add( - shardContainingCid shard.Key, - accessor *dagstore.ShardAccessor, -) (*accessorWithBlockstore, error) { - lk := &bc.stripedLocks[shardKeyToStriped(shardContainingCid)] - lk.Lock() - defer lk.Unlock() - - return bc.unsafeAdd(shardContainingCid, accessor) -} - -func (bc *blockstoreCache) unsafeAdd( - shardContainingCid shard.Key, - accessor *dagstore.ShardAccessor, -) (*accessorWithBlockstore, error) { - blockStore, err := accessor.Blockstore() - if err != nil { - return nil, fmt.Errorf("failed to get blockstore from accessor: %w", err) - } - - newAccessor := &accessorWithBlockstore{ - bs: blockStore, - sa: accessor, - } - bc.cache.Add(shardContainingCid, newAccessor) - return newAccessor, nil -} - -// shardKeyToStriped returns the index of the lock to use for a given shard key. We use the last -// byte of the shard key as the pseudo-random index. 
-func shardKeyToStriped(sk shard.Key) byte { - return sk.String()[len(sk.String())-1] -} - -type cacheMetrics struct { - evictedCounter metric.Int64Counter -} - -func (bc *blockstoreCache) withMetrics() error { - evictedCounter, err := meter.Int64Counter("eds_blockstore_cache_evicted_counter", - metric.WithDescription("eds blockstore cache evicted event counter")) - if err != nil { - return err - } - - cacheSize, err := meter.Int64ObservableGauge("eds_blockstore_cache_size", - metric.WithDescription("total amount of items in blockstore cache"), - ) - if err != nil { - return err - } - - callback := func(ctx context.Context, observer metric.Observer) error { - observer.ObserveInt64(cacheSize, int64(bc.cache.Len())) - return nil - } - _, err = meter.RegisterCallback(callback, cacheSize) - if err != nil { - return err - } - bc.metrics = &cacheMetrics{evictedCounter: evictedCounter} - return nil -} - -func (m *cacheMetrics) observeEvicted(failed bool) { - if m == nil { - return - } - m.evictedCounter.Add(context.Background(), 1, metric.WithAttributes( - attribute.Bool(failedKey, failed))) -} diff --git a/share/eds/blockstore.go b/share/eds/blockstore.go index 2abe4219f1..349d6f58ba 100644 --- a/share/eds/blockstore.go +++ b/share/eds/blockstore.go @@ -5,14 +5,14 @@ import ( "errors" "fmt" - "github.com/filecoin-project/dagstore" bstore "github.com/ipfs/boxo/blockstore" dshelp "github.com/ipfs/boxo/datastore/dshelp" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" ipld "github.com/ipfs/go-ipld-format" + + "github.com/celestiaorg/celestia-node/share/eds/cache" ) var _ bstore.Blockstore = (*blockstore)(nil) @@ -32,18 +32,9 @@ var ( // implementation to allow for the blockstore operations to be routed to the underlying stores. type blockstore struct { store *Store - cache *blockstoreCache ds datastore.Batching } -func newBlockstore(store *Store, cache *blockstoreCache, ds datastore.Batching) *blockstore { - return &blockstore{ - store: store, - cache: cache, - ds: namespace.Wrap(ds, blockstoreCacheKey), - } -} - func (bs *blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) { keys, err := bs.store.dgstr.ShardsContainingMultihash(ctx, cid.Hash()) if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) { @@ -63,6 +54,11 @@ func (bs *blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) { func (bs *blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) { blockstr, err := bs.getReadOnlyBlockstore(ctx, cid) + if err == nil { + defer closeAndLog("blockstore", blockstr) + return blockstr.Get(ctx, cid) + } + if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) { k := dshelp.MultihashToDsKey(cid.Hash()) blockData, err := bs.ds.Get(ctx, k) @@ -72,15 +68,18 @@ func (bs *blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error // nmt's GetNode expects an ipld.ErrNotFound when a cid is not found. 
return nil, ipld.ErrNotFound{Cid: cid} } - if err != nil { - log.Debugf("failed to get blockstore for cid %s: %s", cid, err) - return nil, err - } - return blockstr.Get(ctx, cid) + + log.Debugf("failed to get blockstore for cid %s: %s", cid, err) + return nil, err } func (bs *blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) { blockstr, err := bs.getReadOnlyBlockstore(ctx, cid) + if err == nil { + defer closeAndLog("blockstore", blockstr) + return blockstr.GetSize(ctx, cid) + } + if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) { k := dshelp.MultihashToDsKey(cid.Hash()) size, err := bs.ds.GetSize(ctx, k) @@ -90,10 +89,9 @@ func (bs *blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) { // nmt's GetSize expects an ipld.ErrNotFound when a cid is not found. return 0, ipld.ErrNotFound{Cid: cid} } - if err != nil { - return 0, err - } - return blockstr.GetSize(ctx, cid) + + log.Debugf("failed to get size for cid %s: %s", cid, err) + return 0, err } func (bs *blockstore) DeleteBlock(ctx context.Context, cid cid.Cid) error { @@ -139,7 +137,7 @@ func (bs *blockstore) HashOnRead(bool) { } // getReadOnlyBlockstore finds the underlying blockstore of the shard that contains the given CID. -func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (dagstore.ReadBlockstore, error) { +func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (*BlockstoreCloser, error) { keys, err := bs.store.dgstr.ShardsContainingMultihash(ctx, cid.Hash()) if errors.Is(err, datastore.ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) { return nil, ErrNotFound @@ -148,11 +146,28 @@ func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (d return nil, fmt.Errorf("failed to find shards containing multihash: %w", err) } - // a share can exist in multiple EDSes, so just take the first one. 
+ // check if the cache contains any of the accessors shardKey := keys[0] - accessor, err := bs.store.getCachedAccessor(ctx, shardKey) + if accessor, err := bs.store.cache.Get(shardKey); err == nil { + return blockstoreCloser(accessor) + } + + // load the accessor into the cache and use it as a blockstoreCloser + accessor, err := bs.store.cache.GetOrLoad(ctx, shardKey, bs.store.getAccessor) if err != nil { return nil, fmt.Errorf("failed to get accessor for shard %s: %w", shardKey, err) } - return accessor.bs, nil + return blockstoreCloser(accessor) +} + +// blockstoreCloser constructs a new BlockstoreCloser from a cache.Accessor +func blockstoreCloser(ac cache.Accessor) (*BlockstoreCloser, error) { + bs, err := ac.Blockstore() + if err != nil { + return nil, fmt.Errorf("eds/store: failed to get blockstore: %w", err) + } + return &BlockstoreCloser{ + ReadBlockstore: bs, + Closer: ac, + }, nil } diff --git a/share/eds/blockstore_test.go b/share/eds/blockstore_test.go index 745797bf42..d9dbf7ed30 100644 --- a/share/eds/blockstore_test.go +++ b/share/eds/blockstore_test.go @@ -37,6 +37,9 @@ func TestBlockstore_Operations(t *testing.T) { topLevelBS := edsStore.Blockstore() carBS, err := edsStore.CARBlockstore(ctx, dah.Hash()) require.NoError(t, err) + defer func() { + require.NoError(t, carBS.Close()) + }() root, err := edsStore.GetDAH(ctx, dah.Hash()) require.NoError(t, err) diff --git a/share/eds/cache/accessor_cache.go b/share/eds/cache/accessor_cache.go new file mode 100644 index 0000000000..ff955f8c45 --- /dev/null +++ b/share/eds/cache/accessor_cache.go @@ -0,0 +1,173 @@ +package cache + +import ( + "context" + "fmt" + "io" + "sync" + + "github.com/filecoin-project/dagstore" + "github.com/filecoin-project/dagstore/shard" + lru "github.com/hashicorp/golang-lru/v2" +) + +var _ Cache = (*AccessorCache)(nil) + +// AccessorCache implements the Cache interface using an LRU cache backend. +type AccessorCache struct { + // The name is a prefix that will be used for cache metrics if they are enabled. + name string + // stripedLocks prevents simultaneous RW access to the blockstore cache for a shard. Instead + // of using only one lock or one lock per key, we stripe the shard keys across 256 locks. 256 is + // chosen because 0-255 is the range of values we get when looking at the last byte of the key. + stripedLocks [256]sync.Mutex + // Caches the blockstore for a given shard for shard read affinity, i.e., further reads will likely + // be from the same shard. Maps (shard key -> blockstore). + cache *lru.Cache[shard.Key, *accessorWithBlockstore] + + metrics *metrics +} + +// accessorWithBlockstore is the value that we store in the blockstore Cache. It implements the +// Accessor interface. +type accessorWithBlockstore struct { + sync.RWMutex + shardAccessor Accessor + // The blockstore is stored separately because each access to the blockstore over the shard + // accessor reopens the underlying CAR. + bs dagstore.ReadBlockstore +} + +// Blockstore implements the Blockstore method of the Accessor interface. It creates the blockstore on the +// first request and reuses the created instance for all subsequent requests. +func (s *accessorWithBlockstore) Blockstore() (dagstore.ReadBlockstore, error) { + s.Lock() + defer s.Unlock() + var err error + if s.bs == nil { + s.bs, err = s.shardAccessor.Blockstore() + } + return s.bs, err } + +// Reader returns a new copy of the reader to read data. 
+func (s *accessorWithBlockstore) Reader() io.Reader { + return s.shardAccessor.Reader() +} + +func NewAccessorCache(name string, cacheSize int) (*AccessorCache, error) { + bc := &AccessorCache{ + name: name, + } + // Instantiate the blockstore Cache. + bslru, err := lru.NewWithEvict[shard.Key, *accessorWithBlockstore](cacheSize, bc.evictFn()) + if err != nil { + return nil, fmt.Errorf("failed to instantiate blockstore cache: %w", err) + } + bc.cache = bslru + return bc, nil +} + +// evictFn will be invoked when an item is evicted from the cache. +func (bc *AccessorCache) evictFn() func(shard.Key, *accessorWithBlockstore) { + return func(_ shard.Key, abs *accessorWithBlockstore) { + err := abs.shardAccessor.Close() + if err != nil { + bc.metrics.observeEvicted(true) + log.Errorf("couldn't close accessor after cache eviction: %s", err) + return + } + bc.metrics.observeEvicted(false) + } +} + +// Get retrieves the Accessor for a given shard key from the Cache. If the Accessor is not in +// the Cache, it returns an errCacheMiss. +func (bc *AccessorCache) Get(key shard.Key) (Accessor, error) { + lk := &bc.stripedLocks[shardKeyToStriped(key)] + lk.Lock() + defer lk.Unlock() + + accessor, err := bc.get(key) + if err != nil { + bc.metrics.observeGet(false) + return nil, err + } + bc.metrics.observeGet(true) + return newCloser(accessor), nil +} + +func (bc *AccessorCache) get(key shard.Key) (*accessorWithBlockstore, error) { + abs, ok := bc.cache.Get(key) + if !ok { + return nil, errCacheMiss + } + return abs, nil +} + +// GetOrLoad attempts to get an item from the cache, and if not found, invokes +// the provided loader function to load it. +func (bc *AccessorCache) GetOrLoad( + ctx context.Context, + key shard.Key, + loader func(context.Context, shard.Key) (Accessor, error), +) (Accessor, error) { + lk := &bc.stripedLocks[shardKeyToStriped(key)] + lk.Lock() + defer lk.Unlock() + + abs, err := bc.get(key) + if err == nil { + bc.metrics.observeGet(true) + return newCloser(abs), nil + } + + // accessor not found in cache, so load new one using loader + accessor, err := loader(ctx, key) + if err != nil { + return nil, fmt.Errorf("unable to load accessor: %w", err) + } + + abs = &accessorWithBlockstore{ + shardAccessor: accessor, + } + + // Create a new accessor first to increment the reference count in it, so it cannot get evicted + // from the inner lru cache before it is used. + ac := newCloser(abs) + bc.cache.Add(key, abs) + return ac, nil +} + +// Remove removes the Accessor for a given key from the cache. +func (bc *AccessorCache) Remove(key shard.Key) error { + // The cache will call evictFn on removal, where accessor close will be called. + bc.cache.Remove(key) + return nil +} + +// EnableMetrics enables metrics for the cache. +func (bc *AccessorCache) EnableMetrics() error { + var err error + bc.metrics, err = newMetrics(bc) + return err +} + +// accessorCloser is a temporary object before reference counting is implemented. +type accessorCloser struct { + *accessorWithBlockstore + io.Closer +} + +func newCloser(abs *accessorWithBlockstore) *accessorCloser { + return &accessorCloser{ + accessorWithBlockstore: abs, + Closer: io.NopCloser(nil), + } +} + +// shardKeyToStriped returns the index of the lock to use for a given shard key. We use the last +// byte of the shard key as the pseudo-random index. 
+func shardKeyToStriped(sk shard.Key) byte { + return sk.String()[len(sk.String())-1] +} diff --git a/share/eds/cache/accessor_cache_test.go b/share/eds/cache/accessor_cache_test.go new file mode 100644 index 0000000000..5e928e85cc --- /dev/null +++ b/share/eds/cache/accessor_cache_test.go @@ -0,0 +1,246 @@ +package cache + +import ( + "bytes" + "context" + "errors" + "io" + "testing" + "time" + + "github.com/filecoin-project/dagstore" + "github.com/filecoin-project/dagstore/shard" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" +) + +func TestAccessorCache(t *testing.T) { + t.Run("add / get item from cache", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + cache, err := NewAccessorCache("test", 1) + require.NoError(t, err) + + // add accessor to the cache + key := shard.KeyFromString("key") + mock := &mockAccessor{ + data: []byte("test_data"), + } + loaded, err := cache.GetOrLoad(ctx, key, func(ctx context.Context, key shard.Key) (Accessor, error) { + return mock, nil + }) + require.NoError(t, err) + + // check if item exists + got, err := cache.Get(key) + require.NoError(t, err) + + l, err := io.ReadAll(loaded.Reader()) + require.NoError(t, err) + require.Equal(t, mock.data, l) + g, err := io.ReadAll(got.Reader()) + require.NoError(t, err) + require.Equal(t, mock.data, g) + }) + + t.Run("get blockstore from accessor", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + cache, err := NewAccessorCache("test", 1) + require.NoError(t, err) + + // add accessor to the cache + key := shard.KeyFromString("key") + mock := &mockAccessor{} + accessor, err := cache.GetOrLoad(ctx, key, func(ctx context.Context, key shard.Key) (Accessor, error) { + return mock, nil + }) + require.NoError(t, err) + + // check if item exists + _, err = cache.Get(key) + require.NoError(t, err) + + // blockstore should be created only after first request + require.Equal(t, 0, mock.returnedBs) + + // try to get blockstore + _, err = accessor.Blockstore() + require.NoError(t, err) + + // second call to blockstore should return same blockstore + _, err = accessor.Blockstore() + require.NoError(t, err) + require.Equal(t, 1, mock.returnedBs) + }) + + t.Run("remove an item", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + cache, err := NewAccessorCache("test", 1) + require.NoError(t, err) + + // add accessor to the cache + key := shard.KeyFromString("key") + mock := &mockAccessor{} + ac, err := cache.GetOrLoad(ctx, key, func(ctx context.Context, key shard.Key) (Accessor, error) { + return mock, nil + }) + require.NoError(t, err) + err = ac.Close() + require.NoError(t, err) + + err = cache.Remove(key) + require.NoError(t, err) + + // accessor should be closed on removal + mock.checkClosed(t, true) + + // check if item exists + _, err = cache.Get(key) + require.ErrorIs(t, err, errCacheMiss) + }) + + t.Run("successive reads should read the same data", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + cache, err := NewAccessorCache("test", 1) + require.NoError(t, err) + + // add accessor to the cache + key := shard.KeyFromString("key") + mock := &mockAccessor{data: []byte("test")} + accessor, err := cache.GetOrLoad(ctx, key, func(ctx context.Context, key shard.Key) (Accessor, error) { + return mock, nil + }) + 
require.NoError(t, err) + + loaded, err := io.ReadAll(accessor.Reader()) + require.NoError(t, err) + require.Equal(t, mock.data, loaded) + + for i := 0; i < 2; i++ { + accessor, err = cache.Get(key) + require.NoError(t, err) + got, err := io.ReadAll(accessor.Reader()) + require.NoError(t, err) + require.Equal(t, mock.data, got) + } + }) + + t.Run("removed by eviction", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + cache, err := NewAccessorCache("test", 1) + require.NoError(t, err) + + // add accessor to the cache + key := shard.KeyFromString("key") + mock := &mockAccessor{} + ac1, err := cache.GetOrLoad(ctx, key, func(ctx context.Context, key shard.Key) (Accessor, error) { + return mock, nil + }) + require.NoError(t, err) + err = ac1.Close() + require.NoError(t, err) + + // add second item + key2 := shard.KeyFromString("key2") + ac2, err := cache.GetOrLoad(ctx, key2, func(ctx context.Context, key shard.Key) (Accessor, error) { + return mock, nil + }) + require.NoError(t, err) + err = ac2.Close() + require.NoError(t, err) + + // accessor should be closed on removal by eviction + mock.checkClosed(t, true) + + // check if item evicted + _, err = cache.Get(key) + require.ErrorIs(t, err, errCacheMiss) + }) + + t.Run("close on accessor is noop", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + cache, err := NewAccessorCache("test", 1) + require.NoError(t, err) + + // add accessor to the cache + key := shard.KeyFromString("key") + mock := &mockAccessor{} + _, err = cache.GetOrLoad(ctx, key, func(ctx context.Context, key shard.Key) (Accessor, error) { + return mock, nil + }) + require.NoError(t, err) + + // check if item exists + accessor, err := cache.Get(key) + require.NoError(t, err) + require.NotNil(t, accessor) + + // close on returned accessor should not close inner reader + err = accessor.Close() + require.NoError(t, err) + + // check that close was not performed on inner accessor + mock.checkClosed(t, false) + }) +} + +type mockAccessor struct { + data []byte + isClosed bool + returnedBs int +} + +func (m *mockAccessor) Reader() io.Reader { + return bytes.NewBuffer(m.data) +} + +func (m *mockAccessor) Blockstore() (dagstore.ReadBlockstore, error) { + if m.returnedBs > 0 { + return nil, errors.New("blockstore already returned") + } + m.returnedBs++ + return rbsMock{}, nil +} + +func (m *mockAccessor) Close() error { + if m.isClosed { + return errors.New("already closed") + } + m.isClosed = true + return nil +} + +func (m *mockAccessor) checkClosed(t *testing.T, expected bool) { + // item will be removed in background, so give it some time to settle + time.Sleep(time.Millisecond * 100) + require.Equal(t, expected, m.isClosed) +} + +// rbsMock is a dagstore.ReadBlockstore mock +type rbsMock struct{} + +func (r rbsMock) Has(context.Context, cid.Cid) (bool, error) { + panic("implement me") +} + +func (r rbsMock) Get(_ context.Context, _ cid.Cid) (blocks.Block, error) { + panic("implement me") +} + +func (r rbsMock) GetSize(context.Context, cid.Cid) (int, error) { + panic("implement me") +} + +func (r rbsMock) AllKeysChan(context.Context) (<-chan cid.Cid, error) { + panic("implement me") +} + +func (r rbsMock) HashOnRead(bool) { + panic("implement me") +} diff --git a/share/eds/cache/cache.go b/share/eds/cache/cache.go new file mode 100644 index 0000000000..13e207d7c0 --- /dev/null +++ b/share/eds/cache/cache.go @@ -0,0 +1,49 @@ +package cache + +import ( + "context" + 
"errors" + "io" + + "github.com/filecoin-project/dagstore" + "github.com/filecoin-project/dagstore/shard" + logging "github.com/ipfs/go-log/v2" + "go.opentelemetry.io/otel" +) + +var ( + log = logging.Logger("share/eds/cache") + meter = otel.Meter("eds_store_cache") +) + +var ( + errCacheMiss = errors.New("accessor not found in blockstore cache") +) + +// Cache is an interface that defines the basic Cache operations. +type Cache interface { + // Get retrieves an item from the Cache. + Get(shard.Key) (Accessor, error) + + // GetOrLoad attempts to get an item from the Cache and, if not found, invokes + // the provided loader function to load it into the Cache. + GetOrLoad( + ctx context.Context, + key shard.Key, + loader func(context.Context, shard.Key) (Accessor, error), + ) (Accessor, error) + + // Remove removes an item from Cache. + Remove(shard.Key) error + + // EnableMetrics enables metrics in Cache + EnableMetrics() error +} + +// Accessor is a interface type returned by cache, that allows to read raw data by reader or create +// readblockstore +type Accessor interface { + Blockstore() (dagstore.ReadBlockstore, error) + Reader() io.Reader + io.Closer +} diff --git a/share/eds/cache/metrics.go b/share/eds/cache/metrics.go new file mode 100644 index 0000000000..21e52fec10 --- /dev/null +++ b/share/eds/cache/metrics.go @@ -0,0 +1,66 @@ +package cache + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +const ( + cacheFoundKey = "found" + failedKey = "failed" +) + +type metrics struct { + getCounter metric.Int64Counter + evictedCounter metric.Int64Counter +} + +func newMetrics(bc *AccessorCache) (*metrics, error) { + evictedCounter, err := meter.Int64Counter("eds_blockstore_cache_"+bc.name+"_evicted_counter", + metric.WithDescription("eds blockstore cache evicted event counter")) + if err != nil { + return nil, err + } + + getCounter, err := meter.Int64Counter("eds_blockstore_cache_"+bc.name+"_get_counter", + metric.WithDescription("eds blockstore cache evicted event counter")) + if err != nil { + return nil, err + } + + cacheSize, err := meter.Int64ObservableGauge("eds_blockstore_cache_"+bc.name+"_size", + metric.WithDescription("total amount of items in blockstore cache"), + ) + if err != nil { + return nil, err + } + + callback := func(ctx context.Context, observer metric.Observer) error { + observer.ObserveInt64(cacheSize, int64(bc.cache.Len())) + return nil + } + _, err = meter.RegisterCallback(callback, cacheSize) + + return &metrics{ + getCounter: getCounter, + evictedCounter: evictedCounter, + }, err +} + +func (m *metrics) observeEvicted(failed bool) { + if m == nil { + return + } + m.evictedCounter.Add(context.Background(), 1, metric.WithAttributes( + attribute.Bool(failedKey, failed))) +} + +func (m *metrics) observeGet(found bool) { + if m == nil { + return + } + m.getCounter.Add(context.Background(), 1, metric.WithAttributes( + attribute.Bool(cacheFoundKey, found))) +} diff --git a/share/eds/cache/noop.go b/share/eds/cache/noop.go new file mode 100644 index 0000000000..2af94feb1b --- /dev/null +++ b/share/eds/cache/noop.go @@ -0,0 +1,31 @@ +package cache + +import ( + "context" + + "github.com/filecoin-project/dagstore/shard" +) + +var _ Cache = (*NoopCache)(nil) + +// NoopCache implements noop version of Cache interface +type NoopCache struct{} + +func (n NoopCache) Get(shard.Key) (Accessor, error) { + return nil, errCacheMiss +} + +func (n NoopCache) GetOrLoad( + context.Context, shard.Key, + func(context.Context, 
shard.Key) (Accessor, error), +) (Accessor, error) { + return nil, nil +} + +func (n NoopCache) Remove(shard.Key) error { + return nil +} + +func (n NoopCache) EnableMetrics() error { + return nil +} diff --git a/share/eds/metrics.go b/share/eds/metrics.go index 9d4b2a53ef..8f87643f17 100644 --- a/share/eds/metrics.go +++ b/share/eds/metrics.go @@ -12,7 +12,6 @@ import ( const ( failedKey = "failed" sizeKey = "eds_size" - cachedKey = "cached" putResultKey = "result" putOK putResult = "ok" @@ -45,7 +44,6 @@ type metrics struct { getTime metric.Float64Histogram hasTime metric.Float64Histogram listTime metric.Float64Histogram - getAccessorTime metric.Float64Histogram shardFailureCount metric.Int64Counter @@ -102,12 +100,6 @@ func (s *Store) WithMetrics() error { return err } - getAccessorTime, err := meter.Float64Histogram("eds_store_get_accessor_time_histogram", - metric.WithDescription("eds store get accessor time histogram(s)")) - if err != nil { - return err - } - shardFailureCount, err := meter.Int64Counter("eds_store_shard_failure_counter", metric.WithDescription("eds store OpShardFail counter")) if err != nil { @@ -132,7 +124,7 @@ func (s *Store) WithMetrics() error { return err } - if err = s.cache.withMetrics(); err != nil { + if err = s.cache.EnableMetrics(); err != nil { return err } @@ -160,7 +152,6 @@ func (s *Store) WithMetrics() error { getTime: getTime, hasTime: hasTime, listTime: listTime, - getAccessorTime: getAccessorTime, shardFailureCount: shardFailureCount, longOpTime: longOpTime, gcTime: gcTime, @@ -299,16 +290,3 @@ func (m *metrics) observeList(ctx context.Context, dur time.Duration, failed boo m.listTime.Record(ctx, dur.Seconds(), metric.WithAttributes( attribute.Bool(failedKey, failed))) } - -func (m *metrics) observeGetAccessor(ctx context.Context, dur time.Duration, cached, failed bool) { - if m == nil { - return - } - if ctx.Err() != nil { - ctx = context.Background() - } - - m.getAccessorTime.Record(ctx, dur.Seconds(), metric.WithAttributes( - attribute.Bool(cachedKey, cached), - attribute.Bool(failedKey, failed))) -} diff --git a/share/eds/ods_test.go b/share/eds/ods_test.go index 5b6ed5568b..0f7c69e708 100644 --- a/share/eds/ods_test.go +++ b/share/eds/ods_test.go @@ -32,6 +32,9 @@ func TestODSReader(t *testing.T) { // get CAR reader from store r, err := edsStore.GetCAR(ctx, dah.Hash()) assert.NoError(t, err) + defer func() { + require.NoError(t, r.Close()) + }() // create ODSReader wrapper based on car reader to limit reads to ODS only odsR, err := ODSReader(r) @@ -81,6 +84,9 @@ func TestODSReaderReconstruction(t *testing.T) { // get CAR reader from store r, err := edsStore.GetCAR(ctx, dah.Hash()) assert.NoError(t, err) + defer func() { + require.NoError(t, r.Close()) + }() // create ODSReader wrapper based on car reader to limit reads to ODS only odsR, err := ODSReader(r) diff --git a/share/eds/store.go b/share/eds/store.go index 974f147292..e8caf4c35a 100644 --- a/share/eds/store.go +++ b/share/eds/store.go @@ -26,6 +26,7 @@ import ( "github.com/celestiaorg/celestia-node/libs/utils" "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/eds/cache" "github.com/celestiaorg/celestia-node/share/ipld" ) @@ -38,6 +39,8 @@ const ( // shards that are currently available but inactive, or errored. // We don't use transient files right now, so GC is turned off by default. 
defaultGCInterval = 0 + + defaultCacheSize = 128 ) var ErrNotFound = errors.New("eds not found in store") @@ -52,8 +55,8 @@ type Store struct { dgstr *dagstore.DAGStore mounts *mount.Registry - cache *blockstoreCache - bs bstore.Blockstore + bs *blockstore + cache cache.Cache carIdx index.FullIndexRepo invertedIdx *simpleInvertedIndex @@ -111,9 +114,9 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) { return nil, fmt.Errorf("failed to create DAGStore: %w", err) } - cache, err := newBlockstoreCache(defaultCacheSize) + accessorCache, err := cache.NewAccessorCache("cache", defaultCacheSize) if err != nil { - return nil, fmt.Errorf("failed to create blockstore cache: %w", err) + return nil, fmt.Errorf("failed to create recent blocks cache: %w", err) } store := &Store{ @@ -124,9 +127,9 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) { gcInterval: defaultGCInterval, mounts: r, shardFailures: failureChan, - cache: cache, + cache: accessorCache, } - store.bs = newBlockstore(store, cache, ds) + store.bs = newBlockstore(store, ds) return store, nil } @@ -177,7 +180,6 @@ func (s *Store) gc(ctx context.Context) { } s.lastGCResult.Store(res) } - } } @@ -237,7 +239,7 @@ func (s *Store) put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext if err != nil { return err } - defer f.Close() + defer closeAndLog("car file", f) // save encoded eds into buffer mount := &inMemoryOnceMount{ @@ -261,17 +263,35 @@ func (s *Store) put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext return fmt.Errorf("failed to initiate shard registration: %w", err) } + var result dagstore.ShardResult select { + case result = <-ch: case <-ctx.Done(): - // if context finished before result was received, track result in separate goroutine + // if the context finished before the result was received, track the result in a separate goroutine go trackLateResult("put", ch, s.metrics, time.Minute*5) return ctx.Err() - case result := <-ch: - if result.Error != nil { - return fmt.Errorf("failed to register shard: %w", result.Error) - } - return nil } + + if result.Error != nil { + return fmt.Errorf("failed to register shard: %w", result.Error) + } + + // the accessor returned in the result will be nil, so the shard needs to be acquired first to + // become available in the cache. It might take some time, and the result should not affect the put + // operation, so do it in a goroutine + // TODO: Ideally, only recent blocks should be put in the cache, but there is no way right now to + // check such a condition. 
+ go func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + _, err := s.cache.GetOrLoad(ctx, result.Key, s.getAccessor) + if err != nil { + log.Warnw("unable to put accessor to recent blocks accessors cache", "err", err) + return + } + }() + + return nil } // waitForResult waits for a result from the res channel for a maximum duration specified by @@ -292,7 +312,7 @@ func trackLateResult(opName string, res <-chan dagstore.ShardResult, metrics *me } if result.Error != nil { metrics.observeLongOp(context.Background(), opName, time.Since(tnow), longOpFailed) - log.Errorf("failed to register shard after context expired: %v ago, err: %w", time.Since(tnow), result.Error) + log.Errorf("failed to register shard after context expired: %v ago, err: %s", time.Since(tnow), result.Error) return } metrics.observeLongOp(context.Background(), opName, time.Since(tnow), longOpOK) @@ -309,7 +329,7 @@ func trackLateResult(opName string, res <-chan dagstore.ShardResult, metrics *me // // The shard is cached in the Store, so subsequent calls to GetCAR with the same root will use the // same reader. The cache is responsible for closing the underlying reader. -func (s *Store) GetCAR(ctx context.Context, root share.DataHash) (io.Reader, error) { +func (s *Store) GetCAR(ctx context.Context, root share.DataHash) (io.ReadCloser, error) { ctx, span := tracer.Start(ctx, "store/get-car") tnow := time.Now() r, err := s.getCAR(ctx, root) @@ -318,13 +338,21 @@ func (s *Store) GetCAR(ctx context.Context, root share.DataHash) (io.Reader, err return r, err } -func (s *Store) getCAR(ctx context.Context, root share.DataHash) (io.Reader, error) { - key := root.String() - accessor, err := s.getCachedAccessor(ctx, shard.KeyFromString(key)) +func (s *Store) getCAR(ctx context.Context, root share.DataHash) (io.ReadCloser, error) { + key := shard.KeyFromString(root.String()) + accessor, err := s.cache.Get(key) + if err == nil { + return newReadCloser(accessor), nil + } + // If the accessor is not found in the cache, create a new one from dagstore. We don't put the + // accessor in the cache here because getCAR is used by shrex-eds. There is a lower probability, + // compared to other cache put triggers, that the same block will be requested again soon. 
+ shardAccessor, err := s.getAccessor(ctx, key) if err != nil { return nil, fmt.Errorf("failed to get accessor: %w", err) } - return accessor.sa.Reader(), nil + + return newReadCloser(shardAccessor), nil } // Blockstore returns an IPFS blockstore providing access to individual shares/nodes of all EDS @@ -342,25 +370,31 @@ func (s *Store) Blockstore() bstore.Blockstore { func (s *Store) CARBlockstore( ctx context.Context, root share.DataHash, -) (dagstore.ReadBlockstore, error) { +) (*BlockstoreCloser, error) { ctx, span := tracer.Start(ctx, "store/car-blockstore") tnow := time.Now() - r, err := s.carBlockstore(ctx, root) + cbs, err := s.carBlockstore(ctx, root) s.metrics.observeCARBlockstore(ctx, time.Since(tnow), err != nil) utils.SetStatusAndEnd(span, err) - return r, err + return cbs, err } func (s *Store) carBlockstore( ctx context.Context, root share.DataHash, -) (dagstore.ReadBlockstore, error) { +) (*BlockstoreCloser, error) { key := shard.KeyFromString(root.String()) - accessor, err := s.getCachedAccessor(ctx, key) + accessor, err := s.cache.Get(key) + if err == nil { + return blockstoreCloser(accessor) + } + + // if the accessor is not found in the cache, create a new one from dagstore + sa, err := s.getAccessor(ctx, key) if err != nil { - return nil, fmt.Errorf("eds/store: failed to get accessor: %w", err) + return nil, fmt.Errorf("failed to get accessor: %w", err) } - return accessor.bs, nil + return blockstoreCloser(sa) } // GetDAH returns the DataAvailabilityHeader for the EDS identified by DataHash. @@ -374,13 +408,13 @@ func (s *Store) GetDAH(ctx context.Context, root share.DataHash) (*share.Root, e } func (s *Store) getDAH(ctx context.Context, root share.DataHash) (*share.Root, error) { - key := shard.KeyFromString(root.String()) - accessor, err := s.getCachedAccessor(ctx, key) + r, err := s.getCAR(ctx, root) if err != nil { - return nil, fmt.Errorf("eds/store: failed to get accessor: %w", err) + return nil, fmt.Errorf("eds/store: failed to get CAR file: %w", err) } + defer closeAndLog("car reader", r) - carHeader, err := carv1.ReadHeader(bufio.NewReader(accessor.sa.Reader())) + carHeader, err := carv1.ReadHeader(bufio.NewReader(r)) if err != nil { return nil, fmt.Errorf("eds/store: failed to read car header: %w", err) } @@ -405,7 +439,7 @@ func dahFromCARHeader(carHeader *carv1.CarHeader) *share.Root { } } -func (s *Store) getAccessor(ctx context.Context, key shard.Key) (*dagstore.ShardAccessor, error) { +func (s *Store) getAccessor(ctx context.Context, key shard.Key) (cache.Accessor, error) { ch := make(chan dagstore.ShardResult, 1) err := s.dgstr.AcquireShard(ctx, key, ch, dagstore.AcquireOpts{}) if err != nil { @@ -427,33 +461,6 @@ func (s *Store) getAccessor(ctx context.Context, key shard.Key) (*dagstore.Shard } } -func (s *Store) getCachedAccessor(ctx context.Context, key shard.Key) (*accessorWithBlockstore, error) { - lk := &s.cache.stripedLocks[shardKeyToStriped(key)] - lk.Lock() - defer lk.Unlock() - - tnow := time.Now() - accessor, err := s.cache.unsafeGet(key) - if err != nil && err != errCacheMiss { - log.Errorf("unexpected error while reading key from bs cache %s: %s", key, err) - } - if accessor != nil { - s.metrics.observeGetAccessor(ctx, time.Since(tnow), true, false) - return accessor, nil - } - - // wasn't found in cache, so acquire it and add to cache - shardAccessor, err := s.getAccessor(ctx, key) - if err != nil { - s.metrics.observeGetAccessor(ctx, time.Since(tnow), false, err != nil) - return nil, err - } - - a, err := s.cache.unsafeAdd(key, 
shardAccessor) - s.metrics.observeGetAccessor(ctx, time.Since(tnow), false, err != nil) - return a, err -} - // Remove removes EDS from Store by the given share.Root hash and cleans up all // the indexing. func (s *Store) Remove(ctx context.Context, root share.DataHash) error { @@ -466,14 +473,13 @@ func (s *Store) Remove(ctx context.Context, root share.DataHash) error { } func (s *Store) remove(ctx context.Context, root share.DataHash) (err error) { - key := root.String() - - // Remove from accessor cache, so that existing readers are closed and - // DestroyShard can be executed. - s.cache.Remove(shard.KeyFromString(key)) - + key := shard.KeyFromString(root.String()) + // remove open links to accessor from cache + if err := s.cache.Remove(key); err != nil { + log.Warnw("remove accessor from cache", "err", err) + } ch := make(chan dagstore.ShardResult, 1) - err = s.dgstr.DestroyShard(ctx, shard.KeyFromString(key), ch, dagstore.DestroyOpts{}) + err = s.dgstr.DestroyShard(ctx, key, ch, dagstore.DestroyOpts{}) if err != nil { return fmt.Errorf("failed to initiate shard destruction: %w", err) } @@ -488,7 +494,7 @@ func (s *Store) remove(ctx context.Context, root share.DataHash) (err error) { return ctx.Err() } - dropped, err := s.carIdx.DropFullIndex(shard.KeyFromString(key)) + dropped, err := s.carIdx.DropFullIndex(key) if !dropped { log.Warnf("failed to drop index for %s", key) } @@ -496,7 +502,7 @@ func (s *Store) remove(ctx context.Context, root share.DataHash) (err error) { return fmt.Errorf("failed to drop index for %s: %w", key, err) } - err = os.Remove(s.basepath + blocksPath + key) + err = os.Remove(s.basepath + blocksPath + root.String()) if err != nil { return fmt.Errorf("failed to remove CAR file: %w", err) } @@ -522,11 +528,13 @@ func (s *Store) get(ctx context.Context, root share.DataHash) (eds *rsmt2d.Exten utils.SetStatusAndEnd(span, err) }() - f, err := s.GetCAR(ctx, root) + r, err := s.getCAR(ctx, root) if err != nil { return nil, fmt.Errorf("failed to get CAR file: %w", err) } - eds, err = ReadEDS(ctx, f, root) + defer closeAndLog("car reader", r) + + eds, err = ReadEDS(ctx, r, root) if err != nil { return nil, fmt.Errorf("failed to read EDS from CAR file: %w", err) } diff --git a/share/eds/store_test.go b/share/eds/store_test.go index 21239e320a..0d5283e2f2 100644 --- a/share/eds/store_test.go +++ b/share/eds/store_test.go @@ -2,6 +2,7 @@ package eds import ( "context" + "io" "os" "sync" "testing" @@ -9,6 +10,7 @@ import ( "github.com/filecoin-project/dagstore" "github.com/filecoin-project/dagstore/shard" + "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" ds_sync "github.com/ipfs/go-datastore/sync" "github.com/ipld/go-car" @@ -19,6 +21,7 @@ import ( "github.com/celestiaorg/rsmt2d" "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/eds/cache" "github.com/celestiaorg/celestia-node/share/eds/edstest" ) @@ -72,6 +75,9 @@ func TestEDSStore(t *testing.T) { r, err := edsStore.GetCAR(ctx, dah.Hash()) assert.NoError(t, err) + defer func() { + require.NoError(t, r.Close()) + }() carReader, err := car.NewCarReader(r) assert.NoError(t, err) @@ -107,6 +113,9 @@ func TestEDSStore(t *testing.T) { _, err = os.Stat(edsStore.basepath + blocksPath + dah.String()) assert.NoError(t, err) + // accessor will be registered in cache async on put, so give it some time to settle + time.Sleep(time.Millisecond * 100) + err = edsStore.Remove(ctx, dah.Hash()) assert.NoError(t, err) @@ -143,6 +152,13 @@ func TestEDSStore(t *testing.T) { err = os.Remove(path) 
assert.NoError(t, err) + // accessor will be registered in cache async on put, so give it some time to settle + time.Sleep(time.Millisecond * 100) + + // remove non-failed accessor from cache + err = edsStore.cache.Remove(shard.KeyFromString(dah.String())) + assert.NoError(t, err) + _, err = edsStore.GetCAR(ctx, dah.Hash()) assert.Error(t, err) @@ -177,22 +193,18 @@ func TestEDSStore(t *testing.T) { assert.True(t, ok) }) - t.Run("BlockstoreCache", func(t *testing.T) { + t.Run("RecentBlocksCache", func(t *testing.T) { eds, dah := randomEDS(t) - err = edsStore.Put(ctx, dah.Hash(), eds) require.NoError(t, err) - // key isnt in cache yet, so get returns errCacheMiss + // accessor will be registered in cache async on put, so give it some time to settle + time.Sleep(time.Millisecond * 100) + + // check, that the key is in the cache after put shardKey := shard.KeyFromString(dah.String()) _, err = edsStore.cache.Get(shardKey) - assert.ErrorIs(t, err, errCacheMiss) - - // now get it, so that the key is in the cache - _, err = edsStore.CARBlockstore(ctx, dah.Hash()) assert.NoError(t, err) - _, err = edsStore.cache.Get(shardKey) - assert.NoError(t, err, errCacheMiss) }) t.Run("List", func(t *testing.T) { @@ -256,6 +268,14 @@ func TestEDSStore_GC(t *testing.T) { err = edsStore.Put(ctx, dah.Hash(), eds) require.NoError(t, err) + // accessor will be registered in cache async on put, so give it some time to settle + time.Sleep(time.Millisecond * 100) + + // remove links to the shard from cache + key := shard.KeyFromString(share.DataHash(dah.Hash()).String()) + err = edsStore.cache.Remove(key) + require.NoError(t, err) + // doesn't exist yet assert.NotContains(t, edsStore.lastGCResult.Load().Shards, shardKey) @@ -281,20 +301,43 @@ func Test_BlockstoreCache(t *testing.T) { err = edsStore.Start(ctx) require.NoError(t, err) + // store eds to the store with noopCache to allow clean cache after put + swap := edsStore.cache + edsStore.cache = cache.NoopCache{} eds, dah := randomEDS(t) err = edsStore.Put(ctx, dah.Hash(), eds) require.NoError(t, err) - // key isnt in cache yet, so get returns errCacheMiss + // get any key from saved eds + bs, err := edsStore.carBlockstore(ctx, dah.Hash()) + require.NoError(t, err) + defer func() { + require.NoError(t, bs.Close()) + }() + keys, err := bs.AllKeysChan(ctx) + require.NoError(t, err) + var key cid.Cid + select { + case key = <-keys: + case <-ctx.Done(): + t.Fatal("context timeout") + } + + // swap back original cache + edsStore.cache = swap + + // key shouldn't be in cache yet, check for returned errCacheMiss shardKey := shard.KeyFromString(dah.String()) _, err = edsStore.cache.Get(shardKey) - assert.ErrorIs(t, err, errCacheMiss) + require.Error(t, err) - // now get it, so that the key is in the cache - _, err = edsStore.getCachedAccessor(ctx, shardKey) - assert.NoError(t, err) + // now get it from blockstore, to trigger storing to cache + _, err = edsStore.Blockstore().Get(ctx, key) + require.NoError(t, err) + + // should be no errCacheMiss anymore _, err = edsStore.cache.Get(shardKey) - assert.NoError(t, err, errCacheMiss) + require.NoError(t, err) } // Test_CachedAccessor verifies that the reader represented by a cached accessor can be read from @@ -312,26 +355,67 @@ func Test_CachedAccessor(t *testing.T) { err = edsStore.Put(ctx, dah.Hash(), eds) require.NoError(t, err) - shardKey := shard.KeyFromString(dah.String()) - // adds to cache - cachedAccessor, err := edsStore.getCachedAccessor(ctx, shardKey) - assert.NoError(t, err) - - // first read - carReader, err := 
car.NewCarReader(cachedAccessor.sa.Reader()) - assert.NoError(t, err) - firstBlock, err := carReader.Next() - assert.NoError(t, err) - - // second read - cachedAccessor, err = edsStore.getCachedAccessor(ctx, shardKey) - assert.NoError(t, err) - carReader, err = car.NewCarReader(cachedAccessor.sa.Reader()) - assert.NoError(t, err) - secondBlock, err := carReader.Next() - assert.NoError(t, err) - - assert.Equal(t, firstBlock, secondBlock) + // accessor will be registered in cache async on put, so give it some time to settle + time.Sleep(time.Millisecond * 100) + + // accessor should be in cache + cachedAccessor, err := edsStore.cache.Get(shard.KeyFromString(dah.String())) + require.NoError(t, err) + + // first read from cached accessor + firstBlock, err := io.ReadAll(cachedAccessor.Reader()) + require.NoError(t, err) + require.NoError(t, cachedAccessor.Close()) + + // second read from cached accessor + cachedAccessor, err = edsStore.cache.Get(shard.KeyFromString(dah.String())) + require.NoError(t, err) + secondBlock, err := io.ReadAll(cachedAccessor.Reader()) + require.NoError(t, err) + require.NoError(t, cachedAccessor.Close()) + + require.Equal(t, firstBlock, secondBlock) +} + +// Test_NotCachedAccessor verifies that the reader represented by an accessor obtained directly from +// dagstore can be read from multiple times, without exhausting the underlying reader. +func Test_NotCachedAccessor(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + edsStore, err := newStore(t) + require.NoError(t, err) + err = edsStore.Start(ctx) + require.NoError(t, err) + // replace the cache with a NoopCache so accessors are not cached + edsStore.cache = cache.NoopCache{} + + eds, dah := randomEDS(t) + err = edsStore.Put(ctx, dah.Hash(), eds) + require.NoError(t, err) + + // accessor will be registered in cache async on put, so give it some time to settle + time.Sleep(time.Millisecond * 100) + + // accessor should not be in cache + _, err = edsStore.cache.Get(shard.KeyFromString(dah.String())) + require.Error(t, err) + + // first read from direct accessor + carReader, err := edsStore.getCAR(ctx, dah.Hash()) + require.NoError(t, err) + firstBlock, err := io.ReadAll(carReader) + require.NoError(t, err) + require.NoError(t, carReader.Close()) + + // second read from direct accessor + carReader, err = edsStore.getCAR(ctx, dah.Hash()) + require.NoError(t, err) + secondBlock, err := io.ReadAll(carReader) + require.NoError(t, err) + require.NoError(t, carReader.Close()) + + require.Equal(t, firstBlock, secondBlock) } func BenchmarkStore(b *testing.B) { diff --git a/share/eds/utils.go b/share/eds/utils.go new file mode 100644 index 0000000000..e7b24a9aee --- /dev/null +++ b/share/eds/utils.go @@ -0,0 +1,44 @@ +package eds + +import ( + "io" + + "github.com/filecoin-project/dagstore" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + + "github.com/celestiaorg/celestia-node/share/eds/cache" +) + +// readCloser is a helper struct that combines io.Reader and io.Closer. +type readCloser struct { + io.Reader + io.Closer +} + +// BlockstoreCloser represents a blockstore that can also be closed. It combines the functionality +// of a dagstore.ReadBlockstore with that of an io.Closer. 
+type BlockstoreCloser struct { + dagstore.ReadBlockstore + io.Closer +} + +func newReadCloser(ac cache.Accessor) io.ReadCloser { + return readCloser{ + ac.Reader(), + ac, + } +} + +func newBlockstore(store *Store, ds datastore.Batching) *blockstore { + return &blockstore{ + store: store, + ds: namespace.Wrap(ds, blockstoreCacheKey), + } +} + +func closeAndLog(name string, closer io.Closer) { + if err := closer.Close(); err != nil { + log.Warnw("closing "+name, "err", err) + } +} diff --git a/share/getters/store.go b/share/getters/store.go index 5eca956faa..415c9f047f 100644 --- a/share/getters/store.go +++ b/share/getters/store.go @@ -58,6 +58,11 @@ func (sg *StoreGetter) GetShare(ctx context.Context, dah *share.Root, row, col i if err != nil { return nil, fmt.Errorf("getter/store: failed to retrieve blockstore: %w", err) } + defer func() { + if err := bs.Close(); err != nil { + log.Warnw("closing blockstore", "err", err) + } + }() // wrap the read-only CAR blockstore in a getter blockGetter := eds.NewBlockGetter(bs) @@ -117,6 +122,11 @@ func (sg *StoreGetter) GetSharesByNamespace( if err != nil { return nil, fmt.Errorf("getter/store: failed to retrieve blockstore: %w", err) } + defer func() { + if err := bs.Close(); err != nil { + log.Warnw("closing blockstore", "err", err) + } + }() // wrap the read-only CAR blockstore in a getter blockGetter := eds.NewBlockGetter(bs) diff --git a/share/p2p/shrexeds/server.go b/share/p2p/shrexeds/server.go index fffa0e8152..11b99a3438 100644 --- a/share/p2p/shrexeds/server.go +++ b/share/p2p/shrexeds/server.go @@ -100,8 +100,15 @@ func (s *Server) handleStream(stream network.Stream) { // we do not close the reader, so that other requests will not need to re-open the file. // closing is handled by the LRU cache. edsReader, err := s.store.GetCAR(ctx, hash) - status := p2p_pb.Status_OK + var status p2p_pb.Status switch { + case err == nil: + defer func() { + if err := edsReader.Close(); err != nil { + log.Warnw("closing car reader", "err", err) + } + }() + status = p2p_pb.Status_OK case errors.Is(err, eds.ErrNotFound): logger.Warnw("server: request hash not found") s.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound)