Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core, eth, miner: start propagating and consuming blob txs #28243

Merged
merged 7 commits into from
Oct 4, 2023
55 changes: 40 additions & 15 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ type blobTxMeta struct {
execTipCap *uint256.Int // Needed to prioritize inclusion order across accounts and validate replacement price bump
execFeeCap *uint256.Int // Needed to validate replacement price bump
blobFeeCap *uint256.Int // Needed to validate replacement price bump
execGas uint64 // Needed to check inclusion validity before reading the blob
blobGas uint64 // Needed to check inclusion validity before reading the blob

basefeeJumps float64 // Absolute number of 1559 fee adjustments needed to reach the tx's fee cap
blobfeeJumps float64 // Absolute number of 4844 fee adjustments needed to reach the tx's blob fee cap
Expand All @@ -118,6 +120,8 @@ func newBlobTxMeta(id uint64, size uint32, tx *types.Transaction) *blobTxMeta {
execTipCap: uint256.MustFromBig(tx.GasTipCap()),
execFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()),
execGas: tx.Gas(),
blobGas: tx.BlobGas(),
}
meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap)
meta.blobfeeJumps = dynamicFeeJumps(meta.blobFeeCap)
Expand Down Expand Up @@ -307,8 +311,8 @@ type BlobPool struct {
spent map[common.Address]*uint256.Int // Expenditure tracking for individual accounts
evict *evictHeap // Heap of cheapest accounts for eviction when full

eventFeed event.Feed // Event feed to send out new tx events on pool inclusion
eventScope event.SubscriptionScope // Event scope to track and mass unsubscribe on termination
discoverFeed event.Feed // Event feed to send out new tx events on pool discovery (reorg excluded)
insertFeed event.Feed // Event feed to send out new tx events on pool inclusion (reorg included)

lock sync.RWMutex // Mutex protecting the pool during reorg handling
}
Expand Down Expand Up @@ -436,8 +440,6 @@ func (p *BlobPool) Close() error {
if err := p.store.Close(); err != nil {
errs = append(errs, err)
}
p.eventScope.Close()
karalabe marked this conversation as resolved.
Show resolved Hide resolved

switch {
case errs == nil:
return nil
Expand Down Expand Up @@ -758,15 +760,21 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
// Run the reorg between the old and new head and figure out which accounts
// need to be rechecked and which transactions need to be readded
if reinject, inclusions := p.reorg(oldHead, newHead); reinject != nil {
var adds []*types.Transaction
for addr, txs := range reinject {
// Blindly push all the lost transactions back into the pool
for _, tx := range txs {
p.reinject(addr, tx.Hash())
if err := p.reinject(addr, tx.Hash()); err == nil {
adds = append(adds, tx.WithoutBlobTxSidecar())
}
}
// Recheck the account's pooled transactions to drop included and
// invalidated one
p.recheck(addr, inclusions)
}
if len(adds) > 0 {
p.insertFeed.Send(core.NewTxsEvent{Txs: adds})
}
}
// Flush out any blobs from limbo that are older than the latest finality
if p.chain.Config().IsCancun(p.head.Number, p.head.Time) {
Expand Down Expand Up @@ -921,13 +929,13 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]*
// Note, the method will not initialize the eviction cache values as those will
// be done once for all transactions belonging to an account after all individual
// transactions are injected back into the pool.
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
// add the transaction back into the pool as it is not mineable.
tx, err := p.limbo.pull(txhash)
if err != nil {
log.Error("Blobs unavailable, dropping reorged tx", "err", err)
return
return err
}
// TODO: seems like an easy optimization here would be getting the serialized tx
// from limbo instead of re-serializing it here.
Expand All @@ -936,20 +944,20 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
blob, err := rlp.EncodeToBytes(tx)
if err != nil {
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
return
return err
}
id, err := p.store.Put(blob)
if err != nil {
log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err)
return
return err
}

// Update the indixes and metrics
meta := newBlobTxMeta(id, p.store.Size(id), tx)
if _, ok := p.index[addr]; !ok {
if err := p.reserve(addr, true); err != nil {
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
return
return err
}
p.index[addr] = []*blobTxMeta{meta}
p.spent[addr] = meta.costCap
Expand All @@ -960,6 +968,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
}
p.lookup[meta.hash] = meta.id
p.stored += uint64(meta.size)
return nil
}

