Skip to content

Commit

Permalink
core: refactor TxPool.add and .addTxsLocked to batch-verify txs
Browse files Browse the repository at this point in the history
  • Loading branch information
protolambda committed Mar 17, 2022
1 parent d3ab1dd commit 6f21779
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 31 deletions.
136 changes: 106 additions & 30 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/misc"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
Expand Down Expand Up @@ -92,6 +93,9 @@ var (

// ErrBadWrapData is returned if wrap data is not valid for the transaction
ErrBadWrapData = errors.New("bad wrap data")

// ErrMissingWrapData is returned if wrap data is missing for the transaction
ErrMissingWrapData = errors.New("missing wrap data")
)

var (
Expand Down Expand Up @@ -592,6 +596,9 @@ func (pool *TxPool) local() map[common.Address]types.Transactions {

// validateTx checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size).
//
// This does NOT validate wrap-data of the transaction, other than ensuring the
// tx completeness and limited size checks.
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
// Accept only legacy transactions until EIP-2718/2930 activates.
if !pool.eip2718 && tx.Type() != types.LegacyTxType {
Expand Down Expand Up @@ -660,38 +667,30 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
if tx.Gas() < intrGas {
return ErrIntrinsicGas
}
// Check if wrap data is valid
if err := tx.CheckWrapData(); err != nil {
return ErrBadWrapData
if tx.IsIncomplete() {
return ErrMissingWrapData
}
return nil
}

// add validates a transaction and inserts it into the non-executable queue for later
// pending promotion and execution. If the transaction is a replacement for an already
// pending or queued one, it overwrites the previous transaction if its price is higher.
//
// If a newly added transaction is marked as local, its sending account will be
// be added to the allowlist, preventing any associated transaction from being dropped
// out of the pool due to pricing constraints.
// deprecated: use addTxsLocked to add multiple txs at once, batching is encouraged to improve performance.
func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
// If the transaction is already known, discard it
hash := tx.Hash()
if pool.all.Get(hash) != nil {
log.Trace("Discarding already known transaction", "hash", hash)
knownTxMeter.Mark(1)
return false, ErrAlreadyKnown
}
// Make the local flag. If it's from local source or it's from the network but
// the sender is marked as local previously, treat it as the local transaction.
isLocal := local || pool.locals.containsTx(tx)

// If the transaction fails basic validation, discard it
if err := pool.validateTx(tx, isLocal); err != nil {
log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
invalidTxMeter.Mark(1)
return false, err
// reuse the addTxsLocked staged validation system
txs := []*types.Transaction{tx}
errs := []error{nil}
pool.filterKnownTxsLocked(txs, errs)
pool.filterInvalidTxsLocked(txs, errs, local)
pool.filterInvalidBlobTxsLocked(txs, errs)
if errs[0] != nil {
return pool.addValidTx(tx, local)
}
return false, errs[0]
}

func (pool *TxPool) addValidTx(tx *types.Transaction, local bool) (replaced bool, err error) {
inLocalPool := pool.locals.containsTx(tx)
isLocal := local || inLocalPool
hash := tx.Hash()
// If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it
Expand Down Expand Up @@ -760,7 +759,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
return false, err
}
// Mark local addresses and journal local transactions
if local && !pool.locals.contains(from) {
if local && !inLocalPool {
log.Info("Setting new local account", "address", from)
pool.locals.add(from)
pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it's marked as local first time.
Expand Down Expand Up @@ -960,18 +959,95 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {

// addTxsLocked attempts to queue a batch of transactions if they are valid.
// The transaction pool lock must be held.
//
// It first validates the txs in stages, and then inserts the valid txs into
// the non-executable queue for later pending promotion and execution.
//
// If a transaction is a replacement for an already pending or queued one,
// it overwrites the previous transaction if its price is higher.
//
// If a newly added transaction is marked as local, its sending account will
// be added to the allowlist, preventing any associated transaction from
// being dropped out of the pool due to pricing constraints.
func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) {
dirty := newAccountSet(pool.signer)
// note: the transaction validation and adding happens in stages, so expensive work can be batched.
errs := make([]error, len(txs))
pool.filterKnownTxsLocked(txs, errs)
pool.filterInvalidTxsLocked(txs, errs, local)
pool.filterInvalidBlobTxsLocked(txs, errs)
dirty := pool.addValidTxsLocked(txs, errs, local)
return errs, dirty
}

// filterKnownTxsLocked marks all known transactions with ErrAlreadyKnown
func (pool *TxPool) filterKnownTxsLocked(txs []*types.Transaction, errs []error) {
for i, tx := range txs {
if pool.Has(tx.Hash()) {
log.Trace("Discarding already known transaction", "hash", tx.Hash())
knownTxMeter.Mark(1)
errs[i] = ErrAlreadyKnown
}
}
}

// filterInvalidTxsLocked marks all invalid txs with respective error, this excludes blob validation
func (pool *TxPool) filterInvalidTxsLocked(txs []*types.Transaction, errs []error, local bool) {
for i, tx := range txs {
replaced, err := pool.add(tx, local)
if errs[i] != nil {
continue
}

// Make the local flag. If it's from local source or it's from the network but
// the sender is marked as local previously, treat it as the local transaction.
inLocalPool := pool.locals.containsTx(tx)
isLocal := local || inLocalPool
err := pool.validateTx(tx, isLocal)
if err != nil {
log.Trace("Discarding invalid transaction", "hash", tx.Hash(), "err", err)
invalidTxMeter.Mark(1)
errs[i] = err
}
}
}

// filterInvalidBlobTxsLocked marks all blob txs (if any) with an error if the blobs or kzg commitments are invalid
func (pool *TxPool) filterInvalidBlobTxsLocked(txs []*types.Transaction, errs []error) {
// We batch multiple transactions together.
var batchVerify kzg.BlobsBatch
for i, tx := range txs {
if errs[i] != nil {
continue
}
errs[i] = tx.VerifyBlobsBatched(batchVerify.Join)
}
if err := batchVerify.Verify(); err != nil {
// we'll have to verify each individual blob tx (can still use batch per tx)
// to not throw away the good ones because of some bad tx.
for i, tx := range txs {
if errs[i] != nil {
continue
}
// all blobs within the tx can still be batched together
errs[i] = tx.VerifyBlobsBatched(kzg.VerifyBlobs)
}
}
}

// addValidTxsLocked adds all transactions to the pool that are not marked with an error.
func (pool *TxPool) addValidTxsLocked(txs []*types.Transaction, errs []error, local bool) *accountSet {
dirty := newAccountSet(pool.signer)
for i, tx := range txs {
if errs[i] != nil {
continue
}
replaced, err := pool.addValidTx(tx, local)
errs[i] = err
if err == nil && !replaced {
dirty.addTx(tx)
}
}
validTxMeter.Mark(int64(len(dirty.accounts)))
return errs, dirty
return dirty
}

// Status returns the status (unknown/pending/queued) of a batch of transactions
Expand Down
3 changes: 2 additions & 1 deletion core/types/transaction_marshalling.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/crypto/kzg"
"github.com/protolambda/ztyp/view"
"math/big"

Expand Down Expand Up @@ -351,7 +352,7 @@ func (t *Transaction) UnmarshalJSON(input []byte) error {
Blobs: dec.Blobs,
}
// Verify that versioned hashes match kzgs, and kzgs match blobs.
if err := t.wrapData.checkWrapping(&itx); err != nil {
if err := t.wrapData.verifyBlobsBatched(&itx, kzg.VerifyBlobs); err != nil {
return fmt.Errorf("blob wrapping data is invalid: %v", err)
}
default:
Expand Down

0 comments on commit 6f21779

Please sign in to comment.