Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
Using gopubsub module to manage peeer changes subscriptions in kademlia
Browse files Browse the repository at this point in the history
  • Loading branch information
kortatu committed Sep 27, 2019
1 parent e53ae25 commit 57c000c
Show file tree
Hide file tree
Showing 9 changed files with 232 additions and 100 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ require (
github.com/json-iterator/go v1.1.7 // indirect
github.com/karalabe/usb v0.0.0-20190819132248-550797b1cad8 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/kortatu/gopubsub v0.0.0-20190927091405-5f16821b4792
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/mattn/go-colorable v0.1.2
github.com/mattn/go-isatty v0.0.8
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGi
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kortatu/gopubsub v0.0.0-20190927091405-5f16821b4792 h1:HzwY0rWW3umSl3YyD0tVmvCyakggj2ee2xKAk5Qx7So=
github.com/kortatu/gopubsub v0.0.0-20190927091405-5f16821b4792/go.mod h1:SfND/OENVorRd/oSO7XfkZTBMqnxvjQp/Pa4k1/xSJo=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
Expand Down
92 changes: 24 additions & 68 deletions network/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/ethersphere/swarm/network/capability"
"github.com/ethersphere/swarm/pot"
sv "github.com/ethersphere/swarm/version"

"github.com/kortatu/gopubsub"
)

