Skip to content

Commit

Permalink
Implement ListPeers in the node API (#8288)
Browse files Browse the repository at this point in the history
* add span

* update ethereumapis

* align server with ethereumapis

* benchmark

* naive implementation

* rename two status functions

* new Inbound and Outbound status functions

* tests

* 'enr:' prefix

* refactoring

* gzl

* case when one filter is empty

* empty filter condition fix

* deepsource

* rename getPeer to peerInfo

* bring back correct version of ethereumapis

* go mod tidy
  • Loading branch information
rkapka authored Jan 20, 2021
1 parent 4b14fa4 commit 1537378
Show file tree
Hide file tree
Showing 10 changed files with 435 additions and 15 deletions.
30 changes: 28 additions & 2 deletions beacon-chain/p2p/peers/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,21 @@ func (p *Status) Connected() []peer.ID {
return peers
}

// Inbound returns the current batch of inbound peers that are connected.
// Inbound returns the current batch of inbound peers.
func (p *Status) Inbound() []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.Peers() {
if peerData.Direction == network.DirInbound {
peers = append(peers, pid)
}
}
return peers
}

// InboundConnected returns the current batch of inbound peers that are connected.
func (p *Status) InboundConnected() []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
Expand All @@ -382,8 +395,21 @@ func (p *Status) Inbound() []peer.ID {
return peers
}

// Outbound returns the current batch of outbound peers that are connected.
// Outbound returns the current batch of outbound peers.
func (p *Status) Outbound() []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.Peers() {
if peerData.Direction == network.DirOutbound {
peers = append(peers, pid)
}
}
return peers
}

// OutboundConnected returns the current batch of outbound peers that are connected.
func (p *Status) OutboundConnected() []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
Expand Down
40 changes: 39 additions & 1 deletion beacon-chain/p2p/peers/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ func TestPrunePeers(t *testing.T) {
}

// Set up bad scores for inbound peers.
inboundPeers := p.Inbound()
inboundPeers := p.InboundConnected()
for i, pid := range inboundPeers {
modulo := i % 5
// Increment bad scores for peers.
Expand Down Expand Up @@ -950,6 +950,44 @@ func TestStatus_CurrentEpoch(t *testing.T) {
assert.Equal(t, uint64(5), p.HighestEpoch(), "Expected current epoch to be 5")
}

func TestInbound(t *testing.T) {
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 0,
},
},
})
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333")
require.NoError(t, err)
inbound := createPeer(t, p, addr, network.DirInbound, peers.PeerConnected)
createPeer(t, p, addr, network.DirOutbound, peers.PeerConnected)

result := p.Inbound()
require.Equal(t, 1, len(result))
assert.Equal(t, inbound.Pretty(), result[0].Pretty())
}

func TestOutbound(t *testing.T) {
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 0,
},
},
})
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333")
require.NoError(t, err)
createPeer(t, p, addr, network.DirInbound, peers.PeerConnected)
outbound := createPeer(t, p, addr, network.DirOutbound, peers.PeerConnected)

result := p.Outbound()
require.Equal(t, 1, len(result))
assert.Equal(t, outbound.Pretty(), result[0].Pretty())
}

// addPeer is a helper to add a peer with a given connection state)
func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState) peer.ID {
// Set up some peers with different states
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ func (s *Service) Start() {
})
runutil.RunEvery(s.ctx, 1*time.Minute, func() {
log.WithFields(logrus.Fields{
"inbound": len(s.peers.Inbound()),
"outbound": len(s.peers.Outbound()),
"inbound": len(s.peers.InboundConnected()),
"outbound": len(s.peers.OutboundConnected()),
"activePeers": len(s.peers.Active()),
}).Info("Peer summary")
})
Expand Down
14 changes: 14 additions & 0 deletions beacon-chain/p2p/testing/mock_peersprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@ type MockPeersProvider struct {
peers *peers.Status
}

