Skip to content

Commit

Permalink
feat: announce to lightnodes (#2351)
Browse files Browse the repository at this point in the history
  • Loading branch information
acud authored Jul 22, 2021
1 parent 4237d08 commit 23cee42
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 31 deletions.
127 changes: 115 additions & 12 deletions pkg/p2p/libp2p/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,98 @@ func TestTopologyNotifier(t *testing.T) {
waitAddrSet(t, &n2disconnectedPeer.Address, &mtx, overlay1)
}

// TestTopologyAnnounce checks that announcement
// works correctly for full nodes and light nodes.
func TestTopologyAnnounce(t *testing.T) {
var (
mtx sync.Mutex
ctx = context.Background()

ab1, ab2, ab3 = addressbook.New(mock.NewStateStore()), addressbook.New(mock.NewStateStore()), addressbook.New(mock.NewStateStore())

announceCalled = false
announceToCalled = false

n1a = func(context.Context, swarm.Address, bool) error {
mtx.Lock()
announceCalled = true
mtx.Unlock()
return nil
}
n1at = func(context.Context, swarm.Address, swarm.Address, bool) error {
mtx.Lock()
announceToCalled = true
mtx.Unlock()
return nil
}
)
// test setup: 2 full nodes and one light
// light connect to full(1), then full(2)
// connects to full(1), check that full(1)
// tried to announce full(2) to light.

notifier1 := mockAnnouncingNotifier(n1a, n1at)
s1, overlay1 := newService(t, 1, libp2pServiceOpts{
Addressbook: ab1,
libp2pOpts: libp2p.Options{
FullNode: true,
},
})
s1.SetPickyNotifier(notifier1)

s2, overlay2 := newService(t, 1, libp2pServiceOpts{
Addressbook: ab2,
libp2pOpts: libp2p.Options{
FullNode: true,
},
})

s3, overlay3 := newService(t, 1, libp2pServiceOpts{
Addressbook: ab3,
libp2pOpts: libp2p.Options{
FullNode: false,
},
})

addr := serviceUnderlayAddress(t, s1)

// s3 (light) connects to s1 (full)
_, err := s3.Connect(ctx, addr)
if err != nil {
t.Fatal(err)
}

expectPeers(t, s3, overlay1)
expectPeersEventually(t, s1, overlay3)

mtx.Lock()
if !announceCalled {
t.Error("expected announce to be called")
}
if announceToCalled {
t.Error("announceTo called but should not")
}
mtx.Unlock()

// check address book entries are there
checkAddressbook(t, ab3, overlay1, addr)

// s2 (full) connects to s1 (full)
_, err = s2.Connect(ctx, addr)
if err != nil {
t.Fatal(err)
}

expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2, overlay3)

mtx.Lock()
if !announceToCalled {
t.Error("expected announceTo to be called")
}
mtx.Unlock()
}

