Skip to content

Commit

Permalink
blobTx: mining + brodcasting (#2253)
Browse files Browse the repository at this point in the history
  • Loading branch information
emailtovamos authored and buddh0 committed Mar 13, 2024
1 parent 90cf86b commit 8c8eac0
Show file tree
Hide file tree
Showing 14 changed files with 255 additions and 29 deletions.
9 changes: 9 additions & 0 deletions core/types/tx_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ type BlobTx struct {

type BlobTxSidecars []*BlobTxSidecar

// Len returns the length of s.
func (s BlobTxSidecars) Len() int { return len(s) }

// EncodeIndex encodes the i'th BlobTxSidecar to w. Note that this does not check for errors
// because we assume that BlobTxSidecars will only ever contain valid sidecars
func (s BlobTxSidecars) EncodeIndex(i int, w *bytes.Buffer) {
rlp.Encode(w, s[i])
}

// BlobTxSidecar contains the blobs of a blob transaction.
type BlobTxSidecar struct {
Blobs []kzg4844.Blob // Blobs needed by the blob pool
Expand Down
6 changes: 3 additions & 3 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1389,7 +1389,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
)
blocks := make([]*types.Block, len(results))
for i, result := range results {
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles).WithWithdrawals(result.Withdrawals)
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles).WithWithdrawals(result.Withdrawals).WithBlobs(result.Sidecars)
}
// Downloaded blocks are always regarded as trusted after the
// transition. Because the downloaded chain is guided by the
Expand Down Expand Up @@ -1600,7 +1600,7 @@ func (d *Downloader) commitSnapSyncData(results []*fetchResult, stateSync *state
blocks := make([]*types.Block, len(results))
receipts := make([]types.Receipts, len(results))
for i, result := range results {
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles).WithWithdrawals(result.Withdrawals)
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles).WithWithdrawals(result.Withdrawals).WithBlobs(result.Sidecars)
receipts[i] = result.Receipts
}
if index, err := d.blockchain.InsertReceiptChain(blocks, receipts, d.ancientLimit); err != nil {
Expand All @@ -1611,7 +1611,7 @@ func (d *Downloader) commitSnapSyncData(results []*fetchResult, stateSync *state
}

func (d *Downloader) commitPivotBlock(result *fetchResult) error {
block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles).WithWithdrawals(result.Withdrawals)
block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles).WithWithdrawals(result.Withdrawals).WithBlobs(result.Sidecars)
log.Debug("Committing snap sync pivot as new head", "number", block.Number(), "hash", block.Hash())

