Skip to content

Commit

Permalink
P2P: Simplify code (#13719)
Browse files Browse the repository at this point in the history
* `subscribeStaticWithSubnets`: Fix docstring.

* `buildOptions`: Avoid `options` mutations.

* `dv5Cfg`: Avoid mutation.

* `RefreshENR`: Use default for all but Phase0.

* `udp4`, `udp6`: Create enum.

* `p2p.Config`: `BootstrapNodeAddr`==> `BootstrapNodeAddrs`.

* `p2p.Config`: `Discv5BootStrapAddr` ==> `Discv5BootStrapAddrs`.

* `TestScorers_BadResponses_Score`: Improve.

* `BeaconNode`: Avoid mutation.

* `TestStore_TrustedPeers`: Remove blankline.

* Remove blank identifiers.

* `privKey`: Keep the majority of code with low indentation.

* `P2PPreregistration`: Return error instead of fatal log.

* `parseBootStrapAddrs` => `ParseBootStrapAddrs` (export)

* `p2p.Config`: Remove `BootstrapNodeAddrs`.

* `NewService`: Avoid mutation when possible.

* `Service`: Remove blank identifier.

* `buildOptions`: Avoid `log.Fatalf` (make deepsource happy).

* `registerGRPCGateway`: Use `net.JoinHostPort` (make deepsource happy).

* `registerBuilderService`: Make deepsource happy.

* `scorers`: Add `NoLock` suffix (make deepsource happy).

* `scorerr`: Add some `NoLock`suffixes (making deepsource happy).

* `discovery_test.go`. Remove init.

Rationale:
`rand.Seed` is deprecated: As of Go 1.20 there is no reason to call Seed with a random value. Programs that call Seed with a known value to get a specific sequence of results should use New(NewSource(seed)) to obtain a local random generator.

This makes deepsource happy as well.

* `createListener`: Reduce cyclomatic complexity (make deepsource happy).

* `startDB`: Reduce cyclomatic complexity (make deepsource happy).

* `main`: Log a FATAL on error.

This way, the error message is very readable.
Before this commit, the error message is the less readable
message in the logs.

* `New`: Reduce cyclomatic complexity (make deepsource happy).

* `main`: Avoid `App` mutation, and make deepsource happy.

* Update beacon-chain/node/node.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

* `bootnodes` ==> `BootNodes` (Fix PR comment).

* Remove duplicate `configureFastSSZHashingAlgorithm` since already done in `configureBeacon`. (Fix PR comment)

* Add `TestCreateLocalNode`. (PR comment fix.)

* `startModules` ==> `startBaseServices (Fix PR comment).

* `buildOptions` return errors consistently.

* `New`: Change ordering.

---------

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>
  • Loading branch information
nalepae and saolyn authored Mar 15, 2024
1 parent 9fcb9b8 commit 65f71b3
Show file tree
Hide file tree
Showing 26 changed files with 667 additions and 438 deletions.
384 changes: 234 additions & 150 deletions beacon-chain/node/node.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions beacon-chain/node/registration/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
deps = [
"//cmd:go_default_library",
"//config/params:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
"@in_gopkg_yaml_v2//:go_default_library",
Expand Down
7 changes: 4 additions & 3 deletions beacon-chain/node/registration/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"os"
"path/filepath"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/cmd"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -31,9 +32,9 @@ func P2PPreregistration(cliCtx *cli.Context) (bootstrapNodeAddrs []string, dataD
if dataDir == "" {
dataDir = cmd.DefaultDataDir()
if dataDir == "" {
log.Fatal(
"Could not determine your system's HOME path, please specify a --datadir you wish " +
"to use for your chain data",
err = errors.Errorf(
"Could not determine your system's HOME path, please specify a --%s you wish to use for your chain data",
cmd.DataDirFlag.Name,
)
}
}
Expand Down
5 changes: 2 additions & 3 deletions beacon-chain/p2p/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,8 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
var hosts []host.Host
// setup other nodes.
cfg = &Config{
BootstrapNodeAddr: []string{bootNode.String()},
Discv5BootStrapAddr: []string{bootNode.String()},
MaxPeers: 30,
Discv5BootStrapAddrs: []string{bootNode.String()},
MaxPeers: 30,
}
// Setup 2 different hosts
for i := 1; i <= 2; i++ {
Expand Down
43 changes: 21 additions & 22 deletions beacon-chain/p2p/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,27 @@ const defaultPubsubQueueSize = 600
// Config for the p2p service. These parameters are set from application level flags
// to initialize the p2p service.
type Config struct {
NoDiscovery bool
EnableUPnP bool
StaticPeerID bool
StaticPeers []string
BootstrapNodeAddr []string
Discv5BootStrapAddr []string
RelayNodeAddr string
LocalIP string
HostAddress string
HostDNS string
PrivateKey string
DataDir string
MetaDataDir string
TCPPort uint
UDPPort uint
MaxPeers uint
QueueSize uint
AllowListCIDR string
DenyListCIDR []string
StateNotifier statefeed.Notifier
DB db.ReadOnlyDatabase
ClockWaiter startup.ClockWaiter
NoDiscovery bool
EnableUPnP bool
StaticPeerID bool
StaticPeers []string
Discv5BootStrapAddrs []string
RelayNodeAddr string
LocalIP string
HostAddress string
HostDNS string
PrivateKey string
DataDir string
MetaDataDir string
TCPPort uint
UDPPort uint
MaxPeers uint
QueueSize uint
AllowListCIDR string
DenyListCIDR []string
StateNotifier statefeed.Notifier
DB db.ReadOnlyDatabase
ClockWaiter startup.ClockWaiter
}

// validateConfig validates whether the values provided are accurate and will set
Expand Down
6 changes: 3 additions & 3 deletions beacon-chain/p2p/connection_gater.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
)

// InterceptPeerDial tests whether we're permitted to Dial the specified peer.
func (_ *Service) InterceptPeerDial(_ peer.ID) (allow bool) {
func (*Service) InterceptPeerDial(_ peer.ID) (allow bool) {
return true
}

Expand Down Expand Up @@ -63,12 +63,12 @@ func (s *Service) InterceptAccept(n network.ConnMultiaddrs) (allow bool) {

// InterceptSecured tests whether a given connection, now authenticated,
// is allowed.
func (_ *Service) InterceptSecured(_ network.Direction, _ peer.ID, _ network.ConnMultiaddrs) (allow bool) {
func (*Service) InterceptSecured(_ network.Direction, _ peer.ID, _ network.ConnMultiaddrs) (allow bool) {
return true
}

// InterceptUpgraded tests whether a fully capable connection is allowed.
func (_ *Service) InterceptUpgraded(_ network.Conn) (allow bool, reason control.DisconnectReason) {
func (*Service) InterceptUpgraded(_ network.Conn) (allow bool, reason control.DisconnectReason) {
return true, 0
}

Expand Down
101 changes: 58 additions & 43 deletions beacon-chain/p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ type Listener interface {
LocalNode() *enode.LocalNode
}

const (
udp4 = iota
udp6
)

// RefreshENR uses an epoch to refresh the enr entry for our node
// with the tracked committee ids for the epoch, allowing our node
// to be dynamically discoverable by others given our tracked committee ids.
Expand Down Expand Up @@ -62,8 +67,14 @@ func (s *Service) RefreshENR() {
// Compare current epoch with our fork epochs
altairForkEpoch := params.BeaconConfig().AltairForkEpoch
switch {
// Altair Behaviour
case currEpoch >= altairForkEpoch:
case currEpoch < altairForkEpoch:
// Phase 0 behaviour.
if bytes.Equal(bitV, currentBitV) {
// return early if bitfield hasn't changed
return
}
s.updateSubnetRecordWithMetadata(bitV)
default:
// Retrieve sync subnets from application level
// cache.
bitS := bitfield.Bitvector4{byte(0x00)}
Expand All @@ -82,13 +93,6 @@ func (s *Service) RefreshENR() {
return
}
s.updateSubnetRecordWithMetadataV2(bitV, bitS)
default:
// Phase 0 behaviour.
if bytes.Equal(bitV, currentBitV) {
// return early if bitfield hasn't changed
return
}
s.updateSubnetRecordWithMetadata(bitV)
}
// ping all peers to inform them of new metadata
s.pingPeers()
Expand Down Expand Up @@ -140,9 +144,9 @@ func (s *Service) createListener(
// by default we will listen to all interfaces.
var bindIP net.IP
switch udpVersionFromIP(ipAddr) {
case "udp4":
case udp4:
bindIP = net.IPv4zero
case "udp6":
case udp6:
bindIP = net.IPv6zero
default:
return nil, errors.New("invalid ip provided")
Expand All @@ -160,6 +164,7 @@ func (s *Service) createListener(
IP: bindIP,
Port: int(s.cfg.UDPPort),
}

// Listen to all network interfaces
// for both ip protocols.
networkVersion := "udp"
Expand All @@ -177,44 +182,27 @@ func (s *Service) createListener(
if err != nil {
return nil, errors.Wrap(err, "could not create local node")
}
if s.cfg.HostAddress != "" {
hostIP := net.ParseIP(s.cfg.HostAddress)
if hostIP.To4() == nil && hostIP.To16() == nil {
log.Errorf("Invalid host address given: %s", hostIP.String())
} else {
localNode.SetFallbackIP(hostIP)
localNode.SetStaticIP(hostIP)
}
}
if s.cfg.HostDNS != "" {
host := s.cfg.HostDNS
ips, err := net.LookupIP(host)

bootNodes := make([]*enode.Node, 0, len(s.cfg.Discv5BootStrapAddrs))
for _, addr := range s.cfg.Discv5BootStrapAddrs {
bootNode, err := enode.Parse(enode.ValidSchemes, addr)
if err != nil {
return nil, errors.Wrap(err, "could not resolve host address")
}
if len(ips) > 0 {
// Use first IP returned from the
// resolver.
firstIP := ips[0]
localNode.SetFallbackIP(firstIP)
return nil, errors.Wrap(err, "could not bootstrap addr")
}

bootNodes = append(bootNodes, bootNode)
}

dv5Cfg := discover.Config{
PrivateKey: privKey,
}
dv5Cfg.Bootnodes = []*enode.Node{}
for _, addr := range s.cfg.Discv5BootStrapAddr {
bootNode, err := enode.Parse(enode.ValidSchemes, addr)
if err != nil {
return nil, errors.Wrap(err, "could not bootstrap addr")
}
dv5Cfg.Bootnodes = append(dv5Cfg.Bootnodes, bootNode)
Bootnodes: bootNodes,
}

listener, err := discover.ListenV5(conn, localNode, dv5Cfg)
if err != nil {
return nil, errors.Wrap(err, "could not listen to discV5")
}

return listener, nil
}

Expand Down Expand Up @@ -242,8 +230,35 @@ func (s *Service) createLocalNode(
if err != nil {
return nil, errors.Wrap(err, "could not add eth2 fork version entry to enr")
}

localNode = initializeAttSubnets(localNode)
return initializeSyncCommSubnets(localNode), nil
localNode = initializeSyncCommSubnets(localNode)

if s.cfg != nil && s.cfg.HostAddress != "" {
hostIP := net.ParseIP(s.cfg.HostAddress)
if hostIP.To4() == nil && hostIP.To16() == nil {
return nil, errors.Errorf("invalid host address: %s", s.cfg.HostAddress)
} else {
localNode.SetFallbackIP(hostIP)
localNode.SetStaticIP(hostIP)
}
}

if s.cfg != nil && s.cfg.HostDNS != "" {
host := s.cfg.HostDNS
ips, err := net.LookupIP(host)
if err != nil {
return nil, errors.Wrapf(err, "could not resolve host address: %s", host)
}
if len(ips) > 0 {
// Use first IP returned from the
// resolver.
firstIP := ips[0]
localNode.SetFallbackIP(firstIP)
}
}

return localNode, nil
}

func (s *Service) startDiscoveryV5(
Expand Down Expand Up @@ -363,7 +378,7 @@ func PeersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error) {
return allAddrs, nil
}

func parseBootStrapAddrs(addrs []string) (discv5Nodes []string) {
func ParseBootStrapAddrs(addrs []string) (discv5Nodes []string) {
discv5Nodes, _ = parseGenericAddrs(addrs)
if len(discv5Nodes) == 0 {
log.Warn("No bootstrap addresses supplied")
Expand Down Expand Up @@ -483,9 +498,9 @@ func multiAddrFromString(address string) (ma.Multiaddr, error) {
return ma.NewMultiaddr(address)
}

func udpVersionFromIP(ipAddr net.IP) string {
func udpVersionFromIP(ipAddr net.IP) int {
if ipAddr.To4() != nil {
return "udp4"
return udp4
}
return "udp6"
return udp6
}
Loading

0 comments on commit 65f71b3

Please sign in to comment.