Skip to content

Commit

Permalink
fix grammar
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss committed Sep 7, 2023
1 parent cee7338 commit 79b72d7
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 36 deletions.
3 changes: 3 additions & 0 deletions share/eds/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type blockstore struct {
ds datastore.Batching
}

// BlockstoreCloser represents a blockstore that can also be closed. It combines the functionality
// of a dagstore.ReadBlockstore with that of an io.Closer.
type BlockstoreCloser struct {
dagstore.ReadBlockstore
io.Closer
Expand Down Expand Up @@ -94,6 +96,7 @@ func (bs *blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
defer closeAndLog("blockstore", blockstr)
return blockstr.GetSize(ctx, cid)
}

if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
k := dshelp.MultihashToDsKey(cid.Hash())
size, err := bs.ds.GetSize(ctx, k)
Expand Down
46 changes: 22 additions & 24 deletions share/eds/cache/accessor_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,34 @@ import (

var _ Cache = (*AccessorCache)(nil)

// AccessorCache implements the Cache interface using an LRU cache backend.
type AccessorCache struct {
	// name is a prefix that will be used for cache metrics if they are enabled.
	name string
	// stripedLocks prevents simultaneous RW access to the blockstore cache for a shard. Instead
	// of using only one lock or one lock per key, we stripe the shard keys across 256 locks. 256 is
	// chosen because 0-255 is the range of values we get looking at the last byte of the key.
	stripedLocks [256]sync.Mutex
	// cache caches the blockstore for a given shard for shard read affinity, i.e., further reads
	// will likely be from the same shard. Maps (shard key -> blockstore).
	cache *lru.Cache

	metrics *metrics
}

// accessorWithBlockstore is the value that we store in the blockstore Cache. It implements the
// Accessor interface.
type accessorWithBlockstore struct {
	sync.RWMutex
	shardAccessor Accessor
	// bs is stored separately because each access to the blockstore over the shard accessor
	// reopens the underlying CAR.
	bs dagstore.ReadBlockstore
}

func NewAccessorCache(name string, cacheSize int) (*AccessorCache, error) {
bc := &AccessorCache{
name: name,
}
// instantiate the blockstore Cache
// Instantiate the blockstore Cache.
bslru, err := lru.NewWithEvict(cacheSize, bc.evictFn())
if err != nil {
return nil, fmt.Errorf("failed to instantiate blockstore cache: %w", err)
Expand All @@ -52,10 +50,10 @@ func NewAccessorCache(name string, cacheSize int) (*AccessorCache, error) {
return bc, nil
}

// evictFn will be invoked when item is evicted from the cache
// evictFn will be invoked when an item is evicted from the cache.
func (bc *AccessorCache) evictFn() func(_ interface{}, val interface{}) {
return func(_ interface{}, val interface{}) {
// ensure we close the blockstore for a shard when it's evicted so dagstore can gc it.
// Ensure we close the blockstore for a shard when it's evicted so dagstore can garbage collect it.
abs, ok := val.(*accessorWithBlockstore)
if !ok {
panic(fmt.Sprintf(
Expand All @@ -73,7 +71,7 @@ func (bc *AccessorCache) evictFn() func(_ interface{}, val interface{}) {
}

// Get retrieves the Accessor for a given shard key from the Cache. If the Accessor is not in
// the Cache, it returns an ErrCacheMiss
// the Cache, it returns an ErrCacheMiss.
func (bc *AccessorCache) Get(key shard.Key) (Accessor, error) {
lk := &bc.stripedLocks[shardKeyToStriped(key)]
lk.Lock()
Expand All @@ -89,7 +87,7 @@ func (bc *AccessorCache) Get(key shard.Key) (Accessor, error) {
}

func (bc *AccessorCache) get(key shard.Key) (*accessorWithBlockstore, error) {
// We've already ensured that the given shard has the cid/multihash we are looking for.
// We've already ensured that the given shard has the CID/multihash we are looking for.
val, ok := bc.cache.Get(key)
if !ok {
return nil, ErrCacheMiss
Expand All @@ -105,7 +103,7 @@ func (bc *AccessorCache) get(key shard.Key) (*accessorWithBlockstore, error) {
return abs, nil
}

// GetOrLoad attempts to get an item from cache, and if not found, invokes
// GetOrLoad attempts to get an item from the cache, and if not found, invokes
// the provided loader function to load it.
func (bc *AccessorCache) GetOrLoad(
ctx context.Context,
Expand All @@ -122,15 +120,15 @@ func (bc *AccessorCache) GetOrLoad(

provider, err := loader(ctx, key)
if err != nil {
return nil, fmt.Errorf("unable to get accessor: %w", err)
return nil, fmt.Errorf("unable to load accessor: %w", err)
}

abs := &accessorWithBlockstore{
shardAccessor: provider,
}

// create new accessor first to inc ref count in it, so it could not get evicted from inner cache
// before it is used
// Create a new accessor first to increment the reference count in it, so it cannot get evicted from the inner lru cache
// before it is used.
accessor, err := newCloser(abs)
if err != nil {
return nil, err
Expand All @@ -139,22 +137,22 @@ func (bc *AccessorCache) GetOrLoad(
return accessor, nil
}

// Remove removes the Accessor for a given key from the cache.
func (bc *AccessorCache) Remove(key shard.Key) error {
	// The underlying cache invokes evictFn on removal, which is where the
	// accessor's blockstore gets closed; nothing else to do here.
	_ = bc.cache.Remove(key)
	return nil
}

// EnableMetrics enables metrics for the cache.
func (bc *AccessorCache) EnableMetrics() error {
	// Store the metrics handle on the cache even if construction errored,
	// mirroring the original multi-assignment semantics.
	m, err := newMetrics(bc)
	bc.metrics = m
	return err
}

// Blockstore implements Blockstore of the Accessor interface. It creates blockstore on first
// request and reuses created instance for all next requests.
// Blockstore implements the Blockstore of the Accessor interface. It creates the blockstore on the first
// request and reuses the created instance for all subsequent requests.
func (s *accessorWithBlockstore) Blockstore() (dagstore.ReadBlockstore, error) {
s.Lock()
defer s.Unlock()
Expand All @@ -166,12 +164,12 @@ func (s *accessorWithBlockstore) Blockstore() (dagstore.ReadBlockstore, error) {
return s.bs, err
}

// Reader returns a new copy of the reader to read data.
func (s *accessorWithBlockstore) Reader() io.Reader {
	// Pure delegation: each call hands back a fresh reader from the shard accessor.
	r := s.shardAccessor.Reader()
	return r
}

// accessorCloser is a temporal object before refs count is implemented
// accessorCloser is a temporary object before reference counting is implemented.
type accessorCloser struct {
*accessorWithBlockstore
io.Closer
Expand Down
23 changes: 11 additions & 12 deletions share/eds/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ func (s *Store) gc(ctx context.Context) {
}
s.lastGCResult.Store(res)
}

}
}

Expand Down Expand Up @@ -262,7 +261,7 @@ func (s *Store) put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext
select {
case result = <-ch:
case <-ctx.Done():
// if context finished before result was received, track result in separate goroutine
// if the context finished before the result was received, track the result in a separate goroutine
go trackLateResult("put", ch, s.metrics, time.Minute*5)
return ctx.Err()
}
Expand All @@ -271,11 +270,11 @@ func (s *Store) put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext
return fmt.Errorf("failed to register shard: %w", result.Error)
}

// accessor returned in result will be nil, so shard needs to be acquired first, to become
// available in cache. It might take some time and result should not affect put operation, so do it
// in goroutine
//TODO: Ideally only recent blocks should be put in cache, but there is no way right now to check
// such condition.
// the accessor returned in the result will be nil, so the shard needs to be acquired first to become
// available in the cache. It might take some time, and the result should not affect the put operation, so do it
// in a goroutine
// TODO: Ideally, only recent blocks should be put in the cache, but there is no way right now to check
// such a condition.
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
Expand Down Expand Up @@ -339,9 +338,9 @@ func (s *Store) getCAR(ctx context.Context, root share.DataHash) (io.ReadCloser,
if err == nil {
return newReadCloser(accessor), nil
}
// If the accessor is not found in the cache, create a new one from dagstore. We don't put accessor
// to the cache here, because getCAR is used by shrex.eds. There is a lower probability, compared
// to other cache put triggers, that the same block to be requested again.
// If the accessor is not found in the cache, create a new one from dagstore. We don't put the accessor
// in the cache here because getCAR is used by shrex-eds. There is a lower probability, compared
// to other cache put triggers, that the same block will be requested again soon.
shardAccessor, err := s.getAccessor(ctx, key)
if err != nil {
return nil, fmt.Errorf("failed to get accessor: %w", err)
Expand Down Expand Up @@ -384,7 +383,7 @@ func (s *Store) carBlockstore(
return blockstoreCloser(accessor)
}

// if accessor not found in cache, create new one from dagstore
// if the accessor is not found in the cache, create a new one from dagstore
sa, err := s.getAccessor(ctx, key)
if err != nil {
return nil, fmt.Errorf("failed to get accessor: %w", err)
Expand All @@ -405,7 +404,7 @@ func (s *Store) GetDAH(ctx context.Context, root share.DataHash) (*share.Root, e
func (s *Store) getDAH(ctx context.Context, root share.DataHash) (*share.Root, error) {
r, err := s.getCAR(ctx, root)
if err != nil {
return nil, fmt.Errorf("eds/store: failed to get accessor: %w", err)
return nil, fmt.Errorf("eds/store: failed to get CAR file: %w", err)
}
defer closeAndLog("car reader", r)

Expand Down

0 comments on commit 79b72d7

Please sign in to comment.