Skip to content

Commit

Permalink
core, eth, miner: start propagating and consuming blob txs (ethereum#…
Browse files Browse the repository at this point in the history
…28243)

* core, eth, miner: start propagating and consuming blob txs

* eth/protocols/eth: disable eth/67 if Cancun is enabled

* core/txpool, eth, miner: pass gas limit infos in lazy tx for mienr filtering

* core/txpool, miner: add lazy resolver for pending txs too

* core, eth: fix review noticed bugs

* eth, miner: minor polishes in the mining and announcing logs

* core/expool: unsubscribe the event scope
  • Loading branch information
karalabe authored and tyler-smith committed Oct 16, 2023
1 parent 81774bd commit 36f77b3
Show file tree
Hide file tree
Showing 14 changed files with 167 additions and 94 deletions.
63 changes: 45 additions & 18 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
"sync"
"time"

"github.com/holiman/billy"
"github.com/holiman/uint256"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/misc/eip1559"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
Expand All @@ -41,8 +44,6 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/holiman/billy"
"github.com/holiman/uint256"
)

const (
Expand Down Expand Up @@ -97,6 +98,8 @@ type blobTxMeta struct {
execTipCap *uint256.Int // Needed to prioritize inclusion order across accounts and validate replacement price bump
execFeeCap *uint256.Int // Needed to validate replacement price bump
blobFeeCap *uint256.Int // Needed to validate replacement price bump
execGas uint64 // Needed to check inclusion validity before reading the blob
blobGas uint64 // Needed to check inclusion validity before reading the blob

basefeeJumps float64 // Absolute number of 1559 fee adjustments needed to reach the tx's fee cap
blobfeeJumps float64 // Absolute number of 4844 fee adjustments needed to reach the tx's blob fee cap
Expand All @@ -118,6 +121,8 @@ func newBlobTxMeta(id uint64, size uint32, tx *types.Transaction) *blobTxMeta {
execTipCap: uint256.MustFromBig(tx.GasTipCap()),
execFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()),
execGas: tx.Gas(),
blobGas: tx.BlobGas(),
}
meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap)
meta.blobfeeJumps = dynamicFeeJumps(meta.blobFeeCap)
Expand Down Expand Up @@ -307,10 +312,12 @@ type BlobPool struct {
spent map[common.Address]*uint256.Int // Expenditure tracking for individual accounts
evict *evictHeap // Heap of cheapest accounts for eviction when full

eventFeed event.Feed // Event feed to send out new tx events on pool inclusion
eventFeed event.Feed // Event feed to send out new tx events on pool inclusion
dropTxFeed event.Feed
rejectTxFeed event.Feed
eventScope event.SubscriptionScope // Event scope to track and mass unsubscribe on termination
eventScope event.SubscriptionScope // Event scope to track and mass unsubscribe on termination
discoverFeed event.Feed // Event feed to send out new tx events on pool discovery (reorg excluded)
insertFeed event.Feed // Event feed to send out new tx events on pool inclusion (reorg included)

lock sync.RWMutex // Mutex protecting the pool during reorg handling
}
Expand Down Expand Up @@ -438,8 +445,6 @@ func (p *BlobPool) Close() error {
if err := p.store.Close(); err != nil {
errs = append(errs, err)
}
p.eventScope.Close()

switch {
case errs == nil:
return nil
Expand Down Expand Up @@ -760,15 +765,21 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
// Run the reorg between the old and new head and figure out which accounts
// need to be rechecked and which transactions need to be readded
if reinject, inclusions := p.reorg(oldHead, newHead); reinject != nil {
var adds []*types.Transaction
for addr, txs := range reinject {
// Blindly push all the lost transactions back into the pool
for _, tx := range txs {
p.reinject(addr, tx.Hash())
if err := p.reinject(addr, tx.Hash()); err == nil {
adds = append(adds, tx.WithoutBlobTxSidecar())
}
}
// Recheck the account's pooled transactions to drop included and
// invalidated one
p.recheck(addr, inclusions)
}
if len(adds) > 0 {
p.insertFeed.Send(core.NewTxsEvent{Txs: adds})
}
}
// Flush out any blobs from limbo that are older than the latest finality
if p.chain.Config().IsCancun(p.head.Number, p.head.Time) {
Expand Down Expand Up @@ -923,13 +934,13 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]*
// Note, the method will not initialize the eviction cache values as those will
// be done once for all transactions belonging to an account after all individual
// transactions are injected back into the pool.
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
// add the transaction back into the pool as it is not mineable.
tx, err := p.limbo.pull(txhash)
if err != nil {
log.Error("Blobs unavailable, dropping reorged tx", "err", err)
return
return err
}
// TODO: seems like an easy optimization here would be getting the serialized tx
// from limbo instead of re-serializing it here.
Expand All @@ -938,20 +949,20 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
blob, err := rlp.EncodeToBytes(tx)
if err != nil {
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
return
return err
}
id, err := p.store.Put(blob)
if err != nil {
log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err)
return
return err
}

// Update the indixes and metrics
meta := newBlobTxMeta(id, p.store.Size(id), tx)
if _, ok := p.index[addr]; !ok {
if err := p.reserve(addr, true); err != nil {
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
return
return err
}
p.index[addr] = []*blobTxMeta{meta}
p.spent[addr] = meta.costCap
Expand All @@ -962,6 +973,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
}
p.lookup[meta.hash] = meta.id
p.stored += uint64(meta.size)
return nil
}

