diff --git a/p2p/host/peerstore/pstoreds/addr_book.go b/p2p/host/peerstore/pstoreds/addr_book.go index 14c006a048..f338cb449c 100644 --- a/p2p/host/peerstore/pstoreds/addr_book.go +++ b/p2p/host/peerstore/pstoreds/addr_book.go @@ -17,36 +17,43 @@ import ( pstoremem "github.com/libp2p/go-libp2p-peerstore/pstoremem" ) -var log = logging.Logger("peerstore/ds") - -// Number of times to retry transactional writes -var dsWriteRetries = 5 +var ( + log = logging.Logger("peerstore/ds") +) var _ pstore.AddrBook = (*dsAddrBook)(nil) // dsAddrBook is an address book backed by a Datastore with both an // in-memory TTL manager and an in-memory address stream manager. type dsAddrBook struct { - cache *lru.ARCCache - ds ds.Batching - ttlManager *ttlmanager - subsManager *pstoremem.AddrSubManager + cache cache + ds ds.TxnDatastore + ttlManager *ttlManager + subsManager *pstoremem.AddrSubManager + writeRetries int } // NewAddrBook initializes a new address book given a // Datastore instance, a context for managing the TTL manager, // and the interval at which the TTL manager should sweep the Datastore. -func NewAddrBook(ctx context.Context, ds ds.Batching, ttlInterval time.Duration) (*dsAddrBook, error) { - cache, err := lru.NewARC(1024) - if err != nil { - return nil, err +func NewAddrBook(ctx context.Context, ds ds.TxnDatastore, opts Options) (*dsAddrBook, error) { + var ( + cache cache = &noopCache{} + err error + ) + + if opts.CacheSize > 0 { + if cache, err = lru.NewARC(int(opts.CacheSize)); err != nil { + return nil, err + } } mgr := &dsAddrBook{ - cache: cache, - ds: ds, - ttlManager: newTTLManager(ctx, ds, cache, ttlInterval), - subsManager: pstoremem.NewAddrSubManager(), + cache: cache, + ds: ds, + ttlManager: newTTLManager(ctx, ds, &cache, opts.TTLInterval), + subsManager: pstoremem.NewAddrSubManager(), + writeRetries: int(opts.WriteRetries), } return mgr, nil } @@ -56,17 +63,29 @@ func (mgr *dsAddrBook) Stop() { mgr.ttlManager.cancel() } -func peerAddressKey(p *peer.ID, addr *ma.Multiaddr) (ds.Key, error) { - hash, err := mh.Sum((*addr).Bytes(), mh.MURMUR3, -1) - if err != nil { - return ds.Key{}, nil +func keysAndAddrs(p peer.ID, addrs []ma.Multiaddr) ([]ds.Key, []ma.Multiaddr, error) { + var ( + keys = make([]ds.Key, len(addrs)) + clean = make([]ma.Multiaddr, len(addrs)) + parentKey = ds.NewKey(peer.IDB58Encode(p)) + i = 0 + ) + + for _, addr := range addrs { + if addr == nil { + continue + } + + hash, err := mh.Sum((addr).Bytes(), mh.MURMUR3, -1) + if err != nil { + return nil, nil, err + } + keys[i] = parentKey.ChildString(hash.B58String()) + clean[i] = addr + i++ } - return ds.NewKey(peer.IDB58Encode(*p)).ChildString(hash.B58String()), nil -} -func peerIDFromKey(key ds.Key) (peer.ID, error) { - idstring := key.Parent().Name() - return peer.IDB58Decode(idstring) + return keys[:i], clean[:i], nil } // AddAddr will add a new address if it's not already in the AddrBook. @@ -79,141 +98,208 @@ func (mgr *dsAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durati if ttl <= 0 { return } - - mgr.setAddrs(p, addrs, ttl, true) + mgr.setAddrs(p, addrs, ttl, false) } // SetAddr will add or update the TTL of an address in the AddrBook. func (mgr *dsAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) { - mgr.SetAddrs(p, []ma.Multiaddr{addr}, ttl) + addrs := []ma.Multiaddr{addr} + mgr.SetAddrs(p, addrs, ttl) } // SetAddrs will add or update the TTLs of addresses in the AddrBook. 
func (mgr *dsAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { - mgr.setAddrs(p, addrs, ttl, false) + if ttl <= 0 { + mgr.deleteAddrs(p, addrs) + return + } + mgr.setAddrs(p, addrs, ttl, true) } -func (mgr *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, add bool) { - for i := 0; i < dsWriteRetries; i++ { - // keys to add to the TTL manager - var keys []ds.Key - batch, err := mgr.ds.Batch() - if err != nil { - log.Error(err) - return +func (mgr *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) error { + // Keys and cleaned up addresses. + keys, addrs, err := keysAndAddrs(p, addrs) + if err != nil { + return err + } + + mgr.cache.Remove(p.Pretty()) + // Attempt transactional KV deletion. + for i := 0; i < mgr.writeRetries; i++ { + if err = mgr.dbDelete(keys); err == nil { + break } + log.Errorf("failed to delete addresses for peer %s: %s\n", p.Pretty(), err) + } - for _, addr := range addrs { - if addr == nil { - continue - } + if err != nil { + log.Errorf("failed to avoid write conflict for peer %s after %d retries: %v\n", p.Pretty(), mgr.writeRetries, err) + return err + } - key, err := peerAddressKey(&p, &addr) - if err != nil { - log.Error(err) - continue - } - keys = append(keys, key) + mgr.ttlManager.deleteTTLs(keys) + return nil +} - if ttl <= 0 { - if err := batch.Delete(key); err != nil { - log.Error(err) - } else { - mgr.cache.Remove(key) - } - continue - } +func (mgr *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, ttlReset bool) error { + // Keys and cleaned up addresses. + keys, addrs, err := keysAndAddrs(p, addrs) + if err != nil { + return err + } - has := mgr.cache.Contains(key) - if !has { - has, err = mgr.ds.Has(key) - } - if err != nil || !has { - mgr.subsManager.BroadcastAddr(p, addr) - } + mgr.cache.Remove(p.Pretty()) + // Attempt transactional KV insertion. + var existed []bool + for i := 0; i < mgr.writeRetries; i++ { + if existed, err = mgr.dbInsert(keys, addrs); err == nil { + break + } + log.Errorf("failed to write addresses for peer %s: %s\n", p.Pretty(), err) + } - // Allows us to support AddAddr and SetAddr in one function - if !has { - if err := batch.Put(key, addr.Bytes()); err != nil { - log.Error(err) - } else { - mgr.cache.Add(key, addr.Bytes()) - } - } + if err != nil { + log.Errorf("failed to avoid write conflict for peer %s after %d retries: %v\n", p.Pretty(), mgr.writeRetries, err) + return err + } + + // Update was successful, so broadcast event only for new addresses. + for i, _ := range keys { + if !existed[i] { + mgr.subsManager.BroadcastAddr(p, addrs[i]) + } + } + + // Force update TTLs only if TTL reset was requested; otherwise + // insert the appropriate TTL entries if they don't already exist. + if ttlReset { + mgr.ttlManager.setTTLs(keys, ttl) + } else { + mgr.ttlManager.insertOrExtendTTLs(keys, ttl) + } + + return nil +} + +// dbInsert performs a transactional insert of the provided keys and values. +func (mgr *dsAddrBook) dbInsert(keys []ds.Key, addrs []ma.Multiaddr) ([]bool, error) { + var ( + err error + existed = make([]bool, len(keys)) + ) + + txn := mgr.ds.NewTransaction(false) + defer txn.Discard() + + for i, key := range keys { + // Check if the key existed previously. 
+ if existed[i], err = txn.Has(key); err != nil { + log.Errorf("transaction failed and aborted while checking key existence: %s, cause: %v", key.String(), err) + return nil, err } - if err := batch.Commit(); err != nil { - log.Errorf("failed to write addresses for peer %s: %s\n", p.Pretty(), err) + + // The key embeds a hash of the value, so if it existed, we can safely skip the insert. + if existed[i] { continue } - mgr.ttlManager.setTTLs(keys, ttl, add) - return + + // Attempt to add the key. + if err = txn.Put(key, addrs[i].Bytes()); err != nil { + log.Errorf("transaction failed and aborted while setting key: %s, cause: %v", key.String(), err) + return nil, err + } } - log.Errorf("failed to avoid write conflict for peer %s after %d retries\n", p.Pretty(), dsWriteRetries) + + if err = txn.Commit(); err != nil { + log.Errorf("failed to commit transaction when setting keys, cause: %v", err) + return nil, err + } + + return existed, nil } // UpdateAddrs will update any addresses for a given peer and TTL combination to // have a new TTL. func (mgr *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) { prefix := ds.NewKey(p.Pretty()) - mgr.ttlManager.updateTTLs(prefix, oldTTL, newTTL) + mgr.ttlManager.adjustTTLs(prefix, oldTTL, newTTL) } -// Addrs Returns all of the non-expired addresses for a given peer. +// Addrs returns all of the non-expired addresses for a given peer. func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr { - prefix := ds.NewKey(p.Pretty()) - q := query.Query{Prefix: prefix.String(), KeysOnly: true} - results, err := mgr.ds.Query(q) - if err != nil { + var ( + prefix = ds.NewKey(p.Pretty()) + q = query.Query{Prefix: prefix.String(), KeysOnly: false} + results query.Results + err error + ) + + // Check the cache. + if entry, ok := mgr.cache.Get(p.Pretty()); ok { + e := entry.([]ma.Multiaddr) + addrs := make([]ma.Multiaddr, len(e)) + copy(addrs, e) + return addrs + } + + txn := mgr.ds.NewTransaction(true) + defer txn.Discard() + + if results, err = txn.Query(q); err != nil { log.Error(err) return nil } + defer results.Close() var addrs []ma.Multiaddr for result := range results.Next() { - key := ds.RawKey(result.Key) - var addri interface{} - addri, ok := mgr.cache.Get(key) - if !ok { - addri, err = mgr.ds.Get(key) - if err != nil { - log.Error(err) - continue - } - } - addrbytes := addri.([]byte) - addr, err := ma.NewMultiaddrBytes(addrbytes) - if err != nil { - log.Error(err) - continue + if addr, err := ma.NewMultiaddrBytes(result.Value); err == nil { + addrs = append(addrs, addr) } - addrs = append(addrs, addr) } + // Store a copy in the cache. + addrsCpy := make([]ma.Multiaddr, len(addrs)) + copy(addrsCpy, addrs) + mgr.cache.Add(p.Pretty(), addrsCpy) + return addrs } // Peers returns all of the peer IDs for which the AddrBook has addresses. 
func (mgr *dsAddrBook) PeersWithAddrs() peer.IDSlice { - q := query.Query{KeysOnly: true} - results, err := mgr.ds.Query(q) - if err != nil { + var ( + q = query.Query{KeysOnly: true} + results query.Results + err error + ) + + txn := mgr.ds.NewTransaction(true) + defer txn.Discard() + + if results, err = txn.Query(q); err != nil { log.Error(err) return peer.IDSlice{} } - idset := make(map[peer.ID]struct{}) + defer results.Close() + + idset := make(map[string]struct{}) for result := range results.Next() { key := ds.RawKey(result.Key) - id, err := peerIDFromKey(key) - if err != nil { - continue - } - idset[id] = struct{}{} + idset[key.Parent().Name()] = struct{}{} } - ids := make(peer.IDSlice, 0, len(idset)) + if len(idset) == 0 { + return peer.IDSlice{} + } + + ids := make(peer.IDSlice, len(idset)) + i := 0 for id := range idset { - ids = append(ids, id) + pid, _ := peer.IDB58Decode(id) + ids[i] = pid + i++ } return ids } @@ -227,70 +313,128 @@ func (mgr *dsAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Mult // ClearAddrs will delete all known addresses for a peer ID. func (mgr *dsAddrBook) ClearAddrs(p peer.ID) { - prefix := ds.NewKey(p.Pretty()) - for i := 0; i < dsWriteRetries; i++ { - q := query.Query{Prefix: prefix.String(), KeysOnly: true} - results, err := mgr.ds.Query(q) - if err != nil { - log.Error(err) - return + var ( + err error + prefix = ds.NewKey(p.Pretty()) + deleteFn func() error + ) + + if e, ok := mgr.cache.Peek(p.Pretty()); ok { + mgr.cache.Remove(p.Pretty()) + keys, _, _ := keysAndAddrs(p, e.([]ma.Multiaddr)) + deleteFn = func() error { + return mgr.dbDelete(keys) } - batch, err := mgr.ds.Batch() - if err != nil { - log.Error(err) - return + } else { + deleteFn = func() error { + _, err := mgr.dbDeleteIter(prefix) + return err } + } - for result := range results.Next() { - key := ds.NewKey(result.Key) - err := batch.Delete(key) - if err != nil { - // From inspectin badger, errors here signify a problem with - // the transaction as a whole, so we can log and abort. - log.Error(err) - return - } - mgr.cache.Remove(key) + // Attempt transactional KV deletion. + for i := 0; i < mgr.writeRetries; i++ { + if err = deleteFn(); err == nil { + break } - if err = batch.Commit(); err != nil { - log.Errorf("failed to clear addresses for peer %s: %s\n", p.Pretty(), err) - continue + log.Errorf("failed to clear addresses for peer %s: %s\n", p.Pretty(), err) + } + + if err != nil { + log.Errorf("failed to clear addresses for peer %s after %d attempts\n", p.Pretty(), mgr.writeRetries) + } + + // Perform housekeeping. + mgr.ttlManager.clear(prefix) +} + +// dbDelete transactionally deletes the provided keys. +func (mgr *dsAddrBook) dbDelete(keys []ds.Key) error { + txn := mgr.ds.NewTransaction(false) + defer txn.Discard() + + for _, key := range keys { + if err := txn.Delete(key); err != nil { + log.Errorf("failed to delete key: %s, cause: %v", key.String(), err) + return err } - mgr.ttlManager.clear(ds.NewKey(p.Pretty())) - return } - log.Errorf("failed to clear addresses for peer %s after %d attempts\n", p.Pretty(), dsWriteRetries) + + if err := txn.Commit(); err != nil { + log.Errorf("failed to commit transaction when deleting keys, cause: %v", err) + return err + } + + return nil } -type ttlentry struct { +// dbDeleteIter removes all entries whose keys are prefixed with the argument. 
+// it returns a slice of the removed keys in case it's needed +func (mgr *dsAddrBook) dbDeleteIter(prefix ds.Key) ([]ds.Key, error) { + q := query.Query{Prefix: prefix.String(), KeysOnly: true} + + txn := mgr.ds.NewTransaction(false) + defer txn.Discard() + + results, err := txn.Query(q) + if err != nil { + log.Errorf("failed to fetch all keys prefixed with: %s, cause: %v", prefix.String(), err) + return nil, err + } + + var keys []ds.Key + for result := range results.Next() { + key := ds.RawKey(result.Key) + keys = append(keys, key) + + if err = txn.Delete(key); err != nil { + log.Errorf("failed to delete key: %s, cause: %v", key.String(), err) + return nil, err + } + } + + if err := results.Close(); err != nil { + log.Errorf("failed to close cursor, cause: %v", err) + return nil, err + } + + if err = txn.Commit(); err != nil { + log.Errorf("failed to commit transaction when deleting keys, cause: %v", err) + return nil, err + } + + return keys, nil +} + +type ttlEntry struct { TTL time.Duration ExpiresAt time.Time } -type ttlmanager struct { +type ttlManager struct { sync.RWMutex - entries map[ds.Key]*ttlentry + entries map[ds.Key]*ttlEntry ctx context.Context cancel context.CancelFunc ticker *time.Ticker - ds ds.Batching - cache *lru.ARCCache + ds ds.TxnDatastore + cache cache } -func newTTLManager(parent context.Context, d ds.Datastore, c *lru.ARCCache, tick time.Duration) *ttlmanager { +func newTTLManager(parent context.Context, d ds.Datastore, c *cache, tick time.Duration) *ttlManager { ctx, cancel := context.WithCancel(parent) - batching, ok := d.(ds.Batching) + txnDs, ok := d.(ds.TxnDatastore) if !ok { - panic("must construct ttlmanager with batching datastore") + panic("must construct ttlManager with transactional datastore") } - mgr := &ttlmanager{ - entries: make(map[ds.Key]*ttlentry), + mgr := &ttlManager{ + entries: make(map[ds.Key]*ttlEntry), ctx: ctx, cancel: cancel, ticker: time.NewTicker(tick), - ds: batching, - cache: c, + ds: txnDs, + cache: *c, } go func() { @@ -309,57 +453,72 @@ func newTTLManager(parent context.Context, d ds.Datastore, c *lru.ARCCache, tick } // To be called by TTL manager's coroutine only. 
-func (mgr *ttlmanager) tick() { +func (mgr *ttlManager) tick() { mgr.Lock() defer mgr.Unlock() now := time.Now() - batch, err := mgr.ds.Batch() - if err != nil { - log.Error(err) + var toDel []ds.Key + for key, entry := range mgr.entries { + if entry.ExpiresAt.After(now) { + continue + } + toDel = append(toDel, key) + } + + if len(toDel) == 0 { return } - for key, entry := range mgr.entries { - if entry.ExpiresAt.Before(now) { - if err := batch.Delete(key); err != nil { - log.Error(err) - } else { - mgr.cache.Remove(key) - } - delete(mgr.entries, key) + + txn := mgr.ds.NewTransaction(false) + defer txn.Discard() + + for _, key := range toDel { + if err := txn.Delete(key); err != nil { + log.Error("failed to delete TTL key: %v, cause: %v", key.String(), err) + break } + mgr.cache.Remove(key.Parent().Name()) + delete(mgr.entries, key) } - err = batch.Commit() - if err != nil { - log.Error(err) + + if err := txn.Commit(); err != nil { + log.Error("failed to commit TTL deletion, cause: %v", err) + } +} + +func (mgr *ttlManager) deleteTTLs(keys []ds.Key) { + mgr.Lock() + defer mgr.Unlock() + + for _, key := range keys { + delete(mgr.entries, key) } } -func (mgr *ttlmanager) setTTLs(keys []ds.Key, ttl time.Duration, add bool) { +func (mgr *ttlManager) insertOrExtendTTLs(keys []ds.Key, ttl time.Duration) { mgr.Lock() defer mgr.Unlock() expiration := time.Now().Add(ttl) for _, key := range keys { - update := true - if add { - if entry, ok := mgr.entries[key]; ok { - if entry.ExpiresAt.After(expiration) { - update = false - } - } - } - if update { - if ttl <= 0 { - delete(mgr.entries, key) - } else { - mgr.entries[key] = &ttlentry{TTL: ttl, ExpiresAt: expiration} - } + if entry, ok := mgr.entries[key]; !ok || (ok && entry.ExpiresAt.Before(expiration)) { + mgr.entries[key] = &ttlEntry{TTL: ttl, ExpiresAt: expiration} } } } -func (mgr *ttlmanager) updateTTLs(prefix ds.Key, oldTTL, newTTL time.Duration) { +func (mgr *ttlManager) setTTLs(keys []ds.Key, ttl time.Duration) { + mgr.Lock() + defer mgr.Unlock() + + expiration := time.Now().Add(ttl) + for _, key := range keys { + mgr.entries[key] = &ttlEntry{TTL: ttl, ExpiresAt: expiration} + } +} + +func (mgr *ttlManager) adjustTTLs(prefix ds.Key, oldTTL, newTTL time.Duration) { mgr.Lock() defer mgr.Unlock() @@ -374,7 +533,7 @@ func (mgr *ttlmanager) updateTTLs(prefix ds.Key, oldTTL, newTTL time.Duration) { } } -func (mgr *ttlmanager) clear(prefix ds.Key) { +func (mgr *ttlManager) clear(prefix ds.Key) { mgr.Lock() defer mgr.Unlock() diff --git a/p2p/host/peerstore/pstoreds/cache.go b/p2p/host/peerstore/pstoreds/cache.go new file mode 100644 index 0000000000..2e20ae6294 --- /dev/null +++ b/p2p/host/peerstore/pstoreds/cache.go @@ -0,0 +1,33 @@ +package pstoreds + +// cache abstracts all methods we access from ARCCache, to enable alternate +// implementations such as a no-op one. +type cache interface { + Get(key interface{}) (value interface{}, ok bool) + Add(key, value interface{}) + Remove(key interface{}) + Contains(key interface{}) bool + Peek(key interface{}) (value interface{}, ok bool) +} + +// noopCache is a dummy implementation that's used when the cache is disabled. 
+type noopCache struct { +} + +func (*noopCache) Get(key interface{}) (value interface{}, ok bool) { + return nil, false +} + +func (*noopCache) Add(key, value interface{}) { +} + +func (*noopCache) Remove(key interface{}) { +} + +func (*noopCache) Contains(key interface{}) bool { + return false +} + +func (*noopCache) Peek(key interface{}) (value interface{}, ok bool) { + return nil, false +} diff --git a/p2p/host/peerstore/pstoreds/ds_test.go b/p2p/host/peerstore/pstoreds/ds_test.go index 34b5d33b83..20bef6a9c3 100644 --- a/p2p/host/peerstore/pstoreds/ds_test.go +++ b/p2p/host/peerstore/pstoreds/ds_test.go @@ -2,19 +2,159 @@ package pstoreds import ( "context" + "fmt" "io/ioutil" "os" "testing" "time" + bd "github.com/dgraph-io/badger" + ds "github.com/ipfs/go-datastore" - "github.com/ipfs/go-ds-badger" + badger "github.com/ipfs/go-ds-badger" pstore "github.com/libp2p/go-libp2p-peerstore" pt "github.com/libp2p/go-libp2p-peerstore/test" ) -func setupBadgerDatastore(t testing.TB) (ds.Batching, func()) { +func BenchmarkBaselineBadgerDatastorePutEntry(b *testing.B) { + bds, closer := badgerStore(b) + defer closer() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + txn := bds.NewTransaction(false) + + key := ds.RawKey(fmt.Sprintf("/key/%d", i)) + txn.Put(key, []byte(fmt.Sprintf("/value/%d", i))) + + txn.Commit() + txn.Discard() + } +} + +func BenchmarkBaselineBadgerDatastoreGetEntry(b *testing.B) { + bds, closer := badgerStore(b) + defer closer() + + txn := bds.NewTransaction(false) + keys := make([]ds.Key, 1000) + for i := 0; i < 1000; i++ { + key := ds.RawKey(fmt.Sprintf("/key/%d", i)) + txn.Put(key, []byte(fmt.Sprintf("/value/%d", i))) + keys[i] = key + } + if err := txn.Commit(); err != nil { + b.Fatal(err) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + txn := bds.NewTransaction(true) + if _, err := txn.Get(keys[i%1000]); err != nil { + b.Fatal(err) + } + txn.Discard() + } +} + +func BenchmarkBaselineBadgerDirectPutEntry(b *testing.B) { + opts := bd.DefaultOptions + + dataPath, err := ioutil.TempDir(os.TempDir(), "badger") + if err != nil { + b.Fatal(err) + } + + opts.Dir = dataPath + opts.ValueDir = dataPath + opts.SyncWrites = false + + db, err := bd.Open(opts) + if err != nil { + b.Fatal(err) + } + + defer db.Close() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + txn := db.NewTransaction(true) + txn.Set([]byte(fmt.Sprintf("/key/%d", i)), []byte(fmt.Sprintf("/value/%d", i))) + txn.Commit(nil) + } +} + +func BenchmarkBaselineBadgerDirectGetEntry(b *testing.B) { + opts := bd.DefaultOptions + + dataPath, err := ioutil.TempDir(os.TempDir(), "badger") + if err != nil { + b.Fatal(err) + } + + opts.Dir = dataPath + opts.ValueDir = dataPath + + db, err := bd.Open(opts) + if err != nil { + b.Fatal(err) + } + + defer db.Close() + + txn := db.NewTransaction(true) + for i := 0; i < 1000; i++ { + txn.Set([]byte(fmt.Sprintf("/key/%d", i)), []byte(fmt.Sprintf("/value/%d", i))) + } + txn.Commit(nil) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + txn := db.NewTransaction(false) + txn.Get([]byte(fmt.Sprintf("/key/%d", i%1000))) + txn.Discard() + } +} + +func TestBadgerDsPeerstore(t *testing.T) { + pt.TestPeerstore(t, peerstoreFactory(t, DefaultOpts())) +} + +func TestBadgerDsAddrBook(t *testing.T) { + t.Run("Cacheful", func(t *testing.T) { + t.Parallel() + + opts := DefaultOpts() + opts.TTLInterval = 100 * time.Microsecond + opts.CacheSize = 1024 + + pt.TestAddrBook(t, addressBookFactory(t, opts)) + }) + + t.Run("Cacheless", func(t *testing.T) { + t.Parallel() + + opts := DefaultOpts() + 
opts.TTLInterval = 100 * time.Microsecond + opts.CacheSize = 0 + + pt.TestAddrBook(t, addressBookFactory(t, opts)) + }) +} + +func BenchmarkBadgerDsPeerstore(b *testing.B) { + caching := DefaultOpts() + caching.CacheSize = 1024 + + cacheless := DefaultOpts() + cacheless.CacheSize = 0 + + pt.BenchmarkPeerstore(b, peerstoreFactory(b, caching), "Caching") + pt.BenchmarkPeerstore(b, peerstoreFactory(b, cacheless), "Cacheless") +} + +func badgerStore(t testing.TB) (ds.TxnDatastore, func()) { dataPath, err := ioutil.TempDir(os.TempDir(), "badger") if err != nil { t.Fatal(err) @@ -30,11 +170,11 @@ func setupBadgerDatastore(t testing.TB) (ds.Batching, func()) { return ds, closer } -func newPeerstoreFactory(tb testing.TB) pt.PeerstoreFactory { +func peerstoreFactory(tb testing.TB, opts Options) pt.PeerstoreFactory { return func() (pstore.Peerstore, func()) { - ds, closeFunc := setupBadgerDatastore(tb) + ds, closeFunc := badgerStore(tb) - ps, err := NewPeerstore(context.Background(), ds) + ps, err := NewPeerstore(context.Background(), ds, opts) if err != nil { tb.Fatal(err) } @@ -43,17 +183,13 @@ func newPeerstoreFactory(tb testing.TB) pt.PeerstoreFactory { } } -func TestBadgerDsPeerstore(t *testing.T) { - pt.TestPeerstore(t, newPeerstoreFactory(t)) -} +func addressBookFactory(tb testing.TB, opts Options) pt.AddrBookFactory { + return func() (pstore.AddrBook, func()) { + ds, closeDB := badgerStore(tb) -func TestBadgerDsAddrBook(t *testing.T) { - pt.TestAddrBook(t, func() (pstore.AddrBook, func()) { - ds, closeDB := setupBadgerDatastore(t) - - mgr, err := NewAddrBook(context.Background(), ds, 100*time.Microsecond) + mgr, err := NewAddrBook(context.Background(), ds, opts) if err != nil { - t.Fatal(err) + tb.Fatal(err) } closeFunc := func() { @@ -61,9 +197,5 @@ func TestBadgerDsAddrBook(t *testing.T) { closeDB() } return mgr, closeFunc - }) -} - -func BenchmarkBadgerDsPeerstore(b *testing.B) { - pt.BenchmarkPeerstore(b, newPeerstoreFactory(b)) + } } diff --git a/p2p/host/peerstore/pstoreds/peerstore.go b/p2p/host/peerstore/pstoreds/peerstore.go index 5125c38649..a3b4d3f174 100644 --- a/p2p/host/peerstore/pstoreds/peerstore.go +++ b/p2p/host/peerstore/pstoreds/peerstore.go @@ -10,9 +10,34 @@ import ( pstoremem "github.com/libp2p/go-libp2p-peerstore/pstoremem" ) +// Configuration object for the peerstore. +type Options struct { + // The size of the in-memory cache. A value of 0 or lower disables the cache. + CacheSize uint + + // Sweep interval to expire entries, only used when TTL is *not* natively managed + // by the underlying datastore. + TTLInterval time.Duration + + // Number of times to retry transactional writes. + WriteRetries uint +} + +// DefaultOpts returns the default options for a persistent peerstore: +// * Cache size: 1024 +// * TTL sweep interval: 1 second +// * WriteRetries: 5 +func DefaultOpts() Options { + return Options{ + CacheSize: 1024, + TTLInterval: time.Second, + WriteRetries: 5, + } +} + // NewPeerstore creates a peerstore backed by the provided persistent datastore. 
-func NewPeerstore(ctx context.Context, ds ds.Batching) (pstore.Peerstore, error) { - addrBook, err := NewAddrBook(ctx, ds, time.Second) +func NewPeerstore(ctx context.Context, store ds.TxnDatastore, opts Options) (pstore.Peerstore, error) { + addrBook, err := NewAddrBook(ctx, store, opts) if err != nil { return nil, err } diff --git a/p2p/host/peerstore/pstoremem/addr_book.go b/p2p/host/peerstore/pstoremem/addr_book.go index 6db498ad2e..311c436ccb 100644 --- a/p2p/host/peerstore/pstoremem/addr_book.go +++ b/p2p/host/peerstore/pstoremem/addr_book.go @@ -162,10 +162,8 @@ func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL t } exp := time.Now().Add(newTTL) - // TODO: RK - Shorthand. for i := range addrs { - aexp := &addrs[i] - if oldTTL == aexp.TTL { + if aexp := &addrs[i]; oldTTL == aexp.TTL { aexp.TTL = newTTL aexp.Expires = exp } diff --git a/p2p/host/peerstore/pstoremem/inmem_test.go b/p2p/host/peerstore/pstoremem/inmem_test.go index 5d0d4f3283..510399dabd 100644 --- a/p2p/host/peerstore/pstoremem/inmem_test.go +++ b/p2p/host/peerstore/pstoremem/inmem_test.go @@ -28,5 +28,5 @@ func TestInMemoryKeyBook(t *testing.T) { func BenchmarkInMemoryPeerstore(b *testing.B) { pt.BenchmarkPeerstore(b, func() (pstore.Peerstore, func()) { return NewPeerstore(), nil - }) + }, "InMem") } diff --git a/p2p/host/peerstore/test/addr_book_suite.go b/p2p/host/peerstore/test/addr_book_suite.go index 441e2fa77f..d78e7ccd34 100644 --- a/p2p/host/peerstore/test/addr_book_suite.go +++ b/p2p/host/peerstore/test/addr_book_suite.go @@ -18,6 +18,8 @@ var addressBookSuite = map[string]func(book pstore.AddrBook) func(*testing.T){ "UpdateTTLs": testUpdateTTLs, "NilAddrsDontBreak": testNilAddrsDontBreak, "AddressesExpire": testAddressesExpire, + "ClearWithIter": testClearWithIterator, + "PeersWithAddresses": testPeersWithAddrs, } type AddrBookFactory func() (pstore.AddrBook, func()) @@ -152,25 +154,25 @@ func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) { testHas(t, addrs2, m.Addrs(ids[1])) // Will only affect addrs1[0]. - m.UpdateAddrs(ids[0], time.Hour, time.Second) + m.UpdateAddrs(ids[0], time.Hour, 100*time.Microsecond) // No immediate effect. testHas(t, addrs1, m.Addrs(ids[0])) testHas(t, addrs2, m.Addrs(ids[1])) // After a wait, addrs[0] is gone. - time.Sleep(1200 * time.Millisecond) + time.Sleep(100 * time.Millisecond) testHas(t, addrs1[1:2], m.Addrs(ids[0])) testHas(t, addrs2, m.Addrs(ids[1])) // Will only affect addrs2[0]. - m.UpdateAddrs(ids[1], time.Hour, time.Second) + m.UpdateAddrs(ids[1], time.Hour, 100*time.Microsecond) // No immediate effect. testHas(t, addrs1[1:2], m.Addrs(ids[0])) testHas(t, addrs2, m.Addrs(ids[1])) - time.Sleep(1200 * time.Millisecond) + time.Sleep(100 * time.Millisecond) // First addrs is gone in both. 
testHas(t, addrs1[1:], m.Addrs(ids[0])) @@ -207,33 +209,86 @@ func testAddressesExpire(m pstore.AddrBook) func(t *testing.T) { testHas(t, addrs1, m.Addrs(ids[0])) testHas(t, addrs2, m.Addrs(ids[1])) - m.SetAddr(ids[0], addrs1[0], time.Millisecond) - <-time.After(time.Millisecond * 5) + m.SetAddr(ids[0], addrs1[0], 100*time.Microsecond) + <-time.After(100 * time.Millisecond) testHas(t, addrs1[1:3], m.Addrs(ids[0])) testHas(t, addrs2, m.Addrs(ids[1])) - m.SetAddr(ids[0], addrs1[2], time.Millisecond) - <-time.After(time.Millisecond * 5) + m.SetAddr(ids[0], addrs1[2], 100*time.Microsecond) + <-time.After(100 * time.Millisecond) testHas(t, addrs1[1:2], m.Addrs(ids[0])) testHas(t, addrs2, m.Addrs(ids[1])) - m.SetAddr(ids[1], addrs2[0], time.Millisecond) - <-time.After(time.Millisecond * 5) + m.SetAddr(ids[1], addrs2[0], 100*time.Microsecond) + <-time.After(100 * time.Millisecond) testHas(t, addrs1[1:2], m.Addrs(ids[0])) testHas(t, addrs2[1:], m.Addrs(ids[1])) - m.SetAddr(ids[1], addrs2[1], time.Millisecond) - <-time.After(time.Millisecond * 5) + m.SetAddr(ids[1], addrs2[1], 100*time.Microsecond) + <-time.After(100 * time.Millisecond) testHas(t, addrs1[1:2], m.Addrs(ids[0])) testHas(t, nil, m.Addrs(ids[1])) - m.SetAddr(ids[0], addrs1[1], time.Millisecond) - <-time.After(time.Millisecond * 5) + m.SetAddr(ids[0], addrs1[1], 100*time.Microsecond) + <-time.After(100 * time.Millisecond) testHas(t, nil, m.Addrs(ids[0])) testHas(t, nil, m.Addrs(ids[1])) } } +func testClearWithIterator(m pstore.AddrBook) func(t *testing.T) { + return func(t *testing.T) { + ids := generatePeerIds(2) + addrs := generateAddrs(100) + + // Add the peers with 50 addresses each. + m.AddAddrs(ids[0], addrs[:50], pstore.PermanentAddrTTL) + m.AddAddrs(ids[1], addrs[50:], pstore.PermanentAddrTTL) + + if all := append(m.Addrs(ids[0]), m.Addrs(ids[1])...); len(all) != 100 { + t.Fatal("expected pstore to contain both peers with all their maddrs") + } + + // Since we don't fetch these peers, they won't be present in cache. + + m.ClearAddrs(ids[0]) + if all := append(m.Addrs(ids[0]), m.Addrs(ids[1])...); len(all) != 50 { + t.Fatal("expected pstore to contain only addrs of peer 2") + } + + m.ClearAddrs(ids[1]) + if all := append(m.Addrs(ids[0]), m.Addrs(ids[1])...); len(all) != 0 { + t.Fatal("expected pstore to contain no addresses") + } + } +} + +func testPeersWithAddrs(m pstore.AddrBook) func(t *testing.T) { + return func(t *testing.T) { + // cannot run in parallel as the store is modified. 
+ // go runs sequentially in the specified order + // see https://blog.golang.org/subtests + + t.Run("empty addrbook", func(t *testing.T) { + if peers := m.PeersWithAddrs(); len(peers) != 0 { + t.Fatal("expected to find no peers") + } + }) + + t.Run("non-empty addrbook", func(t *testing.T) { + ids := generatePeerIds(2) + addrs := generateAddrs(10) + + m.AddAddrs(ids[0], addrs[:5], pstore.PermanentAddrTTL) + m.AddAddrs(ids[1], addrs[5:], pstore.PermanentAddrTTL) + + if peers := m.PeersWithAddrs(); len(peers) != 2 { + t.Fatal("expected to find 2 peers") + } + }) + } +} + func testHas(t *testing.T, exp, act []ma.Multiaddr) { t.Helper() if len(exp) != len(act) { diff --git a/p2p/host/peerstore/test/benchmarks_suite.go b/p2p/host/peerstore/test/benchmarks_suite.go new file mode 100644 index 0000000000..69fde2d2a4 --- /dev/null +++ b/p2p/host/peerstore/test/benchmarks_suite.go @@ -0,0 +1,115 @@ +package test + +import ( + "context" + "fmt" + "testing" + + pstore "github.com/libp2p/go-libp2p-peerstore" +) + +var peerstoreBenchmarks = map[string]func(pstore.Peerstore, chan *peerpair) func(*testing.B){ + "AddAddrs": benchmarkAddAddrs, + "SetAddrs": benchmarkSetAddrs, + "GetAddrs": benchmarkGetAddrs, + // The in-between get allows us to benchmark the read-through cache. + "AddGetAndClearAddrs": benchmarkAddGetAndClearAddrs, + // Calls PeersWithAddr on a peerstore with 1000 peers. + "Get1000PeersWithAddrs": benchmarkGet1000PeersWithAddrs, +} + +func BenchmarkPeerstore(b *testing.B, factory PeerstoreFactory, variant string) { + // Parameterises benchmarks to tackle peers with 1, 10, 100 multiaddrs. + params := []struct { + n int + ch chan *peerpair + }{ + {1, make(chan *peerpair, 100)}, + {10, make(chan *peerpair, 100)}, + {100, make(chan *peerpair, 100)}, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start all test peer producing goroutines, where each produces peers with as many + // multiaddrs as the n field in the param struct. + for _, p := range params { + go addressProducer(ctx, b, p.ch, p.n) + } + + for name, bench := range peerstoreBenchmarks { + for _, p := range params { + // Create a new peerstore. + ps, closeFunc := factory() + + // Run the test. + b.Run(fmt.Sprintf("%s-%dAddrs-%s", name, p.n, variant), bench(ps, p.ch)) + + // Cleanup. 
+ if closeFunc != nil { + closeFunc() + } + } + } +} + +func benchmarkAddAddrs(ps pstore.Peerstore, addrs chan *peerpair) func(*testing.B) { + return func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + pp := <-addrs + ps.AddAddrs(pp.ID, pp.Addr, pstore.PermanentAddrTTL) + } + } +} + +func benchmarkSetAddrs(ps pstore.Peerstore, addrs chan *peerpair) func(*testing.B) { + return func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + pp := <-addrs + ps.SetAddrs(pp.ID, pp.Addr, pstore.PermanentAddrTTL) + } + } +} + +func benchmarkGetAddrs(ps pstore.Peerstore, addrs chan *peerpair) func(*testing.B) { + return func(b *testing.B) { + pp := <-addrs + ps.SetAddrs(pp.ID, pp.Addr, pstore.PermanentAddrTTL) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = ps.Addrs(pp.ID) + } + } +} + +func benchmarkAddGetAndClearAddrs(ps pstore.Peerstore, addrs chan *peerpair) func(*testing.B) { + return func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + pp := <-addrs + ps.AddAddrs(pp.ID, pp.Addr, pstore.PermanentAddrTTL) + ps.Addrs(pp.ID) + ps.ClearAddrs(pp.ID) + } + } +} + +func benchmarkGet1000PeersWithAddrs(ps pstore.Peerstore, addrs chan *peerpair) func(*testing.B) { + return func(b *testing.B) { + var peers = make([]*peerpair, 1000) + for i, _ := range peers { + pp := <-addrs + ps.AddAddrs(pp.ID, pp.Addr, pstore.PermanentAddrTTL) + peers[i] = pp + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = ps.PeersWithAddrs() + } + } +} diff --git a/p2p/host/peerstore/test/peerstore_suite.go b/p2p/host/peerstore/test/peerstore_suite.go index 6b08cab1da..cd4bb634cc 100644 --- a/p2p/host/peerstore/test/peerstore_suite.go +++ b/p2p/host/peerstore/test/peerstore_suite.go @@ -15,7 +15,7 @@ import ( pstore "github.com/libp2p/go-libp2p-peerstore" ) -var peerstoreSuite = map[string]func(book pstore.Peerstore) func(*testing.T){ +var peerstoreSuite = map[string]func(pstore.Peerstore) func(*testing.T){ "AddrStream": testAddrStream, "GetStreamBeforePeerAdded": testGetStreamBeforePeerAdded, "AddStreamDuplicates": testAddrStreamDuplicates, @@ -40,16 +40,6 @@ func TestPeerstore(t *testing.T, factory PeerstoreFactory) { } } -func BenchmarkPeerstore(b *testing.B, factory PeerstoreFactory) { - ps, closeFunc := factory() - - b.Run("Peerstore", benchmarkPeerstore(ps)) - - if closeFunc != nil { - closeFunc() - } -} - func testAddrStream(ps pstore.Peerstore) func(t *testing.T) { return func(t *testing.T) { addrs, pid := getAddrs(t, 100), peer.ID("testpeer") @@ -289,22 +279,6 @@ func testBasicPeerstore(ps pstore.Peerstore) func(t *testing.T) { } } -func benchmarkPeerstore(ps pstore.Peerstore) func(*testing.B) { - return func(b *testing.B) { - addrs := make(chan *peerpair, 100) - - ctx, cancel := context.WithCancel(context.Background()) - go addressProducer(ctx, b, addrs) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - pp := <-addrs - ps.AddAddr(pp.ID, pp.Addr, pstore.PermanentAddrTTL) - } - cancel() - } -} - func getAddrs(t *testing.T, n int) []ma.Multiaddr { var addrs []ma.Multiaddr for i := 0; i < n; i++ { diff --git a/p2p/host/peerstore/test/utils.go b/p2p/host/peerstore/test/utils.go index 8630e4e914..6649820785 100644 --- a/p2p/host/peerstore/test/utils.go +++ b/p2p/host/peerstore/test/utils.go @@ -10,31 +10,45 @@ import ( ma "github.com/multiformats/go-multiaddr" ) +func multiaddr(m string) ma.Multiaddr { + maddr, err := ma.NewMultiaddr(m) + if err != nil { + panic(err) + } + return maddr +} + type peerpair struct { ID peer.ID - Addr ma.Multiaddr + Addr []ma.Multiaddr } -func 
randomPeer(b *testing.B) *peerpair { - var pid peer.ID - var err error - var addr ma.Multiaddr +func randomPeer(b *testing.B, addrCount int) *peerpair { + var ( + pid peer.ID + err error + addrs = make([]ma.Multiaddr, addrCount) + aFmt = "/ip4/127.0.0.1/tcp/%d/ipfs/%s" + ) + b.Helper() if pid, err = pt.RandPeerID(); err != nil { b.Fatal(err) } - if addr, err = ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/6666/ipfs/%s", pid.Pretty())); err != nil { - b.Fatal(err) + for i := 0; i < addrCount; i++ { + if addrs[i], err = ma.NewMultiaddr(fmt.Sprintf(aFmt, i, pid.Pretty())); err != nil { + b.Fatal(err) + } } - - return &peerpair{pid, addr} + return &peerpair{pid, addrs} } -func addressProducer(ctx context.Context, b *testing.B, addrs chan *peerpair) { +func addressProducer(ctx context.Context, b *testing.B, addrs chan *peerpair, addrsPerPeer int) { + b.Helper() defer close(addrs) for { - p := randomPeer(b) + p := randomPeer(b, addrsPerPeer) select { case addrs <- p: case <-ctx.Done(): @@ -42,11 +56,3 @@ func addressProducer(ctx context.Context, b *testing.B, addrs chan *peerpair) { } } } - -func multiaddr(m string) ma.Multiaddr { - maddr, err := ma.NewMultiaddr(m) - if err != nil { - panic(err) - } - return maddr -}
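
Usage sketch (reviewer note, not part of the diff): the snippet below shows how a caller might wire up the new Options-based constructor introduced in pstoreds/peerstore.go. It assumes the go-ds-badger datastore used in ds_test.go (which satisfies ds.TxnDatastore), an import path of github.com/libp2p/go-libp2p-peerstore/pstoreds for this package, and an illustrative on-disk path — all of those are assumptions, not taken from the patch.

package main

import (
	"context"
	"fmt"

	badger "github.com/ipfs/go-ds-badger"
	pstoreds "github.com/libp2p/go-libp2p-peerstore/pstoreds"
)

func main() {
	// Open a transactional datastore; badger's Datastore implements ds.TxnDatastore.
	// The path is illustrative only.
	store, err := badger.NewDatastore("/tmp/peerstore-example", nil)
	if err != nil {
		panic(err)
	}
	defer store.Close()

	// Start from the defaults (CacheSize: 1024, TTLInterval: 1s, WriteRetries: 5)
	// and tweak as needed. A CacheSize of 0 disables the ARC cache and falls back
	// to the no-op cache added in pstoreds/cache.go.
	opts := pstoreds.DefaultOpts()
	opts.CacheSize = 0

	ps, err := pstoreds.NewPeerstore(context.Background(), store, opts)
	if err != nil {
		panic(err)
	}

	fmt.Println("peers with addresses:", len(ps.PeersWithAddrs()))
}

Setting CacheSize to 0 exercises the same cacheless code path that the "Cacheless" subtests and benchmark variants in ds_test.go rely on, while the default options reproduce the caching behaviour benchmarked as "Caching".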