Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ListPeers in the node API #8288

Merged
merged 21 commits into from
Jan 20, 2021
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions beacon-chain/p2p/peers/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,21 @@ func (p *Status) Connected() []peer.ID {
return peers
}

// Inbound returns the current batch of inbound peers that are connected.
// Inbound returns the current batch of inbound peers.
func (p *Status) Inbound() []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.Peers() {
if peerData.Direction == network.DirInbound {
peers = append(peers, pid)
}
}
return peers
}

// InboundConnected returns the current batch of inbound peers that are connected.
func (p *Status) InboundConnected() []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
Expand All @@ -382,8 +395,21 @@ func (p *Status) Inbound() []peer.ID {
return peers
}

// Outbound returns the current batch of outbound peers that are connected.
// Outbound returns the current batch of outbound peers.
func (p *Status) Outbound() []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.Peers() {
if peerData.Direction == network.DirOutbound {
peers = append(peers, pid)
}
}
return peers
}

// OutboundConnected returns the current batch of outbound peers that are connected.
func (p *Status) OutboundConnected() []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
Expand Down
40 changes: 39 additions & 1 deletion beacon-chain/p2p/peers/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ func TestPrunePeers(t *testing.T) {
}

// Set up bad scores for inbound peers.
inboundPeers := p.Inbound()
inboundPeers := p.InboundConnected()
for i, pid := range inboundPeers {
modulo := i % 5
// Increment bad scores for peers.
Expand Down Expand Up @@ -950,6 +950,44 @@ func TestStatus_CurrentEpoch(t *testing.T) {
assert.Equal(t, uint64(5), p.HighestEpoch(), "Expected current epoch to be 5")
}

func TestInbound(t *testing.T) {
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 0,
},
},
})
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333")
require.NoError(t, err)
inbound := createPeer(t, p, addr, network.DirInbound, peers.PeerConnected)
createPeer(t, p, addr, network.DirOutbound, peers.PeerConnected)

result := p.Inbound()
require.Equal(t, 1, len(result))
assert.Equal(t, inbound.Pretty(), result[0].Pretty())
}

func TestOutbound(t *testing.T) {
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 0,
},
},
})
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333")
require.NoError(t, err)
createPeer(t, p, addr, network.DirInbound, peers.PeerConnected)
outbound := createPeer(t, p, addr, network.DirOutbound, peers.PeerConnected)

result := p.Outbound()
require.Equal(t, 1, len(result))
assert.Equal(t, outbound.Pretty(), result[0].Pretty())
}

// addPeer is a helper to add a peer with a given connection state)
func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState) peer.ID {
// Set up some peers with different states
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ func (s *Service) Start() {
})
runutil.RunEvery(s.ctx, 1*time.Minute, func() {
log.WithFields(logrus.Fields{
"inbound": len(s.peers.Inbound()),
"outbound": len(s.peers.Outbound()),
"inbound": len(s.peers.InboundConnected()),
"outbound": len(s.peers.OutboundConnected()),
"activePeers": len(s.peers.Active()),
}).Info("Peer summary")
})
Expand Down
14 changes: 14 additions & 0 deletions beacon-chain/p2p/testing/mock_peersprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@ type MockPeersProvider struct {
peers *peers.Status
}

// ClearPeers removes all known peers.
func (m *MockPeersProvider) ClearPeers() {
m.lock.Lock()
defer m.lock.Unlock()
m.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 5,
},
},
})
}

