Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txHandler: add more metric #4786

Merged
merged 14 commits into from
Nov 17, 2022
105 changes: 82 additions & 23 deletions data/transactions/verify/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,54 @@ type GroupContext struct {
ledger logic.LedgerForSignature
}

var errTxGroupInvalidFee = errors.New("txgroup fee requirement overflow")
var errTxnSigHasNoSig = errors.New("signedtxn has no sig")
var errTxnSigNotWellFormed = errors.New("signedtxn should only have one of Sig or Msig or LogicSig")
var errRekeyingNotSupported = errors.New("nonempty AuthAddr but rekeying is not supported")
var errUnknownSignature = errors.New("has one mystery sig. WAT?")

// TxGroupErrorReason is reason code for ErrTxGroupError
type TxGroupErrorReason int

const (
// TxGroupErrorReasonGeneric is a generic (not tracked) reason code
TxGroupErrorReasonGeneric TxGroupErrorReason = iota
// TxGroupErrorReasonNotWellFormed is txn.WellFormed failure
TxGroupErrorReasonNotWellFormed
// TxGroupErrorReasonInvalidFee is invalid fee pooling in transaction group
TxGroupErrorReasonInvalidFee
// TxGroupErrorReasonHasNoSig is for transaction without any signature
TxGroupErrorReasonHasNoSig
// TxGroupErrorReasonSigNotWellFormed defines signature format errors
TxGroupErrorReasonSigNotWellFormed
// TxGroupErrorReasonMsigNotWellFormed defines multisig format errors
TxGroupErrorReasonMsigNotWellFormed
// TxGroupErrorReasonLogicSigFailed defines logic sig validation errors
TxGroupErrorReasonLogicSigFailed
)

// ErrTxGroupError is an error from txn pre-validation (well form-ness, signature format, etc).
// It can be unwrapped into underlying error, as well as has a specific failure reason code.
type ErrTxGroupError struct {
err error
reason TxGroupErrorReason
}

// Error returns an error message from the underlying error
func (e *ErrTxGroupError) Error() string {
return e.err.Error()
}

// Unwrap returns an underlying error
func (e *ErrTxGroupError) Unwrap() error {
return e.err
}

// Reason returns a reason code
func (e *ErrTxGroupError) Reason() TxGroupErrorReason {
return e.reason
}

