Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: handle txns in pubsub validator #6070

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions components/mocks/mockNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func (network *MockNetwork) RegisterHandlers(dispatch []network.TaggedMessageHan
func (network *MockNetwork) ClearHandlers() {
}

// RegisterProcessors - empty implementation.
func (network *MockNetwork) RegisterProcessors(dispatch []network.TaggedMessageProcessor) {
// RegisterValidatorHandlers - empty implementation.
func (network *MockNetwork) RegisterValidatorHandlers(dispatch []network.TaggedMessageValidatorHandler) {
}

// ClearProcessors - empty implementation
Expand Down
54 changes: 41 additions & 13 deletions data/transactions/verify/txnBatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package verify

import (
"errors"
"fmt"
"sync/atomic"

"github.com/algorand/go-algorand/crypto"
Expand Down Expand Up @@ -98,10 +98,16 @@ func (bl *batchLoad) addLoad(txngrp []transactions.SignedTxn, gctx *GroupContext

}

// TxnGroupBatchSigVerifier provides Verify method to synchronously verify a group of transactions
// It starts a new block listener to receive latests block headers for the sig verification
type TxnGroupBatchSigVerifier struct {
cache VerifiedTransactionCache
nbw *NewBlockWatcher
ledger logic.LedgerForSignature
}

type txnSigBatchProcessor struct {
cache VerifiedTransactionCache
nbw *NewBlockWatcher
ledger logic.LedgerForSignature
TxnGroupBatchSigVerifier
resultChan chan<- *VerificationResult
droppedChan chan<- *UnverifiedTxnSigJob
}
Expand Down Expand Up @@ -142,27 +148,49 @@ func (tbp txnSigBatchProcessor) sendResult(veTxnGroup []transactions.SignedTxn,
}
}

// MakeSigVerifyJobProcessor returns the object implementing the stream verifier Helper interface
func MakeSigVerifyJobProcessor(ledger LedgerForStreamVerifier, cache VerifiedTransactionCache,
resultChan chan<- *VerificationResult, droppedChan chan<- *UnverifiedTxnSigJob) (svp execpool.BatchProcessor, err error) {
// MakeSigVerifier creats a new TxnGroupBatchSigVerifier for synchronous verification of transactions
func MakeSigVerifier(ledger LedgerForStreamVerifier, cache VerifiedTransactionCache) (TxnGroupBatchSigVerifier, error) {
latest := ledger.Latest()
latestHdr, err := ledger.BlockHdr(latest)
if err != nil {
return nil, errors.New("MakeStreamVerifier: Could not get header for previous block")
return TxnGroupBatchSigVerifier{}, fmt.Errorf("MakeSigVerifier: Could not get header for previous block: %w", err)
}

nbw := MakeNewBlockWatcher(latestHdr)
ledger.RegisterBlockListeners([]ledgercore.BlockListener{nbw})

verifier := TxnGroupBatchSigVerifier{
cache: cache,
nbw: nbw,
ledger: ledger,
}

return verifier, nil
}

// MakeSigVerifyJobProcessor returns the object implementing the stream verifier Helper interface
func MakeSigVerifyJobProcessor(
ledger LedgerForStreamVerifier, cache VerifiedTransactionCache,
resultChan chan<- *VerificationResult, droppedChan chan<- *UnverifiedTxnSigJob,
) (svp execpool.BatchProcessor, err error) {
sigVerifier, err := MakeSigVerifier(ledger, cache)
if err != nil {
return nil, err
}
return &txnSigBatchProcessor{
cache: cache,
nbw: nbw,
ledger: ledger,
droppedChan: droppedChan,
resultChan: resultChan,
TxnGroupBatchSigVerifier: sigVerifier,
droppedChan: droppedChan,
resultChan: resultChan,
}, nil
}

// Verify synchronously verifies the signatures of the transactions in the group
func (sv *TxnGroupBatchSigVerifier) Verify(stxs []transactions.SignedTxn) error {
blockHeader := sv.nbw.getBlockHeader()
_, err := txnGroup(stxs, blockHeader, sv.cache, sv.ledger, nil)
return err
}

func (tbp *txnSigBatchProcessor) ProcessBatch(txns []execpool.InputJob) {
batchVerifier, ctx := tbp.preProcessUnverifiedTxns(txns)
failed, err := batchVerifier.VerifyWithFeedback()
Expand Down
48 changes: 41 additions & 7 deletions data/transactions/verify/txnBatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,7 @@ func verifyResults(txnGroups [][]transactions.SignedTxn, badTxnGroups map[uint64
require.GreaterOrEqual(t, len(unverifiedGroups), badSigResultCounter)
for _, txn := range unverifiedGroups {
u, _ := binary.Uvarint(txn[0].Txn.Note)
if _, has := badTxnGroups[u]; has {
delete(badTxnGroups, u)
}
delete(badTxnGroups, u)
}
require.Empty(t, badTxnGroups, "unverifiedGroups should have all the transactions with invalid sigs")
}
Expand Down Expand Up @@ -301,6 +299,7 @@ func TestGetNumberOfBatchableSigsInGroup(t *testing.T) {
txnGroups[mod][0].Sig = crypto.Signature{}
batchSigs, err := UnverifiedTxnSigJob{TxnGroup: txnGroups[mod]}.GetNumberOfBatchableItems()
require.ErrorIs(t, err, errTxnSigHasNoSig)
require.Equal(t, uint64(0), batchSigs)
mod++

_, signedTxns, secrets, addrs := generateTestObjects(numOfTxns, 20, 0, 50)
Expand Down Expand Up @@ -353,6 +352,7 @@ byte base64 5rZMNsevs5sULO+54aN+OvU6lQ503z2X+SSYUABIx7E=
txnGroups[mod][0].Msig = mSigTxn[0].Msig
batchSigs, err = UnverifiedTxnSigJob{TxnGroup: txnGroups[mod]}.GetNumberOfBatchableItems()
require.ErrorIs(t, err, errTxnSigNotWellFormed)
require.Equal(t, uint64(0), batchSigs)
}

// TestStreamToBatchPoolShutdown tests what happens when the exec pool shuts down
Expand Down Expand Up @@ -437,10 +437,11 @@ func TestStreamToBatchPoolShutdown(t *testing.T) { //nolint:paralleltest // Not
// send txn groups to be verified
go func() {
defer wg.Done()
outer:
for _, tg := range txnGroups {
select {
case <-ctx.Done():
break
break outer
case inputChan <- &UnverifiedTxnSigJob{TxnGroup: tg, BacklogMessage: nil}:
}
}
Expand Down Expand Up @@ -493,6 +494,7 @@ func TestStreamToBatchRestart(t *testing.T) {
// send txn groups to be verified
go func() {
defer wg.Done()
outer:
for i, tg := range txnGroups {
if (i+1)%10 == 0 {
cancel()
Expand All @@ -502,7 +504,7 @@ func TestStreamToBatchRestart(t *testing.T) {
}
select {
case <-ctx2.Done():
break
break outer
case inputChan <- &UnverifiedTxnSigJob{TxnGroup: tg, BacklogMessage: nil}:
}
}
Expand Down Expand Up @@ -798,7 +800,10 @@ func TestStreamToBatchPostVBlocked(t *testing.T) {

func TestStreamToBatchMakeStreamToBatchErr(t *testing.T) {
partitiontest.PartitionTest(t)
_, err := MakeSigVerifyJobProcessor(&DummyLedgerForSignature{badHdr: true}, nil, nil, nil)
_, err := MakeSigVerifier(&DummyLedgerForSignature{badHdr: true}, nil)
require.Error(t, err)

_, err = MakeSigVerifyJobProcessor(&DummyLedgerForSignature{badHdr: true}, nil, nil, nil)
require.Error(t, err)
}

Expand Down Expand Up @@ -863,11 +868,40 @@ func TestGetErredUnprocessed(t *testing.T) {

droppedChan := make(chan *UnverifiedTxnSigJob, 1)
svh := txnSigBatchProcessor{
resultChan: make(chan<- *VerificationResult, 0),
resultChan: make(chan<- *VerificationResult),
droppedChan: droppedChan,
}

svh.GetErredUnprocessed(&UnverifiedTxnSigJob{}, nil)
dropped := <-droppedChan
require.Equal(t, *dropped, UnverifiedTxnSigJob{})
}

func TestSigVerifier(t *testing.T) {
partitiontest.PartitionTest(t)

numOfTxns := 16
txnGroups, badTxnGroups := getSignedTransactions(numOfTxns, numOfTxns, 0, 0)
require.GreaterOrEqual(t, len(txnGroups), 1)
require.Equal(t, len(badTxnGroups), 0)
txnGroup := txnGroups[0]

verificationPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, t)
defer verificationPool.Shutdown()

cache := MakeVerifiedTransactionCache(50000)

verifier, err := MakeSigVerifier(&DummyLedgerForSignature{}, cache)
require.NoError(t, err)

err = verifier.Verify(txnGroup)
require.NoError(t, err)

txnGroups, badTxnGroups = getSignedTransactions(numOfTxns, numOfTxns, 0, 1)
require.GreaterOrEqual(t, len(txnGroups), 1)
require.Greater(t, len(badTxnGroups), 0)
txnGroup = txnGroups[0]

err = verifier.Verify(txnGroup)
require.Error(t, err)
}
107 changes: 62 additions & 45 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@
erl *util.ElasticRateLimiter
appLimiter *appRateLimiter
appLimiterBacklogThreshold int

// batchVerifier provides synchronous verification of transaction groups
cce marked this conversation as resolved.
Show resolved Hide resolved
batchVerifier verify.TxnGroupBatchSigVerifier
}

// TxHandlerOpts is TxHandler configuration options
Expand Down Expand Up @@ -209,6 +212,13 @@
}
}

// prepare the batch processor for pubsub synchronous verification
var err0 error
handler.batchVerifier, err0 = verify.MakeSigVerifier(handler.ledger, handler.ledger.VerifiedTransactionCache())
if err0 != nil {
return nil, err0

Check warning on line 219 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L219

Added line #L219 was not covered by tests
}

// prepare the transaction stream verifier
var err error
txnElementProcessor, err := verify.MakeSigVerifyJobProcessor(handler.ledger, handler.ledger.VerifiedTransactionCache(),
Expand Down Expand Up @@ -246,16 +256,15 @@
})

// libp2p pubsub validator and handler abstracted as TaggedMessageProcessor
handler.net.RegisterProcessors([]network.TaggedMessageProcessor{
// TODO: rename to validators
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
handler.net.RegisterValidatorHandlers([]network.TaggedMessageValidatorHandler{
{
Tag: protocol.TxnTag,
// create anonymous struct to hold the two functions and satisfy the network.MessageProcessor interface
MessageHandler: struct {
network.ProcessorValidateFunc
network.ProcessorHandleFunc
network.ValidateHandleFunc
}{
network.ProcessorValidateFunc(handler.validateIncomingTxMessage),
network.ProcessorHandleFunc(handler.processIncomingTxMessage),
network.ValidateHandleFunc(handler.validateIncomingTxMessage),
},
},
})
Expand Down Expand Up @@ -348,7 +357,7 @@
}
continue
}
// handler.streamVerifierChan does not receive if ctx is cancled
// handler.streamVerifierChan does not receive if ctx is cancelled
select {
case handler.streamVerifierChan <- &verify.UnverifiedTxnSigJob{TxnGroup: wi.unverifiedTxGroup, BacklogMessage: wi}:
case <-handler.ctx.Done():
Expand Down Expand Up @@ -772,65 +781,73 @@
return network.OutgoingMessage{Action: network.Ignore}
}

type validatedIncomingTxMessage struct {
rawmsg network.IncomingMessage
unverifiedTxGroup []transactions.SignedTxn
msgKey *crypto.Digest
canonicalKey *crypto.Digest
}

// validateIncomingTxMessage is the validator for the MessageProcessor implementation used by P2PNetwork.
func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessage) network.ValidatedMessage {
func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessage) network.OutgoingMessage {
msgKey, isDup := handler.incomingMsgDupCheck(rawmsg.Data)
if isDup {
return network.ValidatedMessage{Action: network.Ignore, ValidatedMessage: nil}
return network.OutgoingMessage{Action: network.Ignore}

Check warning on line 788 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L788

Added line #L788 was not covered by tests
}

unverifiedTxGroup, consumed, invalid := decodeMsg(rawmsg.Data)
if invalid {
// invalid encoding or exceeding txgroup, disconnect from this peer
return network.ValidatedMessage{Action: network.Disconnect, ValidatedMessage: nil}
return network.OutgoingMessage{Action: network.Disconnect}
}

canonicalKey, drop := handler.incomingTxGroupDupRateLimit(unverifiedTxGroup, consumed, rawmsg.Sender)
if drop {
// this re-serialized txgroup was detected as a duplicate by the canonical message cache,
// or it was rate-limited by the per-app rate limiter
return network.ValidatedMessage{Action: network.Ignore, ValidatedMessage: nil}
return network.OutgoingMessage{Action: network.Ignore}

Check warning on line 801 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L801

Added line #L801 was not covered by tests
}

return network.ValidatedMessage{
Action: network.Accept,
Tag: rawmsg.Tag,
ValidatedMessage: &validatedIncomingTxMessage{
rawmsg: rawmsg,
unverifiedTxGroup: unverifiedTxGroup,
msgKey: msgKey,
canonicalKey: canonicalKey,
},
}
}
// apply backlog worker logic

// processIncomingTxMessage is the handler for the MessageProcessor implementation used by P2PNetwork.
func (handler *TxHandler) processIncomingTxMessage(validatedMessage network.ValidatedMessage) network.OutgoingMessage {
msg := validatedMessage.ValidatedMessage.(*validatedIncomingTxMessage)
select {
case handler.backlogQueue <- &txBacklogMsg{
rawmsg: &msg.rawmsg,
unverifiedTxGroup: msg.unverifiedTxGroup,
rawmsgDataHash: msg.msgKey,
unverifiedTxGroupHash: msg.canonicalKey,
wi := &txBacklogMsg{
rawmsg: &rawmsg,
unverifiedTxGroup: unverifiedTxGroup,
rawmsgDataHash: msgKey,
unverifiedTxGroupHash: canonicalKey,
capguard: nil,
}:
default:
// if we failed here we want to increase the corresponding metric. It might suggest that we
// want to increase the queue size.
transactionMessagesDroppedFromBacklog.Inc(nil)
}

// additionally, remove the txn from duplicate caches to ensure it can be re-submitted
handler.deleteFromCaches(msg.msgKey, msg.canonicalKey)
if handler.checkAlreadyCommitted(wi) {
transactionMessagesAlreadyCommitted.Inc(nil)
return network.OutgoingMessage{
Action: network.Ignore,

Check warning on line 817 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L815-L817

Added lines #L815 - L817 were not covered by tests
}
}

err := handler.batchVerifier.Verify(wi.unverifiedTxGroup)
if err != nil {
handler.postProcessReportErrors(err)
logging.Base().Warnf("Received a malformed tx group %v: %v", wi.unverifiedTxGroup, err)
return network.OutgoingMessage{
Action: network.Disconnect,
}
}
verifiedTxGroup := wi.unverifiedTxGroup

// save the transaction, if it has high enough fee and not already in the cache
err = handler.txPool.Remember(verifiedTxGroup)
if err != nil {
handler.rememberReportErrors(err)
logging.Base().Debugf("could not remember tx: %v", err)
return network.OutgoingMessage{
Action: network.Ignore,

Check warning on line 837 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L834-L837

Added lines #L834 - L837 were not covered by tests
}
}

transactionMessagesRemember.Inc(nil)

// if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions.
err = handler.ledger.VerifiedTransactionCache().Pin(verifiedTxGroup)
if err != nil {
logging.Base().Infof("unable to pin transaction: %v", err)

Check warning on line 846 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L846

Added line #L846 was not covered by tests
}
return network.OutgoingMessage{
cce marked this conversation as resolved.
Show resolved Hide resolved
Action: network.Accept,
}
return network.OutgoingMessage{Action: network.Ignore}
}

var errBackLogFullLocal = errors.New("backlog full")
Expand Down
Loading
Loading