Skip to content

Commit

Permalink
feat: announce to lightnodes
Browse files Browse the repository at this point in the history
  • Loading branch information
acud committed Jul 22, 2021
1 parent 6f3a382 commit 3571b4c
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 28 deletions.
4 changes: 4 additions & 0 deletions pkg/p2p/libp2p/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,10 @@ func (n *notifiee) Announce(context.Context, swarm.Address, bool) error {
return nil
}

func (n *notifiee) AnnounceTo(_ context.Context, _, _ swarm.Address, _ bool) error {
return nil
}

func mockNotifier(c cFunc, d dFunc, pick bool) p2p.PickyNotifier {
return &notifiee{connected: c, disconnected: d, pick: pick}
}
Expand Down
60 changes: 36 additions & 24 deletions pkg/p2p/libp2p/libp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/lightnode"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/libp2p/go-libp2p"
Expand Down Expand Up @@ -82,6 +83,7 @@ type lightnodes interface {
Disconnected(p2p.Peer)
Count() int
RandomPeer(swarm.Address) (swarm.Address, error)
EachBin(pf topology.EachPeerFunc) error
}

type Options struct {
Expand All @@ -93,7 +95,6 @@ type Options struct {
LightNodeLimit int
WelcomeMessage string
Transaction []byte
hostFactory func(context.Context, ...libp2p.Option) (host.Host, error)
}

func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay swarm.Address, addr string, ab addressbook.Putter, storer storage.StateStorer, lightNodes *lightnode.Container, swapBackend handshake.SenderMatcher, logger logging.Logger, tracer *tracing.Tracer, o Options) (*Service, error) {
Expand Down Expand Up @@ -182,19 +183,14 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay

opts = append(opts, transports...)

if o.hostFactory == nil {
// Use the default libp2p host creation
o.hostFactory = libp2p.New
}

h, err := o.hostFactory(ctx, opts...)
h, err := libp2p.New(ctx, opts...)
if err != nil {
return nil, err
}

// Support same non default security and transport options as
// original host.
dialer, err := o.hostFactory(ctx, append(transports, security)...)
dialer, err := libp2p.New(ctx, append(transports, security)...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -230,7 +226,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
// the addresses used are not dialable and hence should be cleaned up. We should create
// this host with the same transports and security options to be able to dial to other
// peers.
pingDialer, err := o.hostFactory(ctx, append(transports, security, libp2p.NoListenAddrs)...)
pingDialer, err := libp2p.New(ctx, append(transports, security, libp2p.NoListenAddrs)...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -396,20 +392,35 @@ func (s *Service) handleIncoming(stream network.Stream) {
return
}
}
} else if err := s.notifier.Connected(s.ctx, peer, false); err != nil {
// full node announces implicitly
s.logger.Debugf("stream handler: notifier.Connected: peer disconnected: %s: %v", i.BzzAddress.Overlay, err)
// note: this cannot be unit tested since the node
// waiting on handshakeStream.FullClose() on the other side
// might actually get a stream reset when we disconnect here
// resulting in a flaky response from the Connect method on
// the other side.
// that is why the Pick method has been added to the notifier
// interface, in addition to the possibility of deciding whether
// a peer connection is wanted prior to adding the peer to the
// peer registry and starting the protocols.
_ = s.Disconnect(overlay)
return
} else {
if err := s.notifier.Connected(s.ctx, peer, false); err != nil {
// full node announces implicitly

s.logger.Debugf("stream handler: notifier.Connected: peer disconnected: %s: %v", i.BzzAddress.Overlay, err)
// note: this cannot be unit tested since the node
// waiting on handshakeStream.FullClose() on the other side
// might actually get a stream reset when we disconnect here
// resulting in a flaky response from the Connect method on
// the other side.
// that is why the Pick method has been added to the notifier
// interface, in addition to the possibility of deciding whether
// a peer connection is wanted prior to adding the peer to the
// peer registry and starting the protocols.
_ = s.Disconnect(overlay)
return
}

// when a full node connects, we gossip about it to the
// light nodes so that they can also have a chance at building
// a solid topology.
_ = s.lightNodes.EachBin(func(addr swarm.Address, _ uint8) (bool, bool, error) {
go func(addressee, peer swarm.Address, fullnode bool) {
if err := s.notifier.AnnounceTo(s.ctx, addressee, peer, fullnode); err != nil {
s.logger.Debugf("stream handler: notifier.Announce to light node %s %s: %v", addressee.String(), peer.String(), err)
}
}(addr, peer.Address, i.FullNode)
return false, false, nil
})
}
}

