Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(share/eds): Rework accessor cache #2612

Merged
merged 21 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions nodebuilder/blob/mocks/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions nodebuilder/fraud/mocks/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

130 changes: 107 additions & 23 deletions share/eds/accessor_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,71 @@
)

var (
defaultCacheSize = 128
errCacheMiss = errors.New("accessor not found in blockstore cache")
errCacheMiss = errors.New("accessor not found in blockstore cache")
)

// cache is an interface that defines the basic cache operations.
type cache interface {
// Get retrieves an item from the cache.
get(shard.Key) (*accessorWithBlockstore, error)

// getOrLoad attempts to get an item from the cache and, if not found, invokes
// the provided loader function to load it into the cache.
getOrLoad(ctx context.Context, key shard.Key, loader func(context.Context, shard.Key) (*dagstore.ShardAccessor, error)) (*accessorWithBlockstore, error)

// enableMetrics enables metrics in cache
enableMetrics() error
}

// multiCache represents a cache that looks into multiple caches one by one.
type multiCache struct {
walldiss marked this conversation as resolved.
Show resolved Hide resolved
caches []cache
}

// newMultiCache creates a new multiCache with the provided caches.
func newMultiCache(caches ...cache) *multiCache {
return &multiCache{caches: caches}
}

// get looks for an item in all the caches one by one and returns the first found item.
func (mc *multiCache) get(key shard.Key) (*accessorWithBlockstore, error) {
for _, cache := range mc.caches {
accessor, err := cache.get(key)
if err == nil {
return accessor, nil
}
}

return nil, errCacheMiss
}

// getOrLoad attempts to get an item from all caches, and if not found, invokes
// the provided loader function to load it into one of the caches.
func (mc *multiCache) getOrLoad(
ctx context.Context,
key shard.Key,
loader func(context.Context, shard.Key) (*dagstore.ShardAccessor, error),
) (*accessorWithBlockstore, error) {
for _, cache := range mc.caches {
accessor, err := cache.getOrLoad(ctx, key, loader)
if err == nil {
return accessor, nil
}
}

return nil, errors.New("multicache: unable to get or load accessor")
}

func (mc *multiCache) enableMetrics() error {
for _, cache := range mc.caches {
err := cache.enableMetrics()
if err != nil {
return err
}
}
return nil
}

// accessorWithBlockstore is the value that we store in the blockstore cache
type accessorWithBlockstore struct {
sa *dagstore.ShardAccessor
Expand All @@ -28,6 +89,8 @@
}

type blockstoreCache struct {
// name is a prefix, that will be used for cache metrics if it is enabled
name string
// stripedLocks prevents simultaneous RW access to the blockstore cache for a shard. Instead
// of using only one lock or one lock per key, we stripe the shard keys across 256 locks. 256 is
// chosen because it 0-255 is the range of values we get looking at the last byte of the key.
Expand All @@ -39,7 +102,7 @@
metrics *cacheMetrics
}

