Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm/network: hive bug: needed shallow peers are not sent to nodes beyond connection's proximity order #19326

Merged
merged 6 commits into from
Apr 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions swarm/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

// discovery bzz extension for requesting and relaying node address records

var sortPeers = noSortPeers

// Peer wraps BzzPeer and embeds Kademlia overlay connectivity driver
type Peer struct {
*BzzPeer
Expand Down Expand Up @@ -156,28 +158,39 @@ func (msg subPeersMsg) String() string {
return fmt.Sprintf("%T: request peers > PO%02d. ", msg, msg.Depth)
}

// handleSubPeersMsg handles incoming subPeersMsg
// this message represents the saturation depth of the remote peer
// saturation depth is the radius within which the peer subscribes to peers
// the first time this is received we send peer info on all
// our connected peers that fall within peers saturation depth
// otherwise this depth is just recorded on the peer, so that
// subsequent new connections are sent iff they fall within the radius
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: 'iff' => 'if'

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iff means if and only if

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: then I would prefer 'if and only if' because that requires no knowledge of the abbreviation; that has to be written just once and read many times

func (d *Peer) handleSubPeersMsg(msg *subPeersMsg) error {
d.setDepth(msg.Depth)
// only send peers after the initial subPeersMsg
if !d.sentPeers {
d.setDepth(msg.Depth)
var peers []*BzzAddr
// iterate connection in ascending order of disctance from the remote address
frncmx marked this conversation as resolved.
Show resolved Hide resolved
d.kad.EachConn(d.Over(), 255, func(p *Peer, po int) bool {
if pob, _ := Pof(d, d.kad.BaseAddr(), 0); pob > po {
// terminate if we are beyond the radius
if uint8(po) < msg.Depth {
return false
}
if !d.seen(p.BzzAddr) {
if !d.seen(p.BzzAddr) { // here just records the peer sent
peers = append(peers, p.BzzAddr)
}
return true
})
// if useful peers are found, send them over
if len(peers) > 0 {
go d.Send(context.TODO(), &peersMsg{Peers: peers})
go d.Send(context.TODO(), &peersMsg{Peers: sortPeers(peers)})
}
}
d.sentPeers = true
return nil
}

// seen takes an peer address and checks if it was sent to a peer already
// seen takes a peer address and checks if it was sent to a peer already
// if not, marks the peer as sent
func (d *Peer) seen(p *BzzAddr) bool {
d.mtx.Lock()
Expand All @@ -201,3 +214,7 @@ func (d *Peer) setDepth(depth uint8) {
defer d.mtx.Unlock()
d.depth = depth
}

func noSortPeers(peers []*BzzAddr) []*BzzAddr {
return peers
}
206 changes: 204 additions & 2 deletions swarm/network/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,32 @@
package network

import (
"crypto/ecdsa"
crand "crypto/rand"
"encoding/binary"
"fmt"
"math/rand"
"net"
"sort"
"testing"
"time"

"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/swarm/pot"
)

/***
*
* - after connect, that outgoing subpeersmsg is sent
*
*/
func TestDiscovery(t *testing.T) {
func TestSubPeersMsg(t *testing.T) {
params := NewHiveParams()
s, pp, err := newHiveTester(t, params, 1, nil)
s, pp, err := newHiveTester(params, 1, nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -58,3 +71,192 @@ func TestDiscovery(t *testing.T) {
t.Fatal(err)
}
}

const (
frncmx marked this conversation as resolved.
Show resolved Hide resolved
maxPO = 8 // PO of pivot and control; chosen to test enough cases but not run too long
maxPeerPO = 6 // pivot has no peers closer than this to the control peer
maxPeersPerPO = 3
)

// TestInitialPeersMsg tests if peersMsg response to incoming subPeersMsg is correct
func TestInitialPeersMsg(t *testing.T) {
for po := 0; po < maxPO; po++ {
for depth := 0; depth < maxPO; depth++ {
t.Run(fmt.Sprintf("PO=%d,advertised depth=%d", po, depth), func(t *testing.T) {
testInitialPeersMsg(t, po, depth)
})
}
}
}

// testInitialPeersMsg tests that the correct set of peer info is sent
// to another peer after receiving their subPeersMsg request
func testInitialPeersMsg(t *testing.T, peerPO, peerDepth int) {
// generate random pivot address
prvkey, err := crypto.GenerateKey()
if err != nil {
t.Fatal(err)
}

defer func(orig func([]*BzzAddr) []*BzzAddr) {
sortPeers = orig
}(sortPeers)
sortPeers = testSortPeers
pivotAddr := pot.NewAddressFromBytes(PrivateKeyToBzzKey(prvkey))
// generate control peers address at peerPO wrt pivot
peerAddr := pot.RandomAddressAt(pivotAddr, peerPO)
// construct kademlia and hive
to := NewKademlia(pivotAddr[:], NewKadParams())
hive := NewHive(NewHiveParams(), to, nil)

// expected addrs in peersMsg response
var expBzzAddrs []*BzzAddr
connect := func(a pot.Address, po int) (addrs []*BzzAddr) {
n := rand.Intn(maxPeersPerPO)
for i := 0; i < n; i++ {
peer, err := newDiscPeer(pot.RandomAddressAt(a, po))
if err != nil {
t.Fatal(err)
}
hive.On(peer)
addrs = append(addrs, peer.BzzAddr)
}
return addrs
}
register := func(a pot.Address, po int) {
addr := pot.RandomAddressAt(a, po)
hive.Register(&BzzAddr{OAddr: addr[:]})
}

// generate connected and just registered peers
for po := maxPeerPO; po >= 0; po-- {
// create a fake connected peer at po from peerAddr
ons := connect(peerAddr, po)
// create a fake registered address at po from peerAddr
register(peerAddr, po)
// we collect expected peer addresses only up till peerPO
if po < peerDepth {
continue
}
expBzzAddrs = append(expBzzAddrs, ons...)
}

// add extra connections closer to pivot than control
for po := peerPO + 1; po < maxPO; po++ {
ons := connect(pivotAddr, po)
if peerDepth <= peerPO {
expBzzAddrs = append(expBzzAddrs, ons...)
}
}

// create a special bzzBaseTester in which we can associate `enode.ID` to the `bzzAddr` we created above
s, _, err := newBzzBaseTesterWithAddrs(prvkey, [][]byte{peerAddr[:]}, DiscoverySpec, hive.Run)
if err != nil {
t.Fatal(err)
}

// peerID to use in the protocol tester testExchange expect/trigger
peerID := s.Nodes[0].ID()
// block until control peer is found among hive peers
found := false
for attempts := 0; attempts < 20; attempts++ {
if _, found = hive.peers[peerID]; found {
break
}
time.Sleep(1 * time.Millisecond)
}

if !found {
t.Fatal("timeout waiting for peer connection to start")
}

// pivotDepth is the advertised depth of the pivot node we expect in the outgoing subPeersMsg
pivotDepth := hive.saturation()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't find a compelling explanation why this is the correct function to use for this test and not hive.depth. And why is hive.depth accessible at all? Why was NeighbourhoodDepth() wrong and/or producing flaky results?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the logic. Polemic on the ambiguous naming aside.
saturation is the depth that the peer is advertising, basically, the shallowest bin, that has less than MinBinSize connections

// the test exchange is as follows:
// 1. pivot sends to the control peer a `subPeersMsg` advertising its depth (ignored)
// 2. peer sends to pivot a `subPeersMsg` advertising its own depth (arbitrarily chosen)
// 3. pivot responds with `peersMsg` with the set of expected peers
err = s.TestExchanges(
p2ptest.Exchange{
Label: "outgoing subPeersMsg",
Expects: []p2ptest.Expect{
{
Code: 1,
Msg: &subPeersMsg{Depth: uint8(pivotDepth)},
Peer: peerID,
},
},
},
p2ptest.Exchange{
Label: "trigger subPeersMsg and expect peersMsg",
Triggers: []p2ptest.Trigger{
{
Code: 1,
Msg: &subPeersMsg{Depth: uint8(peerDepth)},
Peer: peerID,
},
},
Expects: []p2ptest.Expect{
{
Code: 0,
Msg: &peersMsg{Peers: testSortPeers(expBzzAddrs)},
Peer: peerID,
Timeout: 100 * time.Millisecond,
},
},
})

// for values MaxPeerPO < peerPO < MaxPO the pivot has no peers to offer to the control peer
// in this case, no peersMsg will be sent out, and we would run into a time out
if len(expBzzAddrs) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: I still hold the intuition that maybe this test tries to test too many things at once. (clean code: test one concept per test, ideally with one assert) But I let that go. On the other hand, please consider my version for the same logic. What do you think? Is it simpler?

  • I put the simplest case first, so I can "forget" about that quickly. (smaller working memory)
  • I did not like the return statement was 3 indents deep.
if len(expBzzAddrs) > 0 {
  if err != nil {
    t.Fatal(err)
   }
} else {
  if err == nil {   // <= stating we are expecting an error when `len(expBzzAddrs)` is not nill (I usually try to aoid `err == nil` but I think in this case it fits.
    t.Fatalf("expected timeout, got no error")
  }
  
   timeoutMessage := "exchange #1 \"trigger subPeersMsg and expect peersMsg\": timed out" 
   if err.Error() != timeoutMessage {
      t.Fatalf("expected timeout, got %v", err)
   } 
}

Note:

  • Since the function scope is big 100+ lines, I use if...else to press that I'm checking related stuff, i.e., I'm still making decisions based on len(expBzzAddrs) not something else coming from the previous lines.

if err != nil {
if err.Error() != "exchange #1 \"trigger subPeersMsg and expect peersMsg\": timed out" {
t.Fatalf("expected timeout, got %v", err)
}
return
}
t.Fatalf("expected timeout, got no error")
}

if err != nil {
t.Fatal(err)
}
}

func testSortPeers(peers []*BzzAddr) []*BzzAddr {
comp := func(i, j int) bool {
vi := binary.BigEndian.Uint64(peers[i].OAddr)
vj := binary.BigEndian.Uint64(peers[j].OAddr)
return vi < vj
}
sort.Slice(peers, comp)
return peers
}

// as we are not creating a real node via the protocol,
// we need to create the discovery peer objects for the additional kademlia
// nodes manually
func newDiscPeer(addr pot.Address) (*Peer, error) {
pKey, err := ecdsa.GenerateKey(crypto.S256(), crand.Reader)
if err != nil {
return nil, err
}
pubKey := pKey.PublicKey
nod := enode.NewV4(&pubKey, net.IPv4(127, 0, 0, 1), 0, 0)
bzzAddr := &BzzAddr{OAddr: addr[:], UAddr: []byte(nod.String())}
id := nod.ID()
p2pPeer := p2p.NewPeer(id, id.String(), nil)
return NewPeer(&BzzPeer{
Peer: protocols.NewPeer(p2pPeer, &dummyMsgRW{}, DiscoverySpec),
BzzAddr: bzzAddr,
}, nil), nil
}

type dummyMsgRW struct{}

func (d *dummyMsgRW) ReadMsg() (p2p.Msg, error) {
return p2p.Msg{}, nil
}
func (d *dummyMsgRW) WriteMsg(msg p2p.Msg) error {
return nil
}
Loading