// ClearPeers removes all known peers.
func (m *MockPeersProvider) ClearPeers() {
m.lock.Lock()
defer m.lock.Unlock()
m.peers = peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 5,
},
},
})
}

// Peers provides access the peer status.
func (m *MockPeersProvider) Peers() *peers.Status {
m.lock.Lock()
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/rpc/nodev1/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library",
"//beacon-chain/sync:go_default_library",
"//shared/version:go_default_library",
Expand All @@ -37,6 +38,7 @@ go_test(
deps = [
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/sync/initial-sync/testing:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
Expand Down
169 changes: 166 additions & 3 deletions beacon-chain/rpc/nodev1/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,30 @@ import (
"context"
"fmt"
"runtime"
"strings"

ptypes "github.com/gogo/protobuf/types"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
"github.com/prysmaticlabs/prysm/shared/version"
"go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var (
stateConnecting = ethpb.ConnectionState_CONNECTING.String()
stateConnected = ethpb.ConnectionState_CONNECTED.String()
stateDisconnecting = ethpb.ConnectionState_DISCONNECTING.String()
stateDisconnected = ethpb.ConnectionState_DISCONNECTED.String()
directionInbound = ethpb.PeerDirection_INBOUND.String()
directionOutbound = ethpb.PeerDirection_OUTBOUND.String()
)

// GetIdentity retrieves data about the node's network presence.
func (ns *Server) GetIdentity(ctx context.Context, _ *ptypes.Empty) (*ethpb.IdentityResponse, error) {
ctx, span := trace.StartSpan(ctx, "nodeV1.GetIdentity")
Expand Down Expand Up @@ -88,7 +99,7 @@ func (ns *Server) GetPeer(ctx context.Context, req *ethpb.PeerRequest) (*ethpb.P
}
state, err := peerStatus.ConnectionState(id)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not obtain state: %v", err)
return nil, status.Errorf(codes.Internal, "Could not obtain connection state: %v", err)
}
direction, err := peerStatus.Direction(id)
if err != nil {
Expand All @@ -107,7 +118,99 @@ func (ns *Server) GetPeer(ctx context.Context, req *ethpb.PeerRequest) (*ethpb.P
}

// ListPeers retrieves data about the node's network peers.
func (ns *Server) ListPeers(ctx context.Context, _ *ptypes.Empty) (*ethpb.PeersResponse, error) {
func (ns *Server) ListPeers(ctx context.Context, req *ethpb.PeersRequest) (*ethpb.PeersResponse, error) {
ctx, span := trace.StartSpan(ctx, "nodev1.ListPeers")
defer span.End()

peerStatus := ns.PeersFetcher.Peers()
emptyStateFilter, emptyDirectionFilter := ns.handleEmptyFilters(req, peerStatus)

if emptyStateFilter && emptyDirectionFilter {
allIds := peerStatus.All()
allPeers := make([]*ethpb.Peer, 0, len(allIds))
for _, id := range allIds {
p, err := peerInfo(peerStatus, id)
if err != nil {
return nil, err
}
allPeers = append(allPeers, p)
}
return &ethpb.PeersResponse{Data: allPeers}, nil
}

var stateIds []peer.ID
if emptyStateFilter {
stateIds = peerStatus.All()
} else {
for _, stateFilter := range req.State {
normalized := strings.ToUpper(stateFilter)
if normalized == stateConnecting {
ids := peerStatus.Connecting()
stateIds = append(stateIds, ids...)
continue
}
if normalized == stateConnected {
ids := peerStatus.Connected()
stateIds = append(stateIds, ids...)
continue
}
if normalized == stateDisconnecting {
ids := peerStatus.Disconnecting()
stateIds = append(stateIds, ids...)
continue
}
if normalized == stateDisconnected {
ids := peerStatus.Disconnected()
stateIds = append(stateIds, ids...)
continue
}
}
}

var directionIds []peer.ID
if emptyDirectionFilter {
directionIds = peerStatus.All()
} else {
for _, directionFilter := range req.Direction {
normalized := strings.ToUpper(directionFilter)
if normalized == directionInbound {
ids := peerStatus.Inbound()
directionIds = append(directionIds, ids...)
continue
}
if normalized == directionOutbound {
ids := peerStatus.Outbound()
directionIds = append(directionIds, ids...)
continue
}
}
}

var filteredIds []peer.ID
for _, stateId := range stateIds {
for _, directionId := range directionIds {
if stateId.Pretty() == directionId.Pretty() {
filteredIds = append(filteredIds, stateId)
break
}
}
}
filteredPeers := make([]*ethpb.Peer, 0, len(filteredIds))
for _, id := range filteredIds {
p, err := peerInfo(peerStatus, id)
if err != nil {
return nil, err
}
filteredPeers = append(filteredPeers, p)
}
return &ethpb.PeersResponse{Data: filteredPeers}, nil
}

// PeerCount retrieves retrieves number of known peers.
func (ns *Server) PeerCount(ctx context.Context, _ *ptypes.Empty) (*ethpb.PeerCountResponse, error) {
ctx, span := trace.StartSpan(ctx, "nodev1.PeerCount")
defer span.End()

return nil, errors.New("unimplemented")
}

Expand All @@ -127,7 +230,10 @@ func (ns *Server) GetVersion(ctx context.Context, _ *ptypes.Empty) (*ethpb.Versi

// GetSyncStatus requests the beacon node to describe if it's currently syncing or not, and
// if it is, what block it is up to.
func (ns *Server) GetSyncStatus(_ context.Context, _ *ptypes.Empty) (*ethpb.SyncingResponse, error) {
func (ns *Server) GetSyncStatus(ctx context.Context, _ *ptypes.Empty) (*ethpb.SyncingResponse, error) {
ctx, span := trace.StartSpan(ctx, "nodev1.GetSyncStatus")
defer span.End()

headSlot := ns.HeadFetcher.HeadSlot()
return &ethpb.SyncingResponse{
Data: &ethpb.SyncInfo{
Expand All @@ -154,3 +260,60 @@ func (ns *Server) GetHealth(ctx context.Context, _ *ptypes.Empty) (*ptypes.Empty
}
return &ptypes.Empty{}, status.Error(codes.Internal, "Node not initialized or having issues")
}

func (ns *Server) handleEmptyFilters(req *ethpb.PeersRequest, peerStatus *peers.Status) (emptyState, emptyDirection bool) {
emptyState = true
for _, stateFilter := range req.State {
normalized := strings.ToUpper(stateFilter)
filterValid := normalized == stateConnecting || normalized == stateConnected ||
normalized == stateDisconnecting || normalized == stateDisconnected
if filterValid {
emptyState = false
break
}
}

emptyDirection = true
for _, directionFilter := range req.Direction {
normalized := strings.ToUpper(directionFilter)
filterValid := normalized == directionInbound || normalized == directionOutbound
if filterValid {
emptyDirection = false
break
}
}

return emptyState, emptyDirection
}

func peerInfo(peerStatus *peers.Status, id peer.ID) (*ethpb.Peer, error) {
enr, err := peerStatus.ENR(id)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not obtain ENR: %v", err)
}
serializedEnr, err := p2p.SerializeENR(enr)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not serialize ENR: %v", err)
}
address, err := peerStatus.Address(id)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not obtain address: %v", err)
}
connectionState, err := peerStatus.ConnectionState(id)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not obtain connection state: %v", err)
}
direction, err := peerStatus.Direction(id)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not obtain direction: %v", err)
}
p := ethpb.Peer{
PeerId: id.Pretty(),
Enr: "enr:" + serializedEnr,
Address: address.String(),
State: ethpb.ConnectionState(connectionState),
Direction: ethpb.PeerDirection(direction),
}

return &p, nil
}
Loading

0 comments on commit 1537378

Please sign in to comment.