Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Subnet Search #8048

Merged
merged 24 commits into from
Dec 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"handshake.go",
"info.go",
"interfaces.go",
"iterator.go",
"log.go",
"monitoring.go",
"options.go",
Expand Down
24 changes: 8 additions & 16 deletions beacon-chain/p2p/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ import (
// GossipTypeMapping.
var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub topic")

// Max number of attempts to search the network for a specific subnet.
const maxSubnetDiscoveryAttempts = 3

// Broadcast a message to the p2p network.
func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
ctx, span := trace.StartSpan(ctx, "p2p.Broadcast")
Expand Down Expand Up @@ -75,7 +72,7 @@ func (s *Service) broadcastAttestation(ctx context.Context, subnet uint64, att *

// Ensure we have peers with this subnet.
s.subnetLocker(subnet).RLock()
hasPeer := s.hasPeerWithSubnet(subnet)
hasPeer := s.hasPeerWithSubnet(attestationToTopic(subnet, forkDigest))
s.subnetLocker(subnet).RUnlock()

span.AddAttributes(
Expand All @@ -89,18 +86,13 @@ func (s *Service) broadcastAttestation(ctx context.Context, subnet uint64, att *
if err := func() error {
s.subnetLocker(subnet).Lock()
defer s.subnetLocker(subnet).Unlock()
for i := 0; i < maxSubnetDiscoveryAttempts; i++ {
if err := ctx.Err(); err != nil {
return err
}
ok, err := s.FindPeersWithSubnet(ctx, subnet)
if err != nil {
return err
}
if ok {
savedAttestationBroadcasts.Inc()
return nil
}
ok, err := s.FindPeersWithSubnet(ctx, attestationToTopic(subnet, forkDigest), subnet, 1)
if err != nil {
return err
}
if ok {
savedAttestationBroadcasts.Inc()
return nil
}
return errors.New("failed to find peers for subnet")
}(); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ func (s *Service) startDiscoveryV5(
// 6) Peer's fork digest in their ENR matches that of
// our localnodes.
func (s *Service) filterPeer(node *enode.Node) bool {
// Ignore nil node entries passed in.
if node == nil {
return false
}
// ignore nodes with no ip address stored.
if node.IP() == nil {
return false
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type PeerManager interface {
Host() host.Host
ENR() *enr.Record
RefreshENR()
FindPeersWithSubnet(ctx context.Context, index uint64) (bool, error)
FindPeersWithSubnet(ctx context.Context, topic string, index, threshold uint64) (bool, error)
AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error)
}

Expand Down
36 changes: 36 additions & 0 deletions beacon-chain/p2p/iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package p2p

import (
"context"

"github.com/ethereum/go-ethereum/p2p/enode"
)

// filterNodes wraps an iterator such that Next only returns nodes for which
// the 'check' function returns true. This custom implementation also
// checks for context deadlines so that in the event the parent context has
// expired, we do exit from the search rather than perform more network
// lookups for additional peers.
func filterNodes(ctx context.Context, it enode.Iterator, check func(*enode.Node) bool) enode.Iterator {
return &filterIter{ctx, it, check}
}

type filterIter struct {
context.Context
enode.Iterator
check func(*enode.Node) bool
}

// Next looks up for the next valid node according to our
// filter criteria.
func (f *filterIter) Next() bool {
prestonvanloon marked this conversation as resolved.
Show resolved Hide resolved
for f.Iterator.Next() {
if f.Context.Err() != nil {
return false
}
if f.check(f.Node()) {
return true
}
}
return false
}
87 changes: 48 additions & 39 deletions beacon-chain/p2p/subnets.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p-core/network"
"github.com/prysmaticlabs/go-bitfield"
"go.opencensus.io/trace"

Expand All @@ -20,8 +19,10 @@ var attSubnetEnrKey = params.BeaconNetworkConfig().AttSubnetKey

// FindPeersWithSubnet performs a network search for peers
// subscribed to a particular subnet. Then we try to connect
// with those peers.
func (s *Service) FindPeersWithSubnet(ctx context.Context, index uint64) (bool, error) {
// with those peers. This method will block until the required amount of
// peers are found, the method only exits in the event of context timeouts.
func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string,
index, threshold uint64) (bool, error) {
ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet")
defer span.End()

Expand All @@ -31,59 +32,67 @@ func (s *Service) FindPeersWithSubnet(ctx context.Context, index uint64) (bool,
// return if discovery isn't set
return false, nil
}

topic += s.Encoding().ProtocolSuffix()
iterator := s.dv5Listener.RandomNodes()
nodes := enode.ReadNodes(iterator, lookupLimit)
exists := false
for _, node := range nodes {
iterator = filterNodes(ctx, iterator, s.filterPeerForSubnet(index))

currNum := uint64(len(s.pubsub.ListPeers(topic)))
wg := new(sync.WaitGroup)
for {
if err := ctx.Err(); err != nil {
return false, err
}
if node.IP() == nil {
continue
if currNum >= threshold {
break
}
// do not look for nodes with no tcp port set
if err := node.Record().Load(enr.WithEntry("tcp", new(enr.TCP))); err != nil {
if !enr.IsNotFound(err) {
log.WithError(err).Debug("Could not retrieve tcp port")
nodes := enode.ReadNodes(iterator, int(params.BeaconNetworkConfig().MinimumPeersInSubnetSearch))
for _, node := range nodes {
info, _, err := convertToAddrInfo(node)
if err != nil {
continue
}
continue
wg.Add(1)
go func() {
if err := s.connectWithPeer(ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}
wg.Done()
}()
}
// Wait for all dials to be completed.
wg.Wait()
currNum = uint64(len(s.pubsub.ListPeers(topic)))
}
return true, nil
}

// returns a method with filters peers specifically for a particular attestation subnet.
func (s *Service) filterPeerForSubnet(index uint64) func(node *enode.Node) bool {
return func(node *enode.Node) bool {
if !s.filterPeer(node) {
return false
}
subnets, err := retrieveAttSubnets(node.Record())
if err != nil {
log.Debugf("could not retrieve subnets: %v", err)
continue
return false
}
indExists := false
for _, comIdx := range subnets {
if comIdx == index {
info, multiAddr, err := convertToAddrInfo(node)
if err != nil {
return false, err
}
if s.peers.IsActive(info.ID) {
exists = true
continue
}
if s.host.Network().Connectedness(info.ID) == network.Connected {
exists = true
continue
}
if !s.peers.IsReadyToDial(info.ID) {
continue
}
s.peers.Add(node.Record(), info.ID, multiAddr, network.DirUnknown)
if err := s.connectWithPeer(ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
continue
}
exists = true
indExists = true
break
}
}
return indExists
}
return exists, nil
}

func (s *Service) hasPeerWithSubnet(subnet uint64) bool {
return len(s.Peers().SubscribedToSubnet(subnet)) > 0
// lower threshold to broadcast object compared to searching
// for a subnet. So that even in the event of poor peer
// connectivity, we can still broadcast an attestation.
func (s *Service) hasPeerWithSubnet(topic string) bool {
return len(s.pubsub.ListPeers(topic+s.Encoding().ProtocolSuffix())) >= 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is the only way you can access this information, but i worry it will be expensive. We might want to add HasPeer(topic string) to libp2p-pubsub. Just a thought.

Copy link
Member Author

@nisdas nisdas Dec 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah good point, although the API isn't in our direct control and any proposed changes will take some time to be discussed/implemented upstream

}

// Updates the service's discv5 listener record's attestation subnet
Expand Down
9 changes: 5 additions & 4 deletions beacon-chain/p2p/subnets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
Expand Down Expand Up @@ -105,11 +106,11 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {

// look up 3 different subnets
ctx := context.Background()
exists, err := s.FindPeersWithSubnet(ctx, 1)
exists, err := s.FindPeersWithSubnet(ctx, "", 1, params.BeaconNetworkConfig().MinimumPeersInSubnet)
require.NoError(t, err)
exists2, err := s.FindPeersWithSubnet(ctx, 2)
exists2, err := s.FindPeersWithSubnet(ctx, "", 2, params.BeaconNetworkConfig().MinimumPeersInSubnet)
require.NoError(t, err)
exists3, err := s.FindPeersWithSubnet(ctx, 3)
exists3, err := s.FindPeersWithSubnet(ctx, "", 3, params.BeaconNetworkConfig().MinimumPeersInSubnet)
require.NoError(t, err)
if !exists || !exists2 || !exists3 {
t.Fatal("Peer with subnet doesn't exist")
Expand All @@ -126,7 +127,7 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {
testService.RefreshENR()
time.Sleep(2 * time.Second)

exists, err = s.FindPeersWithSubnet(ctx, 2)
exists, err = s.FindPeersWithSubnet(ctx, "", 2, params.BeaconNetworkConfig().MinimumPeersInSubnet)
require.NoError(t, err)

assert.Equal(t, true, exists, "Peer with subnet doesn't exist")
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/testing/fuzz_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (p *FakeP2P) ENR() *enr.Record {
}

// FindPeersWithSubnet mocks the p2p func.
func (p *FakeP2P) FindPeersWithSubnet(_ context.Context, _ uint64) (bool, error) {
func (p *FakeP2P) FindPeersWithSubnet(_ context.Context, _ string, _, _ uint64) (bool, error) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/testing/mock_peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (m MockPeerManager) ENR() *enr.Record {
func (m MockPeerManager) RefreshENR() {}

// FindPeersWithSubnet .
func (m MockPeerManager) FindPeersWithSubnet(_ context.Context, _ uint64) (bool, error) {
func (m MockPeerManager) FindPeersWithSubnet(_ context.Context, _ string, _, _ uint64) (bool, error) {
return true, nil
}

Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (p *TestP2P) Peers() *peers.Status {
}

// FindPeersWithSubnet mocks the p2p func.
func (p *TestP2P) FindPeersWithSubnet(_ context.Context, _ uint64) (bool, error) {
func (p *TestP2P) FindPeersWithSubnet(_ context.Context, _ string, _, _ uint64) (bool, error) {
return false, nil
}

Expand Down
48 changes: 19 additions & 29 deletions beacon-chain/sync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,17 +219,14 @@ func (s *Service) subscribeStaticWithSubnets(topic string, validator pubsub.Vali
}
// Check every slot that there are enough peers
for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ {
if !s.validPeersExist(topic, i) {
if !s.validPeersExist(s.addDigestAndIndexToTopic(topic, i)) {
log.Debugf("No peers found subscribed to attestation gossip subnet with "+
"committee index %d. Searching network for peers subscribed to the subnet.", i)
go func(idx uint64) {
_, err := s.p2p.FindPeersWithSubnet(s.ctx, idx)
if err != nil {
log.Debugf("Could not search for peers: %v", err)
return
}
}(i)
return
_, err := s.p2p.FindPeersWithSubnet(s.ctx, s.addDigestAndIndexToTopic(topic, i), i, params.BeaconNetworkConfig().MinimumPeersInSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
return
}
}
}
}
Expand Down Expand Up @@ -330,42 +327,35 @@ func (s *Service) subscribeAggregatorSubnet(
if _, exists := subscriptions[idx]; !exists {
subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle)
}
if !s.validPeersExist(subnetTopic, idx) {
if !s.validPeersExist(subnetTopic) {
log.Debugf("No peers found subscribed to attestation gossip subnet with "+
"committee index %d. Searching network for peers subscribed to the subnet.", idx)
go func(idx uint64) {
_, err := s.p2p.FindPeersWithSubnet(s.ctx, idx)
if err != nil {
log.Debugf("Could not search for peers: %v", err)
return
}
}(idx)
return
_, err := s.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, params.BeaconNetworkConfig().MinimumPeersInSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
}
}
}

// lookup peers for attester specific subnets.
func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})]
subnetTopic := fmt.Sprintf(topic, digest, idx)
if !s.validPeersExist(subnetTopic, idx) {
if !s.validPeersExist(subnetTopic) {
log.Debugf("No peers found subscribed to attestation gossip subnet with "+
"committee index %d. Searching network for peers subscribed to the subnet.", idx)
go func(idx uint64) {
// perform a search for peers with the desired committee index.
_, err := s.p2p.FindPeersWithSubnet(s.ctx, idx)
if err != nil {
log.Debugf("Could not search for peers: %v", err)
return
}
}(idx)
// perform a search for peers with the desired committee index.
_, err := s.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, params.BeaconNetworkConfig().MinimumPeersInSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
}
}
}

// find if we have peers who are subscribed to the same subnet
func (s *Service) validPeersExist(subnetTopic string, idx uint64) bool {
func (s *Service) validPeersExist(subnetTopic string) bool {
numOfPeers := s.p2p.PubSub().ListPeers(subnetTopic + s.p2p.Encoding().ProtocolSuffix())
return len(s.p2p.Peers().SubscribedToSubnet(idx)) > 0 || len(numOfPeers) > 0
return uint64(len(numOfPeers)) >= params.BeaconNetworkConfig().MinimumPeersInSubnet
}

// Add fork digest to topic.
Expand Down
2 changes: 2 additions & 0 deletions shared/params/mainnet_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ var mainnetNetworkConfig = &NetworkConfig{
MessageDomainValidSnappy: [4]byte{01, 00, 00, 00},
ETH2Key: "eth2",
AttSubnetKey: "attnets",
MinimumPeersInSubnet: 4,
MinimumPeersInSubnetSearch: 20,
ContractDeploymentBlock: 11184524, // Note: contract was deployed in block 11052984 but no transactions were sent until 11184524.
DepositContractAddress: "0x00000000219ab540356cBB839Cbe05303d7705Fa",
ChainID: 1, // Chain ID of eth1 mainnet.
Expand Down
6 changes: 4 additions & 2 deletions shared/params/network_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ type NetworkConfig struct {
MessageDomainValidSnappy [4]byte `yaml:"MESSAGE_DOMAIN_VALID_SNAPPY"` // MessageDomainValidSnappy is the 4-byte domain for gossip message-id isolation of valid snappy messages.

// DiscoveryV5 Config
ETH2Key string // ETH2Key is the ENR key of the eth2 object in an enr.
AttSubnetKey string // AttSubnetKey is the ENR key of the subnet bitfield in the enr.
ETH2Key string // ETH2Key is the ENR key of the eth2 object in an enr.
AttSubnetKey string // AttSubnetKey is the ENR key of the subnet bitfield in the enr.
MinimumPeersInSubnet uint64 // MinimumPeersInSubnet is the required amount of peers that a node is to have its in subnet.
MinimumPeersInSubnetSearch uint64 // PeersInSubnetSearch is the required amount of peers that we need to be able to lookup in a subnet search.

// Chain Network Config
ContractDeploymentBlock uint64 // ContractDeploymentBlock is the eth1 block in which the deposit contract is deployed.
Expand Down
Loading