Skip to content

Commit

Permalink
Merge pull request #33 from raulk/txndatastore
Browse files Browse the repository at this point in the history
Migrate to ds.TxnDatastore, optimisations++, cache changes, benchmarks
  • Loading branch information
raulk authored Sep 14, 2018
2 parents 1e12471 + b9b3939 commit 1d8bbeb
Show file tree
Hide file tree
Showing 10 changed files with 761 additions and 264 deletions.
517 changes: 338 additions & 179 deletions p2p/host/peerstore/pstoreds/addr_book.go

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions p2p/host/peerstore/pstoreds/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package pstoreds

// cache abstracts all methods we access from ARCCache, to enable alternate
// implementations such as a no-op one.
type cache interface {
Get(key interface{}) (value interface{}, ok bool)
Add(key, value interface{})
Remove(key interface{})
Contains(key interface{}) bool
Peek(key interface{}) (value interface{}, ok bool)
}

// noopCache is a dummy implementation that's used when the cache is disabled.
type noopCache struct {
}

func (*noopCache) Get(key interface{}) (value interface{}, ok bool) {
return nil, false
}

func (*noopCache) Add(key, value interface{}) {
}

func (*noopCache) Remove(key interface{}) {
}

func (*noopCache) Contains(key interface{}) bool {
return false
}

func (*noopCache) Peek(key interface{}) (value interface{}, ok bool) {
return nil, false
}
170 changes: 151 additions & 19 deletions p2p/host/peerstore/pstoreds/ds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,159 @@ package pstoreds

import (
"context"
"fmt"
"io/ioutil"
"os"
"testing"
"time"

bd "github.com/dgraph-io/badger"

ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-ds-badger"
badger "github.com/ipfs/go-ds-badger"

pstore "github.com/libp2p/go-libp2p-peerstore"
pt "github.com/libp2p/go-libp2p-peerstore/test"
)

