Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: kademlia oversaturation based on an exponential decay formula #2530

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions pkg/topology/kademlia/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@
package kademlia

// Test-only exports: each name below makes an unexported identifier of the
// kademlia package reachable under an exported alias.
var (
TimeToRetry = &timeToRetry
QuickSaturationPeers = &quickSaturationPeers
SaturationPeers = &saturationPeers
OverSaturationPeers = &overSaturationPeers
BootnodeOverSaturationPeers = &bootNodeOverSaturationPeers
LowWaterMark = &nnLowWatermark
PruneOversaturatedBinsFunc = func(k *Kad) func(uint8) {
TimeToRetry = &timeToRetry
QuickSaturationPeers = &quickSaturationPeers
SaturationPeers = &saturationPeers
LowWaterMark = &nnLowWatermark
// PruneOversaturatedBinsFunc hands back k's pruneOversaturatedBins method value.
PruneOversaturatedBinsFunc = func(k *Kad) func(uint8) {
return k.pruneOversaturatedBins
}
GenerateCommonBinPrefixes = generateCommonBinPrefixes
PeerPingPollTime = &peerPingPollTime
// DefaultOverSaturationCalc is the exponential-decay oversaturation function.
DefaultOverSaturationCalc = defaultOverSaturationCalc
)

// PeerFilterFunc is an exported alias of the unexported peerFilterFunc type.
type PeerFilterFunc = peerFilterFunc
177 changes: 94 additions & 83 deletions pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"math/big"
"net"
"sync"
Expand Down Expand Up @@ -47,15 +48,14 @@ const (
)

var (
nnLowWatermark = 2 // the number of peers in consecutive deepest bins that constitute as nearest neighbours
quickSaturationPeers = 4
saturationPeers = 8
overSaturationPeers = 20
bootNodeOverSaturationPeers = 20
shortRetry = 30 * time.Second
timeToRetry = 2 * shortRetry
broadcastBinSize = 4
peerPingPollTime = 10 * time.Second // how often to ping a peer
nnLowWatermark = 2 // the number of peers in consecutive deepest bins that constitute as nearest neighbours
quickSaturationPeers = 4
saturationPeers = 8 // binSaturated reports a bin as saturated once its counted size reaches this value
shortRetry = 30 * time.Second
timeToRetry = 2 * shortRetry // twice shortRetry, i.e. one minute
broadcastBinSize = 4
peerPingPollTime = 10 * time.Second // how often to ping a peer
maxFirstbin = float64(50) // oversaturation limit of bin 0; defaultOverSaturationCalc decays it exponentially for higher bins
)

var (
Expand All @@ -71,56 +71,59 @@ type (
pruneFunc func(depth uint8)
staticPeerFunc func(peer swarm.Address) bool
peerFilterFunc func(peer swarm.Address) bool
overSaturationCalc func(depth uint8) int
)

// noopSanctionedPeerFn ignores the address it is given and always answers false.
var noopSanctionedPeerFn = func(swarm.Address) bool {
	return false
}

// Options for injecting services to Kademlia.
// Every field is optional; New substitutes defaults for BitSuffixLength and
// OverSaturationCalc when they are left at their zero value.
type Options struct {
SaturationFunc binSaturationFunc
Bootnodes []ma.Multiaddr
BootnodeMode bool
BitSuffixLength int
PruneFunc pruneFunc
StaticNodes []swarm.Address
ReachabilityFunc peerFilterFunc
SaturationFunc binSaturationFunc // pluggable saturation function
Bootnodes []ma.Multiaddr // stored on Kad as its bootnodes list
BootnodeMode bool // indicates whether the node is working in bootnode mode
BitSuffixLength int // additional depth of common prefix for bin; 0 selects defaultBitSuffixLength
PruneFunc pruneFunc // pluggable prune function
StaticNodes []swarm.Address // handed to isStaticPeer to build the static-peer check
ReachabilityFunc peerFilterFunc // stored on Kad as its peer filter
OverSaturationCalc overSaturationCalc // per-bin oversaturation threshold; nil selects defaultOverSaturationCalc
}

// Kad is the Swarm forwarding kademlia implementation.
type Kad struct {
base swarm.Address // this node's overlay address
discovery discovery.Driver // the discovery driver
addressBook addressbook.Interface // address book to get underlays
p2p p2p.Service // p2p service to connect to nodes with
saturationFunc binSaturationFunc // pluggable saturation function
bitSuffixLength int // additional depth of common prefix for bin
commonBinPrefixes [][]swarm.Address // list of address prefixes for each bin
connectedPeers *pslice.PSlice // a slice of peers sorted and indexed by po, indexes kept in `bins`
knownPeers *pslice.PSlice // both are po aware slice of addresses
bootnodes []ma.Multiaddr
depth uint8 // current neighborhood depth
radius uint8 // storage area of responsibility
depthMu sync.RWMutex // protect depth changes
manageC chan struct{} // trigger the manage forever loop to connect to new peers
peerSig []chan struct{}
peerSigMtx sync.Mutex
logger logging.Logger // logger
bootnode bool // indicates whether the node is working in bootnode mode
collector *im.Collector
quit chan struct{} // quit channel
halt chan struct{} // halt channel
done chan struct{} // signal that `manage` has quit
wg sync.WaitGroup
waitNext *waitnext.WaitNext
metrics metrics
pruneFunc pruneFunc // pluggable prune function
pinger pingpong.Interface
staticPeer staticPeerFunc
bgBroadcastCtx context.Context
bgBroadcastCancel context.CancelFunc
blocker *blocker.Blocker
reachability p2p.ReachabilityStatus
peerFilter peerFilterFunc
base swarm.Address // this node's overlay address
discovery discovery.Driver // the discovery driver
addressBook addressbook.Interface // address book to get underlays
p2p p2p.Service // p2p service to connect to nodes with
saturationFunc binSaturationFunc // pluggable saturation function
bitSuffixLength int // additional depth of common prefix for bin
commonBinPrefixes [][]swarm.Address // list of address prefixes for each bin
connectedPeers *pslice.PSlice // a slice of peers sorted and indexed by po, indexes kept in `bins`
knownPeers *pslice.PSlice // both are po aware slice of addresses
bootnodes []ma.Multiaddr // bootnode multiaddresses, taken from Options.Bootnodes
depth uint8 // current neighborhood depth
radius uint8 // storage area of responsibility
depthMu sync.RWMutex // protect depth changes
manageC chan struct{} // trigger the manage forever loop to connect to new peers
peerSig []chan struct{}
peerSigMtx sync.Mutex
logger logging.Logger // logger
bootnode bool // indicates whether the node is working in bootnode mode
collector *im.Collector // created by im.NewCollector in New
quit chan struct{} // quit channel
halt chan struct{} // halt channel
done chan struct{} // signal that `manage` has quit
wg sync.WaitGroup
waitNext *waitnext.WaitNext
metrics metrics
pruneFunc pruneFunc // pluggable prune function
pinger pingpong.Interface
staticPeer staticPeerFunc // built by isStaticPeer from Options.StaticNodes
bgBroadcastCtx context.Context
bgBroadcastCancel context.CancelFunc
blocker *blocker.Blocker
reachability p2p.ReachabilityStatus
peerFilter peerFilterFunc // taken from Options.ReachabilityFunc
overSaturationCalc overSaturationCalc // per-bin oversaturation threshold consulted when pruning bins
}

// New returns a new Kademlia.
Expand All @@ -134,48 +137,50 @@ func New(
logger logging.Logger,
o Options,
) (*Kad, error) {
if o.SaturationFunc == nil {
os := overSaturationPeers
if o.BootnodeMode {
os = bootNodeOverSaturationPeers
}
o.SaturationFunc = binSaturated(os, isStaticPeer(o.StaticNodes))

start := time.Now()

if o.OverSaturationCalc == nil {
o.OverSaturationCalc = defaultOverSaturationCalc
}

o.SaturationFunc = binSaturated(o.OverSaturationCalc, isStaticPeer(o.StaticNodes))

if o.BitSuffixLength == 0 {
o.BitSuffixLength = defaultBitSuffixLength
}

start := time.Now()
imc, err := im.NewCollector(metricsDB)
if err != nil {
return nil, err
}
logger.Debugf("kademlia: NewCollector(...) took %v", time.Since(start))

k := &Kad{
base: base,
discovery: discovery,
addressBook: addressbook,
p2p: p2pSvc,
saturationFunc: o.SaturationFunc,
bitSuffixLength: o.BitSuffixLength,
commonBinPrefixes: make([][]swarm.Address, int(swarm.MaxBins)),
connectedPeers: pslice.New(int(swarm.MaxBins), base),
knownPeers: pslice.New(int(swarm.MaxBins), base),
bootnodes: o.Bootnodes,
manageC: make(chan struct{}, 1),
waitNext: waitnext.New(),
logger: logger,
bootnode: o.BootnodeMode,
collector: imc,
quit: make(chan struct{}),
halt: make(chan struct{}),
done: make(chan struct{}),
metrics: newMetrics(),
pruneFunc: o.PruneFunc,
pinger: pinger,
staticPeer: isStaticPeer(o.StaticNodes),
peerFilter: o.ReachabilityFunc,
base: base,
discovery: discovery,
addressBook: addressbook,
p2p: p2pSvc,
saturationFunc: o.SaturationFunc,
bitSuffixLength: o.BitSuffixLength,
commonBinPrefixes: make([][]swarm.Address, int(swarm.MaxBins)),
connectedPeers: pslice.New(int(swarm.MaxBins), base),
knownPeers: pslice.New(int(swarm.MaxBins), base),
bootnodes: o.Bootnodes,
manageC: make(chan struct{}, 1),
waitNext: waitnext.New(),
logger: logger,
bootnode: o.BootnodeMode,
collector: imc,
quit: make(chan struct{}),
halt: make(chan struct{}),
done: make(chan struct{}),
metrics: newMetrics(),
pruneFunc: o.PruneFunc,
pinger: pinger,
staticPeer: isStaticPeer(o.StaticNodes),
peerFilter: o.ReachabilityFunc,
overSaturationCalc: o.OverSaturationCalc,
}

blocklistCallback := func(a swarm.Address) {
Expand Down Expand Up @@ -203,6 +208,12 @@ func New(
return k, nil
}

// defaultOverSaturationCalc returns the oversaturation threshold for the given
// bin. The threshold equals maxFirstbin at bin 0 and decays exponentially as
// the bin index grows:
//
//	limit(bin) = round(maxFirstbin * e^(-bin/8))
//
// With maxFirstbin = 50 the first bins yield 50, 44, 39, 34, 30, 27.
func defaultOverSaturationCalc(bin uint8) int {
	// decayBins is the e-folding length of the curve: the limit shrinks by a
	// factor of e every decayBins bins.
	const decayBins = 8.0

	exponent := -float64(bin) / decayBins
	limit := maxFirstbin * math.Exp(exponent)

	return int(math.Round(limit))
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the 50 and the 8.0 be constants declared above? Otherwise this completely hides the actual bin limits inside this function.

type peerConnInfo struct {
po uint8
addr swarm.Address
Expand Down Expand Up @@ -636,13 +647,13 @@ func (k *Kad) pruneOversaturatedBins(depth uint8) {
}

binPeersCount := k.connectedPeers.BinSize(uint8(i))
if binPeersCount < overSaturationPeers {
if binPeersCount < k.overSaturationCalc(uint8(i)) {
continue
}

binPeers := k.connectedPeers.BinPeers(uint8(i))

peersToRemove := binPeersCount - overSaturationPeers
peersToRemove := binPeersCount - k.overSaturationCalc(uint8(i))

for j := 0; peersToRemove > 0 && j < len(k.commonBinPrefixes[i]); j++ {

Expand Down Expand Up @@ -802,7 +813,7 @@ func (k *Kad) connectBootNodes(ctx context.Context) {
// binSaturated indicates whether a certain bin is saturated or not.
// when a bin is not saturated it means we would like to proactively
// initiate connections to other peers in the bin.
func binSaturated(oversaturationAmount int, staticNode staticPeerFunc) binSaturationFunc {
func binSaturated(f overSaturationCalc, staticNode staticPeerFunc) binSaturationFunc {
return func(bin uint8, peers, connected *pslice.PSlice, filter peerFilterFunc) (bool, bool) {
potentialDepth := recalcDepth(peers, swarm.MaxPO, filter)

Expand All @@ -826,7 +837,7 @@ func binSaturated(oversaturationAmount int, staticNode staticPeerFunc) binSatura
return false, false, nil
})

return size >= saturationPeers, size >= oversaturationAmount
return size >= saturationPeers, size >= f(bin)
}
}

Expand Down
Loading