Skip to content

Commit

Permalink
txhandler: add more metric
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy committed Nov 14, 2022
1 parent 445b45b commit f79a445
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 28 deletions.
74 changes: 60 additions & 14 deletions data/transactions/verify/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,46 @@ type GroupContext struct {
ledger logic.LedgerForSignature
}

type ErrTxGroupInvalidFee struct {
err error
}

func (e *ErrTxGroupInvalidFee) Error() string {
return e.err.Error()
}

type ErrTxnBadFormed struct {
err error
}

func (e *ErrTxnBadFormed) Error() string {
return e.err.Error()
}

type ErrTxnSigBadFormed struct {
err error
}

func (e *ErrTxnSigBadFormed) Error() string {
return e.err.Error()
}

type ErrTxnMsigBadFormed struct {
err error
}

func (e *ErrTxnMsigBadFormed) Error() string {
return e.err.Error()
}

type ErrTxnLogicSig struct {
err error
}

func (e *ErrTxnLogicSig) Error() string {
return e.err.Error()
}

// PrepareGroupContext prepares a verification group parameter object for a given transaction
// group.
func PrepareGroupContext(group []transactions.SignedTxn, contextHdr bookkeeping.BlockHeader, ledger logic.LedgerForSignature) (*GroupContext, error) {
Expand Down Expand Up @@ -108,7 +148,7 @@ func txnBatchPrep(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext,
}

if err := s.Txn.WellFormed(groupCtx.specAddrs, groupCtx.consensusParams); err != nil {
return err
return &ErrTxnBadFormed{err: err}
}

return stxnCoreChecks(s, txnIdx, groupCtx, verifier)
Expand Down Expand Up @@ -147,7 +187,7 @@ func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.Blo
err = txnBatchPrep(&stxn, i, groupCtx, verifier)
if err != nil {
err = fmt.Errorf("transaction %+v invalid : %w", stxn, err)
return
return nil, err
}
if stxn.Txn.Type != protocol.StateProofTx {
minFeeCount++
Expand All @@ -156,19 +196,22 @@ func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.Blo
}
feeNeeded, overflow := basics.OMul(groupCtx.consensusParams.MinTxnFee, minFeeCount)
if overflow {
err = fmt.Errorf("txgroup fee requirement overflow")
return
err = &ErrTxGroupInvalidFee{err: fmt.Errorf("txgroup fee requirement overflow")}
return nil, err
}
// feesPaid may have saturated. That's ok. Since we know
// feeNeeded did not overflow, simple comparison tells us
// feesPaid was enough.
if feesPaid < feeNeeded {
err = fmt.Errorf("txgroup had %d in fees, which is less than the minimum %d * %d",
feesPaid, minFeeCount, groupCtx.consensusParams.MinTxnFee)
return
err = &ErrTxGroupInvalidFee{
err: fmt.Errorf(
"txgroup had %d in fees, which is less than the minimum %d * %d",
feesPaid, minFeeCount, groupCtx.consensusParams.MinTxnFee),
}
return nil, err
}

return
return groupCtx, nil
}

func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext, batchVerifier *crypto.BatchVerifier) error {
Expand Down Expand Up @@ -197,10 +240,10 @@ func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContex
return nil
}

return errors.New("signedtxn has no sig")
return &ErrTxnSigBadFormed{err: errors.New("signedtxn has no sig")}
}
if numSigs > 1 {
return errors.New("signedtxn should only have one of Sig or Msig or LogicSig")
return &ErrTxnSigBadFormed{err: errors.New("signedtxn should only have one of Sig or Msig or LogicSig")}
}

if hasSig {
Expand All @@ -209,12 +252,15 @@ func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContex
}
if hasMsig {
if err := crypto.MultisigBatchPrep(s.Txn, crypto.Digest(s.Authorizer()), s.Msig, batchVerifier); err != nil {
return fmt.Errorf("multisig validation failed: %w", err)
return &ErrTxnMsigBadFormed{err: fmt.Errorf("multisig validation failed: %w", err)}
}
return nil
}
if hasLogicSig {
return logicSigVerify(s, txnIdx, groupCtx)
if err := logicSigVerify(s, txnIdx, groupCtx); err != nil {
return &ErrTxnLogicSig{err: err}
}
return nil
}
return errors.New("has one mystery sig. WAT?")
}
Expand Down Expand Up @@ -254,7 +300,7 @@ func logicSigSanityCheckBatchPrep(txn *transactions.SignedTxn, groupIndex int, g
}

