Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(share/eds): Rework accessor cache #2612

Merged
merged 21 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nodebuilder/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func TestStoreRestart(t *testing.T) {
require.NoError(t, err)
_, err = eds.ReadEDS(ctx, odsReader, h)
require.NoError(t, err)
require.NoError(t, edsReader.Close())
}
}

Expand Down
178 changes: 0 additions & 178 deletions share/eds/accessor_cache.go

This file was deleted.

61 changes: 46 additions & 15 deletions share/eds/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"

"github.com/filecoin-project/dagstore"
bstore "github.com/ipfs/boxo/blockstore"
Expand All @@ -13,6 +14,8 @@ import (
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
ipld "github.com/ipfs/go-ipld-format"

"github.com/celestiaorg/celestia-node/share/eds/cache"
)

var _ bstore.Blockstore = (*blockstore)(nil)
Expand All @@ -32,14 +35,17 @@ var (
// implementation to allow for the blockstore operations to be routed to the underlying stores.
type blockstore struct {
store *Store
cache *blockstoreCache
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
ds datastore.Batching
}

func newBlockstore(store *Store, cache *blockstoreCache, ds datastore.Batching) *blockstore {
type BlockstoreCloser struct {
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
dagstore.ReadBlockstore
io.Closer
}

func newBlockstore(store *Store, ds datastore.Batching) *blockstore {
return &blockstore{
store: store,
cache: cache,
ds: namespace.Wrap(ds, blockstoreCacheKey),
}
}
Expand All @@ -63,6 +69,11 @@ func (bs *blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {

func (bs *blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
blockstr, err := bs.getReadOnlyBlockstore(ctx, cid)
if err == nil {
defer closeAndLog("blockstore", blockstr)
return blockstr.Get(ctx, cid)
}

if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
k := dshelp.MultihashToDsKey(cid.Hash())
blockData, err := bs.ds.Get(ctx, k)
Expand All @@ -72,15 +83,17 @@ func (bs *blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error
// nmt's GetNode expects an ipld.ErrNotFound when a cid is not found.
return nil, ipld.ErrNotFound{Cid: cid}
}
if err != nil {
log.Debugf("failed to get blockstore for cid %s: %s", cid, err)
return nil, err
}
return blockstr.Get(ctx, cid)

log.Debugf("failed to get blockstore for cid %s: %s", cid, err)
return nil, err
}

func (bs *blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
blockstr, err := bs.getReadOnlyBlockstore(ctx, cid)
if err == nil {
defer closeAndLog("blockstore", blockstr)
return blockstr.GetSize(ctx, cid)
}
walldiss marked this conversation as resolved.
Show resolved Hide resolved
if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
k := dshelp.MultihashToDsKey(cid.Hash())
size, err := bs.ds.GetSize(ctx, k)
Expand All @@ -90,10 +103,9 @@ func (bs *blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
// nmt's GetSize expects an ipld.ErrNotFound when a cid is not found.
return 0, ipld.ErrNotFound{Cid: cid}
}
if err != nil {
return 0, err
}
return blockstr.GetSize(ctx, cid)

log.Debugf("failed to get size for cid %s: %s", cid, err)
return 0, err
}

func (bs *blockstore) DeleteBlock(ctx context.Context, cid cid.Cid) error {
Expand Down Expand Up @@ -139,7 +151,7 @@ func (bs *blockstore) HashOnRead(bool) {
}

// getReadOnlyBlockstore finds the underlying blockstore of the shard that contains the given CID.
func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (dagstore.ReadBlockstore, error) {
func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (*BlockstoreCloser, error) {
keys, err := bs.store.dgstr.ShardsContainingMultihash(ctx, cid.Hash())
if errors.Is(err, datastore.ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
return nil, ErrNotFound
Expand All @@ -148,11 +160,30 @@ func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (d
return nil, fmt.Errorf("failed to find shards containing multihash: %w", err)
}

// a share can exist in multiple EDSes, check cache to contain any of accessors containing shard
walldiss marked this conversation as resolved.
Show resolved Hide resolved
for _, k := range keys {
if accessor, err := bs.store.cache.Get(k); err == nil {
return blockstoreCloser(accessor)
}
}

// a share can exist in multiple EDSes, so just take the first one.
shardKey := keys[0]
accessor, err := bs.store.getCachedAccessor(ctx, shardKey)
accessor, err := bs.store.cache.GetOrLoad(ctx, shardKey, bs.store.getAccessor)
if err != nil {
return nil, fmt.Errorf("failed to get accessor for shard %s: %w", shardKey, err)
}
return accessor.bs, nil
return blockstoreCloser(accessor)
}

// blockstoreCloser constructs new BlockstoreCloser from cache.Accessor
func blockstoreCloser(ac cache.Accessor) (*BlockstoreCloser, error) {
bs, err := ac.Blockstore()
if err != nil {
return nil, fmt.Errorf("eds/store: failed to get blockstore: %w", err)
}
return &BlockstoreCloser{
ReadBlockstore: bs,
Closer: ac,
}, nil
}
3 changes: 3 additions & 0 deletions share/eds/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ func TestBlockstore_Operations(t *testing.T) {
topLevelBS := edsStore.Blockstore()
carBS, err := edsStore.CARBlockstore(ctx, dah.Hash())
require.NoError(t, err)
defer func() {
require.NoError(t, carBS.Close())
}()

root, err := edsStore.GetDAH(ctx, dah.Hash())
require.NoError(t, err)
Expand Down
Loading
Loading