Skip to content

Commit

Permalink
beacon/engine, core/txpool, eth/catalyst: add engine_getBlobsV1 API (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
karalabe authored and holiman committed Nov 19, 2024
1 parent 1660698 commit cfd1489
Show file tree
Hide file tree
Showing 8 changed files with 336 additions and 41 deletions.
5 changes: 5 additions & 0 deletions beacon/engine/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ type BlobsBundleV1 struct {
Blobs []hexutil.Bytes `json:"blobs"`
}

type BlobAndProofV1 struct {
Blob hexutil.Bytes `json:"blob"`
Proof hexutil.Bytes `json:"proof"`
}

// JSON type overrides for ExecutionPayloadEnvelope.
type executionPayloadEnvelopeMarshaling struct {
BlockValue *hexutil.Big
Expand Down
104 changes: 79 additions & 25 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
Expand Down Expand Up @@ -88,9 +89,11 @@ const (
// bare minimum needed fields to keep the size down (and thus number of entries
// larger with the same memory consumption).
type blobTxMeta struct {
hash common.Hash // Transaction hash to maintain the lookup table
id uint64 // Storage ID in the pool's persistent store
size uint32 // Byte size in the pool's persistent store
hash common.Hash // Transaction hash to maintain the lookup table
vhashes []common.Hash // Blob versioned hashes to maintain the lookup table

id uint64 // Storage ID in the pool's persistent store
size uint32 // Byte size in the pool's persistent store

nonce uint64 // Needed to prioritize inclusion order within an account
costCap *uint256.Int // Needed to validate cumulative balance sufficiency
Expand All @@ -113,6 +116,7 @@ type blobTxMeta struct {
func newBlobTxMeta(id uint64, size uint32, tx *types.Transaction) *blobTxMeta {
meta := &blobTxMeta{
hash: tx.Hash(),
vhashes: tx.BlobHashes(),
id: id,
size: size,
nonce: tx.Nonce(),
Expand Down Expand Up @@ -306,7 +310,7 @@ type BlobPool struct {
state *state.StateDB // Current state at the head of the chain
gasTip *uint256.Int // Currently accepted minimum gas tip

lookup map[common.Hash]uint64 // Lookup table mapping hashes to tx billy entries
lookup *lookup // Lookup table mapping blobs to txs and txs to billy entries
index map[common.Address][]*blobTxMeta // Blob transactions grouped by accounts, sorted by nonce
spent map[common.Address]*uint256.Int // Expenditure tracking for individual accounts
evict *evictHeap // Heap of cheapest accounts for eviction when full
Expand All @@ -328,7 +332,7 @@ func New(config Config, chain BlockChain) *BlobPool {
config: config,
signer: types.LatestSigner(chain.Config()),
chain: chain,
lookup: make(map[common.Hash]uint64),
lookup: newLookup(),
index: make(map[common.Address][]*blobTxMeta),
spent: make(map[common.Address]*uint256.Int),
}
Expand Down Expand Up @@ -471,7 +475,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
}

meta := newBlobTxMeta(id, size, tx)
if _, exists := p.lookup[meta.hash]; exists {
if p.lookup.exists(meta.hash) {
// This path is only possible after a crash, where deleted items are not
// removed via the normal shutdown-startup procedure and thus may get
// partially resurrected.
Expand All @@ -496,9 +500,8 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
p.index[sender] = append(p.index[sender], meta)
p.spent[sender] = new(uint256.Int).Add(p.spent[sender], meta.costCap)

p.lookup[meta.hash] = meta.id
p.lookup.track(meta)
p.stored += uint64(meta.size)

return nil
}

Expand Down Expand Up @@ -531,7 +534,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
nonces = append(nonces, txs[i].nonce)

p.stored -= uint64(txs[i].size)
delete(p.lookup, txs[i].hash)
p.lookup.untrack(txs[i])

// Included transactions blobs need to be moved to the limbo
if filled && inclusions != nil {
Expand Down Expand Up @@ -572,7 +575,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6

p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[0].costCap)
p.stored -= uint64(txs[0].size)
delete(p.lookup, txs[0].hash)
p.lookup.untrack(txs[0])

// Included transactions blobs need to be moved to the limbo
if inclusions != nil {
Expand Down Expand Up @@ -621,14 +624,14 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
// crash would result in previously deleted entities being resurrected.
// That could potentially cause a duplicate nonce to appear.
if txs[i].nonce == txs[i-1].nonce {
id := p.lookup[txs[i].hash]
id, _ := p.lookup.storeidOfTx(txs[i].hash)

log.Error("Dropping repeat nonce blob transaction", "from", addr, "nonce", txs[i].nonce, "id", id)
dropRepeatedMeter.Mark(1)

p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[i].costCap)
p.stored -= uint64(txs[i].size)
delete(p.lookup, txs[i].hash)
p.lookup.untrack(txs[i])

if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
Expand All @@ -650,7 +653,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6

p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[j].costCap)
p.stored -= uint64(txs[j].size)
delete(p.lookup, txs[j].hash)
p.lookup.untrack(txs[j])
}
txs = txs[:i]

Expand Down Expand Up @@ -688,7 +691,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6

p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], last.costCap)
p.stored -= uint64(last.size)
delete(p.lookup, last.hash)
p.lookup.untrack(last)
}
if len(txs) == 0 {
delete(p.index, addr)
Expand Down Expand Up @@ -728,7 +731,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6

p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], last.costCap)
p.stored -= uint64(last.size)
delete(p.lookup, last.hash)
p.lookup.untrack(last)
}
p.index[addr] = txs

Expand Down Expand Up @@ -1006,7 +1009,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
p.index[addr] = append(p.index[addr], meta)
p.spent[addr] = new(uint256.Int).Add(p.spent[addr], meta.costCap)
}
p.lookup[meta.hash] = meta.id
p.lookup.track(meta)
p.stored += uint64(meta.size)
return nil
}
Expand All @@ -1033,7 +1036,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {
)
p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[i].costCap)
p.stored -= uint64(tx.size)
delete(p.lookup, tx.hash)
p.lookup.untrack(tx)
txs[i] = nil