// PrepareGroupContext prepares a verification group parameter object for a given transaction
// group.
func PrepareGroupContext(group []transactions.SignedTxn, contextHdr bookkeeping.BlockHeader, ledger logic.LedgerForSignature) (*GroupContext, error) {
Expand Down Expand Up @@ -101,14 +149,14 @@ func (g *GroupContext) Equal(other *GroupContext) bool {

// txnBatchPrep verifies a SignedTxn having no obviously inconsistent data.
// Block-assembly time checks of LogicSig and accounting rules may still block the txn.
// it is the caller responsibility to call batchVerifier.Verify()
func txnBatchPrep(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext, verifier *crypto.BatchVerifier) error {
// It is the caller responsibility to call batchVerifier.Verify().
func txnBatchPrep(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext, verifier *crypto.BatchVerifier) *ErrTxGroupError {
if !groupCtx.consensusParams.SupportRekeying && (s.AuthAddr != basics.Address{}) {
return errors.New("nonempty AuthAddr but rekeying is not supported")
return &ErrTxGroupError{err: errRekeyingNotSupported, reason: TxGroupErrorReasonGeneric}
}

if err := s.Txn.WellFormed(groupCtx.specAddrs, groupCtx.consensusParams); err != nil {
return err
return &ErrTxGroupError{err: err, reason: TxGroupErrorReasonNotWellFormed}
}

return stxnCoreChecks(s, txnIdx, groupCtx, verifier)
Expand Down Expand Up @@ -144,10 +192,14 @@ func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.Blo
minFeeCount := uint64(0)
feesPaid := uint64(0)
for i, stxn := range stxs {
err = txnBatchPrep(&stxn, i, groupCtx, verifier)
if err != nil {
err = fmt.Errorf("transaction %+v invalid : %w", stxn, err)
return
prepErr := txnBatchPrep(&stxn, i, groupCtx, verifier)
if prepErr != nil {
// re-wrap the error, take underlying one and copy the reason code
err = &ErrTxGroupError{
err: fmt.Errorf("transaction %+v invalid : %w", stxn, prepErr.err),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think txnBatchPrep is already returning ErrTxGroupError, maybe you would just want to keep the error it created (not make a new one), which preserves .Reason, and set .err on it to have this wrapped fmt?

reason: prepErr.reason,
}
return nil, err
}
if stxn.Txn.Type != protocol.StateProofTx {
minFeeCount++
Expand All @@ -156,22 +208,27 @@ func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.Blo
}
feeNeeded, overflow := basics.OMul(groupCtx.consensusParams.MinTxnFee, minFeeCount)
if overflow {
err = fmt.Errorf("txgroup fee requirement overflow")
return
err = &ErrTxGroupError{err: errTxGroupInvalidFee, reason: TxGroupErrorReasonInvalidFee}
return nil, err
}
// feesPaid may have saturated. That's ok. Since we know
// feeNeeded did not overflow, simple comparison tells us
// feesPaid was enough.
if feesPaid < feeNeeded {
err = fmt.Errorf("txgroup had %d in fees, which is less than the minimum %d * %d",
feesPaid, minFeeCount, groupCtx.consensusParams.MinTxnFee)
return
err = &ErrTxGroupError{
err: fmt.Errorf(
"txgroup had %d in fees, which is less than the minimum %d * %d",
feesPaid, minFeeCount, groupCtx.consensusParams.MinTxnFee),
reason: TxGroupErrorReasonInvalidFee,
}
return nil, err
}

return
return groupCtx, nil
}

func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext, batchVerifier *crypto.BatchVerifier) error {
// stxnCoreChecks runs signatures validity checks and enqueues signature into batchVerifier for verification.
func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext, batchVerifier *crypto.BatchVerifier) *ErrTxGroupError {
numSigs := 0
hasSig := false
hasMsig := false
Expand All @@ -196,11 +253,10 @@ func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContex
if s.Txn.Sender == transactions.StateProofSender && s.Txn.Type == protocol.StateProofTx {
return nil
}

return errors.New("signedtxn has no sig")
return &ErrTxGroupError{err: errTxnSigHasNoSig, reason: TxGroupErrorReasonHasNoSig}
}
if numSigs > 1 {
return errors.New("signedtxn should only have one of Sig or Msig or LogicSig")
return &ErrTxGroupError{err: errTxnSigNotWellFormed, reason: TxGroupErrorReasonSigNotWellFormed}
}

if hasSig {
Expand All @@ -209,14 +265,17 @@ func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContex
}
if hasMsig {
if err := crypto.MultisigBatchPrep(s.Txn, crypto.Digest(s.Authorizer()), s.Msig, batchVerifier); err != nil {
return fmt.Errorf("multisig validation failed: %w", err)
return &ErrTxGroupError{err: fmt.Errorf("multisig validation failed: %w", err), reason: TxGroupErrorReasonMsigNotWellFormed}
}
return nil
}
if hasLogicSig {
return logicSigVerify(s, txnIdx, groupCtx)
if err := logicSigVerify(s, txnIdx, groupCtx); err != nil {
return &ErrTxGroupError{err: err, reason: TxGroupErrorReasonLogicSigFailed}
}
return nil
}
return errors.New("has one mystery sig. WAT?")
return &ErrTxGroupError{err: errUnknownSignature, reason: TxGroupErrorReasonGeneric}
}

// LogicSigSanityCheck checks that the signature is valid and that the program is basically well formed.
Expand Down Expand Up @@ -254,7 +313,7 @@ func logicSigSanityCheckBatchPrep(txn *transactions.SignedTxn, groupIndex int, g
}

if groupIndex < 0 {
return errors.New("Negative groupIndex")
return errors.New("negative groupIndex")
}
txngroup := transactions.WrapSignedTxnsWithAD(groupCtx.signedGroupTxns)
ep := logic.EvalParams{
Expand Down Expand Up @@ -310,7 +369,7 @@ func logicSigVerify(txn *transactions.SignedTxn, groupIndex int, groupCtx *Group
}

if groupIndex < 0 {
return errors.New("Negative groupIndex")
return errors.New("negative groupIndex")
}
ep := logic.EvalParams{
Proto: &groupCtx.consensusParams,
Expand Down
64 changes: 59 additions & 5 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package data
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"sync"
"time"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
Expand All @@ -45,6 +47,14 @@ var txBacklogSize = config.Consensus[protocol.ConsensusCurrentVersion].MaxTxnByt
var transactionMessagesHandled = metrics.MakeCounter(metrics.TransactionMessagesHandled)
var transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog)
var transactionMessagesDroppedFromPool = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromPool)
var transactionMessagesAlreadyCommitted = metrics.MakeCounter(metrics.TransactionMessagesAlreadyCommitted)
var transactionMessagesTxGroupInvalidFee = metrics.MakeCounter(metrics.TransactionMessagesTxGroupInvalidFee)
var transactionMessagesTxnNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnNotWellFormed)
var transactionMessagesTxnSigNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigNotWellFormed)
var transactionMessagesTxnMsigNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnMsigNotWellFormed)
var transactionMessagesTxnLogicSig = metrics.MakeCounter(metrics.TransactionMessagesTxnLogicSig)
var transactionMessagesTxnSigVerificationFailed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigVerificationFailed)
var transactionMessagesBacklogSizeGauge = metrics.MakeGauge(metrics.TransactionMessagesBacklogSize)

