Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
add View() to all the various blockstores. (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk authored Nov 10, 2020
1 parent 9283acf commit 3e8fd89
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 42 deletions.
121 changes: 81 additions & 40 deletions arc_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,68 +12,51 @@ import (
type cacheHave bool
type cacheSize int

// arccache wraps a BlockStore with an Adaptive Replacement Cache (ARC) for
// block Cids. This provides block access-time improvements, allowing
// to short-cut many searches without query-ing the underlying datastore.
// arccache wraps a BlockStore with an Adaptive Replacement Cache (ARC) that
// does not store the actual blocks, just metadata about them: existence and
// size. This provides block access-time improvements, allowing
// to short-cut many searches without querying the underlying datastore.
type arccache struct {
arc *lru.TwoQueueCache
cache *lru.TwoQueueCache
blockstore Blockstore
viewer Viewer

hits metrics.Counter
total metrics.Counter
}

var _ Blockstore = (*arccache)(nil)
var _ Viewer = (*arccache)(nil)

func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache, error) {
arc, err := lru.New2Q(lruSize)
cache, err := lru.New2Q(lruSize)
if err != nil {
return nil, err
}
c := &arccache{arc: arc, blockstore: bs}
c := &arccache{cache: cache, blockstore: bs}
c.hits = metrics.NewCtx(ctx, "arc.hits_total", "Number of ARC cache hits").Counter()
c.total = metrics.NewCtx(ctx, "arc_total", "Total number of ARC cache requests").Counter()

if v, ok := bs.(Viewer); ok {
c.viewer = v
}
return c, nil
}

func (b *arccache) DeleteBlock(k cid.Cid) error {
if has, _, ok := b.hasCached(k); ok && !has {
if has, _, ok := b.queryCache(k); ok && !has {
return nil
}

b.arc.Remove(k) // Invalidate cache before deleting.
b.cache.Remove(k) // Invalidate cache before deleting.
err := b.blockstore.DeleteBlock(k)
if err == nil {
b.cacheHave(k, false)
}
return err
}

// if ok == false has is inconclusive
// if ok == true then has respons to question: is it contained
func (b *arccache) hasCached(k cid.Cid) (has bool, size int, ok bool) {
b.total.Inc()
if !k.Defined() {
log.Error("undefined cid in arccache")
// Return cache invalid so the call to blockstore happens
// in case of invalid key and correct error is created.
return false, -1, false
}

h, ok := b.arc.Get(string(k.Hash()))
if ok {
b.hits.Inc()
switch h := h.(type) {
case cacheHave:
return bool(h), -1, true
case cacheSize:
return true, int(h), true
}
}
return false, -1, false
}

func (b *arccache) Has(k cid.Cid) (bool, error) {
if has, _, ok := b.hasCached(k); ok {
if has, _, ok := b.queryCache(k); ok {
return has, nil
}
has, err := b.blockstore.Has(k)
Expand All @@ -85,7 +68,7 @@ func (b *arccache) Has(k cid.Cid) (bool, error) {
}

func (b *arccache) GetSize(k cid.Cid) (int, error) {
if has, blockSize, ok := b.hasCached(k); ok {
if has, blockSize, ok := b.queryCache(k); ok {
if !has {
// don't have it, return
return -1, ErrNotFound
Expand All @@ -105,13 +88,38 @@ func (b *arccache) GetSize(k cid.Cid) (int, error) {
return blockSize, err
}

func (b *arccache) View(k cid.Cid, callback func([]byte) error) error {
// shortcircuit and fall back to Get if the underlying store
// doesn't support Viewer.
if b.viewer == nil {
blk, err := b.Get(k)
if err != nil {
return err
}
return callback(blk.RawData())
}

if !k.Defined() {
log.Error("undefined cid in arc cache")
return ErrNotFound
}

if has, _, ok := b.queryCache(k); ok && !has {
// short circuit if the cache deterministically tells us the item
// doesn't exist.
return ErrNotFound
}

return b.viewer.View(k, callback)
}

func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
if !k.Defined() {
log.Error("undefined cid in arc cache")
return nil, ErrNotFound
}

if has, _, ok := b.hasCached(k); ok && !has {
if has, _, ok := b.queryCache(k); ok && !has {
return nil, ErrNotFound
}

Expand All @@ -125,7 +133,7 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
}

func (b *arccache) Put(bl blocks.Block) error {
if has, _, ok := b.hasCached(bl.Cid()); ok && has {
if has, _, ok := b.queryCache(bl.Cid()); ok && has {
return nil
}

Expand All @@ -141,7 +149,7 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
for _, block := range bs {
// call put on block if result is inconclusive or we are sure that
// the block isn't in storage
if has, _, ok := b.hasCached(block.Cid()); !ok || (ok && !has) {
if has, _, ok := b.queryCache(block.Cid()); !ok || (ok && !has) {
good = append(good, block)
}
}
Expand All @@ -160,11 +168,44 @@ func (b *arccache) HashOnRead(enabled bool) {
}

func (b *arccache) cacheHave(c cid.Cid, have bool) {
b.arc.Add(string(c.Hash()), cacheHave(have))
b.cache.Add(string(c.Hash()), cacheHave(have))
}

func (b *arccache) cacheSize(c cid.Cid, blockSize int) {
b.arc.Add(string(c.Hash()), cacheSize(blockSize))
b.cache.Add(string(c.Hash()), cacheSize(blockSize))
}

// queryCache checks if the CID is in the cache. If so, it returns:
//
// * exists (bool): whether the CID is known to exist or not.
// * size (int): the size if cached, or -1 if not cached.
// * ok (bool): whether present in the cache.
//
// When ok is false, the answer in inconclusive and the caller must ignore the
// other two return values. Querying the underying store is necessary.
//
// When ok is true, exists carries the correct answer, and size carries the
// size, if known, or -1 if not.
func (b *arccache) queryCache(k cid.Cid) (exists bool, size int, ok bool) {
b.total.Inc()
if !k.Defined() {
log.Error("undefined cid in arccache")
// Return cache invalid so the call to blockstore happens
// in case of invalid key and correct error is created.
return false, -1, false
}

h, ok := b.cache.Get(string(k.Hash()))
if ok {
b.hits.Inc()
switch h := h.(type) {
case cacheHave:
return bool(h), -1, true
case cacheSize:
return true, int(h), true
}
}
return false, -1, false
}

func (b *arccache) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
Expand Down
15 changes: 15 additions & 0 deletions blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@ type Blockstore interface {
HashOnRead(enabled bool)
}

// Viewer can be implemented by blockstores that offer zero-copy access to
// values.
//
// Callers of View must not mutate or retain the byte slice, as it could be
// an mmapped memory region, or a pooled byte buffer.
//
// View is especially suitable for deserialising in place.
//
// The callback will only be called iff the query operation is successful (and
// the block is found); otherwise, the error will be propagated. Errors returned
// by the callback will be propagated as well.
type Viewer interface {
View(cid cid.Cid, callback func([]byte) error) error
}

// GCLocker abstract functionality to lock a blockstore when performing
// garbage-collection operations.
type GCLocker interface {
Expand Down
22 changes: 22 additions & 0 deletions bloom_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ func bloomCached(ctx context.Context, bs Blockstore, bloomSize, hashCount int) (
"Total number of requests to bloom cache").Counter(),
buildChan: make(chan struct{}),
}
if v, ok := bs.(Viewer); ok {
bc.viewer = v
}
go func() {
err := bc.build(ctx)
if err != nil {
Expand Down Expand Up @@ -67,12 +70,16 @@ type bloomcache struct {

buildChan chan struct{}
blockstore Blockstore
viewer Viewer

// Statistics
hits metrics.Counter
total metrics.Counter
}

var _ Blockstore = (*bloomcache)(nil)
var _ Viewer = (*bloomcache)(nil)

func (b *bloomcache) BloomActive() bool {
return atomic.LoadInt32(&b.active) != 0
}
Expand Down Expand Up @@ -151,6 +158,21 @@ func (b *bloomcache) GetSize(k cid.Cid) (int, error) {
return b.blockstore.GetSize(k)
}

func (b *bloomcache) View(k cid.Cid, callback func([]byte) error) error {
if b.viewer == nil {
blk, err := b.Get(k)
if err != nil {
return err
}
return callback(blk.RawData())
}

if has, ok := b.hasCached(k); ok && !has {
return ErrNotFound
}
return b.viewer.View(k, callback)
}

func (b *bloomcache) Get(k cid.Cid) (blocks.Block, error) {
if has, ok := b.hasCached(k); ok && !has {
return nil, ErrNotFound
Expand Down
27 changes: 25 additions & 2 deletions idstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,19 @@ import (

// idstore wraps a BlockStore to add support for identity hashes
type idstore struct {
bs Blockstore
bs Blockstore
viewer Viewer
}

var _ Blockstore = (*idstore)(nil)
var _ Viewer = (*idstore)(nil)

func NewIdStore(bs Blockstore) Blockstore {
return &idstore{bs}
ids := &idstore{bs: bs}
if v, ok := bs.(Viewer); ok {
ids.viewer = v
}
return ids
}

func extractContents(k cid.Cid) (bool, []byte) {
Expand Down Expand Up @@ -46,6 +54,21 @@ func (b *idstore) Has(k cid.Cid) (bool, error) {
return b.bs.Has(k)
}

func (b *idstore) View(k cid.Cid, callback func([]byte) error) error {
if b.viewer == nil {
blk, err := b.Get(k)
if err != nil {
return err
}
return callback(blk.RawData())
}
isId, bdata := extractContents(k)
if isId {
return callback(bdata)
}
return b.viewer.View(k, callback)
}

func (b *idstore) GetSize(k cid.Cid) (int, error) {
isId, bdata := extractContents(k)
if isId {
Expand Down

0 comments on commit 3e8fd89

Please sign in to comment.