Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm/network: rewrite of peer suggestion engine, fix skipped tests #18404

Merged
merged 8 commits into from
Jan 17, 2019
220 changes: 133 additions & 87 deletions swarm/network/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,85 +168,118 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error {
return nil
}

// SuggestPeer returns a known peer for the lowest proximity bin for the
// lowest bincount below depth
// naturally if there is an empty row it returns a peer for that
func (k *Kademlia) SuggestPeer() (a *BzzAddr, o int, want bool) {
// SuggestPeer returns an unconnected peer address as a peer suggestion for connection
func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, changed bool) {
k.lock.Lock()
defer k.lock.Unlock()
minsize := k.MinBinSize
depth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
// if there is a callable neighbour within the current proxBin, connect
// this makes sure nearest neighbour set is fully connected
var ppo int
k.addrs.EachNeighbour(k.base, Pof, func(val pot.Val, po int) bool {
if po < depth {
return false
radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I vote against radius since it suggestive of a range of values instead of a concrete value and I fail to see the immediate meaning of this term

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes i would call i t neighbourhoodDepth and the current depth i''d call saturationDepth of degree 1 etc

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i still don't understand what radius stands for

// collect undersaturated bins in ascending order of number of connected peers
// and from shallow to deep (ascending order of PO)
// insert them in a map of bin arrays, keyed with the number of connected peers
saturation := make(map[int][]int)
var lastPO int // the last non-empty PO bin in the iteration
saturationDepth = -1 // the deepest PO such that all shallower bins have >= k.MinBinSize peers
var pastDepth bool // whether po of iteration >= depth
k.conns.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool {
// process skipped empty bins
zelig marked this conversation as resolved.
Show resolved Hide resolved
for ; lastPO < po; lastPO++ {
// find the lowest unsaturated bin
if saturationDepth == -1 {
saturationDepth = lastPO
}
// if there is an empty bin, depth is surely passed
pastDepth = true
saturation[0] = append(saturation[0], lastPO)
}
e := val.(*entry)
c := k.callable(e)
if c {
a = e.BzzAddr
lastPO = po + 1
// past radius, depth is surely passed
if po >= radius {
pastDepth = true
}
ppo = po
return !c
})
if a != nil {
log.Trace(fmt.Sprintf("%08x candidate nearest neighbour found: %v (%v)", k.BaseAddr()[:4], a, ppo))
return a, 0, false
}

var bpo []int
prev := -1
k.conns.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool {
prev++
for ; prev < po; prev++ {
bpo = append(bpo, prev)
minsize = 0
// beyond depth the bin is treated as unsaturated even if size >= k.MinBinSize
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is saturation a term relevant outside of depth at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes if depth is defined as no empty bin and saturation as minimum MinBinSize peers

// in order to achieve full connectivity to all neighbours
if pastDepth && size >= k.MinBinSize {
size = k.MinBinSize - 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a big fan of changing parameter values

}
if size < minsize {
bpo = append(bpo, po)
minsize = size
// process non-empty unsaturated bins
if size < k.MinBinSize {
// find the lowest unsaturated bin
if saturationDepth == -1 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so saturation depth == the lowest unsaturated bin? how does this differ from neighbourhood depth again?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

depth should really be called saturation depth of degree 1,
here the saturation depth is of degree MinBinSize.
And neighbourhood radius should be called neighbourhood depth.
my 2c

saturationDepth = po
}
saturation[size] = append(saturation[size], po)
}
return size > 0 && po < depth
return true
})
// to trigger peer requests for peers closer than closest connection, include
// all bins from nearest connection upto nearest address as unsaturated
var nearestAddrAt int
k.addrs.EachNeighbour(k.base, Pof, func(_ pot.Val, po int) bool {
nearestAddrAt = po
return false
})
// all buckets are full, ie., minsize == k.MinBinSize
if len(bpo) == 0 {
// including bins as size 0 has the effect that requesting connection
// is prioritised over non-empty shallower bins
for ; lastPO <= nearestAddrAt; lastPO++ {
saturation[0] = append(saturation[0], lastPO)
}
// all PO bins are saturated, ie., minsize >= k.MinBinSize, no peer suggested
if len(saturation) == 0 {
return nil, 0, false
}
// as long as we got candidate peers to connect to
// dont ask for new peers (want = false)
// try to select a candidate peer
// find the first callable peer
nxt := bpo[0]
k.addrs.EachBin(k.base, Pof, nxt, func(po, _ int, f func(func(pot.Val) bool) bool) bool {
// for each bin (up until depth) we find callable candidate peers
if po >= depth {
return false
// find the first callable peer in the address book
// starting from the bins with smallest size proceeding from shallow to deep
// for each bin (up until neighbourhood radius) we find callable candidate peers
for size := 0; size < k.MinBinSize && suggestedPeer == nil; size++ {
bins, ok := saturation[size]
if !ok {
// no bin with this size
continue
}
return f(func(val pot.Val) bool {
e := val.(*entry)
c := k.callable(e)
if c {
a = e.BzzAddr
cur := 0
curPO := bins[0]
k.addrs.EachBin(k.base, Pof, curPO, func(po, _ int, f func(func(pot.Val) bool) bool) bool {
curPO = bins[cur]
// find the next bin that has size size
if curPO == po {
cur++
} else {
// skip bins that have no addresses
for ; cur < len(bins) && curPO < po; cur++ {
curPO = bins[cur]
}
if po < curPO {
zelig marked this conversation as resolved.
Show resolved Hide resolved
cur--
return true
}
// stop if there are no addresses
if curPO < po {
zelig marked this conversation as resolved.
Show resolved Hide resolved
return false
}
}
return !c
// curPO found
// find a callable peer out of the addresses in the unsaturated bin
// stop if found
f(func(val pot.Val) bool {
e := val.(*entry)
if k.callable(e) {
suggestedPeer = e.BzzAddr
return false
}
return true
})
return cur < len(bins) && suggestedPeer == nil
})
})
// found a candidate
if a != nil {
return a, 0, false
}
// no candidate peer found, request for the short bin
var changed bool
if uint8(nxt) < k.depth {
k.depth = uint8(nxt)
changed = true

if uint8(saturationDepth) < k.depth {
k.depth = uint8(saturationDepth)
return suggestedPeer, saturationDepth, true
}
return a, nxt, changed
return suggestedPeer, 0, false
}

// On inserts the peer as a kademlia peer into the live peers
// On inserts the peer as a kademlia peer into the live peers
func (k *Kademlia) On(p *Peer) (uint8, bool) {
k.lock.Lock()
defer k.lock.Unlock()
Expand Down Expand Up @@ -398,29 +431,25 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int) bool) {
})
}

// NeighbourhoodDepth returns the depth for the pot, see depthForPot
func (k *Kademlia) NeighbourhoodDepth() (depth int) {
k.lock.RLock()
defer k.lock.RUnlock()
return depthForPot(k.conns, k.NeighbourhoodSize, k.base)
}

// depthForPot returns the proximity order that defines the distance of
// the nearest neighbour set with cardinality >= NeighbourhoodSize
// if there is altogether less than NeighbourhoodSize peers it returns 0
// neighbourhoodRadiusForPot returns the neighbourhood radius of the kademlia
// neighbourhood radius encloses the nearest neighbour set with size >= neighbourhoodSize
// i.e., neighbourhood radius is the deepest PO such that all bins not shallower altogether
// contain at least neighbourhoodSize connected peers
// if there is altogether less than neighbourhoodSize peers connected, it returns 0
// caller must hold the lock
func depthForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int) {
func neighbourhoodRadiusForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int) {
if p.Size() <= neighbourhoodSize {
return 0
}

// total number of peers in iteration
var size int

// determining the depth is a two-step process
// first we find the proximity bin of the shallowest of the NeighbourhoodSize peers
// the numeric value of depth cannot be higher than this
var maxDepth int

f := func(v pot.Val, i int) bool {
// po == 256 means that addr is the pivot address(self)
if i == 256 {
Expand All @@ -431,13 +460,30 @@ func depthForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int
// this means we have all nn-peers.
// depth is by default set to the bin of the farthest nn-peer
if size == neighbourhoodSize {
maxDepth = i
depth = i
return false
}

return true
}
p.EachNeighbour(pivotAddr, Pof, f)
return depth
}

// depthForPot returns the depth for the pot
zelig marked this conversation as resolved.
Show resolved Hide resolved
// depth is the radius of the minimal extension of nearest neighbourhood that
// includes all empty PO bins. I.e., depth is the deepest PO such that
// - it is not deeper than neighbourhood radius
// - all bins shallower than depth are not empty
// caller must hold the lock
func depthForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int) {
if p.Size() <= neighbourhoodSize {
return 0
}
// determining the depth is a two-step process
// first we find the proximity bin of the shallowest of the neighbourhoodSize peers
// the numeric value of depth cannot be higher than this
maxDepth := neighbourhoodRadiusForPot(p, neighbourhoodSize, pivotAddr)

// the second step is to test for empty bins in order from shallowest to deepest
// if an empty bin is found, this will be the actual depth
Expand Down Expand Up @@ -627,23 +673,20 @@ func NewPeerPotMap(neighbourhoodSize int, addrs [][]byte) map[string]*PeerPot {
return ppmap
}

// saturation iterates through all peers and
// returns the smallest po value in which the node has less than n peers
// if the iterator reaches depth, then value for depth is returned
// TODO move to separate testing tools file
// TODO this function will stop at the first bin with less than MinBinSize peers, even if there are empty bins between that bin and the depth. This may not be correct behavior
// saturation returns the smallest po value in which the node has less than MinBinSize peers
// if the iterator reaches neighbourhood radius, then the last bin + 1 is returned
func (k *Kademlia) saturation() int {
prev := -1
k.addrs.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool {
radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base)
k.conns.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool {
prev++
if po >= radius {
return false
}
return prev == po && size >= k.MinBinSize
})
// TODO evaluate whether this check cannot just as well be done within the eachbin
depth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)

// if in the iterator above we iterated deeper than the neighbourhood depth - return depth
if depth < prev {
return depth
if prev < 0 {
return 0
}
return prev
}
Expand Down Expand Up @@ -745,6 +788,9 @@ type Health struct {
func (k *Kademlia) Healthy(pp *PeerPot) *Health {
k.lock.RLock()
defer k.lock.RUnlock()
if len(pp.NNSet) < k.NeighbourhoodSize {
log.Warn("peerpot NNSet < NeighbourhoodSize")
}
gotnn, countgotnn, culpritsgotnn := k.connectedNeighbours(pp.NNSet)
knownn, countknownn, culpritsknownn := k.knowNeighbours(pp.NNSet)
depth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
Expand Down
Loading