func setupBadgerDatastore(t testing.TB) (ds.Batching, func()) {
func BenchmarkBaselineBadgerDatastorePutEntry(b *testing.B) {
bds, closer := badgerStore(b)
defer closer()

b.ResetTimer()
for i := 0; i < b.N; i++ {
txn := bds.NewTransaction(false)

key := ds.RawKey(fmt.Sprintf("/key/%d", i))
txn.Put(key, []byte(fmt.Sprintf("/value/%d", i)))

txn.Commit()
txn.Discard()
}
}

func BenchmarkBaselineBadgerDatastoreGetEntry(b *testing.B) {
bds, closer := badgerStore(b)
defer closer()

txn := bds.NewTransaction(false)
keys := make([]ds.Key, 1000)
for i := 0; i < 1000; i++ {
key := ds.RawKey(fmt.Sprintf("/key/%d", i))
txn.Put(key, []byte(fmt.Sprintf("/value/%d", i)))
keys[i] = key
}
if err := txn.Commit(); err != nil {
b.Fatal(err)
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
txn := bds.NewTransaction(true)
if _, err := txn.Get(keys[i%1000]); err != nil {
b.Fatal(err)
}
txn.Discard()
}
}

func BenchmarkBaselineBadgerDirectPutEntry(b *testing.B) {
opts := bd.DefaultOptions

dataPath, err := ioutil.TempDir(os.TempDir(), "badger")
if err != nil {
b.Fatal(err)
}

opts.Dir = dataPath
opts.ValueDir = dataPath
opts.SyncWrites = false

db, err := bd.Open(opts)
if err != nil {
b.Fatal(err)
}

defer db.Close()

b.ResetTimer()
for i := 0; i < b.N; i++ {
txn := db.NewTransaction(true)
txn.Set([]byte(fmt.Sprintf("/key/%d", i)), []byte(fmt.Sprintf("/value/%d", i)))
txn.Commit(nil)
}
}

func BenchmarkBaselineBadgerDirectGetEntry(b *testing.B) {
opts := bd.DefaultOptions

dataPath, err := ioutil.TempDir(os.TempDir(), "badger")
if err != nil {
b.Fatal(err)
}

opts.Dir = dataPath
opts.ValueDir = dataPath

db, err := bd.Open(opts)
if err != nil {
b.Fatal(err)
}

defer db.Close()

txn := db.NewTransaction(true)
for i := 0; i < 1000; i++ {
txn.Set([]byte(fmt.Sprintf("/key/%d", i)), []byte(fmt.Sprintf("/value/%d", i)))
}
txn.Commit(nil)

b.ResetTimer()
for i := 0; i < b.N; i++ {
txn := db.NewTransaction(false)
txn.Get([]byte(fmt.Sprintf("/key/%d", i%1000)))
txn.Discard()
}
}

func TestBadgerDsPeerstore(t *testing.T) {
pt.TestPeerstore(t, peerstoreFactory(t, DefaultOpts()))
}

func TestBadgerDsAddrBook(t *testing.T) {
t.Run("Cacheful", func(t *testing.T) {
t.Parallel()

opts := DefaultOpts()
opts.TTLInterval = 100 * time.Microsecond
opts.CacheSize = 1024

pt.TestAddrBook(t, addressBookFactory(t, opts))
})

t.Run("Cacheless", func(t *testing.T) {
t.Parallel()

opts := DefaultOpts()
opts.TTLInterval = 100 * time.Microsecond
opts.CacheSize = 0

pt.TestAddrBook(t, addressBookFactory(t, opts))
})
}

func BenchmarkBadgerDsPeerstore(b *testing.B) {
caching := DefaultOpts()
caching.CacheSize = 1024

cacheless := DefaultOpts()
cacheless.CacheSize = 0

pt.BenchmarkPeerstore(b, peerstoreFactory(b, caching), "Caching")
pt.BenchmarkPeerstore(b, peerstoreFactory(b, cacheless), "Cacheless")
}

func badgerStore(t testing.TB) (ds.TxnDatastore, func()) {
dataPath, err := ioutil.TempDir(os.TempDir(), "badger")
if err != nil {
t.Fatal(err)
Expand All @@ -30,11 +170,11 @@ func setupBadgerDatastore(t testing.TB) (ds.Batching, func()) {
return ds, closer
}

func newPeerstoreFactory(tb testing.TB) pt.PeerstoreFactory {
func peerstoreFactory(tb testing.TB, opts Options) pt.PeerstoreFactory {
return func() (pstore.Peerstore, func()) {
ds, closeFunc := setupBadgerDatastore(tb)
ds, closeFunc := badgerStore(tb)

ps, err := NewPeerstore(context.Background(), ds)
ps, err := NewPeerstore(context.Background(), ds, opts)
if err != nil {
tb.Fatal(err)
}
Expand All @@ -43,27 +183,19 @@ func newPeerstoreFactory(tb testing.TB) pt.PeerstoreFactory {
}
}

func TestBadgerDsPeerstore(t *testing.T) {
pt.TestPeerstore(t, newPeerstoreFactory(t))
}
func addressBookFactory(tb testing.TB, opts Options) pt.AddrBookFactory {
return func() (pstore.AddrBook, func()) {
ds, closeDB := badgerStore(tb)

func TestBadgerDsAddrBook(t *testing.T) {
pt.TestAddrBook(t, func() (pstore.AddrBook, func()) {
ds, closeDB := setupBadgerDatastore(t)

mgr, err := NewAddrBook(context.Background(), ds, 100*time.Microsecond)
mgr, err := NewAddrBook(context.Background(), ds, opts)
if err != nil {
t.Fatal(err)
tb.Fatal(err)
}

closeFunc := func() {
mgr.Stop()
closeDB()
}
return mgr, closeFunc
})
}

func BenchmarkBadgerDsPeerstore(b *testing.B) {
pt.BenchmarkPeerstore(b, newPeerstoreFactory(b))
}
}
29 changes: 27 additions & 2 deletions p2p/host/peerstore/pstoreds/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,34 @@ import (
pstoremem "github.com/libp2p/go-libp2p-peerstore/pstoremem"
)

// Configuration object for the peerstore.
type Options struct {
// The size of the in-memory cache. A value of 0 or lower disables the cache.
CacheSize uint

// Sweep interval to expire entries, only used when TTL is *not* natively managed
// by the underlying datastore.
TTLInterval time.Duration

// Number of times to retry transactional writes.
WriteRetries uint
}

// DefaultOpts returns the default options for a persistent peerstore:
// * Cache size: 1024
// * TTL sweep interval: 1 second
// * WriteRetries: 5
func DefaultOpts() Options {
return Options{
CacheSize: 1024,
TTLInterval: time.Second,
WriteRetries: 5,
}
}

// NewPeerstore creates a peerstore backed by the provided persistent datastore.
func NewPeerstore(ctx context.Context, ds ds.Batching) (pstore.Peerstore, error) {
addrBook, err := NewAddrBook(ctx, ds, time.Second)
func NewPeerstore(ctx context.Context, store ds.TxnDatastore, opts Options) (pstore.Peerstore, error) {
addrBook, err := NewAddrBook(ctx, store, opts)
if err != nil {
return nil, err
}
Expand Down
4 changes: 1 addition & 3 deletions p2p/host/peerstore/pstoremem/addr_book.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,8 @@ func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL t
}

exp := time.Now().Add(newTTL)
// TODO: RK - Shorthand.
for i := range addrs {
aexp := &addrs[i]
if oldTTL == aexp.TTL {
if aexp := &addrs[i]; oldTTL == aexp.TTL {
aexp.TTL = newTTL
aexp.Expires = exp
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/host/peerstore/pstoremem/inmem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ func TestInMemoryKeyBook(t *testing.T) {
func BenchmarkInMemoryPeerstore(b *testing.B) {
pt.BenchmarkPeerstore(b, func() (pstore.Peerstore, func()) {
return NewPeerstore(), nil
})
}, "InMem")
}
Loading

0 comments on commit 1d8bbeb

Please sign in to comment.