/*
Expand Down Expand Up @@ -91,14 +93,16 @@ func NewKadParams() *KadParams {
type Kademlia struct {
lock sync.RWMutex
capabilityIndex map[string]*capabilityIndex
defaultIndex *capabilityIndex // Index with pots
*KadParams // Kademlia configuration parameters
base []byte // immutable baseaddress of the table
depth uint8 // stores the last current depth of saturation
nDepth int // stores the last neighbourhood depth
nDepthMu sync.RWMutex // protects neighbourhood depth nDepth
nDepthSig []chan struct{} // signals when neighbourhood depth nDepth is changed
peerChangedSig []peerSubscription // signals when new peer is added or removed
defaultIndex *capabilityIndex // Index with pots
*KadParams // Kademlia configuration parameters
base []byte // immutable baseaddress of the table
depth uint8 // stores the last current depth of saturation
nDepth int // stores the last neighbourhood depth
nDepthMu sync.RWMutex // protects neighbourhood depth nDepth
nDepthSig []chan struct{} // signals when neighbourhood depth nDepth is changed

newPeerPubSub *gopubsub.PubSubChannel
removedPeerPubSub *gopubsub.PubSubChannel
}

type KademliaInfo struct {
Expand All @@ -121,22 +125,18 @@ func NewKademlia(addr []byte, params *KadParams) *Kademlia {
params.Capabilities = capability.NewCapabilities()
}
k := &Kademlia{
base: addr,
KadParams: params,
capabilityIndex: make(map[string]*capabilityIndex),
defaultIndex: NewDefaultIndex(),
base: addr,
KadParams: params,
capabilityIndex: make(map[string]*capabilityIndex),
defaultIndex: NewDefaultIndex(),
newPeerPubSub: gopubsub.New(),
removedPeerPubSub: gopubsub.New(),
}
k.RegisterCapabilityIndex("full", *fullCapability)
k.RegisterCapabilityIndex("light", *lightCapability)
return k
}

type peerSubscription struct {
newPeerC chan newPeerSignal
removedPeerC chan *Peer
closed bool
}

type newPeerSignal struct {
peer *Peer
po int
Expand Down Expand Up @@ -465,17 +465,7 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) {
})
k.addToCapabilityIndex(p)
// notify subscribers asynchronously
go func(p *Peer, po int) {
for _, c := range k.peerChangedSig {
if c.closed {
close(c.newPeerC)
close(c.removedPeerC)
} else {
c.newPeerC <- newPeerSignal{peer: p, po: po}
}

}
}(p, po)
k.newPeerPubSub.Publish(newPeerSignal{peer: p, po: po})

if ins {
a := newEntryFromBzzAddress(p.BzzAddr)
Expand Down Expand Up @@ -584,34 +574,10 @@ func (k *Kademlia) SubscribeToNeighbourhoodDepthChange() (c <-chan struct{}, uns
// when a new Peer is added or removed from the table. Returned function unsubscribes
// the channel from signaling and releases the resources. Returned function is safe
// to be called multiple times.
func (k *Kademlia) SubscribeToPeerChanges() (addedC <-chan newPeerSignal, removedC <-chan *Peer, unsubscribe func()) {
newPeerC := make(chan newPeerSignal, 1)
removedPeerC := make(chan *Peer, 1)

k.lock.Lock()
defer k.lock.Unlock()

k.peerChangedSig = append(k.peerChangedSig, peerSubscription{
newPeerC: newPeerC,
removedPeerC: removedPeerC,
closed: false,
})

unsubscribe = func() {
k.lock.Lock()
defer k.lock.Unlock()

for i, c := range k.peerChangedSig {
if c.newPeerC == newPeerC {
c.closed = true
k.peerChangedSig = append(k.peerChangedSig[:i], k.peerChangedSig[i+1:]...)
break
}
}

}

return newPeerC, removedPeerC, unsubscribe
func (k *Kademlia) SubscribeToPeerChanges() (addedSub *gopubsub.Subscription, removedPeerSub *gopubsub.Subscription) {
addedSub = k.newPeerPubSub.Subscribe()
removedPeerSub = k.removedPeerPubSub.Subscribe()
return
}

// Off removes a peer from among live peers
Expand All @@ -633,17 +599,7 @@ func (k *Kademlia) Off(p *Peer) {
})
k.removeFromCapabilityIndex(p, true)
k.setNeighbourhoodDepth()
go func(p *Peer) {
for _, c := range k.peerChangedSig {
if c.closed {
close(c.newPeerC)
close(c.removedPeerC)
} else {
c.removedPeerC <- p
}

}
}(p)
k.removedPeerPubSub.Publish(p)
}

// EachConnFiltered performs the same action as EachConn
Expand Down
42 changes: 24 additions & 18 deletions network/kademlialoadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethersphere/swarm/log"

"github.com/kortatu/gopubsub"
)

// KademliaBackend is the required interface of KademliaLoadBalancer.
type KademliaBackend interface {
SubscribeToPeerChanges() (addedC <-chan newPeerSignal, removedC <-chan *Peer, unsubscribe func())
SubscribeToPeerChanges() (addedSub *gopubsub.Subscription, removedPeerSub *gopubsub.Subscription)
BaseAddr() []byte
EachBinDesc(base []byte, minProximityOrder int, consumer PeerBinConsumer)
EachBinDescFiltered(base []byte, capKey string, minProximityOrder int, consumer PeerBinConsumer) error
Expand All @@ -20,14 +22,13 @@ type KademliaBackend interface {

// Creates a new KademliaLoadBalancer from a KademliaBackend
func NewKademliaLoadBalancer(kademlia KademliaBackend, useMostSimilarInit bool) *KademliaLoadBalancer {
chanNewPeerSignals, chanOffPeerSignals, unsubscribe := kademlia.SubscribeToPeerChanges()
newPeerSub, offPeerSub := kademlia.SubscribeToPeerChanges()
klb := &KademliaLoadBalancer{
kademlia: kademlia,
resourceUseStats: newResourceLoadBalancer(),
newPeerChannel: chanNewPeerSignals,
offPeerChannel: chanOffPeerSignals,
unsubscribeNotifier: unsubscribe,
quitC: make(chan struct{}),
kademlia: kademlia,
resourceUseStats: newResourceLoadBalancer(),
newPeerSub: newPeerSub,
offPeerSub: offPeerSub,
quitC: make(chan struct{}),
}
if useMostSimilarInit {
klb.initCountFunc = klb.mostSimilarPeerCount
Expand Down Expand Up @@ -65,19 +66,19 @@ type LBBinConsumer func(bin LBBin) bool
// The user of KademliaLoadBalancer should signal if the returned element (LBPeer) has been used with the
// function lbPeer.Use()
type KademliaLoadBalancer struct {
kademlia KademliaBackend //kademlia to obtain bins of peers
resourceUseStats *resourceUseStats //a resourceUseStats to count uses
newPeerChannel <-chan newPeerSignal //a channel to be notified of new peers in kademlia
offPeerChannel <-chan *Peer //a channel to be notified of removed peers in kademlia
unsubscribeNotifier func() //an unsubscribe function provided when subscribe to kademlia notifiers
quitC chan struct{}
kademlia KademliaBackend //kademlia to obtain bins of peers
resourceUseStats *resourceUseStats //a resourceUseStats to count uses
newPeerSub *gopubsub.Subscription //a pubsub channel to be notified of new peers in kademlia
offPeerSub *gopubsub.Subscription //a pubsub channel to be notified of removed peers in kademlia
quitC chan struct{}

initCountFunc func(peer *Peer, po int) int //Function to use for initializing a new peer count
}

// Stop unsubscribe from notifiers
func (klb KademliaLoadBalancer) Stop() {
klb.unsubscribeNotifier()
klb.newPeerSub.Unsubscribe()
klb.offPeerSub.Unsubscribe()
close(klb.quitC)
}

Expand Down Expand Up @@ -127,7 +128,11 @@ func (klb *KademliaLoadBalancer) listenNewPeers() {
select {
case <-klb.quitC:
return
case signal, ok := <-klb.newPeerChannel:
case msg, ok := <-klb.newPeerSub.ReceiveChannel():
if !ok {
return
}
signal, ok := msg.(newPeerSignal)
if !ok {
return
}
Expand All @@ -141,8 +146,9 @@ func (klb *KademliaLoadBalancer) listenOffPeers() {
select {
case <-klb.quitC:
return
case peer := <-klb.offPeerChannel:
if peer != nil {
case msg := <-klb.offPeerSub.ReceiveChannel():
peer, ok := msg.(*Peer)
if peer != nil && ok {
klb.removedPeer(peer)
}
}
Expand Down
27 changes: 13 additions & 14 deletions network/kademlialoadbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethersphere/swarm/network/capability"
"github.com/ethersphere/swarm/pot"

"github.com/kortatu/gopubsub"
)

// TestAddedNodes checks that when adding a node it is assigned the correct number of uses.
Expand Down Expand Up @@ -204,8 +206,8 @@ func TestEachBinFiltered(t *testing.T) {

type testKademliaBackend struct {
baseAddr []byte
addedChannel chan newPeerSignal
removedChannel chan *Peer
addedChannel *gopubsub.PubSubChannel
removedChannel *gopubsub.PubSubChannel
bins map[int][]*Peer
subscribed bool
maxPo int
Expand Down Expand Up @@ -248,8 +250,8 @@ func (tkb *testKademliaBackend) EachConn(base []byte, maxPo int, consume func(*P
func newTestKademliaBackend(address string) *testKademliaBackend {
return &testKademliaBackend{
baseAddr: pot.NewAddressFromString(address),
addedChannel: make(chan newPeerSignal, 1),
removedChannel: make(chan *Peer, 1),
addedChannel: gopubsub.New(),
removedChannel: gopubsub.New(),
bins: make(map[int][]*Peer),
}
}
Expand All @@ -258,14 +260,11 @@ func (tkb testKademliaBackend) BaseAddr() []byte {
return tkb.baseAddr
}

func (tkb *testKademliaBackend) SubscribeToPeerChanges() (addedC <-chan newPeerSignal, removedC <-chan *Peer, unsubscribe func()) {
unsubscribe = func() {
tkb.subscribed = false
close(tkb.addedChannel)
close(tkb.removedChannel)
}
func (tkb *testKademliaBackend) SubscribeToPeerChanges() (addedSub *gopubsub.Subscription, removedPeerSub *gopubsub.Subscription) {
addedSub = tkb.addedChannel.Subscribe()
removedPeerSub = tkb.removedChannel.Subscribe()
tkb.subscribed = true
return tkb.addedChannel, tkb.removedChannel, unsubscribe
return
}

func (tkb testKademliaBackend) EachBinDescFiltered(base []byte, capKey string, minProximityOrder int, consumer PeerBinConsumer) error {
Expand Down Expand Up @@ -317,10 +316,10 @@ func (tkb *testKademliaBackend) addPeer(peer *Peer, po int) {
}
tkb.bins[po] = append(tkb.bins[po], peer)
if tkb.subscribed {
tkb.addedChannel <- newPeerSignal{
tkb.addedChannel.Publish(newPeerSignal{
peer: peer,
po: po,
}
})
}
time.Sleep(100 * time.Millisecond)
}
Expand All @@ -338,7 +337,7 @@ func (tkb *testKademliaBackend) removePeer(peer *Peer) {
}
}
if tkb.subscribed {
tkb.removedChannel <- peer
tkb.removedChannel.Publish(peer)
}
}

Expand Down
41 changes: 41 additions & 0 deletions vendor/github.com/kortatu/gopubsub/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vendor/github.com/kortatu/gopubsub/go.mod

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 57c000c

Please sign in to comment.