This repository has been archived by the owner on Aug 19, 2022. It is now read-only.

feat: Use a clock interface in pstoreds as well #200

Merged · 2 commits · May 27, 2022
1 change: 1 addition & 0 deletions go.mod
@@ -5,6 +5,7 @@ go 1.17
retract v0.2.9 // Contains backwards-incompatible changes. Use v0.3.0 instead.

require (
+github.com/benbjohnson/clock v1.3.0
github.com/gogo/protobuf v1.3.2
github.com/hashicorp/golang-lru v0.5.4
github.com/ipfs/go-datastore v0.5.0
2 changes: 2 additions & 0 deletions go.sum
@@ -6,6 +6,8 @@ github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
+github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
+github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/btcsuite/btcd v0.20.1-beta h1:Ik4hyJqN8Jfyv3S4AGBOmyouMsYE3EdYODkMbQjwPGw=
github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
44 changes: 33 additions & 11 deletions pstoreds/addr_book.go
@@ -83,11 +83,11 @@ func (r *addrsRecord) flush(write ds.Write) (err error) {
// * after an entry has been modified (e.g. addresses have been added or removed, TTLs updated, etc.)
//
// If the return value is true, the caller should perform a flush immediately to sync the record with the store.
-func (r *addrsRecord) clean() (chgd bool) {
-now := time.Now().Unix()
+func (r *addrsRecord) clean(now time.Time) (chgd bool) {
+nowUnix := now.Unix()
addrsLen := len(r.Addrs)

-if !r.dirty && !r.hasExpiredAddrs(now) {
+if !r.dirty && !r.hasExpiredAddrs(nowUnix) {
// record is not dirty, and we have no expired entries to purge.
return false
}
@@ -104,7 +104,7 @@ func (r *addrsRecord) clean() (chgd bool) {
})
}

-r.Addrs = removeExpired(r.Addrs, now)
+r.Addrs = removeExpired(r.Addrs, nowUnix)

return r.dirty || len(r.Addrs) != addrsLen
}
@@ -144,6 +144,23 @@ type dsAddrBook struct {
// controls children goroutine lifetime.
childrenDone sync.WaitGroup
cancelFn func()

+clock clock
}

+type clock interface {
+Now() time.Time
+After(d time.Duration) <-chan time.Time
+}

+type realclock struct{}

+func (rc realclock) Now() time.Time {
+return time.Now()
+}

+func (rc realclock) After(d time.Duration) <-chan time.Time {
+return time.After(d)
+}

var _ pstore.AddrBook = (*dsAddrBook)(nil)
@@ -176,6 +193,11 @@ func NewAddrBook(ctx context.Context, store ds.Batching, opts Options) (ab *dsAd
opts: opts,
cancelFn: cancelFn,
subsManager: pstoremem.NewAddrSubManager(),
+clock: realclock{},
}

+if opts.Clock != nil {
+ab.clock = opts.Clock
+}

if opts.CacheSize > 0 {
@@ -212,7 +234,7 @@ func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrs
pr.Lock()
defer pr.Unlock()

-if pr.clean() && update {
+if pr.clean(ab.clock.Now()) && update {
err = pr.flush(ab.ds)
}
return pr, err
@@ -231,7 +253,7 @@ func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrs
return nil, err
}
// this record is new and local for now (not in cache), so we don't need to lock.
-if pr.clean() && update {
+if pr.clean(ab.clock.Now()) && update {
err = pr.flush(ab.ds)
}
default:
@@ -383,7 +405,7 @@ func (ab *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.D
pr.Lock()
defer pr.Unlock()

-newExp := time.Now().Add(newTTL).Unix()
+newExp := ab.clock.Now().Add(newTTL).Unix()
for _, entry := range pr.Addrs {
if entry.Ttl != int64(oldTTL) {
continue
@@ -392,7 +414,7 @@ func (ab *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.D
pr.dirty = true
}

-if pr.clean() {
+if pr.clean(ab.clock.Now()) {
pr.flush(ab.ds)
}
}
@@ -461,7 +483,7 @@ func (ab *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duratio
// return nil
// }

-newExp := time.Now().Add(ttl).Unix()
+newExp := ab.clock.Now().Add(ttl).Unix()
addrsMap := make(map[string]*pb.AddrBookRecord_AddrEntry, len(pr.Addrs))
for _, addr := range pr.Addrs {
addrsMap[string(addr.Addr.Bytes())] = addr
@@ -521,7 +543,7 @@ func (ab *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duratio
// }

pr.dirty = true
-pr.clean()
+pr.clean(ab.clock.Now())
return pr.flush(ab.ds)
}

@@ -567,7 +589,7 @@ func (ab *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) (err error) {
pr.Addrs = deleteInPlace(pr.Addrs, addrs)

pr.dirty = true
-pr.clean()
+pr.clean(ab.clock.Now())
return pr.flush(ab.ds)
}

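Note on the design: the unexported `clock` interface added above is deliberately minimal, covering only the two calls the address book actually makes (`Now` for TTL arithmetic and expiry checks, `After` for the GC timers). That is what lets the `benbjohnson/clock` mock used in the tests satisfy it without an adapter. As a rough sketch of the seam this creates, a hand-rolled, in-package test double could look like the following; `fakeClock` and its `Advance` method are hypothetical, not part of this PR:

```go
package pstoreds

import (
	"sync"
	"time"
)

// fakeClock is a hypothetical test double for the unexported clock
// interface: Now reports a manually controlled instant, and channels
// handed out by After fire only when Advance moves time past their
// deadline.
type fakeClock struct {
	mu     sync.Mutex
	now    time.Time
	timers []fakeTimer
}

type fakeTimer struct {
	deadline time.Time
	ch       chan time.Time
}

func (f *fakeClock) Now() time.Time {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.now
}

func (f *fakeClock) After(d time.Duration) <-chan time.Time {
	f.mu.Lock()
	defer f.mu.Unlock()
	ch := make(chan time.Time, 1) // buffered so Advance never blocks
	f.timers = append(f.timers, fakeTimer{deadline: f.now.Add(d), ch: ch})
	return ch
}

// Advance moves the fake time forward and fires every timer whose
// deadline has passed.
func (f *fakeClock) Advance(d time.Duration) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.now = f.now.Add(d)
	remaining := f.timers[:0]
	for _, t := range f.timers {
		if t.deadline.After(f.now) {
			remaining = append(remaining, t)
			continue
		}
		t.ch <- f.now
	}
	f.timers = remaining
}
```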
36 changes: 29 additions & 7 deletions pstoreds/addr_book_gc.go
@@ -25,7 +25,7 @@ var (
// queries
purgeLookaheadQuery = query.Query{
Prefix: gcLookaheadBase.String(),
-Orders: []query.Order{query.OrderByKey{}},
+Orders: []query.Order{query.OrderByFunction(orderByTimestampInKey)},
KeysOnly: true,
}

@@ -95,7 +95,7 @@ func (gc *dsAddrBookGc) background() {
defer gc.ab.childrenDone.Done()

select {
-case <-time.After(gc.ab.opts.GCInitialDelay):
+case <-gc.ab.clock.After(gc.ab.opts.GCInitialDelay):
case <-gc.ab.ctx.Done():
// yield if we have been cancelled/closed before the delay elapses.
return
@@ -180,7 +180,7 @@ func (gc *dsAddrBookGc) purgeLookahead() {
}
defer results.Close()

-now := time.Now().Unix()
+now := gc.ab.clock.Now().Unix()

// keys: /peers/gc/addrs/<unix timestamp of next visit>/<peer ID b32>
// values: nil
@@ -214,7 +214,7 @@ func (gc *dsAddrBookGc) purgeLookahead() {
if e, ok := gc.ab.cache.Peek(id); ok {
cached := e.(*addrsRecord)
cached.Lock()
-if cached.clean() {
+if cached.clean(gc.ab.clock.Now()) {
if err = cached.flush(batch); err != nil {
log.Warnf("failed to flush entry modified by GC for peer: %v, err: %v", id.Pretty(), err)
}
@@ -239,7 +239,7 @@ func (gc *dsAddrBookGc) purgeLookahead() {
dropInError(gcKey, err, "unmarshalling entry")
continue
}
-if record.clean() {
+if record.clean(gc.ab.clock.Now()) {
err = record.flush(batch)
if err != nil {
log.Warnf("failed to flush entry modified by GC for peer: %v, err: %v", id.Pretty(), err)
@@ -284,7 +284,7 @@ func (gc *dsAddrBookGc) purgeStore() {
}

id := record.Id.ID
-if !record.clean() {
+if !record.clean(gc.ab.clock.Now()) {
continue
}

@@ -317,7 +317,7 @@ func (gc *dsAddrBookGc) populateLookahead() {
return
}

-until := time.Now().Add(gc.ab.opts.GCLookaheadInterval).Unix()
+until := gc.ab.clock.Now().Add(gc.ab.opts.GCLookaheadInterval).Unix()

var id peer.ID
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
@@ -386,3 +386,25 @@

gc.currWindowEnd = until
}

+// orderByTimestampInKey orders the results by comparing the timestamp in the
+// key. A lexicographic sort by itself is wrong, since "10" sorts before "2"
+// as a string, but as an int 2 is obviously less than 10.
+func orderByTimestampInKey(a, b query.Entry) int {
+aKey := ds.RawKey(a.Key)
+aInt, err := strconv.ParseInt(aKey.Parent().Name(), 10, 64)
+if err != nil {
+return -1
+}
+bKey := ds.RawKey(b.Key)
+bInt, err := strconv.ParseInt(bKey.Parent().Name(), 10, 64)
+if err != nil {
+return -1
+}
+if aInt < bInt {
+return -1
+} else if aInt == bInt {
+return 0
+}
+return 1
+}
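The purge loop visits lookahead entries in timestamp order, so the query has to sort on the numeric segment of the key rather than on raw bytes. A self-contained sketch of the difference the new order function makes, using timestamp segments shaped like those in the `/peers/gc/addrs/<unix timestamp>/<peer ID b32>` keys described above:

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

func main() {
	// Timestamp segments as they would appear in lookahead keys.
	ts := []string{"9", "10", "2"}

	lex := append([]string(nil), ts...)
	sort.Strings(lex)
	fmt.Println(lex) // [10 2 9]: byte-wise "10" < "2", the wrong visit order

	num := append([]string(nil), ts...)
	sort.Slice(num, func(i, j int) bool {
		a, _ := strconv.ParseInt(num[i], 10, 64)
		b, _ := strconv.ParseInt(num[j], 10, 64)
		return a < b
	})
	fmt.Println(num) // [2 9 10]: numeric order, as orderByTimestampInKey produces
}
```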
19 changes: 13 additions & 6 deletions pstoreds/addr_book_gc_test.go
@@ -5,6 +5,7 @@ import (
"testing"
"time"

mockClock "github.com/benbjohnson/clock"
query "github.com/ipfs/go-datastore/query"
pstore "github.com/libp2p/go-libp2p-core/peerstore"
test "github.com/libp2p/go-libp2p-peerstore/test"
@@ -90,6 +91,8 @@ func TestGCPurging(t *testing.T) {
opts.GCInitialDelay = 90 * time.Hour
opts.GCLookaheadInterval = 20 * time.Second
opts.GCPurgeInterval = 1 * time.Second
+clk := mockClock.NewMock()
+opts.Clock = clk

factory := addressBookFactory(t, leveldbStore, opts)
ab, closeFn := factory()
@@ -120,7 +123,7 @@
t.Errorf("expected 4 GC lookahead entries, got: %v", i)
}

-<-time.After(2 * time.Second)
+clk.Add(2 * time.Second)
gc.purgeLookahead()
if i := tp.countLookaheadEntries(); i != 3 {
t.Errorf("expected 3 GC lookahead entries, got: %v", i)
@@ -129,13 +132,13 @@
// Purge the cache, to exercise a different path in the purge cycle.
tp.clearCache()

-<-time.After(5 * time.Second)
+clk.Add(5 * time.Second)
gc.purgeLookahead()
if i := tp.countLookaheadEntries(); i != 3 {
t.Errorf("expected 3 GC lookahead entries, got: %v", i)
}

-<-time.After(5 * time.Second)
+clk.Add(5 * time.Second)
gc.purgeLookahead()
if i := tp.countLookaheadEntries(); i != 1 {
t.Errorf("expected 1 GC lookahead entries, got: %v", i)
@@ -157,6 +160,8 @@ func TestGCDelay(t *testing.T) {
opts.GCInitialDelay = 2 * time.Second
opts.GCLookaheadInterval = 1 * time.Minute
opts.GCPurgeInterval = 30 * time.Second
+clk := mockClock.NewMock()
+opts.Clock = clk

factory := addressBookFactory(t, leveldbStore, opts)
ab, closeFn := factory()
Expand All @@ -172,7 +177,7 @@ func TestGCDelay(t *testing.T) {
}

// after the initial delay has passed.
-<-time.After(3 * time.Second)
+clk.Add(3 * time.Second)
if i := tp.countLookaheadEntries(); i != 1 {
t.Errorf("expected 1 lookahead entry, got: %d", i)
}
@@ -188,6 +193,8 @@ func TestGCLookaheadDisabled(t *testing.T) {
opts.GCInitialDelay = 90 * time.Hour
opts.GCLookaheadInterval = 0 // disable lookahead
opts.GCPurgeInterval = 9 * time.Hour
+clk := mockClock.NewMock()
+opts.Clock = clk

factory := addressBookFactory(t, leveldbStore, opts)
ab, closeFn := factory()
@@ -206,13 +213,13 @@
ab.AddAddrs(ids[2], addrs[30:40], 10*time.Hour)
ab.AddAddrs(ids[3], addrs[40:], 10*time.Hour)

-time.Sleep(100 * time.Millisecond)
+clk.Add(100 * time.Millisecond)

if i := tp.countLookaheadEntries(); i != 0 {
t.Errorf("expected no GC lookahead entries, got: %v", i)
}

-time.Sleep(500 * time.Millisecond)
+clk.Add(500 * time.Millisecond)
gc := ab.(*dsAddrBook).gc
gc.purgeFunc()

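The switch from `time.Sleep`/`time.After` to `clk.Add` is what makes these tests deterministic: advancing the mock fires any timers that have come due without spending wall-clock time. A minimal standalone illustration, assuming `benbjohnson/clock`'s documented mock semantics (a fresh mock starts at a fixed instant, and `Add` fires expired timers as it advances):

```go
package main

import (
	"fmt"
	"time"

	mockClock "github.com/benbjohnson/clock"
)

func main() {
	clk := mockClock.NewMock() // fake time source; nothing here really waits

	timer := clk.After(10 * time.Second)

	clk.Add(5 * time.Second) // advance fake time; deadline not reached yet
	select {
	case <-timer:
		fmt.Println("fired early (unexpected)")
	default:
		fmt.Println("still pending after 5s of fake time")
	}

	clk.Add(5 * time.Second) // deadline reached; the channel fires
	fmt.Println("fired at fake time:", <-timer)
}
```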
10 changes: 8 additions & 2 deletions pstoreds/ds_test.go
@@ -15,6 +15,8 @@ import (

pstore "github.com/libp2p/go-libp2p-core/peerstore"
pt "github.com/libp2p/go-libp2p-peerstore/test"

mockClock "github.com/benbjohnson/clock"
)

type datastoreFactory func(tb testing.TB) (ds.Batching, func())
@@ -50,16 +52,20 @@ func TestDsAddrBook(t *testing.T) {
opts := DefaultOpts()
opts.GCPurgeInterval = 1 * time.Second
opts.CacheSize = 1024
+clk := mockClock.NewMock()
+opts.Clock = clk

-pt.TestAddrBook(t, addressBookFactory(t, dsFactory, opts))
+pt.TestAddrBook(t, addressBookFactory(t, dsFactory, opts), clk)
})

t.Run(name+" Cacheless", func(t *testing.T) {
opts := DefaultOpts()
opts.GCPurgeInterval = 1 * time.Second
opts.CacheSize = 0
+clk := mockClock.NewMock()
+opts.Clock = clk

-pt.TestAddrBook(t, addressBookFactory(t, dsFactory, opts))
+pt.TestAddrBook(t, addressBookFactory(t, dsFactory, opts), clk)
})
}
}
3 changes: 3 additions & 0 deletions pstoreds/peerstore.go
@@ -34,6 +34,8 @@ type Options struct {
// Initial delay before GC processes start. Intended to give the system breathing room to fully boot
// before starting GC.
GCInitialDelay time.Duration

+Clock clock
}

// DefaultOpts returns the default options for a persistent peerstore, with the full-purge GC algorithm:
@@ -50,6 +52,7 @@ func DefaultOpts() Options {
GCPurgeInterval: 2 * time.Hour,
GCLookaheadInterval: 0,
GCInitialDelay: 60 * time.Second,
+Clock: realclock{},
}
}

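Because `DefaultOpts` now seeds `Clock` with `realclock{}`, existing callers see no behaviour change, while tests can swap in a fake before constructing the book. A minimal wiring sketch; the in-memory datastore and the `Close` call are assumptions for illustration, not shown in this diff:

```go
package main

import (
	"context"
	"time"

	mockClock "github.com/benbjohnson/clock"
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	"github.com/libp2p/go-libp2p-peerstore/pstoreds"
)

func main() {
	ctx := context.Background()

	// Any ds.Batching works; a mutex-wrapped map datastore keeps the
	// sketch self-contained.
	store := dssync.MutexWrap(ds.NewMapDatastore())

	opts := pstoreds.DefaultOpts()
	clk := mockClock.NewMock() // deterministic clock for tests
	opts.Clock = clk           // exported field; any Now/After implementation fits

	ab, err := pstoreds.NewAddrBook(ctx, store, opts)
	if err != nil {
		panic(err)
	}
	defer ab.Close()

	// Address TTLs and GC timers now advance only on demand:
	clk.Add(2 * time.Hour)
}
```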
6 changes: 4 additions & 2 deletions pstoremem/inmem_test.go
@@ -7,6 +7,7 @@ import (

pstore "github.com/libp2p/go-libp2p-core/peerstore"

mockClock "github.com/benbjohnson/clock"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)
@@ -43,11 +44,12 @@ func TestPeerstoreProtoStoreLimits(t *testing.T) {
}

func TestInMemoryAddrBook(t *testing.T) {
+clk := mockClock.NewMock()
pt.TestAddrBook(t, func() (pstore.AddrBook, func()) {
-ps, err := NewPeerstore()
+ps, err := NewPeerstore(WithClock(clk))
require.NoError(t, err)
return ps, func() { ps.Close() }
-})
+}, clk)
}

func TestInMemoryKeyBook(t *testing.T) {