// SetGasTip implements txpool.SubPool, allowing the blob pool's gas requirements
Expand Down Expand Up @@ -1154,9 +1163,19 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
// Add inserts a set of blob transactions into the pool if they pass validation (both
// consensus validity and pool restictions).
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
errs := make([]error, len(txs))
var (
adds = make([]*types.Transaction, 0, len(txs))
errs = make([]error, len(txs))
)
for i, tx := range txs {
errs[i] = p.add(tx)
if errs[0] == nil {
karalabe marked this conversation as resolved.
Show resolved Hide resolved
adds = append(adds, tx.WithoutBlobTxSidecar())
}
}
if len(adds) > 0 {
p.discoverFeed.Send(core.NewTxsEvent{Txs: adds})
p.insertFeed.Send(core.NewTxsEvent{Txs: adds})
}
return errs
}
Expand Down Expand Up @@ -1384,6 +1403,8 @@ func (p *BlobPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTr
Time: time.Now(), // TODO(karalabe): Maybe save these and use that?
GasFeeCap: tx.execFeeCap.ToBig(),
GasTipCap: tx.execTipCap.ToBig(),
Gas: tx.execGas,
BlobGas: tx.blobGas,
})
}
if len(lazies) > 0 {
Expand Down Expand Up @@ -1468,10 +1489,14 @@ func (p *BlobPool) updateLimboMetrics() {
limboSlotusedGauge.Update(int64(slotused))
}

// SubscribeTransactions registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
return p.eventScope.Track(p.eventFeed.Subscribe(ch))
// SubscribeTransactions registers a subscription for new transaction events,
// supporting feeding only newly seen or also resurrected transactions.
func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
if reorgs {
return p.insertFeed.Subscribe(ch)
} else {
return p.discoverFeed.Subscribe(ch)
}
}

// Nonce returns the next nonce of an account, with all transactions executable
Expand Down
18 changes: 10 additions & 8 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ type LegacyPool struct {
chain BlockChain
gasTip atomic.Pointer[big.Int]
txFeed event.Feed
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex

Expand Down Expand Up @@ -404,9 +403,6 @@ func (pool *LegacyPool) loop() {

// Close terminates the transaction pool.
func (pool *LegacyPool) Close() error {
// Unsubscribe all subscriptions registered from txpool
pool.scope.Close()

// Terminate the pool reorger and return
close(pool.reorgShutdownCh)
pool.wg.Wait()
Expand All @@ -425,10 +421,14 @@ func (pool *LegacyPool) Reset(oldHead, newHead *types.Header) {
<-wait
}

// SubscribeTransactions registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
return pool.scope.Track(pool.txFeed.Subscribe(ch))
// SubscribeTransactions registers a subscription for new transaction events,
// supporting feeding only newly seen or also resurrected transactions.
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
// The legacy pool has a very messed up internal shuffling, so it's kind of
// hard to separate newly discovered transaction from resurrected ones. This
// is because the new txs are added to the queue, resurrected ones too and
// reorgs run lazily, so separating the two would need a marker.
return pool.txFeed.Subscribe(ch)
}

// SetGasTip updates the minimum gas tip required by the transaction pool for a
Expand Down Expand Up @@ -552,6 +552,8 @@ func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.L
Time: txs[i].Time(),
GasFeeCap: txs[i].GasFeeCap(),
GasTipCap: txs[i].GasTipCap(),
Gas: txs[i].Gas(),
BlobGas: txs[i].BlobGas(),
}
}
pending[addr] = lazies
Expand Down
19 changes: 16 additions & 3 deletions core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ import (
// enough for the miner and other APIs to handle large batches of transactions;
// and supports pulling up the entire transaction when really needed.
type LazyTransaction struct {
Pool SubPool // Transaction subpool to pull the real transaction up
Pool LazyResolver // Transaction resolver to pull the real transaction up
Hash common.Hash // Transaction hash to pull up if needed
Tx *types.Transaction // Transaction if already resolved

Time time.Time // Time when the transaction was first seen
GasFeeCap *big.Int // Maximum fee per gas the transaction may consume
GasTipCap *big.Int // Maximum miner tip per gas the transaction can pay

Gas uint64 // Amount of gas required by the transaction
BlobGas uint64 // Amount of blob gas required by the transaction
}

// Resolve retrieves the full transaction belonging to a lazy handle if it is still
Expand All @@ -48,6 +51,14 @@ func (ltx *LazyTransaction) Resolve() *types.Transaction {
return ltx.Tx
}

// LazyResolver is a minimal interface needed for a transaction pool to satisfy
// resolving lazy transactions. It's mostly a helper to avoid the entire sub-
// pool being injected into the lazy transaction.
type LazyResolver interface {
// Get returns a transaction if it is contained in the pool, or nil otherwise.
Get(hash common.Hash) *types.Transaction
}

// AddressReserver is passed by the main transaction pool to subpools, so they
// may request (and relinquish) exclusive access to certain addresses.
type AddressReserver func(addr common.Address, reserve bool) error
Expand Down Expand Up @@ -99,8 +110,10 @@ type SubPool interface {
// account and sorted by nonce.
Pending(enforceTips bool) map[common.Address][]*LazyTransaction

// SubscribeTransactions subscribes to new transaction events.
SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription
// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions
// or also for reorged out ones.
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription

// Nonce returns the next nonce of an account, with all transactions executable
// by the pool already applied on top.
Expand Down
8 changes: 4 additions & 4 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,12 +316,12 @@ func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction
return txs
}

// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and starts sending
// events to the given channel.
func (p *TxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
// SubscribeTransactions registers a subscription for new transaction events,
// supporting feeding only newly seen or also resurrected transactions.
func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
subs := make([]event.Subscription, len(p.subpools))
for i, subpool := range p.subpools {
subs[i] = subpool.SubscribeTransactions(ch)
subs[i] = subpool.SubscribeTransactions(ch, reorgs)
}
return p.subs.Track(event.JoinSubscriptions(subs...))
}
Expand Down
2 changes: 1 addition & 1 deletion eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (b *EthAPIBackend) TxPool() *txpool.TxPool {
}

func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return b.eth.txPool.SubscribeNewTxsEvent(ch)
return b.eth.txPool.SubscribeTransactions(ch, true)
}