// Commit the pivot block as the new head, will require full sync from here on
Expand Down
4 changes: 2 additions & 2 deletions eth/downloader/fetchers_concurrent_bodies.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ func (q *bodyQueue) request(peer *peerConnection, req *fetchRequest, resCh chan
// deliver is responsible for taking a generic response packet from the concurrent
// fetcher, unpacking the body data and delivering it to the downloader's queue.
func (q *bodyQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) {
txs, uncles, withdrawals := packet.Res.(*eth.BlockBodiesResponse).Unpack()
txs, uncles, withdrawals, sidecars := packet.Res.(*eth.BlockBodiesResponse).Unpack()
hashsets := packet.Meta.([][]common.Hash) // {txs hashes, uncle hashes, withdrawal hashes}

accepted, err := q.queue.DeliverBodies(peer.id, txs, hashsets[0], uncles, hashsets[1], withdrawals, hashsets[2])
accepted, err := q.queue.DeliverBodies(peer.id, txs, hashsets[0], uncles, hashsets[1], withdrawals, hashsets[2], sidecars)
switch {
case err == nil && len(txs) == 0:
peer.log.Trace("Requested bodies delivered")
Expand Down
4 changes: 3 additions & 1 deletion eth/downloader/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type fetchResult struct {
Transactions types.Transactions
Receipts types.Receipts
Withdrawals types.Withdrawals
Sidecars types.BlobTxSidecars
}

func newFetchResult(header *types.Header, fastSync bool, pid string) *fetchResult {
Expand Down Expand Up @@ -776,7 +777,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, hashes []comm
// also wakes any threads waiting for data delivery.
func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListHashes []common.Hash,
uncleLists [][]*types.Header, uncleListHashes []common.Hash,
withdrawalLists [][]*types.Withdrawal, withdrawalListHashes []common.Hash) (int, error) {
withdrawalLists [][]*types.Withdrawal, withdrawalListHashes []common.Hash, sidecars [][]*types.BlobTxSidecar) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()

Expand Down Expand Up @@ -838,6 +839,7 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListH
result.Transactions = txLists[index]
result.Uncles = uncleLists[index]
result.Withdrawals = withdrawalLists[index]
result.Sidecars = sidecars[index]
result.SetBodyDone()
}
return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool,
Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func XTestDelivery(t *testing.T) {
uncleHashes[i] = types.CalcUncleHash(uncles)
}
time.Sleep(100 * time.Millisecond)
_, err := q.DeliverBodies(peer.id, txset, txsHashes, uncleset, uncleHashes, nil, nil)
_, err := q.DeliverBodies(peer.id, txset, txsHashes, uncleset, uncleHashes, nil, nil, nil)
if err != nil {
fmt.Printf("delivered %d bodies %v\n", len(txset), err)
}
Expand Down
3 changes: 2 additions & 1 deletion eth/fetcher/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,8 @@ func (f *BlockFetcher) loop() {
case res := <-resCh:
res.Done <- nil
// Ignoring withdrawals here, since the block fetcher is not used post-merge.
txs, uncles, _ := res.Res.(*eth.BlockBodiesResponse).Unpack()
// todo 4844 is it ok to ignore sidecars here too?
txs, uncles, _, _ := res.Res.(*eth.BlockBodiesResponse).Unpack()
f.FilterBodies(peer, txs, uncles, time.Now())

case <-timeout.C:
Expand Down
1 change: 1 addition & 0 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,7 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
} else {
transfer = peers[:int(math.Sqrt(float64(len(peers))))]
}

for _, peer := range transfer {
peer.AsyncSendNewBlock(block, td)
}
Expand Down
11 changes: 9 additions & 2 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
return h.handleBlockAnnounces(peer, hashes, numbers)

case *eth.NewBlockPacket:
return h.handleBlockBroadcast(peer, packet.Block, packet.TD)
return h.handleBlockBroadcast(peer, packet)

case *eth.NewPooledTransactionHashesPacket:
return h.txFetcher.Notify(peer.ID(), packet.Types, packet.Sizes, packet.Hashes)
Expand Down Expand Up @@ -115,13 +115,20 @@ func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash,

// handleBlockBroadcast is invoked from a peer's message handler when it transmits a
// block broadcast for the local node to process.
func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td *big.Int) error {
func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, packet *eth.NewBlockPacket) error {
// Drop all incoming block announces from the p2p network if
// the chain already entered the pos stage and disconnect the
// remote peer.
if h.merger.PoSFinalized() {
return errors.New("disallowed block broadcast")
}
block := packet.Block
td := packet.TD
sidecars := packet.Sidecars
if sidecars != nil {
block = block.WithBlobs(sidecars)
}

// Schedule the block for import
h.blockFetcher.Enqueue(peer.ID(), block)

Expand Down
11 changes: 8 additions & 3 deletions eth/protocols/eth/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ const (
// blockPropagation is a block propagation event, waiting for its turn in the
// broadcast queue.
type blockPropagation struct {
block *types.Block
td *big.Int
block *types.Block
td *big.Int
sidecars types.BlobTxSidecars `rlp:"optional"`
}

// broadcastBlocks is a write loop that multiplexes blocks and block announcements
Expand All @@ -47,7 +48,11 @@ func (p *Peer) broadcastBlocks() {
if err := p.SendNewBlock(prop.block, prop.td); err != nil {
return
}
p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td)
if len(prop.sidecars) > 0 {
p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td, "sidecars", prop.sidecars.Len())
} else {
p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td)
}

case block := <-p.queuedBlockAnns:
if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
Expand Down
162 changes: 161 additions & 1 deletion eth/protocols/eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package eth

import (
rand2 "crypto/rand"
"io"
"math"
"math/big"
"math/rand"
Expand All @@ -33,10 +35,14 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/eth/protocols/bsc"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/holiman/uint256"
)

var (
Expand Down Expand Up @@ -146,7 +152,7 @@ func (b *testBackend) AcceptTxs() bool {
panic("data processing tests should be done in the handler package")
}
func (b *testBackend) Handle(*Peer, Packet) error {
panic("data processing tests should be done in the handler package")
return nil
}

// Tests that block headers can be retrieved from a remote chain based on user queries.
Expand Down Expand Up @@ -499,3 +505,157 @@ func testGetBlockReceipts(t *testing.T, protocol uint) {
t.Errorf("receipts mismatch: %v", err)
}
}

func TestHandleNewBlock(t *testing.T) {
t.Parallel()

gen := func(n int, g *core.BlockGen) {
if n%2 == 0 {
w := &types.Withdrawal{
Address: common.Address{0xaa},
Amount: 42,
}
g.AddWithdrawal(w)
}
}

backend := newTestBackendWithGenerator(maxBodiesServe+15, true, gen)
defer backend.close()

peer, _ := newTestPeer("peer", ETH68, backend)
defer peer.close()

v := new(uint32)
*v = 1
genBlobs := makeBlkBlobs(1, 2)
tx1 := types.NewTx(&types.BlobTx{
ChainID: new(uint256.Int).SetUint64(1),
GasTipCap: new(uint256.Int),
GasFeeCap: new(uint256.Int),
Gas: 0,
Value: new(uint256.Int),
Data: nil,
BlobFeeCap: new(uint256.Int),
BlobHashes: []common.Hash{common.HexToHash("0x34ec6e64f9cda8fe0451a391e4798085a3ef51a65ed1bfb016e34fc1a2028f8f"), common.HexToHash("0xb9a412e875f29fac436acde234f954e91173c4cf79814f6dcf630d8a6345747f")},
Sidecar: genBlobs[0],
V: new(uint256.Int),
R: new(uint256.Int),
S: new(uint256.Int),
})
sidecars := types.BlobTxSidecars{tx1.BlobTxSidecar()}
block := types.NewBlockWithHeader(&types.Header{
Number: big.NewInt(0),
Extra: []byte("test block"),
UncleHash: types.EmptyUncleHash,
TxHash: types.EmptyTxsHash,
ReceiptHash: types.EmptyReceiptsHash,
})
dataNil := NewBlockPacket{
Block: block,
TD: big.NewInt(1),
Sidecars: nil,
}
dataNonNil := NewBlockPacket{
Block: block,
TD: big.NewInt(1),
Sidecars: sidecars,
}
sizeNonNil, rNonNil, _ := rlp.EncodeToReader(dataNonNil)
sizeNil, rNil, _ := rlp.EncodeToReader(dataNil)

// Define the test cases
testCases := []struct {
name string
msg p2p.Msg
err error
}{
{
name: "Valid block",
msg: p2p.Msg{
Code: 1,
Size: uint32(sizeNonNil),
Payload: rNonNil,
},
err: nil,
},
{
name: "Nil sidecars",
msg: p2p.Msg{
Code: 2,
Size: uint32(sizeNil),
Payload: rNil,
},
err: nil,
},
}

protos := []p2p.Protocol{
{
Name: "eth",
Version: ETH67,

Check failure on line 595 in eth/protocols/eth/handler_test.go

View workflow job for this annotation

GitHub Actions / golang-lint (1.21.x, ubuntu-latest)

undefined: ETH67

Check failure on line 595 in eth/protocols/eth/handler_test.go

View workflow job for this annotation

GitHub Actions / unit-test (1.21.x, ubuntu-latest)

undefined: ETH67
},
{
Name: "eth",
Version: ETH68,
},
{
Name: "bsc",
Version: bsc.Bsc1,
},
}
caps := []p2p.Cap{
{
Name: "eth",
Version: ETH67,

Check failure on line 609 in eth/protocols/eth/handler_test.go

View workflow job for this annotation

GitHub Actions / golang-lint (1.21.x, ubuntu-latest)

undefined: ETH67 (typecheck)

Check failure on line 609 in eth/protocols/eth/handler_test.go

View workflow job for this annotation

GitHub Actions / unit-test (1.21.x, ubuntu-latest)

undefined: ETH67
},
{
Name: "eth",
Version: ETH68,
},
{
Name: "bsc",
Version: bsc.Bsc1,
},
}
// Create a source handler to send messages through and a sink peer to receive them
p2pEthSrc, p2pEthSink := p2p.MsgPipe()
defer p2pEthSrc.Close()
defer p2pEthSink.Close()

localEth := NewPeer(ETH68, p2p.NewPeerWithProtocols(enode.ID{1}, protos, "", caps), p2pEthSrc, nil)

// Run the tests
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
err := handleNewBlock(backend, tc.msg, localEth)
if err != tc.err {
t.Errorf("expected error %v, got %v", tc.err, err)
}
})
}

}

