Skip to content

Commit

Permalink
txHandler: add more metric (#4786)
Browse files Browse the repository at this point in the history
* add txHandler backlog error reason counters
* add txHandler backlog size gauge
* add txsync counters
* add TX, AV, PP, MI counters to
  DisconnectPeerEventDetails and PeerConnectionDetails

Co-authored-by: chris erway <chris.erway@algorand.com>
  • Loading branch information
algorandskiy and cce authored Nov 17, 2022
1 parent 526cb89 commit d22fa42
Show file tree
Hide file tree
Showing 8 changed files with 307 additions and 43 deletions.
104 changes: 79 additions & 25 deletions data/transactions/verify/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,52 @@ type GroupContext struct {
ledger logic.LedgerForSignature
}

var errTxGroupInvalidFee = errors.New("txgroup fee requirement overflow")
var errTxnSigHasNoSig = errors.New("signedtxn has no sig")
var errTxnSigNotWellFormed = errors.New("signedtxn should only have one of Sig or Msig or LogicSig")
var errRekeyingNotSupported = errors.New("nonempty AuthAddr but rekeying is not supported")
var errUnknownSignature = errors.New("has one mystery sig. WAT?")

// TxGroupErrorReason is reason code for ErrTxGroupError
type TxGroupErrorReason int

const (
// TxGroupErrorReasonGeneric is a generic (not tracked) reason code
TxGroupErrorReasonGeneric TxGroupErrorReason = iota
// TxGroupErrorReasonNotWellFormed is txn.WellFormed failure
TxGroupErrorReasonNotWellFormed
// TxGroupErrorReasonInvalidFee is invalid fee pooling in transaction group
TxGroupErrorReasonInvalidFee
// TxGroupErrorReasonHasNoSig is for transaction without any signature
TxGroupErrorReasonHasNoSig
// TxGroupErrorReasonSigNotWellFormed defines signature format errors
TxGroupErrorReasonSigNotWellFormed
// TxGroupErrorReasonMsigNotWellFormed defines multisig format errors
TxGroupErrorReasonMsigNotWellFormed
// TxGroupErrorReasonLogicSigFailed defines logic sig validation errors
TxGroupErrorReasonLogicSigFailed

// TxGroupErrorReasonNumValues is number of enum values
TxGroupErrorReasonNumValues
)

// ErrTxGroupError is an error from txn pre-validation (well form-ness, signature format, etc).
// It can be unwrapped into underlying error, as well as has a specific failure reason code.
type ErrTxGroupError struct {
err error
Reason TxGroupErrorReason
}

// Error returns an error message from the underlying error
func (e *ErrTxGroupError) Error() string {
return e.err.Error()
}

// Unwrap returns an underlying error
func (e *ErrTxGroupError) Unwrap() error {
return e.err
}

// PrepareGroupContext prepares a verification group parameter object for a given transaction
// group.
func PrepareGroupContext(group []transactions.SignedTxn, contextHdr bookkeeping.BlockHeader, ledger logic.LedgerForSignature) (*GroupContext, error) {
Expand Down Expand Up @@ -101,14 +147,14 @@ func (g *GroupContext) Equal(other *GroupContext) bool {

// txnBatchPrep verifies a SignedTxn having no obviously inconsistent data.
// Block-assembly time checks of LogicSig and accounting rules may still block the txn.
// it is the caller responsibility to call batchVerifier.Verify()
func txnBatchPrep(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext, verifier *crypto.BatchVerifier) error {
// It is the caller responsibility to call batchVerifier.Verify().
func txnBatchPrep(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext, verifier *crypto.BatchVerifier) *ErrTxGroupError {
if !groupCtx.consensusParams.SupportRekeying && (s.AuthAddr != basics.Address{}) {
return errors.New("nonempty AuthAddr but rekeying is not supported")
return &ErrTxGroupError{err: errRekeyingNotSupported, Reason: TxGroupErrorReasonGeneric}
}

if err := s.Txn.WellFormed(groupCtx.specAddrs, groupCtx.consensusParams); err != nil {
return err
return &ErrTxGroupError{err: err, Reason: TxGroupErrorReasonNotWellFormed}
}

return stxnCoreChecks(s, txnIdx, groupCtx, verifier)
Expand All @@ -135,19 +181,20 @@ func TxnGroup(stxs []transactions.SignedTxn, contextHdr bookkeeping.BlockHeader,

// txnGroupBatchPrep verifies a []SignedTxn having no obviously inconsistent data.
// it is the caller responsibility to call batchVerifier.Verify()
func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.BlockHeader, ledger logic.LedgerForSignature, verifier *crypto.BatchVerifier) (groupCtx *GroupContext, err error) {
groupCtx, err = PrepareGroupContext(stxs, contextHdr, ledger)
func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.BlockHeader, ledger logic.LedgerForSignature, verifier *crypto.BatchVerifier) (*GroupContext, error) {
groupCtx, err := PrepareGroupContext(stxs, contextHdr, ledger)
if err != nil {
return nil, err
}

minFeeCount := uint64(0)
feesPaid := uint64(0)
for i, stxn := range stxs {
err = txnBatchPrep(&stxn, i, groupCtx, verifier)
if err != nil {
err = fmt.Errorf("transaction %+v invalid : %w", stxn, err)
return
prepErr := txnBatchPrep(&stxn, i, groupCtx, verifier)
if prepErr != nil {
// re-wrap the error with more details
prepErr.err = fmt.Errorf("transaction %+v invalid : %w", stxn, prepErr.err)
return nil, prepErr
}
if stxn.Txn.Type != protocol.StateProofTx {
minFeeCount++
Expand All @@ -156,22 +203,27 @@ func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.Blo
}
feeNeeded, overflow := basics.OMul(groupCtx.consensusParams.MinTxnFee, minFeeCount)
if overflow {
err = fmt.Errorf("txgroup fee requirement overflow")
return
err = &ErrTxGroupError{err: errTxGroupInvalidFee, Reason: TxGroupErrorReasonInvalidFee}
return nil, err
}
// feesPaid may have saturated. That's ok. Since we know
// feeNeeded did not overflow, simple comparison tells us
// feesPaid was enough.
if feesPaid < feeNeeded {
err = fmt.Errorf("txgroup had %d in fees, which is less than the minimum %d * %d",
feesPaid, minFeeCount, groupCtx.consensusParams.MinTxnFee)
return
err = &ErrTxGroupError{
err: fmt.Errorf(
"txgroup had %d in fees, which is less than the minimum %d * %d",
feesPaid, minFeeCount, groupCtx.consensusParams.MinTxnFee),
Reason: TxGroupErrorReasonInvalidFee,
}
return nil, err
}

return
return groupCtx, nil
}

func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext, batchVerifier *crypto.BatchVerifier) error {
// stxnCoreChecks runs signatures validity checks and enqueues signature into batchVerifier for verification.
func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext, batchVerifier *crypto.BatchVerifier) *ErrTxGroupError {
numSigs := 0
hasSig := false
hasMsig := false
Expand All @@ -196,11 +248,10 @@ func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContex
if s.Txn.Sender == transactions.StateProofSender && s.Txn.Type == protocol.StateProofTx {
return nil
}

return errors.New("signedtxn has no sig")
return &ErrTxGroupError{err: errTxnSigHasNoSig, Reason: TxGroupErrorReasonHasNoSig}
}
if numSigs > 1 {
return errors.New("signedtxn should only have one of Sig or Msig or LogicSig")
return &ErrTxGroupError{err: errTxnSigNotWellFormed, Reason: TxGroupErrorReasonSigNotWellFormed}
}

if hasSig {
Expand All @@ -209,14 +260,17 @@ func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContex
}
if hasMsig {
if err := crypto.MultisigBatchPrep(s.Txn, crypto.Digest(s.Authorizer()), s.Msig, batchVerifier); err != nil {
return fmt.Errorf("multisig validation failed: %w", err)
return &ErrTxGroupError{err: fmt.Errorf("multisig validation failed: %w", err), Reason: TxGroupErrorReasonMsigNotWellFormed}
}
return nil
}
if hasLogicSig {
return logicSigVerify(s, txnIdx, groupCtx)
if err := logicSigVerify(s, txnIdx, groupCtx); err != nil {
return &ErrTxGroupError{err: err, Reason: TxGroupErrorReasonLogicSigFailed}
}
return nil
}
return errors.New("has one mystery sig. WAT?")
return &ErrTxGroupError{err: errUnknownSignature, Reason: TxGroupErrorReasonGeneric}
}

// LogicSigSanityCheck checks that the signature is valid and that the program is basically well formed.
Expand Down Expand Up @@ -254,7 +308,7 @@ func logicSigSanityCheckBatchPrep(txn *transactions.SignedTxn, groupIndex int, g
}

if groupIndex < 0 {
return errors.New("Negative groupIndex")
return errors.New("negative groupIndex")
}
txngroup := transactions.WrapSignedTxnsWithAD(groupCtx.signedGroupTxns)
ep := logic.EvalParams{
Expand Down Expand Up @@ -310,7 +364,7 @@ func logicSigVerify(txn *transactions.SignedTxn, groupIndex int, groupCtx *Group
}

if groupIndex < 0 {
return errors.New("Negative groupIndex")
return errors.New("negative groupIndex")
}
ep := logic.EvalParams{
Proto: &groupCtx.consensusParams,
Expand Down
80 changes: 73 additions & 7 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package data
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"sync"
"time"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
Expand All @@ -45,6 +47,19 @@ var txBacklogSize = config.Consensus[protocol.ConsensusCurrentVersion].MaxTxnByt
var transactionMessagesHandled = metrics.MakeCounter(metrics.TransactionMessagesHandled)
var transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog)
var transactionMessagesDroppedFromPool = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromPool)
var transactionMessagesAlreadyCommitted = metrics.MakeCounter(metrics.TransactionMessagesAlreadyCommitted)
var transactionMessagesTxGroupInvalidFee = metrics.MakeCounter(metrics.TransactionMessagesTxGroupInvalidFee)
var transactionMessagesTxnNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnNotWellFormed)
var transactionMessagesTxnSigNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigNotWellFormed)
var transactionMessagesTxnMsigNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnMsigNotWellFormed)
var transactionMessagesTxnLogicSig = metrics.MakeCounter(metrics.TransactionMessagesTxnLogicSig)
var transactionMessagesTxnSigVerificationFailed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigVerificationFailed)
var transactionMessagesBacklogErr = metrics.MakeCounter(metrics.TransactionMessagesBacklogErr)
var transactionMessagesRemember = metrics.MakeCounter(metrics.TransactionMessagesRemember)
var transactionMessagesBacklogSizeGauge = metrics.MakeGauge(metrics.TransactionMessagesBacklogSize)

var transactionGroupTxSyncRemember = metrics.MakeCounter(metrics.TransactionGroupTxSyncRemember)
var transactionGroupTxSyncAlreadyCommitted = metrics.MakeCounter(metrics.TransactionGroupTxSyncAlreadyCommitted)

// The txBacklogMsg structure used to track a single incoming transaction from the gossip network,
type txBacklogMsg struct {
Expand Down Expand Up @@ -101,8 +116,9 @@ func (handler *TxHandler) Start() {
handler.net.RegisterHandlers([]network.TaggedMessageHandler{
{Tag: protocol.TxnTag, MessageHandler: network.HandlerFunc(handler.processIncomingTxn)},
})
handler.backlogWg.Add(1)
handler.backlogWg.Add(2)
go handler.backlogWorker()
go handler.backlogGaugeThread()
}

// Stop suspends the processing of incoming messages at the transaction handler
Expand All @@ -113,12 +129,26 @@ func (handler *TxHandler) Stop() {

func reencode(stxns []transactions.SignedTxn) []byte {
var result [][]byte
for _, stxn := range stxns {
result = append(result, protocol.Encode(&stxn))
for i := range stxns {
result = append(result, protocol.Encode(&stxns[i]))
}
return bytes.Join(result, nil)
}

func (handler *TxHandler) backlogGaugeThread() {
defer handler.backlogWg.Done()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
transactionMessagesBacklogSizeGauge.Set(float64(len(handler.backlogQueue)))
case <-handler.ctx.Done():
return
}
}
}

// backlogWorker is the worker go routine that process the incoming messages from the postVerificationQueue and backlogQueue channels
// and dispatches them further.
func (handler *TxHandler) backlogWorker() {
Expand All @@ -132,7 +162,7 @@ func (handler *TxHandler) backlogWorker() {
if !ok {
return
}
handler.postprocessCheckedTxn(wi)
handler.postProcessCheckedTxn(wi)

// restart the loop so that we could empty out the post verification queue.
continue
Expand All @@ -146,6 +176,7 @@ func (handler *TxHandler) backlogWorker() {
return
}
if handler.checkAlreadyCommitted(wi) {
transactionMessagesAlreadyCommitted.Inc(nil)
continue
}

Expand All @@ -156,17 +187,47 @@ func (handler *TxHandler) backlogWorker() {
if !ok {
return
}
handler.postprocessCheckedTxn(wi)
handler.postProcessCheckedTxn(wi)

case <-handler.ctx.Done():
return
}
}
}

func (handler *TxHandler) postprocessCheckedTxn(wi *txBacklogMsg) {
func (handler *TxHandler) postProcessReportErrors(err error) {
if errors.Is(err, crypto.ErrBatchVerificationFailed) {
transactionMessagesTxnSigVerificationFailed.Inc(nil)
return
}

var txGroupErr *verify.ErrTxGroupError
if errors.As(err, &txGroupErr) {
switch txGroupErr.Reason {
case verify.TxGroupErrorReasonNotWellFormed:
transactionMessagesTxnNotWellFormed.Inc(nil)
case verify.TxGroupErrorReasonInvalidFee:
transactionMessagesTxGroupInvalidFee.Inc(nil)
case verify.TxGroupErrorReasonHasNoSig:
fallthrough
case verify.TxGroupErrorReasonSigNotWellFormed:
transactionMessagesTxnSigNotWellFormed.Inc(nil)
case verify.TxGroupErrorReasonMsigNotWellFormed:
transactionMessagesTxnMsigNotWellFormed.Inc(nil)
case verify.TxGroupErrorReasonLogicSigFailed:
transactionMessagesTxnLogicSig.Inc(nil)
default:
transactionMessagesBacklogErr.Inc(nil)
}
} else {
transactionMessagesBacklogErr.Inc(nil)
}
}

func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) {
if wi.verificationErr != nil {
// disconnect from peer.
handler.postProcessReportErrors(wi.verificationErr)
logging.Base().Warnf("Received a malformed tx group %v: %v", wi.unverifiedTxGroup, wi.verificationErr)
handler.net.Disconnect(wi.rawmsg.Sender)
return
Expand All @@ -185,6 +246,8 @@ func (handler *TxHandler) postprocessCheckedTxn(wi *txBacklogMsg) {
return
}

transactionMessagesRemember.Inc(nil)

// if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions.
err = handler.ledger.VerifiedTransactionCache().Pin(verifiedTxGroup)
if err != nil {
Expand All @@ -203,7 +266,7 @@ func (handler *TxHandler) asyncVerifySignature(arg interface{}) interface{} {
latest := handler.ledger.Latest()
latestHdr, err := handler.ledger.BlockHdr(latest)
if err != nil {
tx.verificationErr = fmt.Errorf("Could not get header for previous block %d: %w", latest, err)
tx.verificationErr = fmt.Errorf("could not get header for previous block %d: %w", latest, err)
logging.Base().Warnf("Could not get header for previous block %d: %v", latest, err)
} else {
// we can't use PaysetGroups here since it's using a execpool like this go-routine and we don't want to deadlock.
Expand Down Expand Up @@ -289,6 +352,7 @@ func (handler *TxHandler) processDecoded(unverifiedTxGroup []transactions.Signed
unverifiedTxGroup: unverifiedTxGroup,
}
if handler.checkAlreadyCommitted(tx) {
transactionGroupTxSyncAlreadyCommitted.Inc(nil)
return network.OutgoingMessage{}, true
}

Expand Down Expand Up @@ -319,6 +383,8 @@ func (handler *TxHandler) processDecoded(unverifiedTxGroup []transactions.Signed
return network.OutgoingMessage{}, true
}

transactionGroupTxSyncRemember.Inc(nil)

// if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions.
err = handler.ledger.VerifiedTransactionCache().Pin(verifiedTxGroup)
if err != nil {
Expand Down
Loading

0 comments on commit d22fa42

Please sign in to comment.