Expand Down Expand Up @@ -481,6 +492,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
if err := ss.Handler(ctx, p2p.Peer{Address: overlay, FullNode: full}, stream); err != nil {
var de *p2p.DisconnectError
if errors.As(err, &de) {
logger.Tracef("libp2p handler(%s): disconnecting %s", p.Name, overlay.String())
_ = stream.Reset()
_ = s.Disconnect(overlay)
}
Expand All @@ -492,7 +504,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
logger.Debugf("blocklist: could not blocklist peer %s: %v", peerID, err)
logger.Errorf("unable to blocklist peer %v", peerID)
}
logger.Tracef("blocklisted a peer %s", peerID)
logger.Tracef("libp2p handler(%s): blocklisted %s", p.Name, overlay.String())
}
// count unexpected requests
if errors.Is(err, p2p.ErrUnexpected) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ type PickyNotifier interface {
type Notifier interface {
Connected(context.Context, Peer, bool) error
Disconnected(Peer)
Announce(context.Context, swarm.Address, bool) error
Announce(ctx context.Context, peer swarm.Address, fullnode bool) error
AnnounceTo(ctx context.Context, addressee, peer swarm.Address, fullnode bool) error
}

// DebugService extends the Service with method used for debugging.
Expand Down
16 changes: 13 additions & 3 deletions pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ var (
)

var (
errOverlayMismatch = errors.New("overlay mismatch")
errPruneEntry = errors.New("prune entry")
errEmptyBin = errors.New("empty bin")
errOverlayMismatch = errors.New("overlay mismatch")
errPruneEntry = errors.New("prune entry")
errEmptyBin = errors.New("empty bin")
errAnnounceLightNode = errors.New("announcing light node")
)

type (
Expand Down Expand Up @@ -842,6 +843,15 @@ func (k *Kad) Announce(ctx context.Context, peer swarm.Address, fullnode bool) e
return err
}

// AnnounceTo announces a selected peer to another.
func (k *Kad) AnnounceTo(ctx context.Context, addressee, peer swarm.Address, fullnode bool) error {
if !fullnode {
return errAnnounceLightNode
}

return k.discovery.BroadcastPeers(ctx, addressee, peer)
}

// AddPeers adds peers to the knownPeers list.
// This does not guarantee that a connection will immediately
// be made to the peer.
Expand Down
4 changes: 4 additions & 0 deletions pkg/topology/kademlia/mock/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ func (m *Mock) Announce(_ context.Context, _ swarm.Address, _ bool) error {
return nil
}

func (m *Mock) AnnounceTo(_ context.Context, _, _ swarm.Address, _ bool) error {
return nil
}

func (m *Mock) SubscribePeersChange() (c <-chan struct{}, unsubscribe func()) {
channel := make(chan struct{}, 1)
var closeOnce sync.Once
Expand Down
4 changes: 4 additions & 0 deletions pkg/topology/lightnode/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ PICKPEER:
return addr, nil
}

func (c *Container) EachBin(pf topology.EachPeerFunc) error {
return c.connectedPeers.EachBin(pf)
}

func (c *Container) PeerInfo() topology.BinInfo {
return topology.BinInfo{
BinPopulation: uint(c.connectedPeers.Length()),
Expand Down
4 changes: 4 additions & 0 deletions pkg/topology/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func (d *mock) Announce(_ context.Context, _ swarm.Address, _ bool) error {
return nil
}

func (d *mock) AnnounceTo(_ context.Context, _, _ swarm.Address, _ bool) error {
return nil
}

func (d *mock) Peers() []swarm.Address {
return d.peers
}
Expand Down

0 comments on commit 3571b4c

Please sign in to comment.