Skip to content

Commit

Permalink
split cache into recent and blockstore
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss committed Sep 5, 2023
1 parent e520b4f commit 2c1c9d6
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 22 deletions.
8 changes: 6 additions & 2 deletions share/eds/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/ipfs/go-datastore/namespace"
dshelp "github.com/ipfs/go-ipfs-ds-help"
ipld "github.com/ipfs/go-ipld-format"

"github.com/celestiaorg/celestia-node/share/eds/cache"
)

var _ bstore.Blockstore = (*blockstore)(nil)
Expand All @@ -32,12 +34,14 @@ var (
// implementation to allow for the blockstore operations to be routed to the underlying stores.
type blockstore struct {
	// store is the parent EDS store; used to resolve shard keys for a cid
	// and to load accessors on cache miss.
	store *Store
	// cache holds open shard accessors; consulted before going to the store.
	cache cache.Cache
	// ds is the fallback datastore for individual blocks, wrapped under
	// blockstoreCacheKey in newBlockstore.
	ds datastore.Batching
}

func newBlockstore(store *Store, ds datastore.Batching) *blockstore {
func newBlockstore(store *Store, cache cache.Cache, ds datastore.Batching) *blockstore {
return &blockstore{
store: store,
cache: cache,
ds: namespace.Wrap(ds, blockstoreCacheKey),
}
}
Expand Down Expand Up @@ -155,7 +159,7 @@ func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (d

// a share can exist in multiple EDSes, so just take the first one.
shardKey := keys[0]
accessor, err := bs.store.cache.GetOrLoad(ctx, shardKey, bs.store.getAccessor)
accessor, err := bs.cache.GetOrLoad(ctx, shardKey, bs.store.getAccessor)
if err != nil {
return nil, fmt.Errorf("failed to get accessor for shard %s: %w", shardKey, err)
}
Expand Down
60 changes: 60 additions & 0 deletions share/eds/cache/multicache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package cache

import (
"context"
"errors"

"github.com/filecoin-project/dagstore/shard"
)

// Compile-time check that MultiCache satisfies the Cache interface.
var _ Cache = (*MultiCache)(nil)

// MultiCache represents a Cache that looks into multiple caches one by one.
type MultiCache struct {
	// first is consulted before second on reads; loads also prefer first.
	first, second Cache
}

// NewMultiCache creates a new MultiCache with the provided caches.
// Lookup order is first, then second.
func NewMultiCache(first, second Cache) *MultiCache {
	return &MultiCache{
		first:  first,
		second: second,
	}
}

// Get looks the key up in each underlying cache in order and returns the
// first hit; the error of the last cache is returned when none contain it.
func (mc *MultiCache) Get(key shard.Key) (Accessor, error) {
	if accessor, err := mc.first.Get(key); err == nil {
		return accessor, nil
	}
	return mc.second.Get(key)
}

// GetOrLoad attempts to get an item from all caches, and if not found, invokes
// the provided loader function to load it into one of the caches. The first
// cache is tried before the second; the second's result is returned only when
// the first errors.
func (mc *MultiCache) GetOrLoad(
	ctx context.Context,
	key shard.Key,
	loader func(context.Context, shard.Key) (AccessorProvider, error),
) (Accessor, error) {
	if accessor, err := mc.first.GetOrLoad(ctx, key, loader); err == nil {
		return accessor, nil
	}
	return mc.second.GetOrLoad(ctx, key, loader)
}

// Remove evicts the key from both underlying caches unconditionally and
// reports any failures joined into a single error (nil when both succeed).
func (mc *MultiCache) Remove(key shard.Key) error {
	return errors.Join(
		mc.first.Remove(key),
		mc.second.Remove(key),
	)
}

// EnableMetrics enables metrics on the underlying caches in order,
// stopping and returning the error of the first cache that fails.
func (mc *MultiCache) EnableMetrics() error {
	for _, c := range []Cache{mc.first, mc.second} {
		if err := c.EnableMetrics(); err != nil {
			return err
		}
	}
	return nil
}
76 changes: 65 additions & 11 deletions share/eds/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ const (
// We don't use transient files right now, so GC is turned off by default.
defaultGCInterval = 0

defaultCacheSize = 128
defaultRecentBlocksCacheSize = 10
defaultBlockstoreCacheSize = 128
)

var ErrNotFound = errors.New("eds not found in store")
Expand Down Expand Up @@ -108,21 +109,26 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
return nil, fmt.Errorf("failed to create DAGStore: %w", err)
}

accessorCache, err := cache.NewAccessorCache("cache", defaultCacheSize)
recentBlocksCache, err := cache.NewAccessorCache("recent", defaultRecentBlocksCacheSize)
if err != nil {
return nil, fmt.Errorf("failed to create recent blocks cache: %w", err)
}

blockstoreCache, err := cache.NewAccessorCache("blockstore", defaultBlockstoreCacheSize)
if err != nil {
return nil, fmt.Errorf("failed to create blockstore cache: %w", err)
}

store := &Store{
basepath: basepath,
dgstr: dagStore,
carIdx: fsRepo,
invertedIdx: invertedIdx,
gcInterval: defaultGCInterval,
mounts: r,
cache: accessorCache,
cache: cache.NewMultiCache(recentBlocksCache, blockstoreCache),
}
store.bs = newBlockstore(store, ds)
store.bs = newBlockstore(store, blockstoreCache, ds)
return store, nil
}

Expand Down Expand Up @@ -246,6 +252,20 @@ func (s *Store) put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext
if result.Error != nil {
return fmt.Errorf("failed to register shard: %w", result.Error)
}

// accessor returned in result will be nil, so shard needs to be acquired first, to become
// available in cache. It might take some time and result should not affect put operation, so do it
// in goroutine
//TODO: Ideally only recent blocks should be put in cache, but there is no way right now to check
// such condition.
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
if _, err := s.cache.GetOrLoad(ctx, result.Key, s.getAccessor); err != nil {
log.Warnw("unable to put accessor to recent blocks accessors cache", "err", err)
}
}()

