diff --git a/node/pkg/node/options.go b/node/pkg/node/options.go index 42248b8785..c99476648c 100644 --- a/node/pkg/node/options.go +++ b/node/pkg/node/options.go @@ -65,6 +65,7 @@ func GuardianOptionP2P(p2pKey libp2p_crypto.PrivKey, networkId, bootstrapPeers, nodeName, g.gk, g.obsvC, + g.batchObsvC.writeC, g.signedInC.writeC, g.obsvReqC.writeC, g.gossipSendC, diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index d943e9191a..120f3da8e5 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -350,7 +350,6 @@ func Run(params *RunParams) func(ctx context.Context) error { pubsub.WithEventTracer(ourTracer), // TODO: Investigate making this change. May need to use LaxSign until everyone has upgraded to that. // pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign), - // pubsub.WithPeerOutboundQueueSize(1000000), ) if err != nil { panic(err) @@ -688,11 +687,11 @@ func Run(params *RunParams) func(ctx context.Context) error { } } case *gossipv1.GossipMessage_SignedObservationBatch: - if batchObsvC != nil { - if err := common.PostMsgWithTimestamp[gossipv1.SignedObservationBatch](m.SignedObservationBatch, batchObsvC); err == nil { + if params.batchObsvC != nil { + if err := common.PostMsgWithTimestamp[gossipv1.SignedObservationBatch](m.SignedObservationBatch, params.batchObsvC); err == nil { p2pMessagesReceived.WithLabelValues("batch_observation").Inc() } else { - if components.WarnChannelOverflow { + if params.components.WarnChannelOverflow { logger.Warn("Ignoring SignedObservationBatch because batchObsvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservationBatch.Addr))) } p2pReceiveChannelOverflow.WithLabelValues("batch_observation").Inc() diff --git a/node/pkg/p2p/run_params.go b/node/pkg/p2p/run_params.go index 59652f21ca..503d4d0829 100644 --- a/node/pkg/p2p/run_params.go +++ b/node/pkg/p2p/run_params.go @@ -26,6 +26,9 @@ type ( // obsvC is optional and can be set with `WithSignedObservationListener`. obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation] + // batchObsvC is optional and can be set with `WithSignedObservationBatchListener`. + batchObsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch] + // obsvReqC is optional and can be set with `WithObservationRequestListener`. obsvReqC chan<- *gossipv1.ObservationRequest @@ -95,7 +98,7 @@ func NewRunParams( return p, nil } -// WithSignedObservationListener is used to set the channel to receive `SignedObservation“ messages. +// WithSignedObservationListener is used to set the channel to receive `SignedObservation` messages. func WithSignedObservationListener(obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]) RunOpt { return func(p *RunParams) error { p.obsvC = obsvC @@ -103,6 +106,14 @@ func WithSignedObservationListener(obsvC chan<- *common.MsgWithTimeStamp[gossipv } } +// WithSignedObservationBatchListener is used to set the channel to receive `SignedObservationBatch` messages. +func WithSignedObservationBatchListener(batchObsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]) RunOpt { + return func(p *RunParams) error { + p.batchObsvC = batchObsvC + return nil + } +} + // WithSignedVAAListener is used to set the channel to receive `SignedVAAWithQuorum messages. func WithSignedVAAListener(signedInC chan<- *gossipv1.SignedVAAWithQuorum) RunOpt { return func(p *RunParams) error { @@ -148,6 +159,7 @@ func WithGuardianOptions( nodeName string, gk *ecdsa.PrivateKey, obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], + batchObsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], signedInC chan<- *gossipv1.SignedVAAWithQuorum, obsvReqC chan<- *gossipv1.ObservationRequest, gossipSendC chan []byte, @@ -169,6 +181,7 @@ func WithGuardianOptions( p.nodeName = nodeName p.gk = gk p.obsvC = obsvC + p.batchObsvC = batchObsvC p.signedInC = signedInC p.obsvReqC = obsvReqC p.gossipSendC = gossipSendC diff --git a/node/pkg/p2p/run_params_test.go b/node/pkg/p2p/run_params_test.go index aebebd952c..7cf139c286 100644 --- a/node/pkg/p2p/run_params_test.go +++ b/node/pkg/p2p/run_params_test.go @@ -141,6 +141,7 @@ func TestRunParamsWithGuardianOptions(t *testing.T) { require.NotNil(t, gk) obsvC := make(chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], 42) + batchObsvC := make(chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 42) signedInC := make(chan<- *gossipv1.SignedVAAWithQuorum, 42) obsvReqC := make(chan<- *gossipv1.ObservationRequest, 42) gossipSendC := make(chan []byte, 42) @@ -170,6 +171,7 @@ func TestRunParamsWithGuardianOptions(t *testing.T) { nodeName, gk, obsvC, + batchObsvC, signedInC, obsvReqC, gossipSendC, diff --git a/node/pkg/p2p/watermark_test.go b/node/pkg/p2p/watermark_test.go index 05430e17fe..1e196ca7e1 100644 --- a/node/pkg/p2p/watermark_test.go +++ b/node/pkg/p2p/watermark_test.go @@ -178,6 +178,7 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) { g.nodeName, g.gk, g.obsvC, + g.batchObsvC, g.signedInC, g.obsvReqC, g.sendC,