Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: Observation batching with override #4066

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion node/cmd/guardiand/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1592,7 +1592,7 @@ func runNode(cmd *cobra.Command, args []string) {
node.GuardianOptionAdminService(*adminSocketPath, ethRPC, ethContract, rpcMap),
node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *subscribeToVAAs, *disableHeartbeatVerify, *p2pPort, *ccqP2pBootstrap, *ccqP2pPort, *ccqAllowedPeers, *gossipAdvertiseAddress, ibc.GetFeatures),
node.GuardianOptionStatusServer(*statusAddr),
node.GuardianOptionProcessor(),
node.GuardianOptionProcessor(*p2pNetworkID),
}

if shouldStart(publicGRPCSocketPath) {
Expand Down
8 changes: 8 additions & 0 deletions node/pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ const (
// One observation takes roughly 0.1ms to process on one core, so the whole queue could be processed in 1s
inboundObservationBufferSize = 10000

// inboundBatchObservationBufferSize configures the size of the batchObsvC channel that contains batches of observations from other Guardians.
// Since a batch contains many observations, the guardians should not be publishing too many of these. With 19 guardians, we would expect 19 messages
// per second during normal operations. However, since some messages get published immediately, we need to allow extra room.
inboundBatchObservationBufferSize = 1000

// inboundSignedVaaBufferSize configures the size of the signedInC channel that contains VAAs from other Guardians.
// One VAA takes roughly 0.01ms to process if we already have one in the database and 2ms if we don't.
// So in the worst case the entire queue can be processed in 2s.
Expand Down Expand Up @@ -81,6 +86,8 @@ type G struct {
gossipVaaSendC chan []byte
// Inbound observations. This is read/write because the processor also writes to it as a fast-path when handling locally made observations.
obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation]
// Inbound observation batches.
batchObsvC channelPair[*common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]]
// Finalized guardian observations aggregated across all chains
msgC channelPair[*common.MessagePublication]
// Ethereum incoming guardian set updates
Expand Down Expand Up @@ -121,6 +128,7 @@ func (g *G) initializeBasic(rootCtxCancel context.CancelFunc) {
g.gossipAttestationSendC = make(chan []byte, gossipAttestationSendBufferSize)
g.gossipVaaSendC = make(chan []byte, gossipVaaSendBufferSize)
g.obsvC = make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], inboundObservationBufferSize)
g.batchObsvC = makeChannelPair[*common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]](inboundBatchObservationBufferSize)
g.msgC = makeChannelPair[*common.MessagePublication](0)
g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup.
g.signedInC = makeChannelPair[*gossipv1.SignedVAAWithQuorum](inboundSignedVaaBufferSize)
Expand Down
2 changes: 1 addition & 1 deletion node/pkg/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func mockGuardianRunnable(t testing.TB, gs []*mockGuardian, mockGuardianIndex ui
GuardianOptionPublicWeb(cfg.publicWeb, cfg.publicSocket, "", false, ""),
GuardianOptionAdminService(cfg.adminSocket, nil, nil, rpcMap),
GuardianOptionStatusServer(fmt.Sprintf("[::]:%d", cfg.statusPort)),
GuardianOptionProcessor(),
GuardianOptionProcessor(networkID),
}

