Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: do not run bloom in case of ephemeral node #2953

Merged
merged 7 commits into from
Jul 10, 2016
Merged
27 changes: 19 additions & 8 deletions blocks/blockstore/bloom_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
"sync/atomic"
)

// BloomCached returns Blockstore that caches Has requests using Bloom filter
// bloomCached returns Blockstore that caches Has requests using Bloom filter
// Size is size of bloom filter in bytes
func BloomCached(bs Blockstore, bloomSize, lruSize int) (*bloomcache, error) {
bl, err := bloom.New(float64(bloomSize), float64(7))
func bloomCached(bs Blockstore, ctx context.Context, bloomSize, hashCount, lruSize int) (*bloomcache, error) {
bl, err := bloom.New(float64(bloomSize), float64(hashCount))
if err != nil {
return nil, err
}
Expand All @@ -24,7 +24,7 @@ func BloomCached(bs Blockstore, bloomSize, lruSize int) (*bloomcache, error) {
}
bc := &bloomcache{blockstore: bs, bloom: bl, arc: arc}
bc.Invalidate()
go bc.Rebuild()
go bc.Rebuild(ctx)

return bc, nil
}
Expand Down Expand Up @@ -52,8 +52,7 @@ func (b *bloomcache) BloomActive() bool {
return atomic.LoadInt32(&b.active) != 0
}

func (b *bloomcache) Rebuild() {
ctx := context.TODO()
func (b *bloomcache) Rebuild(ctx context.Context) {
evt := log.EventBegin(ctx, "bloomcache.Rebuild")
defer evt.Done()

Expand All @@ -62,8 +61,19 @@ func (b *bloomcache) Rebuild() {
log.Errorf("AllKeysChan failed in bloomcache rebuild with: %v", err)
return
}
for key := range ch {
b.bloom.AddTS([]byte(key)) // Use binary key, the more compact the better
finish := false
for !finish {
select {
case key, ok := <-ch:
if ok {
b.bloom.AddTS([]byte(key)) // Use binary key, the more compact the better
} else {
finish = true
}
case <-ctx.Done():
log.Warning("Cache rebuild closed by context finishing.")
return
}
}
close(b.rebuildChan)
atomic.StoreInt32(&b.active, 1)
Expand Down Expand Up @@ -159,6 +169,7 @@ func (b *bloomcache) PutMany(bs []blocks.Block) error {
if err == nil {
for _, block := range bs {
b.bloom.AddTS([]byte(block.Key()))
b.arc.Add(block.Key(), true)
}
}
return err
Expand Down
27 changes: 21 additions & 6 deletions blocks/blockstore/bloom_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,32 @@ import (

"github.com/ipfs/go-ipfs/blocks"

context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
ds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore"
dsq "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/query"
syncds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/sync"
)

func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error) {
if ctx == nil {
ctx = context.TODO()
}
opts := DefaultCacheOpts()
bbs, err := CachedBlockstore(bs, ctx, opts)
if err == nil {
return bbs.(*bloomcache), nil
} else {
return nil, err
}
}

func TestReturnsErrorWhenSizeNegative(t *testing.T) {
bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
_, err := BloomCached(bs, 100, -1)
_, err := bloomCached(bs, context.TODO(), 100, 1, -1)
if err == nil {
t.Fail()
}
_, err = BloomCached(bs, -1, 100)
_, err = bloomCached(bs, context.TODO(), -1, 1, 100)
if err == nil {
t.Fail()
}
Expand All @@ -29,7 +43,7 @@ func TestRemoveCacheEntryOnDelete(t *testing.T) {
b := blocks.NewBlock([]byte("foo"))
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
bs := NewBlockstore(syncds.MutexWrap(cd))
cachedbs, err := BloomCached(bs, 1, 1)
cachedbs, err := testBloomCached(bs, nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -53,7 +67,7 @@ func TestRemoveCacheEntryOnDelete(t *testing.T) {
func TestElideDuplicateWrite(t *testing.T) {
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
bs := NewBlockstore(syncds.MutexWrap(cd))
cachedbs, err := BloomCached(bs, 1, 1)
cachedbs, err := testBloomCached(bs, nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -73,14 +87,15 @@ func TestHasIsBloomCached(t *testing.T) {
for i := 0; i < 1000; i++ {
bs.Put(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i))))
}
cachedbs, err := BloomCached(bs, 256*1024, 128)
ctx, _ := context.WithTimeout(context.Background(), 1*time.Second)
cachedbs, err := testBloomCached(bs, ctx)
if err != nil {
t.Fatal(err)
}

select {
case <-cachedbs.rebuildChan:
case <-time.After(1 * time.Second):
case <-ctx.Done():
t.Fatalf("Timeout wating for rebuild: %d", cachedbs.bloom.ElementsAdded())
}

Expand Down
42 changes: 42 additions & 0 deletions blocks/blockstore/caching.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package blockstore

import (
"errors"

context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)

// Next to each option is it aproximate memory usage per unit
type CacheOpts struct {
HasBloomFilterSize int // 1 bit
HasBloomFilterHashes int // No size, 7 is usually best, consult bloom papers
HasARCCacheSize int // 32 bytes
}

func DefaultCacheOpts() CacheOpts {
return CacheOpts{
HasBloomFilterSize: 512 * 8 * 1024,
HasBloomFilterHashes: 7,
HasARCCacheSize: 64 * 1024,
}
}

func CachedBlockstore(bs GCBlockstore,
ctx context.Context, opts CacheOpts) (cbs GCBlockstore, err error) {
cbs = bs

if opts.HasBloomFilterSize < 0 || opts.HasBloomFilterHashes < 0 ||
opts.HasARCCacheSize < 0 {
return nil, errors.New("all options for cache need to be greater than zero")
}

if opts.HasBloomFilterSize != 0 && opts.HasBloomFilterHashes == 0 {
return nil, errors.New("bloom filter hash count can't be 0 when there is size set")
}
if opts.HasBloomFilterSize != 0 {
Copy link
Member

@jbenet jbenet Aug 28, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • > 0 -- reduces the failure cases (negative numbers)
  • okay fine (Lines 28-31 cover the negative cases)
  • but still :) safer

cbs, err = bloomCached(cbs, ctx, opts.HasBloomFilterSize, opts.HasBloomFilterHashes,
opts.HasARCCacheSize)
}

return cbs, err
}
4 changes: 3 additions & 1 deletion cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,9 @@ func daemonFunc(req cmds.Request, res cmds.Response) {

// Start assembling node config
ncfg := &core.BuildCfg{
Repo: repo,
Repo: repo,
Permament: true, // It is temporary way to signify that node is permament
//TODO(Kubuxu): refactor Online vs Offline by adding Permement vs Epthemeral
}
offline, _, _ := req.Option(offlineKwd).Bool()
ncfg.Online = !offline
Expand Down
11 changes: 10 additions & 1 deletion core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ type BuildCfg struct {
// If online is set, the node will have networking enabled
Online bool

// If permament then node should run more expensive processes
// that will improve performance in long run
Permament bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not a good name. the node wont ever be Permanent. you're using it here to signify "NotEphemeral", so just use Ephemeral and set it to false.

Copy link
Member Author

@Kubuxu Kubuxu Aug 28, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't use Ephemeral as it would be hard to find all tests and places where node is created and set it there to true.

This part is on my list to refactor for the split to: Online, Offline and Ephemeral.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

split to: Online, Offline and Ephemeral

those aren't good distinctions. an ephemeral node may be online and may be offline.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we don't have Online Ephemeral, but very good point.


// If NilRepo is set, a repo backed by a nil datastore will be constructed
NilRepo bool

Expand Down Expand Up @@ -131,7 +135,12 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {

var err error
bs := bstore.NewBlockstore(n.Repo.Datastore())
n.Blockstore, err = bstore.BloomCached(bs, 256*1024, kSizeBlockstoreWriteCache)
opts := bstore.DefaultCacheOpts()
if !cfg.Permament {
opts.HasBloomFilterSize = 0
}

n.Blockstore, err = bstore.CachedBlockstore(bs, ctx, opts)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion exchange/bitswap/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func Session(ctx context.Context, net tn.Network, p testutil.Identity) Instance
adapter := net.Adapter(p)
dstore := ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), bsdelay))

bstore, err := blockstore.BloomCached(blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)), bloomSize, writeCacheElems)
bstore, err := blockstore.CachedBlockstore(blockstore.NewBlockstore(
ds_sync.MutexWrap(dstore)), ctx, blockstore.DefaultCacheOpts())
if err != nil {
panic(err.Error()) // FIXME perhaps change signature and return error.
}
Expand Down