Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

network: Kademlia Load Balancing #1774

Merged
merged 36 commits into from
Nov 12, 2019
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
0f31b5a
Proposed solution for peers load balancing in Kademlia
kortatu Sep 18, 2019
c1c9ac5
network: created global capabilityIndex
kortatu Sep 19, 2019
fc65d5a
typo
kortatu Sep 19, 2019
7dc9568
renamed globalIndex to defaultIndex
kortatu Sep 19, 2019
ca11c52
Load balancing capability test
kortatu Sep 20, 2019
97ab47e
Removed color balancing and using a KademliaLoadBalancer
kortatu Sep 25, 2019
f0fc99d
Missing file in commit
kortatu Sep 25, 2019
0941717
Merge branch 'master' into issue-1757
kortatu Sep 26, 2019
e9263d7
Fixed lint and test when mergin master
kortatu Sep 26, 2019
ed1f9a9
Subscription to peer changed closed only by writer, not by subscriptors
kortatu Sep 26, 2019
904e204
Added an alternative method for initializing a new peer count
kortatu Sep 27, 2019
29303f1
Merge branch 'master' into issue-1757
kortatu Sep 27, 2019
e53ae25
go fmt
kortatu Sep 27, 2019
f82db7f
extracted pubsub channel to a package
kortatu Sep 30, 2019
0e346e6
network: fixed pr comments
kortatu Oct 1, 2019
5665ea9
Merge branch 'master' into issue-1757
kortatu Oct 3, 2019
4c81a24
network/kademlia: Pr comments, tests commented and fixed
kortatu Oct 14, 2019
5b44ab3
network/kademlia: better naming for pub/sub channels in kademlialoadb…
kortatu Oct 14, 2019
52dacb0
Fixed wrong test in kademlia load balancer. Also fixed waiting methods.
kortatu Oct 15, 2019
3301920
Merge branch 'master' into issue-1757
kortatu Oct 15, 2019
49e7d09
Debug functions moved out of kademlialoadbalancer.go
kortatu Oct 15, 2019
ad5eac5
More comments, fixed EachConn po for peers in the same bin as the piv…
kortatu Oct 16, 2019
e78e2b3
resourceUseStats moved to a diffrente file. Use() renamed to AddUseCo…
kortatu Oct 16, 2019
21a0d2f
added gopubsub unit tests
kortatu Oct 29, 2019
c615cdd
Moved resource_use_stats to its own package. Renamed gopubsub to pubs…
kortatu Oct 29, 2019
340ce15
Pubsub now closes all go routines when closing. Removed commented code
kortatu Oct 29, 2019
96a8245
Merge branch 'master' into issue-1757
kortatu Oct 29, 2019
697b049
fix closing channel
kortatu Oct 29, 2019
5cb6d46
Added close channel for stopping blocked publishing goroutines
kortatu Oct 30, 2019
abc51ab
Added unit tests for pubsubchannel to check ongoing goroutines
kortatu Oct 30, 2019
9f951d1
Better accounting of pending messages/goroutines
kortatu Oct 30, 2019
d8f8059
Reverted metrics to regular int counters
kortatu Oct 30, 2019
c34062c
Removed testKademliaBackend, using testKademlia instead. Minor PR fixes
kortatu Oct 31, 2019
e7eefaf
PubSubChannel now publish messages semi-asynchronously filling first …
kortatu Nov 4, 2019
3019822
Exit publishing goroutine on psc.quitC close.
kortatu Nov 6, 2019
2879509
Avoid data race in getAllUseCounts
kortatu Nov 12, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 176 additions & 69 deletions network/kademlia.go

Large diffs are not rendered by default.

254 changes: 254 additions & 0 deletions network/kademlia_load_balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
// Copyright 2019 The Swarm Authors
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks to me that it would be nice to have kademlia lb i its own package. It even has KademliaBackend interface, which looks like a step in that direction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving to a different package will be hard, as I am using these tests functions in network pacakge:

  • testKadPeerAddr
  • newTestKademlia

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know, good design is not easy, and this is a sign that there is a problem with modularity. I do not insist, but I want to point to the red flag.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is that the testing of kademlia is hard, maybe we should move those functions to a network/kademliatest so it can be used from elsewhere. And also, it seems that kademlia should be in its own package.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree completely. This could be a good improvement in the future.

// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
package network

import (
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network/pubsubchannel"
"github.com/ethersphere/swarm/network/resourceusestats"
)

// KademliaBackend is the required interface of KademliaLoadBalancer.
type KademliaBackend interface {
SubscribeToPeerChanges() (addedSub *pubsubchannel.Subscription, removedPeerSub *pubsubchannel.Subscription)
BaseAddr() []byte
EachBinDesc(base []byte, minProximityOrder int, consumer PeerBinConsumer)
EachBinDescFiltered(base []byte, capKey string, minProximityOrder int, consumer PeerBinConsumer) error
EachConn(base []byte, o int, f func(*Peer, int) bool)
}