guardianNode := NewGuardianNode(
Expand Down
5 changes: 4 additions & 1 deletion node/pkg/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func GuardianOptionP2P(
nodeName,
g.gk,
g.obsvC,
g.batchObsvC.writeC,
signedInC,
g.obsvReqC.writeC,
g.gossipControlSendC,
Expand Down Expand Up @@ -573,7 +574,7 @@ func GuardianOptionDatabase(db *db.Database) *GuardianOption {

// GuardianOptionProcessor enables the default processor, which is required to make consensus on messages.
// Dependencies: db, governor, accountant
func GuardianOptionProcessor() *GuardianOption {
func GuardianOptionProcessor(networkId string) *GuardianOption {
return &GuardianOption{
name: "processor",
// governor and accountant may be set to nil, but that choice needs to be made before the processor is configured
Expand All @@ -588,6 +589,7 @@ func GuardianOptionProcessor() *GuardianOption {
g.gossipAttestationSendC,
g.gossipVaaSendC,
g.obsvC,
g.batchObsvC.readC,
g.obsvReqSendC.writeC,
g.signedInC.readC,
g.gk,
Expand All @@ -596,6 +598,7 @@ func GuardianOptionProcessor() *GuardianOption {
g.acct,
g.acctC.readC,
g.gatewayRelayer,
networkId,
).Run

return nil
Expand Down
40 changes: 31 additions & 9 deletions node/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ const P2P_SUBSCRIPTION_BUFFER_SIZE = 1024
// TESTNET_BOOTSTRAP_DHI configures how many nodes may connect to the testnet bootstrap node. This number should not exceed HighWaterMark.
const TESTNET_BOOTSTRAP_DHI = 350

// MaxObservationBatchSize is the maximum number of observations that will fit in a single `SignedObservationBatch` message.
const MaxObservationBatchSize = 4000

// MaxObservationBatchDelay is the longest we will wait before publishing any queued up observations.
const MaxObservationBatchDelay = time.Second

var (
p2pHeartbeatsSent = promauto.NewCounter(
prometheus.CounterOpts{
Expand All @@ -74,11 +80,6 @@ var (
Name: "wormhole_p2p_drops",
Help: "Total number of messages that were dropped by libp2p",
})
p2pReject = promauto.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_p2p_rejects",
Help: "Total number of messages rejected by libp2p",
})
)

var heartbeatMessagePrefix = []byte("heartbeat|")
Expand Down Expand Up @@ -173,8 +174,6 @@ func (*traceHandler) Trace(evt *libp2ppb.TraceEvent) {
if evt.Type != nil {
if *evt.Type == libp2ppb.TraceEvent_DROP_RPC {
p2pDrop.Inc()
} else if *evt.Type == libp2ppb.TraceEvent_REJECT_MESSAGE {
p2pReject.Inc()
}
}
}
Expand Down Expand Up @@ -306,6 +305,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
p2pMessagesSent.WithLabelValues("attestation").Add(0)
p2pMessagesSent.WithLabelValues("vaa").Add(0)
p2pReceiveChannelOverflow.WithLabelValues("observation").Add(0)
p2pReceiveChannelOverflow.WithLabelValues("batch_observation").Add(0)
p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Add(0)
p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Add(0)

Expand Down Expand Up @@ -397,7 +397,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
}

// Set up the attestation channel. ////////////////////////////////////////////////////////////////////
if params.gossipAttestationSendC != nil || params.obsvRecvC != nil {
if params.gossipAttestationSendC != nil || params.obsvRecvC != nil || params.batchObsvRecvC != nil {
bruce-riley marked this conversation as resolved.
Show resolved Hide resolved
attestationTopic := fmt.Sprintf("%s/%s", params.networkID, "attestation")
logger.Info("joining the attestation topic", zap.String("topic", attestationTopic))
attestationPubsubTopic, err = ps.Join(attestationTopic)
Expand All @@ -411,7 +411,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
}
}()

if params.obsvRecvC != nil {
if params.obsvRecvC != nil || params.batchObsvRecvC != nil {
logger.Info("subscribing to the attestation topic", zap.String("topic", attestationTopic))
attestationSubscription, err = attestationPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE))
if err != nil {
Expand Down Expand Up @@ -918,6 +918,17 @@ func Run(params *RunParams) func(ctx context.Context) error {
p2pReceiveChannelOverflow.WithLabelValues("observation").Inc()
}
}
case *gossipv1.GossipMessage_SignedObservationBatch:
if params.batchObsvRecvC != nil {
if err := common.PostMsgWithTimestamp(m.SignedObservationBatch, params.batchObsvRecvC); err == nil {
p2pMessagesReceived.WithLabelValues("batch_observation").Inc()
} else {
if params.components.WarnChannelOverflow {
logger.Warn("Ignoring SignedObservationBatch because batchObsvRecvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservationBatch.Addr)))
}
p2pReceiveChannelOverflow.WithLabelValues("batch_observation").Inc()
}
}
default:
p2pMessagesReceived.WithLabelValues("unknown").Inc()
logger.Warn("received unknown message type on attestation topic (running outdated software?)",
Expand Down Expand Up @@ -1039,6 +1050,17 @@ func Run(params *RunParams) func(ctx context.Context) error {
p2pReceiveChannelOverflow.WithLabelValues("observation").Inc()
}
}
case *gossipv1.GossipMessage_SignedObservationBatch: // TODO: Get rid of this after the cutover.
if params.batchObsvRecvC != nil {
if err := common.PostMsgWithTimestamp(m.SignedObservationBatch, params.batchObsvRecvC); err == nil {
p2pMessagesReceived.WithLabelValues("batch_observation").Inc()
} else {
if params.components.WarnChannelOverflow {
logger.Warn("Ignoring SignedObservationBatch because obsvRecvC is full")
}
p2pReceiveChannelOverflow.WithLabelValues("batch_observation").Inc()
}
}
case *gossipv1.GossipMessage_SignedVaaWithQuorum:
if params.signedIncomingVaaRecvC != nil {
select {
Expand Down
23 changes: 18 additions & 5 deletions node/pkg/p2p/run_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type (
// obsvRecvC is optional and can be set with `WithSignedObservationListener`.
obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]

// batchObsvRecvC is optional and can be set with `WithSignedObservationBatchListener`.
batchObsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]

// obsvReqRecvC is optional and can be set with `WithObservationRequestListener`.
obsvReqRecvC chan<- *gossipv1.ObservationRequest

Expand Down Expand Up @@ -97,39 +100,47 @@ func NewRunParams(
return p, nil
}

// WithSignedObservationListener is used to set the channel to receive `SignedObservation messages.
// WithSignedObservationListener returns a RunOpt that stores the given channel in the
// run parameters as the receiver for `SignedObservation` messages.
func WithSignedObservationListener(obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]) RunOpt {
	apply := func(params *RunParams) error {
		params.obsvRecvC = obsvRecvC
		return nil
	}
	return apply
}

// WithSignedVAAListener is used to set the channel to receive `SignedVAAWithQuorum messages.
// WithSignedObservationBatchListener returns a RunOpt that stores the given channel in the
// run parameters as the receiver for `SignedObservationBatch` messages.
func WithSignedObservationBatchListener(batchObsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]) RunOpt {
	apply := func(params *RunParams) error {
		params.batchObsvRecvC = batchObsvC
		return nil
	}
	return apply
}

// WithSignedVAAListener returns a RunOpt that stores the given channel in the
// run parameters as the receiver for `SignedVAAWithQuorum` messages.
func WithSignedVAAListener(signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum) RunOpt {
	apply := func(params *RunParams) error {
		params.signedIncomingVaaRecvC = signedIncomingVaaRecvC
		return nil
	}
	return apply
}

// WithObservationRequestListener is used to set the channel to receive `ObservationRequest messages.
// WithObservationRequestListener returns a RunOpt that stores the given channel in the
// run parameters as the receiver for `ObservationRequest` messages.
func WithObservationRequestListener(obsvReqRecvC chan<- *gossipv1.ObservationRequest) RunOpt {
	apply := func(params *RunParams) error {
		params.obsvReqRecvC = obsvReqRecvC
		return nil
	}
	return apply
}

// WithChainGovernorConfigListener is used to set the channel to receive `SignedChainGovernorConfig messages.
// WithChainGovernorConfigListener returns a RunOpt that stores the given channel in the
// run parameters as the receiver for `SignedChainGovernorConfig` messages.
func WithChainGovernorConfigListener(signedGovCfgRecvC chan *gossipv1.SignedChainGovernorConfig) RunOpt {
	apply := func(params *RunParams) error {
		params.signedGovCfgRecvC = signedGovCfgRecvC
		return nil
	}
	return apply
}

// WithChainGovernorStatusListener is used to set the channel to receive `SignedChainGovernorStatus messages.
// WithChainGovernorStatusListener is used to set the channel to receive `SignedChainGovernorStatus` messages.
func WithChainGovernorStatusListener(signedGovStatusRecvC chan *gossipv1.SignedChainGovernorStatus) RunOpt {
return func(p *RunParams) error {
p.signedGovStatusRecvC = signedGovStatusRecvC
Expand All @@ -150,6 +161,7 @@ func WithGuardianOptions(
nodeName string,
gk *ecdsa.PrivateKey,
obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation],
batchObsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch],
signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum,
obsvReqRecvC chan<- *gossipv1.ObservationRequest,
gossipControlSendC chan []byte,
Expand All @@ -173,6 +185,7 @@ func WithGuardianOptions(
p.nodeName = nodeName
p.gk = gk
p.obsvRecvC = obsvRecvC
p.batchObsvRecvC = batchObsvRecvC
p.signedIncomingVaaRecvC = signedIncomingVaaRecvC
p.obsvReqRecvC = obsvReqRecvC
p.gossipControlSendC = gossipControlSendC
Expand Down
2 changes: 2 additions & 0 deletions node/pkg/p2p/run_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
require.NotNil(t, gk)

obsvC := make(chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], 42)
batchObsvC := make(chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 42)
signedInC := make(chan<- *gossipv1.SignedVAAWithQuorum, 42)
obsvReqC := make(chan<- *gossipv1.ObservationRequest, 42)
gossipControlSendC := make(chan []byte, 42)
Expand Down Expand Up @@ -172,6 +173,7 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
nodeName,
gk,
obsvC,
batchObsvC,
signedInC,
obsvReqC,
gossipControlSendC,
Expand Down
3 changes: 3 additions & 0 deletions node/pkg/p2p/watermark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const LOCAL_P2P_PORTRANGE_START = 11000
type G struct {
// arguments passed to p2p.New
obsvC chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]
batchObsvC chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]
obsvReqC chan *gossipv1.ObservationRequest
obsvReqSendC chan *gossipv1.ObservationRequest
controlSendC chan []byte
Expand Down Expand Up @@ -67,6 +68,7 @@ func NewG(t *testing.T, nodeName string) *G {

g := &G{
obsvC: make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], cs),
batchObsvC: make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], cs),
obsvReqC: make(chan *gossipv1.ObservationRequest, cs),
obsvReqSendC: make(chan *gossipv1.ObservationRequest, cs),
controlSendC: make(chan []byte, cs),
Expand Down Expand Up @@ -182,6 +184,7 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) {
g.nodeName,
g.gk,
g.obsvC,
g.batchObsvC,
g.signedInC,
g.obsvReqC,
g.controlSendC,
Expand Down
87 changes: 87 additions & 0 deletions node/pkg/processor/batch_obs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package processor

import (
"context"
"errors"
"fmt"

"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"google.golang.org/protobuf/proto"
)

// postObservationToBatch hands a single observation to the batch processor without blocking.
// If the internal publish channel cannot accept it right now, the observation is dropped
// and the "batchObsvPub" overflow counter is incremented.
func (p *Processor) postObservationToBatch(obs *gossipv1.Observation) {
	select {
	case p.batchObsvPubC <- obs:
		return
	default:
	}

	// The channel had no room; count the drop instead of stalling the caller.
	batchObservationChannelOverflow.WithLabelValues("batchObsvPub").Inc()
}

// batchProcessor is the main loop of the batch processor. It repeatedly calls handleBatch,
// which gathers individual observations and publishes them as a batch, until the context
// is done. It returns nil once the context is done, or the first error reported by handleBatch.
func (p *Processor) batchProcessor(ctx context.Context) error {
	// Keep building batches for as long as the context is still live.
	for ctx.Err() == nil {
		if err := p.handleBatch(ctx); err != nil {
			return err
		}
	}

	return nil
}

// handleBatch collects observations from the internal batch channel, bounded by
// p2p.MaxObservationBatchDelay in time and p2p.MaxObservationBatchSize in count.
// Anything collected is then published as a `SignedObservationBatch` gossip message.
// A deadline expiry is the normal way for a batch to end; any other read error is
// wrapped and returned.
func (p *Processor) handleBatch(ctx context.Context) error {
	batchCtx, stop := context.WithTimeout(ctx, p2p.MaxObservationBatchDelay)
	defer stop()

	batch, readErr := common.ReadFromChannelWithTimeout(batchCtx, p.batchObsvPubC, p2p.MaxObservationBatchSize)
	if timedOut := errors.Is(readErr, context.DeadlineExceeded); readErr != nil && !timedOut {
		return fmt.Errorf("failed to read observations from the internal observation batch channel: %w", readErr)
	}

	// Nothing arrived within the window, so there is nothing to publish.
	if len(batch) == 0 {
		return nil
	}

	// The encoded message bytes are not needed here.
	_ = p.publishBatch(batch)
	return nil
}

// publishBatch wraps the observations in a `SignedObservationBatch` tagged with our address,
// serializes the enclosing gossip message, and offers it to the attestation send channel
// without blocking. If that channel cannot accept the message, the "gossipSend" overflow
// counter is incremented and the message is not sent. The serialized bytes are returned
// either way. It panics if the message cannot be marshaled.
func (p *Processor) publishBatch(observations []*gossipv1.Observation) []byte {
	envelope := &gossipv1.GossipMessage{
		Message: &gossipv1.GossipMessage_SignedObservationBatch{
			SignedObservationBatch: &gossipv1.SignedObservationBatch{
				Addr:         p.ourAddr.Bytes(),
				Observations: observations,
			},
		},
	}

	encoded, err := proto.Marshal(envelope)
	if err != nil {
		panic(err)
	}

	select {
	case p.gossipAttestationSendC <- encoded:
	default:
		// The send channel is full; record the drop rather than blocking.
		batchObservationChannelOverflow.WithLabelValues("gossipSend").Inc()
	}

	return encoded
}

// shouldPublishImmediately reports whether the observation for this VAA should bypass
// batching and be published right away. Only VAAs emitted by PythNet qualify.
func (p *Processor) shouldPublishImmediately(v *vaa.VAA) bool {
	switch v.EmitterChain {
	case vaa.ChainIDPythNet:
		return true
	default:
		return false
	}
}

// publishImmediately publishes a single observation as a one-element
// `SignedObservationBatch` gossip message and returns the message bytes.
func (p *Processor) publishImmediately(obs *gossipv1.Observation) []byte {
	single := []*gossipv1.Observation{obs}
	return p.publishBatch(single)
}
Loading
Loading