func newBlockstoreCache(cacheSize int) (*blockstoreCache, error) {
func newBlockstoreCache(name string, cacheSize int) (*blockstoreCache, error) {

Check warning on line 105 in share/eds/accessor_cache.go

View workflow job for this annotation

GitHub Actions / go-ci / Lint

unused-parameter: parameter 'name' seems to be unused, consider removing or renaming it as _ (revive)
walldiss marked this conversation as resolved.
Show resolved Hide resolved
bc := &blockstoreCache{}
// instantiate the blockstore cache
bslru, err := lru.NewWithEvict(cacheSize, bc.evictFn())
Expand Down Expand Up @@ -71,17 +134,17 @@

// Get retrieves the blockstore for a given shard key from the cache. If the blockstore is not in
// the cache, it returns an errCacheMiss
func (bc *blockstoreCache) Get(shardContainingCid shard.Key) (*accessorWithBlockstore, error) {
lk := &bc.stripedLocks[shardKeyToStriped(shardContainingCid)]
func (bc *blockstoreCache) get(key shard.Key) (*accessorWithBlockstore, error) {
lk := &bc.stripedLocks[shardKeyToStriped(key)]
lk.Lock()
defer lk.Unlock()

return bc.unsafeGet(shardContainingCid)
return bc.unsafeGet(key)
}

func (bc *blockstoreCache) unsafeGet(shardContainingCid shard.Key) (*accessorWithBlockstore, error) {
func (bc *blockstoreCache) unsafeGet(key shard.Key) (*accessorWithBlockstore, error) {
// We've already ensured that the given shard has the cid/multihash we are looking for.
val, ok := bc.cache.Get(shardContainingCid)
val, ok := bc.cache.Get(key)
if !ok {
return nil, errCacheMiss
}
Expand All @@ -96,22 +159,26 @@
return accessor, nil
}

// Add adds a blockstore for a given shard key to the cache.
func (bc *blockstoreCache) Add(
shardContainingCid shard.Key,
accessor *dagstore.ShardAccessor,
// getOrLoad attempts to get an item from all caches, and if not found, invokes
// the provided loader function to load it into one of the caches.
func (bc *blockstoreCache) getOrLoad(
ctx context.Context,
key shard.Key,
loader func(context.Context, shard.Key) (*dagstore.ShardAccessor, error),
) (*accessorWithBlockstore, error) {
lk := &bc.stripedLocks[shardKeyToStriped(shardContainingCid)]
lk := &bc.stripedLocks[shardKeyToStriped(key)]
lk.Lock()
defer lk.Unlock()

return bc.unsafeAdd(shardContainingCid, accessor)
}
if accessor, err := bc.unsafeGet(key); err == nil {
return accessor, nil
}

accessor, err := loader(ctx, key)
if err != nil {
return nil, fmt.Errorf("unable to get accessor: %w", err)
}

func (bc *blockstoreCache) unsafeAdd(
shardContainingCid shard.Key,
accessor *dagstore.ShardAccessor,
) (*accessorWithBlockstore, error) {
blockStore, err := accessor.Blockstore()
if err != nil {
return nil, fmt.Errorf("failed to get blockstore from accessor: %w", err)
Expand All @@ -121,7 +188,7 @@
bs: blockStore,
sa: accessor,
}
bc.cache.Add(shardContainingCid, newAccessor)
bc.cache.Add(key, newAccessor)
return newAccessor, nil
}

Expand All @@ -135,14 +202,14 @@
evictedCounter metric.Int64Counter
}

func (bc *blockstoreCache) withMetrics() error {
evictedCounter, err := meter.Int64Counter("eds_blockstore_cache_evicted_counter",
func (bc *blockstoreCache) enableMetrics() error {
evictedCounter, err := meter.Int64Counter("eds_blockstore_cache"+bc.name+"_evicted_counter",
metric.WithDescription("eds blockstore cache evicted event counter"))
if err != nil {
return err
}

cacheSize, err := meter.Int64ObservableGauge("eds_blockstore_cache_size",
cacheSize, err := meter.Int64ObservableGauge("eds_blockstore"+bc.name+"_cache_size",
metric.WithDescription("total amount of items in blockstore cache"),
)
if err != nil {
Expand All @@ -168,3 +235,20 @@
m.evictedCounter.Add(context.Background(), 1, metric.WithAttributes(
attribute.Bool(failedKey, failed)))
}

type noopCache struct{}

func (n noopCache) get(shard.Key) (*accessorWithBlockstore, error) {
return nil, errCacheMiss
}

func (n noopCache) getOrLoad(
context.Context, shard.Key,
func(context.Context, shard.Key) (*dagstore.ShardAccessor, error),
) (*accessorWithBlockstore, error) {
return nil, nil
}

func (n noopCache) enableMetrics() error {
return nil
}
9 changes: 8 additions & 1 deletion share/eds/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,16 @@ func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (d
return nil, fmt.Errorf("failed to find shards containing multihash: %w", err)
}

// check hash for any of keys
for _, k := range keys {
if accessor, err := bs.store.cache.get(k); err == nil {
return accessor.bs, nil
}
}

// a share can exist in multiple EDSes, so just take the first one.
shardKey := keys[0]
accessor, err := bs.store.getCachedAccessor(ctx, shardKey)
accessor, err := bs.cache.getOrLoad(ctx, shardKey, bs.store.getAccessor)
if err != nil {
return nil, fmt.Errorf("failed to get accessor for shard %s: %w", shardKey, err)
}
Expand Down
2 changes: 1 addition & 1 deletion share/eds/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (s *Store) WithMetrics() error {
return err
}

if err = s.cache.withMetrics(); err != nil {
if err = s.cache.enableMetrics(); err != nil {
return err
}
s.metrics = &metrics{
Expand Down
Loading
Loading