Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node/P2P: Add protected peers list #4292

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions node/cmd/ccq/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -52,6 +53,7 @@ func runP2P(
monitorPeers bool,
loggingMap *LoggingMap,
gossipAdvertiseAddress string,
protectedPeers []string,
) (*P2PSub, error) {
// p2p setup
components := p2p.DefaultComponents()
Expand All @@ -63,6 +65,12 @@ func runP2P(
return nil, err
}

if len(protectedPeers) != 0 {
for _, peerId := range protectedPeers {
components.ConnMgr.Protect(peer.ID(peerId), "configured")
}
}

topic_req := fmt.Sprintf("%s/%s", networkID, "ccq_req")
topic_resp := fmt.Sprintf("%s/%s", networkID, "ccq_resp")

Expand Down
4 changes: 3 additions & 1 deletion node/cmd/ccq/query_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
p2pNetworkID *string
p2pPort *uint
p2pBootstrap *string
protectedPeers []string
listenAddr *string
nodeKeyPath *string
signerKeyPath *string
Expand All @@ -57,6 +58,7 @@ func init() {
p2pNetworkID = QueryServerCmd.Flags().String("network", "", "P2P network identifier (optional, overrides default for environment)")
p2pPort = QueryServerCmd.Flags().Uint("port", 8995, "P2P UDP listener port")
p2pBootstrap = QueryServerCmd.Flags().String("bootstrap", "", "P2P bootstrap peers (optional for testnet or mainnet, overrides default, required for devnet)")
QueryServerCmd.Flags().StringSliceVarP(&protectedPeers, "protectedPeers", "", []string{}, "")
nodeKeyPath = QueryServerCmd.Flags().String("nodeKey", "", "Path to node key (will be generated if it doesn't exist)")
signerKeyPath = QueryServerCmd.Flags().String("signerKey", "", "Path to key used to sign unsigned queries")
listenAddr = QueryServerCmd.Flags().String("listenAddr", "[::]:6069", "Listen address for query server (disabled if blank)")
Expand Down Expand Up @@ -204,7 +206,7 @@ func runQueryServer(cmd *cobra.Command, args []string) {

// Run p2p
pendingResponses := NewPendingResponses(logger)
p2p, err := runP2P(ctx, priv, *p2pPort, networkID, *p2pBootstrap, *ethRPC, *ethContract, pendingResponses, logger, *monitorPeers, loggingMap, *gossipAdvertiseAddress)
p2p, err := runP2P(ctx, priv, *p2pPort, networkID, *p2pBootstrap, *ethRPC, *ethContract, pendingResponses, logger, *monitorPeers, loggingMap, *gossipAdvertiseAddress, protectedPeers)
if err != nil {
logger.Fatal("Failed to start p2p", zap.Error(err))
}
Expand Down
12 changes: 8 additions & 4 deletions node/cmd/guardiand/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ import (
)

var (
p2pNetworkID *string
p2pPort *uint
p2pBootstrap *string
p2pNetworkID *string
p2pPort *uint
p2pBootstrap *string
protectedPeers []string

nodeKeyPath *string

Expand Down Expand Up @@ -262,6 +263,7 @@ var (
ccqAllowedRequesters *string
ccqP2pPort *uint
ccqP2pBootstrap *string
ccqProtectedPeers []string
ccqAllowedPeers *string
ccqBackfillCache *bool

Expand All @@ -282,6 +284,7 @@ func init() {
p2pNetworkID = NodeCmd.Flags().String("network", "", "P2P network identifier (optional, overrides default for environment)")
p2pPort = NodeCmd.Flags().Uint("port", p2p.DefaultPort, "P2P UDP listener port")
p2pBootstrap = NodeCmd.Flags().String("bootstrap", "", "P2P bootstrap peers (optional for mainnet or testnet, overrides default, required for unsafeDevMode)")
NodeCmd.Flags().StringSliceVarP(&protectedPeers, "protectedPeers", "", []string{}, "")

statusAddr = NodeCmd.Flags().String("statusAddr", "[::]:6060", "Listen address for status server (disabled if blank)")

Expand Down Expand Up @@ -491,6 +494,7 @@ func init() {
ccqAllowedRequesters = NodeCmd.Flags().String("ccqAllowedRequesters", "", "Comma separated list of signers allowed to submit cross chain queries")
ccqP2pPort = NodeCmd.Flags().Uint("ccqP2pPort", 8996, "CCQ P2P UDP listener port")
ccqP2pBootstrap = NodeCmd.Flags().String("ccqP2pBootstrap", "", "CCQ P2P bootstrap peers (optional for mainnet or testnet, overrides default, required for unsafeDevMode)")
NodeCmd.Flags().StringSliceVarP(&ccqProtectedPeers, "ccqProtectedPeers", "", []string{}, "")
ccqAllowedPeers = NodeCmd.Flags().String("ccqAllowedPeers", "", "CCQ allowed P2P peers (comma-separated)")
ccqBackfillCache = NodeCmd.Flags().Bool("ccqBackfillCache", true, "Should EVM chains backfill CCQ timestamp cache on startup")
gossipAdvertiseAddress = NodeCmd.Flags().String("gossipAdvertiseAddress", "", "External IP to advertize on Guardian and CCQ p2p (use if behind a NAT or running in k8s)")
Expand Down Expand Up @@ -1777,7 +1781,7 @@ func runNode(cmd *cobra.Command, args []string) {
node.GuardianOptionGatewayRelayer(*gatewayRelayerContract, gatewayRelayerWormchainConn),
node.GuardianOptionQueryHandler(*ccqEnabled, *ccqAllowedRequesters),
node.GuardianOptionAdminService(*adminSocketPath, ethRPC, ethContract, rpcMap),
node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *subscribeToVAAs, *disableHeartbeatVerify, *p2pPort, *ccqP2pBootstrap, *ccqP2pPort, *ccqAllowedPeers, *gossipAdvertiseAddress, ibc.GetFeatures),
node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *subscribeToVAAs, *disableHeartbeatVerify, *p2pPort, *ccqP2pBootstrap, *ccqP2pPort, *ccqAllowedPeers, *gossipAdvertiseAddress, ibc.GetFeatures, protectedPeers, ccqProtectedPeers),
node.GuardianOptionStatusServer(*statusAddr),
node.GuardianOptionProcessor(*p2pNetworkID),
}
Expand Down
9 changes: 6 additions & 3 deletions node/cmd/spy/spy.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ var (
var (
envStr *string

p2pNetworkID *string
p2pPort *uint
p2pBootstrap *string
p2pNetworkID *string
p2pPort *uint
p2pBootstrap *string
protectedPeers []string

statusAddr *string

Expand All @@ -59,6 +60,7 @@ func init() {
p2pNetworkID = SpyCmd.Flags().String("network", "", "P2P network identifier (optional for testnet or mainnet, overrides default, required for devnet)")
p2pPort = SpyCmd.Flags().Uint("port", 8999, "P2P UDP listener port")
p2pBootstrap = SpyCmd.Flags().String("bootstrap", "", "P2P bootstrap peers (optional for testnet or mainnet, overrides default, required for devnet)")
SpyCmd.Flags().StringSliceVarP(&protectedPeers, "protectedPeers", "", []string{}, "")

statusAddr = SpyCmd.Flags().String("statusAddr", "[::]:6060", "Listen address for status server (disabled if blank)")

Expand Down Expand Up @@ -396,6 +398,7 @@ func runSpy(cmd *cobra.Command, args []string) {
rootCtxCancel,
p2p.WithSignedVAAListener(signedInC),
p2p.WithComponents(components),
p2p.WithProtectedPeers(protectedPeers),
)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion node/pkg/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func mockGuardianRunnable(t testing.TB, gs []*mockGuardian, mockGuardianIndex ui
GuardianOptionNoAccountant(), // disable accountant
GuardianOptionGovernor(true, false, ""),
GuardianOptionGatewayRelayer("", nil), // disable gateway relayer
GuardianOptionP2P(gs[mockGuardianIndex].p2pKey, networkID, bootstrapPeers, nodeName, false, false, cfg.p2pPort, "", 0, "", "", func() string { return "" }),
GuardianOptionP2P(gs[mockGuardianIndex].p2pKey, networkID, bootstrapPeers, nodeName, false, false, cfg.p2pPort, "", 0, "", "", func() string { return "" }, []string{}, []string{}),
GuardianOptionPublicRpcSocket(cfg.publicSocket, publicRpcLogDetail),
GuardianOptionPublicrpcTcpService(cfg.publicRpc, publicRpcLogDetail),
GuardianOptionPublicWeb(cfg.publicWeb, cfg.publicSocket, "", false, ""),
Expand Down
7 changes: 6 additions & 1 deletion node/pkg/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func GuardianOptionP2P(
ccqAllowedPeers string,
gossipAdvertiseAddress string,
ibcFeaturesFunc func() string,
protectedPeers []string,
ccqProtectedPeers []string,
) *GuardianOption {
return &GuardianOption{
name: "p2p",
Expand Down Expand Up @@ -101,7 +103,10 @@ func GuardianOptionP2P(
g.queryResponsePublicationC.readC,
ccqBootstrapPeers,
ccqPort,
ccqAllowedPeers),
ccqAllowedPeers,
protectedPeers,
ccqProtectedPeers,
),
p2p.WithProcessorFeaturesFunc(processor.GetFeatures),
)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions node/pkg/p2p/ccq_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (ccq *ccqP2p) run(
port uint,
signedQueryReqC chan<- *gossipv1.SignedQueryRequest,
queryResponseReadC <-chan *query.QueryResponsePublication,
protectedPeers []string,
errC chan error,
) error {
networkID := p2pNetworkID + "/ccq"
Expand All @@ -95,6 +96,12 @@ func (ccq *ccqP2p) run(
return fmt.Errorf("failed to create p2p: %w", err)
}

if len(protectedPeers) != 0 {
for _, peerId := range protectedPeers {
components.ConnMgr.Protect(peer.ID(peerId), "configured")
}
}

// Build a map of bootstrap peers so we can always allow subscribe requests from them.
bootstrapPeersMap := map[string]struct{}{}
bootstrappers, _ := BootstrapAddrs(ccq.logger, bootstrapPeers, ccq.h.ID())
Expand Down
9 changes: 8 additions & 1 deletion node/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,13 @@ func Run(params *RunParams) func(ctx context.Context) error {
}
}()

if len(params.protectedPeers) != 0 {
for _, peerId := range params.protectedPeers {
logger.Info("protecting peer", zap.String("peerId", peerId))
params.components.ConnMgr.Protect(peer.ID(peerId), "configured")
}
}

nodeIdBytes, err := h.ID().Marshal()
if err != nil {
panic(err)
Expand Down Expand Up @@ -462,7 +469,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
if params.ccqEnabled {
ccqErrC := make(chan error)
ccq := newCcqRunP2p(logger, params.ccqAllowedPeers, params.components)
if err := ccq.run(ctx, params.priv, params.guardianSigner, params.networkID, params.ccqBootstrapPeers, params.ccqPort, params.signedQueryReqC, params.queryResponseReadC, ccqErrC); err != nil {
if err := ccq.run(ctx, params.priv, params.guardianSigner, params.networkID, params.ccqBootstrapPeers, params.ccqPort, params.signedQueryReqC, params.queryResponseReadC, params.ccqProtectedPeers, ccqErrC); err != nil {
return fmt.Errorf("failed to start p2p for CCQ: %w", err)
}
defer ccq.close()
Expand Down
22 changes: 22 additions & 0 deletions node/pkg/p2p/run_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type (
ccqBootstrapPeers string
ccqPort uint
ccqAllowedPeers string
protectedPeers []string
ccqProtectedPeers []string
}

// RunOpt is used to specify optional parameters.
Expand Down Expand Up @@ -162,6 +164,22 @@ func WithDisableHeartbeatVerify(disableHeartbeatVerify bool) RunOpt {
}
}

// WithProtectedPeers is used to set the protected peers.
func WithProtectedPeers(protectedPeers []string) RunOpt {
return func(p *RunParams) error {
p.protectedPeers = protectedPeers
return nil
}
}

// WithCcqProtectedPeers is used to set the protected peers for CCQ.
func WithCcqProtectedPeers(ccqProtectedPeers []string) RunOpt {
return func(p *RunParams) error {
p.ccqProtectedPeers = ccqProtectedPeers
return nil
}
}

// WithGuardianOptions is used to set options that are only meaningful to the guardian.
func WithGuardianOptions(
nodeName string,
Expand All @@ -185,6 +203,8 @@ func WithGuardianOptions(
ccqBootstrapPeers string,
ccqPort uint,
ccqAllowedPeers string,
protectedPeers []string,
ccqProtectedPeers []string,
) RunOpt {
return func(p *RunParams) error {
p.nodeName = nodeName
Expand All @@ -208,6 +228,8 @@ func WithGuardianOptions(
p.ccqBootstrapPeers = ccqBootstrapPeers
p.ccqPort = ccqPort
p.ccqAllowedPeers = ccqAllowedPeers
p.protectedPeers = protectedPeers
p.ccqProtectedPeers = ccqProtectedPeers
return nil
}
}
Expand Down
67 changes: 66 additions & 1 deletion node/pkg/p2p/run_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,57 @@ func TestRunParamsWithDisableHeartbeatVerify(t *testing.T) {
assert.True(t, params.disableHeartbeatVerify)
}

func TestRunParamsWithProtectedPeers(t *testing.T) {
priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1)
require.NoError(t, err)
gst := common.NewGuardianSetState(nil)
_, rootCtxCancel := context.WithCancel(context.Background())
defer rootCtxCancel()

protectedPeers := []string{"peer1", "peer2", "peer3"}
params, err := NewRunParams(
bootstrapPeers,
networkId,
priv,
gst,
rootCtxCancel,
WithProtectedPeers(protectedPeers),
)

require.NoError(t, err)
require.NotNil(t, params)

require.Equal(t, len(protectedPeers), len(params.protectedPeers))
assert.Equal(t, protectedPeers[0], params.protectedPeers[0])
assert.Equal(t, protectedPeers[1], params.protectedPeers[1])
assert.Equal(t, protectedPeers[2], params.protectedPeers[2])
}

func TestRunParamsWithCcqProtectedPeers(t *testing.T) {
priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1)
require.NoError(t, err)
gst := common.NewGuardianSetState(nil)
_, rootCtxCancel := context.WithCancel(context.Background())
defer rootCtxCancel()

ccqProtectedPeers := []string{"peerA", "peerB"}
params, err := NewRunParams(
bootstrapPeers,
networkId,
priv,
gst,
rootCtxCancel,
WithCcqProtectedPeers(ccqProtectedPeers),
)

require.NoError(t, err)
require.NotNil(t, params)

require.Equal(t, len(ccqProtectedPeers), len(params.ccqProtectedPeers))
assert.Equal(t, ccqProtectedPeers[0], params.ccqProtectedPeers[0])
assert.Equal(t, ccqProtectedPeers[1], params.ccqProtectedPeers[1])
}

func TestRunParamsWithGuardianOptions(t *testing.T) {
priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1)
require.NoError(t, err)
Expand Down Expand Up @@ -159,6 +210,8 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
ccqBootstrapPeers := "some bootstrap string"
ccqPort := uint(4242)
ccqAllowedPeers := "some allowed peers"
protectedPeers := []string{"peer1", "peer2", "peer3"}
ccqProtectedPeers := []string{"peerA", "peerB"}

params, err := NewRunParams(
bootstrapPeers,
Expand Down Expand Up @@ -187,7 +240,10 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
queryResponseReadC,
ccqBootstrapPeers,
ccqPort,
ccqAllowedPeers),
ccqAllowedPeers,
protectedPeers,
ccqProtectedPeers,
),
)

require.NoError(t, err)
Expand All @@ -210,4 +266,13 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
assert.Equal(t, ccqBootstrapPeers, params.ccqBootstrapPeers)
assert.Equal(t, ccqPort, params.ccqPort)
assert.Equal(t, ccqAllowedPeers, params.ccqAllowedPeers)

require.Equal(t, len(protectedPeers), len(params.protectedPeers))
assert.Equal(t, protectedPeers[0], params.protectedPeers[0])
assert.Equal(t, protectedPeers[1], params.protectedPeers[1])
assert.Equal(t, protectedPeers[2], params.protectedPeers[2])

require.Equal(t, len(ccqProtectedPeers), len(params.ccqProtectedPeers))
assert.Equal(t, ccqProtectedPeers[0], params.ccqProtectedPeers[0])
assert.Equal(t, ccqProtectedPeers[1], params.ccqProtectedPeers[1])
}
18 changes: 10 additions & 8 deletions node/pkg/p2p/watermark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,16 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) {
g.gov,
g.disableHeartbeatVerify,
g.components,
nil, //g.ibcFeaturesFunc,
false, // gateway relayer enabled
false, // ccqEnabled
nil, // signed query request channel
nil, // query response channel
"", // query bootstrap peers
0, // query port
"", // query allowed peers),
nil, //g.ibcFeaturesFunc,
false, // gateway relayer enabled
false, // ccqEnabled
nil, // signed query request channel
nil, // query response channel
"", // query bootstrap peers
0, // query port
"", // query allowed peers),
[]string{}, // protected peers
[]string{}, // ccq protected peers
))
require.NoError(t, err)

Expand Down
Loading