return nil
}

Expand Down Expand Up @@ -295,11 +315,22 @@ func (s *Store) GetCAR(ctx context.Context, root share.DataHash) (io.ReadCloser,

// getCAR returns the original EDS identified by root as a serialized CAR
// stream. The cache is consulted first; on a miss a fresh accessor is taken
// from dagstore and wrapped so that closing the returned reader releases it.
//
// The diff-rendering artifacts (the superseded GetOrLoad call and the bare
// `return nil, err` line) have been removed; this is the clean post-commit
// body.
func (s *Store) getCAR(ctx context.Context, root share.DataHash) (io.ReadCloser, error) {
	key := shard.KeyFromString(root.String())
	accessor, err := s.cache.Get(key)
	if err == nil {
		return accessor.ReadCloser(), nil
	}
	// If the accessor is not found in the cache, create a new one from dagstore. We don't put the
	// accessor into the cache here, because getCAR is used by shrex.eds. There is a lower
	// probability, compared to other cache put triggers, that the same block will be requested
	// again.
	shardAccessor, err := s.getAccessor(ctx, key)
	if err != nil {
		return nil, fmt.Errorf("failed to get accessor: %w", err)
	}

	// Close on the returned readCloser releases the shard accessor.
	return readCloser{
		Reader: shardAccessor.Reader(),
		Closer: shardAccessor,
	}, nil
}

// Blockstore returns an IPFS blockstore providing access to individual shares/nodes of all EDS
Expand Down Expand Up @@ -331,11 +362,24 @@ func (s *Store) carBlockstore(
root share.DataHash,
) (*cache.BlockstoreCloser, error) {
key := shard.KeyFromString(root.String())
accessor, err := s.cache.GetOrLoad(ctx, key, s.getAccessor)
accessor, err := s.cache.Get(key)
if err == nil {
return accessor.Blockstore()
}
// if accessor not found in cache, create new one from dagstore
sa, err := s.getAccessor(ctx, key)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get accessor: %w", err)
}
return accessor.Blockstore()

bs, err := sa.Blockstore()
if err != nil {
return nil, fmt.Errorf("eds/store: failed to get accessor: %w", err)
}
return &cache.BlockstoreCloser{
ReadBlockstore: bs,
Closer: sa,
}, nil
}

// GetDAH returns the DataAvailabilityHeader for the EDS identified by DataHash.
Expand Down Expand Up @@ -483,7 +527,11 @@ func (s *Store) get(ctx context.Context, root share.DataHash) (eds *rsmt2d.Exten
}
}()

return ReadEDS(ctx, r, root)
eds, err = ReadEDS(ctx, r, root)
if err != nil {
return nil, fmt.Errorf("failed to read EDS from CAR file: %w", err)
}
return eds, nil
}

// Has checks if EDS exists by the given share.Root hash.
Expand Down Expand Up @@ -583,3 +631,9 @@ type inMemoryReader struct {
func (r *inMemoryReader) Close() error {
return nil
}

// readCloser combines an io.Reader with an independent io.Closer so a shard
// accessor's reader can be handed out as a single io.ReadCloser; Close
// releases the accessor itself rather than the reader.
type readCloser struct {
	io.Reader
	io.Closer
}
91 changes: 82 additions & 9 deletions share/eds/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds/cache"
"github.com/celestiaorg/celestia-node/share/eds/edstest"
)

