Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txHandler: add more metric #4786

Merged
merged 14 commits into from
Nov 17, 2022
74 changes: 60 additions & 14 deletions data/transactions/verify/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,46 @@ type GroupContext struct {
ledger logic.LedgerForSignature
}

// ErrTxGroupInvalidFee is returned by txnGroupBatchPrep when the fee
// requirement of a transaction group overflows, or when the group paid less
// than the required minimum fee. It wraps the descriptive underlying error.
type ErrTxGroupInvalidFee struct {
	err error
}

// Error returns the message of the wrapped error.
func (e *ErrTxGroupInvalidFee) Error() string {
	return e.err.Error()
}

// Unwrap returns the wrapped error so that errors.Is and errors.As can
// inspect the rest of the error chain.
func (e *ErrTxGroupInvalidFee) Unwrap() error {
	return e.err
}

// ErrTxnBadFormed is returned by txnBatchPrep when a transaction fails its
// WellFormed check. It wraps the error reported by that check.
type ErrTxnBadFormed struct {
	err error
}

// Error returns the message of the wrapped error.
func (e *ErrTxnBadFormed) Error() string {
	return e.err.Error()
}

// Unwrap returns the wrapped error so that errors.Is and errors.As can
// inspect the rest of the error chain.
func (e *ErrTxnBadFormed) Unwrap() error {
	return e.err
}

// ErrTxnSigBadFormed is returned by stxnCoreChecks when a signed transaction
// carries no signature at all, or carries more than one of Sig, Msig and
// LogicSig. It wraps the descriptive underlying error.
type ErrTxnSigBadFormed struct {
	err error
}

// Error returns the message of the wrapped error.
func (e *ErrTxnSigBadFormed) Error() string {
	return e.err.Error()
}

// Unwrap returns the wrapped error so that errors.Is and errors.As can
// inspect the rest of the error chain.
func (e *ErrTxnSigBadFormed) Unwrap() error {
	return e.err
}

// ErrTxnMsigBadFormed is returned by stxnCoreChecks when preparing a multisig
// for batch verification fails. It wraps the "multisig validation failed"
// error, which in turn wraps the error from crypto.MultisigBatchPrep.
type ErrTxnMsigBadFormed struct {
	err error
}

// Error returns the message of the wrapped error.
func (e *ErrTxnMsigBadFormed) Error() string {
	return e.err.Error()
}

// Unwrap returns the wrapped error so that errors.Is and errors.As can
// reach the underlying multisig error through this wrapper.
func (e *ErrTxnMsigBadFormed) Unwrap() error {
	return e.err
}

// ErrTxnLogicSig is returned by stxnCoreChecks when logicSigVerify rejects a
// transaction's logic signature. It wraps the error reported by that check.
type ErrTxnLogicSig struct {
	err error
}

// Error returns the message of the wrapped error.
func (e *ErrTxnLogicSig) Error() string {
	return e.err.Error()
}

// Unwrap returns the wrapped error so that errors.Is and errors.As can
// inspect the rest of the error chain.
func (e *ErrTxnLogicSig) Unwrap() error {
	return e.err
}

// PrepareGroupContext prepares a verification group parameter object for a given transaction
// group.
func PrepareGroupContext(group []transactions.SignedTxn, contextHdr bookkeeping.BlockHeader, ledger logic.LedgerForSignature) (*GroupContext, error) {
Expand Down Expand Up @@ -108,7 +148,7 @@ func txnBatchPrep(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext,
}

if err := s.Txn.WellFormed(groupCtx.specAddrs, groupCtx.consensusParams); err != nil {
return err
return &ErrTxnBadFormed{err: err}
}

return stxnCoreChecks(s, txnIdx, groupCtx, verifier)
Expand Down Expand Up @@ -147,7 +187,7 @@ func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.Blo
err = txnBatchPrep(&stxn, i, groupCtx, verifier)
if err != nil {
err = fmt.Errorf("transaction %+v invalid : %w", stxn, err)
return
return nil, err
}
if stxn.Txn.Type != protocol.StateProofTx {
minFeeCount++
Expand All @@ -156,19 +196,22 @@ func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.Blo
}
feeNeeded, overflow := basics.OMul(groupCtx.consensusParams.MinTxnFee, minFeeCount)
if overflow {
err = fmt.Errorf("txgroup fee requirement overflow")
return
err = &ErrTxGroupInvalidFee{err: fmt.Errorf("txgroup fee requirement overflow")}
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}
// feesPaid may have saturated. That's ok. Since we know
// feeNeeded did not overflow, simple comparison tells us
// feesPaid was enough.
if feesPaid < feeNeeded {
err = fmt.Errorf("txgroup had %d in fees, which is less than the minimum %d * %d",
feesPaid, minFeeCount, groupCtx.consensusParams.MinTxnFee)
return
err = &ErrTxGroupInvalidFee{
err: fmt.Errorf(
"txgroup had %d in fees, which is less than the minimum %d * %d",
feesPaid, minFeeCount, groupCtx.consensusParams.MinTxnFee),
}
return nil, err
}

return
return groupCtx, nil
}