// SetGasTip implements txpool.SubPool, allowing the blob pool's gas requirements
Expand Down Expand Up @@ -1156,9 +1168,19 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
// Add inserts a set of blob transactions into the pool if they pass validation (both
// consensus validity and pool restictions).
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
errs := make([]error, len(txs))
var (
adds = make([]*types.Transaction, 0, len(txs))
errs = make([]error, len(txs))
)
for i, tx := range txs {
errs[i] = p.add(tx)
if errs[i] == nil {
adds = append(adds, tx.WithoutBlobTxSidecar())
}
}
if len(adds) > 0 {
p.discoverFeed.Send(core.NewTxsEvent{Txs: adds})
p.insertFeed.Send(core.NewTxsEvent{Txs: adds})
}
return errs
}
Expand Down Expand Up @@ -1386,6 +1408,8 @@ func (p *BlobPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTr
Time: time.Now(), // TODO(karalabe): Maybe save these and use that?
GasFeeCap: tx.execFeeCap.ToBig(),
GasTipCap: tx.execTipCap.ToBig(),
Gas: tx.execGas,
BlobGas: tx.blobGas,
})
}
if len(lazies) > 0 {
Expand Down Expand Up @@ -1470,10 +1494,14 @@ func (p *BlobPool) updateLimboMetrics() {
limboSlotusedGauge.Update(int64(slotused))
}

// SubscribeTransactions registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
return p.eventScope.Track(p.eventFeed.Subscribe(ch))
// SubscribeTransactions registers a subscription for new transaction events,
// supporting feeding only newly seen or also resurrected transactions.
func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
if reorgs {
return p.insertFeed.Subscribe(ch)
} else {
return p.discoverFeed.Subscribe(ch)
}
}

// Nonce returns the next nonce of an account, with all transactions executable
Expand Down Expand Up @@ -1535,7 +1563,6 @@ func (p *BlobPool) Status(hash common.Hash) txpool.TxStatus {
return txpool.TxStatusUnknown
}