func makeBlkBlobs(n, nPerTx int) types.BlobTxSidecars {
if n <= 0 {
return nil
}
ret := make(types.BlobTxSidecars, n)
for i := 0; i < n; i++ {
blobs := make([]kzg4844.Blob, nPerTx)
commitments := make([]kzg4844.Commitment, nPerTx)
proofs := make([]kzg4844.Proof, nPerTx)
for i := 0; i < nPerTx; i++ {
io.ReadFull(rand2.Reader, blobs[i][:])
commitments[i], _ = kzg4844.BlobToCommitment(blobs[i])
proofs[i], _ = kzg4844.ComputeBlobProof(blobs[i], commitments[i])
}
ret[i] = &types.BlobTxSidecar{
Blobs: blobs,
Commitments: commitments,
Proofs: proofs,
}
}
return ret
}
23 changes: 20 additions & 3 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,24 @@ func ServiceGetBlockBodiesQuery(chain *core.BlockChain, query GetBlockBodiesRequ
lookups >= 2*maxBodiesServe {
break
}
if data := chain.GetBodyRLP(hash); len(data) != 0 {
bodies = append(bodies, data)
bytes += len(data)
body := chain.GetBody(hash)
if body == nil {
continue
}
blobs := chain.GetBlobsByHash(hash)
bodyWithBlobs := &BlockBody{
Transactions: body.Transactions,
Uncles: body.Uncles,
Withdrawals: body.Withdrawals,
Sidecars: blobs,
}
enc, err := rlp.EncodeToBytes(bodyWithBlobs)
if err != nil {
log.Error("block body encode err", "hash", hash, "err", err)
continue
}
bodies = append(bodies, enc)
bytes += len(enc)
}
return bodies
}
Expand Down Expand Up @@ -293,9 +307,12 @@ func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error {
if err := msg.Decode(ann); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}

// Now that we have our packet, perform operations using the interface methods
if err := ann.sanityCheck(); err != nil {
return err
}

if hash := types.CalcUncleHash(ann.Block.Uncles()); hash != ann.Block.UncleHash() {
log.Warn("Propagated block has invalid uncles", "have", hash, "exp", ann.Block.UncleHash())
return nil // TODO(karalabe): return error eventually, but wait a few releases
Expand Down
Loading

0 comments on commit 8c8eac0

Please sign in to comment.