func (b *EthAPIBackend) SyncProgress() ethereum.SyncProgress {
Expand Down
2 changes: 1 addition & 1 deletion eth/catalyst/simulated_beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal) error {
func (c *SimulatedBeacon) loopOnDemand() {
var (
newTxs = make(chan core.NewTxsEvent)
sub = c.eth.TxPool().SubscribeNewTxsEvent(newTxs)
sub = c.eth.TxPool().SubscribeTransactions(newTxs, true)
)
defer sub.Unsubscribe()

Expand Down
17 changes: 11 additions & 6 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ type txPool interface {
// The slice should be modifiable by the caller.
Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction

// SubscribeNewTxsEvent should return an event subscription of
// NewTxsEvent and send events to the given channel.
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions
// or also for reorged out ones.
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription
}

// handlerConfig is the collection of initialization parameters to create a full
Expand Down Expand Up @@ -509,10 +510,10 @@ func (h *handler) unregisterPeer(id string) {
func (h *handler) Start(maxPeers int) {
h.maxPeers = maxPeers

// broadcast transactions
// broadcast and announce transactions (only new ones, not resurrected ones)
h.wg.Add(1)
h.txsCh = make(chan core.NewTxsEvent, txChanSize)
h.txsSub = h.txpool.SubscribeNewTxsEvent(h.txsCh)
h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false)
go h.txBroadcastLoop()

// broadcast mined blocks
Expand Down Expand Up @@ -592,7 +593,7 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
}

// BroadcastTransactions will propagate a batch of transactions
// - To a square root of all peers
// - To a square root of all peers for non-blob transactions
// - And, separately, as announcements to all peers which are not known to
// already have the given transaction.
func (h *handler) BroadcastTransactions(txs types.Transactions) {
Expand All @@ -608,6 +609,10 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
)
// Broadcast transactions to a batch of peers not knowing about it
for _, tx := range txs {
// Blob transactions are never broadcast, only ever announced
if tx.Type() == types.BlobTxType {
continue
}
karalabe marked this conversation as resolved.
Show resolved Hide resolved
peers := h.peers.peersWithoutTransaction(tx.Hash())

var numDirect int
Expand Down
14 changes: 8 additions & 6 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package eth

import (
"errors"
"fmt"
"math/big"
"time"
Expand Down Expand Up @@ -73,6 +74,11 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
return h.txFetcher.Notify(peer.ID(), packet.Hashes)

case *eth.TransactionsPacket:
for _, tx := range *packet {
if tx.Type() == types.BlobTxType {
return errors.New("disallowed broadcast blob transaction")
}
}
return h.txFetcher.Enqueue(peer.ID(), *packet, false)

case *eth.PooledTransactionsResponse:
Expand All @@ -90,9 +96,7 @@ func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash,
// the chain already entered the pos stage and disconnect the
// remote peer.
if h.merger.PoSFinalized() {
// TODO (MariusVanDerWijden) drop non-updated peers after the merge
return nil
// return errors.New("unexpected block announces")
return errors.New("disallowed block announcement")
}
// Schedule all the unknown hashes for retrieval
var (
Expand All @@ -118,9 +122,7 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td
// the chain already entered the pos stage and disconnect the
// remote peer.
if h.merger.PoSFinalized() {
// TODO (MariusVanDerWijden) drop non-updated peers after the merge
return nil
// return errors.New("unexpected block announces")
return errors.New("disallowed block broadcast")
}
// Schedule the block for import
h.blockFetcher.Enqueue(peer.ID(), block)
Expand Down
4 changes: 2 additions & 2 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func testRecvTransactions(t *testing.T, protocol uint) {
handler.handler.synced.Store(true) // mark synced to accept transactions

txs := make(chan core.NewTxsEvent)
sub := handler.txpool.SubscribeNewTxsEvent(txs)
sub := handler.txpool.SubscribeTransactions(txs, false)
defer sub.Unsubscribe()

// Create a source peer to send messages through and a sink handler to receive them
Expand Down Expand Up @@ -424,7 +424,7 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
for i := 0; i < len(sinks); i++ {
txChs[i] = make(chan core.NewTxsEvent, 1024)

sub := sinks[i].txpool.SubscribeNewTxsEvent(txChs[i])
sub := sinks[i].txpool.SubscribeTransactions(txChs[i], false)
defer sub.Unsubscribe()
}
// Fill the source pool with transactions and wait for them at the sinks
Expand Down
Loading