// SubscribeDropTxsEvent registers a subscription of core.DropTxsEvent and
// starts sending event to the given channel.
func (pool *BlobPool) SubscribeDropTxsEvent(ch chan<- core.DropTxsEvent) event.Subscription {
Expand Down
53 changes: 27 additions & 26 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,16 @@ func (config *Config) sanitize() Config {
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type LegacyPool struct {
config Config
chainconfig *params.ChainConfig
chain BlockChain
gasTip atomic.Pointer[big.Int]
txFeed event.Feed
config Config
chainconfig *params.ChainConfig
chain BlockChain
gasTip atomic.Pointer[big.Int]
txFeed event.Feed
dropTxFeed event.Feed
rejectTxFeed event.Feed
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex

currentHead atomic.Pointer[types.Header] // Current head of the blockchain
currentState *state.StateDB // Current state in the blockchain head
Expand Down Expand Up @@ -410,9 +410,6 @@ func (pool *LegacyPool) loop() {

// Close terminates the transaction pool.
func (pool *LegacyPool) Close() error {
// Unsubscribe all subscriptions registered from txpool
pool.scope.Close()

// Terminate the pool reorger and return
close(pool.reorgShutdownCh)
pool.wg.Wait()
Expand All @@ -431,10 +428,14 @@ func (pool *LegacyPool) Reset(oldHead, newHead *types.Header) {
<-wait
}

// SubscribeTransactions registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
return pool.scope.Track(pool.txFeed.Subscribe(ch))
// SubscribeTransactions registers a subscription for new transaction events,
// supporting feeding only newly seen or also resurrected transactions.
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
// The legacy pool has a very messed up internal shuffling, so it's kind of
// hard to separate newly discovered transaction from resurrected ones. This
// is because the new txs are added to the queue, resurrected ones too and
// reorgs run lazily, so separating the two would need a marker.
return pool.txFeed.Subscribe(ch)
}

// SubscribeDropTxsEvent registers a subscription of core.DropTxsEvent and
Expand All @@ -449,7 +450,6 @@ func (pool *LegacyPool) SubscribeRejectedTxEvent(ch chan<- core.RejectedTxEvent)
return pool.scope.Track(pool.rejectTxFeed.Subscribe(ch))
}


// SetGasTip updates the minimum gas tip required by the transaction pool for a
// new transaction, and drops all transactions below this threshold.
func (pool *LegacyPool) SetGasTip(tip *big.Int) {
Expand Down Expand Up @@ -575,6 +575,8 @@ func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.L
Time: txs[i].Time(),
GasFeeCap: txs[i].GasFeeCap(),
GasTipCap: txs[i].GasTipCap(),
Gas: txs[i].Gas(),
BlobGas: txs[i].BlobGas(),
}
}
pending[addr] = lazies
Expand Down Expand Up @@ -2027,16 +2029,15 @@ func numSlots(tx *types.Transaction) int {
return int((tx.Size() + txSlotSize - 1) / txSlotSize)
}


const (
dropUnderpriced = "underpriced-txs"
dropLowNonce = "low-nonce-txs"
dropUnpayable = "unpayable-txs"

dropAccountCap = "account-cap-txs" // Accounts exceeding txpool.accountslots transactions
dropReplaced = "replaced-txs"
dropUnexecutable = "unexecutable-txs"
dropTruncating = "truncating-txs"
dropOld = "old-txs"
dropLowNonce = "low-nonce-txs"
dropUnpayable = "unpayable-txs"

dropAccountCap = "account-cap-txs" // Accounts exceeding txpool.accountslots transactions
dropReplaced = "replaced-txs"
dropUnexecutable = "unexecutable-txs"
dropTruncating = "truncating-txs"
dropOld = "old-txs"
dropGasPriceUpdated = "updated-gas-price"
)
)
19 changes: 16 additions & 3 deletions core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ import (
// enough for the miner and other APIs to handle large batches of transactions;
// and supports pulling up the entire transaction when really needed.
type LazyTransaction struct {
Pool SubPool // Transaction subpool to pull the real transaction up
Pool LazyResolver // Transaction resolver to pull the real transaction up
Hash common.Hash // Transaction hash to pull up if needed
Tx *types.Transaction // Transaction if already resolved

Time time.Time // Time when the transaction was first seen
GasFeeCap *big.Int // Maximum fee per gas the transaction may consume
GasTipCap *big.Int // Maximum miner tip per gas the transaction can pay

Gas uint64 // Amount of gas required by the transaction
BlobGas uint64 // Amount of blob gas required by the transaction
}

// Resolve retrieves the full transaction belonging to a lazy handle if it is still
Expand All @@ -48,6 +51,14 @@ func (ltx *LazyTransaction) Resolve() *types.Transaction {
return ltx.Tx
}

// LazyResolver is a minimal interface needed for a transaction pool to satisfy
// resolving lazy transactions. It's mostly a helper to avoid the entire sub-
// pool being injected into the lazy transaction.
type LazyResolver interface {
// Get returns a transaction if it is contained in the pool, or nil otherwise.
Get(hash common.Hash) *types.Transaction
}

// AddressReserver is passed by the main transaction pool to subpools, so they
// may request (and relinquish) exclusive access to certain addresses.
type AddressReserver func(addr common.Address, reserve bool) error
Expand Down Expand Up @@ -99,8 +110,10 @@ type SubPool interface {
// account and sorted by nonce.
Pending(enforceTips bool) map[common.Address][]*LazyTransaction

// SubscribeTransactions subscribes to new transaction events.
SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription
// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions
// or also for reorged out ones.
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription

// Nonce returns the next nonce of an account, with all transactions executable
// by the pool already applied on top.
Expand Down
12 changes: 7 additions & 5 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,15 @@ func (p *TxPool) Close() error {
if err := <-errc; err != nil {
errs = append(errs, err)
}

// Terminate each subpool
for _, subpool := range p.subpools {
if err := subpool.Close(); err != nil {
errs = append(errs, err)
}
}
// Unsubscribe anyone still listening for tx events
p.subs.Close()

if len(errs) > 0 {
return fmt.Errorf("subpool close errors: %v", errs)
}
Expand Down Expand Up @@ -316,12 +318,12 @@ func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction
return txs
}

// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and starts sending
// events to the given channel.
func (p *TxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
// SubscribeTransactions registers a subscription for new transaction events,
// supporting feeding only newly seen or also resurrected transactions.
func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
subs := make([]event.Subscription, len(p.subpools))
for i, subpool := range p.subpools {
subs[i] = subpool.SubscribeTransactions(ch)
subs[i] = subpool.SubscribeTransactions(ch, reorgs)
}
return p.subs.Track(event.JoinSubscriptions(subs...))
}
Expand Down
2 changes: 1 addition & 1 deletion eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (b *EthAPIBackend) TxPool() *txpool.TxPool {
}

func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return b.eth.txPool.SubscribeNewTxsEvent(ch)
return b.eth.txPool.SubscribeTransactions(ch, true)
}

func (b *EthAPIBackend) SyncProgress() ethereum.SyncProgress {
Expand Down
2 changes: 1 addition & 1 deletion eth/catalyst/simulated_beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal) error {
func (c *SimulatedBeacon) loopOnDemand() {
var (
newTxs = make(chan core.NewTxsEvent)
sub = c.eth.TxPool().SubscribeNewTxsEvent(newTxs)
sub = c.eth.TxPool().SubscribeTransactions(newTxs, true)
)
defer sub.Unsubscribe()

Expand Down
Loading

0 comments on commit 36f77b3

Please sign in to comment.