// The txBacklogMsg structure used to track a single incoming transaction from the gossip network,
type txBacklogMsg struct {
Expand Down Expand Up @@ -101,8 +111,9 @@ func (handler *TxHandler) Start() {
handler.net.RegisterHandlers([]network.TaggedMessageHandler{
{Tag: protocol.TxnTag, MessageHandler: network.HandlerFunc(handler.processIncomingTxn)},
})
handler.backlogWg.Add(1)
handler.backlogWg.Add(2)
go handler.backlogWorker()
go handler.backlogGaugeThread()
}

// Stop suspends the processing of incoming messages at the transaction handler
Expand All @@ -119,6 +130,20 @@ func reencode(stxns []transactions.SignedTxn) []byte {
return bytes.Join(result, nil)
}

func (handler *TxHandler) backlogGaugeThread() {
defer handler.backlogWg.Done()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
transactionMessagesBacklogSizeGauge.Set(float64(len(handler.backlogQueue)))
case <-handler.ctx.Done():
return
}
}
}

// backlogWorker is the worker go routine that process the incoming messages from the postVerificationQueue and backlogQueue channels
// and dispatches them further.
func (handler *TxHandler) backlogWorker() {
Expand All @@ -132,7 +157,7 @@ func (handler *TxHandler) backlogWorker() {
if !ok {
return
}
handler.postprocessCheckedTxn(wi)
handler.postProcessCheckedTxn(wi)

// restart the loop so that we could empty out the post verification queue.
continue
Expand All @@ -146,6 +171,7 @@ func (handler *TxHandler) backlogWorker() {
return
}
if handler.checkAlreadyCommitted(wi) {
transactionMessagesAlreadyCommitted.Inc(nil)
continue
}

Expand All @@ -156,17 +182,45 @@ func (handler *TxHandler) backlogWorker() {
if !ok {
return
}
handler.postprocessCheckedTxn(wi)
handler.postProcessCheckedTxn(wi)

case <-handler.ctx.Done():
return
}
}
}

