Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update P2P Service to Handle Local Metadata #5319

Merged
merged 21 commits into from
Apr 7, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion beacon-chain/p2p/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer
log.WithField("currentState", peerConnectionState).WithField("reason", "already active").Trace("Ignoring connection request")
return
}
s.peers.Add(conn.RemotePeer(), conn.RemoteMultiaddr(), conn.Stat().Direction, nil)
s.peers.Add(nil /* ENR */, conn.RemotePeer(), conn.RemoteMultiaddr(), conn.Stat().Direction)
if len(s.peers.Active()) >= int(s.cfg.MaxPeers) {
log.WithField("reason", "at peer limit").Trace("Ignoring connection request")
if err := s.Disconnect(conn.RemotePeer()); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/p2p/peers/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ go_library(
"//proto/beacon/p2p/v1:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/roughtime:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
],
)

Expand All @@ -23,6 +26,7 @@ go_test(
deps = [
"//proto/beacon/p2p/v1:go_default_library",
"//shared/params:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_peer//:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
Expand Down
74 changes: 62 additions & 12 deletions beacon-chain/p2p/peers/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ import (
"sync"
"time"

"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
Expand Down Expand Up @@ -66,9 +69,10 @@ type peerStatus struct {
direction network.Direction
peerState PeerConnectionState
chainState *pb.Status
enr *enr.Record
metaData *pb.MetaData
chainStateLastUpdated time.Time
badResponses int
committeeIndices []uint64
}

// NewStatus creates a new status entity.
Expand All @@ -86,27 +90,29 @@ func (p *Status) MaxBadResponses() int {

// Add adds a peer.
// If a peer already exists with this ID its address and direction are updated with the supplied data.
func (p *Status) Add(pid peer.ID, address ma.Multiaddr, direction network.Direction, indices []uint64) {
func (p *Status) Add(record *enr.Record, pid peer.ID, address ma.Multiaddr, direction network.Direction) {
p.lock.Lock()
defer p.lock.Unlock()

if status, ok := p.status[pid]; ok {
// Peer already exists, just update its address info.
status.address = address
status.direction = direction
if indices != nil {
status.committeeIndices = indices
if record != nil {
status.enr = record
}
return
}

p.status[pid] = &peerStatus{
status := &peerStatus{
address: address,
direction: direction,
// Peers start disconnected; state will be updated when the handshake process begins.
peerState: PeerDisconnected,
committeeIndices: indices,
peerState: PeerDisconnected,
}
if record != nil {
status.enr = record
}
p.status[pid] = status
}

// Address returns the multiaddress of the given remote peer.
Expand All @@ -133,6 +139,17 @@ func (p *Status) Direction(pid peer.ID) (network.Direction, error) {
return network.DirUnknown, ErrPeerUnknown
}

// ENR returns the enr for the corresponding peer id.
func (p *Status) ENR(pid peer.ID) (*enr.Record, error) {
p.lock.RLock()
defer p.lock.RUnlock()

if status, ok := p.status[pid]; ok {
return status.enr, nil
}
return nil, ErrPeerUnknown
}

// SetChainState sets the chain state of the given remote peer.
func (p *Status) SetChainState(pid peer.ID, chainState *pb.Status) {
p.lock.Lock()
Expand Down Expand Up @@ -165,16 +182,37 @@ func (p *Status) IsActive(pid peer.ID) bool {
return ok && (status.peerState == PeerConnected || status.peerState == PeerConnecting)
}

// SetMetadata sets the metadata of the given remote peer.
func (p *Status) SetMetadata(pid peer.ID, metaData *pb.MetaData) {
p.lock.Lock()
defer p.lock.Unlock()

status := p.fetch(pid)
status.metaData = metaData
}

// Metadata returns a copy of the metadata corresponding to the provided
// peer id.
func (p *Status) Metadata(pid peer.ID) (*pb.MetaData, error) {
p.lock.RLock()
defer p.lock.RUnlock()

if status, ok := p.status[pid]; ok {
return proto.Clone(status.metaData).(*pb.MetaData), nil
}
return nil, ErrPeerUnknown
}

// CommitteeIndices retrieves the committee subnets the peer is subscribed to.
func (p *Status) CommitteeIndices(pid peer.ID) ([]uint64, error) {
p.lock.RLock()
defer p.lock.RUnlock()

if status, ok := p.status[pid]; ok {
if status.committeeIndices == nil {
if status.enr == nil || status.metaData == nil {
return []uint64{}, nil
}
return status.committeeIndices, nil
return retrieveIndicesFromBitfield(status.metaData.Attnets), nil
}
return nil, ErrPeerUnknown
}
Expand All @@ -189,10 +227,12 @@ func (p *Status) SubscribedToSubnet(index uint64) []peer.ID {
for pid, status := range p.status {
// look at active peers
if status.peerState == PeerConnecting || status.peerState == PeerConnected &&
status.committeeIndices != nil {
for _, idx := range status.committeeIndices {
status.metaData != nil {
indices := retrieveIndicesFromBitfield(status.metaData.Attnets)
for _, idx := range indices {
if idx == index {
peers = append(peers, pid)
break
}
}
}
Expand Down Expand Up @@ -455,3 +495,13 @@ func (p *Status) CurrentEpoch() uint64 {
}
return helpers.SlotToEpoch(highestSlot)
}

func retrieveIndicesFromBitfield(bitV bitfield.Bitvector64) []uint64 {
committeeIdxs := []uint64{}
for i := uint64(0); i < 64; i++ {
if bitV.BitAt(i) {
committeeIdxs = append(committeeIdxs, i)
}
}
return committeeIdxs
}
Loading