diff --git a/bucket.go b/bucket.go index 268e432..6a26f7b 100644 --- a/bucket.go +++ b/bucket.go @@ -5,7 +5,6 @@ package kbucket import ( "container/list" "sync" - "time" "github.com/libp2p/go-libp2p-core/peer" ) @@ -14,32 +13,14 @@ import ( type Bucket struct { lk sync.RWMutex list *list.List - - lastRefreshedAtLk sync.RWMutex - lastRefreshedAt time.Time // the last time we looked up a key in the bucket } func newBucket() *Bucket { b := new(Bucket) b.list = list.New() - b.lastRefreshedAt = time.Now() return b } -func (b *Bucket) RefreshedAt() time.Time { - b.lastRefreshedAtLk.RLock() - defer b.lastRefreshedAtLk.RUnlock() - - return b.lastRefreshedAt -} - -func (b *Bucket) ResetRefreshedAt(newTime time.Time) { - b.lastRefreshedAtLk.Lock() - defer b.lastRefreshedAtLk.Unlock() - - b.lastRefreshedAt = newTime -} - func (b *Bucket) Peers() []peer.ID { b.lk.RLock() defer b.lk.RUnlock() diff --git a/table.go b/table.go index c4c7be7..c24361e 100644 --- a/table.go +++ b/table.go @@ -21,6 +21,17 @@ var log = logging.Logger("table") var ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high") var ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity") +// maxCplForRefresh is the maximum cpl we support for refresh. +// This limit exists because we can only generate 'maxCplForRefresh' bit prefixes for now. +const maxCplForRefresh uint = 15 + +// CplRefresh contains a CPL(common prefix length) with the host & the last time +// we refreshed that cpl/searched for an ID which has that cpl with the host. +type CplRefresh struct { + Cpl uint + LastRefreshAt time.Time +} + // RoutingTable defines the routing table. type RoutingTable struct { // ID of the local peer @@ -39,6 +50,9 @@ type RoutingTable struct { Buckets []*Bucket bucketsize int + cplRefreshLk sync.RWMutex + cplRefreshedAt map[uint]time.Time + // notification functions PeerRemoved func(peer.ID) PeerAdded func(peer.ID) @@ -47,84 +61,71 @@ type RoutingTable struct { // NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance. func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics) *RoutingTable { rt := &RoutingTable{ - Buckets: []*Bucket{newBucket()}, - bucketsize: bucketsize, - local: localID, - maxLatency: latency, - metrics: m, - PeerRemoved: func(peer.ID) {}, - PeerAdded: func(peer.ID) {}, + Buckets: []*Bucket{newBucket()}, + bucketsize: bucketsize, + local: localID, + maxLatency: latency, + metrics: m, + cplRefreshedAt: make(map[uint]time.Time), + PeerRemoved: func(peer.ID) {}, + PeerAdded: func(peer.ID) {}, } return rt } -// GetAllBuckets is safe to call as rt.Buckets is append-only -// caller SHOULD NOT modify the returned slice -func (rt *RoutingTable) GetAllBuckets() []*Bucket { - rt.tabLock.RLock() - defer rt.tabLock.RUnlock() - return rt.Buckets -} +// GetTrackedCplsForRefresh returns the Cpl's we are tracking for refresh. +// Caller is free to modify the returned slice as it is a defensive copy. +func (rt *RoutingTable) GetTrackedCplsForRefresh() []CplRefresh { + rt.cplRefreshLk.RLock() + defer rt.cplRefreshLk.RUnlock() -// GenRandPeerID generates a random peerID in bucket=bucketID -func (rt *RoutingTable) GenRandPeerID(bucketID int) peer.ID { - if bucketID < 0 { - panic(fmt.Sprintf("bucketID %d is not non-negative", bucketID)) - } - rt.tabLock.RLock() - bucketLen := len(rt.Buckets) - rt.tabLock.RUnlock() + cpls := make([]CplRefresh, 0, len(rt.cplRefreshedAt)) - var targetCpl uint - if bucketID > (bucketLen - 1) { - targetCpl = uint(bucketLen) - 1 - } else { - targetCpl = uint(bucketID) + for c, t := range rt.cplRefreshedAt { + cpls = append(cpls, CplRefresh{c, t}) } - // We can only handle upto 16 bit prefixes - if targetCpl > 16 { - targetCpl = 16 + return cpls +} + +// GenRandPeerID generates a random peerID for a given Cpl +func (rt *RoutingTable) GenRandPeerID(targetCpl uint) (peer.ID, error) { + if targetCpl > maxCplForRefresh { + return "", fmt.Errorf("cannot generate peer ID for Cpl greater than %d", maxCplForRefresh) } - var targetPrefix uint16 localPrefix := binary.BigEndian.Uint16(rt.local) - if targetCpl < 16 { - // For host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B. - // Hence, to achieve a targetPrefix `T`, we must toggle the (T+1)th bit in L & then copy (T+1) bits from L - // to our randomly generated prefix. - toggledLocalPrefix := localPrefix ^ (uint16(0x8000) >> targetCpl) - randPrefix := uint16(rand.Uint32()) - - // Combine the toggled local prefix and the random bits at the correct offset - // such that ONLY the first `targetCpl` bits match the local ID. - mask := (^uint16(0)) << (16 - (targetCpl + 1)) - targetPrefix = (toggledLocalPrefix & mask) | (randPrefix & ^mask) - } else { - targetPrefix = localPrefix - } + + // For host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B. + // Hence, to achieve a targetPrefix `T`, we must toggle the (T+1)th bit in L & then copy (T+1) bits from L + // to our randomly generated prefix. + toggledLocalPrefix := localPrefix ^ (uint16(0x8000) >> targetCpl) + randPrefix := uint16(rand.Uint32()) + + // Combine the toggled local prefix and the random bits at the correct offset + // such that ONLY the first `targetCpl` bits match the local ID. + mask := (^uint16(0)) << (16 - (targetCpl + 1)) + targetPrefix := (toggledLocalPrefix & mask) | (randPrefix & ^mask) // Convert to a known peer ID. key := keyPrefixMap[targetPrefix] id := [34]byte{mh.SHA2_256, 32} binary.BigEndian.PutUint32(id[2:], key) - return peer.ID(id[:]) + return peer.ID(id[:]), nil } -// Returns the bucket for a given ID -// should NOT modify the peer list on the returned bucket -func (rt *RoutingTable) BucketForID(id ID) *Bucket { +// ResetCplRefreshedAtForID resets the refresh time for the Cpl of the given ID. +func (rt *RoutingTable) ResetCplRefreshedAtForID(id ID, newTime time.Time) { cpl := CommonPrefixLen(id, rt.local) - - rt.tabLock.RLock() - defer rt.tabLock.RUnlock() - bucketID := cpl - if bucketID >= len(rt.Buckets) { - bucketID = len(rt.Buckets) - 1 + if uint(cpl) > maxCplForRefresh { + return } - return rt.Buckets[bucketID] + rt.cplRefreshLk.Lock() + defer rt.cplRefreshLk.Unlock() + + rt.cplRefreshedAt[uint(cpl)] = newTime } // Update adds or moves the given peer to the front of its respective bucket diff --git a/table_test.go b/table_test.go index 4067ea8..ce83aff 100644 --- a/table_test.go +++ b/table_test.go @@ -8,6 +8,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/test" pstore "github.com/libp2p/go-libp2p-peerstore" + "github.com/stretchr/testify/require" ) // Test basic features of the bucket struct @@ -53,48 +54,49 @@ func TestBucket(t *testing.T) { func TestGenRandPeerID(t *testing.T) { t.Parallel() - nBuckets := 21 local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() rt := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m) - // create nBuckets - for i := 0; i < nBuckets; i++ { - for { - if p := test.RandPeerIDFatal(t); CommonPrefixLen(ConvertPeerID(local), ConvertPeerID(p)) == i { - rt.Update(p) - break - } - } + // generate above maxCplForRefresh fails + p, err := rt.GenRandPeerID(maxCplForRefresh + 1) + require.Error(t, err) + require.Empty(t, p) + + // test generate rand peer ID + for cpl := uint(0); cpl <= maxCplForRefresh; cpl++ { + peerID, err := rt.GenRandPeerID(cpl) + require.NoError(t, err) + + require.True(t, uint(CommonPrefixLen(ConvertPeerID(peerID), rt.local)) == cpl, "failed for cpl=%d", cpl) } +} - // test bucket for peer - peers := rt.ListPeers() - for _, p := range peers { - b := rt.BucketForID(ConvertPeerID(p)) - if !b.Has(p) { - t.Fatalf("bucket should have peers %s", p.String()) - } +func TestRefreshAndGetTrackedCpls(t *testing.T) { + t.Parallel() + + local := test.RandPeerIDFatal(t) + m := pstore.NewMetrics() + rt := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m) + + // add cpl's for tracking + for cpl := uint(0); cpl < maxCplForRefresh; cpl++ { + peerID, err := rt.GenRandPeerID(cpl) + require.NoError(t, err) + rt.ResetCplRefreshedAtForID(ConvertPeerID(peerID), time.Now()) } - // test generate rand peer ID - for bucketID := 0; bucketID < nBuckets; bucketID++ { - peerID := rt.GenRandPeerID(bucketID) - - // for bucketID upto maxPrefixLen of 16, CPL should be Exactly bucketID - if bucketID < 16 { - if CommonPrefixLen(ConvertPeerID(peerID), rt.local) != bucketID { - t.Fatalf("cpl should be %d for bucket %d but got %d, generated peerID is %s", bucketID, bucketID, - CommonPrefixLen(ConvertPeerID(peerID), rt.local), peerID) - } - } else { - // from bucketID 16 onwards, CPL should be ATLEAST 16 - if CommonPrefixLen(ConvertPeerID(peerID), rt.local) < 16 { - t.Fatalf("cpl should be ATLEAST 16 for bucket %d but got %d, generated peerID is %s", bucketID, - CommonPrefixLen(ConvertPeerID(peerID), rt.local), peerID) - } - } + // fetch cpl's + trackedCpls := rt.GetTrackedCplsForRefresh() + require.Len(t, trackedCpls, int(maxCplForRefresh)) + actualCpls := make(map[uint]struct{}) + for i := 0; i < len(trackedCpls); i++ { + actualCpls[trackedCpls[i].Cpl] = struct{}{} + } + for i := uint(0); i < maxCplForRefresh; i++ { + _, ok := actualCpls[i] + require.True(t, ok, "tracked cpl's should have cpl %d", i) } }