diff --git a/bucket.go b/bucket.go index 6a26f7b..0108799 100644 --- a/bucket.go +++ b/bucket.go @@ -9,45 +9,93 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) -// Bucket holds a list of peers. -type Bucket struct { +// PeerState is the state of the peer as seen by the Routing Table. +type PeerState int + +const ( + // PeerStateActive indicates that we know the peer is active/alive. + PeerStateActive PeerState = iota + // PeerStateMissing indicates that we do not know the state of the peer. + PeerStateMissing +) + +// PeerInfo holds all related information for a peer in the K-Bucket. +type PeerInfo struct { + Id peer.ID + State PeerState +} + +// bucket holds a list of peers. +type bucket struct { lk sync.RWMutex list *list.List } -func newBucket() *Bucket { - b := new(Bucket) +func newBucket() *bucket { + b := new(bucket) b.list = list.New() return b } -func (b *Bucket) Peers() []peer.ID { +// returns all peers in the bucket +func (b *bucket) peers() []PeerInfo { + b.lk.RLock() + defer b.lk.RUnlock() + var ps []PeerInfo + for e := b.list.Front(); e != nil; e = e.Next() { + p := e.Value.(PeerInfo) + ps = append(ps, p) + } + return ps +} + +// return the Ids of all the peers in the bucket. +func (b *bucket) peerIds() []peer.ID { b.lk.RLock() defer b.lk.RUnlock() ps := make([]peer.ID, 0, b.list.Len()) for e := b.list.Front(); e != nil; e = e.Next() { - id := e.Value.(peer.ID) - ps = append(ps, id) + p := e.Value.(PeerInfo) + ps = append(ps, p.Id) } return ps } -func (b *Bucket) Has(id peer.ID) bool { +// returns the peer with the given Id and true if peer exists +// returns false if the peerId does not exist +func (b *bucket) getPeer(p peer.ID) (PeerInfo, bool) { b.lk.RLock() defer b.lk.RUnlock() for e := b.list.Front(); e != nil; e = e.Next() { - if e.Value.(peer.ID) == id { + if e.Value.(PeerInfo).Id == p { + return e.Value.(PeerInfo), true + } + } + return PeerInfo{}, false +} + +// replaces the peer based on the Id. +// returns true if the replace was successful, false otherwise. +func (b *bucket) replace(p PeerInfo) bool { + b.lk.Lock() + defer b.lk.Unlock() + for e := b.list.Front(); e != nil; e = e.Next() { + if e.Value.(PeerInfo).Id == p.Id { + b.list.Remove(e) + b.list.PushBack(p) return true } } return false } -func (b *Bucket) Remove(id peer.ID) bool { +// removes the peer with the given Id from the bucket. +// returns true if successful, false otherwise. +func (b *bucket) remove(id peer.ID) bool { b.lk.Lock() defer b.lk.Unlock() for e := b.list.Front(); e != nil; e = e.Next() { - if e.Value.(peer.ID) == id { + if e.Value.(PeerInfo).Id == id { b.list.Remove(e) return true } @@ -55,40 +103,33 @@ func (b *Bucket) Remove(id peer.ID) bool { return false } -func (b *Bucket) MoveToFront(id peer.ID) { +func (b *bucket) moveToFront(id peer.ID) { b.lk.Lock() defer b.lk.Unlock() + for e := b.list.Front(); e != nil; e = e.Next() { - if e.Value.(peer.ID) == id { + if e.Value.(PeerInfo).Id == id { b.list.MoveToFront(e) } } } -func (b *Bucket) PushFront(p peer.ID) { +func (b *bucket) pushFront(p PeerInfo) { b.lk.Lock() b.list.PushFront(p) b.lk.Unlock() } -func (b *Bucket) PopBack() peer.ID { - b.lk.Lock() - defer b.lk.Unlock() - last := b.list.Back() - b.list.Remove(last) - return last.Value.(peer.ID) -} - -func (b *Bucket) Len() int { +func (b *bucket) len() int { b.lk.RLock() defer b.lk.RUnlock() return b.list.Len() } -// Split splits a buckets peers into two buckets, the methods receiver will have +// splits a buckets peers into two buckets, the methods receiver will have // peers with CPL equal to cpl, the returned bucket will have peers with CPL // greater than cpl (returned bucket has closer peers) -func (b *Bucket) Split(cpl int, target ID) *Bucket { +func (b *bucket) split(cpl int, target ID) *bucket { b.lk.Lock() defer b.lk.Unlock() @@ -97,7 +138,7 @@ func (b *Bucket) Split(cpl int, target ID) *Bucket { newbuck.list = out e := b.list.Front() for e != nil { - peerID := ConvertPeerID(e.Value.(peer.ID)) + peerID := ConvertPeerID(e.Value.(PeerInfo).Id) peerCPL := CommonPrefixLen(peerID, target) if peerCPL > cpl { cur := e diff --git a/cpl_replacement_cache.go b/cpl_replacement_cache.go new file mode 100644 index 0000000..7f17991 --- /dev/null +++ b/cpl_replacement_cache.go @@ -0,0 +1,98 @@ +package kbucket + +import ( + "sync" + + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/wangjia184/sortedset" +) + +// TODO Should ideally use a Circular queue for this +// maintains a bounded, de-duplicated and FIFO peer candidate queue for each Cpl +type cplReplacementCache struct { + localPeer ID + maxQueueSize int + + sync.Mutex + candidates map[uint]*sortedset.SortedSet // candidates for a Cpl +} + +func newCplReplacementCache(localPeer ID, maxQueueSize int) *cplReplacementCache { + return &cplReplacementCache{ + localPeer: localPeer, + maxQueueSize: maxQueueSize, + candidates: make(map[uint]*sortedset.SortedSet), + } +} + +// pushes a candidate to the end of the queue for the corresponding Cpl +// returns false if the queue is full or it already has the peer +// returns true if was successfully added +func (c *cplReplacementCache) push(p peer.ID) bool { + c.Lock() + defer c.Unlock() + + // create queue if not created + cpl := uint(CommonPrefixLen(c.localPeer, ConvertPeerID(p))) + if c.candidates[cpl] == nil { + c.candidates[cpl] = sortedset.New() + } + + q := c.candidates[cpl] + + // queue is full + if (q.GetCount()) >= c.maxQueueSize { + return false + } + // queue already has the peer + if q.GetByKey(string(p)) != nil { + return false + } + + // push + q.AddOrUpdate(string(p), sortedset.SCORE(q.GetCount()+1), nil) + return true +} + +// pops a candidate from the top of the candidate queue for the given Cpl +// returns false if the queue is empty +// returns the peerId and true if successful +func (c *cplReplacementCache) pop(cpl uint) (peer.ID, bool) { + c.Lock() + c.Unlock() + + q := c.candidates[cpl] + if q != nil && q.GetCount() > 0 { + n := q.PopMin() + + // delete the queue if it's empty + if q.GetCount() == 0 { + delete(c.candidates, cpl) + } + + return peer.ID(n.Key()), true + } + return "", false +} + +// removes a given peer if it's present +// returns false if the peer is absent +func (c *cplReplacementCache) remove(p peer.ID) bool { + c.Lock() + defer c.Unlock() + + cpl := uint(CommonPrefixLen(c.localPeer, ConvertPeerID(p))) + q := c.candidates[cpl] + if q != nil { + q.Remove(string(p)) + + // remove the queue if it's empty + if q.GetCount() == 0 { + delete(c.candidates, cpl) + } + + return true + } + return false +} diff --git a/cpl_replacement_cache_test.go b/cpl_replacement_cache_test.go new file mode 100644 index 0000000..2d34bea --- /dev/null +++ b/cpl_replacement_cache_test.go @@ -0,0 +1,82 @@ +package kbucket + +import ( + "testing" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/test" + + "github.com/stretchr/testify/require" +) + +func TestCandidateQueue(t *testing.T) { + t.Parallel() + + maxQSize := 2 + local := ConvertPeerID(test.RandPeerIDFatal(t)) + c := newCplReplacementCache(local, maxQSize) + + // pop an empty queue fails + p, b := c.pop(1) + require.Empty(t, p) + require.False(t, b) + + // push two elements to an empty queue works + testPeer1 := genPeer(t, local, 1) + testPeer2 := genPeer(t, local, 1) + + // pushing first peer works + require.True(t, c.push(testPeer1)) + // pushing a duplicate fails + require.False(t, c.push(testPeer1)) + // pushing another peers works + require.True(t, c.push(testPeer2)) + + // popping the above pushes works + p, b = c.pop(1) + require.True(t, b) + require.Equal(t, testPeer1, p) + p, b = c.pop(1) + require.True(t, b) + require.Equal(t, testPeer2, p) + + // pushing & popping again works + require.True(t, c.push(testPeer1)) + require.True(t, c.push(testPeer2)) + p, b = c.pop(1) + require.True(t, b) + require.Equal(t, testPeer1, p) + p, b = c.pop(1) + require.True(t, b) + require.Equal(t, testPeer2, p) + + // fill up a queue + p1 := genPeer(t, local, 2) + p2 := genPeer(t, local, 2) + require.True(t, c.push(p1)) + require.True(t, c.push(p2)) + + // push should not work on a full queue + p3 := genPeer(t, local, 2) + require.False(t, c.push(p3)) + + // remove a peer & verify it's been removed + require.NotNil(t, c.candidates[2].GetByKey(string(p2))) + require.True(t, c.remove(p2)) + c.Lock() + require.Nil(t, c.candidates[2].GetByKey(string(p2))) + c.Unlock() + + // now push should work + require.True(t, c.push(p3)) +} + +func genPeer(t *testing.T, local ID, cpl int) peer.ID { + var p peer.ID + for { + p = test.RandPeerIDFatal(t) + if CommonPrefixLen(local, ConvertPeerID(p)) == cpl { + return p + } + } +} diff --git a/go.mod b/go.mod index 980d86b..c4125e8 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/minio/sha256-simd v0.1.1 github.com/multiformats/go-multihash v0.0.13 github.com/stretchr/testify v1.4.0 + github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30 ) go 1.13 diff --git a/go.sum b/go.sum index 46342c1..593300d 100644 --- a/go.sum +++ b/go.sum @@ -174,6 +174,8 @@ github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -204,6 +206,8 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= +github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30 h1:kZiWylALnUy4kzoKJemjH8eqwCl3RjW1r1ITCjjW7G8= +github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc h1:9lDbC6Rz4bwmou+oE6Dt4Cb2BGMur5eR/GYptkKUVHo= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= diff --git a/options.go b/options.go new file mode 100644 index 0000000..730d96c --- /dev/null +++ b/options.go @@ -0,0 +1,72 @@ +package kbucket + +import ( + "fmt" + "time" +) + +// Option is the Routing Table functional option type. +type Option func(*Options) error + +// Options is a structure containing all the functional options that can be used when constructing a Routing Table. +type Options struct { + TableCleanup struct { + PeersForValidationFnc PeerSelectionFnc + PeerValidationTimeout time.Duration + Interval time.Duration + } +} + +// Apply applies the given options to this Option. +func (o *Options) Apply(opts ...Option) error { + for i, opt := range opts { + if err := opt(o); err != nil { + return fmt.Errorf("routing table option %d failed: %s", i, err) + } + } + return nil +} + +// PeersForValidationFnc configures the function that will be used to select the peers that need to be validated during cleanup. +func PeersForValidationFnc(f PeerSelectionFnc) Option { + return func(o *Options) error { + o.TableCleanup.PeersForValidationFnc = f + return nil + } +} + +// TableCleanupInterval configures the interval between two runs of the Routing Table cleanup routine. +func TableCleanupInterval(i time.Duration) Option { + return func(o *Options) error { + o.TableCleanup.Interval = i + return nil + } +} + +// PeerValidationTimeout sets the timeout for a single peer validation during cleanup. +func PeerValidationTimeout(timeout time.Duration) Option { + return func(o *Options) error { + o.TableCleanup.PeerValidationTimeout = timeout + return nil + } +} + +// Defaults are the default options. This option will be automatically +// prepended to any options you pass to the Routing Table constructor. +var Defaults = func(o *Options) error { + o.TableCleanup.PeerValidationTimeout = 30 * time.Second + o.TableCleanup.Interval = 2 * time.Minute + + // default selector function selects all peers that are in missing state. + o.TableCleanup.PeersForValidationFnc = func(peers []PeerInfo) []PeerInfo { + var selectedPeers []PeerInfo + for _, p := range peers { + if p.State == PeerStateMissing { + selectedPeers = append(selectedPeers, p) + } + } + return selectedPeers + } + + return nil +} diff --git a/sorting.go b/sorting.go index 3b67072..b5c2a8c 100644 --- a/sorting.go +++ b/sorting.go @@ -36,7 +36,7 @@ func (pds *peerDistanceSorter) appendPeer(p peer.ID) { // Append the peer.ID values in the list to the sorter's slice. It may no longer be sorted. func (pds *peerDistanceSorter) appendPeersFromList(l *list.List) { for e := l.Front(); e != nil; e = e.Next() { - pds.appendPeer(e.Value.(peer.ID)) + pds.appendPeer(e.Value.(PeerInfo).Id) } } diff --git a/table.go b/table.go index 0b4455e..59b9512 100644 --- a/table.go +++ b/table.go @@ -2,6 +2,7 @@ package kbucket import ( + "context" "encoding/binary" "errors" "fmt" @@ -11,9 +12,9 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" - mh "github.com/multiformats/go-multihash" logging "github.com/ipfs/go-log" + mh "github.com/multiformats/go-multihash" ) var log = logging.Logger("table") @@ -21,6 +22,13 @@ var log = logging.Logger("table") var ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high") var ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity") +// PeerSelectionFnc is the signature of a function that selects zero or more peers from the given peers +// based on some criteria. +type PeerSelectionFnc func(peers []PeerInfo) []PeerInfo + +// PeerValidationFnc is the signature of a function that determines the validity a peer for Routing Table membership. +type PeerValidationFnc func(ctx context.Context, p peer.ID) bool + // maxCplForRefresh is the maximum cpl we support for refresh. // This limit exists because we can only generate 'maxCplForRefresh' bit prefixes for now. const maxCplForRefresh uint = 15 @@ -34,6 +42,8 @@ type CplRefresh struct { // RoutingTable defines the routing table. type RoutingTable struct { + ctx context.Context + // ID of the local peer local ID @@ -47,31 +57,133 @@ type RoutingTable struct { maxLatency time.Duration // kBuckets define all the fingers to other nodes. - Buckets []*Bucket + Buckets []*bucket bucketsize int cplRefreshLk sync.RWMutex cplRefreshedAt map[uint]time.Time + // replacement candidates for a Cpl + cplReplacementCache *cplReplacementCache + // notification functions PeerRemoved func(peer.ID) PeerAdded func(peer.ID) + + // function to determine the validity of a peer for RT membership + PeerValidationFnc PeerValidationFnc + + // timeout for a single call to the peer validation function + peerValidationTimeout time.Duration + // interval between two runs of the table cleanup routine + tableCleanupInterval time.Duration + // function to select peers that need to be validated + peersForValidationFnc PeerSelectionFnc } // NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance. -func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics) *RoutingTable { +// Passing a nil PeerValidationFnc disables periodic table cleanup. +func NewRoutingTable(ctx context.Context, bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics, + peerValidationFnc PeerValidationFnc, options ...Option) (*RoutingTable, error) { + + var cfg Options + if err := cfg.Apply(append([]Option{Defaults}, options...)...); err != nil { + return nil, err + } + rt := &RoutingTable{ - Buckets: []*Bucket{newBucket()}, - bucketsize: bucketsize, - local: localID, - maxLatency: latency, - metrics: m, + ctx: ctx, + Buckets: []*bucket{newBucket()}, + bucketsize: bucketsize, + local: localID, + + maxLatency: latency, + metrics: m, + cplRefreshedAt: make(map[uint]time.Time), - PeerRemoved: func(peer.ID) {}, - PeerAdded: func(peer.ID) {}, + + PeerRemoved: func(peer.ID) {}, + PeerAdded: func(peer.ID) {}, + + PeerValidationFnc: peerValidationFnc, + peersForValidationFnc: cfg.TableCleanup.PeersForValidationFnc, + peerValidationTimeout: cfg.TableCleanup.PeerValidationTimeout, + tableCleanupInterval: cfg.TableCleanup.Interval, } - return rt + rt.cplReplacementCache = newCplReplacementCache(rt.local, rt.bucketsize) + + // schedule periodic RT cleanup + if peerValidationFnc != nil { + go rt.cleanup() + } + + return rt, nil +} + +func (rt *RoutingTable) cleanup() { + validatePeerF := func(p peer.ID) bool { + queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout) + defer cancel() + return rt.PeerValidationFnc(queryCtx, p) + } + + cleanupTickr := time.NewTicker(rt.tableCleanupInterval) + defer cleanupTickr.Stop() + for { + select { + case <-rt.ctx.Done(): + return + case <-cleanupTickr.C: + ps := rt.peersToValidate() + for _, pinfo := range ps { + // continue if we are able to successfully validate the peer + // it will be marked alive in the RT when the DHT connection notification handler calls RT.HandlePeerAlive() + // TODO Should we revisit this ? It makes more sense for the RT to mark it as active here + if validatePeerF(pinfo.Id) { + log.Infof("successfully validated missing peer=%s", pinfo.Id) + continue + } + + // peer does not seem to be alive, let's try candidates now + log.Infof("failed to validate missing peer=%s, will try candidates now...", pinfo.Id) + // evict missing peer + rt.HandlePeerDead(pinfo.Id) + + // keep trying replacement candidates for the missing peer till we get a successful validation or + // we run out of candidates + cpl := uint(CommonPrefixLen(ConvertPeerID(pinfo.Id), rt.local)) + c, notEmpty := rt.cplReplacementCache.pop(cpl) + for notEmpty { + if validatePeerF(c) { + log.Infof("successfully validated candidate=%s for missing peer=%s", c, pinfo.Id) + break + } + log.Infof("failed to validated candidate=%s", c) + // remove candidate + rt.HandlePeerDead(c) + + c, notEmpty = rt.cplReplacementCache.pop(cpl) + } + + if !notEmpty { + log.Infof("failed to replace missing peer=%s as all candidates were invalid", pinfo.Id) + } + } + } + } +} + +// returns the peers that need to be validated. +func (rt *RoutingTable) peersToValidate() []PeerInfo { + rt.tabLock.RLock() + defer rt.tabLock.RUnlock() + + var peers []PeerInfo + for _, b := range rt.Buckets { + peers = append(peers, b.peers()...) + } + return rt.peersForValidationFnc(peers) } // GetTrackedCplsForRefresh returns the Cpl's we are tracking for refresh. @@ -128,24 +240,41 @@ func (rt *RoutingTable) ResetCplRefreshedAtForID(id ID, newTime time.Time) { rt.cplRefreshedAt[uint(cpl)] = newTime } -// Update adds or moves the given peer to the front of its respective bucket -func (rt *RoutingTable) Update(p peer.ID) (evicted peer.ID, err error) { - peerID := ConvertPeerID(p) - cpl := CommonPrefixLen(peerID, rt.local) - +// HandlePeerDisconnect should be called when the caller detects a disconnection with the peer. +// This enables the Routing Table to mark the peer as missing. +func (rt *RoutingTable) HandlePeerDisconnect(p peer.ID) { rt.tabLock.Lock() defer rt.tabLock.Unlock() - bucketID := cpl - if bucketID >= len(rt.Buckets) { - bucketID = len(rt.Buckets) - 1 + + // mark the peer as missing + bucketId := rt.bucketIdForPeer(p) + b := rt.Buckets[bucketId] + if peer, has := b.getPeer(p); has { + peer.State = PeerStateMissing + b.replace(peer) } +} + +// HandlePeerAlive should be called when the caller detects that a peer is alive. +// This could be a successful incoming/outgoing connection with the peer or even a successful message delivery to/from the peer. +// This enables the RT to update it's internal state to mark the peer as active. +func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error) { + rt.tabLock.Lock() + defer rt.tabLock.Unlock() + bucketID := rt.bucketIdForPeer(p) bucket := rt.Buckets[bucketID] - if bucket.Has(p) { + if peer, has := bucket.getPeer(p); has { + // mark the peer as active if it was missing + if peer.State == PeerStateMissing { + peer.State = PeerStateActive + bucket.replace(peer) + } + // If the peer is already in the table, move it to the front. // This signifies that it it "more active" and the less active nodes // Will as a result tend towards the back of the list - bucket.MoveToFront(p) + bucket.moveToFront(p) return "", nil } @@ -155,8 +284,8 @@ func (rt *RoutingTable) Update(p peer.ID) (evicted peer.ID, err error) { } // We have enough space in the bucket (whether spawned or grouped). - if bucket.Len() < rt.bucketsize { - bucket.PushFront(p) + if bucket.len() < rt.bucketsize { + bucket.pushFront(PeerInfo{p, PeerStateActive}) rt.PeerAdded(p) return "", nil } @@ -165,39 +294,34 @@ func (rt *RoutingTable) Update(p peer.ID) (evicted peer.ID, err error) { // if the bucket is too large and this is the last bucket (i.e. wildcard), unfold it. rt.nextBucket() // the structure of the table has changed, so let's recheck if the peer now has a dedicated bucket. - bucketID = cpl - if bucketID >= len(rt.Buckets) { - bucketID = len(rt.Buckets) - 1 - } + bucketID = rt.bucketIdForPeer(p) bucket = rt.Buckets[bucketID] - if bucket.Len() >= rt.bucketsize { - // if after all the unfolding, we're unable to find room for this peer, scrap it. - return "", ErrPeerRejectedNoCapacity + + // push the peer only if the bucket isn't overflowing after slitting + if bucket.len() < rt.bucketsize { + bucket.pushFront(PeerInfo{p, PeerStateActive}) + rt.PeerAdded(p) + return "", nil } - bucket.PushFront(p) - rt.PeerAdded(p) - return "", nil } + // try to push it as a candidate in the replacement cache + rt.cplReplacementCache.push(p) return "", ErrPeerRejectedNoCapacity } -// Remove deletes a peer from the routing table. This is to be used -// when we are sure a node has disconnected completely. -func (rt *RoutingTable) Remove(p peer.ID) { - peerID := ConvertPeerID(p) - cpl := CommonPrefixLen(peerID, rt.local) +// HandlePeerDead should be called when the caller is sure that a peer is dead/not dialable. +// It evicts the peer from the Routing Table and also removes it as a replacement candidate if it is one. +func (rt *RoutingTable) HandlePeerDead(p peer.ID) { + // remove it as a candidate + rt.cplReplacementCache.remove(p) + // remove it from the RT rt.tabLock.Lock() defer rt.tabLock.Unlock() - - bucketID := cpl - if bucketID >= len(rt.Buckets) { - bucketID = len(rt.Buckets) - 1 - } - + bucketID := rt.bucketIdForPeer(p) bucket := rt.Buckets[bucketID] - if bucket.Remove(p) { + if bucket.remove(p) { rt.PeerRemoved(p) } } @@ -207,11 +331,11 @@ func (rt *RoutingTable) nextBucket() { // _allegedly_ is used here to denote that *all* peers in the last bucket might feasibly belong to another bucket. // This could happen if e.g. we've unfolded 4 buckets, and all peers in folded bucket 5 really belong in bucket 8. bucket := rt.Buckets[len(rt.Buckets)-1] - newBucket := bucket.Split(len(rt.Buckets)-1, rt.local) + newBucket := bucket.split(len(rt.Buckets)-1, rt.local) rt.Buckets = append(rt.Buckets, newBucket) // The newly formed bucket still contains too many peers. We probably just unfolded a empty bucket. - if newBucket.Len() >= rt.bucketsize { + if newBucket.len() >= rt.bucketsize { // Keep unfolding the table until the last bucket is not overflowing. rt.nextBucket() } @@ -248,7 +372,7 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID { // It's assumed that this also protects the buckets. rt.tabLock.RLock() - // Get bucket index or last bucket + // getPeer bucket index or last bucket if cpl >= len(rt.Buckets) { cpl = len(rt.Buckets) - 1 } @@ -307,7 +431,7 @@ func (rt *RoutingTable) Size() int { var tot int rt.tabLock.RLock() for _, buck := range rt.Buckets { - tot += buck.Len() + tot += buck.len() } rt.tabLock.RUnlock() return tot @@ -318,7 +442,7 @@ func (rt *RoutingTable) ListPeers() []peer.ID { var peers []peer.ID rt.tabLock.RLock() for _, buck := range rt.Buckets { - peers = append(peers, buck.Peers()...) + peers = append(peers, buck.peerIds()...) } rt.tabLock.RUnlock() return peers @@ -341,3 +465,14 @@ func (rt *RoutingTable) Print() { } rt.tabLock.RUnlock() } + +// the caller is responsible for the locking +func (rt *RoutingTable) bucketIdForPeer(p peer.ID) int { + peerID := ConvertPeerID(p) + cpl := CommonPrefixLen(peerID, rt.local) + bucketID := cpl + if bucketID >= len(rt.Buckets) { + bucketID = len(rt.Buckets) - 1 + } + return bucketID +} diff --git a/table_test.go b/table_test.go index ce83aff..e02219c 100644 --- a/table_test.go +++ b/table_test.go @@ -1,16 +1,22 @@ package kbucket import ( + "context" "math/rand" "testing" "time" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/test" + pstore "github.com/libp2p/go-libp2p-peerstore" "github.com/stretchr/testify/require" ) +var PeerAlwaysValidFnc = func(ctx context.Context, p peer.ID) bool { + return true +} + // Test basic features of the bucket struct func TestBucket(t *testing.T) { t.Parallel() @@ -20,43 +26,55 @@ func TestBucket(t *testing.T) { peers := make([]peer.ID, 100) for i := 0; i < 100; i++ { peers[i] = test.RandPeerIDFatal(t) - b.PushFront(peers[i]) + b.pushFront(PeerInfo{peers[i], PeerStateActive}) } local := test.RandPeerIDFatal(t) localID := ConvertPeerID(local) - i := rand.Intn(len(peers)) - if !b.Has(peers[i]) { - t.Errorf("Failed to find peer: %v", peers[i]) - } + infos := b.peers() + require.Len(t, infos, 100) - spl := b.Split(0, ConvertPeerID(local)) + i := rand.Intn(len(peers)) + p, has := b.getPeer(peers[i]) + require.True(t, has) + require.Equal(t, peers[i], p.Id) + require.Equal(t, PeerStateActive, p.State) + + // replace + require.True(t, b.replace(PeerInfo{peers[i], PeerStateMissing})) + p, has = b.getPeer(peers[i]) + require.True(t, has) + require.Equal(t, PeerStateMissing, p.State) + + spl := b.split(0, ConvertPeerID(local)) llist := b.list for e := llist.Front(); e != nil; e = e.Next() { - p := ConvertPeerID(e.Value.(peer.ID)) + p := ConvertPeerID(e.Value.(PeerInfo).Id) cpl := CommonPrefixLen(p, localID) if cpl > 0 { - t.Fatalf("Split failed. found id with cpl > 0 in 0 bucket") + t.Fatalf("split failed. found id with cpl > 0 in 0 bucket") } } rlist := spl.list for e := rlist.Front(); e != nil; e = e.Next() { - p := ConvertPeerID(e.Value.(peer.ID)) + p := ConvertPeerID(e.Value.(PeerInfo).Id) cpl := CommonPrefixLen(p, localID) if cpl == 0 { - t.Fatalf("Split failed. found id with cpl == 0 in non 0 bucket") + t.Fatalf("split failed. found id with cpl == 0 in non 0 bucket") } } } func TestGenRandPeerID(t *testing.T) { t.Parallel() + ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(ctx, 1, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) // generate above maxCplForRefresh fails p, err := rt.GenRandPeerID(maxCplForRefresh + 1) @@ -74,12 +92,14 @@ func TestGenRandPeerID(t *testing.T) { func TestRefreshAndGetTrackedCpls(t *testing.T) { t.Parallel() + ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(ctx, 1, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) - // add cpl's for tracking + // push cpl's for tracking for cpl := uint(0); cpl < maxCplForRefresh; cpl++ { peerID, err := rt.GenRandPeerID(cpl) require.NoError(t, err) @@ -100,12 +120,52 @@ func TestRefreshAndGetTrackedCpls(t *testing.T) { } } +func TestHandlePeerDead(t *testing.T) { + t.Parallel() + ctx := context.Background() + + local := test.RandPeerIDFatal(t) + m := pstore.NewMetrics() + rt, err := NewRoutingTable(ctx, 2, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) + + // push 3 peers -> 2 for the first bucket, and 1 as candidates + var peers []peer.ID + for i := 0; i < 3; i++ { + p, err := rt.GenRandPeerID(uint(0)) + require.NoError(t, err) + require.NotEmpty(t, p) + rt.HandlePeerAlive(p) + peers = append(peers, p) + } + + // ensure we have 1 candidate + rt.cplReplacementCache.Lock() + require.NotNil(t, rt.cplReplacementCache.candidates[uint(0)]) + require.True(t, rt.cplReplacementCache.candidates[uint(0)].GetCount() == 1) + rt.cplReplacementCache.Unlock() + + // mark a peer as dead and ensure it's not in the RT + require.NotEmpty(t, rt.Find(peers[0])) + rt.HandlePeerDead(peers[0]) + require.Empty(t, rt.Find(peers[0])) + + // mark the peer as dead & verify we don't get it as a candidate + rt.HandlePeerDead(peers[2]) + + rt.cplReplacementCache.Lock() + require.Nil(t, rt.cplReplacementCache.candidates[uint(0)]) + rt.cplReplacementCache.Unlock() +} + func TestTableCallbacks(t *testing.T) { t.Parallel() + ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) peers := make([]peer.ID, 100) for i := 0; i < 100; i++ { @@ -120,18 +180,18 @@ func TestTableCallbacks(t *testing.T) { delete(pset, p) } - rt.Update(peers[0]) + rt.HandlePeerAlive(peers[0]) if _, ok := pset[peers[0]]; !ok { t.Fatal("should have this peer") } - rt.Remove(peers[0]) + rt.HandlePeerDead(peers[0]) if _, ok := pset[peers[0]]; ok { t.Fatal("should not have this peer") } for _, p := range peers { - rt.Update(p) + rt.HandlePeerAlive(p) } out := rt.ListPeers() @@ -147,22 +207,55 @@ func TestTableCallbacks(t *testing.T) { } } +func TestHandlePeerDisconnect(t *testing.T) { + t.Parallel() + ctx := context.Background() + + local := test.RandPeerIDFatal(t) + m := pstore.NewMetrics() + rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) + + p := test.RandPeerIDFatal(t) + // mark a peer as alive + rt.HandlePeerAlive(p) + + // verify it's active + rt.tabLock.Lock() + bp, has := rt.Buckets[0].getPeer(p) + require.True(t, has) + require.NotNil(t, bp) + require.Equal(t, PeerStateActive, bp.State) + rt.tabLock.Unlock() + + //now mark it as disconnected & verify it's in missing state + rt.HandlePeerDisconnect(p) + rt.tabLock.Lock() + bp, has = rt.Buckets[0].getPeer(p) + require.True(t, has) + require.NotNil(t, bp) + require.Equal(t, PeerStateMissing, bp.State) + rt.tabLock.Unlock() +} + // Right now, this just makes sure that it doesnt hang or crash -func TestTableUpdate(t *testing.T) { +func TestHandlePeerAlive(t *testing.T) { t.Parallel() + ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) peers := make([]peer.ID, 100) for i := 0; i < 100; i++ { peers[i] = test.RandPeerIDFatal(t) } - // Testing Update + // Testing HandlePeerAlive for i := 0; i < 10000; i++ { - rt.Update(peers[rand.Intn(len(peers))]) + rt.HandlePeerAlive(peers[rand.Intn(len(peers))]) } for i := 0; i < 100; i++ { @@ -176,15 +269,17 @@ func TestTableUpdate(t *testing.T) { func TestTableFind(t *testing.T) { t.Parallel() + ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) peers := make([]peer.ID, 100) for i := 0; i < 5; i++ { peers[i] = test.RandPeerIDFatal(t) - rt.Update(peers[i]) + rt.HandlePeerAlive(peers[i]) } t.Logf("Searching for peer: '%s'", peers[2]) @@ -194,12 +289,45 @@ func TestTableFind(t *testing.T) { } } +func TestCandidateAddition(t *testing.T) { + t.Parallel() + ctx := context.Background() + + local := test.RandPeerIDFatal(t) + m := pstore.NewMetrics() + rt, err := NewRoutingTable(ctx, 3, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) + + // generate 6 peers for the first bucket, 3 to push to it, and 3 as candidates + var peers []peer.ID + for i := 0; i < 6; i++ { + p, err := rt.GenRandPeerID(uint(0)) + require.NoError(t, err) + require.NotEmpty(t, p) + rt.HandlePeerAlive(p) + peers = append(peers, p) + } + + // fetch & verify candidates + for _, p := range peers[3:] { + ap, b := rt.cplReplacementCache.pop(0) + require.True(t, b) + require.Equal(t, p, ap) + } + + // now pop should fail as queue should be empty + _, b := rt.cplReplacementCache.pop(0) + require.False(t, b) +} + func TestTableEldestPreferred(t *testing.T) { t.Parallel() + ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(ctx, 10, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) // generate size + 1 peers to saturate a bucket peers := make([]peer.ID, 15) @@ -212,14 +340,14 @@ func TestTableEldestPreferred(t *testing.T) { // test 10 first peers are accepted. for _, p := range peers[:10] { - if _, err := rt.Update(p); err != nil { + if _, err := rt.HandlePeerAlive(p); err != nil { t.Errorf("expected all 10 peers to be accepted; instead got: %v", err) } } // test next 5 peers are rejected. for _, p := range peers[10:] { - if _, err := rt.Update(p); err != ErrPeerRejectedNoCapacity { + if _, err := rt.HandlePeerAlive(p); err != ErrPeerRejectedNoCapacity { t.Errorf("expected extra 5 peers to be rejected; instead got: %v", err) } } @@ -227,15 +355,17 @@ func TestTableEldestPreferred(t *testing.T) { func TestTableFindMultiple(t *testing.T) { t.Parallel() + ctx := context.Background() local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() - rt := NewRoutingTable(20, ConvertPeerID(local), time.Hour, m) + rt, err := NewRoutingTable(ctx, 20, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) peers := make([]peer.ID, 100) for i := 0; i < 18; i++ { peers[i] = test.RandPeerIDFatal(t) - rt.Update(peers[i]) + rt.HandlePeerAlive(peers[i]) } t.Logf("Searching for peer: '%s'", peers[2]) @@ -259,17 +389,19 @@ func assertSortedPeerIdsEqual(t *testing.T, a, b []peer.ID) { func TestTableFindMultipleBuckets(t *testing.T) { t.Parallel() + ctx := context.Background() local := test.RandPeerIDFatal(t) localID := ConvertPeerID(local) m := pstore.NewMetrics() - rt := NewRoutingTable(5, localID, time.Hour, m) + rt, err := NewRoutingTable(ctx, 5, localID, time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) peers := make([]peer.ID, 100) for i := 0; i < 100; i++ { peers[i] = test.RandPeerIDFatal(t) - rt.Update(peers[i]) + rt.HandlePeerAlive(peers[i]) } targetID := ConvertPeerID(peers[2]) @@ -277,7 +409,7 @@ func TestTableFindMultipleBuckets(t *testing.T) { closest := SortClosestPeers(rt.ListPeers(), targetID) targetCpl := CommonPrefixLen(localID, targetID) - // Split the peers into closer, same, and further from the key (than us). + // split the peers into closer, same, and further from the key (than us). var ( closer, same, further []peer.ID ) @@ -374,10 +506,12 @@ func TestTableFindMultipleBuckets(t *testing.T) { // and set GOMAXPROCS above 1 func TestTableMultithreaded(t *testing.T) { t.Parallel() + ctx := context.Background() local := peer.ID("localPeer") m := pstore.NewMetrics() - tab := NewRoutingTable(20, ConvertPeerID(local), time.Hour, m) + tab, err := NewRoutingTable(ctx, 20, ConvertPeerID(local), time.Hour, m, PeerAlwaysValidFnc) + require.NoError(t, err) var peers []peer.ID for i := 0; i < 500; i++ { peers = append(peers, test.RandPeerIDFatal(t)) @@ -387,7 +521,7 @@ func TestTableMultithreaded(t *testing.T) { go func() { for i := 0; i < 1000; i++ { n := rand.Intn(len(peers)) - tab.Update(peers[n]) + tab.HandlePeerAlive(peers[n]) } done <- struct{}{} }() @@ -395,7 +529,7 @@ func TestTableMultithreaded(t *testing.T) { go func() { for i := 0; i < 1000; i++ { n := rand.Intn(len(peers)) - tab.Update(peers[n]) + tab.HandlePeerAlive(peers[n]) } done <- struct{}{} }() @@ -412,11 +546,107 @@ func TestTableMultithreaded(t *testing.T) { <-done } -func BenchmarkUpdates(b *testing.B) { +func TestTableCleanup(t *testing.T) { + t.Parallel() + ctx := context.Background() + local := test.RandPeerIDFatal(t) + + // Generate: + // 6 peers with CPL 0 + // 6 peers with CPL 1 + cplPeerMap := make(map[int][]peer.ID) + for cpl := 0; cpl < 2; cpl++ { + i := 0 + + for { + p := test.RandPeerIDFatal(t) + if CommonPrefixLen(ConvertPeerID(local), ConvertPeerID(p)) == cpl { + cplPeerMap[cpl] = append(cplPeerMap[cpl], p) + + i++ + if i == 6 { + break + } + } + } + } + + // create RT with a very short cleanup interval + rt, err := NewRoutingTable(ctx, 3, ConvertPeerID(local), time.Hour, pstore.NewMetrics(), PeerAlwaysValidFnc, + TableCleanupInterval(100*time.Millisecond)) + require.NoError(t, err) + + // mock peer validation fnc that successfully validates p[1], p[3] & p[5] + f := func(ctx context.Context, p peer.ID) bool { + cpl := CommonPrefixLen(rt.local, ConvertPeerID(p)) + if cplPeerMap[cpl][1] == p || cplPeerMap[cpl][3] == p || cplPeerMap[cpl][5] == p { + rt.HandlePeerAlive(p) + return true + + } else { + return false + } + } + + // for each CPL, p[0], p[1] & p[2] got the bucket & p[3], p[4] & p[5] become candidates + for _, peers := range cplPeerMap { + for _, p := range peers { + rt.HandlePeerAlive(p) + + } + } + + // validate current state + rt.tabLock.RLock() + require.Len(t, rt.ListPeers(), 6) + ps0 := rt.Buckets[0].peerIds() + require.Len(t, ps0, 3) + ps1 := rt.Buckets[1].peerIds() + require.Len(t, ps1, 3) + require.Contains(t, ps0, cplPeerMap[0][0]) + require.Contains(t, ps0, cplPeerMap[0][1]) + require.Contains(t, ps0, cplPeerMap[0][2]) + require.Contains(t, ps1, cplPeerMap[1][0]) + require.Contains(t, ps1, cplPeerMap[1][1]) + require.Contains(t, ps1, cplPeerMap[1][2]) + rt.tabLock.RUnlock() + + // now change peer validation fnc + rt.PeerValidationFnc = f + + // now mark p[0],p[1] & p[2] as dead so p[3] & p[5] replace p[0] and p[2] + for _, peers := range cplPeerMap { + rt.HandlePeerDisconnect(peers[0]) + rt.HandlePeerDisconnect(peers[1]) + rt.HandlePeerDisconnect(peers[2]) + } + + // let RT cleanup complete + time.Sleep(1 * time.Second) + + // verify RT state + rt.tabLock.RLock() + require.Len(t, rt.ListPeers(), 6) + ps0 = rt.Buckets[0].peerIds() + require.Len(t, ps0, 3) + ps1 = rt.Buckets[1].peerIds() + require.Len(t, ps1, 3) + require.Contains(t, ps0, cplPeerMap[0][1]) + require.Contains(t, ps0, cplPeerMap[0][3]) + require.Contains(t, ps0, cplPeerMap[0][5]) + require.Contains(t, ps1, cplPeerMap[1][1]) + require.Contains(t, ps1, cplPeerMap[1][3]) + require.Contains(t, ps1, cplPeerMap[1][5]) + rt.tabLock.RUnlock() +} + +func BenchmarkHandlePeerAlive(b *testing.B) { + ctx := context.Background() b.StopTimer() local := ConvertKey("localKey") m := pstore.NewMetrics() - tab := NewRoutingTable(20, local, time.Hour, m) + tab, err := NewRoutingTable(ctx, 20, local, time.Hour, m, PeerAlwaysValidFnc) + require.NoError(b, err) var peers []peer.ID for i := 0; i < b.N; i++ { @@ -425,20 +655,22 @@ func BenchmarkUpdates(b *testing.B) { b.StartTimer() for i := 0; i < b.N; i++ { - tab.Update(peers[i]) + tab.HandlePeerAlive(peers[i]) } } func BenchmarkFinds(b *testing.B) { + ctx := context.Background() b.StopTimer() local := ConvertKey("localKey") m := pstore.NewMetrics() - tab := NewRoutingTable(20, local, time.Hour, m) + tab, err := NewRoutingTable(ctx, 20, local, time.Hour, m, PeerAlwaysValidFnc) + require.NoError(b, err) var peers []peer.ID for i := 0; i < b.N; i++ { peers = append(peers, test.RandPeerIDFatal(b)) - tab.Update(peers[i]) + tab.HandlePeerAlive(peers[i]) } b.StartTimer()