// Peers provides access the peer status.
func (m *MockPeersProvider) Peers() *peers.Status {
m.lock.Lock()
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/rpc/nodev1/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library",
"//beacon-chain/sync:go_default_library",
"//shared/version:go_default_library",
Expand All @@ -37,6 +38,7 @@ go_test(
deps = [
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/sync/initial-sync/testing:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
Expand Down
169 changes: 166 additions & 3 deletions beacon-chain/rpc/nodev1/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,30 @@ import (
"context"
"fmt"
"runtime"
"strings"

ptypes "github.com/gogo/protobuf/types"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
"github.com/prysmaticlabs/prysm/shared/version"
"go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var (
stateConnecting = ethpb.ConnectionState_CONNECTING.String()
stateConnected = ethpb.ConnectionState_CONNECTED.String()
stateDisconnecting = ethpb.ConnectionState_DISCONNECTING.String()
stateDisconnected = ethpb.ConnectionState_DISCONNECTED.String()
directionInbound = ethpb.PeerDirection_INBOUND.String()
directionOutbound = ethpb.PeerDirection_OUTBOUND.String()
)

// GetIdentity retrieves data about the node's network presence.
func (ns *Server) GetIdentity(ctx context.Context, _ *ptypes.Empty) (*ethpb.IdentityResponse, error) {
ctx, span := trace.StartSpan(ctx, "nodeV1.GetIdentity")
Expand Down Expand Up @@ -88,7 +99,7 @@ func (ns *Server) GetPeer(ctx context.Context, req *ethpb.PeerRequest) (*ethpb.P
}
state, err := peerStatus.ConnectionState(id)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not obtain state: %v", err)
return nil, status.Errorf(codes.Internal, "Could not obtain connection state: %v", err)
}
direction, err := peerStatus.Direction(id)
if err != nil {
Expand All @@ -107,7 +118,99 @@ func (ns *Server) GetPeer(ctx context.Context, req *ethpb.PeerRequest) (*ethpb.P
}

// ListPeers retrieves data about the node's network peers.
func (ns *Server) ListPeers(ctx context.Context, _ *ptypes.Empty) (*ethpb.PeersResponse, error) {
func (ns *Server) ListPeers(ctx context.Context, req *ethpb.PeersRequest) (*ethpb.PeersResponse, error) {
ctx, span := trace.StartSpan(ctx, "nodev1.ListPeers")
defer span.End()

peerStatus := ns.PeersFetcher.Peers()
emptyStateFilter, emptyDirectionFilter := ns.handleEmptyFilters(req, peerStatus)

if emptyStateFilter && emptyDirectionFilter {
allIds := peerStatus.All()
allPeers := make([]*ethpb.Peer, 0, len(allIds))
for _, id := range allIds {
p, err := getPeer(peerStatus, id)
if err != nil {
return nil, err
}
allPeers = append(allPeers, p)
}
return &ethpb.PeersResponse{Data: allPeers}, nil
}

var stateIds []peer.ID
if emptyStateFilter {
stateIds = peerStatus.All()
} else {
for _, stateFilter := range req.State {
normalized := strings.ToUpper(stateFilter)
if normalized == stateConnecting {
ids := peerStatus.Connecting()
stateIds = append(stateIds, ids...)
continue
}
if normalized == stateConnected {
ids := peerStatus.Connected()
stateIds = append(stateIds, ids...)
continue
}
if normalized == stateDisconnecting {
ids := peerStatus.Disconnecting()
stateIds = append(stateIds, ids...)
continue
}
if normalized == stateDisconnected {
ids := peerStatus.Disconnected()
stateIds = append(stateIds, ids...)
continue
}
}
}

var directionIds []peer.ID
if emptyDirectionFilter {
directionIds = peerStatus.All()
} else {
for _, directionFilter := range req.Direction {
normalized := strings.ToUpper(directionFilter)
if normalized == directionInbound {
ids := peerStatus.Inbound()
directionIds = append(directionIds, ids...)
continue
}
if normalized == directionOutbound {
ids := peerStatus.Outbound()
directionIds = append(directionIds, ids...)
continue
}
}
}

var filteredIds []peer.ID
for _, stateId := range stateIds {
for _, directionId := range directionIds {
if stateId.Pretty() == directionId.Pretty() {
filteredIds = append(filteredIds, stateId)
break
}
}
}
filteredPeers := make([]*ethpb.Peer, 0, len(filteredIds))
for _, id := range filteredIds {
p, err := getPeer(peerStatus, id)
if err != nil {
return nil, err
}
filteredPeers = append(filteredPeers, p)
}
return &ethpb.PeersResponse{Data: filteredPeers}, nil
}

// PeerCount retrieves retrieves number of known peers.
func (ns *Server) PeerCount(ctx context.Context, _ *ptypes.Empty) (*ethpb.PeerCountResponse, error) {
Copy link
Contributor Author

@rkapka rkapka Jan 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side-effect of prysmaticlabs/ethereumapis#214 - I had to add this function to conform to the new proto definitions.

ctx, span := trace.StartSpan(ctx, "nodev1.PeerCount")
defer span.End()

return nil, errors.New("unimplemented")
}

Expand All @@ -127,7 +230,10 @@ func (ns *Server) GetVersion(ctx context.Context, _ *ptypes.Empty) (*ethpb.Versi

// GetSyncStatus requests the beacon node to describe if it's currently syncing or not, and
// if it is, what block it is up to.
func (ns *Server) GetSyncStatus(_ context.Context, _ *ptypes.Empty) (*ethpb.SyncingResponse, error) {
func (ns *Server) GetSyncStatus(ctx context.Context, _ *ptypes.Empty) (*ethpb.SyncingResponse, error) {
ctx, span := trace.StartSpan(ctx, "nodev1.GetSyncStatus")
defer span.End()

headSlot := ns.HeadFetcher.HeadSlot()
return &ethpb.SyncingResponse{
Data: &ethpb.SyncInfo{
Expand All @@ -154,3 +260,60 @@ func (ns *Server) GetHealth(ctx context.Context, _ *ptypes.Empty) (*ptypes.Empty
}
return &ptypes.Empty{}, status.Error(codes.Internal, "Node not initialized or having issues")
}

func (ns *Server) handleEmptyFilters(req *ethpb.PeersRequest, peerStatus *peers.Status) (emptyState, emptyDirection bool) {
emptyState = true
for _, stateFilter := range req.State {
normalized := strings.ToUpper(stateFilter)
filterValid := normalized == stateConnecting || normalized == stateConnected ||
normalized == stateDisconnecting || normalized == stateDisconnected
if filterValid {
emptyState = false
break
}
}

emptyDirection = true
for _, directionFilter := range req.Direction {
normalized := strings.ToUpper(directionFilter)
filterValid := normalized == directionInbound || normalized == directionOutbound
if filterValid {
emptyDirection = false
break
}
}

return emptyState, emptyDirection
}

func getPeer(peerStatus *peers.Status, id peer.ID) (*ethpb.Peer, error) {
enr, err := peerStatus.ENR(id)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not obtain ENR: %v", err)
}
serializedEnr, err := p2p.SerializeENR(enr)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not serialize ENR: %v", err)
}
address, err := peerStatus.Address(id)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not obtain address: %v", err)
}
connectionState, err := peerStatus.ConnectionState(id)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not obtain connection state: %v", err)
}
direction, err := peerStatus.Direction(id)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not obtain direction: %v", err)
}
p := ethpb.Peer{
PeerId: id.Pretty(),
Enr: "enr:" + serializedEnr,
Address: address.String(),
State: ethpb.ConnectionState(connectionState),
Direction: ethpb.PeerDirection(direction),
}

return &p, nil
}
Loading