Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(share/eds): Rework accessor cache #2612

Merged
merged 21 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
github.com/golang/mock v1.6.0
github.com/gorilla/mux v1.8.0
github.com/hashicorp/go-retryablehttp v0.7.4
github.com/hashicorp/golang-lru v1.0.2
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/imdario/mergo v0.3.16
github.com/ipfs/boxo v0.12.0
github.com/ipfs/go-block-format v0.2.0
Expand Down Expand Up @@ -185,8 +185,8 @@ require (
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-safetemp v1.0.0 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hashicorp/golang-lru/arc/v2 v2.0.5 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hdevalence/ed25519consensus v0.0.0-20220222234857-c00d1f31bab3 // indirect
github.com/holiman/uint256 v1.2.3 // indirect
Expand Down
1 change: 1 addition & 0 deletions nodebuilder/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func TestStoreRestart(t *testing.T) {
require.NoError(t, err)
_, err = eds.ReadEDS(ctx, odsReader, h)
require.NoError(t, err)
require.NoError(t, edsReader.Close())
}
}

Expand Down
178 changes: 0 additions & 178 deletions share/eds/accessor_cache.go

This file was deleted.

63 changes: 39 additions & 24 deletions share/eds/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import (
"errors"
"fmt"

"github.com/filecoin-project/dagstore"
bstore "github.com/ipfs/boxo/blockstore"
dshelp "github.com/ipfs/boxo/datastore/dshelp"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
ipld "github.com/ipfs/go-ipld-format"

"github.com/celestiaorg/celestia-node/share/eds/cache"
)

var _ bstore.Blockstore = (*blockstore)(nil)
Expand All @@ -32,18 +32,9 @@ var (
// implementation to allow for the blockstore operations to be routed to the underlying stores.
type blockstore struct {
store *Store
cache *blockstoreCache
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
ds datastore.Batching
}

func newBlockstore(store *Store, cache *blockstoreCache, ds datastore.Batching) *blockstore {
return &blockstore{
store: store,
cache: cache,
ds: namespace.Wrap(ds, blockstoreCacheKey),
}
}

func (bs *blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
keys, err := bs.store.dgstr.ShardsContainingMultihash(ctx, cid.Hash())
if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
Expand All @@ -63,6 +54,11 @@ func (bs *blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {

func (bs *blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
blockstr, err := bs.getReadOnlyBlockstore(ctx, cid)
if err == nil {
defer closeAndLog("blockstore", blockstr)
return blockstr.Get(ctx, cid)
}

if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
k := dshelp.MultihashToDsKey(cid.Hash())
blockData, err := bs.ds.Get(ctx, k)
Expand All @@ -72,15 +68,18 @@ func (bs *blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error
// nmt's GetNode expects an ipld.ErrNotFound when a cid is not found.
return nil, ipld.ErrNotFound{Cid: cid}
}
if err != nil {
log.Debugf("failed to get blockstore for cid %s: %s", cid, err)
return nil, err
}
return blockstr.Get(ctx, cid)

log.Debugf("failed to get blockstore for cid %s: %s", cid, err)
return nil, err
}

func (bs *blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
blockstr, err := bs.getReadOnlyBlockstore(ctx, cid)
if err == nil {
defer closeAndLog("blockstore", blockstr)
return blockstr.GetSize(ctx, cid)
}
walldiss marked this conversation as resolved.
Show resolved Hide resolved

if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
k := dshelp.MultihashToDsKey(cid.Hash())
size, err := bs.ds.GetSize(ctx, k)
Expand All @@ -90,10 +89,9 @@ func (bs *blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
// nmt's GetSize expects an ipld.ErrNotFound when a cid is not found.
return 0, ipld.ErrNotFound{Cid: cid}
}
if err != nil {
return 0, err
}
return blockstr.GetSize(ctx, cid)

log.Debugf("failed to get size for cid %s: %s", cid, err)
return 0, err
}

func (bs *blockstore) DeleteBlock(ctx context.Context, cid cid.Cid) error {
Expand Down Expand Up @@ -139,7 +137,7 @@ func (bs *blockstore) HashOnRead(bool) {
}

// getReadOnlyBlockstore finds the underlying blockstore of the shard that contains the given CID.
func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (dagstore.ReadBlockstore, error) {
func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (*BlockstoreCloser, error) {
keys, err := bs.store.dgstr.ShardsContainingMultihash(ctx, cid.Hash())
if errors.Is(err, datastore.ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
return nil, ErrNotFound
Expand All @@ -148,11 +146,28 @@ func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (d
return nil, fmt.Errorf("failed to find shards containing multihash: %w", err)
}

// a share can exist in multiple EDSes, so just take the first one.
// check if cache contains any of accessors
shardKey := keys[0]
accessor, err := bs.store.getCachedAccessor(ctx, shardKey)
if accessor, err := bs.store.cache.Get(shardKey); err == nil {
return blockstoreCloser(accessor)
}

// load accessor to the cache and use it as blockstoreCloser
accessor, err := bs.store.cache.GetOrLoad(ctx, shardKey, bs.store.getAccessor)
if err != nil {
return nil, fmt.Errorf("failed to get accessor for shard %s: %w", shardKey, err)
}
return accessor.bs, nil
return blockstoreCloser(accessor)
}

// blockstoreCloser constructs new BlockstoreCloser from cache.Accessor
func blockstoreCloser(ac cache.Accessor) (*BlockstoreCloser, error) {
bs, err := ac.Blockstore()
if err != nil {
return nil, fmt.Errorf("eds/store: failed to get blockstore: %w", err)
}
return &BlockstoreCloser{
ReadBlockstore: bs,
Closer: ac,
}, nil
}
3 changes: 3 additions & 0 deletions share/eds/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ func TestBlockstore_Operations(t *testing.T) {
topLevelBS := edsStore.Blockstore()
carBS, err := edsStore.CARBlockstore(ctx, dah.Hash())
require.NoError(t, err)
defer func() {
require.NoError(t, carBS.Close())
}()

root, err := edsStore.GetDAH(ctx, dah.Hash())
require.NoError(t, err)
Expand Down
Loading
Loading