// Drop everything afterwards, no gaps allowed
Expand All @@ -1043,7 +1046,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {

p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], tx.costCap)
p.stored -= uint64(tx.size)
delete(p.lookup, tx.hash)
p.lookup.untrack(tx)
txs[i+1+j] = nil
}
// Clear out the dropped transactions from the index
Expand Down Expand Up @@ -1171,8 +1174,7 @@ func (p *BlobPool) Has(hash common.Hash) bool {
p.lock.RLock()
defer p.lock.RUnlock()

_, ok := p.lookup[hash]
return ok
return p.lookup.exists(hash)
}

// Get returns a transaction if it is contained in the pool, or nil otherwise.
Expand All @@ -1189,7 +1191,7 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
}(time.Now())

// Pull the blob from disk and return an assembled response
id, ok := p.lookup[hash]
id, ok := p.lookup.storeidOfTx(hash)
if !ok {
return nil
}
Expand All @@ -1206,6 +1208,58 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
return item
}

// GetBlobs returns a number of blobs are proofs for the given versioned hashes.
// This is a utility method for the engine API, enabling consensus clients to
// retrieve blobs from the pools directly instead of the network.
func (p *BlobPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.Proof) {
// Create a map of the blob hash to indices for faster fills
var (
blobs = make([]*kzg4844.Blob, len(vhashes))
proofs = make([]*kzg4844.Proof, len(vhashes))
)
index := make(map[common.Hash]int)
for i, vhash := range vhashes {
index[vhash] = i
}
// Iterate over the blob hashes, pulling transactions that fill it. Take care
// to also fill anything else the transaction might include (probably will).
for i, vhash := range vhashes {
// If already filled by a previous fetch, skip
if blobs[i] != nil {
continue
}
// Unfilled, retrieve the datastore item (in a short lock)
p.lock.RLock()
id, exists := p.lookup.storeidOfBlob(vhash)
if !exists {
p.lock.RUnlock()
continue
}
data, err := p.store.Get(id)
p.lock.RUnlock()

// After releasing the lock, try to fill any blobs requested
if err != nil {
log.Error("Tracked blob transaction missing from store", "id", id, "err", err)
continue
}
item := new(types.Transaction)
if err = rlp.DecodeBytes(data, item); err != nil {
log.Error("Blobs corrupted for traced transaction", "id", id, "err", err)
continue
}
// Fill anything requested, not just the current versioned hash
sidecar := item.BlobTxSidecar()
for j, blobhash := range item.BlobHashes() {
if idx, ok := index[blobhash]; ok {
blobs[idx] = &sidecar.Blobs[j]
proofs[idx] = &sidecar.Proofs[j]
}
}
}
return blobs, proofs
}

// Add inserts a set of blob transactions into the pool if they pass validation (both
// consensus validity and pool restrictions).
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
Expand Down Expand Up @@ -1319,8 +1373,8 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
p.spent[from] = new(uint256.Int).Sub(p.spent[from], prev.costCap)
p.spent[from] = new(uint256.Int).Add(p.spent[from], meta.costCap)

delete(p.lookup, prev.hash)
p.lookup[meta.hash] = meta.id
p.lookup.untrack(prev)
p.lookup.track(meta)
p.stored += uint64(meta.size) - uint64(prev.size)
} else {
// Transaction extends previously scheduled ones
Expand All @@ -1330,7 +1384,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
newacc = true
}
p.spent[from] = new(uint256.Int).Add(p.spent[from], meta.costCap)
p.lookup[meta.hash] = meta.id
p.lookup.track(meta)
p.stored += uint64(meta.size)
}
// Recompute the rolling eviction fields. In case of a replacement, this will
Expand Down Expand Up @@ -1419,7 +1473,7 @@ func (p *BlobPool) drop() {
p.spent[from] = new(uint256.Int).Sub(p.spent[from], drop.costCap)
}
p.stored -= uint64(drop.size)
delete(p.lookup, drop.hash)
p.lookup.untrack(drop)

// Remove the transaction from the pool's eviction heap:
// - If the entire account was dropped, pop off the address
Expand Down
Loading

0 comments on commit cfd1489

Please sign in to comment.