if groupIndex < 0 {
return errors.New("Negative groupIndex")
return errors.New("negative groupIndex")
}
txngroup := transactions.WrapSignedTxnsWithAD(groupCtx.signedGroupTxns)
ep := logic.EvalParams{
Expand Down Expand Up @@ -310,7 +356,7 @@ func logicSigVerify(txn *transactions.SignedTxn, groupIndex int, groupCtx *Group
}

if groupIndex < 0 {
return errors.New("Negative groupIndex")
return errors.New("negative groupIndex")
}
ep := logic.EvalParams{
Proto: &groupCtx.consensusParams,
Expand Down
55 changes: 51 additions & 4 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package data
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"sync"
Expand All @@ -45,6 +46,13 @@ var txBacklogSize = config.Consensus[protocol.ConsensusCurrentVersion].MaxTxnByt
var transactionMessagesHandled = metrics.MakeCounter(metrics.TransactionMessagesHandled)
var transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog)
var transactionMessagesDroppedFromPool = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromPool)
var transactionMessagesDupPreBacklog = metrics.MakeCounter(metrics.TransactionMessagesDupPreBacklog)
var transactionMessagesTxGroupInvalidFee = metrics.MakeCounter(metrics.TransactionMessagesTxGroupInvalidFee)
var transactionMessagesTxnBadFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnBadFormed)
var transactionMessagesTxnSigBadFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigBadFormed)
var transactionMessagesTxnMsigBadFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnMsigBadFormed)
var transactionMessagesTxnLogicSig = metrics.MakeCounter(metrics.TransactionMessagesTxnLogicSig)
var transactionMessagesTxnSigVerificationFailed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigVerificationFailed)

// The txBacklogMsg structure used to track a single incoming transaction from the gossip network,
type txBacklogMsg struct {
Expand Down Expand Up @@ -132,7 +140,7 @@ func (handler *TxHandler) backlogWorker() {
if !ok {
return
}
handler.postprocessCheckedTxn(wi)
handler.postProcessCheckedTxn(wi)

// restart the loop so that we could empty out the post verification queue.
continue
Expand All @@ -146,6 +154,7 @@ func (handler *TxHandler) backlogWorker() {
return
}
if handler.checkAlreadyCommitted(wi) {
transactionMessagesDupPreBacklog.Inc(nil)
continue
}

Expand All @@ -156,17 +165,55 @@ func (handler *TxHandler) backlogWorker() {
if !ok {
return
}
handler.postprocessCheckedTxn(wi)
handler.postProcessCheckedTxn(wi)

case <-handler.ctx.Done():
return
}
}
}

