-
Notifications
You must be signed in to change notification settings - Fork 474
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
txHandler: add more metric #4786
Changes from 9 commits
f79a445
6d1f0ad
f234c96
a12fe42
5fd63cb
96d46e5
e5bcf91
b9a1d9b
17a100b
9188690
937818e
2c29d72
ab035ca
d998917
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,9 +19,11 @@ package data | |
import ( | ||
"bytes" | ||
"context" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"sync" | ||
"time" | ||
|
||
"github.com/algorand/go-algorand/config" | ||
"github.com/algorand/go-algorand/crypto" | ||
|
@@ -45,6 +47,19 @@ var txBacklogSize = config.Consensus[protocol.ConsensusCurrentVersion].MaxTxnByt | |
var transactionMessagesHandled = metrics.MakeCounter(metrics.TransactionMessagesHandled) | ||
var transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog) | ||
var transactionMessagesDroppedFromPool = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromPool) | ||
var transactionMessagesAlreadyCommitted = metrics.MakeCounter(metrics.TransactionMessagesAlreadyCommitted) | ||
var transactionMessagesTxGroupInvalidFee = metrics.MakeCounter(metrics.TransactionMessagesTxGroupInvalidFee) | ||
var transactionMessagesTxnNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnNotWellFormed) | ||
var transactionMessagesTxnSigNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigNotWellFormed) | ||
var transactionMessagesTxnMsigNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnMsigNotWellFormed) | ||
var transactionMessagesTxnLogicSig = metrics.MakeCounter(metrics.TransactionMessagesTxnLogicSig) | ||
var transactionMessagesTxnSigVerificationFailed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigVerificationFailed) | ||
var transactionMessagesBacklogErr = metrics.MakeCounter(metrics.TransactionMessagesBacklogErr) | ||
var transactionMessagesRemember = metrics.MakeCounter(metrics.TransactionMessagesRemember) | ||
var transactionMessagesBacklogSizeGauge = metrics.MakeGauge(metrics.TransactionMessagesBacklogSize) | ||
|
||
var transactionGroupTxSyncRemember = metrics.MakeCounter(metrics.TransactionGroupTxSyncRemember) | ||
var transactionGroupTxSyncAlreadyCommitted = metrics.MakeCounter(metrics.TransactionGroupTxSyncAlreadyCommitted) | ||
|
||
// The txBacklogMsg structure used to track a single incoming transaction from the gossip network, | ||
type txBacklogMsg struct { | ||
|
@@ -101,8 +116,9 @@ func (handler *TxHandler) Start() { | |
handler.net.RegisterHandlers([]network.TaggedMessageHandler{ | ||
{Tag: protocol.TxnTag, MessageHandler: network.HandlerFunc(handler.processIncomingTxn)}, | ||
}) | ||
handler.backlogWg.Add(1) | ||
handler.backlogWg.Add(2) | ||
go handler.backlogWorker() | ||
go handler.backlogGaugeThread() | ||
} | ||
|
||
// Stop suspends the processing of incoming messages at the transaction handler | ||
|
@@ -113,12 +129,26 @@ func (handler *TxHandler) Stop() { | |
|
||
func reencode(stxns []transactions.SignedTxn) []byte { | ||
var result [][]byte | ||
for _, stxn := range stxns { | ||
result = append(result, protocol.Encode(&stxn)) | ||
for i := range stxns { | ||
algorandskiy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
result = append(result, protocol.Encode(&stxns[i])) | ||
} | ||
return bytes.Join(result, nil) | ||
} | ||
|
||
func (handler *TxHandler) backlogGaugeThread() { | ||
defer handler.backlogWg.Done() | ||
ticker := time.NewTicker(5 * time.Second) | ||
defer ticker.Stop() | ||
for { | ||
select { | ||
case <-ticker.C: | ||
transactionMessagesBacklogSizeGauge.Set(float64(len(handler.backlogQueue))) | ||
case <-handler.ctx.Done(): | ||
return | ||
} | ||
} | ||
} | ||
|
||
// backlogWorker is the worker go routine that process the incoming messages from the postVerificationQueue and backlogQueue channels | ||
// and dispatches them further. | ||
func (handler *TxHandler) backlogWorker() { | ||
|
@@ -132,7 +162,7 @@ func (handler *TxHandler) backlogWorker() { | |
if !ok { | ||
return | ||
} | ||
handler.postprocessCheckedTxn(wi) | ||
handler.postProcessCheckedTxn(wi) | ||
|
||
// restart the loop so that we could empty out the post verification queue. | ||
continue | ||
|
@@ -146,6 +176,7 @@ func (handler *TxHandler) backlogWorker() { | |
return | ||
} | ||
if handler.checkAlreadyCommitted(wi) { | ||
transactionMessagesAlreadyCommitted.Inc(nil) | ||
continue | ||
} | ||
|
||
|
@@ -156,17 +187,47 @@ func (handler *TxHandler) backlogWorker() { | |
if !ok { | ||
return | ||
} | ||
handler.postprocessCheckedTxn(wi) | ||
handler.postProcessCheckedTxn(wi) | ||
|
||
case <-handler.ctx.Done(): | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (handler *TxHandler) postprocessCheckedTxn(wi *txBacklogMsg) { | ||
func (handler *TxHandler) postProcessReportErrors(err error) { | ||
algorandskiy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if errors.Is(err, crypto.ErrBatchVerificationFailed) { | ||
transactionMessagesTxnSigVerificationFailed.Inc(nil) | ||
return | ||
} | ||
|
||
var txGroupErr *verify.ErrTxGroupError | ||
if errors.As(err, &txGroupErr) { | ||
switch txGroupErr.Reason { | ||
case verify.TxGroupErrorReasonNotWellFormed: | ||
transactionMessagesTxnNotWellFormed.Inc(nil) | ||
case verify.TxGroupErrorReasonInvalidFee: | ||
transactionMessagesTxGroupInvalidFee.Inc(nil) | ||
case verify.TxGroupErrorReasonHasNoSig: | ||
fallthrough | ||
case verify.TxGroupErrorReasonSigNotWellFormed: | ||
transactionMessagesTxnSigNotWellFormed.Inc(nil) | ||
case verify.TxGroupErrorReasonMsigNotWellFormed: | ||
transactionMessagesTxnMsigNotWellFormed.Inc(nil) | ||
case verify.TxGroupErrorReasonLogicSigFailed: | ||
transactionMessagesTxnLogicSig.Inc(nil) | ||
default: | ||
algorandskiy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
transactionMessagesBacklogErr.Inc(nil) | ||
} | ||
} else { | ||
transactionMessagesBacklogErr.Inc(nil) | ||
} | ||
} | ||
|
||
func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) { | ||
if wi.verificationErr != nil { | ||
// disconnect from peer. | ||
handler.postProcessReportErrors(wi.verificationErr) | ||
logging.Base().Warnf("Received a malformed tx group %v: %v", wi.unverifiedTxGroup, wi.verificationErr) | ||
handler.net.Disconnect(wi.rawmsg.Sender) | ||
return | ||
|
@@ -185,6 +246,8 @@ func (handler *TxHandler) postprocessCheckedTxn(wi *txBacklogMsg) { | |
return | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we want to break out any of the errors where Remember() fails? Remember failing is transactionMessagesHandled - transactionMessagesRemember so we already know the total There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. follow-up: will do in separate PR |
||
} | ||
|
||
transactionMessagesRemember.Inc(nil) | ||
|
||
// if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions. | ||
err = handler.ledger.VerifiedTransactionCache().Pin(verifiedTxGroup) | ||
if err != nil { | ||
|
@@ -203,7 +266,7 @@ func (handler *TxHandler) asyncVerifySignature(arg interface{}) interface{} { | |
latest := handler.ledger.Latest() | ||
latestHdr, err := handler.ledger.BlockHdr(latest) | ||
if err != nil { | ||
tx.verificationErr = fmt.Errorf("Could not get header for previous block %d: %w", latest, err) | ||
tx.verificationErr = fmt.Errorf("could not get header for previous block %d: %w", latest, err) | ||
logging.Base().Warnf("Could not get header for previous block %d: %v", latest, err) | ||
} else { | ||
// we can't use PaysetGroups here since it's using a execpool like this go-routine and we don't want to deadlock. | ||
|
@@ -289,6 +352,7 @@ func (handler *TxHandler) processDecoded(unverifiedTxGroup []transactions.Signed | |
unverifiedTxGroup: unverifiedTxGroup, | ||
} | ||
if handler.checkAlreadyCommitted(tx) { | ||
transactionGroupTxSyncAlreadyCommitted.Inc(nil) | ||
return network.OutgoingMessage{}, true | ||
} | ||
|
||
|
@@ -319,6 +383,8 @@ func (handler *TxHandler) processDecoded(unverifiedTxGroup []transactions.Signed | |
return network.OutgoingMessage{}, true | ||
} | ||
|
||
transactionGroupTxSyncRemember.Inc(nil) | ||
|
||
// if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions. | ||
err = handler.ledger.VerifiedTransactionCache().Pin(verifiedTxGroup) | ||
if err != nil { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think txnBatchPrep is already returning ErrTxGroupError, maybe you would just want to keep the error it created (not make a new one), which preserves .Reason, and set .err on it to have this wrapped fmt?