Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: announce to lightnodes #2351

Merged
merged 1 commit into from
Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 115 additions & 12 deletions pkg/p2p/libp2p/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,98 @@ func TestTopologyNotifier(t *testing.T) {
waitAddrSet(t, &n2disconnectedPeer.Address, &mtx, overlay1)
}

// TestTopologyAnnounce checks that announcement
// works correctly for full nodes and light nodes.
func TestTopologyAnnounce(t *testing.T) {
var (
mtx sync.Mutex
ctx = context.Background()

ab1, ab2, ab3 = addressbook.New(mock.NewStateStore()), addressbook.New(mock.NewStateStore()), addressbook.New(mock.NewStateStore())

announceCalled = false
announceToCalled = false

n1a = func(context.Context, swarm.Address, bool) error {
mtx.Lock()
announceCalled = true
mtx.Unlock()
return nil
}
n1at = func(context.Context, swarm.Address, swarm.Address, bool) error {
mtx.Lock()
announceToCalled = true
mtx.Unlock()
return nil
}
)
// test setup: 2 full nodes and one light
// light connect to full(1), then full(2)
// connects to full(1), check that full(1)
// tried to announce full(2) to light.

notifier1 := mockAnnouncingNotifier(n1a, n1at)
s1, overlay1 := newService(t, 1, libp2pServiceOpts{
Addressbook: ab1,
libp2pOpts: libp2p.Options{
FullNode: true,
},
})
s1.SetPickyNotifier(notifier1)

s2, overlay2 := newService(t, 1, libp2pServiceOpts{
Addressbook: ab2,
libp2pOpts: libp2p.Options{
FullNode: true,
},
})

s3, overlay3 := newService(t, 1, libp2pServiceOpts{
Addressbook: ab3,
libp2pOpts: libp2p.Options{
FullNode: false,
},
})

addr := serviceUnderlayAddress(t, s1)

// s3 (light) connects to s1 (full)
_, err := s3.Connect(ctx, addr)
if err != nil {
t.Fatal(err)
}

expectPeers(t, s3, overlay1)
expectPeersEventually(t, s1, overlay3)

mtx.Lock()
if !announceCalled {
t.Error("expected announce to be called")
}
if announceToCalled {
t.Error("announceTo called but should not")
}
mtx.Unlock()

// check address book entries are there
checkAddressbook(t, ab3, overlay1, addr)

// s2 (full) connects to s1 (full)
_, err = s2.Connect(ctx, addr)
if err != nil {
t.Fatal(err)
}

expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2, overlay3)

mtx.Lock()
if !announceToCalled {
t.Error("expected announceTo to be called")
}
mtx.Unlock()
}