// Creates a new KademliaLoadBalancer from a KademliaBackend.
// If useNearestNeighbourInit is true the nearest neighbour peer use count will be used when a peer is initialized.
// If not, least used peer use count in same bin as new peer will be used. It is not clear which one is better, when
// this load balancer would be used in several use cases we could do take some decision.
func NewKademliaLoadBalancer(kademlia KademliaBackend, useNearestNeighbourInit bool) *KademliaLoadBalancer {
onPeerSub, offPeerSub := kademlia.SubscribeToPeerChanges()
quitC := make(chan struct{})
klb := &KademliaLoadBalancer{
kademlia: kademlia,
resourceUseStats: resourceusestats.NewResourceUseStats(quitC),
onPeerSub: onPeerSub,
offPeerSub: offPeerSub,
quitC: quitC,
}
if useNearestNeighbourInit {
klb.initCountFunc = klb.nearestNeighbourUseCount
} else {
klb.initCountFunc = klb.leastUsedCountInBin
}

go klb.listenNewPeers()
go klb.listenOffPeers()
return klb
}

// Consumer functions. A consumer is a function that uses an element returned by an iterator. It usually also returns
// a boolean signaling if it wants to iterate more or not. We created an alias for consumer function (LBBinConsumer)
// for code clarity.

// An LBPeer represents a peer with a AddUseCount() function to signal that the peer has been used in order
// to account it for LB sorting criteria.
type LBPeer struct {
Peer *Peer
stats *resourceusestats.ResourceUseStats
}

// AddUseCount is called to account a use for these peer. Should be called if the peer is actually used.
func (lbPeer LBPeer) AddUseCount() {
kortatu marked this conversation as resolved.
Show resolved Hide resolved
lbPeer.stats.AddUse(lbPeer.Peer)
}

// LBBin represents a Bin of LBPeer's
type LBBin struct {
LBPeers []LBPeer
acud marked this conversation as resolved.
Show resolved Hide resolved
ProximityOrder int
}

// LBBinConsumer will be provided with a list of LBPeer's in LB criteria ordering (currently in least used ordering).
// Should return true if it must continue iterating LBBin's or stops if false.
type LBBinConsumer func(bin LBBin) bool

// KademliaLoadBalancer tries to balance request to the peers in Kademlia returning the peers sorted
// by least recent used whenever several will be returned with the same po to a particular address.
// The user of KademliaLoadBalancer should signal if the returned element (LBPeer) has been used with the
// function lbPeer.AddUseCount()
type KademliaLoadBalancer struct {
kademlia KademliaBackend // kademlia to obtain bins of peers
resourceUseStats *resourceusestats.ResourceUseStats // a resourceUseStats to count uses
onPeerSub *pubsubchannel.Subscription // a pubsub channel to be notified of new peers in kademlia
offPeerSub *pubsubchannel.Subscription // a pubsub channel to be notified of removed peers in kademlia
quitC chan struct{}

initCountFunc func(peer *Peer, po int) int //Function to use for initializing a new peer count
}

// Stop unsubscribe from notifiers
func (klb *KademliaLoadBalancer) Stop() {
klb.onPeerSub.Unsubscribe()
klb.offPeerSub.Unsubscribe()
close(klb.quitC)
}

// EachBinNodeAddress calls EachBinDesc with the base address of kademlia (the node address)
func (klb *KademliaLoadBalancer) EachBinNodeAddress(consumeBin LBBinConsumer) {
klb.EachBinDesc(klb.kademlia.BaseAddr(), consumeBin)
}

// EachBinFiltered returns all bins in descending order from the perspective of base address.
// Only peers with the provided capabilities capKey are considered.
// All peers in that bin will be provided to the LBBinConsumer sorted by least used first.
func (klb *KademliaLoadBalancer) EachBinFiltered(base []byte, capKey string, consumeBin LBBinConsumer) error {
return klb.kademlia.EachBinDescFiltered(base, capKey, 0, func(peerBin *PeerBin) bool {
peers := klb.peerBinToPeerList(peerBin)
return consumeBin(LBBin{LBPeers: peers, ProximityOrder: peerBin.ProximityOrder})
})
}

// EachBinDesc returns all bins in descending order from the perspective of base address.
// All peers in that bin will be provided to the LBBinConsumer sorted by least used first.
func (klb *KademliaLoadBalancer) EachBinDesc(base []byte, consumeBin LBBinConsumer) {
klb.kademlia.EachBinDesc(base, 0, func(peerBin *PeerBin) bool {
peers := klb.peerBinToPeerList(peerBin)
return consumeBin(LBBin{LBPeers: peers, ProximityOrder: peerBin.ProximityOrder})
})
}

