Skip to content

Commit

Permalink
- don't check multiple shards from inverted index keys in cache
Browse files Browse the repository at this point in the history
- minor renames
  • Loading branch information
walldiss committed Sep 13, 2023
1 parent 718a80a commit 795735a
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 30 deletions.
12 changes: 5 additions & 7 deletions share/eds/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,13 @@ func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (*
return nil, fmt.Errorf("failed to find shards containing multihash: %w", err)
}

// a share can exist in multiple EDSes, check cache to contain any of accessors containing shard
for _, k := range keys {
if accessor, err := bs.store.cache.Get(k); err == nil {
return blockstoreCloser(accessor)
}
// check if the cache contains any of the accessors
shardKey := keys[0]
if accessor, err := bs.store.cache.Get(shardKey); err == nil {
return blockstoreCloser(accessor)
}

// a share can exist in multiple EDSes, so just take the first one.
shardKey := keys[0]
// load accessor to the cache and use it as blockstoreCloser
accessor, err := bs.store.cache.GetOrLoad(ctx, shardKey, bs.store.getAccessor)
if err != nil {
return nil, fmt.Errorf("failed to get accessor for shard %s: %w", shardKey, err)
Expand Down
35 changes: 20 additions & 15 deletions share/eds/cache/accessor_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,20 @@ type AccessorCache struct {
// of using only one lock or one lock per key, we stripe the shard keys across 256 locks. 256 is
// chosen because 0-255 is the range of values we get when looking at the last byte of the key.
stripedLocks [256]sync.Mutex
// Caches the blockstore for a given shard for shard read affinity, i.e., further reads will likely be from the same shard.
// Maps (shard key -> blockstore).
// Caches the blockstore for a given shard for shard read affinity, i.e., further reads will likely
// be from the same shard. Maps (shard key -> blockstore).
cache *lru.Cache

metrics *metrics
}

// accessorWithBlockstore is the value that we store in the blockstore Cache. It implements the Accessor interface.
// accessorWithBlockstore is the value that we store in the blockstore Cache. It implements the
// Accessor interface.
type accessorWithBlockstore struct {
sync.RWMutex
shardAccessor Accessor
// The blockstore is stored separately because each access to the blockstore over the shard accessor reopens the underlying CAR.
// The blockstore is stored separately because each access to the blockstore over the shard
// accessor reopens the underlying CAR.
bs dagstore.ReadBlockstore
}

Expand Down Expand Up @@ -114,27 +116,30 @@ func (bc *AccessorCache) GetOrLoad(
lk.Lock()
defer lk.Unlock()

if accessor, err := bc.get(key); err == nil {
return newCloser(accessor)
abs, err := bc.get(key)
if err == nil {
bc.metrics.observeGet(true)
return newCloser(abs)
}

provider, err := loader(ctx, key)
// accessor not found in cache, so load a new one using the loader
accessor, err := loader(ctx, key)
if err != nil {
return nil, fmt.Errorf("unable to load accessor: %w", err)
}

abs := &accessorWithBlockstore{
shardAccessor: provider,
abs = &accessorWithBlockstore{
shardAccessor: accessor,
}

// Create a new accessor first to increment the reference count in it, so it cannot get evicted from the inner lru cache
// before it is used.
accessor, err := newCloser(abs)
// Create a new accessor first to increment the reference count in it, so it cannot get evicted
// from the inner lru cache before it is used.
ac, err := newCloser(abs)
if err != nil {
return nil, err
}
bc.cache.Add(key, abs)
return accessor, nil
return ac, nil
}

// Remove removes the Accessor for a given key from the cache.
Expand All @@ -151,8 +156,8 @@ func (bc *AccessorCache) EnableMetrics() error {
return err
}

// Blockstore implements the Blockstore of the Accessor interface. It creates the blockstore on the first
// request and reuses the created instance for all subsequent requests.
// Blockstore implements the Blockstore of the Accessor interface. It creates the blockstore on the
// first request and reuses the created instance for all subsequent requests.
func (s *accessorWithBlockstore) Blockstore() (dagstore.ReadBlockstore, error) {
s.Lock()
defer s.Unlock()
Expand Down
16 changes: 8 additions & 8 deletions share/eds/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,11 @@ func (s *Store) put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext
return fmt.Errorf("failed to register shard: %w", result.Error)
}

// the accessor returned in the result will be nil, so the shard needs to be acquired first to become
// available in the cache. It might take some time, and the result should not affect the put operation, so do it
// in a goroutine
// TODO: Ideally, only recent blocks should be put in the cache, but there is no way right now to check
// such a condition.
// the accessor returned in the result will be nil, so the shard needs to be acquired first to
// become available in the cache. It might take some time, and the result should not affect the put
// operation, so do it in a goroutine
// TODO: Ideally, only recent blocks should be put in the cache, but there is no way right now to
// check such a condition.
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
Expand Down Expand Up @@ -345,9 +345,9 @@ func (s *Store) getCAR(ctx context.Context, root share.DataHash) (io.ReadCloser,
if err == nil {
return newReadCloser(accessor), nil
}
// If the accessor is not found in the cache, create a new one from dagstore. We don't put the accessor
// in the cache here because getCAR is used by shrex-eds. There is a lower probability, compared
// to other cache put triggers, that the same block will be requested again soon.
// If the accessor is not found in the cache, create a new one from dagstore. We don't put the
// accessor in the cache here because getCAR is used by shrex-eds. There is a lower probability,
// compared to other cache put triggers, that the same block will be requested again soon.
shardAccessor, err := s.getAccessor(ctx, key)
if err != nil {
return nil, fmt.Errorf("failed to get accessor: %w", err)
Expand Down

0 comments on commit 795735a

Please sign in to comment.