func (handler *TxHandler) postprocessCheckedTxn(wi *txBacklogMsg) {
func (handler *TxHandler) postProcessReportErrors(err error) {
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
if errors.Is(err, crypto.ErrBatchVerificationFailed) {
transactionMessagesTxnSigVerificationFailed.Inc(nil)
return
}

var txGroupErr *verify.ErrTxGroupError
if errors.As(err, &txGroupErr) {
txGroupErr = err.(*verify.ErrTxGroupError)
switch txGroupErr.Reason() {
case verify.TxGroupErrorReasonNotWellFormed:
transactionMessagesTxnNotWellFormed.Inc(nil)
case verify.TxGroupErrorReasonInvalidFee:
transactionMessagesTxGroupInvalidFee.Inc(nil)
case verify.TxGroupErrorReasonHasNoSig:
fallthrough
case verify.TxGroupErrorReasonSigNotWellFormed:
transactionMessagesTxnSigNotWellFormed.Inc(nil)
case verify.TxGroupErrorReasonMsigNotWellFormed:
transactionMessagesTxnMsigNotWellFormed.Inc(nil)
case verify.TxGroupErrorReasonLogicSigFailed:
transactionMessagesTxnLogicSig.Inc(nil)
default:
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) {
if wi.verificationErr != nil {
// disconnect from peer.
handler.postProcessReportErrors(wi.verificationErr)
logging.Base().Warnf("Received a malformed tx group %v: %v", wi.unverifiedTxGroup, wi.verificationErr)
handler.net.Disconnect(wi.rawmsg.Sender)
return
Expand Down Expand Up @@ -203,7 +257,7 @@ func (handler *TxHandler) asyncVerifySignature(arg interface{}) interface{} {
latest := handler.ledger.Latest()
latestHdr, err := handler.ledger.BlockHdr(latest)
if err != nil {
tx.verificationErr = fmt.Errorf("Could not get header for previous block %d: %w", latest, err)
tx.verificationErr = fmt.Errorf("could not get header for previous block %d: %w", latest, err)
logging.Base().Warnf("Could not get header for previous block %d: %v", latest, err)
} else {
// we can't use PaysetGroups here since it's using a execpool like this go-routine and we don't want to deadlock.
Expand Down
8 changes: 8 additions & 0 deletions data/txHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package data

import (
"encoding/binary"
"errors"
"fmt"
"io"
"math/rand"
Expand Down Expand Up @@ -568,3 +569,10 @@ func runHandlerBenchmark(maxGroupSize int, b *testing.B) {
close(handler.backlogQueue)
wg.Wait()
}

func BenchmarkPostProcessError(b *testing.B) {
var txh TxHandler

err := errors.New("couldn't find latest resources")
txh.postProcessReportErrors(err)
}
23 changes: 16 additions & 7 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -1440,11 +1440,13 @@ func (wn *WebsocketNetwork) peerSnapshot(dest []*wsPeer) ([]*wsPeer, int32) {

// preparePeerData prepares batches of data for sending.
// It performs optional zstd compression for proposal massages
func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, peers []*wsPeer) ([][]byte, [][]byte, []crypto.Digest) {
func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, peers []*wsPeer) ([][]byte, [][]byte, []crypto.Digest, map[protocol.Tag]struct{}) {
// determine if there is a payload proposal and peers supporting compressed payloads
wantCompression := false
var messageTags map[protocol.Tag]struct{}
if prio {
wantCompression = checkCanCompress(request, peers)
messageTags = make(map[protocol.Tag]struct{}, 1)
}

digests := make([]crypto.Digest, len(request.data))
Expand All @@ -1463,8 +1465,11 @@ func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool,
digests[i] = crypto.Hash(mbytes)
}

if prio && request.tags[i] == protocol.ProposalPayloadTag {
networkPrioPPNonCompressedSize.AddUint64(uint64(len(d)), nil)
if prio {
if request.tags[i] == protocol.ProposalPayloadTag {
Comment on lines +1467 to +1468
Copy link
Contributor

@algonautshant algonautshant Nov 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to make two levels of if?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we want containsPrioPPTag

networkPrioPPNonCompressedSize.AddUint64(uint64(len(d)), nil)
}
messageTags[request.tags[i]] = struct{}{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get it, what is this for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is for correcting the PP metric. without it it counts all prio messages, not only PP

}

if wantCompression {
Expand All @@ -1482,7 +1487,7 @@ func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool,
}
}
}
return data, dataCompressed, digests
return data, dataCompressed, digests, messageTags
}

// prio is set if the broadcast is a high-priority broadcast.
Expand All @@ -1499,7 +1504,7 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool,
}

start := time.Now()
data, dataWithCompression, digests := wn.preparePeerData(request, prio, peers)
data, dataWithCompression, digests, seenPrioTags := wn.preparePeerData(request, prio, peers)

// first send to all the easy outbound peers who don't block, get them started.
sentMessageCount := 0
Expand All @@ -1515,12 +1520,16 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool,
// if this peer supports compressed proposals and compressed data batch is filled out, use it
ok = peer.writeNonBlockMsgs(request.ctx, dataWithCompression, prio, digests, request.enqueueTime)
if prio {
networkPrioBatchesPPWithCompression.Inc(nil)
if _, ok := seenPrioTags[protocol.ProposalPayloadTag]; ok {
algonautshant marked this conversation as resolved.
Show resolved Hide resolved
networkPrioBatchesPPWithCompression.Inc(nil)
}
}
} else {
ok = peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime)
if prio {
networkPrioBatchesPPWithoutCompression.Inc(nil)
if _, ok := seenPrioTags[protocol.ProposalPayloadTag]; ok {
networkPrioBatchesPPWithoutCompression.Inc(nil)
}
}
}
if ok {
Expand Down
Loading