Skip to content

Commit

Permalink
fix: dont consider protected nodes for saturation
Browse files Browse the repository at this point in the history
  • Loading branch information
alok committed Sep 20, 2021
1 parent 9774b55 commit 0b518bc
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 35 deletions.
53 changes: 22 additions & 31 deletions pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type (
binSaturationFunc func(bin uint8, peers, connected *pslice.PSlice) (saturated bool, oversaturated bool)
sanctionedPeerFunc func(peer swarm.Address) bool
pruneFunc func(depth uint8)
staticPeerFunc func(peer swarm.Address) bool
)

var noopSanctionedPeerFn = func(_ swarm.Address) bool { return false }
Expand Down Expand Up @@ -101,7 +102,7 @@ type Kad struct {
waitNext *waitnext.WaitNext
metrics metrics
pruneFunc pruneFunc // pluggable prune function
staticNodes []swarm.Address
staticPeer staticPeerFunc
}

// New returns a new Kademlia.
Expand All @@ -119,7 +120,7 @@ func New(
if o.BootnodeMode {
os = bootNodeOverSaturationPeers
}
o.SaturationFunc = binSaturated(os)
o.SaturationFunc = binSaturated(os, isStaticPeer(o.StaticNodes))
}
if o.BitSuffixLength == 0 {
o.BitSuffixLength = defaultBitSuffixLength
Expand Down Expand Up @@ -154,7 +155,7 @@ func New(
wg: sync.WaitGroup{},
metrics: newMetrics(),
pruneFunc: o.PruneFunc,
staticNodes: o.StaticNodes,
staticPeer: isStaticPeer(o.StaticNodes),
}

if k.pruneFunc == nil {
Expand Down Expand Up @@ -650,7 +651,7 @@ func (k *Kad) connectBootNodes(ctx context.Context) {
// binSaturated indicates whether a certain bin is saturated or not.
// when a bin is not saturated it means we would like to proactively
// initiate connections to other peers in the bin.
func binSaturated(oversaturationAmount int) binSaturationFunc {
func binSaturated(oversaturationAmount int, staticNode staticPeerFunc) binSaturationFunc {
return func(bin uint8, peers, connected *pslice.PSlice) (bool, bool) {
potentialDepth := recalcDepth(peers, swarm.MaxPO)

Expand All @@ -667,8 +668,8 @@ func binSaturated(oversaturationAmount int) binSaturationFunc {
// gaps measurement)

size := 0
_ = connected.EachBin(func(_ swarm.Address, po uint8) (bool, bool, error) {
if po == bin {
_ = connected.EachBin(func(addr swarm.Address, po uint8) (bool, bool, error) {
if po == bin && !staticNode(addr) {
size++
}
return false, false, nil
Expand Down Expand Up @@ -874,13 +875,16 @@ func (k *Kad) Pick(peer p2p.Peer) bool {
return false
}

func (k *Kad) isProtected(overlay swarm.Address) bool {
for _, addr := range k.staticNodes {
if addr.Equal(overlay) {
return true
func isStaticPeer(staticNodes []swarm.Address) func(overlay swarm.Address) bool {
return func(overlay swarm.Address) bool {
for _, addr := range staticNodes {
if addr.Equal(overlay) {
return true
}
}
return false

}
return false
}

// Connected is called when a peer has dialed in.
Expand All @@ -892,15 +896,13 @@ func (k *Kad) Connected(ctx context.Context, peer p2p.Peer, forceConnection bool
if _, overSaturated := k.saturationFunc(po, k.knownPeers, k.connectedPeers); overSaturated {
if k.bootnode {
randPeer, err := k.randomPeer(po)
// For the rare case where we have an oversaturated bin with all protected peers
// we still want to be able to connect to this peer but we cannot kick anyone out
if err != nil && !errors.Is(err, topology.ErrProtectedOversaturation) {
if err != nil {
return fmt.Errorf("failed to get random peer to kick-out: %w", err)
} else if err == nil {
_ = k.p2p.Disconnect(randPeer, "kicking out random peer to accommodate node")
return k.onConnected(ctx, address)
}
} else if !forceConnection {
_ = k.p2p.Disconnect(randPeer, "kicking out random peer to accommodate node")
return k.onConnected(ctx, address)
}
if !forceConnection {
return topology.ErrOversaturated
}
}
Expand Down Expand Up @@ -1352,29 +1354,18 @@ func randomSubset(addrs []swarm.Address, count int) ([]swarm.Address, error) {

func (k *Kad) randomPeer(bin uint8) (swarm.Address, error) {
peers := k.connectedPeers.BinPeers(bin)
// The following case should ideally never happen as this function is used to find a
// random peer to kick out and this is required only when the bin is oversaturated.
// Only reason why this would happen is if usage of randomPeer is wrong or somehow
// we manage to disconnect from oversaturation amount of peers from the time we check the
// bin and call this function.
if len(peers) == 0 {
return swarm.ZeroAddress, errEmptyBin
}

for idx := 0; idx < len(peers); {
// do not consider protected peers
if k.isProtected(peers[idx]) {
if k.staticPeer(peers[idx]) {
peers = append(peers[:idx], peers[idx+1:]...)
continue
}
idx++
}

if len(peers) == 0 {
// For the rare case when we are oversaturated with protected nodes in a single
// bin, we will return a different error and caller can handle this in whichever
// way he chooses
return swarm.ZeroAddress, topology.ErrProtectedOversaturation
return swarm.ZeroAddress, errEmptyBin
}

rndIndx, err := random.Int(random.Reader, big.NewInt(int64(len(peers))))
Expand Down
7 changes: 3 additions & 4 deletions pkg/topology/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ import (
)

var (
ErrNotFound = errors.New("no peer found")
ErrWantSelf = errors.New("node wants self")
ErrOversaturated = errors.New("oversaturated")
ErrProtectedOversaturation = errors.New("oversaturated with protected nodes")
ErrNotFound = errors.New("no peer found")
ErrWantSelf = errors.New("node wants self")
ErrOversaturated = errors.New("oversaturated")
)

type Driver interface {
Expand Down

0 comments on commit 0b518bc

Please sign in to comment.