func (klb *KademliaLoadBalancer) peerBinToPeerList(bin *PeerBin) []LBPeer {
resources := make([]resourceusestats.Resource, bin.Size)
var i int
bin.PeerIterator(func(entry *entry) bool {
resources[i] = entry.conn
i++
return true
})
return klb.resourcesToLbPeers(resources)
}

func (klb *KademliaLoadBalancer) resourcesToLbPeers(resources []resourceusestats.Resource) []LBPeer {
sorted := klb.resourceUseStats.SortResources(resources)
peers := klb.toLBPeers(sorted)
return peers
}

func (klb *KademliaLoadBalancer) listenNewPeers() {
for {
select {
case <-klb.quitC:
return
case msg, ok := <-klb.onPeerSub.ReceiveChannel():
if !ok {
log.Warn("listenNewPeers closed channel, finishing subscriber to new peer")
return
}
signal, ok := msg.(newPeerSignal)
if !ok {
log.Warn("listenNewPeers received message is not a new peer signal")
continue
}
klb.addedPeer(signal.peer, signal.po)
}
}
}

func (klb *KademliaLoadBalancer) listenOffPeers() {
for {
select {
case <-klb.quitC:
return
case msg := <-klb.offPeerSub.ReceiveChannel():
kortatu marked this conversation as resolved.
Show resolved Hide resolved
peer, ok := msg.(*Peer)
if peer == nil {
log.Warn("nil peer received listening for off peers. Ignoring.")
continue
}
if !ok {
log.Warn("unexpected message received listening for off peers. Ignoring.")
continue
}
klb.removedPeer(peer)
}
}
}

// addedPeer is called back when a new peer is added to the kademlia. Its uses will be initialized
// to the use count of the least used peer in its bin. The po of the new peer is passed to avoid having
// to calculate it again.
func (klb *KademliaLoadBalancer) addedPeer(peer *Peer, po int) {
initCount := klb.initCountFunc(peer, 0)
log.Debug("Adding peer", "key", peer.Label(), "initCount", initCount)
klb.resourceUseStats.InitKey(peer.Key(), initCount)
}

// leastUsedCountInBin returns the use count for the least used peer in this bin excluding the excludePeer.
func (klb *KademliaLoadBalancer) leastUsedCountInBin(excludePeer *Peer, po int) int {
addr := klb.kademlia.BaseAddr()
peersInSamePo := klb.getPeersForPo(addr, po)
idx := 0
leastUsedCount := 0
for idx < len(peersInSamePo) {
kortatu marked this conversation as resolved.
Show resolved Hide resolved
leastUsed := peersInSamePo[idx]
if leastUsed.Peer.Key() != excludePeer.Key() {
leastUsedCount = klb.resourceUseStats.GetUses(leastUsed.Peer)
log.Debug("Least used peer is", "peer", leastUsed.Peer.Label(), "leastUsedCount", leastUsedCount)
break
}
idx++
}
return leastUsedCount
}

// nearestNeighbourUseCount returns the use count for the closest peer count.
func (klb *KademliaLoadBalancer) nearestNeighbourUseCount(newPeer *Peer, _ int) int {
var count int
klb.kademlia.EachConn(newPeer.Address(), 255, func(peer *Peer, po int) bool {
if peer != newPeer {
kortatu marked this conversation as resolved.
Show resolved Hide resolved
count = klb.resourceUseStats.GetUses(peer)
log.Debug("Nearest neighbour is", "peer", peer.Label(), "count", count)
return false
}
return true
})
return count
}

func (klb *KademliaLoadBalancer) removedPeer(peer *Peer) {
klb.resourceUseStats.RemoveResource(peer)
kortatu marked this conversation as resolved.
Show resolved Hide resolved
}

func (klb *KademliaLoadBalancer) toLBPeers(resources []resourceusestats.Resource) []LBPeer {
acud marked this conversation as resolved.
Show resolved Hide resolved
peers := make([]LBPeer, len(resources))
for i, res := range resources {
peer := res.(*Peer)
peers[i].Peer = peer
peers[i].stats = klb.resourceUseStats
}
return peers
}

func (klb *KademliaLoadBalancer) getPeersForPo(base []byte, po int) []LBPeer {
resources := make([]resourceusestats.Resource, 0)
klb.kademlia.EachBinDesc(base, po, func(bin *PeerBin) bool {
if bin.ProximityOrder == po {
return bin.PeerIterator(func(entry *entry) bool {
resources = append(resources, entry.conn)
return true
})
} else {
return true
}
})
return klb.resourcesToLbPeers(resources)
}
Loading