Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: skip validation if peer is known #2491

Merged
merged 10 commits into from
Sep 15, 2021
Merged
131 changes: 94 additions & 37 deletions pkg/hive/hive.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/ratelimit"
"github.com/ethersphere/bee/pkg/swarm"
lru "github.com/hashicorp/golang-lru"
ma "github.com/multiformats/go-multiaddr"
)

Expand All @@ -39,6 +40,8 @@ const (
maxBatchSize = 30
pingTimeout = time.Second * 5 // time to wait for ping to succeed
batchValidationTimeout = 5 * time.Minute // prevent lock contention on peer validation
cacheSize = 25000
cachePrefix = 4 // enough bytes (32 bits) to uniquely identify a peer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm not sure if this is strictly needed. even when keeping 25k addresses in memory, the total overhead is around 800kb.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I increased the cache size to 100,000 and kept the cachePrefix; it's best if the cache is big for hive.

)

var (
Expand All @@ -62,9 +65,16 @@ type Service struct {
wg sync.WaitGroup
peersChan chan pb.Peers
sem *semaphore.Weighted
lru *lru.Cache // cache for unreachable peers
bootnode bool
}

func New(streamer p2p.StreamerPinger, addressbook addressbook.GetPutter, networkID uint64, logger logging.Logger) *Service {
func New(streamer p2p.StreamerPinger, addressbook addressbook.GetPutter, networkID uint64, bootnode bool, logger logging.Logger) (*Service, error) {
lruCache, err := lru.New(cacheSize)
if err != nil {
return nil, err
}

svc := &Service{
streamer: streamer,
logger: logger,
Expand All @@ -76,9 +86,11 @@ func New(streamer p2p.StreamerPinger, addressbook addressbook.GetPutter, network
quit: make(chan struct{}),
peersChan: make(chan pb.Peers),
sem: semaphore.NewWeighted(int64(31)),
lru: lruCache,
bootnode: bootnode,
}
svc.startCheckPeersHandler()
return svc
return svc, nil
}

func (s *Service) Protocol() p2p.ProtocolSpec {
Expand Down Expand Up @@ -260,57 +272,102 @@ func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) {

var peersToAdd []swarm.Address
mtx := sync.Mutex{}
wg := sync.WaitGroup{}

for _, p := range peers.Peers {
err := s.sem.Acquire(ctx, 1)
if err != nil {
return
}
if s.bootnode {
for _, p := range peers.Peers {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure what is the desirable outcome here. this basically means - if we are a bootnode we persist every peer we get over discovery. if anything, we should not persist any node which we are not connected to

Copy link
Member Author

@istae istae Sep 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because bootnodes do not dial, the newest commit discards the received peers without validating or persisting them. thanks.

multiUnderlay, err := ma.NewMultiaddrBytes(p.Underlay)
if err != nil {
s.logger.Errorf("hive: multi address underlay err: %v", err)
continue
}

wg.Add(1)
go func(newPeer *pb.BzzAddress) {
defer func() {
s.sem.Release(1)
wg.Done()
}()
bzzAddress := bzz.Address{
Overlay: swarm.NewAddress(p.Overlay),
Underlay: multiUnderlay,
Signature: p.Signature,
Transaction: p.Transaction,
}

multiUnderlay, err := ma.NewMultiaddrBytes(newPeer.Underlay)
err = s.addressBook.Put(bzzAddress.Overlay, bzzAddress)
if err != nil {
s.logger.Errorf("hive: multi address underlay err: %v", err)
return
s.logger.Warningf("skipping peer in response %s: %v", p.String(), err)
continue
}

ctx, cancel := context.WithTimeout(ctx, pingTimeout)
defer cancel()
peersToAdd = append(peersToAdd, bzzAddress.Overlay)
}
} else {

// check if the underlay is usable by doing a raw ping using libp2p
if _, err = s.streamer.Ping(ctx, multiUnderlay); err != nil {
s.metrics.UnreachablePeers.Inc()
s.logger.Debugf("hive: peer %s: underlay %s not reachable", hex.EncodeToString(newPeer.Overlay), multiUnderlay)
return
wg := sync.WaitGroup{}
for _, p := range peers.Peers {

overlay := swarm.NewAddress(p.Overlay)
cacheOverlay := overlay.ByteString()[:cachePrefix]

// cached peer, skip
if _, ok := s.lru.Get(cacheOverlay); ok {
continue
}

bzzAddress := bzz.Address{
Overlay: swarm.NewAddress(newPeer.Overlay),
Underlay: multiUnderlay,
Signature: newPeer.Signature,
Transaction: newPeer.Transaction,
// mark peer as seen
_ = s.lru.Add(cacheOverlay, nil)

// if peer exists already in the addressBook, skip
_, err := s.addressBook.Get(overlay)
if err == nil {
continue
} else {
s.logger.Errorf("hive: addressbook %v", err)
Copy link
Member

@acud acud Sep 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this means that you return an error here if the peer is not in the address book, so you basically print an error for every peer you don't know

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mrekucci's request was that we should at least log it, but I agree, logging here does not add much value.

}

err = s.addressBook.Put(bzzAddress.Overlay, bzzAddress)
err = s.sem.Acquire(ctx, 1)
if err != nil {
s.logger.Warningf("skipping peer in response %s: %v", newPeer.String(), err)
return
}

mtx.Lock()
peersToAdd = append(peersToAdd, bzzAddress.Overlay)
mtx.Unlock()
}(p)
}
wg.Add(1)
go func(newPeer *pb.BzzAddress) {
defer func() {
s.sem.Release(1)
wg.Done()
}()

wg.Wait()
multiUnderlay, err := ma.NewMultiaddrBytes(newPeer.Underlay)
if err != nil {
s.logger.Errorf("hive: multi address underlay err: %v", err)
return
}

ctx, cancel := context.WithTimeout(ctx, pingTimeout)
defer cancel()

// check if the underlay is usable by doing a raw ping using libp2p
if _, err = s.streamer.Ping(ctx, multiUnderlay); err != nil {
s.metrics.UnreachablePeers.Inc()
s.logger.Debugf("hive: peer %s: underlay %s not reachable", hex.EncodeToString(newPeer.Overlay), multiUnderlay)
return
}

bzzAddress := bzz.Address{
Overlay: swarm.NewAddress(newPeer.Overlay),
Underlay: multiUnderlay,
Signature: newPeer.Signature,
Transaction: newPeer.Transaction,
}

err = s.addressBook.Put(bzzAddress.Overlay, bzzAddress)
if err != nil {
s.logger.Warningf("skipping peer in response %s: %v", newPeer.String(), err)
return
}

mtx.Lock()
peersToAdd = append(peersToAdd, bzzAddress.Overlay)
mtx.Unlock()
}(p)
}
wg.Wait()
}

if s.addPeersHandler != nil && len(peersToAdd) > 0 {
s.addPeersHandler(peersToAdd...)
Expand Down
8 changes: 4 additions & 4 deletions pkg/hive/hive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestHandlerRateLimit(t *testing.T) {
// new recorder for handling Ping
streamer := streamtest.New()
// create a hive server that handles the incoming stream
server := hive.New(streamer, addressbookclean, networkID, logger)
server, _ := hive.New(streamer, addressbookclean, networkID, false, logger)

serverAddress := test.RandomAddress()

Expand Down Expand Up @@ -88,7 +88,7 @@ func TestHandlerRateLimit(t *testing.T) {
}

// create a hive client that will do broadcast
client := hive.New(serverRecorder, addressbook, networkID, logger)
client, _ := hive.New(serverRecorder, addressbook, networkID, false, logger)
err := client.BroadcastPeers(context.Background(), serverAddress, peers...)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -229,15 +229,15 @@ func TestBroadcastPeers(t *testing.T) {
streamer = streamtest.New()
}
// create a hive server that handles the incoming stream
server := hive.New(streamer, addressbookclean, networkID, logger)
server, _ := hive.New(streamer, addressbookclean, networkID, false, logger)

// setup the stream recorder to record stream data
recorder := streamtest.New(
streamtest.WithProtocols(server.Protocol()),
)

// create a hive client that will do broadcast
client := hive.New(recorder, addressbook, networkID, logger)
client, _ := hive.New(recorder, addressbook, networkID, false, logger)
if err := client.BroadcastPeers(context.Background(), tc.addresee, tc.peers...); err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,11 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
return nil, fmt.Errorf("pingpong service: %w", err)
}

hive := hive.New(p2ps, addressbook, networkID, logger)
hive, err := hive.New(p2ps, addressbook, networkID, o.BootnodeMode, logger)
if err != nil {
return nil, fmt.Errorf("hive: %w", err)
}

if err = p2ps.AddProtocol(hive.Protocol()); err != nil {
return nil, fmt.Errorf("hive service: %w", err)
}
Expand Down