Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm/network: remove isproxbin bool from kad.Each* iterfunc #18239

Merged
merged 2 commits into from
Jan 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions swarm/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (d *Peer) HandleMsg(ctx context.Context, msg interface{}) error {

// NotifyDepth sends a message to all connections if depth of saturation is changed
func NotifyDepth(depth uint8, kad *Kademlia) {
f := func(val *Peer, po int, _ bool) bool {
f := func(val *Peer, po int) bool {
val.NotifyDepth(depth)
return true
}
Expand All @@ -74,7 +74,7 @@ func NotifyDepth(depth uint8, kad *Kademlia) {

// NotifyPeer informs all peers about a newly added node
func NotifyPeer(p *BzzAddr, k *Kademlia) {
f := func(val *Peer, po int, _ bool) bool {
f := func(val *Peer, po int) bool {
val.NotifyPeer(p, uint8(po))
return true
}
Expand Down Expand Up @@ -160,7 +160,7 @@ func (d *Peer) handleSubPeersMsg(msg *subPeersMsg) error {
if !d.sentPeers {
d.setDepth(msg.Depth)
var peers []*BzzAddr
d.kad.EachConn(d.Over(), 255, func(p *Peer, po int, isproxbin bool) bool {
d.kad.EachConn(d.Over(), 255, func(p *Peer, po int) bool {
if pob, _ := Pof(d, d.kad.BaseAddr(), 0); pob > po {
return false
}
Expand Down
4 changes: 2 additions & 2 deletions swarm/network/hive.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (h *Hive) Stop() error {
}
}
log.Info(fmt.Sprintf("%08x hive stopped, dropping peers", h.BaseAddr()[:4]))
h.EachConn(nil, 255, func(p *Peer, _ int, _ bool) bool {
h.EachConn(nil, 255, func(p *Peer, _ int) bool {
log.Info(fmt.Sprintf("%08x dropping peer %08x", h.BaseAddr()[:4], p.Address()[:4]))
p.Drop(nil)
return true
Expand Down Expand Up @@ -228,7 +228,7 @@ func (h *Hive) loadPeers() error {
// savePeers, savePeer implement persistence callback/
func (h *Hive) savePeers() error {
var peers []*BzzAddr
h.Kademlia.EachAddr(nil, 256, func(pa *BzzAddr, i int, _ bool) bool {
h.Kademlia.EachAddr(nil, 256, func(pa *BzzAddr, i int) bool {
if pa == nil {
log.Warn(fmt.Sprintf("empty addr: %v", i))
return true
Expand Down
2 changes: 1 addition & 1 deletion swarm/network/hive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestHiveStatePersistance(t *testing.T) {

pp.Start(s1.Server)
i := 0
pp.Kademlia.EachAddr(nil, 256, func(addr *BzzAddr, po int, nn bool) bool {
pp.Kademlia.EachAddr(nil, 256, func(addr *BzzAddr, po int) bool {
delete(peers, addr.String())
i++
return true
Expand Down
31 changes: 11 additions & 20 deletions swarm/network/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,46 +390,42 @@ func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(con
// EachConn is an iterator with args (base, po, f) applies f to each live peer
// that has proximity order po or less as measured from the base
// if base is nil, kademlia base address is used
// It returns peers in order deepest to shallowest
func (k *Kademlia) EachConn(base []byte, o int, f func(*Peer, int, bool) bool) {
func (k *Kademlia) EachConn(base []byte, o int, f func(*Peer, int) bool) {
k.lock.RLock()
defer k.lock.RUnlock()
k.eachConn(base, o, f)
}

func (k *Kademlia) eachConn(base []byte, o int, f func(*Peer, int, bool) bool) {
func (k *Kademlia) eachConn(base []byte, o int, f func(*Peer, int) bool) {
if len(base) == 0 {
base = k.base
}
depth := depthForPot(k.conns, k.MinProxBinSize, k.base)
k.conns.EachNeighbour(base, Pof, func(val pot.Val, po int) bool {
if po > o {
return true
}
return f(val.(*Peer), po, po >= depth)
return f(val.(*Peer), po)
})
}

// EachAddr called with (base, po, f) is an iterator applying f to each known peer
// that has proximity order o or less as measured from the base
// if base is nil, kademlia base address is used
// It returns peers in order deepest to shallowest
func (k *Kademlia) EachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool) {
func (k *Kademlia) EachAddr(base []byte, o int, f func(*BzzAddr, int) bool) {
k.lock.RLock()
defer k.lock.RUnlock()
k.eachAddr(base, o, f)
}

func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool) {
func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int) bool) {
if len(base) == 0 {
base = k.base
}
depth := depthForPot(k.conns, k.MinProxBinSize, k.base)
k.addrs.EachNeighbour(base, Pof, func(val pot.Val, po int) bool {
if po > o {
return true
}
return f(val.(*entry).BzzAddr, po, po >= depth)
return f(val.(*entry).BzzAddr, po)
})
}

Expand Down Expand Up @@ -687,12 +683,11 @@ func (k *Kademlia) saturation() int {
// TODO move to separate testing tools file
func (k *Kademlia) knowNeighbours(addrs [][]byte) (got bool, n int, missing [][]byte) {
pm := make(map[string]bool)

depth := depthForPot(k.conns, k.MinProxBinSize, k.base)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please put the comments back.

// create a map with all peers at depth and deeper known in the kademlia
// in order deepest to shallowest compared to the kademlia base address
// all bins (except self) are included (0 <= bin <= 255)
depth := depthForPot(k.addrs, k.MinProxBinSize, k.base)
k.eachAddr(nil, 255, func(p *BzzAddr, po int, nn bool) bool {
k.eachAddr(nil, 255, func(p *BzzAddr, po int) bool {
// in order deepest to shallowest compared to the kademlia base address
// all bins (except self) are included (0 <= bin <= 255)
if po < depth {
return false
}
Expand Down Expand Up @@ -724,12 +719,8 @@ func (k *Kademlia) knowNeighbours(addrs [][]byte) (got bool, n int, missing [][]
// It is used in Healthy function for testing only
func (k *Kademlia) connectedNeighbours(peers [][]byte) (got bool, n int, missing [][]byte) {
pm := make(map[string]bool)

// create a map with all peers at depth and deeper that are connected in the kademlia
// in order deepest to shallowest compared to the kademlia base address
// all bins (except self) are included (0 <= bin <= 255)
depth := depthForPot(k.conns, k.MinProxBinSize, k.base)
k.eachConn(nil, 255, func(p *Peer, po int, nn bool) bool {
k.eachConn(nil, 255, func(p *Peer, po int) bool {
if po < depth {
return false
}
Expand Down
2 changes: 1 addition & 1 deletion swarm/network/kademlia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func assertHealth(t *testing.T, k *Kademlia, expectHealthy bool, expectSaturatio
t.Helper()
kid := common.Bytes2Hex(k.BaseAddr())
addrs := [][]byte{k.BaseAddr()}
k.EachAddr(nil, 255, func(addr *BzzAddr, po int, _ bool) bool {
k.EachAddr(nil, 255, func(addr *BzzAddr, po int) bool {
addrs = append(addrs, addr.Address())
return true
})
Expand Down
2 changes: 1 addition & 1 deletion swarm/network/networkid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestNetworkID(t *testing.T) {
if kademlias[node].addrs.Size() != len(netIDGroup)-1 {
t.Fatalf("Kademlia size has not expected peer size. Kademlia size: %d, expected size: %d", kademlias[node].addrs.Size(), len(netIDGroup)-1)
}
kademlias[node].EachAddr(nil, 0, func(addr *BzzAddr, _ int, _ bool) bool {
kademlias[node].EachAddr(nil, 0, func(addr *BzzAddr, _ int) bool {
found := false
for _, nd := range netIDGroup {
if bytes.Equal(kademlias[nd].BaseAddr(), addr.Address()) {
Expand Down
3 changes: 1 addition & 2 deletions swarm/network/stream/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package stream
import (
"context"
"errors"

"fmt"

"github.com/ethereum/go-ethereum/metrics"
Expand Down Expand Up @@ -245,7 +244,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
return nil, nil, fmt.Errorf("source peer %v not found", spID.String())
}
} else {
d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int, nn bool) bool {
d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int) bool {
id := p.ID()
if p.LightNode {
// skip light nodes
Expand Down
2 changes: 1 addition & 1 deletion swarm/network/stream/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg)
// launch in go routine since GetBatch blocks until new hashes arrive
go func() {
if err := p.SendOfferedHashes(s, req.From, req.To); err != nil {
log.Warn("SendOfferedHashes error", "err", err)
log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err)
}
}()
// go p.SendOfferedHashes(s, req.From, req.To)
Expand Down
2 changes: 1 addition & 1 deletion swarm/pss/pss.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ func (p *Pss) forward(msg *PssMsg) error {
onlySendOnce = true
}

p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool {
p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int) bool {
if po < broadcastThreshold && sent > 0 {
return false // stop iterating
}
Expand Down
6 changes: 3 additions & 3 deletions swarm/pss/pss_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,12 +491,12 @@ func TestAddressMatchProx(t *testing.T) {
// meanwhile test regression for kademlia since we are compiling the test parameters from different packages
var proxes int
var conns int
kad.EachConn(nil, peerCount, func(p *network.Peer, po int, prox bool) bool {
depth := kad.NeighbourhoodDepth()
kad.EachConn(nil, peerCount, func(p *network.Peer, po int) bool {
conns++
if prox {
if po >= depth {
proxes++
}
log.Trace("kadconn", "po", po, "peer", p, "prox", prox)
return true
})
if proxes != nnPeerCount {
Expand Down