Expand Down Expand Up @@ -142,6 +143,17 @@ func TestEDSStore(t *testing.T) {
assert.True(t, ok)
})

t.Run("RecentBlocksCache", func(t *testing.T) {
eds, dah := randomEDS(t)
err = edsStore.Put(ctx, dah.Hash(), eds)
require.NoError(t, err)

// check, that the key is in the cache after put
shardKey := shard.KeyFromString(dah.String())
_, err = edsStore.cache.Get(shardKey)
assert.NoError(t, err)
})

t.Run("List", func(t *testing.T) {
const amount = 10
hashes := make([]share.DataHash, 0, amount)
Expand Down Expand Up @@ -210,13 +222,18 @@ func Test_BlockstoreCache(t *testing.T) {
require.NoError(t, err)

// store eds to the store with noopCache to allow clean cache after put
swap := edsStore.cache
edsStore.cache = cache.NoopCache{}
eds, dah := randomEDS(t)
err = edsStore.Put(ctx, dah.Hash(), eds)
require.NoError(t, err)

// get any key from saved eds
bs, err := edsStore.carBlockstore(ctx, dah.Hash())
require.NoError(t, err)
defer func() {
require.NoError(t, bs.Close())
}()
keys, err := bs.AllKeysChan(ctx)
require.NoError(t, err)
var key cid.Cid
Expand All @@ -226,11 +243,19 @@ func Test_BlockstoreCache(t *testing.T) {
t.Fatal("context timeout")
}

// now get it, so that the key is in the cache
// swap back original cache
edsStore.cache = swap

// key shouldn't be in cache yet, check for returned errCacheMiss
shardKey := shard.KeyFromString(dah.String())
_, err = edsStore.cache.Get(shardKey)
require.ErrorIs(t, err, cache.ErrCacheMiss)

// now get it from blockstore, to trigger storing to cache
_, err = edsStore.Blockstore().Get(ctx, key)
require.NoError(t, err)
// check that blockstore is in the cache

// should be no errCacheMiss anymore
_, err = edsStore.cache.Get(shardKey)
require.NoError(t, err)
}
Expand All @@ -250,19 +275,67 @@ func Test_CachedAccessor(t *testing.T) {
err = edsStore.Put(ctx, dah.Hash(), eds)
require.NoError(t, err)

// first read
car, err := edsStore.GetCAR(ctx, dah.Hash())
// give some time to let cache to get settled in background
time.Sleep(time.Millisecond * 50)

// accessor should be in cache
cachedAccessor, err := edsStore.cache.Get(shard.KeyFromString(dah.String()))
require.NoError(t, err)

// first read from cached accessor
carReader := cachedAccessor.ReadCloser()
firstBlock, err := io.ReadAll(cachedAccessor.ReadCloser())
require.NoError(t, err)
require.NoError(t, carReader.Close())

// second read from cached accessor
carReader = cachedAccessor.ReadCloser()
secondBlock, err := io.ReadAll(cachedAccessor.ReadCloser())
require.NoError(t, err)
require.NoError(t, carReader.Close())

require.Equal(t, firstBlock, secondBlock)
}

// Test_NotCachedAccessor verifies that a reader backed by an accessor obtained
// directly from dagstore (bypassing the cache) can be read multiple times
// without exhausting the underlying reader.
func Test_NotCachedAccessor(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	edsStore, err := newStore(t)
	require.NoError(t, err)
	err = edsStore.Start(ctx)
	require.NoError(t, err)
	// replace the cache with noopCache so Put does not populate any cache
	edsStore.cache = cache.NoopCache{}

	eds, dah := randomEDS(t)
	err = edsStore.Put(ctx, dah.Hash(), eds)
	require.NoError(t, err)

	// give some time to let the background cache-warming goroutine settle
	time.Sleep(time.Millisecond * 50)

	// accessor should NOT be in the cache, since noopCache never stores
	_, err = edsStore.cache.Get(shard.KeyFromString(dah.String()))
	require.ErrorIs(t, err, cache.ErrCacheMiss)

	// first read from direct accessor
	carReader, err := edsStore.getCAR(ctx, dah.Hash())
	require.NoError(t, err)
	firstBlock, err := io.ReadAll(carReader)
	require.NoError(t, err)
	require.NoError(t, carReader.Close())

	// second read from direct accessor
	carReader, err = edsStore.getCAR(ctx, dah.Hash())
	require.NoError(t, err)
	secondBlock, err := io.ReadAll(carReader)
	require.NoError(t, err)
	require.NoError(t, carReader.Close())

	require.Equal(t, firstBlock, secondBlock)
}

func BenchmarkStore(b *testing.B) {
Expand Down

0 comments on commit 2c1c9d6

Please sign in to comment.