func (handler *TxHandler) postprocessCheckedTxn(wi *txBacklogMsg) {
func (handler *TxHandler) postProcessReportErrors(err error) {
var feeError *verify.ErrTxGroupInvalidFee
if errors.As(err, &feeError) {
transactionMessagesTxGroupInvalidFee.Inc(nil)
return
}

var badFormed *verify.ErrTxnBadFormed
if errors.As(err, &badFormed) {
transactionMessagesTxnBadFormed.Inc(nil)
return
}

var sigBadFormed *verify.ErrTxnSigBadFormed
if errors.As(err, &sigBadFormed) {
transactionMessagesTxnSigBadFormed.Inc(nil)
return
}

var msigBadFormed *verify.ErrTxnMsigBadFormed
if errors.As(err, &msigBadFormed) {
transactionMessagesTxnMsigBadFormed.Inc(nil)
return
}

var logicSig *verify.ErrTxnLogicSig
if errors.As(err, &logicSig) {
transactionMessagesTxnLogicSig.Inc(nil)
return
}

if errors.Is(err, crypto.ErrBatchVerificationFailed) {
transactionMessagesTxnSigVerificationFailed.Inc(nil)
return
}
}

func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) {
if wi.verificationErr != nil {
// disconnect from peer.
handler.postProcessReportErrors(wi.verificationErr)
logging.Base().Warnf("Received a malformed tx group %v: %v", wi.unverifiedTxGroup, wi.verificationErr)
handler.net.Disconnect(wi.rawmsg.Sender)
return
Expand Down Expand Up @@ -203,7 +250,7 @@ func (handler *TxHandler) asyncVerifySignature(arg interface{}) interface{} {
latest := handler.ledger.Latest()
latestHdr, err := handler.ledger.BlockHdr(latest)
if err != nil {
tx.verificationErr = fmt.Errorf("Could not get header for previous block %d: %w", latest, err)
tx.verificationErr = fmt.Errorf("could not get header for previous block %d: %w", latest, err)
logging.Base().Warnf("Could not get header for previous block %d: %v", latest, err)
} else {
// we can't use PaysetGroups here since it's using a execpool like this go-routine and we don't want to deadlock.
Expand Down
23 changes: 16 additions & 7 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -1440,11 +1440,13 @@ func (wn *WebsocketNetwork) peerSnapshot(dest []*wsPeer) ([]*wsPeer, int32) {

// preparePeerData prepares batches of data for sending.
// It performs optional zstd compression for proposal massages
func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, peers []*wsPeer) ([][]byte, [][]byte, []crypto.Digest) {
func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, peers []*wsPeer) ([][]byte, [][]byte, []crypto.Digest, map[protocol.Tag]struct{}) {
// determine if there is a payload proposal and peers supporting compressed payloads
wantCompression := false
var messageTags map[protocol.Tag]struct{}
if prio {
wantCompression = checkCanCompress(request, peers)
messageTags = make(map[protocol.Tag]struct{}, 1)
}

digests := make([]crypto.Digest, len(request.data))
Expand All @@ -1463,8 +1465,11 @@ func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool,
digests[i] = crypto.Hash(mbytes)
}

if prio && request.tags[i] == protocol.ProposalPayloadTag {
networkPrioPPNonCompressedSize.AddUint64(uint64(len(d)), nil)
if prio {
if request.tags[i] == protocol.ProposalPayloadTag {
networkPrioPPNonCompressedSize.AddUint64(uint64(len(d)), nil)
}
messageTags[request.tags[i]] = struct{}{}
}

if wantCompression {
Expand All @@ -1482,7 +1487,7 @@ func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool,
}
}
}
return data, dataCompressed, digests
return data, dataCompressed, digests, messageTags
}

// prio is set if the broadcast is a high-priority broadcast.
Expand All @@ -1499,7 +1504,7 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool,
}

start := time.Now()
data, dataWithCompression, digests := wn.preparePeerData(request, prio, peers)
data, dataWithCompression, digests, seenPrioTags := wn.preparePeerData(request, prio, peers)

// first send to all the easy outbound peers who don't block, get them started.
sentMessageCount := 0
Expand All @@ -1515,12 +1520,16 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool,
// if this peer supports compressed proposals and compressed data batch is filled out, use it
ok = peer.writeNonBlockMsgs(request.ctx, dataWithCompression, prio, digests, request.enqueueTime)
if prio {
networkPrioBatchesPPWithCompression.Inc(nil)
if _, ok := seenPrioTags[protocol.ProposalPayloadTag]; ok {
networkPrioBatchesPPWithCompression.Inc(nil)
}
}
} else {
ok = peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime)
if prio {
networkPrioBatchesPPWithoutCompression.Inc(nil)
if _, ok := seenPrioTags[protocol.ProposalPayloadTag]; ok {
networkPrioBatchesPPWithoutCompression.Inc(nil)
}
}
}
if ok {
Expand Down
10 changes: 7 additions & 3 deletions network/wsNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2690,20 +2690,21 @@ func TestParseHostOrURL(t *testing.T) {
func TestPreparePeerData(t *testing.T) {
partitiontest.PartitionTest(t)

// no comression
// no compression
req := broadcastRequest{
tags: []protocol.Tag{protocol.AgreementVoteTag, protocol.ProposalPayloadTag},
data: [][]byte{[]byte("test"), []byte("data")},
}

peers := []*wsPeer{}
wn := WebsocketNetwork{}
data, comp, digests := wn.preparePeerData(req, false, peers)
data, comp, digests, seenPrioTags := wn.preparePeerData(req, false, peers)
require.NotEmpty(t, data)
require.Empty(t, comp)
require.NotEmpty(t, digests)
require.Equal(t, len(req.data), len(digests))
require.Equal(t, len(data), len(digests))
require.Empty(t, seenPrioTags)

for i := range data {
require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i])
Expand All @@ -2717,13 +2718,16 @@ func TestPreparePeerData(t *testing.T) {
features: pfCompressedProposal,
}
peers = []*wsPeer{&peer1, &peer2}
data, comp, digests = wn.preparePeerData(req, true, peers)
data, comp, digests, seenPrioTags = wn.preparePeerData(req, true, peers)
require.NotEmpty(t, data)
require.NotEmpty(t, comp)
require.NotEmpty(t, digests)
require.Equal(t, len(req.data), len(digests))
require.Equal(t, len(data), len(digests))
require.Equal(t, len(comp), len(digests))
require.NotEmpty(t, seenPrioTags)
require.Len(t, seenPrioTags, 1)
require.Contains(t, seenPrioTags, protocol.ProposalPayloadTag)

for i := range data {
require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i])
Expand Down
14 changes: 14 additions & 0 deletions util/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,18 @@ var (
TransactionMessagesDroppedFromBacklog = MetricName{Name: "algod_transaction_messages_dropped_backlog", Description: "Number of transaction messages dropped from backlog"}
// TransactionMessagesDroppedFromPool "Number of transaction messages dropped from pool"
TransactionMessagesDroppedFromPool = MetricName{Name: "algod_transaction_messages_dropped_pool", Description: "Number of transaction messages dropped from pool"}
// TransactionMessagesDupPreBacklog "Number of duplicate transaction messages before placing into a backlog"
TransactionMessagesDupPreBacklog = MetricName{Name: "algod_transaction_messages_dup_prebacklog", Description: "Number of duplicate transaction messages before placing into a backlog"}
// TransactionMessagesTxGroupInvalidFee "Number of transaction messages with invalid txgroup fee"
TransactionMessagesTxGroupInvalidFee = MetricName{Name: "algod_transaction_messages_txgroup_invalid_fee", Description: "Number of transaction messages with invalid txgroup fee"}
// TransactionMessagesTxnBadFormed "Number of transaction messages not well formed"
TransactionMessagesTxnBadFormed = MetricName{Name: "algod_transaction_messages_txn_bad_formed", Description: "Number of transaction messages not well formed"}
// TransactionMessagesTxnSigBadFormed "Number of transaction messages with bad formed signature"
TransactionMessagesTxnSigBadFormed = MetricName{Name: "algod_transaction_messages_sig_bad_formed", Description: "Number of transaction messages with bad formed signature"}
// TransactionMessagesTxnMsigBadFormed "Number of transaction messages with bad formed multisig"
TransactionMessagesTxnMsigBadFormed = MetricName{Name: "algod_transaction_messages_msig_bad_formed", Description: "Number of transaction messages with bad formed multisig"}
// TransactionMessagesTxnLogicSig "Number of transaction messages with invalid logic sig"
TransactionMessagesTxnLogicSig = MetricName{Name: "algod_transaction_messages_logic_sig_failed", Description: "Number of transaction messages with invalid logic sig"}
// TransactionMessagesTxnSigVerificationFailed "Number of transaction messages with signature verification failed"
TransactionMessagesTxnSigVerificationFailed = MetricName{Name: "algod_transaction_messages_sig_verify_failed", Description: "Number of transaction messages with signature verification failed"}
)

0 comments on commit f79a445

Please sign in to comment.