func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext, batchVerifier *crypto.BatchVerifier) error {
Expand Down Expand Up @@ -197,10 +240,10 @@ func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContex
return nil
}

return errors.New("signedtxn has no sig")
return &ErrTxnSigBadFormed{err: errors.New("signedtxn has no sig")}
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
}
if numSigs > 1 {
return errors.New("signedtxn should only have one of Sig or Msig or LogicSig")
return &ErrTxnSigBadFormed{err: errors.New("signedtxn should only have one of Sig or Msig or LogicSig")}
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
}

if hasSig {
Expand All @@ -209,12 +252,15 @@ func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContex
}
if hasMsig {
if err := crypto.MultisigBatchPrep(s.Txn, crypto.Digest(s.Authorizer()), s.Msig, batchVerifier); err != nil {
return fmt.Errorf("multisig validation failed: %w", err)
return &ErrTxnMsigBadFormed{err: fmt.Errorf("multisig validation failed: %w", err)}
}
return nil
}
if hasLogicSig {
return logicSigVerify(s, txnIdx, groupCtx)
if err := logicSigVerify(s, txnIdx, groupCtx); err != nil {
return &ErrTxnLogicSig{err: err}
}
return nil
}
return errors.New("has one mystery sig. WAT?")
}
Expand Down Expand Up @@ -254,7 +300,7 @@ func logicSigSanityCheckBatchPrep(txn *transactions.SignedTxn, groupIndex int, g
}

if groupIndex < 0 {
return errors.New("Negative groupIndex")
return errors.New("negative groupIndex")
}
txngroup := transactions.WrapSignedTxnsWithAD(groupCtx.signedGroupTxns)
ep := logic.EvalParams{
Expand Down Expand Up @@ -310,7 +356,7 @@ func logicSigVerify(txn *transactions.SignedTxn, groupIndex int, groupCtx *Group
}

if groupIndex < 0 {
return errors.New("Negative groupIndex")
return errors.New("negative groupIndex")
}
ep := logic.EvalParams{
Proto: &groupCtx.consensusParams,
Expand Down
55 changes: 51 additions & 4 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package data
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"sync"
Expand All @@ -45,6 +46,13 @@ var txBacklogSize = config.Consensus[protocol.ConsensusCurrentVersion].MaxTxnByt
var transactionMessagesHandled = metrics.MakeCounter(metrics.TransactionMessagesHandled)
var transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog)
var transactionMessagesDroppedFromPool = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromPool)
var transactionMessagesDupPreBacklog = metrics.MakeCounter(metrics.TransactionMessagesDupPreBacklog)
var transactionMessagesTxGroupInvalidFee = metrics.MakeCounter(metrics.TransactionMessagesTxGroupInvalidFee)
var transactionMessagesTxnBadFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnBadFormed)
var transactionMessagesTxnSigBadFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigBadFormed)
var transactionMessagesTxnMsigBadFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnMsigBadFormed)
var transactionMessagesTxnLogicSig = metrics.MakeCounter(metrics.TransactionMessagesTxnLogicSig)
var transactionMessagesTxnSigVerificationFailed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigVerificationFailed)

// The txBacklogMsg structure is used to track a single incoming transaction from the gossip network.
type txBacklogMsg struct {
Expand Down Expand Up @@ -132,7 +140,7 @@ func (handler *TxHandler) backlogWorker() {
if !ok {
return
}
handler.postprocessCheckedTxn(wi)
handler.postProcessCheckedTxn(wi)

// restart the loop so that we could empty out the post verification queue.
continue
Expand All @@ -146,6 +154,7 @@ func (handler *TxHandler) backlogWorker() {
return
}
if handler.checkAlreadyCommitted(wi) {
transactionMessagesDupPreBacklog.Inc(nil)
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor note: a duplicate may be detected here because the node got the transactions through the pool sync.
Although the txSync transactions also go through the txHandler, they don't get checked for duplication through this code path.
I am not saying this is wrong, I just wanted to put out some more information.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I was just looking at txSyncer.go and noticed it doesn't have any counters, and neither does the txHandler.processDecoded() call it makes when it adds txns from txSyncer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is fine, we want to count duplicates from tx handler

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also want to add some counters for how often txSyncer is actually putting txns in the pool (or even counting their failures too), which seems to be happening more often than we think

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added AlreadyCommitted and Remembered counters

continue
}

Expand All @@ -156,17 +165,55 @@ func (handler *TxHandler) backlogWorker() {
if !ok {
return
}
handler.postprocessCheckedTxn(wi)
handler.postProcessCheckedTxn(wi)

case <-handler.ctx.Done():
return
}
}
}