func TestTopologyOverSaturated(t *testing.T) {
var (
mtx sync.Mutex
Expand Down Expand Up @@ -773,9 +865,11 @@ func checkAddressbook(t *testing.T, ab addressbook.Getter, overlay swarm.Address
}

type notifiee struct {
connected func(context.Context, p2p.Peer, bool) error
disconnected func(p2p.Peer)
connected cFunc
disconnected dFunc
pick bool
announce announceFunc
announceTo announceToFunc
}

func (n *notifiee) Connected(c context.Context, p p2p.Peer, f bool) error {
Expand All @@ -790,21 +884,30 @@ func (n *notifiee) Pick(p p2p.Peer) bool {
return n.pick
}

func (n *notifiee) Announce(context.Context, swarm.Address, bool) error {
return nil
func (n *notifiee) Announce(ctx context.Context, a swarm.Address, full bool) error {
return n.announce(ctx, a, full)
}

func (n *notifiee) AnnounceTo(ctx context.Context, a, b swarm.Address, full bool) error {
return n.announceTo(ctx, a, b, full)
}

func mockNotifier(c cFunc, d dFunc, pick bool) p2p.PickyNotifier {
return &notifiee{connected: c, disconnected: d, pick: pick}
return &notifiee{connected: c, disconnected: d, pick: pick, announce: noopAnnounce, announceTo: noopAnnounceTo}
}

func mockAnnouncingNotifier(a announceFunc, at announceToFunc) p2p.PickyNotifier {
return &notifiee{connected: noopCf, disconnected: noopDf, pick: true, announce: a, announceTo: at}
}

type (
cFunc func(context.Context, p2p.Peer, bool) error
dFunc func(p2p.Peer)
cFunc func(context.Context, p2p.Peer, bool) error
dFunc func(p2p.Peer)
announceFunc func(context.Context, swarm.Address, bool) error
announceToFunc func(context.Context, swarm.Address, swarm.Address, bool) error
)

var noopCf = func(_ context.Context, _ p2p.Peer, _ bool) error {
return nil
}

var noopDf = func(p p2p.Peer) {}
var noopCf = func(context.Context, p2p.Peer, bool) error { return nil }
var noopDf = func(p2p.Peer) {}
var noopAnnounce = func(context.Context, swarm.Address, bool) error { return nil }
var noopAnnounceTo = func(context.Context, swarm.Address, swarm.Address, bool) error { return nil }
46 changes: 31 additions & 15 deletions pkg/p2p/libp2p/libp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/lightnode"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/libp2p/go-libp2p"
Expand Down Expand Up @@ -82,6 +83,7 @@ type lightnodes interface {
Disconnected(p2p.Peer)
Count() int
RandomPeer(swarm.Address) (swarm.Address, error)
EachPeer(pf topology.EachPeerFunc) error
}

type Options struct {
Expand Down Expand Up @@ -396,20 +398,33 @@ func (s *Service) handleIncoming(stream network.Stream) {
return
}
}
} else if err := s.notifier.Connected(s.ctx, peer, false); err != nil {
// full node announces implicitly
s.logger.Debugf("stream handler: notifier.Connected: peer disconnected: %s: %v", i.BzzAddress.Overlay, err)
// note: this cannot be unit tested since the node
// waiting on handshakeStream.FullClose() on the other side
// might actually get a stream reset when we disconnect here
// resulting in a flaky response from the Connect method on
// the other side.
// that is why the Pick method has been added to the notifier
// interface, in addition to the possibility of deciding whether
// a peer connection is wanted prior to adding the peer to the
// peer registry and starting the protocols.
_ = s.Disconnect(overlay)
return
} else {
if err := s.notifier.Connected(s.ctx, peer, false); err != nil {
// full node announces implicitly
s.logger.Debugf("stream handler: notifier.Connected: peer disconnected: %s: %v", i.BzzAddress.Overlay, err)
// note: this cannot be unit tested since the node
// waiting on handshakeStream.FullClose() on the other side
// might actually get a stream reset when we disconnect here
// resulting in a flaky response from the Connect method on
// the other side.
// that is why the Pick method has been added to the notifier
// interface, in addition to the possibility of deciding whether
// a peer connection is wanted prior to adding the peer to the
// peer registry and starting the protocols.
_ = s.Disconnect(overlay)
return
}
// when a full node connects, we gossip about it to the
// light nodes so that they can also have a chance at building
// a solid topology.
_ = s.lightNodes.EachPeer(func(addr swarm.Address, _ uint8) (bool, bool, error) {
go func(addressee, peer swarm.Address, fullnode bool) {
if err := s.notifier.AnnounceTo(s.ctx, addressee, peer, fullnode); err != nil {
s.logger.Debugf("stream handler: notifier.Announce to light node %s %s: %v", addressee.String(), peer.String(), err)
}
}(addr, peer.Address, i.FullNode)
return false, false, nil
})
}
}

Expand Down Expand Up @@ -483,6 +498,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
if errors.As(err, &de) {
_ = stream.Reset()
_ = s.Disconnect(overlay)
logger.Tracef("handler(%s): disconnecting %s due to disconnect error", p.Name, overlay.String())
}

var bpe *p2p.BlockPeerError
Expand All @@ -492,7 +508,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
logger.Debugf("blocklist: could not blocklist peer %s: %v", peerID, err)
logger.Errorf("unable to blocklist peer %v", peerID)
}
logger.Tracef("blocklisted a peer %s", peerID)
logger.Tracef("handler(%s): blocklisted %s", p.Name, overlay.String())
}
// count unexpected requests
if errors.Is(err, p2p.ErrUnexpected) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ type PickyNotifier interface {
type Notifier interface {
Connected(context.Context, Peer, bool) error
Disconnected(Peer)
Announce(context.Context, swarm.Address, bool) error
Announce(ctx context.Context, peer swarm.Address, fullnode bool) error
AnnounceTo(ctx context.Context, addressee, peer swarm.Address, fullnode bool) error
}

// DebugService extends the Service with method used for debugging.
Expand Down
16 changes: 13 additions & 3 deletions pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ var (
)

var (
errOverlayMismatch = errors.New("overlay mismatch")
errPruneEntry = errors.New("prune entry")
errEmptyBin = errors.New("empty bin")
errOverlayMismatch = errors.New("overlay mismatch")
errPruneEntry = errors.New("prune entry")
errEmptyBin = errors.New("empty bin")
errAnnounceLightNode = errors.New("announcing light node")
)

type (
Expand Down Expand Up @@ -842,6 +843,15 @@ func (k *Kad) Announce(ctx context.Context, peer swarm.Address, fullnode bool) e
return err
}

// AnnounceTo announces a selected peer to another.
func (k *Kad) AnnounceTo(ctx context.Context, addressee, peer swarm.Address, fullnode bool) error {
if !fullnode {
return errAnnounceLightNode
}

return k.discovery.BroadcastPeers(ctx, addressee, peer)
}

// AddPeers adds peers to the knownPeers list.
// This does not guarantee that a connection will immediately
// be made to the peer.
Expand Down
26 changes: 26 additions & 0 deletions pkg/topology/kademlia/kademlia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,32 @@ func TestDiscoveryHooks(t *testing.T) {
waitBcast(t, disc, p3, p1, p2)
}

func TestAnnounceTo(t *testing.T) {
var (
conns int32
_, kad, ab, disc, signer = newTestKademlia(t, &conns, nil, kademlia.Options{})
p1, p2 = test.RandomAddress(), test.RandomAddress()
)

if err := kad.Start(context.Background()); err != nil {
t.Fatal(err)
}
defer kad.Close()

// first add a peer from AddPeers, wait for the connection
addOne(t, signer, kad, ab, p1)
waitConn(t, &conns)

if err := kad.AnnounceTo(context.Background(), p1, p2, true); err != nil {
t.Fatal(err)
}
waitBcast(t, disc, p1, p2)

if err := kad.AnnounceTo(context.Background(), p1, p2, false); err == nil {
t.Fatal("expected error")
}
}

func TestBackoff(t *testing.T) {
// cheat and decrease the timer
defer func(t time.Duration) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/topology/kademlia/mock/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ func (m *Mock) Announce(_ context.Context, _ swarm.Address, _ bool) error {
return nil
}

func (m *Mock) AnnounceTo(_ context.Context, _, _ swarm.Address, _ bool) error {
return nil
}

func (m *Mock) SubscribePeersChange() (c <-chan struct{}, unsubscribe func()) {
channel := make(chan struct{}, 1)
var closeOnce sync.Once
Expand Down
4 changes: 4 additions & 0 deletions pkg/topology/lightnode/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ PICKPEER:
return addr, nil
}

func (c *Container) EachPeer(pf topology.EachPeerFunc) error {
return c.connectedPeers.EachBin(pf)
}

func (c *Container) PeerInfo() topology.BinInfo {
return topology.BinInfo{
BinPopulation: uint(c.connectedPeers.Length()),
Expand Down
13 changes: 13 additions & 0 deletions pkg/topology/lightnode/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package lightnode_test

import (
"context"
"errors"
"reflect"
"testing"

Expand Down Expand Up @@ -62,6 +63,18 @@ func TestContainer(t *testing.T) {
if !p.Equal(p1) {
t.Fatalf("expected p2 but got %s", p.String())
}

i := 0
peers := []swarm.Address{p2, p1}
if err = c.EachPeer(func(p swarm.Address, _ uint8) (bool, bool, error) {
if !p.Equal(peers[i]) {
return false, false, errors.New("peer not in order")
}
i++
return false, false, nil
}); err != nil {
t.Fatal(err)
}
})
t.Run("empty container after peer disconnect", func(t *testing.T) {
c := lightnode.NewContainer(base)
Expand Down
4 changes: 4 additions & 0 deletions pkg/topology/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func (d *mock) Announce(_ context.Context, _ swarm.Address, _ bool) error {
return nil
}

func (d *mock) AnnounceTo(_ context.Context, _, _ swarm.Address, _ bool) error {
return nil
}

func (d *mock) Peers() []swarm.Address {
return d.peers
}
Expand Down

0 comments on commit 23cee42

Please sign in to comment.