Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/txpool, eth, miner: retrieve plain and blob txs separately #29026

Merged
merged 3 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1446,7 +1446,12 @@ func (p *BlobPool) drop() {
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (p *BlobPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction {
func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
// If only plain transactions are requested, this pool is unsuitable as it
// contains none, don't even bother.
if filter.OnlyPlainTxs {
return nil
}
// Track the amount of time waiting to retrieve the list of pending blob txs
// from the pool and the amount of time actually spent on assembling the data.
// The latter will be pretty much moot, but we've kept it to have symmetric
Expand All @@ -1466,20 +1471,20 @@ func (p *BlobPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *u
lazies := make([]*txpool.LazyTransaction, 0, len(txs))
for _, tx := range txs {
// If transaction filtering was requested, discard badly priced ones
if minTip != nil && baseFee != nil {
if tx.execFeeCap.Lt(baseFee) {
if filter.MinTip != nil && filter.BaseFee != nil {
if tx.execFeeCap.Lt(filter.BaseFee) {
break // basefee too low, cannot be included, discard rest of txs from the account
}
tip := new(uint256.Int).Sub(tx.execFeeCap, baseFee)
tip := new(uint256.Int).Sub(tx.execFeeCap, filter.BaseFee)
if tip.Gt(tx.execTipCap) {
tip = tx.execTipCap
}
if tip.Lt(minTip) {
if tip.Lt(filter.MinTip) {
break // allowed or remaining tip too low, cannot be included, discard rest of txs from the account
}
}
if blobFee != nil {
if tx.blobFeeCap.Lt(blobFee) {
if filter.BlobFee != nil {
if tx.blobFeeCap.Lt(filter.BlobFee) {
break // blobfee too low, cannot be included, discard rest of txs from the account
}
}
Expand Down
6 changes: 5 additions & 1 deletion core/txpool/blobpool/blobpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1340,7 +1340,11 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) {
b.ReportAllocs()

for i := 0; i < b.N; i++ {
p := pool.Pending(uint256.NewInt(1), chain.basefee, chain.blobfee)
p := pool.Pending(txpool.PendingFilter{
MinTip: uint256.NewInt(1),
BaseFee: chain.basefee,
BlobFee: chain.blobfee,
})
if len(p) != int(capacity) {
b.Fatalf("have %d want %d", len(p), capacity)
}
Expand Down
16 changes: 10 additions & 6 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,12 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (pool *LegacyPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction {
func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
// If only blob transactions are requested, this pool is unsuitable as it
// contains none, don't even bother.
if filter.OnlyBlobTxs {
return nil
}
pool.mu.Lock()
defer pool.mu.Unlock()

Expand All @@ -531,13 +536,12 @@ func (pool *LegacyPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobF
minTipBig *big.Int
baseFeeBig *big.Int
)
if minTip != nil {
minTipBig = minTip.ToBig()
if filter.MinTip != nil {
minTipBig = filter.MinTip.ToBig()
}
if baseFee != nil {
baseFeeBig = baseFee.ToBig()
if filter.BaseFee != nil {
baseFeeBig = filter.BaseFee.ToBig()
}

pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, list := range pool.pending {
txs := list.Flatten()
Expand Down
17 changes: 16 additions & 1 deletion core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,21 @@ type LazyResolver interface {
// may request (and relinquish) exclusive access to certain addresses.
type AddressReserver func(addr common.Address, reserve bool) error

// PendingFilter is a collection of filter rules to allow retrieving a subset
// of transactions for announcement or mining.
//
// Note, the entries here are not arbitrary useful filters, rather each one has
// a very specific call site in mind and each one can be evaluated very cheaply
// by the pool implementations. Only add new ones that satisfy those constraints.
type PendingFilter struct {
MinTip *uint256.Int // Minimum miner tip required to include a transaction
BaseFee *uint256.Int // Minimum 1559 basefee needed to include a transaction
BlobFee *uint256.Int // Minimum 4844 blobfee needed to include a blob transaction

OnlyPlainTxs bool // Return only plain EVM transactions (peer-join announces, block space filling)
OnlyBlobTxs bool // Return only blob transactions (block blob-space filling)
}

// SubPool represents a specialized transaction pool that lives on its own (e.g.
// blob pool). Since independent of how many specialized pools we have, they do
// need to be updated in lockstep and assemble into one coherent view for block
Expand Down Expand Up @@ -118,7 +133,7 @@ type SubPool interface {
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*LazyTransaction
Pending(filter PendingFilter) map[common.Address][]*LazyTransaction

// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions
Expand Down
5 changes: 2 additions & 3 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/holiman/uint256"
)

// TxStatus is the current status of a transaction as seen by the pool.
Expand Down Expand Up @@ -357,10 +356,10 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (p *TxPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*LazyTransaction {
func (p *TxPool) Pending(filter PendingFilter) map[common.Address][]*LazyTransaction {
txs := make(map[common.Address][]*LazyTransaction)
for _, subpool := range p.subpools {
for addr, set := range subpool.Pending(minTip, baseFee, blobFee) {
for addr, set := range subpool.Pending(filter) {
txs[addr] = set
}
}
Expand Down
2 changes: 1 addition & 1 deletion eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction)
}

func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
pending := b.eth.txPool.Pending(nil, nil, nil)
pending := b.eth.txPool.Pending(txpool.PendingFilter{})
var txs types.Transactions
for _, batch := range pending {
for _, lazy := range batch {
Expand Down
5 changes: 3 additions & 2 deletions eth/catalyst/simulated_beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eip-template.md

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -263,7 +264,7 @@ func (c *SimulatedBeacon) Rollback() {

// Fork sets the head to the provided hash.
func (c *SimulatedBeacon) Fork(parentHash common.Hash) error {
if len(c.eth.TxPool().Pending(nil, nil, nil)) != 0 {
if len(c.eth.TxPool().Pending(txpool.PendingFilter{})) != 0 {
return errors.New("pending block dirty")
}
parent := c.eth.BlockChain().GetBlockByHash(parentHash)
Expand All @@ -275,7 +276,7 @@ func (c *SimulatedBeacon) Fork(parentHash common.Hash) error {

// AdjustTime creates a new block with an adjusted timestamp.
func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error {
if len(c.eth.TxPool().Pending(nil, nil, nil)) != 0 {
if len(c.eth.TxPool().Pending(txpool.PendingFilter{})) != 0 {
return errors.New("could not adjust time on non-empty block")
}
parent := c.eth.BlockChain().CurrentBlock()
Expand Down
3 changes: 1 addition & 2 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/triedb/pathdb"
"github.com/holiman/uint256"
)

const (
Expand Down Expand Up @@ -74,7 +73,7 @@ type txPool interface {

// Pending should return pending transactions.
// The slice should be modifiable by the caller.
Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction
Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction

// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions
Expand Down
2 changes: 1 addition & 1 deletion eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (p *testTxPool) Add(txs []*types.Transaction, local bool, sync bool) []erro
}

// Pending returns all the transactions known to the pool
func (p *testTxPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction {
func (p *testTxPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
p.lock.RLock()
defer p.lock.RUnlock()

Expand Down
3 changes: 2 additions & 1 deletion eth/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/log"
Expand All @@ -36,7 +37,7 @@ const (
// syncTransactions starts sending all currently pending transactions to the given peer.
func (h *handler) syncTransactions(p *eth.Peer) {
var hashes []common.Hash
for _, batch := range h.txpool.Pending(nil, nil, nil) {
for _, batch := range h.txpool.Pending(txpool.PendingFilter{OnlyPlainTxs: true}) {
for _, tx := range batch {
hashes = append(hashes, tx.Hash)
}
Expand Down
11 changes: 11 additions & 0 deletions miner/ordering.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,14 @@ func (t *transactionsByPriceAndNonce) Shift() {
func (t *transactionsByPriceAndNonce) Pop() {
heap.Pop(&t.heads)
}

// Empty returns if the price heap is empty. It can be used to check it simpler
// than calling peek and checking for nil return.
func (t *transactionsByPriceAndNonce) Empty() bool {
return len(t.heads) == 0
}

// Clear removes the entire content of the heap.
func (t *transactionsByPriceAndNonce) Clear() {
t.heads, t.txs = nil, nil
}
86 changes: 61 additions & 25 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,9 +562,11 @@ func (w *worker) mainLoop() {
BlobGas: tx.BlobGas(),
})
}
txset := newTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee)
plainTxs := newTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee) // Mixed bag of everrything, yolo
blobTxs := newTransactionsByPriceAndNonce(w.current.signer, nil, w.current.header.BaseFee) // Empty bag, don't bother optimising

tcount := w.current.tcount
w.commitTransactions(w.current, txset, nil, new(uint256.Int))
w.commitTransactions(w.current, plainTxs, blobTxs, nil)

// Only update the snapshot if any new transactions were added
// to the pending block
Expand Down Expand Up @@ -802,7 +804,7 @@ func (w *worker) applyTransaction(env *environment, tx *types.Transaction) (*typ
return receipt, err
}

func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAndNonce, interrupt *atomic.Int32, minTip *uint256.Int) error {
func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transactionsByPriceAndNonce, interrupt *atomic.Int32) error {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
Expand All @@ -821,8 +823,33 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
break
}
// If we don't have enough blob space for any further blob transactions,
// skip that list altogether
if !blobTxs.Empty() && env.blobs*params.BlobTxBlobGasPerBlob >= params.MaxBlobGasPerBlock {
karalabe marked this conversation as resolved.
Show resolved Hide resolved
log.Trace("Not enough blob space for further blob transactions")
blobTxs.Clear()
// Fall though to pick up any plain txs
}
// Retrieve the next transaction and abort if all done.
ltx, tip := txs.Peek()
var (
ltx *txpool.LazyTransaction
txs *transactionsByPriceAndNonce
)
pltx, ptip := plainTxs.Peek()
bltx, btip := blobTxs.Peek()

switch {
case pltx == nil:
txs, ltx = blobTxs, bltx
case bltx == nil:
txs, ltx = plainTxs, pltx
default:
if ptip.Lt(btip) {
txs, ltx = blobTxs, bltx
} else {
txs, ltx = plainTxs, pltx
}
}
if ltx == nil {
break
}
Expand All @@ -837,11 +864,6 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn
txs.Pop()
continue
}
// If we don't receive enough tip for the next transaction, skip the account
if tip.Cmp(minTip) < 0 {
log.Trace("Not enough tip for transaction", "hash", ltx.Hash, "tip", tip, "needed", minTip)
break // If the next-best is too low, surely no better will be available
}
// Transaction seems to fit, pull it up from the pool
tx := ltx.Resolve()
if tx == nil {
Expand Down Expand Up @@ -1005,35 +1027,49 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err
w.mu.RUnlock()

// Retrieve the pending transactions pre-filtered by the 1559/4844 dynamic fees
var baseFee *uint256.Int
filter := txpool.PendingFilter{
MinTip: tip,
}
if env.header.BaseFee != nil {
baseFee = uint256.MustFromBig(env.header.BaseFee)
filter.BaseFee = uint256.MustFromBig(env.header.BaseFee)
}
var blobFee *uint256.Int
if env.header.ExcessBlobGas != nil {
blobFee = uint256.MustFromBig(eip4844.CalcBlobFee(*env.header.ExcessBlobGas))
filter.BlobFee = uint256.MustFromBig(eip4844.CalcBlobFee(*env.header.ExcessBlobGas))
}
pending := w.eth.TxPool().Pending(tip, baseFee, blobFee)
filter.OnlyPlainTxs, filter.OnlyBlobTxs = true, false
pendingPlainTxs := w.eth.TxPool().Pending(filter)

filter.OnlyPlainTxs, filter.OnlyBlobTxs = false, true
pendingBlobTxs := w.eth.TxPool().Pending(filter)

// Split the pending transactions into locals and remotes.
localTxs, remoteTxs := make(map[common.Address][]*txpool.LazyTransaction), pending
localPlainTxs, remotePlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs
localBlobTxs, remoteBlobTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingBlobTxs

for _, account := range w.eth.TxPool().Locals() {
if txs := remoteTxs[account]; len(txs) > 0 {
delete(remoteTxs, account)
localTxs[account] = txs
if txs := remotePlainTxs[account]; len(txs) > 0 {
delete(remotePlainTxs, account)
localPlainTxs[account] = txs
}
if txs := remoteBlobTxs[account]; len(txs) > 0 {
delete(remoteBlobTxs, account)
localBlobTxs[account] = txs
}
}

// Fill the block with all available pending transactions.
if len(localTxs) > 0 {
txs := newTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
if err := w.commitTransactions(env, txs, interrupt, new(uint256.Int)); err != nil {
if len(localPlainTxs) > 0 || len(localBlobTxs) > 0 {
plainTxs := newTransactionsByPriceAndNonce(env.signer, localPlainTxs, env.header.BaseFee)
blobTxs := newTransactionsByPriceAndNonce(env.signer, localBlobTxs, env.header.BaseFee)

if err := w.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil {
return err
}
}
if len(remoteTxs) > 0 {
txs := newTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee)
if err := w.commitTransactions(env, txs, interrupt, tip); err != nil {
if len(remotePlainTxs) > 0 || len(remoteBlobTxs) > 0 {
plainTxs := newTransactionsByPriceAndNonce(env.signer, remotePlainTxs, env.header.BaseFee)
blobTxs := newTransactionsByPriceAndNonce(env.signer, remoteBlobTxs, env.header.BaseFee)

if err := w.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil {
return err
}
}
Expand Down
Loading