Commit a118401
- upgrade to golang-lru/v2
- track close errors in metrics
- move BlockstoreCloser and readCloser to utils
- add comment for sleep in test
walldiss committed Sep 13, 2023
1 parent 795735a commit a118401
Showing 7 changed files with 72 additions and 77 deletions.
5 changes: 3 additions & 2 deletions go.mod
@@ -28,7 +28,6 @@ require (
github.com/golang/mock v1.6.0
github.com/gorilla/mux v1.8.0
github.com/hashicorp/go-retryablehttp v0.7.4
github.com/hashicorp/golang-lru v1.0.2
github.com/imdario/mergo v0.3.16
github.com/ipfs/boxo v0.12.0
github.com/ipfs/go-block-format v0.1.2
@@ -76,6 +75,8 @@ require (
google.golang.org/protobuf v1.31.0
)

require github.com/hashicorp/golang-lru v1.0.2 // indirect

require (
cloud.google.com/go v0.110.6 // indirect
cloud.google.com/go/compute v1.23.0 // indirect
@@ -186,7 +187,7 @@ require (
github.com/hashicorp/go-safetemp v1.0.0 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/hashicorp/golang-lru/arc/v2 v2.0.5 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hdevalence/ed25519consensus v0.0.0-20220222234857-c00d1f31bab3 // indirect
github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c // indirect
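
Note: golang-lru v2 replaces the interface{}-based API with Go generics, which is why the direct dependency moves to github.com/hashicorp/golang-lru/v2 while v1 drops to an indirect requirement. A minimal standalone sketch of the v2 API (illustrative, not code from this commit):

```go
package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

func main() {
	// v2 caches are parameterized over key and value types,
	// so Get returns a typed value and needs no assertion.
	cache, err := lru.New[string, int](128)
	if err != nil {
		panic(err)
	}

	cache.Add("answer", 42)
	if v, ok := cache.Get("answer"); ok {
		fmt.Println(v) // v is an int, not interface{}
	}
}
```
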
17 changes: 0 additions & 17 deletions share/eds/blockstore.go
@@ -4,15 +4,12 @@ import (
"context"
"errors"
"fmt"
"io"

"github.com/filecoin-project/dagstore"
bstore "github.com/ipfs/boxo/blockstore"
dshelp "github.com/ipfs/boxo/datastore/dshelp"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
ipld "github.com/ipfs/go-ipld-format"

"github.com/celestiaorg/celestia-node/share/eds/cache"
@@ -38,20 +35,6 @@ type blockstore struct {
ds datastore.Batching
}

// BlockstoreCloser represents a blockstore that can also be closed. It combines the functionality
// of a dagstore.ReadBlockstore with that of an io.Closer.
type BlockstoreCloser struct {
dagstore.ReadBlockstore
io.Closer
}

func newBlockstore(store *Store, ds datastore.Batching) *blockstore {
return &blockstore{
store: store,
ds: namespace.Wrap(ds, blockstoreCacheKey),
}
}

func (bs *blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
keys, err := bs.store.dgstr.ShardsContainingMultihash(ctx, cid.Hash())
if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
49 changes: 14 additions & 35 deletions share/eds/cache/accessor_cache.go
@@ -4,12 +4,11 @@ import (
"context"
"fmt"
"io"
"reflect"
"sync"

"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/shard"
lru "github.com/hashicorp/golang-lru"
lru "github.com/hashicorp/golang-lru/v2"
)

var _ Cache = (*AccessorCache)(nil)
@@ -24,7 +23,7 @@ type AccessorCache struct {
stripedLocks [256]sync.Mutex
// Caches the blockstore for a given shard for shard read affinity, i.e., further reads will likely
// be from the same shard. Maps (shard key -> blockstore).
cache *lru.Cache
cache *lru.Cache[shard.Key, *accessorWithBlockstore]

metrics *metrics
}
@@ -44,7 +43,7 @@ func NewAccessorCache(name string, cacheSize int) (*AccessorCache, error) {
name: name,
}
// Instantiate the blockstore Cache.
bslru, err := lru.NewWithEvict(cacheSize, bc.evictFn())
bslru, err := lru.NewWithEvict[shard.Key, *accessorWithBlockstore](cacheSize, bc.evictFn())
if err != nil {
return nil, fmt.Errorf("failed to instantiate blockstore cache: %w", err)
}
@@ -53,22 +52,15 @@
}

// evictFn will be invoked when an item is evicted from the cache.
func (bc *AccessorCache) evictFn() func(_ interface{}, val interface{}) {
return func(_ interface{}, val interface{}) {
// Ensure we close the blockstore for a shard when it's evicted so dagstore can garbage collect it.
abs, ok := val.(*accessorWithBlockstore)
if !ok {
panic(fmt.Sprintf(
"casting value from cache to accessorWithBlockstore: %s",
reflect.TypeOf(val),
))
}

func (bc *AccessorCache) evictFn() func(shard.Key, *accessorWithBlockstore) {
return func(_ shard.Key, abs *accessorWithBlockstore) {
err := abs.shardAccessor.Close()
if err != nil {
bc.metrics.observeEvicted(true)
log.Errorf("couldn't close accessor after cache eviction: %s", err)
return
}
bc.metrics.observeEvicted()
bc.metrics.observeEvicted(false)
}
}

@@ -85,23 +77,14 @@ func (bc *AccessorCache) Get(key shard.Key) (Accessor, error) {
return nil, err
}
bc.metrics.observeGet(true)
return newCloser(accessor)
return newCloser(accessor), nil
}

func (bc *AccessorCache) get(key shard.Key) (*accessorWithBlockstore, error) {
// We've already ensured that the given shard has the CID/multihash we are looking for.
val, ok := bc.cache.Get(key)
abs, ok := bc.cache.Get(key)
if !ok {
return nil, ErrCacheMiss
}

abs, ok := val.(*accessorWithBlockstore)
if !ok {
panic(fmt.Sprintf(
"casting value from cache to accessorWithBlockstore: %s",
reflect.TypeOf(val),
))
}
return abs, nil
}

@@ -119,7 +102,7 @@ func (bc *AccessorCache) GetOrLoad(
abs, err := bc.get(key)
if err == nil {
bc.metrics.observeGet(true)
return newCloser(abs)
return newCloser(abs), nil
}

// accessor not found in cache, so load new one using loader
@@ -134,10 +117,7 @@

// Create a new accessor first to increment the reference count in it, so it cannot get evicted
// from the inner lru cache before it is used.
ac, err := newCloser(abs)
if err != nil {
return nil, err
}
ac := newCloser(abs)
bc.cache.Add(key, abs)
return ac, nil
}
@@ -165,7 +145,6 @@ func (s *accessorWithBlockstore) Blockstore() (dagstore.ReadBlockstore, error) {
if s.bs == nil {
s.bs, err = s.shardAccessor.Blockstore()
}

return s.bs, err
}

@@ -180,11 +159,11 @@ type accessorCloser struct {
io.Closer
}

func newCloser(abs *accessorWithBlockstore) (*accessorCloser, error) {
func newCloser(abs *accessorWithBlockstore) *accessorCloser {
return &accessorCloser{
accessorWithBlockstore: abs,
Closer: io.NopCloser(nil),
}, nil
}
}

// shardKeyToStriped returns the index of the lock to use for a given shard key. We use the last
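
Note: the typed eviction callback is what lets evictFn above drop the reflect-based cast (and its panic path) that the v1 API required. A minimal sketch of the pattern with an illustrative value type (the resource type and names are hypothetical, not from the repo):

```go
package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

// resource stands in for an accessor that must be closed on eviction.
type resource struct{ name string }

func (r *resource) Close() error {
	fmt.Println("closing", r.name)
	return nil
}

func main() {
	// With v2 the callback receives concrete types,
	// so no interface{} cast is needed.
	onEvict := func(_ string, val *resource) {
		if err := val.Close(); err != nil {
			fmt.Println("close failed:", err)
		}
	}

	cache, err := lru.NewWithEvict[string, *resource](1, onEvict)
	if err != nil {
		panic(err)
	}

	cache.Add("a", &resource{name: "a"})
	cache.Add("b", &resource{name: "b"}) // evicts "a", invoking onEvict
}
```
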
6 changes: 4 additions & 2 deletions share/eds/cache/metrics.go
@@ -9,6 +9,7 @@ import (

const (
cacheFoundKey = "found"
failedKey = "failed"
)

type metrics struct {
@@ -48,11 +49,12 @@ func newMetrics(bc *AccessorCache) (*metrics, error) {
}, err
}

func (m *metrics) observeEvicted() {
func (m *metrics) observeEvicted(failed bool) {
if m == nil {
return
}
m.evictedCounter.Add(context.Background(), 1)
m.evictedCounter.Add(context.Background(), 1, metric.WithAttributes(
attribute.Bool(failedKey, failed)))
}

func (m *metrics) observeGet(found bool) {
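
Note: tagging the eviction counter with a boolean "failed" attribute keeps clean and failed evictions in a single instrument while letting dashboards split them apart. A minimal OpenTelemetry sketch of the same pattern (meter and counter names here are illustrative):

```go
package main

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

func main() {
	meter := otel.Meter("cache-example")

	evicted, err := meter.Int64Counter("eds_cache_evicted_counter")
	if err != nil {
		panic(err)
	}

	// Record one eviction whose accessor close failed; queries can
	// then filter or group by the "failed" attribute.
	evicted.Add(context.Background(), 1, metric.WithAttributes(
		attribute.Bool("failed", true)))
}
```
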
13 changes: 0 additions & 13 deletions share/eds/store.go
@@ -636,16 +636,3 @@ type inMemoryReader struct {
func (r *inMemoryReader) Close() error {
return nil
}

// readCloser is a helper struct, that combines io.Reader and io.Closer
type readCloser struct {
io.Reader
io.Closer
}

func newReadCloser(ac cache.Accessor) io.ReadCloser {
return readCloser{
ac.Reader(),
ac,
}
}
22 changes: 15 additions & 7 deletions share/eds/store_test.go
@@ -113,7 +113,9 @@ func TestEDSStore(t *testing.T) {
_, err = os.Stat(edsStore.basepath + blocksPath + dah.String())
assert.NoError(t, err)

// accessor will be registered in cache async on put, so give it some time to settle
time.Sleep(time.Millisecond * 100)

err = edsStore.Remove(ctx, dah.Hash())
assert.NoError(t, err)

@@ -150,8 +152,10 @@ func TestEDSStore(t *testing.T) {
err = os.Remove(path)
assert.NoError(t, err)

// remove non-failed accessor from cache
// accessor will be registered in cache async on put, so give it some time to settle
time.Sleep(time.Millisecond * 100)

// remove non-failed accessor from cache
err = edsStore.cache.Remove(shard.KeyFromString(dah.String()))
assert.NoError(t, err)

@@ -194,8 +198,10 @@ func TestEDSStore(t *testing.T) {
err = edsStore.Put(ctx, dah.Hash(), eds)
require.NoError(t, err)

// check, that the key is in the cache after put
// accessor will be registered in cache async on put, so give it some time to settle
time.Sleep(time.Millisecond * 100)

// check, that the key is in the cache after put
shardKey := shard.KeyFromString(dah.String())
_, err = edsStore.cache.Get(shardKey)
assert.NoError(t, err)
@@ -262,8 +268,10 @@ func TestEDSStore_GC(t *testing.T) {
err = edsStore.Put(ctx, dah.Hash(), eds)
require.NoError(t, err)

// remove links to the shard from cache
// accessor will be registered in cache async on put, so give it some time to settle
time.Sleep(time.Millisecond * 100)

// remove links to the shard from cache
key := shard.KeyFromString(share.DataHash(dah.Hash()).String())
err = edsStore.cache.Remove(key)
require.NoError(t, err)
@@ -347,8 +355,8 @@ func Test_CachedAccessor(t *testing.T) {
err = edsStore.Put(ctx, dah.Hash(), eds)
require.NoError(t, err)

// give some time to let cache to get settled in background
time.Sleep(time.Millisecond * 50)
// accessor will be registered in cache async on put, so give it some time to settle
time.Sleep(time.Millisecond * 100)

// accessor should be in cache
cachedAccessor, err := edsStore.cache.Get(shard.KeyFromString(dah.String()))
@@ -386,8 +394,8 @@ func Test_NotCachedAccessor(t *testing.T) {
err = edsStore.Put(ctx, dah.Hash(), eds)
require.NoError(t, err)

// give some time to let cache to get settled in background
time.Sleep(time.Millisecond * 50)
// accessor will be registered in cache async on put, so give it some time to settle
time.Sleep(time.Millisecond * 100)

// accessor should be in cache
_, err = edsStore.cache.Get(shard.KeyFromString(dah.String()))
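
Note: the sleeps exist because Put registers the accessor in the cache asynchronously. Where a fixed 100ms proves flaky, polling is a common alternative; a hedged sketch using testify's require.Eventually (waitForCacheEntry is a hypothetical helper, and lookup stands in for a call like edsStore.cache.Get):

```go
package eds_test

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// waitForCacheEntry polls until lookup succeeds instead of sleeping for a
// fixed interval, finishing early when async registration is fast and
// tolerating slow runs up to the one-second deadline.
func waitForCacheEntry(t *testing.T, lookup func() error) {
	t.Helper()
	require.Eventually(t, func() bool {
		return lookup() == nil
	}, time.Second, 10*time.Millisecond)
}
```
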
37 changes: 36 additions & 1 deletion share/eds/utils.go
@@ -1,6 +1,41 @@
package eds

import "io"
import (
"io"

"github.com/filecoin-project/dagstore"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"

"github.com/celestiaorg/celestia-node/share/eds/cache"
)

// readCloser is a helper struct, that combines io.Reader and io.Closer
type readCloser struct {
io.Reader
io.Closer
}

// BlockstoreCloser represents a blockstore that can also be closed. It combines the functionality
// of a dagstore.ReadBlockstore with that of an io.Closer.
type BlockstoreCloser struct {
dagstore.ReadBlockstore
io.Closer
}

func newReadCloser(ac cache.Accessor) io.ReadCloser {
return readCloser{
ac.Reader(),
ac,
}
}

func newBlockstore(store *Store, ds datastore.Batching) *blockstore {
return &blockstore{
store: store,
ds: namespace.Wrap(ds, blockstoreCacheKey),
}
}

func closeAndLog(name string, closer io.Closer) {
if err := closer.Close(); err != nil {
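
Note: both relocated helpers rely on Go's embedded-interface composition: the struct satisfies the combined interface by delegating to whatever values fill the embedded fields, with no hand-written forwarding methods. A standard-library-only sketch of the same trick:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// readCloser pairs any reader with any closer; embedding both
// interfaces makes the struct an io.ReadCloser automatically.
type readCloser struct {
	io.Reader
	io.Closer
}

func main() {
	rc := readCloser{
		Reader: strings.NewReader("hello"),
		Closer: io.NopCloser(nil), // no-op Close, as newCloser uses above
	}

	data, _ := io.ReadAll(rc)
	fmt.Println(string(data)) // hello
	_ = rc.Close()
}
```
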
