From 31fa91c4b6235beb7ad6e55d9387372b029b2719 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 17 Jul 2024 12:08:01 -0400 Subject: [PATCH 01/16] e2e test: extend p2p txn with txn exchange --- test/e2e-go/features/p2p/p2p_basic_test.go | 79 ++++++++++++++++++++-- 1 file changed, 73 insertions(+), 6 deletions(-) diff --git a/test/e2e-go/features/p2p/p2p_basic_test.go b/test/e2e-go/features/p2p/p2p_basic_test.go index 6f3e8aae47..5ee2f034ae 100644 --- a/test/e2e-go/features/p2p/p2p_basic_test.go +++ b/test/e2e-go/features/p2p/p2p_basic_test.go @@ -17,6 +17,7 @@ package p2p import ( + "crypto/rand" "path/filepath" "testing" "time" @@ -28,7 +29,7 @@ import ( "github.com/stretchr/testify/require" ) -func testP2PWithConfig(t *testing.T, cfgname string) { +func testP2PWithConfig(t *testing.T, templateName string) *fixtures.RestClientFixture { r := require.New(fixtures.SynchronizedTest(t)) var fixture fixtures.RestClientFixture @@ -42,22 +43,88 @@ func testP2PWithConfig(t *testing.T, cfgname string) { consensus[protocol.ConsensusCurrentVersion] = fastProtocol fixture.SetConsensus(consensus) - fixture.Setup(t, filepath.Join("nettemplates", cfgname)) - defer fixture.ShutdownImpl(true) // preserve logs in testdir - + fixture.Setup(t, filepath.Join("nettemplates", templateName)) _, err := fixture.NC.AlgodClient() r.NoError(err) err = fixture.WaitForRound(10, 30*time.Second) r.NoError(err) + + return &fixture } func TestP2PTwoNodes(t *testing.T) { partitiontest.PartitionTest(t) - testP2PWithConfig(t, "TwoNodes50EachP2P.json") + fixture := testP2PWithConfig(t, "TwoNodes50EachP2P.json") + defer fixture.Shutdown() + + // ensure transaction propagation on both directions + pingClient := fixture.LibGoalClient + pingAccountList, err := fixture.GetWalletsSortedByBalance() + require.NoError(t, err) + pingAccount := pingAccountList[0].Address + + pongClient := fixture.GetLibGoalClientForNamedNode("Node") + pongAccounts, err := fixture.GetNodeWalletsSortedByBalance(pongClient) + require.NoError(t, err) + pongAccount := pongAccounts[0].Address + + pingBalance, err := pingClient.GetBalance(pingAccount) + require.NoError(t, err) + pongBalance, err := pingClient.GetBalance(pongAccount) + require.NoError(t, err) + + require.Equal(t, pingBalance, pongBalance) + + expectedPingBalance := pingBalance + expectedPongBalance := pongBalance + + minTxnFee, minAcctBalance, err := fixture.CurrentMinFeeAndBalance() + require.NoError(t, err) + + transactionFee := minTxnFee + 5 + amountPongSendsPing := minAcctBalance + amountPingSendsPong := minAcctBalance * 3 / 2 + + pongTxidsToAddresses := make(map[string]string) + pingTxidsToAddresses := make(map[string]string) + + randNote := func(tb testing.TB) []byte { + b := make([]byte, 8) + _, err := rand.Read(b) + require.NoError(tb, err) + return b + } + + for i := 0; i < 5; i++ { + pongTx, err := pongClient.SendPaymentFromUnencryptedWallet(pongAccount, pingAccount, transactionFee, amountPongSendsPing, randNote(t)) + pongTxidsToAddresses[pongTx.ID().String()] = pongAccount + require.NoError(t, err) + pingTx, err := pingClient.SendPaymentFromUnencryptedWallet(pingAccount, pongAccount, transactionFee, amountPingSendsPong, randNote(t)) + pingTxidsToAddresses[pingTx.ID().String()] = pingAccount + require.NoError(t, err) + expectedPingBalance = expectedPingBalance - transactionFee - amountPingSendsPong + amountPongSendsPing + expectedPongBalance = expectedPongBalance - transactionFee - amountPongSendsPing + amountPingSendsPong + } + curStatus, _ := pongClient.Status() + curRound := curStatus.LastRound + + fixture.AlgodClient = fixture.GetAlgodClientForController(fixture.GetNodeControllerForDataDir(pongClient.DataDir())) + confirmed := fixture.WaitForAllTxnsToConfirm(curRound+uint64(5), pingTxidsToAddresses) + require.True(t, confirmed, "failed to see confirmed ping transaction by round %v", curRound+uint64(5)) + confirmed = fixture.WaitForAllTxnsToConfirm(curRound+uint64(5), pongTxidsToAddresses) + require.True(t, confirmed, "failed to see confirmed pong transaction by round %v", curRound+uint64(5)) + + pingBalance, err = pongClient.GetBalance(pingAccount) + require.NoError(t, err) + pongBalance, err = pongClient.GetBalance(pongAccount) + require.NoError(t, err) + require.True(t, expectedPingBalance <= pingBalance, "ping balance is different than expected.") + require.True(t, expectedPongBalance <= pongBalance, "pong balance is different than expected.") } func TestP2PFiveNodes(t *testing.T) { partitiontest.PartitionTest(t) - testP2PWithConfig(t, "FiveNodesP2P.json") + fixture := testP2PWithConfig(t, "FiveNodesP2P.json") + defer fixture.Shutdown() } From 291608357af9c93520e71a3180676d7cd693e8f7 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 15 Jul 2024 17:29:29 -0400 Subject: [PATCH 02/16] WIP: p2p: handle txns in pubsub validator --- data/txHandler.go | 115 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 88 insertions(+), 27 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index eae9586c47..907d46831c 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -132,6 +132,10 @@ type TxHandler struct { erl *util.ElasticRateLimiter appLimiter *appRateLimiter appLimiterBacklogThreshold int + + batchProcessor execpool.BatchProcessor + streamVerifierDropped2 chan *verify.UnverifiedTxnSigJob + postVerificationQueue2 chan *verify.VerificationResult } // TxHandlerOpts is TxHandler configuration options @@ -173,6 +177,9 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) { net: opts.Net, streamVerifierChan: make(chan execpool.InputJob), streamVerifierDropped: make(chan *verify.UnverifiedTxnSigJob), + + postVerificationQueue2: make(chan *verify.VerificationResult, 1), + streamVerifierDropped2: make(chan *verify.UnverifiedTxnSigJob, 1), } if opts.Config.TxFilterRawMsgEnabled() { @@ -209,6 +216,14 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) { } } + // prepare the batch processor for pubsub synchronous verification + var err0 error + handler.batchProcessor, err0 = verify.MakeSigVerifyJobProcessor(handler.ledger, handler.ledger.VerifiedTransactionCache(), + handler.postVerificationQueue2, handler.streamVerifierDropped2) + if err0 != nil { + return nil, err0 + } + // prepare the transaction stream verifier var err error txnElementProcessor, err := verify.MakeSigVerifyJobProcessor(handler.ledger, handler.ledger.VerifiedTransactionCache(), @@ -246,6 +261,7 @@ func (handler *TxHandler) Start() { }) // libp2p pubsub validator and handler abstracted as TaggedMessageProcessor + // TODO: rename to validators handler.net.RegisterProcessors([]network.TaggedMessageProcessor{ { Tag: protocol.TxnTag, @@ -348,7 +364,7 @@ func (handler *TxHandler) backlogWorker() { } continue } - // handler.streamVerifierChan does not receive if ctx is cancled + // handler.streamVerifierChan does not receive if ctx is cancelled select { case handler.streamVerifierChan <- &verify.UnverifiedTxnSigJob{TxnGroup: wi.unverifiedTxGroup, BacklogMessage: wi}: case <-handler.ctx.Done(): @@ -799,37 +815,82 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa return network.ValidatedMessage{Action: network.Ignore, ValidatedMessage: nil} } - return network.ValidatedMessage{ - Action: network.Accept, - Tag: rawmsg.Tag, - ValidatedMessage: &validatedIncomingTxMessage{ - rawmsg: rawmsg, - unverifiedTxGroup: unverifiedTxGroup, - msgKey: msgKey, - canonicalKey: canonicalKey, - }, + // apply backlog worker logic + + wi := &txBacklogMsg{ + rawmsg: &rawmsg, + unverifiedTxGroup: unverifiedTxGroup, + rawmsgDataHash: msgKey, + unverifiedTxGroupHash: canonicalKey, + capguard: nil, } -} -// processIncomingTxMessage is the handler for the MessageProcessor implementation used by P2PNetwork. -func (handler *TxHandler) processIncomingTxMessage(validatedMessage network.ValidatedMessage) network.OutgoingMessage { - msg := validatedMessage.ValidatedMessage.(*validatedIncomingTxMessage) + if handler.checkAlreadyCommitted(wi) { + transactionMessagesAlreadyCommitted.Inc(nil) + return network.ValidatedMessage{ + Action: network.Ignore, + ValidatedMessage: nil, + } + } + + jobs := []execpool.InputJob{&verify.UnverifiedTxnSigJob{TxnGroup: wi.unverifiedTxGroup, BacklogMessage: wi}} + handler.batchProcessor.ProcessBatch(jobs) + select { - case handler.backlogQueue <- &txBacklogMsg{ - rawmsg: &msg.rawmsg, - unverifiedTxGroup: msg.unverifiedTxGroup, - rawmsgDataHash: msg.msgKey, - unverifiedTxGroupHash: msg.canonicalKey, - capguard: nil, - }: - default: - // if we failed here we want to increase the corresponding metric. It might suggest that we - // want to increase the queue size. - transactionMessagesDroppedFromBacklog.Inc(nil) + case wi := <-handler.postVerificationQueue2: + m := wi.BacklogMessage.(*txBacklogMsg) + if wi.Err != nil { + handler.postProcessReportErrors(wi.Err) + logging.Base().Warnf("Received a malformed tx group %v: %v", m.unverifiedTxGroup, wi.Err) + return network.ValidatedMessage{ + Action: network.Disconnect, + ValidatedMessage: nil, + } + } + // at this point, we've verified the transaction, so we can safely treat the transaction as a verified transaction. + verifiedTxGroup := m.unverifiedTxGroup - // additionally, remove the txn from duplicate caches to ensure it can be re-submitted - handler.deleteFromCaches(msg.msgKey, msg.canonicalKey) + // save the transaction, if it has high enough fee and not already in the cache + err := handler.txPool.Remember(verifiedTxGroup) + if err != nil { + handler.rememberReportErrors(err) + logging.Base().Debugf("could not remember tx: %v", err) + return network.ValidatedMessage{ + Action: network.Ignore, + ValidatedMessage: nil, + } + } + + transactionMessagesRemember.Inc(nil) + + // if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions. + err = handler.ledger.VerifiedTransactionCache().Pin(verifiedTxGroup) + if err != nil { + logging.Base().Infof("unable to pin transaction: %v", err) + } + return network.ValidatedMessage{ + Action: network.Accept, + ValidatedMessage: nil, + } + + case <-handler.streamVerifierDropped2: + transactionMessagesDroppedFromBacklog.Inc(nil) + return network.ValidatedMessage{ + Action: network.Ignore, + ValidatedMessage: nil, + } + case <-handler.ctx.Done(): + transactionMessagesDroppedFromBacklog.Inc(nil) + return network.ValidatedMessage{ + Action: network.Ignore, + ValidatedMessage: nil, + } } +} + +// processIncomingTxMessage is the handler for the MessageProcessor implementation used by P2PNetwork. +func (handler *TxHandler) processIncomingTxMessage(validatedMessage network.ValidatedMessage) network.OutgoingMessage { + // process is noop, all work is done in validateIncomingTxMessage above return network.OutgoingMessage{Action: network.Ignore} } From 355d62e0a0c9d1fae53fdd27d7458dc45454c338 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 17 Jul 2024 12:31:29 -0400 Subject: [PATCH 03/16] Remove dead code --- components/mocks/mockNetwork.go | 4 +-- data/txHandler.go | 59 +++++++++++---------------------- network/gossipNode.go | 45 +++++++++---------------- network/hybridNetwork.go | 8 ++--- network/multiplexer.go | 22 ++++-------- network/p2pNetwork.go | 16 +++------ network/p2pNetwork_test.go | 54 ++++++++++++------------------ network/wsNetwork.go | 4 +-- 8 files changed, 75 insertions(+), 137 deletions(-) diff --git a/components/mocks/mockNetwork.go b/components/mocks/mockNetwork.go index 47b1a5b5e4..fb665eb459 100644 --- a/components/mocks/mockNetwork.go +++ b/components/mocks/mockNetwork.go @@ -91,8 +91,8 @@ func (network *MockNetwork) RegisterHandlers(dispatch []network.TaggedMessageHan func (network *MockNetwork) ClearHandlers() { } -// RegisterProcessors - empty implementation. -func (network *MockNetwork) RegisterProcessors(dispatch []network.TaggedMessageProcessor) { +// RegisterValidatorHandlers - empty implementation. +func (network *MockNetwork) RegisterValidatorHandlers(dispatch []network.TaggedMessageValidatorHandler) { } // ClearProcessors - empty implementation diff --git a/data/txHandler.go b/data/txHandler.go index 907d46831c..d5c09ed744 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -262,16 +262,14 @@ func (handler *TxHandler) Start() { // libp2p pubsub validator and handler abstracted as TaggedMessageProcessor // TODO: rename to validators - handler.net.RegisterProcessors([]network.TaggedMessageProcessor{ + handler.net.RegisterValidatorHandlers([]network.TaggedMessageValidatorHandler{ { Tag: protocol.TxnTag, // create anonymous struct to hold the two functions and satisfy the network.MessageProcessor interface MessageHandler: struct { - network.ProcessorValidateFunc - network.ProcessorHandleFunc + network.ValidateHandleFunc }{ - network.ProcessorValidateFunc(handler.validateIncomingTxMessage), - network.ProcessorHandleFunc(handler.processIncomingTxMessage), + network.ValidateHandleFunc(handler.validateIncomingTxMessage), }, }, }) @@ -788,31 +786,24 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net return network.OutgoingMessage{Action: network.Ignore} } -type validatedIncomingTxMessage struct { - rawmsg network.IncomingMessage - unverifiedTxGroup []transactions.SignedTxn - msgKey *crypto.Digest - canonicalKey *crypto.Digest -} - // validateIncomingTxMessage is the validator for the MessageProcessor implementation used by P2PNetwork. -func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessage) network.ValidatedMessage { +func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessage) network.OutgoingMessage { msgKey, isDup := handler.incomingMsgDupCheck(rawmsg.Data) if isDup { - return network.ValidatedMessage{Action: network.Ignore, ValidatedMessage: nil} + return network.OutgoingMessage{Action: network.Ignore} } unverifiedTxGroup, consumed, invalid := decodeMsg(rawmsg.Data) if invalid { // invalid encoding or exceeding txgroup, disconnect from this peer - return network.ValidatedMessage{Action: network.Disconnect, ValidatedMessage: nil} + return network.OutgoingMessage{Action: network.Disconnect} } canonicalKey, drop := handler.incomingTxGroupDupRateLimit(unverifiedTxGroup, consumed, rawmsg.Sender) if drop { // this re-serialized txgroup was detected as a duplicate by the canonical message cache, // or it was rate-limited by the per-app rate limiter - return network.ValidatedMessage{Action: network.Ignore, ValidatedMessage: nil} + return network.OutgoingMessage{Action: network.Ignore} } // apply backlog worker logic @@ -827,9 +818,8 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa if handler.checkAlreadyCommitted(wi) { transactionMessagesAlreadyCommitted.Inc(nil) - return network.ValidatedMessage{ - Action: network.Ignore, - ValidatedMessage: nil, + return network.OutgoingMessage{ + Action: network.Ignore, } } @@ -842,9 +832,8 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa if wi.Err != nil { handler.postProcessReportErrors(wi.Err) logging.Base().Warnf("Received a malformed tx group %v: %v", m.unverifiedTxGroup, wi.Err) - return network.ValidatedMessage{ - Action: network.Disconnect, - ValidatedMessage: nil, + return network.OutgoingMessage{ + Action: network.Disconnect, } } // at this point, we've verified the transaction, so we can safely treat the transaction as a verified transaction. @@ -855,9 +844,8 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa if err != nil { handler.rememberReportErrors(err) logging.Base().Debugf("could not remember tx: %v", err) - return network.ValidatedMessage{ - Action: network.Ignore, - ValidatedMessage: nil, + return network.OutgoingMessage{ + Action: network.Ignore, } } @@ -868,32 +856,23 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa if err != nil { logging.Base().Infof("unable to pin transaction: %v", err) } - return network.ValidatedMessage{ - Action: network.Accept, - ValidatedMessage: nil, + return network.OutgoingMessage{ + Action: network.Accept, } case <-handler.streamVerifierDropped2: transactionMessagesDroppedFromBacklog.Inc(nil) - return network.ValidatedMessage{ - Action: network.Ignore, - ValidatedMessage: nil, + return network.OutgoingMessage{ + Action: network.Ignore, } case <-handler.ctx.Done(): transactionMessagesDroppedFromBacklog.Inc(nil) - return network.ValidatedMessage{ - Action: network.Ignore, - ValidatedMessage: nil, + return network.OutgoingMessage{ + Action: network.Ignore, } } } -// processIncomingTxMessage is the handler for the MessageProcessor implementation used by P2PNetwork. -func (handler *TxHandler) processIncomingTxMessage(validatedMessage network.ValidatedMessage) network.OutgoingMessage { - // process is noop, all work is done in validateIncomingTxMessage above - return network.OutgoingMessage{Action: network.Ignore} -} - var errBackLogFullLocal = errors.New("backlog full") // LocalTransaction is a special shortcut handler for local transactions and intended to be used diff --git a/network/gossipNode.go b/network/gossipNode.go index 1592641f70..992b67aca8 100644 --- a/network/gossipNode.go +++ b/network/gossipNode.go @@ -80,8 +80,10 @@ type GossipNode interface { // ClearHandlers deregisters all the existing message handlers. ClearHandlers() - // RegisterProcessors adds to the set of given message processors. - RegisterProcessors(dispatch []TaggedMessageProcessor) + // RegisterValidatorHandlers adds to the set of given message validation handlers. + // A difference with regular handlers is validation ones perform synchronous validation. + // Currently used as p2p pubsub topic validators. + RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler) // ClearProcessors deregisters all the existing message processors. ClearProcessors() @@ -156,14 +158,6 @@ type OutgoingMessage struct { OnRelease func() } -// ValidatedMessage is a message that has been validated and is ready to be processed. -// Think as an intermediate one between IncomingMessage and OutgoingMessage -type ValidatedMessage struct { - Action ForwardingPolicy - Tag Tag - ValidatedMessage interface{} -} - // ForwardingPolicy is an enum indicating to whom we should send a message // //msgp:ignore ForwardingPolicy @@ -202,28 +196,19 @@ func (f HandlerFunc) Handle(message IncomingMessage) OutgoingMessage { return f(message) } -// MessageProcessor takes a IncomingMessage (e.g., vote, transaction), processes it, and returns what (if anything) +// MessageValidatorHandler takes a IncomingMessage (e.g., vote, transaction), processes it, and returns what (if anything) // to send to the network in response. -// This is an extension of the MessageHandler that works in two stages: validate ->[result]-> handle. -type MessageProcessor interface { - Validate(message IncomingMessage) ValidatedMessage - Handle(message ValidatedMessage) OutgoingMessage +// it supposed to perform synchronous validation and return the result of the validation +// so that network knows immediately if the message should be be broadcasted or not. +type MessageValidatorHandler interface { + ValidateHandle(message IncomingMessage) OutgoingMessage } -// ProcessorValidateFunc represents an implementation of the MessageProcessor interface -type ProcessorValidateFunc func(message IncomingMessage) ValidatedMessage - -// ProcessorHandleFunc represents an implementation of the MessageProcessor interface -type ProcessorHandleFunc func(message ValidatedMessage) OutgoingMessage - -// Validate implements MessageProcessor.Validate, calling the validator with the IncomingMessage and returning the action -// and validation extra data that can be use as the handler input. -func (f ProcessorValidateFunc) Validate(message IncomingMessage) ValidatedMessage { - return f(message) -} +// ValidateHandleFunc represents an implementation of the MessageProcessor interface +type ValidateHandleFunc func(message IncomingMessage) OutgoingMessage -// Handle implements MessageProcessor.Handle calling the handler with the ValidatedMessage and returning the OutgoingMessage -func (f ProcessorHandleFunc) Handle(message ValidatedMessage) OutgoingMessage { +// ValidateHandle implements MessageValidatorHandler.ValidateHandle, calling the validator with the IncomingMessage and returning the action. +func (f ValidateHandleFunc) ValidateHandle(message IncomingMessage) OutgoingMessage { return f(message) } @@ -235,9 +220,9 @@ type taggedMessageDispatcher[T any] struct { // TaggedMessageHandler receives one type of broadcast messages type TaggedMessageHandler = taggedMessageDispatcher[MessageHandler] -// TaggedMessageProcessor receives one type of broadcast messages +// TaggedMessageValidatorHandler receives one type of broadcast messages // and performs two stage processing: validating and handling -type TaggedMessageProcessor = taggedMessageDispatcher[MessageProcessor] +type TaggedMessageValidatorHandler = taggedMessageDispatcher[MessageValidatorHandler] // Propagate is a convenience function to save typing in the common case of a message handler telling us to propagate an incoming message // "return network.Propagate(msg)" instead of "return network.OutgoingMsg{network.Broadcast, msg.Tag, msg.Data}" diff --git a/network/hybridNetwork.go b/network/hybridNetwork.go index 27fc6edbb0..7eaefbc189 100644 --- a/network/hybridNetwork.go +++ b/network/hybridNetwork.go @@ -180,10 +180,10 @@ func (n *HybridP2PNetwork) ClearHandlers() { n.wsNetwork.ClearHandlers() } -// RegisterProcessors adds to the set of given message processors. -func (n *HybridP2PNetwork) RegisterProcessors(dispatch []TaggedMessageProcessor) { - n.p2pNetwork.RegisterProcessors(dispatch) - n.wsNetwork.RegisterProcessors(dispatch) +// RegisterValidatorHandlers adds to the set of given message processors. +func (n *HybridP2PNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler) { + n.p2pNetwork.RegisterValidatorHandlers(dispatch) + n.wsNetwork.RegisterValidatorHandlers(dispatch) } // ClearProcessors deregisters all the existing message processors. diff --git a/network/multiplexer.go b/network/multiplexer.go index dc38fba277..ba92517ddc 100644 --- a/network/multiplexer.go +++ b/network/multiplexer.go @@ -61,8 +61,8 @@ func (m *Multiplexer) getHandler(tag Tag) (MessageHandler, bool) { } // Retrieves the processor for the given message Tag from the processors array. -func (m *Multiplexer) getProcessor(tag Tag) (MessageProcessor, bool) { - return getHandler[MessageProcessor](&m.msgProcessors, tag) +func (m *Multiplexer) getProcessor(tag Tag) (MessageValidatorHandler, bool) { + return getHandler[MessageValidatorHandler](&m.msgProcessors, tag) } // Handle is the "input" side of the multiplexer. It dispatches the message to the previously defined handler. @@ -74,17 +74,9 @@ func (m *Multiplexer) Handle(msg IncomingMessage) OutgoingMessage { } // Validate is an alternative "input" side of the multiplexer. It dispatches the message to the previously defined validator. -func (m *Multiplexer) Validate(msg IncomingMessage) ValidatedMessage { +func (m *Multiplexer) Validate(msg IncomingMessage) OutgoingMessage { if handler, ok := m.getProcessor(msg.Tag); ok { - return handler.Validate(msg) - } - return ValidatedMessage{} -} - -// Process is the second step of message handling after validation. It dispatches the message to the previously defined processor. -func (m *Multiplexer) Process(msg ValidatedMessage) OutgoingMessage { - if handler, ok := m.getProcessor(msg.Tag); ok { - return handler.Handle(msg) + return handler.ValidateHandle(msg) } return OutgoingMessage{} } @@ -110,8 +102,8 @@ func (m *Multiplexer) RegisterHandlers(dispatch []TaggedMessageHandler) { registerMultiplexer(&m.msgHandlers, dispatch) } -// RegisterProcessors registers the set of given message handlers. -func (m *Multiplexer) RegisterProcessors(dispatch []TaggedMessageProcessor) { +// RegisterValidatorHandlers registers the set of given message handlers. +func (m *Multiplexer) RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler) { registerMultiplexer(&m.msgProcessors, dispatch) } @@ -145,5 +137,5 @@ func (m *Multiplexer) ClearHandlers(excludeTags []Tag) { // ClearProcessors deregisters all the existing message handlers other than the one provided in the excludeTags list func (m *Multiplexer) ClearProcessors(excludeTags []Tag) { - clearMultiplexer[MessageProcessor](&m.msgProcessors, excludeTags) + clearMultiplexer[MessageValidatorHandler](&m.msgProcessors, excludeTags) } diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 1ad49bd045..0f5e2d1371 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -685,9 +685,9 @@ func (n *P2PNetwork) ClearHandlers() { n.handler.ClearHandlers([]Tag{}) } -// RegisterProcessors adds to the set of given message handlers. -func (n *P2PNetwork) RegisterProcessors(dispatch []TaggedMessageProcessor) { - n.handler.RegisterProcessors(dispatch) +// RegisterValidatorHandlers adds to the set of given message handlers. +func (n *P2PNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler) { + n.handler.RegisterValidatorHandlers(dispatch) } // ClearProcessors deregisters all the existing message handlers. @@ -880,7 +880,8 @@ func (n *P2PNetwork) txTopicHandleLoop() { n.log.Debugf("Subscribed to topic %s", p2p.TXTopicName) for { - msg, err := sub.Next(n.ctx) + // msg from sub.Next not used since all work done by txTopicValidator + _, err := sub.Next(n.ctx) if err != nil { if err != pubsub.ErrSubscriptionCancelled && err != context.Canceled { n.log.Errorf("Error reading from subscription %v, peerId %s", err, n.service.ID()) @@ -889,13 +890,6 @@ func (n *P2PNetwork) txTopicHandleLoop() { sub.Cancel() return } - // if there is a self-sent the message no need to process it. - if msg.ReceivedFrom == n.service.ID() { - continue - } - - _ = n.handler.Process(msg.ValidatorData.(ValidatedMessage)) - // participation or configuration change, cancel subscription and quit if !n.wantTXGossip.Load() { n.log.Debugf("Cancelling subscription to topic %s due participation change", p2p.TXTopicName) diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 5b3470689f..2b6fefd671 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -110,26 +110,22 @@ func TestP2PSubmitTX(t *testing.T) { // now we should be connected in a line: B <-> A <-> C where both B and C are connected to A but not each other // Since we aren't using the transaction handler in this test, we need to register a pass-through handler - passThroughHandler := []TaggedMessageProcessor{ + passThroughHandler := []TaggedMessageValidatorHandler{ { Tag: protocol.TxnTag, MessageHandler: struct { - ProcessorValidateFunc - ProcessorHandleFunc + ValidateHandleFunc }{ - ProcessorValidateFunc(func(msg IncomingMessage) ValidatedMessage { - return ValidatedMessage{Action: Accept, Tag: msg.Tag, ValidatedMessage: nil} - }), - ProcessorHandleFunc(func(msg ValidatedMessage) OutgoingMessage { - return OutgoingMessage{Action: Ignore} + ValidateHandleFunc(func(msg IncomingMessage) OutgoingMessage { + return OutgoingMessage{Action: Accept, Tag: msg.Tag} }), }, }, } - netA.RegisterProcessors(passThroughHandler) - netB.RegisterProcessors(passThroughHandler) - netC.RegisterProcessors(passThroughHandler) + netA.RegisterValidatorHandlers(passThroughHandler) + netB.RegisterValidatorHandlers(passThroughHandler) + netC.RegisterValidatorHandlers(passThroughHandler) // send messages from B and confirm that they get received by C (via A) for i := 0; i < 10; i++ { @@ -202,25 +198,21 @@ func TestP2PSubmitTXNoGossip(t *testing.T) { // ensure netC cannot receive messages - passThroughHandler := []TaggedMessageProcessor{ + passThroughHandler := []TaggedMessageValidatorHandler{ { Tag: protocol.TxnTag, MessageHandler: struct { - ProcessorValidateFunc - ProcessorHandleFunc + ValidateHandleFunc }{ - ProcessorValidateFunc(func(msg IncomingMessage) ValidatedMessage { - return ValidatedMessage{Action: Accept, Tag: msg.Tag, ValidatedMessage: nil} - }), - ProcessorHandleFunc(func(msg ValidatedMessage) OutgoingMessage { - return OutgoingMessage{Action: Ignore} + ValidateHandleFunc(func(msg IncomingMessage) OutgoingMessage { + return OutgoingMessage{Action: Accept, Tag: msg.Tag} }), }, }, } - netB.RegisterProcessors(passThroughHandler) - netC.RegisterProcessors(passThroughHandler) + netB.RegisterValidatorHandlers(passThroughHandler) + netC.RegisterValidatorHandlers(passThroughHandler) for i := 0; i < 10; i++ { err = netA.Broadcast(context.Background(), protocol.TxnTag, []byte(fmt.Sprintf("test %d", i)), false, nil) require.NoError(t, err) @@ -850,26 +842,22 @@ func TestP2PRelay(t *testing.T) { return netA.hasPeers() && netB.hasPeers() }, 2*time.Second, 50*time.Millisecond) - makeCounterHandler := func(numExpected int, counter *atomic.Uint32, msgs *[][]byte) ([]TaggedMessageProcessor, chan struct{}) { + makeCounterHandler := func(numExpected int, counter *atomic.Uint32, msgs *[][]byte) ([]TaggedMessageValidatorHandler, chan struct{}) { counterDone := make(chan struct{}) - counterHandler := []TaggedMessageProcessor{ + counterHandler := []TaggedMessageValidatorHandler{ { Tag: protocol.TxnTag, MessageHandler: struct { - ProcessorValidateFunc - ProcessorHandleFunc + ValidateHandleFunc }{ - ProcessorValidateFunc(func(msg IncomingMessage) ValidatedMessage { - return ValidatedMessage{Action: Accept, Tag: msg.Tag, ValidatedMessage: msg.Data} - }), - ProcessorHandleFunc(func(msg ValidatedMessage) OutgoingMessage { + ValidateHandleFunc(func(msg IncomingMessage) OutgoingMessage { if msgs != nil { - *msgs = append(*msgs, msg.ValidatedMessage.([]byte)) + *msgs = append(*msgs, msg.Data) } if count := counter.Add(1); int(count) >= numExpected { close(counterDone) } - return OutgoingMessage{Action: Ignore} + return OutgoingMessage{Action: Accept, Tag: msg.Tag} }), }, }, @@ -878,7 +866,7 @@ func TestP2PRelay(t *testing.T) { } var counter atomic.Uint32 counterHandler, counterDone := makeCounterHandler(1, &counter, nil) - netA.RegisterProcessors(counterHandler) + netA.RegisterValidatorHandlers(counterHandler) // send 5 messages from both netB to netA // since there is no node with listening address set => no messages should be received @@ -933,7 +921,7 @@ func TestP2PRelay(t *testing.T) { var loggedMsgs [][]byte counterHandler, counterDone = makeCounterHandler(expectedMsgs, &counter, &loggedMsgs) netA.ClearProcessors() - netA.RegisterProcessors(counterHandler) + netA.RegisterValidatorHandlers(counterHandler) for i := 0; i < expectedMsgs/2; i++ { err := netB.Relay(context.Background(), protocol.TxnTag, []byte{5, 6, 7, byte(i)}, true, nil) diff --git a/network/wsNetwork.go b/network/wsNetwork.go index f222d2ff27..ff890d50b1 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -864,8 +864,8 @@ func (wn *WebsocketNetwork) ClearHandlers() { wn.handler.ClearHandlers([]Tag{protocol.PingTag, protocol.PingReplyTag, protocol.NetPrioResponseTag}) } -// RegisterProcessors registers the set of given message handlers. -func (wn *WebsocketNetwork) RegisterProcessors(dispatch []TaggedMessageProcessor) { +// RegisterValidatorHandlers registers the set of given message handlers. +func (wn *WebsocketNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler) { } // ClearProcessors deregisters all the existing message handlers. From 1b04acaebc4bb2fe8cb48181ad556f8c5e463e8a Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 17 Jul 2024 15:15:14 -0400 Subject: [PATCH 04/16] some extra cleanup --- data/txHandler.go | 2 ++ network/multiplexer.go | 28 ++++++++++++++-------------- network/p2pNetwork.go | 4 ++-- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index d5c09ed744..5c9f044e81 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -823,6 +823,8 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa } } + // TODO: implement a proper batching when more messages can be batched together + // and each validator waits on a channel for its result. jobs := []execpool.InputJob{&verify.UnverifiedTxnSigJob{TxnGroup: wi.unverifiedTxGroup, BacklogMessage: wi}} handler.batchProcessor.ProcessBatch(jobs) diff --git a/network/multiplexer.go b/network/multiplexer.go index ba92517ddc..ca40f9b0e4 100644 --- a/network/multiplexer.go +++ b/network/multiplexer.go @@ -24,15 +24,15 @@ import ( // Multiplexer is a message handler that sorts incoming messages by Tag and passes // them along to the relevant message handler for that type of message. type Multiplexer struct { - msgHandlers atomic.Value // stores map[Tag]MessageHandler, an immutable map. - msgProcessors atomic.Value // stores map[Tag]MessageProcessor, an immutable map. + msgHandlers atomic.Value // stores map[Tag]MessageHandler, an immutable map. + msgValidatorHandlers atomic.Value // stores map[Tag]MessageValidatorHandler, an immutable map. } // MakeMultiplexer creates an empty Multiplexer func MakeMultiplexer() *Multiplexer { m := &Multiplexer{} - m.ClearHandlers(nil) // allocate the map - m.ClearProcessors(nil) // allocate the map + m.ClearHandlers(nil) // allocate the map + m.ClearValidatorHandlers(nil) // allocate the map return m } @@ -60,9 +60,9 @@ func (m *Multiplexer) getHandler(tag Tag) (MessageHandler, bool) { return getHandler[MessageHandler](&m.msgHandlers, tag) } -// Retrieves the processor for the given message Tag from the processors array. -func (m *Multiplexer) getProcessor(tag Tag) (MessageValidatorHandler, bool) { - return getHandler[MessageValidatorHandler](&m.msgProcessors, tag) +// Retrieves the validating handler for the given message Tag from the validating handlers array. +func (m *Multiplexer) getValidatorHandler(tag Tag) (MessageValidatorHandler, bool) { + return getHandler[MessageValidatorHandler](&m.msgValidatorHandlers, tag) } // Handle is the "input" side of the multiplexer. It dispatches the message to the previously defined handler. @@ -73,9 +73,9 @@ func (m *Multiplexer) Handle(msg IncomingMessage) OutgoingMessage { return OutgoingMessage{} } -// Validate is an alternative "input" side of the multiplexer. It dispatches the message to the previously defined validator. -func (m *Multiplexer) Validate(msg IncomingMessage) OutgoingMessage { - if handler, ok := m.getProcessor(msg.Tag); ok { +// ValidateHandle is an alternative "input" side of the multiplexer. It dispatches the message to the previously defined validator. +func (m *Multiplexer) ValidateHandle(msg IncomingMessage) OutgoingMessage { + if handler, ok := m.getValidatorHandler(msg.Tag); ok { return handler.ValidateHandle(msg) } return OutgoingMessage{} @@ -104,7 +104,7 @@ func (m *Multiplexer) RegisterHandlers(dispatch []TaggedMessageHandler) { // RegisterValidatorHandlers registers the set of given message handlers. func (m *Multiplexer) RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler) { - registerMultiplexer(&m.msgProcessors, dispatch) + registerMultiplexer(&m.msgValidatorHandlers, dispatch) } func clearMultiplexer[T any](target *atomic.Value, excludeTags []Tag) { @@ -135,7 +135,7 @@ func (m *Multiplexer) ClearHandlers(excludeTags []Tag) { clearMultiplexer[MessageHandler](&m.msgHandlers, excludeTags) } -// ClearProcessors deregisters all the existing message handlers other than the one provided in the excludeTags list -func (m *Multiplexer) ClearProcessors(excludeTags []Tag) { - clearMultiplexer[MessageValidatorHandler](&m.msgProcessors, excludeTags) +// ClearValidatorHandlers deregisters all the existing message handlers other than the one provided in the excludeTags list +func (m *Multiplexer) ClearValidatorHandlers(excludeTags []Tag) { + clearMultiplexer[MessageValidatorHandler](&m.msgValidatorHandlers, excludeTags) } diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 0f5e2d1371..a2f8948c95 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -692,7 +692,7 @@ func (n *P2PNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageValidator // ClearProcessors deregisters all the existing message handlers. func (n *P2PNetwork) ClearProcessors() { - n.handler.ClearProcessors([]Tag{}) + n.handler.ClearValidatorHandlers([]Tag{}) } // GetHTTPClient returns a http.Client with a suitable for the network Transport @@ -933,7 +933,7 @@ func (n *P2PNetwork) txTopicValidator(ctx context.Context, peerID peer.ID, msg * peerStats.txReceived.Add(1) n.peerStatsMu.Unlock() - outmsg := n.handler.Validate(inmsg) + outmsg := n.handler.ValidateHandle(inmsg) // there was a decision made in the handler about this message switch outmsg.Action { case Ignore: From b9cd44f0d1b953a38cf992490304a12b8fb61fc9 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 17 Jul 2024 15:20:31 -0400 Subject: [PATCH 05/16] match number channel sizes to number of validators --- data/txHandler.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index 5c9f044e81..20f478e5f2 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -178,8 +178,9 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) { streamVerifierChan: make(chan execpool.InputJob), streamVerifierDropped: make(chan *verify.UnverifiedTxnSigJob), - postVerificationQueue2: make(chan *verify.VerificationResult, 1), - streamVerifierDropped2: make(chan *verify.UnverifiedTxnSigJob, 1), + // match to the number of validator workers + postVerificationQueue2: make(chan *verify.VerificationResult, 20), + streamVerifierDropped2: make(chan *verify.UnverifiedTxnSigJob, 20), } if opts.Config.TxFilterRawMsgEnabled() { From 74adfd7dbddbd43a5e703ea456589938e333dced Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 18 Jul 2024 12:28:54 -0400 Subject: [PATCH 06/16] verify: extract TxnGroupBatchSigVerifier subclass from txnSigBatchProcessor --- data/transactions/verify/txnBatch.go | 52 +++++++++++++++++------ data/transactions/verify/txnBatch_test.go | 48 ++++++++++++++++++--- 2 files changed, 80 insertions(+), 20 deletions(-) diff --git a/data/transactions/verify/txnBatch.go b/data/transactions/verify/txnBatch.go index 8619208da8..ce1399133d 100644 --- a/data/transactions/verify/txnBatch.go +++ b/data/transactions/verify/txnBatch.go @@ -17,7 +17,7 @@ package verify import ( - "errors" + "fmt" "sync/atomic" "github.com/algorand/go-algorand/crypto" @@ -98,10 +98,16 @@ func (bl *batchLoad) addLoad(txngrp []transactions.SignedTxn, gctx *GroupContext } +// TxnGroupBatchSigVerifier provides Verify method to synchronously verify a group of transactions +// It starts a new block listener to receive latests block headers for the sig verification +type TxnGroupBatchSigVerifier struct { + cache VerifiedTransactionCache + nbw *NewBlockWatcher + ledger logic.LedgerForSignature +} + type txnSigBatchProcessor struct { - cache VerifiedTransactionCache - nbw *NewBlockWatcher - ledger logic.LedgerForSignature + TxnGroupBatchSigVerifier resultChan chan<- *VerificationResult droppedChan chan<- *UnverifiedTxnSigJob } @@ -142,27 +148,47 @@ func (tbp txnSigBatchProcessor) sendResult(veTxnGroup []transactions.SignedTxn, } } -// MakeSigVerifyJobProcessor returns the object implementing the stream verifier Helper interface -func MakeSigVerifyJobProcessor(ledger LedgerForStreamVerifier, cache VerifiedTransactionCache, - resultChan chan<- *VerificationResult, droppedChan chan<- *UnverifiedTxnSigJob) (svp execpool.BatchProcessor, err error) { +func MakeSigVerifier(ledger LedgerForStreamVerifier, cache VerifiedTransactionCache) (TxnGroupBatchSigVerifier, error) { latest := ledger.Latest() latestHdr, err := ledger.BlockHdr(latest) if err != nil { - return nil, errors.New("MakeStreamVerifier: Could not get header for previous block") + return TxnGroupBatchSigVerifier{}, fmt.Errorf("MakeSigVerifier: Could not get header for previous block: %w", err) } nbw := MakeNewBlockWatcher(latestHdr) ledger.RegisterBlockListeners([]ledgercore.BlockListener{nbw}) + verifier := TxnGroupBatchSigVerifier{ + cache: cache, + nbw: nbw, + ledger: ledger, + } + + return verifier, nil +} + +// MakeSigVerifyJobProcessor returns the object implementing the stream verifier Helper interface +func MakeSigVerifyJobProcessor( + ledger LedgerForStreamVerifier, cache VerifiedTransactionCache, + resultChan chan<- *VerificationResult, droppedChan chan<- *UnverifiedTxnSigJob, +) (svp execpool.BatchProcessor, err error) { + sigVerifier, err := MakeSigVerifier(ledger, cache) + if err != nil { + return nil, err + } return &txnSigBatchProcessor{ - cache: cache, - nbw: nbw, - ledger: ledger, - droppedChan: droppedChan, - resultChan: resultChan, + TxnGroupBatchSigVerifier: sigVerifier, + droppedChan: droppedChan, + resultChan: resultChan, }, nil } +func (sv *TxnGroupBatchSigVerifier) Verify(stxs []transactions.SignedTxn) error { + blockHeader := sv.nbw.getBlockHeader() + _, err := txnGroup(stxs, blockHeader, sv.cache, sv.ledger, nil) + return err +} + func (tbp *txnSigBatchProcessor) ProcessBatch(txns []execpool.InputJob) { batchVerifier, ctx := tbp.preProcessUnverifiedTxns(txns) failed, err := batchVerifier.VerifyWithFeedback() diff --git a/data/transactions/verify/txnBatch_test.go b/data/transactions/verify/txnBatch_test.go index 45693f58a1..27dcc56343 100644 --- a/data/transactions/verify/txnBatch_test.go +++ b/data/transactions/verify/txnBatch_test.go @@ -139,9 +139,7 @@ func verifyResults(txnGroups [][]transactions.SignedTxn, badTxnGroups map[uint64 require.GreaterOrEqual(t, len(unverifiedGroups), badSigResultCounter) for _, txn := range unverifiedGroups { u, _ := binary.Uvarint(txn[0].Txn.Note) - if _, has := badTxnGroups[u]; has { - delete(badTxnGroups, u) - } + delete(badTxnGroups, u) } require.Empty(t, badTxnGroups, "unverifiedGroups should have all the transactions with invalid sigs") } @@ -301,6 +299,7 @@ func TestGetNumberOfBatchableSigsInGroup(t *testing.T) { txnGroups[mod][0].Sig = crypto.Signature{} batchSigs, err := UnverifiedTxnSigJob{TxnGroup: txnGroups[mod]}.GetNumberOfBatchableItems() require.ErrorIs(t, err, errTxnSigHasNoSig) + require.Equal(t, uint64(0), batchSigs) mod++ _, signedTxns, secrets, addrs := generateTestObjects(numOfTxns, 20, 0, 50) @@ -353,6 +352,7 @@ byte base64 5rZMNsevs5sULO+54aN+OvU6lQ503z2X+SSYUABIx7E= txnGroups[mod][0].Msig = mSigTxn[0].Msig batchSigs, err = UnverifiedTxnSigJob{TxnGroup: txnGroups[mod]}.GetNumberOfBatchableItems() require.ErrorIs(t, err, errTxnSigNotWellFormed) + require.Equal(t, uint64(0), batchSigs) } // TestStreamToBatchPoolShutdown tests what happens when the exec pool shuts down @@ -437,10 +437,11 @@ func TestStreamToBatchPoolShutdown(t *testing.T) { //nolint:paralleltest // Not // send txn groups to be verified go func() { defer wg.Done() + outer: for _, tg := range txnGroups { select { case <-ctx.Done(): - break + break outer case inputChan <- &UnverifiedTxnSigJob{TxnGroup: tg, BacklogMessage: nil}: } } @@ -493,6 +494,7 @@ func TestStreamToBatchRestart(t *testing.T) { // send txn groups to be verified go func() { defer wg.Done() + outer: for i, tg := range txnGroups { if (i+1)%10 == 0 { cancel() @@ -502,7 +504,7 @@ func TestStreamToBatchRestart(t *testing.T) { } select { case <-ctx2.Done(): - break + break outer case inputChan <- &UnverifiedTxnSigJob{TxnGroup: tg, BacklogMessage: nil}: } } @@ -798,7 +800,10 @@ func TestStreamToBatchPostVBlocked(t *testing.T) { func TestStreamToBatchMakeStreamToBatchErr(t *testing.T) { partitiontest.PartitionTest(t) - _, err := MakeSigVerifyJobProcessor(&DummyLedgerForSignature{badHdr: true}, nil, nil, nil) + _, err := MakeSigVerifier(&DummyLedgerForSignature{badHdr: true}, nil) + require.Error(t, err) + + _, err = MakeSigVerifyJobProcessor(&DummyLedgerForSignature{badHdr: true}, nil, nil, nil) require.Error(t, err) } @@ -863,7 +868,7 @@ func TestGetErredUnprocessed(t *testing.T) { droppedChan := make(chan *UnverifiedTxnSigJob, 1) svh := txnSigBatchProcessor{ - resultChan: make(chan<- *VerificationResult, 0), + resultChan: make(chan<- *VerificationResult), droppedChan: droppedChan, } @@ -871,3 +876,32 @@ func TestGetErredUnprocessed(t *testing.T) { dropped := <-droppedChan require.Equal(t, *dropped, UnverifiedTxnSigJob{}) } + +func TestSigVerifier(t *testing.T) { + partitiontest.PartitionTest(t) + + numOfTxns := 16 + txnGroups, badTxnGroups := getSignedTransactions(numOfTxns, numOfTxns, 0, 0) + require.GreaterOrEqual(t, len(txnGroups), 1) + require.Equal(t, len(badTxnGroups), 0) + txnGroup := txnGroups[0] + + verificationPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, t) + defer verificationPool.Shutdown() + + cache := MakeVerifiedTransactionCache(50000) + + verifier, err := MakeSigVerifier(&DummyLedgerForSignature{}, cache) + require.NoError(t, err) + + err = verifier.Verify(txnGroup) + require.NoError(t, err) + + txnGroups, badTxnGroups = getSignedTransactions(numOfTxns, numOfTxns, 0, 1) + require.GreaterOrEqual(t, len(txnGroups), 1) + require.Greater(t, len(badTxnGroups), 0) + txnGroup = txnGroups[0] + + err = verifier.Verify(txnGroup) + require.Error(t, err) +} From f3f63f91b84f5f051fa4f8c23feddd21498abfc2 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 18 Jul 2024 12:52:09 -0400 Subject: [PATCH 07/16] txhandler: use TxnGroupBatchSigVerifier --- data/txHandler.go | 78 +++++++++++++------------------------ data/txHandler_test.go | 87 ++++++++++++++++++++++++++++++++---------- 2 files changed, 92 insertions(+), 73 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index 20f478e5f2..e30174fb23 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -133,9 +133,8 @@ type TxHandler struct { appLimiter *appRateLimiter appLimiterBacklogThreshold int - batchProcessor execpool.BatchProcessor - streamVerifierDropped2 chan *verify.UnverifiedTxnSigJob - postVerificationQueue2 chan *verify.VerificationResult + // batchVerifier provides synchronous verification of transaction groups + batchVerifier verify.TxnGroupBatchSigVerifier } // TxHandlerOpts is TxHandler configuration options @@ -177,10 +176,6 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) { net: opts.Net, streamVerifierChan: make(chan execpool.InputJob), streamVerifierDropped: make(chan *verify.UnverifiedTxnSigJob), - - // match to the number of validator workers - postVerificationQueue2: make(chan *verify.VerificationResult, 20), - streamVerifierDropped2: make(chan *verify.UnverifiedTxnSigJob, 20), } if opts.Config.TxFilterRawMsgEnabled() { @@ -219,8 +214,7 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) { // prepare the batch processor for pubsub synchronous verification var err0 error - handler.batchProcessor, err0 = verify.MakeSigVerifyJobProcessor(handler.ledger, handler.ledger.VerifiedTransactionCache(), - handler.postVerificationQueue2, handler.streamVerifierDropped2) + handler.batchVerifier, err0 = verify.MakeSigVerifier(handler.ledger, handler.ledger.VerifiedTransactionCache()) if err0 != nil { return nil, err0 } @@ -824,56 +818,36 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa } } - // TODO: implement a proper batching when more messages can be batched together - // and each validator waits on a channel for its result. - jobs := []execpool.InputJob{&verify.UnverifiedTxnSigJob{TxnGroup: wi.unverifiedTxGroup, BacklogMessage: wi}} - handler.batchProcessor.ProcessBatch(jobs) - - select { - case wi := <-handler.postVerificationQueue2: - m := wi.BacklogMessage.(*txBacklogMsg) - if wi.Err != nil { - handler.postProcessReportErrors(wi.Err) - logging.Base().Warnf("Received a malformed tx group %v: %v", m.unverifiedTxGroup, wi.Err) - return network.OutgoingMessage{ - Action: network.Disconnect, - } - } - // at this point, we've verified the transaction, so we can safely treat the transaction as a verified transaction. - verifiedTxGroup := m.unverifiedTxGroup - - // save the transaction, if it has high enough fee and not already in the cache - err := handler.txPool.Remember(verifiedTxGroup) - if err != nil { - handler.rememberReportErrors(err) - logging.Base().Debugf("could not remember tx: %v", err) - return network.OutgoingMessage{ - Action: network.Ignore, - } - } - - transactionMessagesRemember.Inc(nil) - - // if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions. - err = handler.ledger.VerifiedTransactionCache().Pin(verifiedTxGroup) - if err != nil { - logging.Base().Infof("unable to pin transaction: %v", err) - } + err := handler.batchVerifier.Verify(wi.unverifiedTxGroup) + if err != nil { + handler.postProcessReportErrors(err) + logging.Base().Warnf("Received a malformed tx group %v: %v", wi.unverifiedTxGroup, err) return network.OutgoingMessage{ - Action: network.Accept, + Action: network.Disconnect, } + } + verifiedTxGroup := wi.unverifiedTxGroup - case <-handler.streamVerifierDropped2: - transactionMessagesDroppedFromBacklog.Inc(nil) - return network.OutgoingMessage{ - Action: network.Ignore, - } - case <-handler.ctx.Done(): - transactionMessagesDroppedFromBacklog.Inc(nil) + // save the transaction, if it has high enough fee and not already in the cache + err = handler.txPool.Remember(verifiedTxGroup) + if err != nil { + handler.rememberReportErrors(err) + logging.Base().Debugf("could not remember tx: %v", err) return network.OutgoingMessage{ Action: network.Ignore, } } + + transactionMessagesRemember.Inc(nil) + + // if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions. + err = handler.ledger.VerifiedTransactionCache().Pin(verifiedTxGroup) + if err != nil { + logging.Base().Infof("unable to pin transaction: %v", err) + } + return network.OutgoingMessage{ + Action: network.Accept, + } } var errBackLogFullLocal = errors.New("backlog full") diff --git a/data/txHandler_test.go b/data/txHandler_test.go index 9237865037..59ad055b99 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -1012,6 +1012,29 @@ func TestTxHandlerProcessIncomingCacheBacklogDrop(t *testing.T) { require.Equal(t, initialValue+1, currentValue) } +func makeTxns(addresses []basics.Address, secrets []*crypto.SignatureSecrets, sendIdx, recvIdx int, gh crypto.Digest) ([]transactions.SignedTxn, []byte) { + note := make([]byte, 2) + crypto.RandBytes(note) + tx := transactions.Transaction{ + Type: protocol.PaymentTx, + Header: transactions.Header{ + Sender: addresses[sendIdx], + Fee: basics.MicroAlgos{Raw: proto.MinTxnFee * 2}, + FirstValid: 0, + LastValid: basics.Round(proto.MaxTxnLife), + Note: note, + GenesisHash: gh, + }, + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: addresses[recvIdx], + Amount: basics.MicroAlgos{Raw: mockBalancesMinBalance + (rand.Uint64() % 10000)}, + }, + } + signedTx := tx.Sign(secrets[sendIdx]) + blob := protocol.Encode(&signedTx) + return []transactions.SignedTxn{signedTx}, blob +} + func TestTxHandlerProcessIncomingCacheTxPoolDrop(t *testing.T) { partitiontest.PartitionTest(t) @@ -1048,27 +1071,7 @@ loop: } } - makeTxns := func(sendIdx, recvIdx int) ([]transactions.SignedTxn, []byte) { - tx := transactions.Transaction{ - Type: protocol.PaymentTx, - Header: transactions.Header{ - Sender: addresses[sendIdx], - Fee: basics.MicroAlgos{Raw: proto.MinTxnFee * 2}, - FirstValid: 0, - LastValid: basics.Round(proto.MaxTxnLife), - Note: make([]byte, 2), - }, - PaymentTxnFields: transactions.PaymentTxnFields{ - Receiver: addresses[recvIdx], - Amount: basics.MicroAlgos{Raw: mockBalancesMinBalance + (rand.Uint64() % 10000)}, - }, - } - signedTx := tx.Sign(secrets[sendIdx]) - blob := protocol.Encode(&signedTx) - return []transactions.SignedTxn{signedTx}, blob - } - - stxns, blob := makeTxns(1, 2) + stxns, blob := makeTxns(addresses, secrets, 1, 2, genesisHash) action := handler.processIncomingTxn(network.IncomingMessage{Data: blob}) require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action) @@ -2745,3 +2748,45 @@ func TestTxHandlerCapGuard(t *testing.T) { require.Eventually(t, func() bool { return completed.Load() }, 1*time.Second, 10*time.Millisecond) } + +func TestTxHandlerValidateIncomingTxMessage(t *testing.T) { + partitiontest.PartitionTest(t) + + partitiontest.PartitionTest(t) + t.Parallel() + + const numUsers = 10 + addresses, secrets, genesis := makeTestGenesisAccounts(t, numUsers) + genBal := bookkeeping.MakeGenesisBalances(genesis, sinkAddr, poolAddr) + + ledgerName := fmt.Sprintf("%s-mem", t.Name()) + const inMem = true + log := logging.TestingLog(t) + log.SetLevel(logging.Panic) + + cfg := config.GetDefaultLocal() + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) + require.NoError(t, err) + defer ledger.Close() + + handler, err := makeTestTxHandler(ledger, cfg) + require.NoError(t, err) + + // valid message + _, blob := makeTxns(addresses, secrets, 1, 2, genesisHash) + outmsg := handler.validateIncomingTxMessage(network.IncomingMessage{Data: blob}) + require.Equal(t, outmsg.Action, network.Accept) + + // invalid signature + stxns, _ := makeTxns(addresses, secrets, 1, 2, genesisHash) + stxns[0].Sig[0] = stxns[0].Sig[0] + 1 + blob2 := protocol.Encode(&stxns[0]) + outmsg = handler.validateIncomingTxMessage(network.IncomingMessage{Data: blob2}) + require.Equal(t, outmsg.Action, network.Disconnect) + + // invalid message + _, blob = makeTxns(addresses, secrets, 1, 2, genesisHash) + blob[0] = blob[0] + 1 + outmsg = handler.validateIncomingTxMessage(network.IncomingMessage{Data: blob}) + require.Equal(t, outmsg.Action, network.Disconnect) +} From 614ba2d5c1743450fda674a3c0699cb0bd2e79a2 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 18 Jul 2024 12:58:47 -0400 Subject: [PATCH 08/16] fix linter --- data/transactions/verify/txnBatch.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/data/transactions/verify/txnBatch.go b/data/transactions/verify/txnBatch.go index ce1399133d..e2bd95d4ea 100644 --- a/data/transactions/verify/txnBatch.go +++ b/data/transactions/verify/txnBatch.go @@ -148,6 +148,7 @@ func (tbp txnSigBatchProcessor) sendResult(veTxnGroup []transactions.SignedTxn, } } +// MakeSigVerifier creats a new TxnGroupBatchSigVerifier for synchronous verification of transactions func MakeSigVerifier(ledger LedgerForStreamVerifier, cache VerifiedTransactionCache) (TxnGroupBatchSigVerifier, error) { latest := ledger.Latest() latestHdr, err := ledger.BlockHdr(latest) @@ -183,6 +184,7 @@ func MakeSigVerifyJobProcessor( }, nil } +// Verify synchronously verifies the signatures of the transactions in the group func (sv *TxnGroupBatchSigVerifier) Verify(stxs []transactions.SignedTxn) error { blockHeader := sv.nbw.getBlockHeader() _, err := txnGroup(stxs, blockHeader, sv.cache, sv.ledger, nil) From cd340f18d725820794a933f47be2258d2b062d2f Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 18 Jul 2024 15:45:58 -0400 Subject: [PATCH 09/16] add pubsub raw tracer for metrics --- network/p2p/pubsub.go | 1 + network/p2p/pubsubTracer.go | 86 +++++++++++++++++++++++++++++++++++++ network/wsNetwork.go | 2 +- 3 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 network/p2p/pubsubTracer.go diff --git a/network/p2p/pubsub.go b/network/p2p/pubsub.go index a968bcb6a9..657baecdde 100644 --- a/network/p2p/pubsub.go +++ b/network/p2p/pubsub.go @@ -98,6 +98,7 @@ func makePubSub(ctx context.Context, cfg config.Local, host host.Host) (*pubsub. pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign), // pubsub.WithValidateThrottle(cfg.TxBacklogSize), pubsub.WithValidateWorkers(incomingThreads), + pubsub.WithRawTracer(pubsubTracer{}), } return pubsub.NewGossipSub(ctx, host, options...) diff --git a/network/p2p/pubsubTracer.go b/network/p2p/pubsubTracer.go new file mode 100644 index 0000000000..1732aa570f --- /dev/null +++ b/network/p2p/pubsubTracer.go @@ -0,0 +1,86 @@ +// Copyright (C) 2019-2024 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package p2p + +import ( + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + + "github.com/algorand/go-algorand/util/metrics" +) + +var _ = pubsub.RawTracer(pubsubTracer{}) + +var transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog) +var transactionMessagesDupRawMsg = metrics.MakeCounter(metrics.TransactionMessagesDupRawMsg) + +// pubsubTracer is a tracer for pubsub events used to track metrics. +type pubsubTracer struct{} + +// AddPeer is invoked when a new peer is added. +func (t pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) {} + +// RemovePeer is invoked when a peer is removed. +func (t pubsubTracer) RemovePeer(p peer.ID) {} + +// Join is invoked when a new topic is joined +func (t pubsubTracer) Join(topic string) {} + +// Leave is invoked when a topic is abandoned +func (t pubsubTracer) Leave(topic string) {} + +// Graft is invoked when a new peer is grafted on the mesh (gossipsub) +func (t pubsubTracer) Graft(p peer.ID, topic string) {} + +// Prune is invoked when a peer is pruned from the message (gossipsub) +func (t pubsubTracer) Prune(p peer.ID, topic string) {} + +// ValidateMessage is invoked when a message first enters the validation pipeline. +func (t pubsubTracer) ValidateMessage(msg *pubsub.Message) {} + +// DeliverMessage is invoked when a message is delivered +func (t pubsubTracer) DeliverMessage(msg *pubsub.Message) {} + +// RejectMessage is invoked when a message is Rejected or Ignored. +// The reason argument can be one of the named strings Reject*. +func (t pubsubTracer) RejectMessage(msg *pubsub.Message, reason string) { + if reason == pubsub.RejectValidationThrottled || reason == pubsub.RejectValidationQueueFull { + transactionMessagesDroppedFromBacklog.Inc(nil) + } +} + +// DuplicateMessage is invoked when a duplicate message is dropped. +func (t pubsubTracer) DuplicateMessage(msg *pubsub.Message) { + transactionMessagesDupRawMsg.Inc(nil) +} + +// ThrottlePeer is invoked when a peer is throttled by the peer gater. +func (t pubsubTracer) ThrottlePeer(p peer.ID) {} + +// RecvRPC is invoked when an incoming RPC is received. +func (t pubsubTracer) RecvRPC(rpc *pubsub.RPC) {} + +// SendRPC is invoked when a RPC is sent. +func (t pubsubTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {} + +// DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full. +func (t pubsubTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {} + +// UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and +// the pressure release mechanism trigger, dropping messages. +func (t pubsubTracer) UndeliverableMessage(msg *pubsub.Message) {} diff --git a/network/wsNetwork.go b/network/wsNetwork.go index ff890d50b1..5fa0b80ec1 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -124,6 +124,7 @@ var networkIncomingBufferMicros = metrics.MakeCounter(metrics.MetricName{Name: " var networkHandleMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_handle_micros_total", Description: "microseconds spent by protocol handlers in the receive thread"}) var networkBroadcasts = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcasts_total", Description: "number of broadcast operations"}) +var networkBroadcastQueueFull = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_full_total", Description: "number of messages that were drops due to full broadcast queue"}) var networkBroadcastQueueMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_micros_total", Description: "microseconds broadcast requests sit on queue"}) var networkBroadcastSendMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_send_micros_total", Description: "microseconds spent broadcasting"}) var networkBroadcastsDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_broadcasts_dropped_total", Description: "number of broadcast messages not sent to any peer"}) @@ -135,7 +136,6 @@ var networkPeerAlreadyClosed = metrics.MakeCounter(metrics.MetricName{Name: "alg var networkSlowPeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_slow_drops_total", Description: "number of peers dropped for being slow to send to"}) var networkIdlePeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_idle_drops_total", Description: "number of peers dropped due to idle connection"}) -var networkBroadcastQueueFull = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_full_total", Description: "number of messages that were drops due to full broadcast queue"}) var minPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_min_ping_seconds", Description: "Network round trip time to fastest peer in seconds."}) var meanPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_mean_ping_seconds", Description: "Network round trip time to average peer in seconds."}) From 5977ccaa1f8e2dfef272b1172791c5c1eff699fd Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 19 Jul 2024 10:34:23 -0400 Subject: [PATCH 10/16] add algod_transaction_messages_dropped_backlog_p2p --- network/p2p/pubsubTracer.go | 4 ++-- util/metrics/metrics.go | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/network/p2p/pubsubTracer.go b/network/p2p/pubsubTracer.go index 1732aa570f..7fa1c75750 100644 --- a/network/p2p/pubsubTracer.go +++ b/network/p2p/pubsubTracer.go @@ -26,7 +26,7 @@ import ( var _ = pubsub.RawTracer(pubsubTracer{}) -var transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog) +var transactionMessagesDroppedFromBacklogP2P = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklogP2P) var transactionMessagesDupRawMsg = metrics.MakeCounter(metrics.TransactionMessagesDupRawMsg) // pubsubTracer is a tracer for pubsub events used to track metrics. @@ -60,7 +60,7 @@ func (t pubsubTracer) DeliverMessage(msg *pubsub.Message) {} // The reason argument can be one of the named strings Reject*. func (t pubsubTracer) RejectMessage(msg *pubsub.Message, reason string) { if reason == pubsub.RejectValidationThrottled || reason == pubsub.RejectValidationQueueFull { - transactionMessagesDroppedFromBacklog.Inc(nil) + transactionMessagesDroppedFromBacklogP2P.Inc(nil) } } diff --git a/util/metrics/metrics.go b/util/metrics/metrics.go index eb867729cf..519095bb6a 100644 --- a/util/metrics/metrics.go +++ b/util/metrics/metrics.go @@ -93,6 +93,8 @@ var ( TransactionMessagesHandled = MetricName{Name: "algod_transaction_messages_handled", Description: "Number of transaction messages handled"} // TransactionMessagesDroppedFromBacklog "Number of transaction messages dropped from backlog" TransactionMessagesDroppedFromBacklog = MetricName{Name: "algod_transaction_messages_dropped_backlog", Description: "Number of transaction messages dropped from backlog"} + // TransactionMessagesDroppedFromBacklogP2P "Number of transaction messages throttled with p2p pubsub" + TransactionMessagesDroppedFromBacklogP2P = MetricName{Name: "algod_transaction_messages_dropped_backlog_p2p", Description: "Number of transaction messages throttled with p2p pubsub"} // TransactionMessagesDroppedFromPool "Number of transaction messages dropped from pool" TransactionMessagesDroppedFromPool = MetricName{Name: "algod_transaction_messages_dropped_pool", Description: "Number of transaction messages dropped from pool"} // TransactionMessagesAlreadyCommitted "Number of duplicate or error transaction messages before placing into a backlog" From 686d9a20e7a79086d4e58b8896bf8cc9853289f7 Mon Sep 17 00:00:00 2001 From: cce <51567+cce@users.noreply.github.com> Date: Wed, 31 Jul 2024 14:24:38 -0400 Subject: [PATCH 11/16] get all the pubsub RawTracer message metrics --- network/p2p/pubsubTracer.go | 28 ++++++++++++++++++++-------- util/metrics/metrics.go | 13 +++++++++++-- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/network/p2p/pubsubTracer.go b/network/p2p/pubsubTracer.go index 7fa1c75750..ca57bc69ce 100644 --- a/network/p2p/pubsubTracer.go +++ b/network/p2p/pubsubTracer.go @@ -26,8 +26,11 @@ import ( var _ = pubsub.RawTracer(pubsubTracer{}) -var transactionMessagesDroppedFromBacklogP2P = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklogP2P) -var transactionMessagesDupRawMsg = metrics.MakeCounter(metrics.TransactionMessagesDupRawMsg) +var transactionMessagesP2PRejectMessage = metrics.NewTagCounter(metrics.TransactionMessagesP2PRejectMessage.Name, metrics.TransactionMessagesP2PRejectMessage.Description) +var transactionMessagesP2PDuplicateMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDuplicateMessage) +var transactionMessagesP2PDeliverMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDeliverMessage) +var transactionMessagesP2PUnderdeliverableMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PUndeliverableMessage) +var transactionMessagesP2PValidateMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PValidateMessage) // pubsubTracer is a tracer for pubsub events used to track metrics. type pubsubTracer struct{} @@ -51,22 +54,29 @@ func (t pubsubTracer) Graft(p peer.ID, topic string) {} func (t pubsubTracer) Prune(p peer.ID, topic string) {} // ValidateMessage is invoked when a message first enters the validation pipeline. -func (t pubsubTracer) ValidateMessage(msg *pubsub.Message) {} +func (t pubsubTracer) ValidateMessage(msg *pubsub.Message) { + transactionMessagesP2PValidateMessage.Inc(nil) +} // DeliverMessage is invoked when a message is delivered -func (t pubsubTracer) DeliverMessage(msg *pubsub.Message) {} +func (t pubsubTracer) DeliverMessage(msg *pubsub.Message) { + transactionMessagesP2PDeliverMessage.Inc(nil) +} // RejectMessage is invoked when a message is Rejected or Ignored. // The reason argument can be one of the named strings Reject*. func (t pubsubTracer) RejectMessage(msg *pubsub.Message, reason string) { - if reason == pubsub.RejectValidationThrottled || reason == pubsub.RejectValidationQueueFull { - transactionMessagesDroppedFromBacklogP2P.Inc(nil) + switch reason { + case pubsub.RejectValidationThrottled, pubsub.RejectValidationQueueFull, pubsub.RejectValidationFailed, pubsub.RejectValidationIgnored: + transactionMessagesP2PRejectMessage.Add(reason, 1) + default: + transactionMessagesP2PRejectMessage.Add("other", 1) } } // DuplicateMessage is invoked when a duplicate message is dropped. func (t pubsubTracer) DuplicateMessage(msg *pubsub.Message) { - transactionMessagesDupRawMsg.Inc(nil) + transactionMessagesP2PDuplicateMessage.Inc(nil) } // ThrottlePeer is invoked when a peer is throttled by the peer gater. @@ -83,4 +93,6 @@ func (t pubsubTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {} // UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and // the pressure release mechanism trigger, dropping messages. -func (t pubsubTracer) UndeliverableMessage(msg *pubsub.Message) {} +func (t pubsubTracer) UndeliverableMessage(msg *pubsub.Message) { + transactionMessagesP2PUnderdeliverableMessage.Inc(nil) +} diff --git a/util/metrics/metrics.go b/util/metrics/metrics.go index 519095bb6a..fcc566312f 100644 --- a/util/metrics/metrics.go +++ b/util/metrics/metrics.go @@ -93,8 +93,6 @@ var ( TransactionMessagesHandled = MetricName{Name: "algod_transaction_messages_handled", Description: "Number of transaction messages handled"} // TransactionMessagesDroppedFromBacklog "Number of transaction messages dropped from backlog" TransactionMessagesDroppedFromBacklog = MetricName{Name: "algod_transaction_messages_dropped_backlog", Description: "Number of transaction messages dropped from backlog"} - // TransactionMessagesDroppedFromBacklogP2P "Number of transaction messages throttled with p2p pubsub" - TransactionMessagesDroppedFromBacklogP2P = MetricName{Name: "algod_transaction_messages_dropped_backlog_p2p", Description: "Number of transaction messages throttled with p2p pubsub"} // TransactionMessagesDroppedFromPool "Number of transaction messages dropped from pool" TransactionMessagesDroppedFromPool = MetricName{Name: "algod_transaction_messages_dropped_pool", Description: "Number of transaction messages dropped from pool"} // TransactionMessagesAlreadyCommitted "Number of duplicate or error transaction messages before placing into a backlog" @@ -130,6 +128,17 @@ var ( // TransactionMessagesBacklogSize "Number of transaction messages in the TX handler backlog queue" TransactionMessagesBacklogSize = MetricName{Name: "algod_transaction_messages_backlog_size", Description: "Number of transaction messages in the TX handler backlog queue"} + // TransactionMessagesP2PRejectMessage "Number of rejected p2p pubsub transaction messages" + TransactionMessagesP2PRejectMessage = MetricName{Name: "algod_transaction_messages_p2p_reject", Description: "Number of rejected p2p pubsub transaction messages"} + // TransactionMessagesP2PDuplicateMessage "Number of duplicate p2p pubsub transaction messages"} + TransactionMessagesP2PDuplicateMessage = MetricName{Name: "algod_transaction_messages_p2p_duplicate", Description: "Number of duplicate p2p pubsub transaction messages"} + // TransactionMessagesP2PDeliverMessage "Number of delivered p2p pubsub transaction messages" + TransactionMessagesP2PDeliverMessage = MetricName{Name: "algod_transaction_messages_p2p_delivered", Description: "Number of delivered p2p pubsub transaction messages"} + // TransactionMessagesP2PUndeliverableMessage "Number of undeliverable p2p pubsub transaction messages" + TransactionMessagesP2PUndeliverableMessage = MetricName{Name: "algod_transaction_messages_p2p_undeliverable", Description: "Number of undeliverable p2p pubsub transaction messages"} + // TransactionMessagesP2PValidateMessage "Number of p2p pubsub transaction messages received for validation" + TransactionMessagesP2PValidateMessage = MetricName{Name: "algod_transaction_messages_p2p_validate", Description: "Number of p2p pubsub transaction messages received for validation"} + // TransactionGroupTxSyncHandled "Number of transaction groups handled via txsync" TransactionGroupTxSyncHandled = MetricName{Name: "algod_transaction_group_txsync_handled", Description: "Number of transaction groups handled via txsync"} // TransactionGroupTxSyncRemember "Number of transaction groups remembered via txsync" From 0bced19d78c19be99dee977c93e4db42f181e4f9 Mon Sep 17 00:00:00 2001 From: cce <51567+cce@users.noreply.github.com> Date: Wed, 31 Jul 2024 21:19:31 -0400 Subject: [PATCH 12/16] Update data/txHandler.go --- data/txHandler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data/txHandler.go b/data/txHandler.go index e30174fb23..7eb5126cf2 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -133,7 +133,7 @@ type TxHandler struct { appLimiter *appRateLimiter appLimiterBacklogThreshold int - // batchVerifier provides synchronous verification of transaction groups + // batchVerifier provides synchronous verification of transaction groups, used only by pubsub validation in validateIncomingTxMessage. batchVerifier verify.TxnGroupBatchSigVerifier } From 10bfe5311d9c8a49786e01e2e157847d64b8a48c Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 1 Aug 2024 13:08:40 -0400 Subject: [PATCH 13/16] ignore non-canonical tx messages --- data/txHandler.go | 56 ++++++++++++++++-------- data/txHandler_test.go | 97 +++++++++++++++++++++++++++--------------- 2 files changed, 100 insertions(+), 53 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index 7eb5126cf2..95bb23ca92 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -256,7 +256,6 @@ func (handler *TxHandler) Start() { }) // libp2p pubsub validator and handler abstracted as TaggedMessageProcessor - // TODO: rename to validators handler.net.RegisterValidatorHandlers([]network.TaggedMessageValidatorHandler{ { Tag: protocol.TxnTag, @@ -559,7 +558,7 @@ func (handler *TxHandler) deleteFromCaches(msgKey *crypto.Digest, canonicalKey * // dedupCanonical checks if the transaction group has been seen before after reencoding to canonical representation. // returns a key used for insertion if the group was not found. -func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.SignedTxn, consumed int) (key *crypto.Digest, isDup bool) { +func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.SignedTxn, consumed int) (key *crypto.Digest, reencoded []byte, isDup bool) { // consider situations where someone want to censor transactions A // 1. Txn A is not part of a group => txn A with a valid signature is OK // Censorship attempts are: @@ -576,14 +575,16 @@ func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.Signed // - using individual txn from a group: {A, Z} could be poisoned by {A, B}, where B is invalid var d crypto.Digest + var reencodedBuf []byte ntx := len(unverifiedTxGroup) if ntx == 1 { // a single transaction => cache/dedup canonical txn with its signature enc := unverifiedTxGroup[0].MarshalMsg(nil) d = crypto.Hash(enc) if handler.txCanonicalCache.CheckAndPut(&d) { - return nil, true + return nil, nil, true } + reencodedBuf = enc } else { // a transaction group => cache/dedup the entire group canonical group encodeBuf := make([]byte, 0, unverifiedTxGroup[0].Msgsize()*ntx) @@ -594,14 +595,15 @@ func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.Signed // reallocated, some assumption on size was wrong // log and skip logging.Base().Warnf("Decoded size %d does not match to encoded %d", consumed, len(encodeBuf)) - return nil, false + return nil, nil, false } d = crypto.Hash(encodeBuf) if handler.txCanonicalCache.CheckAndPut(&d) { - return nil, true + return nil, nil, true } + reencodedBuf = encodeBuf } - return &d, false + return &d, reencodedBuf, false } // incomingMsgDupCheck runs the duplicate check on a raw incoming message. @@ -696,28 +698,32 @@ func decodeMsg(data []byte) (unverifiedTxGroup []transactions.SignedTxn, consume return unverifiedTxGroup, consumed, false } -// incomingTxGroupDupRateLimit checks -// - if the incoming transaction group has been seen before after reencoding to canonical representation, and -// - if the sender is rate limited by the per-application rate limiter. -func (handler *TxHandler) incomingTxGroupDupRateLimit(unverifiedTxGroup []transactions.SignedTxn, encodedExpectedSize int, sender network.DisconnectablePeer) (*crypto.Digest, bool) { +// incomingTxGroupDupRateLimit checks if the incoming transaction group has been seen before after reencoding to canonical representation. +// It also return canonical representation of the transaction group allowing the caller to compare it with the input. +func (handler *TxHandler) incomingTxGroupCanonicalDedup(unverifiedTxGroup []transactions.SignedTxn, encodedExpectedSize int) (*crypto.Digest, []byte, bool) { var canonicalKey *crypto.Digest + var reencoded []byte if handler.txCanonicalCache != nil { var isDup bool - if canonicalKey, isDup = handler.dedupCanonical(unverifiedTxGroup, encodedExpectedSize); isDup { + if canonicalKey, reencoded, isDup = handler.dedupCanonical(unverifiedTxGroup, encodedExpectedSize); isDup { transactionMessagesDupCanonical.Inc(nil) - return canonicalKey, true + return nil, nil, true } } + return canonicalKey, reencoded, false +} +// incomingTxGroupAppRateLimit checks if the sender is rate limited by the per-application rate limiter. +func (handler *TxHandler) incomingTxGroupAppRateLimit(unverifiedTxGroup []transactions.SignedTxn, sender network.DisconnectablePeer) bool { // rate limit per application in a group. Limiting any app in a group drops the entire message. if handler.appLimiter != nil { congestedARL := len(handler.backlogQueue) > handler.appLimiterBacklogThreshold if congestedARL && handler.appLimiter.shouldDrop(unverifiedTxGroup, sender.(network.IPAddressable).RoutingAddr()) { transactionMessagesAppLimiterDrop.Inc(nil) - return canonicalKey, true + return true } } - return canonicalKey, false + return false } // processIncomingTxn decodes a transaction group from incoming message and enqueues into the back log for processing. @@ -753,13 +759,17 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net return network.OutgoingMessage{Action: network.Disconnect} } - canonicalKey, drop := handler.incomingTxGroupDupRateLimit(unverifiedTxGroup, consumed, rawmsg.Sender) + canonicalKey, _, drop := handler.incomingTxGroupCanonicalDedup(unverifiedTxGroup, consumed) if drop { // this re-serialized txgroup was detected as a duplicate by the canonical message cache, // or it was rate-limited by the per-app rate limiter return network.OutgoingMessage{Action: network.Ignore} } + if handler.incomingTxGroupAppRateLimit(unverifiedTxGroup, rawmsg.Sender) { + return network.OutgoingMessage{Action: network.Ignore} + } + select { case handler.backlogQueue <- &txBacklogMsg{ rawmsg: &rawmsg, @@ -794,10 +804,20 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa return network.OutgoingMessage{Action: network.Disconnect} } - canonicalKey, drop := handler.incomingTxGroupDupRateLimit(unverifiedTxGroup, consumed, rawmsg.Sender) + canonicalKey, reencoded, drop := handler.incomingTxGroupCanonicalDedup(unverifiedTxGroup, consumed) if drop { - // this re-serialized txgroup was detected as a duplicate by the canonical message cache, - // or it was rate-limited by the per-app rate limiter + return network.OutgoingMessage{Action: network.Ignore} + } + if reencoded == nil { + reencoded = reencode(unverifiedTxGroup) + } + + if !bytes.Equal(rawmsg.Data, reencoded) { + // ignore non-canonically encoded messages + return network.OutgoingMessage{Action: network.Ignore} + } + + if handler.incomingTxGroupAppRateLimit(unverifiedTxGroup, rawmsg.Sender) { return network.OutgoingMessage{Action: network.Ignore} } diff --git a/data/txHandler_test.go b/data/txHandler_test.go index 59ad055b99..41cf713028 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -646,42 +646,42 @@ func TestTxHandlerProcessIncomingGroup(t *testing.T) { } } +func craftNonCanonical(t *testing.T, stxn *transactions.SignedTxn, blobStxn []byte) []byte { + // make non-canonical encoding and ensure it is not accepted + stxnNonCanTxn := transactions.SignedTxn{Txn: stxn.Txn} + blobTxn := protocol.Encode(&stxnNonCanTxn) + stxnNonCanAuthAddr := transactions.SignedTxn{AuthAddr: stxn.AuthAddr} + blobAuthAddr := protocol.Encode(&stxnNonCanAuthAddr) + stxnNonCanAuthSig := transactions.SignedTxn{Sig: stxn.Sig} + blobSig := protocol.Encode(&stxnNonCanAuthSig) + + if blobStxn == nil { + blobStxn = protocol.Encode(stxn) + } + + // double check our skills for transactions.SignedTxn creation by creating a new canonical encoding and comparing to the original + blobValidation := make([]byte, 0, len(blobTxn)+len(blobAuthAddr)+len(blobSig)) + blobValidation = append(blobValidation[:], blobAuthAddr...) + blobValidation = append(blobValidation[:], blobSig[1:]...) // cut transactions.SignedTxn's field count + blobValidation = append(blobValidation[:], blobTxn[1:]...) // cut transactions.SignedTxn's field count + blobValidation[0] += 2 // increase field count + require.Equal(t, blobStxn, blobValidation) + + // craft non-canonical + blobNonCan := make([]byte, 0, len(blobTxn)+len(blobAuthAddr)+len(blobSig)) + blobNonCan = append(blobNonCan[:], blobTxn...) + blobNonCan = append(blobNonCan[:], blobAuthAddr[1:]...) // cut transactions.SignedTxn's field count + blobNonCan = append(blobNonCan[:], blobSig[1:]...) // cut transactions.SignedTxn's field count + blobNonCan[0] += 2 // increase field count + require.Len(t, blobNonCan, len(blobStxn)) + require.NotEqual(t, blobStxn, blobNonCan) + return blobNonCan +} + func TestTxHandlerProcessIncomingCensoring(t *testing.T) { partitiontest.PartitionTest(t) t.Parallel() - craftNonCanonical := func(t *testing.T, stxn *transactions.SignedTxn, blobStxn []byte) []byte { - // make non-canonical encoding and ensure it is not accepted - stxnNonCanTxn := transactions.SignedTxn{Txn: stxn.Txn} - blobTxn := protocol.Encode(&stxnNonCanTxn) - stxnNonCanAuthAddr := transactions.SignedTxn{AuthAddr: stxn.AuthAddr} - blobAuthAddr := protocol.Encode(&stxnNonCanAuthAddr) - stxnNonCanAuthSig := transactions.SignedTxn{Sig: stxn.Sig} - blobSig := protocol.Encode(&stxnNonCanAuthSig) - - if blobStxn == nil { - blobStxn = protocol.Encode(stxn) - } - - // double check our skills for transactions.SignedTxn creation by creating a new canonical encoding and comparing to the original - blobValidation := make([]byte, 0, len(blobTxn)+len(blobAuthAddr)+len(blobSig)) - blobValidation = append(blobValidation[:], blobAuthAddr...) - blobValidation = append(blobValidation[:], blobSig[1:]...) // cut transactions.SignedTxn's field count - blobValidation = append(blobValidation[:], blobTxn[1:]...) // cut transactions.SignedTxn's field count - blobValidation[0] += 2 // increase field count - require.Equal(t, blobStxn, blobValidation) - - // craft non-canonical - blobNonCan := make([]byte, 0, len(blobTxn)+len(blobAuthAddr)+len(blobSig)) - blobNonCan = append(blobNonCan[:], blobTxn...) - blobNonCan = append(blobNonCan[:], blobAuthAddr[1:]...) // cut transactions.SignedTxn's field count - blobNonCan = append(blobNonCan[:], blobSig[1:]...) // cut transactions.SignedTxn's field count - blobNonCan[0] += 2 // increase field count - require.Len(t, blobNonCan, len(blobStxn)) - require.NotEqual(t, blobStxn, blobNonCan) - return blobNonCan - } - forgeSig := func(t *testing.T, stxn *transactions.SignedTxn, blobStxn []byte) (transactions.SignedTxn, []byte) { stxnForged := *stxn crypto.RandBytes(stxnForged.Sig[:]) @@ -2750,8 +2750,6 @@ func TestTxHandlerCapGuard(t *testing.T) { } func TestTxHandlerValidateIncomingTxMessage(t *testing.T) { - partitiontest.PartitionTest(t) - partitiontest.PartitionTest(t) t.Parallel() @@ -2777,8 +2775,16 @@ func TestTxHandlerValidateIncomingTxMessage(t *testing.T) { outmsg := handler.validateIncomingTxMessage(network.IncomingMessage{Data: blob}) require.Equal(t, outmsg.Action, network.Accept) + // non-canonical message + // for some reason craftNonCanonical cannot handle makeTxns output so make a simpler random txn + stxns, blob := makeRandomTransactions(1) + stxn := stxns[0] + blobNonCan := craftNonCanonical(t, &stxn, blob) + outmsg = handler.validateIncomingTxMessage(network.IncomingMessage{Data: blobNonCan}) + require.Equal(t, outmsg.Action, network.Ignore) + // invalid signature - stxns, _ := makeTxns(addresses, secrets, 1, 2, genesisHash) + stxns, _ = makeTxns(addresses, secrets, 1, 2, genesisHash) stxns[0].Sig[0] = stxns[0].Sig[0] + 1 blob2 := protocol.Encode(&stxns[0]) outmsg = handler.validateIncomingTxMessage(network.IncomingMessage{Data: blob2}) @@ -2789,4 +2795,25 @@ func TestTxHandlerValidateIncomingTxMessage(t *testing.T) { blob[0] = blob[0] + 1 outmsg = handler.validateIncomingTxMessage(network.IncomingMessage{Data: blob}) require.Equal(t, outmsg.Action, network.Disconnect) + + t.Run("with-canonical", func(t *testing.T) { + // make sure the reencoding from the canonical dedup checker's reencoding buf is correctly reused + cfg.TxIncomingFilteringFlags = 2 + require.True(t, cfg.TxFilterCanonicalEnabled()) + handler, err := makeTestTxHandler(ledger, cfg) + require.NoError(t, err) + + // valid message + _, blob := makeTxns(addresses, secrets, 1, 2, genesisHash) + outmsg := handler.validateIncomingTxMessage(network.IncomingMessage{Data: blob}) + require.Equal(t, outmsg.Action, network.Accept) + + // non-canonical message + // for some reason craftNonCanonical cannot handle makeTxns output so make a simpler random txn + stxns, blob := makeRandomTransactions(1) + stxn := stxns[0] + blobNonCan := craftNonCanonical(t, &stxn, blob) + outmsg = handler.validateIncomingTxMessage(network.IncomingMessage{Data: blobNonCan}) + require.Equal(t, outmsg.Action, network.Ignore) + }) } From 6a6f276440618efe1f9f2b10c30c0c3507c07898 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com> Date: Thu, 1 Aug 2024 15:16:59 -0400 Subject: [PATCH 14/16] Update data/txHandler.go Co-authored-by: Jason Paulos --- data/txHandler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data/txHandler.go b/data/txHandler.go index 95bb23ca92..29b2756a07 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -698,7 +698,7 @@ func decodeMsg(data []byte) (unverifiedTxGroup []transactions.SignedTxn, consume return unverifiedTxGroup, consumed, false } -// incomingTxGroupDupRateLimit checks if the incoming transaction group has been seen before after reencoding to canonical representation. +// incomingTxGroupCanonicalDedup checks if the incoming transaction group has been seen before after reencoding to canonical representation. // It also return canonical representation of the transaction group allowing the caller to compare it with the input. func (handler *TxHandler) incomingTxGroupCanonicalDedup(unverifiedTxGroup []transactions.SignedTxn, encodedExpectedSize int) (*crypto.Digest, []byte, bool) { var canonicalKey *crypto.Digest From 870186635622761eb405ef67a5da91df72892092 Mon Sep 17 00:00:00 2001 From: cce <51567+cce@users.noreply.github.com> Date: Thu, 8 Aug 2024 10:10:11 -0400 Subject: [PATCH 15/16] reject non-canonical --- data/txHandler.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index 29b2756a07..7ee5764137 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -808,17 +808,18 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa if drop { return network.OutgoingMessage{Action: network.Ignore} } + + if handler.incomingTxGroupAppRateLimit(unverifiedTxGroup, rawmsg.Sender) { + return network.OutgoingMessage{Action: network.Ignore} + } + if reencoded == nil { reencoded = reencode(unverifiedTxGroup) } if !bytes.Equal(rawmsg.Data, reencoded) { - // ignore non-canonically encoded messages - return network.OutgoingMessage{Action: network.Ignore} - } - - if handler.incomingTxGroupAppRateLimit(unverifiedTxGroup, rawmsg.Sender) { - return network.OutgoingMessage{Action: network.Ignore} + // reject non-canonically encoded messages + return network.OutgoingMessage{Action: network.Disconnect} } // apply backlog worker logic From 95184b5e7bf2a5c486547a3507098fa570bf6cf7 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 8 Aug 2024 10:48:05 -0400 Subject: [PATCH 16/16] Fix test after disconnect on non-canonical --- data/txHandler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data/txHandler_test.go b/data/txHandler_test.go index 41cf713028..21afc88d59 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -2781,7 +2781,7 @@ func TestTxHandlerValidateIncomingTxMessage(t *testing.T) { stxn := stxns[0] blobNonCan := craftNonCanonical(t, &stxn, blob) outmsg = handler.validateIncomingTxMessage(network.IncomingMessage{Data: blobNonCan}) - require.Equal(t, outmsg.Action, network.Ignore) + require.Equal(t, outmsg.Action, network.Disconnect) // invalid signature stxns, _ = makeTxns(addresses, secrets, 1, 2, genesisHash) @@ -2814,6 +2814,6 @@ func TestTxHandlerValidateIncomingTxMessage(t *testing.T) { stxn := stxns[0] blobNonCan := craftNonCanonical(t, &stxn, blob) outmsg = handler.validateIncomingTxMessage(network.IncomingMessage{Data: blobNonCan}) - require.Equal(t, outmsg.Action, network.Ignore) + require.Equal(t, outmsg.Action, network.Disconnect) }) }