diff --git a/node/cmd/ccq/p2p.go b/node/cmd/ccq/p2p.go index f7871e06e7..617ec1485a 100644 --- a/node/cmd/ccq/p2p.go +++ b/node/cmd/ccq/p2p.go @@ -17,6 +17,7 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -52,6 +53,7 @@ func runP2P( monitorPeers bool, loggingMap *LoggingMap, gossipAdvertiseAddress string, + protectedPeers []string, ) (*P2PSub, error) { // p2p setup components := p2p.DefaultComponents() @@ -63,6 +65,12 @@ func runP2P( return nil, err } + if len(protectedPeers) != 0 { + for _, peerId := range protectedPeers { + components.ConnMgr.Protect(peer.ID(peerId), "configured") + } + } + topic_req := fmt.Sprintf("%s/%s", networkID, "ccq_req") topic_resp := fmt.Sprintf("%s/%s", networkID, "ccq_resp") diff --git a/node/cmd/ccq/query_server.go b/node/cmd/ccq/query_server.go index 023c39721c..b50ec65157 100644 --- a/node/cmd/ccq/query_server.go +++ b/node/cmd/ccq/query_server.go @@ -32,6 +32,7 @@ var ( p2pNetworkID *string p2pPort *uint p2pBootstrap *string + protectedPeers []string listenAddr *string nodeKeyPath *string signerKeyPath *string @@ -57,6 +58,7 @@ func init() { p2pNetworkID = QueryServerCmd.Flags().String("network", "", "P2P network identifier (optional, overrides default for environment)") p2pPort = QueryServerCmd.Flags().Uint("port", 8995, "P2P UDP listener port") p2pBootstrap = QueryServerCmd.Flags().String("bootstrap", "", "P2P bootstrap peers (optional for testnet or mainnet, overrides default, required for devnet)") + QueryServerCmd.Flags().StringSliceVarP(&protectedPeers, "protectedPeers", "", []string{}, "") nodeKeyPath = QueryServerCmd.Flags().String("nodeKey", "", "Path to node key (will be generated if it doesn't exist)") signerKeyPath = QueryServerCmd.Flags().String("signerKey", "", "Path to key used to sign unsigned queries") listenAddr = QueryServerCmd.Flags().String("listenAddr", "[::]:6069", "Listen address for query server (disabled if blank)") @@ -204,7 +206,7 @@ func runQueryServer(cmd *cobra.Command, args []string) { // Run p2p pendingResponses := NewPendingResponses(logger) - p2p, err := runP2P(ctx, priv, *p2pPort, networkID, *p2pBootstrap, *ethRPC, *ethContract, pendingResponses, logger, *monitorPeers, loggingMap, *gossipAdvertiseAddress) + p2p, err := runP2P(ctx, priv, *p2pPort, networkID, *p2pBootstrap, *ethRPC, *ethContract, pendingResponses, logger, *monitorPeers, loggingMap, *gossipAdvertiseAddress, protectedPeers) if err != nil { logger.Fatal("Failed to start p2p", zap.Error(err)) } diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index d696ed4f9d..1290c92503 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -50,9 +50,10 @@ import ( ) var ( - p2pNetworkID *string - p2pPort *uint - p2pBootstrap *string + p2pNetworkID *string + p2pPort *uint + p2pBootstrap *string + protectedPeers []string nodeKeyPath *string @@ -262,6 +263,7 @@ var ( ccqAllowedRequesters *string ccqP2pPort *uint ccqP2pBootstrap *string + ccqProtectedPeers []string ccqAllowedPeers *string ccqBackfillCache *bool @@ -282,6 +284,7 @@ func init() { p2pNetworkID = NodeCmd.Flags().String("network", "", "P2P network identifier (optional, overrides default for environment)") p2pPort = NodeCmd.Flags().Uint("port", p2p.DefaultPort, "P2P UDP listener port") p2pBootstrap = NodeCmd.Flags().String("bootstrap", "", "P2P bootstrap peers (optional for mainnet or testnet, overrides default, required for unsafeDevMode)") + NodeCmd.Flags().StringSliceVarP(&protectedPeers, "protectedPeers", "", []string{}, "") statusAddr = NodeCmd.Flags().String("statusAddr", "[::]:6060", "Listen address for status server (disabled if blank)") @@ -491,6 +494,7 @@ func init() { ccqAllowedRequesters = NodeCmd.Flags().String("ccqAllowedRequesters", "", "Comma separated list of signers allowed to submit cross chain queries") ccqP2pPort = NodeCmd.Flags().Uint("ccqP2pPort", 8996, "CCQ P2P UDP listener port") ccqP2pBootstrap = NodeCmd.Flags().String("ccqP2pBootstrap", "", "CCQ P2P bootstrap peers (optional for mainnet or testnet, overrides default, required for unsafeDevMode)") + NodeCmd.Flags().StringSliceVarP(&ccqProtectedPeers, "ccqProtectedPeers", "", []string{}, "") ccqAllowedPeers = NodeCmd.Flags().String("ccqAllowedPeers", "", "CCQ allowed P2P peers (comma-separated)") ccqBackfillCache = NodeCmd.Flags().Bool("ccqBackfillCache", true, "Should EVM chains backfill CCQ timestamp cache on startup") gossipAdvertiseAddress = NodeCmd.Flags().String("gossipAdvertiseAddress", "", "External IP to advertize on Guardian and CCQ p2p (use if behind a NAT or running in k8s)") @@ -1777,7 +1781,7 @@ func runNode(cmd *cobra.Command, args []string) { node.GuardianOptionGatewayRelayer(*gatewayRelayerContract, gatewayRelayerWormchainConn), node.GuardianOptionQueryHandler(*ccqEnabled, *ccqAllowedRequesters), node.GuardianOptionAdminService(*adminSocketPath, ethRPC, ethContract, rpcMap), - node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *subscribeToVAAs, *disableHeartbeatVerify, *p2pPort, *ccqP2pBootstrap, *ccqP2pPort, *ccqAllowedPeers, *gossipAdvertiseAddress, ibc.GetFeatures), + node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *subscribeToVAAs, *disableHeartbeatVerify, *p2pPort, *ccqP2pBootstrap, *ccqP2pPort, *ccqAllowedPeers, *gossipAdvertiseAddress, ibc.GetFeatures, protectedPeers, ccqProtectedPeers), node.GuardianOptionStatusServer(*statusAddr), node.GuardianOptionProcessor(*p2pNetworkID), } diff --git a/node/cmd/spy/spy.go b/node/cmd/spy/spy.go index 0cb6378ada..2f15e870ab 100644 --- a/node/cmd/spy/spy.go +++ b/node/cmd/spy/spy.go @@ -36,9 +36,10 @@ var ( var ( envStr *string - p2pNetworkID *string - p2pPort *uint - p2pBootstrap *string + p2pNetworkID *string + p2pPort *uint + p2pBootstrap *string + protectedPeers []string statusAddr *string @@ -59,6 +60,7 @@ func init() { p2pNetworkID = SpyCmd.Flags().String("network", "", "P2P network identifier (optional for testnet or mainnet, overrides default, required for devnet)") p2pPort = SpyCmd.Flags().Uint("port", 8999, "P2P UDP listener port") p2pBootstrap = SpyCmd.Flags().String("bootstrap", "", "P2P bootstrap peers (optional for testnet or mainnet, overrides default, required for devnet)") + SpyCmd.Flags().StringSliceVarP(&protectedPeers, "protectedPeers", "", []string{}, "") statusAddr = SpyCmd.Flags().String("statusAddr", "[::]:6060", "Listen address for status server (disabled if blank)") @@ -396,6 +398,7 @@ func runSpy(cmd *cobra.Command, args []string) { rootCtxCancel, p2p.WithSignedVAAListener(signedInC), p2p.WithComponents(components), + p2p.WithProtectedPeers(protectedPeers), ) if err != nil { return err diff --git a/node/pkg/node/node_test.go b/node/pkg/node/node_test.go index 1678544057..64d19f556b 100644 --- a/node/pkg/node/node_test.go +++ b/node/pkg/node/node_test.go @@ -190,7 +190,7 @@ func mockGuardianRunnable(t testing.TB, gs []*mockGuardian, mockGuardianIndex ui GuardianOptionNoAccountant(), // disable accountant GuardianOptionGovernor(true, false, ""), GuardianOptionGatewayRelayer("", nil), // disable gateway relayer - GuardianOptionP2P(gs[mockGuardianIndex].p2pKey, networkID, bootstrapPeers, nodeName, false, false, cfg.p2pPort, "", 0, "", "", func() string { return "" }), + GuardianOptionP2P(gs[mockGuardianIndex].p2pKey, networkID, bootstrapPeers, nodeName, false, false, cfg.p2pPort, "", 0, "", "", func() string { return "" }, []string{}, []string{}), GuardianOptionPublicRpcSocket(cfg.publicSocket, publicRpcLogDetail), GuardianOptionPublicrpcTcpService(cfg.publicRpc, publicRpcLogDetail), GuardianOptionPublicWeb(cfg.publicWeb, cfg.publicSocket, "", false, ""), diff --git a/node/pkg/node/options.go b/node/pkg/node/options.go index 67602955af..ec58cf03f5 100644 --- a/node/pkg/node/options.go +++ b/node/pkg/node/options.go @@ -52,6 +52,8 @@ func GuardianOptionP2P( ccqAllowedPeers string, gossipAdvertiseAddress string, ibcFeaturesFunc func() string, + protectedPeers []string, + ccqProtectedPeers []string, ) *GuardianOption { return &GuardianOption{ name: "p2p", @@ -101,7 +103,10 @@ func GuardianOptionP2P( g.queryResponsePublicationC.readC, ccqBootstrapPeers, ccqPort, - ccqAllowedPeers), + ccqAllowedPeers, + protectedPeers, + ccqProtectedPeers, + ), p2p.WithProcessorFeaturesFunc(processor.GetFeatures), ) if err != nil { diff --git a/node/pkg/p2p/ccq_p2p.go b/node/pkg/p2p/ccq_p2p.go index 8a2d84e376..349323d16d 100644 --- a/node/pkg/p2p/ccq_p2p.go +++ b/node/pkg/p2p/ccq_p2p.go @@ -76,6 +76,7 @@ func (ccq *ccqP2p) run( port uint, signedQueryReqC chan<- *gossipv1.SignedQueryRequest, queryResponseReadC <-chan *query.QueryResponsePublication, + protectedPeers []string, errC chan error, ) error { networkID := p2pNetworkID + "/ccq" @@ -95,6 +96,12 @@ func (ccq *ccqP2p) run( return fmt.Errorf("failed to create p2p: %w", err) } + if len(protectedPeers) != 0 { + for _, peerId := range protectedPeers { + components.ConnMgr.Protect(peer.ID(peerId), "configured") + } + } + // Build a map of bootstrap peers so we can always allow subscribe requests from them. bootstrapPeersMap := map[string]struct{}{} bootstrappers, _ := BootstrapAddrs(ccq.logger, bootstrapPeers, ccq.h.ID()) diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index 79ca1fcaaa..dda8d8acfb 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -335,6 +335,13 @@ func Run(params *RunParams) func(ctx context.Context) error { } }() + if len(params.protectedPeers) != 0 { + for _, peerId := range params.protectedPeers { + logger.Info("protecting peer", zap.String("peerId", peerId)) + params.components.ConnMgr.Protect(peer.ID(peerId), "configured") + } + } + nodeIdBytes, err := h.ID().Marshal() if err != nil { panic(err) @@ -462,7 +469,7 @@ func Run(params *RunParams) func(ctx context.Context) error { if params.ccqEnabled { ccqErrC := make(chan error) ccq := newCcqRunP2p(logger, params.ccqAllowedPeers, params.components) - if err := ccq.run(ctx, params.priv, params.guardianSigner, params.networkID, params.ccqBootstrapPeers, params.ccqPort, params.signedQueryReqC, params.queryResponseReadC, ccqErrC); err != nil { + if err := ccq.run(ctx, params.priv, params.guardianSigner, params.networkID, params.ccqBootstrapPeers, params.ccqPort, params.signedQueryReqC, params.queryResponseReadC, params.ccqProtectedPeers, ccqErrC); err != nil { return fmt.Errorf("failed to start p2p for CCQ: %w", err) } defer ccq.close() diff --git a/node/pkg/p2p/run_params.go b/node/pkg/p2p/run_params.go index 3876471ec7..7c87e42537 100644 --- a/node/pkg/p2p/run_params.go +++ b/node/pkg/p2p/run_params.go @@ -60,6 +60,8 @@ type ( ccqBootstrapPeers string ccqPort uint ccqAllowedPeers string + protectedPeers []string + ccqProtectedPeers []string } // RunOpt is used to specify optional parameters. @@ -162,6 +164,22 @@ func WithDisableHeartbeatVerify(disableHeartbeatVerify bool) RunOpt { } } +// WithProtectedPeers is used to set the protected peers. +func WithProtectedPeers(protectedPeers []string) RunOpt { + return func(p *RunParams) error { + p.protectedPeers = protectedPeers + return nil + } +} + +// WithCcqProtectedPeers is used to set the protected peers for CCQ. +func WithCcqProtectedPeers(ccqProtectedPeers []string) RunOpt { + return func(p *RunParams) error { + p.ccqProtectedPeers = ccqProtectedPeers + return nil + } +} + // WithGuardianOptions is used to set options that are only meaningful to the guardian. func WithGuardianOptions( nodeName string, @@ -185,6 +203,8 @@ func WithGuardianOptions( ccqBootstrapPeers string, ccqPort uint, ccqAllowedPeers string, + protectedPeers []string, + ccqProtectedPeers []string, ) RunOpt { return func(p *RunParams) error { p.nodeName = nodeName @@ -208,6 +228,8 @@ func WithGuardianOptions( p.ccqBootstrapPeers = ccqBootstrapPeers p.ccqPort = ccqPort p.ccqAllowedPeers = ccqAllowedPeers + p.protectedPeers = protectedPeers + p.ccqProtectedPeers = ccqProtectedPeers return nil } } diff --git a/node/pkg/p2p/run_params_test.go b/node/pkg/p2p/run_params_test.go index 103470f541..fa25864637 100644 --- a/node/pkg/p2p/run_params_test.go +++ b/node/pkg/p2p/run_params_test.go @@ -127,6 +127,57 @@ func TestRunParamsWithDisableHeartbeatVerify(t *testing.T) { assert.True(t, params.disableHeartbeatVerify) } +func TestRunParamsWithProtectedPeers(t *testing.T) { + priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1) + require.NoError(t, err) + gst := common.NewGuardianSetState(nil) + _, rootCtxCancel := context.WithCancel(context.Background()) + defer rootCtxCancel() + + protectedPeers := []string{"peer1", "peer2", "peer3"} + params, err := NewRunParams( + bootstrapPeers, + networkId, + priv, + gst, + rootCtxCancel, + WithProtectedPeers(protectedPeers), + ) + + require.NoError(t, err) + require.NotNil(t, params) + + require.Equal(t, len(protectedPeers), len(params.protectedPeers)) + assert.Equal(t, protectedPeers[0], params.protectedPeers[0]) + assert.Equal(t, protectedPeers[1], params.protectedPeers[1]) + assert.Equal(t, protectedPeers[2], params.protectedPeers[2]) +} + +func TestRunParamsWithCcqProtectedPeers(t *testing.T) { + priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1) + require.NoError(t, err) + gst := common.NewGuardianSetState(nil) + _, rootCtxCancel := context.WithCancel(context.Background()) + defer rootCtxCancel() + + ccqProtectedPeers := []string{"peerA", "peerB"} + params, err := NewRunParams( + bootstrapPeers, + networkId, + priv, + gst, + rootCtxCancel, + WithCcqProtectedPeers(ccqProtectedPeers), + ) + + require.NoError(t, err) + require.NotNil(t, params) + + require.Equal(t, len(ccqProtectedPeers), len(params.ccqProtectedPeers)) + assert.Equal(t, ccqProtectedPeers[0], params.ccqProtectedPeers[0]) + assert.Equal(t, ccqProtectedPeers[1], params.ccqProtectedPeers[1]) +} + func TestRunParamsWithGuardianOptions(t *testing.T) { priv, _, err := p2pcrypto.GenerateKeyPair(p2pcrypto.Ed25519, -1) require.NoError(t, err) @@ -159,6 +210,8 @@ func TestRunParamsWithGuardianOptions(t *testing.T) { ccqBootstrapPeers := "some bootstrap string" ccqPort := uint(4242) ccqAllowedPeers := "some allowed peers" + protectedPeers := []string{"peer1", "peer2", "peer3"} + ccqProtectedPeers := []string{"peerA", "peerB"} params, err := NewRunParams( bootstrapPeers, @@ -187,7 +240,10 @@ func TestRunParamsWithGuardianOptions(t *testing.T) { queryResponseReadC, ccqBootstrapPeers, ccqPort, - ccqAllowedPeers), + ccqAllowedPeers, + protectedPeers, + ccqProtectedPeers, + ), ) require.NoError(t, err) @@ -210,4 +266,13 @@ func TestRunParamsWithGuardianOptions(t *testing.T) { assert.Equal(t, ccqBootstrapPeers, params.ccqBootstrapPeers) assert.Equal(t, ccqPort, params.ccqPort) assert.Equal(t, ccqAllowedPeers, params.ccqAllowedPeers) + + require.Equal(t, len(protectedPeers), len(params.protectedPeers)) + assert.Equal(t, protectedPeers[0], params.protectedPeers[0]) + assert.Equal(t, protectedPeers[1], params.protectedPeers[1]) + assert.Equal(t, protectedPeers[2], params.protectedPeers[2]) + + require.Equal(t, len(ccqProtectedPeers), len(params.ccqProtectedPeers)) + assert.Equal(t, ccqProtectedPeers[0], params.ccqProtectedPeers[0]) + assert.Equal(t, ccqProtectedPeers[1], params.ccqProtectedPeers[1]) } diff --git a/node/pkg/p2p/watermark_test.go b/node/pkg/p2p/watermark_test.go index aa5365dd42..1a9c1951e4 100644 --- a/node/pkg/p2p/watermark_test.go +++ b/node/pkg/p2p/watermark_test.go @@ -190,14 +190,16 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) { g.gov, g.disableHeartbeatVerify, g.components, - nil, //g.ibcFeaturesFunc, - false, // gateway relayer enabled - false, // ccqEnabled - nil, // signed query request channel - nil, // query response channel - "", // query bootstrap peers - 0, // query port - "", // query allowed peers), + nil, //g.ibcFeaturesFunc, + false, // gateway relayer enabled + false, // ccqEnabled + nil, // signed query request channel + nil, // query response channel + "", // query bootstrap peers + 0, // query port + "", // query allowed peers), + []string{}, // protected peers + []string{}, // ccq protected peers )) require.NoError(t, err)