Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: handle txns in pubsub validator #6070

Merged
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions components/mocks/mockNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func (network *MockNetwork) RegisterHandlers(dispatch []network.TaggedMessageHan
func (network *MockNetwork) ClearHandlers() {
}

// RegisterProcessors - empty implementation.
func (network *MockNetwork) RegisterProcessors(dispatch []network.TaggedMessageProcessor) {
// RegisterValidatorHandlers - empty implementation.
func (network *MockNetwork) RegisterValidatorHandlers(dispatch []network.TaggedMessageValidatorHandler) {
}

// ClearProcessors - empty implementation
Expand Down
54 changes: 41 additions & 13 deletions data/transactions/verify/txnBatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package verify

import (
"errors"
"fmt"
"sync/atomic"

"github.com/algorand/go-algorand/crypto"
Expand Down Expand Up @@ -98,10 +98,16 @@ func (bl *batchLoad) addLoad(txngrp []transactions.SignedTxn, gctx *GroupContext

}

// TxnGroupBatchSigVerifier provides Verify method to synchronously verify a group of transactions
// It starts a new block listener to receive latests block headers for the sig verification
type TxnGroupBatchSigVerifier struct {
cache VerifiedTransactionCache
nbw *NewBlockWatcher
ledger logic.LedgerForSignature
}

type txnSigBatchProcessor struct {
cache VerifiedTransactionCache
nbw *NewBlockWatcher
ledger logic.LedgerForSignature
TxnGroupBatchSigVerifier
resultChan chan<- *VerificationResult
droppedChan chan<- *UnverifiedTxnSigJob
}
Expand Down Expand Up @@ -142,27 +148,49 @@ func (tbp txnSigBatchProcessor) sendResult(veTxnGroup []transactions.SignedTxn,
}
}

// MakeSigVerifyJobProcessor returns the object implementing the stream verifier Helper interface
func MakeSigVerifyJobProcessor(ledger LedgerForStreamVerifier, cache VerifiedTransactionCache,
resultChan chan<- *VerificationResult, droppedChan chan<- *UnverifiedTxnSigJob) (svp execpool.BatchProcessor, err error) {
// MakeSigVerifier creats a new TxnGroupBatchSigVerifier for synchronous verification of transactions
func MakeSigVerifier(ledger LedgerForStreamVerifier, cache VerifiedTransactionCache) (TxnGroupBatchSigVerifier, error) {
latest := ledger.Latest()
latestHdr, err := ledger.BlockHdr(latest)
if err != nil {
return nil, errors.New("MakeStreamVerifier: Could not get header for previous block")
return TxnGroupBatchSigVerifier{}, fmt.Errorf("MakeSigVerifier: Could not get header for previous block: %w", err)
}

nbw := MakeNewBlockWatcher(latestHdr)
ledger.RegisterBlockListeners([]ledgercore.BlockListener{nbw})

verifier := TxnGroupBatchSigVerifier{
cache: cache,
nbw: nbw,
ledger: ledger,
}

return verifier, nil
}

// MakeSigVerifyJobProcessor returns the object implementing the stream verifier Helper interface
func MakeSigVerifyJobProcessor(
ledger LedgerForStreamVerifier, cache VerifiedTransactionCache,
resultChan chan<- *VerificationResult, droppedChan chan<- *UnverifiedTxnSigJob,
) (svp execpool.BatchProcessor, err error) {
sigVerifier, err := MakeSigVerifier(ledger, cache)
if err != nil {
return nil, err
}
return &txnSigBatchProcessor{
cache: cache,
nbw: nbw,
ledger: ledger,
droppedChan: droppedChan,
resultChan: resultChan,
TxnGroupBatchSigVerifier: sigVerifier,
droppedChan: droppedChan,
resultChan: resultChan,
}, nil
}

// Verify synchronously verifies the signatures of the transactions in the group
func (sv *TxnGroupBatchSigVerifier) Verify(stxs []transactions.SignedTxn) error {
blockHeader := sv.nbw.getBlockHeader()
_, err := txnGroup(stxs, blockHeader, sv.cache, sv.ledger, nil)
return err
}

func (tbp *txnSigBatchProcessor) ProcessBatch(txns []execpool.InputJob) {
batchVerifier, ctx := tbp.preProcessUnverifiedTxns(txns)
failed, err := batchVerifier.VerifyWithFeedback()
Expand Down
48 changes: 41 additions & 7 deletions data/transactions/verify/txnBatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,7 @@ func verifyResults(txnGroups [][]transactions.SignedTxn, badTxnGroups map[uint64
require.GreaterOrEqual(t, len(unverifiedGroups), badSigResultCounter)
for _, txn := range unverifiedGroups {
u, _ := binary.Uvarint(txn[0].Txn.Note)
if _, has := badTxnGroups[u]; has {
delete(badTxnGroups, u)
}
delete(badTxnGroups, u)
}
require.Empty(t, badTxnGroups, "unverifiedGroups should have all the transactions with invalid sigs")
}
Expand Down Expand Up @@ -301,6 +299,7 @@ func TestGetNumberOfBatchableSigsInGroup(t *testing.T) {
txnGroups[mod][0].Sig = crypto.Signature{}
batchSigs, err := UnverifiedTxnSigJob{TxnGroup: txnGroups[mod]}.GetNumberOfBatchableItems()
require.ErrorIs(t, err, errTxnSigHasNoSig)
require.Equal(t, uint64(0), batchSigs)
mod++

_, signedTxns, secrets, addrs := generateTestObjects(numOfTxns, 20, 0, 50)
Expand Down Expand Up @@ -353,6 +352,7 @@ byte base64 5rZMNsevs5sULO+54aN+OvU6lQ503z2X+SSYUABIx7E=
txnGroups[mod][0].Msig = mSigTxn[0].Msig
batchSigs, err = UnverifiedTxnSigJob{TxnGroup: txnGroups[mod]}.GetNumberOfBatchableItems()
require.ErrorIs(t, err, errTxnSigNotWellFormed)
require.Equal(t, uint64(0), batchSigs)
}

// TestStreamToBatchPoolShutdown tests what happens when the exec pool shuts down
Expand Down Expand Up @@ -437,10 +437,11 @@ func TestStreamToBatchPoolShutdown(t *testing.T) { //nolint:paralleltest // Not
// send txn groups to be verified
go func() {
defer wg.Done()
outer:
for _, tg := range txnGroups {
select {
case <-ctx.Done():
break
break outer
case inputChan <- &UnverifiedTxnSigJob{TxnGroup: tg, BacklogMessage: nil}:
}
}
Expand Down Expand Up @@ -493,6 +494,7 @@ func TestStreamToBatchRestart(t *testing.T) {
// send txn groups to be verified
go func() {
defer wg.Done()
outer:
for i, tg := range txnGroups {
if (i+1)%10 == 0 {
cancel()
Expand All @@ -502,7 +504,7 @@ func TestStreamToBatchRestart(t *testing.T) {
}
select {
case <-ctx2.Done():
break
break outer
case inputChan <- &UnverifiedTxnSigJob{TxnGroup: tg, BacklogMessage: nil}:
}
}
Expand Down Expand Up @@ -798,7 +800,10 @@ func TestStreamToBatchPostVBlocked(t *testing.T) {

func TestStreamToBatchMakeStreamToBatchErr(t *testing.T) {
partitiontest.PartitionTest(t)
_, err := MakeSigVerifyJobProcessor(&DummyLedgerForSignature{badHdr: true}, nil, nil, nil)
_, err := MakeSigVerifier(&DummyLedgerForSignature{badHdr: true}, nil)
require.Error(t, err)

_, err = MakeSigVerifyJobProcessor(&DummyLedgerForSignature{badHdr: true}, nil, nil, nil)
require.Error(t, err)
}

Expand Down Expand Up @@ -863,11 +868,40 @@ func TestGetErredUnprocessed(t *testing.T) {

droppedChan := make(chan *UnverifiedTxnSigJob, 1)
svh := txnSigBatchProcessor{
resultChan: make(chan<- *VerificationResult, 0),
resultChan: make(chan<- *VerificationResult),
droppedChan: droppedChan,
}

svh.GetErredUnprocessed(&UnverifiedTxnSigJob{}, nil)
dropped := <-droppedChan
require.Equal(t, *dropped, UnverifiedTxnSigJob{})
}

func TestSigVerifier(t *testing.T) {
partitiontest.PartitionTest(t)

numOfTxns := 16
txnGroups, badTxnGroups := getSignedTransactions(numOfTxns, numOfTxns, 0, 0)
require.GreaterOrEqual(t, len(txnGroups), 1)
require.Equal(t, len(badTxnGroups), 0)
txnGroup := txnGroups[0]

verificationPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, t)
defer verificationPool.Shutdown()

cache := MakeVerifiedTransactionCache(50000)

verifier, err := MakeSigVerifier(&DummyLedgerForSignature{}, cache)
require.NoError(t, err)

err = verifier.Verify(txnGroup)
require.NoError(t, err)

txnGroups, badTxnGroups = getSignedTransactions(numOfTxns, numOfTxns, 0, 1)
require.GreaterOrEqual(t, len(txnGroups), 1)
require.Greater(t, len(badTxnGroups), 0)
txnGroup = txnGroups[0]

err = verifier.Verify(txnGroup)
require.Error(t, err)
}
Loading
Loading