func TestTopologyOverSaturated(t *testing.T) {
var (
mtx sync.Mutex
Expand Down Expand Up @@ -773,9 +865,11 @@ func checkAddressbook(t *testing.T, ab addressbook.Getter, overlay swarm.Address
}

type notifiee struct {
connected func(context.Context, p2p.Peer, bool) error
disconnected func(p2p.Peer)
connected cFunc
disconnected dFunc
pick bool
announce announceFunc
announceTo announceToFunc
}

func (n *notifiee) Connected(c context.Context, p p2p.Peer, f bool) error {
Expand All @@ -790,21 +884,30 @@ func (n *notifiee) Pick(p p2p.Peer) bool {
return n.pick
}

func (n *notifiee) Announce(context.Context, swarm.Address, bool) error {
return nil
func (n *notifiee) Announce(ctx context.Context, a swarm.Address, full bool) error {
return n.announce(ctx, a, full)
}

func (n *notifiee) AnnounceTo(ctx context.Context, a, b swarm.Address, full bool) error {
return n.announceTo(ctx, a, b, full)
}

func mockNotifier(c cFunc, d dFunc, pick bool) p2p.PickyNotifier {
return &notifiee{connected: c, disconnected: d, pick: pick}
return &notifiee{connected: c, disconnected: d, pick: pick, announce: noopAnnounce, announceTo: noopAnnounceTo}
}

func mockAnnouncingNotifier(a announceFunc, at announceToFunc) p2p.PickyNotifier {
return &notifiee{connected: noopCf, disconnected: noopDf, pick: true, announce: a, announceTo: at}
}

type (
cFunc func(context.Context, p2p.Peer, bool) error
dFunc func(p2p.Peer)
cFunc func(context.Context, p2p.Peer, bool) error
dFunc func(p2p.Peer)
announceFunc func(context.Context, swarm.Address, bool) error
announceToFunc func(context.Context, swarm.Address, swarm.Address, bool) error
)

var noopCf = func(_ context.Context, _ p2p.Peer, _ bool) error {
return nil
}

var noopDf = func(p p2p.Peer) {}
var noopCf = func(context.Context, p2p.Peer, bool) error { return nil }
var noopDf = func(p2p.Peer) {}
var noopAnnounce = func(context.Context, swarm.Address, bool) error { return nil }
var noopAnnounceTo = func(context.Context, swarm.Address, swarm.Address, bool) error { return nil }
46 changes: 31 additions & 15 deletions pkg/p2p/libp2p/libp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/lightnode"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/libp2p/go-libp2p"
Expand Down Expand Up @@ -82,6 +83,7 @@ type lightnodes interface {
Disconnected(p2p.Peer)
Count() int
RandomPeer(swarm.Address) (swarm.Address, error)
EachPeer(pf topology.EachPeerFunc) error
}

type Options struct {
Expand Down Expand Up @@ -396,20 +398,33 @@ func (s *Service) handleIncoming(stream network.Stream) {
return
}
}
} else if err := s.notifier.Connected(s.ctx, peer, false); err != nil {
// full node announces implicitly
s.logger.Debugf("stream handler: notifier.Connected: peer disconnected: %s: %v", i.BzzAddress.Overlay, err)
// note: this cannot be unit tested since the node
// waiting on handshakeStream.FullClose() on the other side
// might actually get a stream reset when we disconnect here
// resulting in a flaky response from the Connect method on
// the other side.
// that is why the Pick method has been added to the notifier
// interface, in addition to the possibility of deciding whether
// a peer connection is wanted prior to adding the peer to the
// peer registry and starting the protocols.
_ = s.Disconnect(overlay)
return
} else {
if err := s.notifier.Connected(s.ctx, peer, false); err != nil {
// full node announces implicitly
s.logger.Debugf("stream handler: notifier.Connected: peer disconnected: %s: %v", i.BzzAddress.Overlay, err)
// note: this cannot be unit tested since the node
// waiting on handshakeStream.FullClose() on the other side
// might actually get a stream reset when we disconnect here
// resulting in a flaky response from the Connect method on
// the other side.
// that is why the Pick method has been added to the notifier
// interface, in addition to the possibility of deciding whether
// a peer connection is wanted prior to adding the peer to the
// peer registry and starting the protocols.
_ = s.Disconnect(overlay)
return
}
// when a full node connects, we gossip about it to the
// light nodes so that they can also have a chance at building
// a solid topology.
_ = s.lightNodes.EachPeer(func(addr swarm.Address, _ uint8) (bool, bool, error) {
go func(addressee, peer swarm.Address, fullnode bool) {
if err := s.notifier.AnnounceTo(s.ctx, addressee, peer, fullnode); err != nil {
s.logger.Debugf("stream handler: notifier.Announce to light node %s %s: %v", addressee.String(), peer.String(), err)
}
}(addr, peer.Address, i.FullNode)
return false, false, nil
})
}
}

Expand Down Expand Up @@ -483,6 +498,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
if errors.As(err, &de) {
_ = stream.Reset()
_ = s.Disconnect(overlay)
logger.Tracef("handler(%s): disconnecting %s due to disconnect error", p.Name, overlay.String())
}

var bpe *p2p.BlockPeerError
Expand All @@ -492,7 +508,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
logger.Debugf("blocklist: could not blocklist peer %s: %v", peerID, err)
logger.Errorf("unable to blocklist peer %v", peerID)
}
logger.Tracef("blocklisted a peer %s", peerID)
logger.Tracef("handler(%s): blocklisted %s", p.Name, overlay.String())
}
// count unexpected requests
if errors.Is(err, p2p.ErrUnexpected) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ type PickyNotifier interface {
type Notifier interface {
Connected(context.Context, Peer, bool) error
Disconnected(Peer)
Announce(context.Context, swarm.Address, bool) error
Announce(ctx context.Context, peer swarm.Address, fullnode bool) error
AnnounceTo(ctx context.Context, addressee, peer swarm.Address, fullnode bool) error
}

// DebugService extends the Service with method used for debugging.
Expand Down
16 changes: 13 additions & 3 deletions pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ var (
)

var (
errOverlayMismatch = errors.New("overlay mismatch")
errPruneEntry = errors.New("prune entry")
errEmptyBin = errors.New("empty bin")
errOverlayMismatch = errors.New("overlay mismatch")
errPruneEntry = errors.New("prune entry")
errEmptyBin = errors.New("empty bin")
errAnnounceLightNode = errors.New("announcing light node")
)

type (
Expand Down Expand Up @@ -842,6 +843,15 @@ func (k *Kad) Announce(ctx context.Context, peer swarm.Address, fullnode bool) e
return err
}

// AnnounceTo announces a selected peer to another.
func (k *Kad) AnnounceTo(ctx context.Context, addressee, peer swarm.Address, fullnode bool) error {
if !fullnode {
return errAnnounceLightNode
}

return k.discovery.BroadcastPeers(ctx, addressee, peer)
}

// AddPeers adds peers to the knownPeers list.
// This does not guarantee that a connection will immediately
// be made to the peer.
Expand Down
26 changes: 26 additions & 0 deletions pkg/topology/kademlia/kademlia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,32 @@ func TestDiscoveryHooks(t *testing.T) {
waitBcast(t, disc, p3, p1, p2)
}

func TestAnnounceTo(t *testing.T) {
var (
conns int32
_, kad, ab, disc, signer = newTestKademlia(t, &conns, nil, kademlia.Options{})
p1, p2 = test.RandomAddress(), test.RandomAddress()
)

if err := kad.Start(context.Background()); err != nil {
t.Fatal(err)
}
defer kad.Close()

// first add a peer from AddPeers, wait for the connection
addOne(t, signer, kad, ab, p1)
waitConn(t, &conns)

if err := kad.AnnounceTo(context.Background(), p1, p2, true); err != nil {
t.Fatal(err)
}
waitBcast(t, disc, p1, p2)

if err := kad.AnnounceTo(context.Background(), p1, p2, false); err == nil {
t.Fatal("expected error")
}
}

func TestBackoff(t *testing.T) {
// cheat and decrease the timer
defer func(t time.Duration) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/topology/kademlia/mock/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ func (m *Mock) Announce(_ context.Context, _ swarm.Address, _ bool) error {
return nil
}

func (m *Mock) AnnounceTo(_ context.Context, _, _ swarm.Address, _ bool) error {
return nil
}

func (m *Mock) SubscribePeersChange() (c <-chan struct{}, unsubscribe func()) {
channel := make(chan struct{}, 1)
var closeOnce sync.Once
Expand Down
4 changes: 4 additions & 0 deletions pkg/topology/lightnode/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ PICKPEER:
return addr, nil
}

func (c *Container) EachPeer(pf topology.EachPeerFunc) error {
return c.connectedPeers.EachBin(pf)
}

func (c *Container) PeerInfo() topology.BinInfo {
return topology.BinInfo{
BinPopulation: uint(c.connectedPeers.Length()),
Expand Down
13 changes: 13 additions & 0 deletions pkg/topology/lightnode/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package lightnode_test

import (
"context"
"errors"
"reflect"
"testing"

Expand Down Expand Up @@ -62,6 +63,18 @@ func TestContainer(t *testing.T) {
if !p.Equal(p1) {
t.Fatalf("expected p2 but got %s", p.String())
}

i := 0
peers := []swarm.Address{p2, p1}
if err = c.EachPeer(func(p swarm.Address, _ uint8) (bool, bool, error) {
if !p.Equal(peers[i]) {
return false, false, errors.New("peer not in order")
}
i++
return false, false, nil
}); err != nil {
t.Fatal(err)
}
})
t.Run("empty container after peer disconnect", func(t *testing.T) {
c := lightnode.NewContainer(base)
Expand Down
4 changes: 4 additions & 0 deletions pkg/topology/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func (d *mock) Announce(_ context.Context, _ swarm.Address, _ bool) error {
return nil
}

func (d *mock) AnnounceTo(_ context.Context, _, _ swarm.Address, _ bool) error {
return nil
}

func (d *mock) Peers() []swarm.Address {
return d.peers
}
Expand Down