func (handler *TxHandler) postprocessCheckedTxn(wi *txBacklogMsg) {
func (handler *TxHandler) postProcessReportErrors(err error) {
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
var feeError *verify.ErrTxGroupInvalidFee
if errors.As(err, &feeError) {
transactionMessagesTxGroupInvalidFee.Inc(nil)
return
}

var badFormed *verify.ErrTxnBadFormed
if errors.As(err, &badFormed) {
transactionMessagesTxnBadFormed.Inc(nil)
return
}

var sigBadFormed *verify.ErrTxnSigBadFormed
if errors.As(err, &sigBadFormed) {
transactionMessagesTxnSigBadFormed.Inc(nil)
return
}

var msigBadFormed *verify.ErrTxnMsigBadFormed
if errors.As(err, &msigBadFormed) {
transactionMessagesTxnMsigBadFormed.Inc(nil)
return
}

var logicSig *verify.ErrTxnLogicSig
if errors.As(err, &logicSig) {
transactionMessagesTxnLogicSig.Inc(nil)
return
}

if errors.Is(err, crypto.ErrBatchVerificationFailed) {
transactionMessagesTxnSigVerificationFailed.Inc(nil)
return
}
}

func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) {
if wi.verificationErr != nil {
// disconnect from peer.
handler.postProcessReportErrors(wi.verificationErr)
logging.Base().Warnf("Received a malformed tx group %v: %v", wi.unverifiedTxGroup, wi.verificationErr)
handler.net.Disconnect(wi.rawmsg.Sender)
return
Expand Down Expand Up @@ -203,7 +250,7 @@ func (handler *TxHandler) asyncVerifySignature(arg interface{}) interface{} {
latest := handler.ledger.Latest()
latestHdr, err := handler.ledger.BlockHdr(latest)
if err != nil {
tx.verificationErr = fmt.Errorf("Could not get header for previous block %d: %w", latest, err)
tx.verificationErr = fmt.Errorf("could not get header for previous block %d: %w", latest, err)
logging.Base().Warnf("Could not get header for previous block %d: %v", latest, err)
} else {
// we can't use PaysetGroups here since it's using a execpool like this go-routine and we don't want to deadlock.
Expand Down
23 changes: 16 additions & 7 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -1440,11 +1440,13 @@ func (wn *WebsocketNetwork) peerSnapshot(dest []*wsPeer) ([]*wsPeer, int32) {

// preparePeerData prepares batches of data for sending.
// It performs optional zstd compression for proposal messages
func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, peers []*wsPeer) ([][]byte, [][]byte, []crypto.Digest) {
func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, peers []*wsPeer) ([][]byte, [][]byte, []crypto.Digest, map[protocol.Tag]struct{}) {
// determine if there is a payload proposal and peers supporting compressed payloads
wantCompression := false
var messageTags map[protocol.Tag]struct{}
if prio {
wantCompression = checkCanCompress(request, peers)
messageTags = make(map[protocol.Tag]struct{}, 1)
}

digests := make([]crypto.Digest, len(request.data))
Expand All @@ -1463,8 +1465,11 @@ func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool,
digests[i] = crypto.Hash(mbytes)
}

if prio && request.tags[i] == protocol.ProposalPayloadTag {
networkPrioPPNonCompressedSize.AddUint64(uint64(len(d)), nil)
if prio {
if request.tags[i] == protocol.ProposalPayloadTag {
Comment on lines +1467 to +1468
Copy link
Contributor

@algonautshant algonautshant Nov 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to make two levels of if?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we want containsPrioPPTag

networkPrioPPNonCompressedSize.AddUint64(uint64(len(d)), nil)
}
messageTags[request.tags[i]] = struct{}{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get it, what is this for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is for correcting the PP metric. without it it counts all prio messages, not only PP

}

if wantCompression {
Expand All @@ -1482,7 +1487,7 @@ func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool,
}
}
}
return data, dataCompressed, digests
return data, dataCompressed, digests, messageTags
}

// prio is set if the broadcast is a high-priority broadcast.
Expand All @@ -1499,7 +1504,7 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool,
}

start := time.Now()
data, dataWithCompression, digests := wn.preparePeerData(request, prio, peers)
data, dataWithCompression, digests, seenPrioTags := wn.preparePeerData(request, prio, peers)

// first send to all the easy outbound peers who don't block, get them started.
sentMessageCount := 0
Expand All @@ -1515,12 +1520,16 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool,
// if this peer supports compressed proposals and compressed data batch is filled out, use it
ok = peer.writeNonBlockMsgs(request.ctx, dataWithCompression, prio, digests, request.enqueueTime)
if prio {
networkPrioBatchesPPWithCompression.Inc(nil)
if _, ok := seenPrioTags[protocol.ProposalPayloadTag]; ok {
algonautshant marked this conversation as resolved.
Show resolved Hide resolved
networkPrioBatchesPPWithCompression.Inc(nil)
}
}
} else {
ok = peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime)
if prio {
networkPrioBatchesPPWithoutCompression.Inc(nil)
if _, ok := seenPrioTags[protocol.ProposalPayloadTag]; ok {
networkPrioBatchesPPWithoutCompression.Inc(nil)
}
}
}
if ok {
Expand Down
10 changes: 7 additions & 3 deletions network/wsNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2690,20 +2690,21 @@ func TestParseHostOrURL(t *testing.T) {
func TestPreparePeerData(t *testing.T) {
partitiontest.PartitionTest(t)

// no comression
// no compression
req := broadcastRequest{
tags: []protocol.Tag{protocol.AgreementVoteTag, protocol.ProposalPayloadTag},
data: [][]byte{[]byte("test"), []byte("data")},
}

peers := []*wsPeer{}
wn := WebsocketNetwork{}
data, comp, digests := wn.preparePeerData(req, false, peers)
data, comp, digests, seenPrioTags := wn.preparePeerData(req, false, peers)
require.NotEmpty(t, data)
require.Empty(t, comp)
require.NotEmpty(t, digests)
require.Equal(t, len(req.data), len(digests))
require.Equal(t, len(data), len(digests))
require.Empty(t, seenPrioTags)

for i := range data {
require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i])
Expand All @@ -2717,13 +2718,16 @@ func TestPreparePeerData(t *testing.T) {
features: pfCompressedProposal,
}
peers = []*wsPeer{&peer1, &peer2}
data, comp, digests = wn.preparePeerData(req, true, peers)
data, comp, digests, seenPrioTags = wn.preparePeerData(req, true, peers)
require.NotEmpty(t, data)
require.NotEmpty(t, comp)
require.NotEmpty(t, digests)
require.Equal(t, len(req.data), len(digests))
require.Equal(t, len(data), len(digests))
require.Equal(t, len(comp), len(digests))
require.NotEmpty(t, seenPrioTags)
require.Len(t, seenPrioTags, 1)
require.Contains(t, seenPrioTags, protocol.ProposalPayloadTag)

for i := range data {
require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i])
Expand Down
14 changes: 14 additions & 0 deletions util/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,18 @@ var (
TransactionMessagesDroppedFromBacklog = MetricName{Name: "algod_transaction_messages_dropped_backlog", Description: "Number of transaction messages dropped from backlog"}
// TransactionMessagesDroppedFromPool "Number of transaction messages dropped from pool"
TransactionMessagesDroppedFromPool = MetricName{Name: "algod_transaction_messages_dropped_pool", Description: "Number of transaction messages dropped from pool"}
// TransactionMessagesDupPreBacklog "Number of duplicate transaction messages before placing into a backlog"
TransactionMessagesDupPreBacklog = MetricName{Name: "algod_transaction_messages_dup_prebacklog", Description: "Number of duplicate transaction messages before placing into a backlog"}
// TransactionMessagesTxGroupInvalidFee "Number of transaction messages with invalid txgroup fee"
TransactionMessagesTxGroupInvalidFee = MetricName{Name: "algod_transaction_messages_txgroup_invalid_fee", Description: "Number of transaction messages with invalid txgroup fee"}
// TransactionMessagesTxnBadFormed "Number of transaction messages not well formed"
TransactionMessagesTxnBadFormed = MetricName{Name: "algod_transaction_messages_txn_bad_formed", Description: "Number of transaction messages not well formed"}
// TransactionMessagesTxnSigBadFormed "Number of transaction messages with bad formed signature"
TransactionMessagesTxnSigBadFormed = MetricName{Name: "algod_transaction_messages_sig_bad_formed", Description: "Number of transaction messages with bad formed signature"}
// TransactionMessagesTxnMsigBadFormed "Number of transaction messages with bad formed multisig"
TransactionMessagesTxnMsigBadFormed = MetricName{Name: "algod_transaction_messages_msig_bad_formed", Description: "Number of transaction messages with bad formed multisig"}
// TransactionMessagesTxnLogicSig "Number of transaction messages with invalid logic sig"
TransactionMessagesTxnLogicSig = MetricName{Name: "algod_transaction_messages_logic_sig_failed", Description: "Number of transaction messages with invalid logic sig"}
// TransactionMessagesTxnSigVerificationFailed "Number of transaction messages with signature verification failed"
TransactionMessagesTxnSigVerificationFailed = MetricName{Name: "algod_transaction_messages_sig_verify_failed", Description: "Number of transaction messages with signature verification failed"}
)